### Import `libraries`

In [1]:
from pyspark.sql import SparkSession,functions, Window
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType,BooleanType,DateType, TimestampType,LongType,DecimalType,  DoubleType, StringType, ArrayType, LongType,StructField, StructType
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'
import sys
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
import pyspark.pandas as ps
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
from datetime import datetime


### Create `SparkSession` to read data from Kafka as source

In [2]:
def SparkSessionStreamingData():
    LoadingDriver = r"x"
    try:
        spark = SparkSession \
        .builder \
        .appName("Streaming from Kafka") \
        .config("spark.streaming.stopGracefullyOnShutdown", True) \
        .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0') \
        .config("spark.sql.shuffle.partitions", 4) \
        .config("spark.driver.extraClassPath",LoadingDriver) \
        .master("local[*]") \
        .getOrCreate()
        print("\033[92m Spark Session was successfully created")
    except  Exception:
        print("\033[91m Spark Session wasn't successfully created")
    return spark 

spark = SparkSessionStreamingData()
spark 


[92m Spark Session was successfully created


### Creating a `Kafka Source` for Streaming Quries

In [3]:
def ReadingDataFromKafka():
    topic_name = 'SmokeDetection'
    localhost = 'localhost:9092'
    try:
        df = spark \
            .read \
            .format("kafka") \
            .option("kafka.bootstrap.servers", localhost) \
            .option("subscribe", topic_name) \
            .load()
        print("\033[92m a Kafka Source for Streaming queries was successfully created")
    except Exception:
        print("\033[91m a Kafka Source for Streaming queries wasn't successfully created")
    return df   

df =ReadingDataFromKafka()

[92m a Kafka Source for Streaming queries was successfully created


### Creating `Streaming DataFrames` and `streaming datasets`

In [4]:
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show(truncate=False)

+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|key |value                                                                                                                                                                                                                                                                                 |
+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|null|{"UTC": 1654753115, "Temperature[C]": 18.363, "Humidity[%]": 52.94, "TVOC[ppb]": 1195, "eCO2[ppm]": 400, "Raw H2": 12927, "Raw Ethanol":

### Reading `JSON data` from a DataFrame column

In [5]:
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



### Parsing value from `binary to string`

In [6]:
json_df = df.selectExpr("cast(value as string) as value")
json_df

DataFrame[value: string]

### Parsing the `JSON values from the Data Frame` column

In [7]:
json_schema_df = spark.read.json(json_df.rdd.map(lambda row: row.value))
json_schema_df.show(truncate=False)

+-----+----------+-----------+-----+------+-----+-----+-----+-------------+-----------+------+---------+--------------+----------+---------+
|CNT  |Fire Alarm|Humidity[%]|NC0.5|NC1.0 |NC2.5|PM1.0|PM2.5|Pressure[hPa]|Raw Ethanol|Raw H2|TVOC[ppb]|Temperature[C]|UTC       |eCO2[ppm]|
+-----+----------+-----------+-----+------+-----+-----+-----+-------------+-----------+------+---------+--------------+----------+---------+
|19784|1         |52.94      |12.27|1.913 |0.043|1.78 |1.85 |938.708      |19427      |12927 |1195     |18.363        |1654753115|400      |
|4277 |0         |34.92      |7.17 |1.118 |0.025|1.04 |1.08 |936.86       |19837      |13137 |3118     |39.16         |1654716464|400      |
|752  |0         |41.58      |13.3 |2.074 |0.047|1.93 |2.01 |937.537      |20695      |12790 |55       |8.198         |1655125060|426      |
|23159|1         |54.16      |13.1 |2.043 |0.046|1.9  |1.98 |938.709      |19380      |12971 |1412     |24.34         |1654784502|413      |
|12629|1     

### `Cleaning` & `Transforming` DataFrame 
#### Changing `datatypes`
#### Creating `User Defined Function` to change UTC number to datetime

In [8]:
json_schema_df.columns

['CNT',
 'Fire Alarm',
 'Humidity[%]',
 'NC0.5',
 'NC1.0',
 'NC2.5',
 'PM1.0',
 'PM2.5',
 'Pressure[hPa]',
 'Raw Ethanol',
 'Raw H2',
 'TVOC[ppb]',
 'Temperature[C]',
 'UTC',
 'eCO2[ppm]']

In [9]:
json_schema_df.printSchema()

root
 |-- CNT: long (nullable = true)
 |-- Fire Alarm: long (nullable = true)
 |-- Humidity[%]: double (nullable = true)
 |-- NC0.5: double (nullable = true)
 |-- NC1.0: double (nullable = true)
 |-- NC2.5: double (nullable = true)
 |-- PM1.0: double (nullable = true)
 |-- PM2.5: double (nullable = true)
 |-- Pressure[hPa]: double (nullable = true)
 |-- Raw Ethanol: long (nullable = true)
 |-- Raw H2: long (nullable = true)
 |-- TVOC[ppb]: long (nullable = true)
 |-- Temperature[C]: double (nullable = true)
 |-- UTC: long (nullable = true)
 |-- eCO2[ppm]: long (nullable = true)



In [10]:
json_schema_df = json_schema_df.select("UTC","Temperature[C]","Humidity[%]",
                      "TVOC[ppb]","eCO2[ppm]","Raw H2",
                      "Raw Ethanol","Pressure[hPa]",'`PM1.0`',
                      "`PM2.5`","`NC0.5`","`NC1.0`","`NC2.5`","CNT","Fire Alarm")

json_schema_df.show()

+----------+--------------+-----------+---------+---------+------+-----------+-------------+-----+-----+-----+------+-----+-----+----------+
|       UTC|Temperature[C]|Humidity[%]|TVOC[ppb]|eCO2[ppm]|Raw H2|Raw Ethanol|Pressure[hPa]|PM1.0|PM2.5|NC0.5| NC1.0|NC2.5|  CNT|Fire Alarm|
+----------+--------------+-----------+---------+---------+------+-----------+-------------+-----+-----+-----+------+-----+-----+----------+
|1654753115|        18.363|      52.94|     1195|      400| 12927|      19427|      938.708| 1.78| 1.85|12.27| 1.913|0.043|19784|         1|
|1654716464|         39.16|      34.92|     3118|      400| 13137|      19837|       936.86| 1.04| 1.08| 7.17| 1.118|0.025| 4277|         0|
|1655125060|         8.198|      41.58|       55|      426| 12790|      20695|      937.537| 1.93| 2.01| 13.3| 2.074|0.047|  752|         0|
|1654784502|         24.34|      54.16|     1412|      413| 12971|      19380|      938.709|  1.9| 1.98| 13.1| 2.043|0.046|23159|         1|
|1654745960| 

In [11]:
json_schema_df = json_schema_df.withColumn("UTC", col("UTC").cast("int")) \
                .withColumn("Temperature[C]", col("Temperature[C]").cast(DecimalType(5,2))) \
                .withColumn("Humidity[%]", col("Humidity[%]").cast(DecimalType(5,2))) \
                .withColumn("TVOC[ppb]", col("TVOC[ppb]").cast("int")) \
                .withColumn("eCO2[ppm]", col("eCO2[ppm]").cast("int")) \
                .withColumn("Raw H2", col("Raw H2").cast("int")) \
                .withColumn("Raw Ethanol", col("Raw Ethanol").cast("int")) \
                .withColumn("Pressure[hPa]", col("Pressure[hPa]").cast(DecimalType(5,2))) \
                .withColumn("PM1.0", col("`PM1.0`").cast(DecimalType(5,2))) \
                .withColumn("PM2.5", col("`PM2.5`").cast(DecimalType(5,2))) \
                .withColumn("NC0.5", col("`NC0.5`").cast(DecimalType(5,2))) \
                .withColumn("NC1.0", col("`NC1.0`").cast(DecimalType(5,2))) \
                .withColumn("NC2.5", col("`NC2.5`").cast(DecimalType(5,2))) \
                .withColumn("CNT", col("CNT").cast("int")) \
                .withColumn("Fire Alarm", col("Fire Alarm").cast("int")) 


In [12]:
def to_date(n):
    return datetime.fromtimestamp(n).strftime('%Y-%m-%d %H:%M:%S')

ToDateUDF = udf(lambda z:to_date(z))   


In [13]:
json_schema_df = json_schema_df.withColumn("UTC", ToDateUDF("UTC"))

In [14]:
json_schema_df.printSchema()

root
 |-- UTC: string (nullable = true)
 |-- Temperature[C]: decimal(5,2) (nullable = true)
 |-- Humidity[%]: decimal(5,2) (nullable = true)
 |-- TVOC[ppb]: integer (nullable = true)
 |-- eCO2[ppm]: integer (nullable = true)
 |-- Raw H2: integer (nullable = true)
 |-- Raw Ethanol: integer (nullable = true)
 |-- Pressure[hPa]: decimal(5,2) (nullable = true)
 |-- PM1.0: decimal(5,2) (nullable = true)
 |-- PM2.5: decimal(5,2) (nullable = true)
 |-- NC0.5: decimal(5,2) (nullable = true)
 |-- NC1.0: decimal(5,2) (nullable = true)
 |-- NC2.5: decimal(5,2) (nullable = true)
 |-- CNT: integer (nullable = true)
 |-- Fire Alarm: integer (nullable = true)



In [15]:
json_schema_df.show()

+-------------------+--------------+-----------+---------+---------+------+-----------+-------------+-----+-----+-----+-----+-----+-----+----------+
|                UTC|Temperature[C]|Humidity[%]|TVOC[ppb]|eCO2[ppm]|Raw H2|Raw Ethanol|Pressure[hPa]|PM1.0|PM2.5|NC0.5|NC1.0|NC2.5|  CNT|Fire Alarm|
+-------------------+--------------+-----------+---------+---------+------+-----------+-------------+-----+-----+-----+-----+-----+-----+----------+
|2022-06-09 07:38:35|         18.36|      52.94|     1195|      400| 12927|      19427|       938.71| 1.78| 1.85|12.27| 1.91| 0.04|19784|         1|
|2022-06-08 21:27:44|         39.16|      34.92|     3118|      400| 13137|      19837|       936.86| 1.04| 1.08| 7.17| 1.12| 0.03| 4277|         0|
|2022-06-13 14:57:40|          8.20|      41.58|       55|      426| 12790|      20695|       937.54| 1.93| 2.01|13.30| 2.07| 0.05|  752|         0|
|2022-06-09 16:21:42|         24.34|      54.16|     1412|      413| 12971|      19380|       938.71| 1.90

### Spliting Streaming DataFrames and streaming Datasets from Kafka into `DataFrameFireAlarmOn & DataFrameFireAlarmOff`

In [16]:
dfSDDFireAlarmOn = json_schema_df.filter(col("Fire Alarm") == 1)
dfSDDFireAlarmOff = json_schema_df.filter(col("Fire Alarm") == 0)

In [17]:
dfSDDFireAlarmOn.show()

+-------------------+--------------+-----------+---------+---------+------+-----------+-------------+-----+-----+-----+-----+-----+-----+----------+
|                UTC|Temperature[C]|Humidity[%]|TVOC[ppb]|eCO2[ppm]|Raw H2|Raw Ethanol|Pressure[hPa]|PM1.0|PM2.5|NC0.5|NC1.0|NC2.5|  CNT|Fire Alarm|
+-------------------+--------------+-----------+---------+---------+------+-----------+-------------+-----+-----+-----+-----+-----+-----+----------+
|2022-06-09 07:38:35|         18.36|      52.94|     1195|      400| 12927|      19427|       938.71| 1.78| 1.85|12.27| 1.91| 0.04|19784|         1|
|2022-06-09 16:21:42|         24.34|      54.16|     1412|      413| 12971|      19380|       938.71| 1.90| 1.98|13.10| 2.04| 0.05|23159|         1|
|2022-06-09 05:39:20|         20.42|      46.87|     1042|      557| 12836|      19464|       938.97| 2.25| 2.34|15.48| 2.41| 0.06|12629|         1|
|2022-06-09 04:26:25|         -4.45|      56.23|      324|      400| 13102|      19945|       939.44| 0.32

In [18]:
dfSDDFireAlarmOff.show()

+-------------------+--------------+-----------+---------+---------+------+-----------+-------------+-----+-----+-----+-----+-----+----+----------+
|                UTC|Temperature[C]|Humidity[%]|TVOC[ppb]|eCO2[ppm]|Raw H2|Raw Ethanol|Pressure[hPa]|PM1.0|PM2.5|NC0.5|NC1.0|NC2.5| CNT|Fire Alarm|
+-------------------+--------------+-----------+---------+---------+------+-----------+-------------+-----+-----+-----+-----+-----+----+----------+
|2022-06-08 21:27:44|         39.16|      34.92|     3118|      400| 13137|      19837|       936.86| 1.04| 1.08| 7.17| 1.12| 0.03|4277|         0|
|2022-06-13 14:57:40|          8.20|      41.58|       55|      426| 12790|      20695|       937.54| 1.93| 2.01|13.30| 2.07| 0.05| 752|         0|
|2022-06-09 02:17:52|         25.62|      49.89|       11|      400| 12813|      19802|       939.80| 0.24| 0.25| 1.69| 0.26| 0.01| 541|         0|
|2022-06-09 10:06:41|         12.71|      51.80|       33|      400| 12868|      19818|       939.81| 0.29| 0.30

### `Loading` DataFrames to DWH

In [21]:
def LoadingDataFrameFireAlarmOnToDWH():
  ConnectionString = ""
  try:
    dfSDDFireAlarmOn.write \
    .format("jdbc") \
    .mode("overwrite") \
    .option("url", ConnectionString) \
    .option("dbtable", "SDDFireAlarmOn") \
    .save()
    print("\033[92m DataFrame was successfully loaded")
  except Exception:
    print("\033[91m Data Frame wasn't successfully loaded")
  
LoadingDataFrameFireAlarmOnToDWH()


[92m DataFrame was successfully loaded


In [23]:
def LoadingDataFrameFireAlarmOffToDWH():
  ConnectionString = ""
  try:
    dfSDDFireAlarmOff.write \
    .format("jdbc") \
    .mode("overwrite") \
    .option("url", ConnectionString) \
    .option("dbtable", "SDDFireAlarmOnOFF") \
    .save()
    print("\033[92m DataFrame was successfully loaded")
  except  Exception:
    print("\033[91m Data Frame wasn't successfully loaded")

LoadingDataFrameFireAlarmOffToDWH()

[92m DataFrame was successfully loaded


In [None]:
#https://medium.com/@dogukannulu/data-engineering-end-to-end-project-1-7a7be2a3671
# https://github.com/dogukannulu/kafka_spark_structured_streaming/blob/main/spark_streaming.py
# adding python functions to it!