## **Real-Time Predictive Maintenance : Aircraft Engine**

In [None]:
from pyspark.ml import PipelineModel
import os
from pyspark.sql.session import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from pyspark.sql.types import TimestampType, IntegerType

In [None]:
# Tell PySpark which extra packages to fetch when it launches: the Kafka source
# for Structured Streaming and the MongoDB Spark connector.
# NOTE(review): confirm that connector 10.0.5 is compatible with the Spark
# 3.5.x line targeted by the Kafka package.
connector_packages = ",".join([
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2",
    "org.mongodb.spark:mongo-spark-connector_2.12:10.0.5",
])

os.environ["PYSPARK_SUBMIT_ARGS"] = f'--packages "{connector_packages}" pyspark-shell'

In [None]:
# Create the Spark session for this notebook, or reuse one that already exists.
session_builder = SparkSession.builder.appName("Session 05 Optional ")
spark_session = session_builder.getOrCreate()

# Report the Spark version the session is running on.
print(f"This cluster relies on Spark '{spark_session.version}'")

In [None]:
# Open a streaming reader on the "Aircraft_Sensor_Data" Kafka topic served by
# the local broker. "latest" means only messages published after the query
# starts are consumed.
kafka_source_options = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "Aircraft_Sensor_Data",
    "startingOffsets": "latest",
}

event_dataframe = (
    spark_session.readStream
    .format("kafka")
    .options(**kafka_source_options)
    .load()
)

# Show the schema of the raw Kafka stream.
event_dataframe.printSchema()

### Data Preprocessing

In [None]:
# Keep only the Kafka message payload ("value") and cast it to a string so it
# can be split into individual fields afterwards.
payload_as_text = F.col("value").cast(StringType())
event_dataframe = event_dataframe.select(payload_as_text)

event_dataframe.printSchema()

In [None]:
# Parse each pipe-delimited message into typed feature columns.
#
# Positional layout of a message: fields 0-2 are the three operational
# settings, fields 3-23 are the 21 sensor readings. Every field is cast to an
# integer, and a "Timestamp" column holding the current processing time is
# appended.
# NOTE(review): confirm that integer casting matches the input types the saved
# ML pipeline expects.
feature_names = (
    [f"OpSetting_{i}" for i in range(1, 4)]
    + [f"Sensor_{i}" for i in range(1, 22)]
)

sensor_df = (
    event_dataframe
    # Raw string: the pattern is a regex, and "\|" in a normal string literal
    # is an invalid escape sequence that newer Python versions warn about.
    .select(F.split("value", r"\|").alias("fields"))
    # One projection for all columns instead of a long withColumn chain;
    # the resulting columns, their order and their types are unchanged.
    .select(
        *[
            F.col("fields").getItem(position).cast(IntegerType()).alias(name)
            for position, name in enumerate(feature_names)
        ],
        F.current_timestamp().alias("Timestamp"),
    )
)

In [None]:
# Print the schema of the parsed sensor stream to check column names and types.
sensor_df.printSchema()

### Implementing ML Pipeline & Predicting RUL

In [None]:
# Load the previously saved pipeline model from HDFS.
classifierPipeline = PipelineModel.load(
    "hdfs://localhost:9000/datalake/raw/predictive-maintenance/ML_Pipeline"
)

# Run the parsed sensor stream through the pipeline and keep only the
# timestamp, the raw feature column and the model's prediction.
predictions = (
    classifierPipeline
    .transform(sensor_df)
    .select("Timestamp", "features_raw", "prediction")
)

In [None]:
# Configure (without starting) a writer that appends new prediction rows to
# the console, triggering a micro-batch every 10 seconds.
console_sink = (
    predictions.writeStream
    .trigger(processingTime="10 seconds")
    .outputMode("append")
    .format("console")
)

In [None]:
# Start the console streaming query and keep the handle returned by start()
# in `ap` so the query can be controlled afterwards.
ap = console_sink.start()

In [None]:
# Stop the console query through its handle `ap` (returned by
# console_sink.start()). `console_sink` itself is only the DataStreamWriter
# configuration object and has no stop() method.
ap.stop()

### Writing the Predictions to MongoDB

In [None]:
# Configure (without starting) a writer that appends the predictions to
# MongoDB at the given connection URI, triggering a micro-batch every
# 10 seconds.
# NOTE(review): no "checkpointLocation" option is set here; confirm one is
# configured elsewhere (e.g. spark.sql.streaming.checkpointLocation), since
# Spark normally requires it for sinks other than console/memory.
mongoDB_sink = (
    predictions.writeStream
    .format("mongodb")
    .trigger(processingTime="10 seconds")
    .outputMode("append")
    .option("connection.uri", "mongodb://localhost:27017/Aircraft_Engine.Predictions")
)

In [None]:
# Start the MongoDB streaming query and keep the handle returned by start()
# so its status can be inspected and the query stopped later.
application_handler = mongoDB_sink.start()

In [None]:
# Inspect the current status of the MongoDB streaming query (displayed as the
# cell's output in the notebook).
application_handler.status

In [None]:
# Stop the MongoDB streaming query through its handle.
application_handler.stop()