# <font color='blue'>IoT-Analysis</font>

### <font>Real-Time IoT Sensor Data Analysis with Apache Spark Streaming and Apache Kafka</font>

In [1]:
# Import findspark and initialize it so the PySpark package can be imported.
# Keep this cell ahead of every `pyspark` import in the notebook.
import findspark
findspark.init()

In [2]:
# Import required modules
import pyspark
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import col, from_json

> We need to add the Spark Structured Streaming connector for Apache Kafka. Make sure the connector version matches the version of PySpark being used.

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

In [3]:
# Connector: ask Spark to fetch the Kafka integration package at start-up.
# The coordinates pin Scala 2.12 and connector version 3.3.0; keep them in
# line with the PySpark version in use (see the note above).
import os

os.environ.update(
    {'PYSPARK_SUBMIT_ARGS': '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'}
)

## Creating the Spark Session

In [4]:
# Build the Spark session named "IoT-Analysis", or reuse one that already exists
spark = (
    SparkSession.builder
    .appName("IoT-Analysis")
    .getOrCreate()
)

## Read Kafka Spark Structured Stream

In [5]:
# Subscribe to the Kafka topic "IotAnalysis" on the local broker; this is the
# streaming source every later transformation reads from.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "IotAnalysis")
    .load()
)

## Data Source Schema Definition

In [6]:
# Schema of the nested payload we analyse: a "reading" struct that holds a
# nullable double "temperature".
data_schema_temp = StructType().add(
    "reading",
    StructType().add("temperature", DoubleType(), True),
    True,
)

In [7]:
# Full schema of one JSON message in the stream: four nullable string fields
# plus the nested "standard" struct described by data_schema_temp.
data_schema = (
    StructType()
    .add("id_sensor", StringType(), True)
    .add("equipment_id", StringType(), True)
    .add("sensor", StringType(), True)
    .add("date_event", StringType(), True)
    .add("standard", data_schema_temp, True)
)

## Parse the Data Source

In [8]:
# Keep only the Kafka message payload, cast to a string column named "value"
df_conversion = df.select(col("value").cast("string"))

In [9]:
# Parse the JSON payload into typed columns using data_schema.
# The frame is derived from the raw stream `df` rather than from the previous
# value of `df_conversion`, so this cell can be re-run safely: the earlier
# self-referential form failed on a second run because the "value" column had
# already been replaced by the parsed fields.
df_conversion = (
    df.selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), data_schema).alias("jsonData"))
    .select("jsonData.*")
)

In [10]:
# Print the schema of the parsed stream to verify the JSON fields and the nested struct
df_conversion.printSchema()

root
 |-- id_sensor: string (nullable = true)
 |-- equipment_id: string (nullable = true)
 |-- sensor: string (nullable = true)
 |-- date_event: string (nullable = true)
 |-- standard: struct (nullable = true)
 |    |-- reading: struct (nullable = true)
 |    |    |-- temperature: double (nullable = true)



## Prepare the Dataframe

This dataframe is in the format we need for parsing.

In [11]:
# Keep only what the analysis needs: the nested temperature reading (exposed
# as a flat "temperature" column) and the sensor name.
df_conversion_temp_sensor = df_conversion.select(
    col("standard.reading.temperature").alias("temperature"),
    "sensor",
)

In [12]:
# Confirm that only the temperature and sensor columns remain
df_conversion_temp_sensor.printSchema()

root
 |-- temperature: double (nullable = true)
 |-- sensor: string (nullable = true)



## Real Time Data Analysis

In [13]:
# Core of the analysis: average temperature per sensor.
# The aggregate column is auto-named "avg(temperature)".
df_avg_temp_sensor = (
    df_conversion_temp_sensor
    .groupBy("sensor")
    .avg("temperature")
)

In [14]:
# Inspect the aggregated schema; the mean column is auto-named "avg(temperature)"
df_avg_temp_sensor.printSchema()

root
 |-- sensor: string (nullable = true)
 |-- avg(temperature): double (nullable = true)



In [15]:
# Rename the auto-generated aggregate column to "avg_temp" to simplify later queries.
# withColumnRenamed is a no-op when "avg(temperature)" is absent, so re-running
# this cell is harmless; the earlier select(col("avg(temperature)")) form
# failed on a second run because the column had already been renamed.
df_avg_temp_sensor = df_avg_temp_sensor.withColumnRenamed("avg(temperature)", "avg_temp")

In [16]:
# Verify that the aggregate column is now exposed as avg_temp
df_avg_temp_sensor.printSchema()

root
 |-- sensor: string (nullable = true)
 |-- avg_temp: double (nullable = true)



Below we start the streaming query for real-time data analysis, printing each result to the console.

In [17]:
# Start a streaming query that prints the full per-sensor averages to the
# console sink on every update ("complete" output mode).
query = (
    df_avg_temp_sensor.writeStream
    .format("console")
    .outputMode("complete")
    .start()
)

Upload new files to Kafka to see the real-time analysis over here. Click the Stop button in the top menu to stop the cell at any time.

In [None]:
# Block this cell until the streaming query terminates, keeping the stream
# running while results are printed. Interrupt the cell (Stop button) to
# regain control of the notebook.
# NOTE(review): under "Restart & Run All" this call does not return while the
# stream is active, so the cells below only run after it is interrupted.
query.awaitTermination()

In [18]:
# Current status of the console streaming query
query.status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

In [19]:
# Most recent progress update reported by the console streaming query
query.lastProgress

In [20]:
# Print the physical execution plan of the console streaming query
query.explain()

== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@464e5a35, org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$2336/0x00000001010f5840@714443c7
+- *(4) HashAggregate(keys=[sensor#28], functions=[avg(temperature#36)])
   +- StateStoreSave [sensor#28], state info [ checkpoint = file:/C:/Users/Caio/AppData/Local/Temp/temporary-b8fdda09-504d-4ab0-9bcc-e8aeffb2747b/state, runId = 7096587f-a337-40de-94e9-34a963c99a5c, opId = 0, ver = 0, numPartitions = 200], Complete, 0, 2
      +- *(3) HashAggregate(keys=[sensor#28], functions=[merge_avg(temperature#36)])
         +- StateStoreRestore [sensor#28], state info [ checkpoint = file:/C:/Users/Caio/AppData/Local/Temp/temporary-b8fdda09-504d-4ab0-9bcc-e8aeffb2747b/state, runId = 7096587f-a337-40de-94e9-34a963c99a5c, opId = 0, ver = 0, numPartitions = 200], 2
            +- *(2) HashAggregate(keys=[sensor#28], functions=[merge_avg(temperature#36)])
               +- Exch

## Real Time Data Analysis with SQL (In-Memory Table)

In [21]:
# Start a second streaming query that writes the per-sensor averages to the
# in-memory sink; the query name "Iot_Analysis" becomes the temporary table
# that can be queried with SQL.
query_memoria = (
    df_avg_temp_sensor.writeStream
    .format("memory")
    .queryName("Iot_Analysis")
    .outputMode("complete")
    .start()
)

In [22]:
# streams enabled
spark.streams.active

[<pyspark.sql.streaming.StreamingQuery at 0x219fb9d4b20>,
 <pyspark.sql.streaming.StreamingQuery at 0x219fbb53190>]

In [None]:
# Keep the in-memory query running for a while and apply SQL to the data in
# real time: poll the Iot_Analysis table 10 times, 3 seconds apart, showing
# the sensors whose average temperature is above 65.
from time import sleep

try:
    for _ in range(10):
        spark.sql("select sensor, round(avg_temp, 2) as avg from Iot_Analysis where avg_temp > 65").show()
        sleep(3)
finally:
    # Always stop the memory-sink query, even if the loop is interrupted or a
    # SQL call fails, so the stream is not left running in the background.
    # NOTE(review): the console query started earlier (`query`) is not stopped here.
    query_memoria.stop()

## Disclaimer:
A good part of this project was developed in the Data Science Academy course "Big Data Real-Time Analytics with Python and Spark" (part of the Data Scientist training).

# End