In [0]:
#Importing all the libraries
import pyspark.sql.functions as F
from pyspark.sql.functions import col, lit
from pyspark.sql.functions import col, unix_timestamp, round
from pyspark.sql.window import Window

In [0]:
# Loading the dataset for telemetry
telemetry = spark.read.format('csv').options(header='true', inferSchema='true').load('/FileStore/tables/telemetry.csv')

The error dataset consists of non-breaking errors count when machine is still.
we can use this dataset for predictive of a future failure event.
we have to combine the timestamps of telemetry and error dataset collected in hours.

In [0]:
# Loading the dataset for error
error = spark.read.format('csv').options(header='true', inferSchema='true').load('/FileStore/tables/errors.csv')

display(error)

datetime,machineID,errorID
2015-01-06 03:00:00,1,error3
2015-02-03 06:00:00,1,error4
2015-02-21 11:00:00,1,error1
2015-02-21 16:00:00,1,error2
2015-03-20 06:00:00,1,error1
2015-04-04 06:00:00,1,error5
2015-05-04 06:00:00,1,error4
2015-05-19 06:00:00,1,error2
2015-05-19 06:00:00,1,error3
2015-06-03 06:00:00,1,error5


In [0]:
# handle missing values
#spark automatically detects if a column is string or numeric
error.na.fill('unknown').show()

#fill integer value column
error.na.fill(0).show()

In [0]:
# converting the telemetry datatime column from string to timestamp
error = error.withColumn('datetime', col('datetime').cast('timestamp'))
error.show()

In [0]:
print(error.count())
display(error.summary())

summary,machineID,errorID
count,11967.0,11967
mean,498.324475641347,
stddev,292.2513121245124,
min,1.0,error1
25%,243.0,
50%,497.0,
75%,755.0,
max,1000.0,error5


In [0]:
# displaying the errorID count
error.select("errorID").display()

errorID
error3
error4
error1
error2
error1
error5
error4
error2
error3
error5


Feature Engineering for Error Dataset

Error dataset consists of timestamps, but the errorID feature is categorical and not numeric value.
we need to choose the lag window here of 24hours.
So we can't do average over the time intervals(rolling mean or SD).
We can count the number of errors of each errorID with in the lag window.

In [0]:
# create a column for each errorID and filling out the missing values of category with 
# dummy and numerical values with 0. 

errorID_col = (error.groupBy("machineID","datetime","errorID").pivot('errorID')
             .agg(F.count('machineID').alias('dummycol')).drop('errorID').fillna(0)
             .groupBy("machineID","datetime")
             .agg(F.sum('error1').alias('error1sum'), 
                  F.sum('error2').alias('error2sum'), 
                  F.sum('error3').alias('error3sum'), 
                  F.sum('error4').alias('error4sum'), 
                  F.sum('error5').alias('error5sum')))

display(errorID_col)

machineID,datetime,error1sum,error2sum,error3sum,error4sum,error5sum
668,2015-04-26T06:00:00.000+0000,1,1,1,0,0
441,2015-06-07T06:00:00.000+0000,1,1,1,0,0
107,2015-10-04T06:00:00.000+0000,0,1,1,0,0
532,2015-05-01T06:00:00.000+0000,1,1,1,0,0
975,2015-07-22T06:00:00.000+0000,0,1,1,0,0
813,2015-04-03T07:00:00.000+0000,0,1,0,0,0
236,2015-04-10T06:00:00.000+0000,0,1,1,0,0
689,2015-12-08T18:00:00.000+0000,0,0,1,0,0
222,2015-02-13T06:00:00.000+0000,0,1,1,0,0
657,2015-07-08T06:00:00.000+0000,0,1,1,0,0


In [0]:
# joining the telemetry data with column of errorID created
error_count = (telemetry.join(errorID_col, 
                              ((telemetry['machineID'] == errorID_col['machineID']) 
                               & (telemetry['datetime'] == errorID_col['datetime'])), "left")
               .drop('volt', 'rotate', 'pressure', 'vibration')
               .drop(errorID_col.machineID).drop(errorID_col.datetime)
               .fillna(0))

display(error_count)

datetime,machineID,error1sum,error2sum,error3sum,error4sum,error5sum
2015-01-01 06:00:00,1,0,0,0,0,0
2015-01-01 07:00:00,1,0,0,0,0,0
2015-01-01 08:00:00,1,0,0,0,0,0
2015-01-01 09:00:00,1,0,0,0,0,0
2015-01-01 10:00:00,1,0,0,0,0,0
2015-01-01 11:00:00,1,0,0,0,0,0
2015-01-01 12:00:00,1,0,0,0,0,0
2015-01-01 13:00:00,1,0,0,0,0,0
2015-01-01 14:00:00,1,0,0,0,0,0
2015-01-01 15:00:00,1,0,0,0,0,0


In [0]:
int_errorfeat = ['error1sum','error2sum', 'error3sum', 'error4sum', 'error5sum']
wSpec = Window.partitionBy('machineID').orderBy('datetime').rowsBetween(1-24, 0)

In [0]:
for col_name in int_errorfeat:
    # We're only interested in the erros in the previous 24 hours.
    error_count = error_count.withColumn(col_name+'_rollingmean_24', 
                                         F.avg(col(col_name)).over(wSpec))

display(error_count)

datetime,machineID,error1sum,error2sum,error3sum,error4sum,error5sum,error1sum_rollingmean_24,error2sum_rollingmean_24,error3sum_rollingmean_24,error4sum_rollingmean_24,error5sum_rollingmean_24
2015-01-01 06:00:00,31,0,0,0,0,0,0.0,0.0,0.0,0.0,0.0
2015-01-01 07:00:00,31,0,0,0,0,0,0.0,0.0,0.0,0.0,0.0
2015-01-01 08:00:00,31,0,0,0,0,0,0.0,0.0,0.0,0.0,0.0
2015-01-01 09:00:00,31,0,0,0,0,0,0.0,0.0,0.0,0.0,0.0
2015-01-01 10:00:00,31,0,0,0,0,0,0.0,0.0,0.0,0.0,0.0
2015-01-01 11:00:00,31,0,0,0,0,0,0.0,0.0,0.0,0.0,0.0
2015-01-01 12:00:00,31,0,0,0,0,0,0.0,0.0,0.0,0.0,0.0
2015-01-01 13:00:00,31,0,0,0,0,0,0.0,0.0,0.0,0.0,0.0
2015-01-01 14:00:00,31,0,0,0,0,0,0.0,0.0,0.0,0.0,0.0
2015-01-01 15:00:00,31,0,0,0,0,0,0.0,0.0,0.0,0.0,0.0


In [0]:
`error_feat = (error_count.withColumn("df_time", df_time)
              .drop('error1sum', 'error2sum', 'error3sum', 'error4sum', 'error5sum').fillna(0)
              .groupBy("machineID","df_time")
              .agg(F.mean('error1sum_rollingmean_24').alias('error1sum_rollingmean_24'), 
                   F.mean('error2sum_rollingmean_24').alias('error2sum_rollingmean_24'), 
                   F.mean('error3sum_rollingmean_24').alias('error3sum_rollingmean_24'), 
                   F.mean('error4sum_rollingmean_24').alias('error4sum_rollingmean_24'), 
                   F.mean('error5sum_rollingmean_24').alias('error5sum_rollingmean_24')))

display(error_feat)

machineID,df_time,error1sum_rollingmean_24,error2sum_rollingmean_24,error3sum_rollingmean_24,error4sum_rollingmean_24,error5sum_rollingmean_24
53,2015-02-02T00:00:00.000+0000,0.0,0.0,0.0,0.0,0.0
108,2015-01-26T00:00:00.000+0000,0.0,0.0,0.0,0.0,0.0
148,2015-01-31T12:00:00.000+0000,0.0,0.0,0.0,0.0,0.0
155,2015-01-17T12:00:00.000+0000,0.0,0.0,0.0,0.0,0.0
243,2015-01-13T00:00:00.000+0000,0.0,0.0416666666666666,0.0416666666666666,0.0,0.0
251,2015-01-09T00:00:00.000+0000,0.0,0.0,0.0,0.0,0.0
255,2015-01-02T12:00:00.000+0000,0.0,0.0,0.0,0.0,0.0
471,2015-01-03T00:00:00.000+0000,0.0,0.0,0.0,0.0,0.0
481,2015-01-10T00:00:00.000+0000,0.0,0.0,0.0,0.0,0.0
540,2015-01-13T12:00:00.000+0000,0.0,0.0,0.0,0.0,0.0


In [0]:
print(error_feat.count())
display(error_feat.summary())

summary,machineID,error1sum_rollingmean_24,error2sum_rollingmean_24,error3sum_rollingmean_24,error4sum_rollingmean_24,error5sum_rollingmean_24
count,731000.0,731000.0,731000.0,731000.0,731000.0,731000.0
mean,500.5,0.0003137479265372088,0.00036848610512379233,0.0003512854935564592,0.0001821574612048376,0.000154118435517043
stddev,288.67518770952563,0.0035779161554425,0.0038678356733719,0.003786323105419,0.0026962163455832,0.0025381246797544
min,1.0,0.0,0.0,0.0,0.0,0.0
25%,250.0,0.0,0.0,0.0,0.0,0.0
50%,500.0,0.0,0.0,0.0,0.0,0.0
75%,750.0,0.0,0.0,0.0,0.0,0.0
max,1000.0,0.2586008898508898,0.2586008898508898,0.2586008898508898,0.2586008898508898,0.2586008898508898
