In [2]:
import pyspark 
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *
import elasticsearch as es

<h3>data from mongo 

In [3]:
from pyspark.sql import SparkSession
my_spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/spark_mongo.data") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/spark_mongo.data") \
    .config('spark.jars.packages','org.mongodb.spark:mongo-spark-connector_2.12:3.0.0')\
    .getOrCreate()

<h3>data from csv in local

In [None]:
df = spark.read.csv('datasets/spark_datasets/train_data.csv', header = True, inferSchema=True)

In [5]:
df = my_spark.read.format('com.mongodb.spark.sql.DefaultSource').load()

In [6]:
df.printSchema()

root
 |-- RUL: double (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- engine_no: double (nullable = true)
 |-- op_setting_1: double (nullable = true)
 |-- op_setting_2: double (nullable = true)
 |-- op_setting_3: double (nullable = true)
 |-- sensor_1: double (nullable = true)
 |-- sensor_10: double (nullable = true)
 |-- sensor_11: double (nullable = true)
 |-- sensor_12: double (nullable = true)
 |-- sensor_13: double (nullable = true)
 |-- sensor_14: double (nullable = true)
 |-- sensor_15: double (nullable = true)
 |-- sensor_16: double (nullable = true)
 |-- sensor_17: double (nullable = true)
 |-- sensor_18: double (nullable = true)
 |-- sensor_19: double (nullable = true)
 |-- sensor_2: double (nullable = true)
 |-- sensor_20: double (nullable = true)
 |-- sensor_21: double (nullable = true)
 |-- sensor_22: double (nullable = true)
 |-- sensor_23: double (nullable = true)
 |-- sensor_24: double (nullable = true)
 |-- sensor_25: d

In [4]:
df.show()

+-----+--------------------+---------+------------+------------+------------+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------+---------+---------+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+--------+--------------+
|  RUL|                 _id|engine_no|op_setting_1|op_setting_2|op_setting_3|sensor_1|sensor_10|sensor_11|sensor_12|sensor_13|sensor_14|sensor_15|sensor_16|sensor_17|sensor_18|sensor_19|sensor_2|sensor_20|sensor_21|sensor_22|sensor_23|sensor_24|sensor_25|sensor_26|sensor_27|sensor_3|sensor_4|sensor_5|sensor_6|sensor_7|sensor_8|sensor_9|time_in_cycles|
+-----+--------------------+---------+------------+------------+------------+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------+---------+---------+---------+---------+---------+---------+---------+---------+--------+--------

In [22]:
df.select('RUL','engine_no').show()

+-----+---------+
|  RUL|engine_no|
+-----+---------+
|339.0|      0.0|
|338.0|      0.0|
|337.0|      0.0|
|336.0|      0.0|
|335.0|      0.0|
|334.0|      0.0|
|333.0|      0.0|
|332.0|      0.0|
|331.0|      0.0|
|330.0|      0.0|
|329.0|      0.0|
|328.0|      0.0|
|327.0|      0.0|
|326.0|      0.0|
|325.0|      0.0|
|324.0|      0.0|
|323.0|      0.0|
|322.0|      0.0|
|321.0|      0.0|
|320.0|      0.0|
+-----+---------+
only showing top 20 rows



In [None]:
!conda install python=3.10

In [8]:
!python --version

Python 3.9.12


In [36]:
spark.version

'3.3.0'

In [15]:
num_rows = df.count()
num_rows

160359

In [6]:
print(len(df.columns))

34


In [5]:
print(df.describe( ).show())

+-------+------------------+------------------+------------------+-------------------+------------------+------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+--------------------+------------------+------------------+-----------------+----------------+------------------+------------------+---------+---------+---------+---------+---------+---------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+
|summary|               RUL|         engine_no|      op_setting_1|       op_setting_2|      op_setting_3|          sensor_1|          sensor_10|         sensor_11|         sensor_12|         sensor_13|        sensor_14|         sensor_15|           sensor_16|         sensor_17|         sensor_18|        sensor_19|        sensor_2|         sensor_20|         sensor_21|sensor_22|sensor_23|sensor_24|sensor_25|sensor_26

In [7]:
numeric_columns = list()
categorical_column = list()
for col_ in df.columns:
    if df.select(col_).dtypes[0][1] != "string":
        numeric_columns.append(col_)
    else:
        categorical_column.append(col_)
        
print("Numeric columns",numeric_columns)
print("*********************************************************************")
print("*********************************************************************")
print("categorical columns",categorical_column)

Numeric columns ['RUL', '_id', 'engine_no', 'op_setting_1', 'op_setting_2', 'op_setting_3', 'sensor_1', 'sensor_10', 'sensor_11', 'sensor_12', 'sensor_13', 'sensor_14', 'sensor_15', 'sensor_16', 'sensor_17', 'sensor_18', 'sensor_19', 'sensor_2', 'sensor_20', 'sensor_21', 'sensor_22', 'sensor_23', 'sensor_24', 'sensor_25', 'sensor_26', 'sensor_27', 'sensor_3', 'sensor_4', 'sensor_5', 'sensor_6', 'sensor_7', 'sensor_8', 'sensor_9', 'time_in_cycles']
*********************************************************************
*********************************************************************
categorical columns []


Drop null value

In [43]:
df['sensor_5'].isNull()

Column<'(sensor_5 IS NULL)'>

In [18]:
print(df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns  ]).show())

+---+---+---------+------------+------------+------------+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------+---------+---------+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+--------+--------------+
|RUL|_id|engine_no|op_setting_1|op_setting_2|op_setting_3|sensor_1|sensor_10|sensor_11|sensor_12|sensor_13|sensor_14|sensor_15|sensor_16|sensor_17|sensor_18|sensor_19|sensor_2|sensor_20|sensor_21|sensor_22|sensor_23|sensor_24|sensor_25|sensor_26|sensor_27|sensor_3|sensor_4|sensor_5|sensor_6|sensor_7|sensor_8|sensor_9|time_in_cycles|
+---+---+---------+------------+------------+------------+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------+---------+---------+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+--------+-----------

In [19]:
df = df.dropna()
df.show()

+-----+--------------------+---------+------------+------------+------------+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------+---------+---------+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+--------+--------------+
|  RUL|                 _id|engine_no|op_setting_1|op_setting_2|op_setting_3|sensor_1|sensor_10|sensor_11|sensor_12|sensor_13|sensor_14|sensor_15|sensor_16|sensor_17|sensor_18|sensor_19|sensor_2|sensor_20|sensor_21|sensor_22|sensor_23|sensor_24|sensor_25|sensor_26|sensor_27|sensor_3|sensor_4|sensor_5|sensor_6|sensor_7|sensor_8|sensor_9|time_in_cycles|
+-----+--------------------+---------+------------+------------+------------+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------+---------+---------+---------+---------+---------+---------+---------+---------+--------+--------

In [23]:
df.columns

['RUL',
 '_id',
 'engine_no',
 'op_setting_1',
 'op_setting_2',
 'op_setting_3',
 'sensor_1',
 'sensor_10',
 'sensor_11',
 'sensor_12',
 'sensor_13',
 'sensor_14',
 'sensor_15',
 'sensor_16',
 'sensor_17',
 'sensor_18',
 'sensor_19',
 'sensor_2',
 'sensor_20',
 'sensor_21',
 'sensor_22',
 'sensor_23',
 'sensor_24',
 'sensor_25',
 'sensor_26',
 'sensor_27',
 'sensor_3',
 'sensor_4',
 'sensor_5',
 'sensor_6',
 'sensor_7',
 'sensor_8',
 'sensor_9',
 'time_in_cycles']

In [25]:
df.select('RUL','engine_no','time_in_cycles',).show(40)

+-----+---------+--------------+
|  RUL|engine_no|time_in_cycles|
+-----+---------+--------------+
|339.0|      0.0|           1.0|
|338.0|      0.0|           2.0|
|337.0|      0.0|           3.0|
|336.0|      0.0|           4.0|
|335.0|      0.0|           5.0|
|334.0|      0.0|           6.0|
|333.0|      0.0|           7.0|
|332.0|      0.0|           8.0|
|331.0|      0.0|           9.0|
|330.0|      0.0|          10.0|
|329.0|      0.0|          11.0|
|328.0|      0.0|          12.0|
|327.0|      0.0|          13.0|
|326.0|      0.0|          14.0|
|325.0|      0.0|          15.0|
|324.0|      0.0|          16.0|
|323.0|      0.0|          17.0|
|322.0|      0.0|          18.0|
|321.0|      0.0|          19.0|
|320.0|      0.0|          20.0|
|319.0|      0.0|          21.0|
|318.0|      0.0|          22.0|
|317.0|      0.0|          23.0|
|316.0|      0.0|          24.0|
|315.0|      0.0|          25.0|
|314.0|      0.0|          26.0|
|313.0|      0.0|          27.0|
|312.0|   

In [61]:
# df.toPandas().engine_no.value_counts()
df.select("engine_no","time_in_cycles").distinct().show(100)
# df.select('engine_no').distinct()

+---------+--------------+
|engine_no|time_in_cycles|
+---------+--------------+
|      0.0|         114.0|
|      1.0|         101.0|
|      1.0|         199.0|
|      1.0|         239.0|
|      1.0|         245.0|
|      2.0|          92.0|
|      2.0|         101.0|
|      2.0|         108.0|
|      3.0|          64.0|
|      3.0|         250.0|
|      5.0|          54.0|
|      6.0|          70.0|
|      7.0|          32.0|
|      7.0|          71.0|
|      8.0|           3.0|
|      8.0|          77.0|
|      8.0|         134.0|
|     10.0|         170.0|
|     11.0|          48.0|
|     12.0|          16.0|
|     12.0|         167.0|
|     13.0|         200.0|
|     14.0|          40.0|
|     14.0|         291.0|
|     19.0|         101.0|
|     20.0|          62.0|
|     22.0|         296.0|
|     23.0|          16.0|
|     24.0|          33.0|
|     24.0|          35.0|
|     24.0|         188.0|
|     24.0|         204.0|
|     25.0|          79.0|
|     28.0|         137.0|
|

In [63]:
df.select('engine_no').distinct().collect()

[Row(engine_no=0.0),
 Row(engine_no=299.0),
 Row(engine_no=305.0),
 Row(engine_no=147.0),
 Row(engine_no=170.0),
 Row(engine_no=184.0),
 Row(engine_no=160.0),
 Row(engine_no=169.0),
 Row(engine_no=8.0),
 Row(engine_no=67.0),
 Row(engine_no=70.0),
 Row(engine_no=311.0),
 Row(engine_no=379.0),
 Row(engine_no=168.0),
 Row(engine_no=69.0),
 Row(engine_no=206.0),
 Row(engine_no=389.0),
 Row(engine_no=390.0),
 Row(engine_no=7.0),
 Row(engine_no=249.0),
 Row(engine_no=365.0),
 Row(engine_no=401.0),
 Row(engine_no=142.0),
 Row(engine_no=191.0),
 Row(engine_no=329.0),
 Row(engine_no=112.0),
 Row(engine_no=154.0),
 Row(engine_no=451.0),
 Row(engine_no=232.0),
 Row(engine_no=124.0),
 Row(engine_no=438.0),
 Row(engine_no=348.0),
 Row(engine_no=303.0),
 Row(engine_no=456.0),
 Row(engine_no=410.0),
 Row(engine_no=253.0),
 Row(engine_no=317.0),
 Row(engine_no=331.0),
 Row(engine_no=128.0),
 Row(engine_no=201.0),
 Row(engine_no=235.0),
 Row(engine_no=353.0),
 Row(engine_no=108.0),
 Row(engine_no=180.0

In [69]:
df = df.dropna()
df.show()


+-----+--------------------+---------+------------+------------+------------+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------+---------+---------+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+--------+--------------+
|  RUL|                 _id|engine_no|op_setting_1|op_setting_2|op_setting_3|sensor_1|sensor_10|sensor_11|sensor_12|sensor_13|sensor_14|sensor_15|sensor_16|sensor_17|sensor_18|sensor_19|sensor_2|sensor_20|sensor_21|sensor_22|sensor_23|sensor_24|sensor_25|sensor_26|sensor_27|sensor_3|sensor_4|sensor_5|sensor_6|sensor_7|sensor_8|sensor_9|time_in_cycles|
+-----+--------------------+---------+------------+------------+------------+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+--------+---------+---------+---------+---------+---------+---------+---------+---------+--------+--------

In [137]:
df.limit(5).toPandas()
df.head(5)

[Row(RUL=339.0, _id=Row(oid='62bf37f08ac41af439d4bf0c'), engine_no=0.0, op_setting_1=25.0074, op_setting_2=0.62, op_setting_3=60.0, sensor_1=462.54, sensor_10=0.94, sensor_11=36.7, sensor_12=164.75, sensor_13=2028.38, sensor_14=7875.47, sensor_15=10.885, sensor_16=0.02, sensor_17=306.0, sensor_18=1915.0, sensor_19=84.93, sensor_2=536.84, sensor_20=14.35, sensor_21=8.4712, sensor_22=0.0, sensor_23=0.0, sensor_24=0.0, sensor_25=0.0, sensor_26=0.0, sensor_27=0.0, sensor_3=1256.52, sensor_4=1043.97, sensor_5=7.05, sensor_6=9.02, sensor_7=175.29, sensor_8=1915.47, sensor_9=8019.94, time_in_cycles=1.0),
 Row(RUL=338.0, _id=Row(oid='62bf37f08ac41af439d4bf0d'), engine_no=0.0, op_setting_1=35.0072, op_setting_2=0.8413, op_setting_3=100.0, sensor_1=449.44, sensor_10=1.03, sensor_11=41.78, sensor_12=183.67, sensor_13=2388.2, sensor_14=8073.12, sensor_15=9.2527, sensor_16=0.02, sensor_17=334.0, sensor_18=2223.0, sensor_19=100.0, sensor_2=555.44, sensor_20=14.88, sensor_21=8.9928, sensor_22=0.0, se

In [80]:
engine_mean = df.select(mean("RUL")).collect()
print(engine_mean)

[Row(avg(RUL)=122.33133781078705)]


In [29]:
cols_to_drop = ['_id','sensor_22', 'sensor_23','sensor_24','sensor_25','sensor_26','sensor_27']
df = df.drop(*cols_to_drop)

In [30]:
print(len(df.columns))

27


<h1>2 Machine learning in pyspark ¶

In [31]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

In [62]:
# Before modelling in Pyspark, we need to put all features to Vector using Pyspark VectorAssembler
feature = VectorAssembler(inputCols = df.columns,outputCol="features")
feature_vector= feature.transform(df)
feature_vector.limit(3).toPandas()

Unnamed: 0,RUL,engine_no,op_setting_1,op_setting_2,op_setting_3,sensor_1,sensor_10,sensor_11,sensor_12,sensor_13,...,sensor_21,sensor_3,sensor_4,sensor_5,sensor_6,sensor_7,sensor_8,sensor_9,time_in_cycles,features
0,339.0,0.0,25.0074,0.62,60.0,462.54,0.94,36.7,164.75,2028.38,...,8.4712,1256.52,1043.97,7.05,9.02,175.29,1915.47,8019.94,1.0,"[339.0, 0.0, 25.0074, 0.62, 60.0, 462.54, 0.94..."
1,338.0,0.0,35.0072,0.8413,100.0,449.44,1.03,41.78,183.67,2388.2,...,8.9928,1364.42,1128.75,5.48,8.0,194.71,2223.06,8361.86,2.0,"[338.0, 0.0, 35.0072, 0.8413, 100.0, 449.44, 1..."
2,337.0,0.0,25.0053,0.6215,60.0,462.54,0.94,36.49,164.49,2028.4,...,8.5107,1265.94,1047.23,7.05,9.03,175.29,1915.49,8021.37,3.0,"[337.0, 0.0, 25.0053, 0.6215, 60.0, 462.54, 0...."


In [63]:
our_df = feature_vector.select(['features','RUL'])
(train_df, test_df) = feature_vector.randomSplit([0.8, 0.2],seed = 11)

In [49]:
from pyspark.ml.regression import LinearRegression
module = LinearRegression(featuresCol = 'features', labelCol='RUL', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = module.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [0.9937643877351743,0.0,0.0,0.0,-0.0,0.006626853244016918,1.2311683897684107,-0.19344912477307422,-0.0,-0.0,0.0,-3.304426243712705,0.0,-0.012145491785298675,0.004079059129395872,-0.0,0.020980482813239894,-0.0,-0.0,-0.0030973898551242465,-0.004266613133762181,-0.0,-0.0,-0.0,0.0038692754129598175,-0.0004570907151796616,0.0]
Intercept: 22.26276304016813


In [50]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 2.681714
r2: 0.998968


In [51]:
train_df.describe().show()

+-------+-----------------+-----------------+------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+------------------+-----------------+------------------+--------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+-----------------+------------------+
|summary|              RUL|        engine_no|      op_setting_1|       op_setting_2|      op_setting_3|          sensor_1|         sensor_10|        sensor_11|         sensor_12|         sensor_13|        sensor_14|         sensor_15|           sensor_16|         sensor_17|         sensor_18|         sensor_19|         sensor_2|         sensor_20|         sensor_21|          sensor_3|          sensor_4|         sensor_5|          sensor_6|         sensor_7|          sensor_8|  

In [64]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","RUL","features").show(15)

from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="RUL",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))


+-------------------+---+--------------------+
|         prediction|RUL|            features|
+-------------------+---+--------------------+
| 2.1936748671611532|0.0|[0.0,5.0,-5.0E-4,...|
| 1.5509830906114352|0.0|[0.0,6.0,10.0028,...|
|-0.8795844897773293|0.0|[0.0,12.0,42.0066...|
| 3.2614905473697497|0.0|[0.0,13.0,8.0E-4,...|
|  1.518911127509213|0.0|[0.0,14.0,10.0028...|
| 3.3360796706030733|0.0|[0.0,16.0,-0.0015...|
|-0.9252182170956083|0.0|[0.0,17.0,42.0052...|
| 2.1273257896801603|0.0|[0.0,21.0,-2.0E-4...|
|-0.8410334643542825|0.0|[0.0,27.0,41.9988...|
| -6.623122237440484|0.0|[0.0,29.0,25.004,...|
|-0.8814374974127475|0.0|[0.0,32.0,41.9988...|
|  1.540912241761422|0.0|[0.0,37.0,10.0056...|
|-0.9384418459336459|0.0|[0.0,39.0,42.0048...|
|-0.7497871736758412|0.0|[0.0,47.0,42.0072...|
|  2.591650077485408|1.0|[1.0,2.0,10.0047,...|
+-------------------+---+--------------------+
only showing top 15 rows

R Squared (R2) on test data = 0.998955
