# ETL


## Data Sources
Common data sources for predictive maintenance problems are:

* **Failure history**: The failure history of a machine or of a component within the machine.
* **Maintenance history**: The repair history of a machine, e.g. error codes, previous maintenance activities or component replacements.
* **Machine conditions and usage**: The operating conditions of a machine, e.g. data collected from sensors.
* **Machine features**: The features of a machine, e.g. engine size, make and model, location.
* **Operator features**: The features of the operator, e.g. gender, past experience.

The data for this example comes from four sources: real-time telemetry collected from the machines, error messages, historical maintenance records (which include failures), and machine information such as model type and age.
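All of these tables share the `machineID` key: telemetry, errors, maintenance and failures are time-stamped events, while the machine table holds one static row per machine. A minimal plain-Python sketch of attaching the static attributes to each event (a left join on `machineID`), assuming the rows are already loaded as dictionaries; the helper `attach_machine_info` and the sample values are hypothetical and only illustrate the shape of the join.

```python
from datetime import datetime

def attach_machine_info(events, machines):
    """Left-join static machine attributes onto time-stamped event rows.

    events:   list of dicts, each with a 'machineID' key
    machines: list of dicts with 'machineID', 'model' and 'age'
    Events whose machineID has no machine row get None for the attributes.
    """
    # Index the small static table by its key for constant-time lookups
    by_id = {m["machineID"]: m for m in machines}
    joined = []
    for event in events:
        info = by_id.get(event["machineID"], {})
        joined.append({**event, "model": info.get("model"), "age": info.get("age")})
    return joined

# Hypothetical sample rows (values invented for illustration)
events = [
    {"datetime": datetime(2015, 1, 1, 6), "machineID": 1, "volt": 176.2},
    {"datetime": datetime(2015, 1, 1, 6), "machineID": 2, "volt": 168.1},
]
machines = [{"machineID": 1, "model": "model3", "age": 18}]

for row in attach_machine_info(events, machines):
    print(row["machineID"], row["model"], row["age"])
```

In PySpark the equivalent operation would be `telemetry.join(machines, on="machineID", how="left")`.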

```python
import findspark
findspark.init()  # locate the local Spark installation and put PySpark on sys.path

from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession; its SparkContext is exposed as spark.sparkContext
spark = SparkSession.builder.appName("ETL").getOrCreate()
sc = spark.sparkContext
```

```python
from pyspark.sql.types import (StructType, StructField, TimestampType,
                               IntegerType, DoubleType, StringType)

# Explicit schemas avoid the extra pass over the data that inferSchema=True needs.
# Each StructField is (name, type, nullable).
telemetry_schema = StructType([
    StructField('datetime', TimestampType(), False),
    StructField('machineID', IntegerType(), False),
    StructField('volt', DoubleType(), False),
    StructField('rotate', DoubleType(), False),
    StructField('pressure', DoubleType(), False),
    StructField('vibration', DoubleType(), False)
])

errors_schema = StructType([
    StructField('datetime', TimestampType(), False),
    StructField('machineID', IntegerType(), False),
    StructField('errorID', StringType(), False)
])

maint_schema = StructType([
    StructField('datetime', TimestampType(), False),
    StructField('machineID', IntegerType(), False),
    StructField('comp', StringType(), False)
])

failures_schema = StructType([
    # TimestampType (not StringType) so failure times line up with the other event tables
    StructField('datetime', TimestampType(), False),
    StructField('machineID', IntegerType(), False),
    StructField('failure', StringType(), False)
])

machines_schema = StructType([
    StructField('machineID', IntegerType(), False),
    StructField('model', StringType(), False),
    StructField('age', IntegerType(), False)
])
```
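Conceptually, a `StructType` tells the reader which type each named column should be parsed into, so CSV fields arrive typed instead of as strings. A plain-Python analogy (not how Spark implements it), assuming the `yyyy-MM-dd HH:mm:ss` timestamp layout visible in the telemetry output; `TELEMETRY_TYPES` and `parse_row` are hypothetical names.

```python
import csv
import io
from datetime import datetime

# Hypothetical column -> converter mapping that mirrors telemetry_schema
TELEMETRY_TYPES = {
    "datetime": lambda s: datetime.strptime(s, "%Y-%m-%d %H:%M:%S"),
    "machineID": int,
    "volt": float,
    "rotate": float,
    "pressure": float,
    "vibration": float,
}

def parse_row(raw, types):
    """Convert one CSV record (a dict of strings) into typed values."""
    return {name: convert(raw[name]) for name, convert in types.items()}

# First telemetry record as displayed by telemetry.show() in this notebook
sample = io.StringIO(
    "datetime,machineID,volt,rotate,pressure,vibration\n"
    "2015-01-01 06:00:00,1,176.217853015625,418.504078221616,"
    "113.077935462083,45.0876857639276\n"
)
rows = [parse_row(r, TELEMETRY_TYPES) for r in csv.DictReader(sample)]
print(rows[0]["datetime"], rows[0]["machineID"], rows[0]["volt"])
```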

```python
# header=True skips each file's header line; column names and types come from the schema
telemetry = spark.read.csv('../data/PdM_telemetry.csv', header=True, schema=telemetry_schema)
errors = spark.read.csv('../data/PdM_errors.csv', header=True, schema=errors_schema)
maint = spark.read.csv('../data/PdM_maint.csv', header=True, schema=maint_schema)
failures = spark.read.csv('../data/PdM_failures.csv', header=True, schema=failures_schema)
machines = spark.read.csv('../data/PdM_machines.csv', header=True, schema=machines_schema)
```
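By default `spark.read.csv` parses in `PERMISSIVE` mode: a field that cannot be converted to its schema type becomes null instead of failing the job. `DROPMALFORMED` skips such records and `FAILFAST` raises an error; the mode is passed through the reader's `mode` argument, e.g. `spark.read.csv(path, header=True, schema=telemetry_schema, mode="DROPMALFORMED")`. The sketch below imitates those three behaviours in plain Python for a single column; it illustrates the semantics only, and `read_values` is a hypothetical helper.

```python
def read_values(raw_values, convert, mode="PERMISSIVE"):
    """Parse strings with `convert`, imitating Spark's CSV parse modes.

    PERMISSIVE    -> an unparsable value becomes None (null)
    DROPMALFORMED -> an unparsable value is skipped
    FAILFAST      -> the first unparsable value raises ValueError
    """
    if mode not in ("PERMISSIVE", "DROPMALFORMED", "FAILFAST"):
        raise ValueError(f"Unknown mode: {mode}")
    out = []
    for raw in raw_values:
        try:
            out.append(convert(raw))
        except ValueError as exc:
            if mode == "PERMISSIVE":
                out.append(None)
            elif mode == "DROPMALFORMED":
                continue
            else:  # FAILFAST
                raise ValueError(f"Malformed value: {raw!r}") from exc
    return out

raw_volts = ["176.2", "n/a", "162.9"]  # hypothetical column with one bad entry
print(read_values(raw_volts, float))                   # bad value kept as None
print(read_values(raw_volts, float, "DROPMALFORMED"))  # bad value dropped
```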

```python
telemetry.printSchema()
```

```
root
 |-- datetime: timestamp (nullable = true)
 |-- machineID: integer (nullable = true)
 |-- volt: double (nullable = true)
 |-- rotate: double (nullable = true)
 |-- pressure: double (nullable = true)
 |-- vibration: double (nullable = true)
```
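Although every field in the schemas is declared with `nullable=False`, the printed schema reports `nullable = true`: Spark marks columns read from files as nullable, so those flags are not enforced at read time. Together with permissive parsing, this makes an explicit null count a useful ETL check. In PySpark it can be written as `telemetry.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in telemetry.columns])` with `from pyspark.sql import functions as F`. The sketch below computes the same per-column count over plain Python rows; `count_nulls` and the sample rows are hypothetical.

```python
def count_nulls(rows, columns):
    """Return {column: number of rows whose value is None or missing}."""
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            if row.get(c) is None:
                counts[c] += 1
    return counts

# Hypothetical parsed rows; the second has an unparsable pressure reading
rows = [
    {"machineID": 1, "volt": 176.2, "pressure": 113.1},
    {"machineID": 1, "volt": 162.9, "pressure": None},
]
print(count_nulls(rows, ["machineID", "volt", "pressure"]))
```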



```python
telemetry.show(10)
```

```
+-------------------+---------+----------------+----------------+----------------+----------------+
|           datetime|machineID|            volt|          rotate|        pressure|       vibration|
+-------------------+---------+----------------+----------------+----------------+----------------+
|2015-01-01 06:00:00|        1|176.217853015625|418.504078221616|113.077935462083|45.0876857639276|
|2015-01-01 07:00:00|        1| 162.87922289706|402.747489565395|95.4605253823187|43.4139726834815|
|2015-01-01 08:00:00|        1|170.989902405567|527.349825452291|75.2379048586662|34.1788471214451|
|2015-01-01 09:00:00|        1|162.462833264092|346.149335043074|109.248561276504|41.1221440884256|
|2015-01-01 10:00:00|        1| 157.61002119306|435.376873016938|111.886648210168|25.9905109982024|
|2015-01-01 11:00:00|        1|172.504839196295|430.323362106675|95.9270416939636|35.6550173268837|
|2015-01-01 12:00:00|        1|156.556030606329|499.071623068962|111.755684290096|42.7539196974773|
```
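The output shows one telemetry reading per machine per hour. Before building time-window features it can be worth confirming that cadence, since missing or duplicated hours would distort row-based rolling windows. A plain-Python sketch, assuming one reading per machine per hour; `find_gaps` is a hypothetical helper.

```python
from datetime import datetime, timedelta

def find_gaps(timestamps, step=timedelta(hours=1)):
    """Return (previous, current) pairs whose spacing differs from `step`.

    Duplicated timestamps (zero spacing) are reported as well.
    """
    ordered = sorted(timestamps)
    return [(a, b) for a, b in zip(ordered, ordered[1:]) if b - a != step]

# The seven machine-1 timestamps shown in the telemetry.show() output
fmt = "%Y-%m-%d %H:%M:%S"
shown = [datetime.strptime(f"2015-01-01 {h:02d}:00:00", fmt) for h in range(6, 13)]
print(find_gaps(shown))                  # regular hourly series
print(find_gaps(shown[:2] + shown[3:]))  # 08:00 reading removed
```

In Spark the same check could be done per machine with a window, e.g. `F.lag("datetime").over(Window.partitionBy("machineID").orderBy("datetime"))` using `from pyspark.sql.window import Window`.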


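```python
# Write the data sets to intermediate storage as Parquet.
# mode("overwrite") lets the cell be re-run without failing on existing output.
telemetry.write.mode("overwrite").parquet("../Python/input/PdM_telemetry.parquet")
errors.write.mode("overwrite").parquet("../Python/input/PdM_errors.parquet")
maint.write.mode("overwrite").parquet("../Python/input/PdM_maint.parquet")
failures.write.mode("overwrite").parquet("../Python/input/PdM_failures.parquet")
machines.write.mode("overwrite").parquet("../Python/input/PdM_machines.parquet")
```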
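Spark writes each `.parquet` target as a directory of part files, and with the default save mode (`errorifexists`) a write fails if the target path already exists, which matters whenever a notebook cell is re-run. `DataFrameWriter.mode` also accepts `append`, `overwrite` and `ignore`. The sketch below imitates these modes with plain text files in a temporary directory; it is an analogy for the semantics only, `save` is a hypothetical helper rather than a Spark API, and the part-file naming is simplified.

```python
import os
import shutil
import tempfile

MODES = ("error", "errorifexists", "append", "overwrite", "ignore")

def save(path, lines, mode="errorifexists"):
    """Write `lines` into directory `path`, imitating Spark's save modes."""
    if mode not in MODES:
        raise ValueError(f"Unknown save mode: {mode}")
    exists = os.path.exists(path)
    if exists and mode in ("error", "errorifexists"):
        raise FileExistsError(f"path {path} already exists")
    if exists and mode == "ignore":
        return  # leave the existing data untouched
    if exists and mode == "overwrite":
        shutil.rmtree(path)  # drop the old output entirely
    os.makedirs(path, exist_ok=True)
    # Simplified: one new part file per call; append keeps the existing ones
    part = os.path.join(path, f"part-{len(os.listdir(path)):05d}.txt")
    with open(part, "w") as f:
        f.write("\n".join(lines))

target = os.path.join(tempfile.mkdtemp(), "PdM_errors")
save(target, ["a"])
save(target, ["b"], mode="append")     # two part files
save(target, ["c"], mode="ignore")     # unchanged
save(target, ["d"], mode="overwrite")  # replaced by a single part file
print(sorted(os.listdir(target)))
```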