In [2]:
from pyspark import SparkContext,keyword_only
from pyspark.sql import SparkSession,DataFrame
from pyspark.conf import SparkConf
from pyspark.sql.types import IntegerType, FloatType, ArrayType
import pyspark.sql.functions as F
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.pandas import date_range
from pyspark.ml.feature import VectorAssembler, Vector, DenseVector
from pyspark.sql import Row
from sklearn import neighbors
import random 
from functools import reduce
import pandas as pd 
import numpy as np
import os




In [3]:
conf = SparkConf()
conf.setMaster("local").setAppName("My app")

<pyspark.conf.SparkConf at 0x28212f3a0>

In [4]:
conf.get("spark.master")

'local'

In [5]:
conf.get("spark.app.name")

'My app'

In [6]:
conf.set("spark.driver.bindAddress", "127.0.0.1")

<pyspark.conf.SparkConf at 0x28212f3a0>

In [7]:
conf.get("spark.driver.bindAddress")

'127.0.0.1'

In [8]:
sc = SparkContext(conf = conf)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/16 19:43:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/12/16 19:43:58 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [9]:
os.listdir("./Azure")

['PdM_maint.csv',
 'PdM_errors.csv',
 'PdM_machines.csv',
 'PdM_failures.csv',
 '.ipynb_checkpoints',
 'PdM_telemetry.csv']

In [10]:
ses = SparkSession(sparkContext= sc)

In [11]:
maint = ses.read.option("header",True).csv("./Azure/Pdm_maint.csv")
errors = ses.read.option("header",True).csv("./Azure/PdM_errors.csv")
machines = ses.read.option("header",True).csv("./Azure/PdM_machines.csv")
telemetry = ses.read.option("header",True).csv("./Azure/PdM_telemetry.csv")
failures = ses.read.option("header",True).csv("./Azure/PdM_failures.csv")

                                                                                

In [12]:
""" check datatypes of each dataframe 
"""
print("maint datatypes")
print(maint.dtypes)
print("\n")
print("errors datatypes")
print(errors.dtypes)
print("\n")
print("machines datatypes")
print(machines.dtypes)
print("\n")
print("telemetry datatypes")
print(telemetry.dtypes)
print("\n")
print("failures datatypes")
print(failures.dtypes)

maint datatypes
[('datetime', 'string'), ('machineID', 'string'), ('comp', 'string')]


errors datatypes
[('datetime', 'string'), ('machineID', 'string'), ('errorID', 'string')]


machines datatypes
[('machineID', 'string'), ('model', 'string'), ('age', 'string')]


telemetry datatypes
[('datetime', 'string'), ('machineID', 'string'), ('volt', 'string'), ('rotate', 'string'), ('pressure', 'string'), ('vibration', 'string')]


failures datatypes
[('datetime', 'string'), ('machineID', 'string'), ('failure', 'string')]


In [13]:
"""
cast data to the right datatype,
ErrorID, MachineID, comp, failures is not needed to be cast, they will be encode later
datetime will be cast as timestamp

age, volt, rotate, pressure, vibration will be cast as float
"""
def String2Date(df):
    return df.withColumn("datetime",to_timestamp("datetime"))

def String2float(df):
    list_column = ["volt","rotate","pressure","vibration","age"]
    columns = []
    for i in df.columns:
        if i in list_column:
            columns.append(i)
    if len(columns) == 0:
        return df
    for col_name in columns:
        df = df.withColumn(col_name, col(col_name).cast('double'))
    return df

def CastDF(df):
    if "datetime" in df.columns:
        df = df.transform(String2Date)
    df = df.transform(String2float)
    return df 

In [14]:
maint = maint.transform(CastDF)
errors = errors.transform(CastDF)
machines = machines.transform(CastDF)
telemetry = telemetry.transform(CastDF)
failures = failures.transform(CastDF)


""" 
check datatypes of each dataframe after transform
"""
print("maint datatypes")
print(maint.dtypes)
print("\n")
print("errors datatypes")
print(errors.dtypes)
print("\n")
print("machines datatypes")
print(machines.dtypes)
print("\n")
print("telemetry datatypes")
print(telemetry.dtypes)
print("\n")
print("failures datatypes")
print(failures.dtypes)

maint datatypes
[('datetime', 'timestamp'), ('machineID', 'string'), ('comp', 'string')]


errors datatypes
[('datetime', 'timestamp'), ('machineID', 'string'), ('errorID', 'string')]


machines datatypes
[('machineID', 'string'), ('model', 'string'), ('age', 'double')]


telemetry datatypes
[('datetime', 'timestamp'), ('machineID', 'string'), ('volt', 'double'), ('rotate', 'double'), ('pressure', 'double'), ('vibration', 'double')]


failures datatypes
[('datetime', 'timestamp'), ('machineID', 'string'), ('failure', 'string')]


## Build features from each static data features 

In [15]:
"""
Helper function 
"""

# funtion to get the unique class in the columns to encode
def get_unique_features_list(df,column):
    return df.select(column).distinct().rdd.flatMap(lambda x:x).collect()

# create the dummies encode
def Dummies_encode(df,column):
    category = get_unique_features_list(df,column)
    exprs = []
    cols = [column]
    for cat in category:
        number = cat[-1]
        col_name = f"{column}{number}"
        expr = when(col(column) == cat,1).otherwise(0).alias(col_name)
        exprs.append(expr)
    return df.select(df.columns + exprs)

# count the main_column
def cum_count_all(df,column,after_fix = "count"):
    name = f"{column}_{after_fix}"
    df = df.withColumn(name,df[column])
    return df.withColumn(name,row_number().over(Window.partitionBy("machineID").orderBy(["machineID","datetime"])))

def cum_count(df,columns,after_fix = "count"):
    for i in columns:
        windowval = (Window.partitionBy("machineID").orderBy("datetime")
                     .rangeBetween(Window.unboundedPreceding, 0))
        name = f"{i}_{after_fix}"
        df = df.withColumn(name, sum(i).over(windowval))
    return df



In [16]:
"""
Transform maint
"""

maint_list = ["comp1","comp2","comp3","comp4"]
count_maint_list = [i + "_count" for i in maint_list]
To_show = ["datetime","machineID"]
To_show.extend(count_maint_list)
maint = maint.transform(Dummies_encode,"comp").transform(cum_count_all,"comp")\
.transform(cum_count,maint_list)
print(maint.dtypes)
maint.show()

[('datetime', 'timestamp'), ('machineID', 'string'), ('comp', 'string'), ('comp1', 'int'), ('comp2', 'int'), ('comp3', 'int'), ('comp4', 'int'), ('comp_count', 'int'), ('comp1_count', 'bigint'), ('comp2_count', 'bigint'), ('comp3_count', 'bigint'), ('comp4_count', 'bigint')]
+-------------------+---------+-----+-----+-----+-----+-----+----------+-----------+-----------+-----------+-----------+
|           datetime|machineID| comp|comp1|comp2|comp3|comp4|comp_count|comp1_count|comp2_count|comp3_count|comp4_count|
+-------------------+---------+-----+-----+-----+-----+-----+----------+-----------+-----------+-----------+-----------+
|2014-06-01 06:00:00|        1|comp2|    0|    1|    0|    0|         1|          0|          1|          0|          0|
|2014-07-16 06:00:00|        1|comp4|    0|    0|    0|    1|         2|          0|          1|          0|          1|
|2014-07-31 06:00:00|        1|comp3|    0|    0|    1|    0|         3|          0|          1|          1|          1

                                                                                

In [17]:
# view the summarize features
maint.select(To_show).show()

+-------------------+---------+-----------+-----------+-----------+-----------+
|           datetime|machineID|comp1_count|comp2_count|comp3_count|comp4_count|
+-------------------+---------+-----------+-----------+-----------+-----------+
|2014-06-01 06:00:00|        1|          0|          1|          0|          0|
|2014-07-16 06:00:00|        1|          0|          1|          0|          1|
|2014-07-31 06:00:00|        1|          0|          1|          1|          1|
|2014-12-13 06:00:00|        1|          1|          1|          1|          1|
|2015-01-05 06:00:00|        1|          2|          1|          1|          2|
|2015-01-05 06:00:00|        1|          2|          1|          1|          2|
|2015-01-20 06:00:00|        1|          3|          1|          2|          2|
|2015-01-20 06:00:00|        1|          3|          1|          2|          2|
|2015-02-04 06:00:00|        1|          3|          1|          3|          3|
|2015-02-04 06:00:00|        1|         

In [18]:
"""
Transform errors, failures 
"""

print("List of category of errors: ", end = "")
print(get_unique_features_list(errors,"errorID"))
print("\n")
print("List of category of failures: ", end = "")
print(get_unique_features_list(failures,"failure"))

List of category of errors: ['error3', 'error2', 'error4', 'error5', 'error1']


List of category of failures: ['comp1', 'comp2', 'comp3', 'comp4']


In [19]:
errors.show()

+-------------------+---------+-------+
|           datetime|machineID|errorID|
+-------------------+---------+-------+
|2015-01-03 07:00:00|        1| error1|
|2015-01-03 20:00:00|        1| error3|
|2015-01-04 06:00:00|        1| error5|
|2015-01-10 15:00:00|        1| error4|
|2015-01-22 10:00:00|        1| error4|
|2015-01-25 15:00:00|        1| error4|
|2015-01-27 04:00:00|        1| error1|
|2015-03-03 22:00:00|        1| error2|
|2015-03-05 06:00:00|        1| error1|
|2015-03-20 18:00:00|        1| error1|
|2015-03-26 01:00:00|        1| error2|
|2015-03-31 23:00:00|        1| error1|
|2015-04-19 06:00:00|        1| error2|
|2015-04-19 06:00:00|        1| error3|
|2015-04-29 19:00:00|        1| error4|
|2015-05-04 23:00:00|        1| error2|
|2015-05-12 09:00:00|        1| error1|
|2015-05-21 07:00:00|        1| error4|
|2015-05-24 02:00:00|        1| error3|
|2015-05-25 05:00:00|        1| error1|
+-------------------+---------+-------+
only showing top 20 rows



In [20]:
# dummies_encode columns
errors_features = ["errorID" + i[-1] for i in get_unique_features_list(errors,"errorID")]
failures_features = ["failure" + i[-1] for i in get_unique_features_list(failures,"failure")]

# dummies_count columns 
errors_count_features = [i + "_count" for i in errors_features]
failures_count_features = [i + "_count" for i in failures_features]

# transform errors and failures 
errors = errors.transform(Dummies_encode,"errorID").transform(cum_count_all,"errorID")\
.transform(cum_count,errors_features)

failures = failures.transform(Dummies_encode,"failure").transform(cum_count_all,"failure")\
.transform(cum_count,failures_features)

In [21]:
failures.show()

+-------------------+---------+-------+--------+--------+--------+--------+-------------+--------------+--------------+--------------+--------------+
|           datetime|machineID|failure|failure1|failure2|failure3|failure4|failure_count|failure1_count|failure2_count|failure3_count|failure4_count|
+-------------------+---------+-------+--------+--------+--------+--------+-------------+--------------+--------------+--------------+--------------+
|2015-01-05 06:00:00|        1|  comp4|       0|       0|       0|       1|            1|             0|             0|             0|             1|
|2015-03-06 06:00:00|        1|  comp1|       1|       0|       0|       0|            2|             1|             0|             0|             1|
|2015-04-20 06:00:00|        1|  comp2|       0|       1|       0|       0|            3|             1|             1|             0|             1|
|2015-06-19 06:00:00|        1|  comp4|       0|       0|       0|       1|            4|           

In [22]:
errors.show()

+-------------------+---------+-------+--------+--------+--------+--------+--------+-------------+--------------+--------------+--------------+--------------+--------------+
|           datetime|machineID|errorID|errorID3|errorID2|errorID4|errorID5|errorID1|errorID_count|errorID3_count|errorID2_count|errorID4_count|errorID5_count|errorID1_count|
+-------------------+---------+-------+--------+--------+--------+--------+--------+-------------+--------------+--------------+--------------+--------------+--------------+
|2015-01-03 07:00:00|        1| error1|       0|       0|       0|       0|       1|            1|             0|             0|             0|             0|             1|
|2015-01-03 20:00:00|        1| error3|       1|       0|       0|       0|       0|            2|             1|             0|             0|             0|             1|
|2015-01-04 06:00:00|        1| error5|       0|       0|       0|       1|       0|            3|             1|             0|  

In [23]:
"""
Join telemetry and machines dataframes
"""

telemetry = telemetry.join(machines, on = "machineID",how = "left")
telemetry.show()

+---------+-------------------+----------------+----------------+----------------+----------------+------+----+
|machineID|           datetime|            volt|          rotate|        pressure|       vibration| model| age|
+---------+-------------------+----------------+----------------+----------------+----------------+------+----+
|        1|2015-01-01 06:00:00|176.217853015625|418.504078221616|113.077935462083|45.0876857639276|model3|18.0|
|        1|2015-01-01 07:00:00| 162.87922289706|402.747489565395|95.4605253823187|43.4139726834815|model3|18.0|
|        1|2015-01-01 08:00:00|170.989902405567|527.349825452291|75.2379048586662|34.1788471214451|model3|18.0|
|        1|2015-01-01 09:00:00|162.462833264092|346.149335043074|109.248561276504|41.1221440884256|model3|18.0|
|        1|2015-01-01 10:00:00| 157.61002119306|435.376873016938|111.886648210168|25.9905109982024|model3|18.0|
|        1|2015-01-01 11:00:00|172.504839196295|430.323362106675|95.9270416939636|35.6550173268837|model

In [24]:
"""
Extend features of time stamp for period analysis: 
We do this through three steps:
create a minimum values of datetime
create time range from that begin - end 
add machine ID (Done)
merge to needed features from dataframes - ex: errors, maint, failure
select only the count columns 
fillna count columns with 0, 
using max(upper preceding window to current) to perform cum_max
perform function to create lookup_table 
and update table (look at the trial notebook in Machine_Learning folder)
"""
min_maint_date = maint.select(min(col("datetime"))).collect()[0][0].timestamp()
print(f"time stamp of min maint date: {min_maint_date}")
min_date = telemetry.select(min(col("datetime"))).collect()[0][0].timestamp()
print(f"time stamp of min date: {min_date}")
max_date = telemetry.select(max(col("datetime"))).collect()[0][0].timestamp()
print(f"time stamp of max date: {max_date}")


time stamp of min maint date: 1401577200.0


                                                                                

time stamp of min date: 1420066800.0


[Stage 49:>                                                         (0 + 1) / 1]

time stamp of max date: 1451602800.0


                                                                                

In [25]:
"""
create subdf contain only features that needed for analyzing
"""
# maint subdf
maint_count_columns = ["datetime","machineID","comp_count","comp1_count", \
                       "comp2_count","comp3_count", "comp4_count"]
maint_count_features = maint.select(maint_count_columns)

# errors subdf 
failures_count_columns = ["datetime","machineID","failure_count", "failure1_count", \
                          "failure2_count", "failure3_count", "failure4_count"]
failures_count_features = failures.select(failures_count_columns)

# errors subdf
errors_count_columns = ["datetime","machineID","errorID_count","errorID1_count", \
                        "errorID2_count", "errorID3_count", "errorID4_count"]
errors_count_features = errors.select(errors_count_columns)


In [26]:
"""
we create a different time df for maint because they're having data back to 2014
while the other dataframe only having data in 2015 
"""
maint_time_df = ses.range(min_maint_date,max_date,step =3600)\
.withColumn("id",from_unixtime("id")) \
.withColumn("id",to_timestamp("id")) \
.withColumnRenamed("id","datetime")

time_df = ses.range(min_date,max_date,step =3600)\
.withColumn("id",from_unixtime("id")) \
.withColumn("id",to_timestamp("id")) \
.withColumnRenamed("id","datetime")

"""
Adding the machineID
"""

maint_time_dfs = []
for i in range(1,101):
    maint_time_dfs.append(maint_time_df.withColumn("machineID",lit(str(i))))
maintTimeDFs = reduce(DataFrame.unionAll, maint_time_dfs)

time_dfs = []
for i in range(1,101):
    time_dfs.append(time_df.withColumn("machineID",lit(str(i))))
timeDFs = reduce(DataFrame.unionAll, time_dfs)


In [27]:
"""
Helper functions, explaination of each will be commeted on the functions itself
"""

def date_of_last_event(df,event_column):
    """
    get the date since last event happened
    """
    windowval = (Window.partitionBy(["machineID",event_column]).orderBy("datetime")
                 .rangeBetween(Window.unboundedPreceding, 0))
    name = event_column.split("_")[0]
    name = f"{name}_date"
    df = df.withColumn(name, min("datetime").over(windowval))
    return df

def fillna(df,values_dict):
    return df.na.fill(values_dict)

def cum_max(df,columns):
    """
    get the max values of a columns partition by its machineID, index by datetime
    """
    for i in columns:
        if i != "datetime" and i != "machineID":
            windowval = (Window.partitionBy("machineID").orderBy("datetime")
                         .rangeBetween(Window.unboundedPreceding, 0))
            name = f"{i}_max"
            df = df.withColumn(i, max(i).over(windowval))
    return df

"""
Adding columns of the timestamp an event happens
Ex: when did the last component change happen
"""
def create_date_features(df_time):
    """
    Explain code: 
    the function iterate through the dataframe columns 
    At each iteration: 
    The function create partition dataframe using 2 columns: machineID and that column
    the new features added will be the min(datetime) of the partition dataframe 
    that features reflect the first date the event happen:
    
    df_time : dataframe that haven't extract information of period since last events yet
    Ex: timeDFs, maintDFs
    """
    for i in df_time.columns:
        if i != "datetime" and i != "machineID":
            df_time = df_time.transform(date_of_last_event,i)
    return df_time

"""
adding column of how long since a last event happen
Ex: when did the last component change happen 
"""

def get_period(df):
    """
    Transform information about the date of the last events happend
    to the period since that events happen (in hours)
    """
    for i in df.columns:
        if i != "datetime" and "date" in i:
            col_name = i.split("_")[0]
            col_name = f"period_{col_name}"
            df = df.withColumn(col_name,(col("datetime").cast("long") - 
                                         col(i).cast("long"))/3600)
    return df

def selected_columns(df):
    """
    Return dataframe contain only the columns we need, the period since the last events happend
    and the "machineID" and "Datetime" to be indexer match with original dataframe
    """
    columns = ["datetime","machineID"]
    for i in df.columns:
        if "period" in i:
            columns.append(i)
    return df.select(columns)

In [28]:
"""
Merge subdf with created time df to create features of period since last event
"""

maintTimeDFs = maintTimeDFs.join(maint_count_features, 
on =["datetime","machineID"], how = "left").na.fill(0) \
.transform(cum_max, maintTimeDFs.columns)\
.transform(create_date_features) \
.transform(get_period)


timeDFs = timeDFs.join(failures_count_features, on =["datetime","machineID"], how = "left").join(
errors_count_features, on = ["datetime","machineID"], how = "left").na.fill(0) \
.transform(cum_max,timeDFs.columns) \
.transform(create_date_features) \
.transform(get_period)

In [29]:
maintTimeDFs.show()

[Stage 69:>                                                         (0 + 1) / 1]

+-------------------+---------+----------+-----------+-----------+-----------+-----------+-------------------+-------------------+-------------------+-------------------+-------------------+-----------+------------+------------+------------+------------+
|           datetime|machineID|comp_count|comp1_count|comp2_count|comp3_count|comp4_count|          comp_date|         comp1_date|         comp2_date|         comp3_date|         comp4_date|period_comp|period_comp1|period_comp2|period_comp3|period_comp4|
+-------------------+---------+----------+-----------+-----------+-----------+-----------+-------------------+-------------------+-------------------+-------------------+-------------------+-----------+------------+------------+------------+------------+
|2014-06-01 06:00:00|        1|         1|          0|          1|          0|          0|2014-06-01 06:00:00|2014-06-01 06:00:00|2014-06-01 06:00:00|2014-06-01 06:00:00|2014-06-01 06:00:00|        0.0|         0.0|         0.0|       

                                                                                

In [30]:
timeDFs.show()

22/12/16 19:44:42 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.




+-------------------+---------+-------------+--------------+--------------+--------------+--------------+-------------+--------------+--------------+--------------+--------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+---------------+---------------+---------------+---------------+--------------+---------------+---------------+---------------+---------------+
|           datetime|machineID|failure_count|failure1_count|failure2_count|failure3_count|failure4_count|errorID_count|errorID1_count|errorID2_count|errorID3_count|errorID4_count|       failure_date|      failure1_date|      failure2_date|      failure3_date|      failure4_date|       errorID_date|      errorID1_date|      errorID2_date|      errorID3_date|      errorID4_date|period_failure|period_failure1|period_failure2|period_failure3|period_failure4|peri

                                                                                

In [31]:
"""
combine time dataframe and the telemetry dataframe 
"""
telemetry = telemetry.join(timeDFs, on = ["machineID","datetime"], how = "left") \
.join(maintTimeDFs, on = ["machineID","datetime"], how = "left")

In [32]:
def filternull(df,columns):
    df = df.filter(col(columns[0]).isNull())
    columns = columns[1:]
    if len(columns) > 0:
        return filternull(df,columns)
    return df 

In [33]:
"""
Create features count number of events between maintanance 
ex: 
number of errors happened since the last maintainance
"""

def cum_count_between_maint(df, columns, after_fix = "since_last_maint"):
    for i in columns:
        windowval = (Window.partitionBy(["machineID","comp_count"]).orderBy("datetime")
                         .rangeBetween(Window.unboundedPreceding, 0))
        name = f"{i}_{after_fix}"
        df = df.withColumn(name, sum(i).over(windowval))
    return df      

In [34]:
errors.limit(5).show()

+-------------------+---------+-------+--------+--------+--------+--------+--------+-------------+--------------+--------------+--------------+--------------+--------------+
|           datetime|machineID|errorID|errorID3|errorID2|errorID4|errorID5|errorID1|errorID_count|errorID3_count|errorID2_count|errorID4_count|errorID5_count|errorID1_count|
+-------------------+---------+-------+--------+--------+--------+--------+--------+-------------+--------------+--------------+--------------+--------------+--------------+
|2015-01-03 07:00:00|        1| error1|       0|       0|       0|       0|       1|            1|             0|             0|             0|             0|             1|
|2015-01-03 20:00:00|        1| error3|       1|       0|       0|       0|       0|            2|             1|             0|             0|             0|             1|
|2015-01-04 06:00:00|        1| error5|       0|       0|       0|       1|       0|            3|             1|             0|  

In [35]:
col_errors = ["datetime","machineID","errorID","errorID1","errorID2","errorID3","errorID4","errorID5"]
telemetry = telemetry.join(errors.select(col_errors), on = ["datetime","machineID"], how = "left")

In [36]:
col_errors[3:]

['errorID1', 'errorID2', 'errorID3', 'errorID4', 'errorID5']

In [37]:
for i in col_errors[3:]:
    telemetry = telemetry.na.fill({i:0})    

In [38]:
telemetry = telemetry.na.fill({"errorID":"ok"})

In [39]:
def get_encode(name):
    if name == "ok":
        return 0
    return 1 
encode_errors_udf = F.udf(lambda s:get_encode(s),IntegerType())

In [40]:
telemetry = telemetry.withColumn("errorID",encode_errors_udf("errorID") )

In [41]:
telemetry.limit(1).collect()[0]["errorID"]



22/12/16 19:45:30 WARN DAGScheduler: Broadcasting large task binary with size 1087.7 KiB


                                                                                

0

In [42]:
telemetry = telemetry.transform(cum_count_between_maint,col_errors[2:])

In [43]:
telemetry.write.option("header",True).csv("./pipeline_res/transformed.csv")

[Stage 370:>                                                        (0 + 1) / 1]

22/12/16 19:46:05 WARN DAGScheduler: Broadcasting large task binary with size 1087.8 KiB




22/12/16 19:46:15 WARN DAGScheduler: Broadcasting large task binary with size 1234.1 KiB


                                                                                