In [1]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
from platform import python_version

print(python_version())


3.9.7


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, decode 
import json
from pyspark.sql import functions as F
from pyspark.sql import Row
# from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType


In [3]:
print("STarting a spark Session")
spark = SparkSession.builder \
    .appName("ConsumeKafka") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.0") \
    .getOrCreate()
print("Session Established!")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

STarting a spark Session
Session Established!


In [4]:
df_raw = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "input") \
    .load()
#df will be retrned as binary so needs parsing 
df = df_raw.select(decode(col("value"), "UTF-8").alias("value_string"))


In [5]:
#df will be in json format so convert to df using the structure of the file 
def parse_and_cast(json_str):
    # Parse the JSON string
    data = json.loads(json_str)

    # Manually cast the fields
    return Row(
        _c0=str(data["_c0"]),
        UTC=str(data["UTC"]),
        Temperature_C=float(data["Temperature[C]"]),
        Humidity_Percent=float(data["Humidity[%]"]),
        TVOC_ppb=int(data["TVOC[ppb]"]),
        eCO2_ppm=int(data["eCO2[ppm]"]),
        Raw_H2=int(data["Raw H2"]),
        Raw_Ethanol=int(data["Raw Ethanol"]),
        Pressure_hPa=float(data["Pressure[hPa]"]),
        PM1=float(data["PM1"]),
        PM2_5=float(data["PM2_5"]),
        NC0_5=int(data["NC0_5"]),
        NC1_5=float(data["NC1_5"]),
        NC2_5=float(data["NC2_5"]),
        CNT=int(data["CNT"]),
        Fire_Alarm=str(data["Fire Alarm"])
    )

# Apply the function to each row
# rdd = df.rdd.map(lambda row: parse_and_cast(row.value_string))

# Convert the RDD back to a DataFrame
# new_df = spark.createDataFrame(rdd)

# Show the result
# new_df.show(truncate=False)

In [6]:
rdd = df.rdd.map(lambda row: parse_and_cast(row.value_string))

In [7]:
new_df = spark.createDataFrame(rdd)

In [8]:
new_df.show(truncate = False)

+---+----------+-------------+----------------+--------+--------+------+-----------+------------+----+-----+-----+-----+-----+---+----------+
|_c0|UTC       |Temperature_C|Humidity_Percent|TVOC_ppb|eCO2_ppm|Raw_H2|Raw_Ethanol|Pressure_hPa|PM1 |PM2_5|NC0_5|NC1_5|NC2_5|CNT|Fire_Alarm|
+---+----------+-------------+----------------+--------+--------+------+-----------+------------+----+-----+-----+-----+-----+---+----------+
|0  |1654733331|20.0         |57.36           |0       |400     |12306 |18520      |939.735     |0.0 |0.0  |0    |0.0  |0.0  |0  |0         |
|1  |1654733332|20.015       |56.67           |0       |400     |12345 |18651      |939.744     |0.0 |0.0  |0    |0.0  |0.0  |1  |0         |
|2  |1654733333|20.029       |55.96           |0       |400     |12374 |18764      |939.738     |0.0 |0.0  |0    |0.0  |0.0  |2  |0         |
|3  |1654733334|20.044       |55.28           |0       |400     |12390 |18849      |939.736     |0.0 |0.0  |0    |0.0  |0.0  |3  |0         |
|4  |1

In [9]:
aggregated_df = new_df.groupBy("Fire_Alarm").agg(
    F.max("Temperature_C").alias("Max_Temperature"),
    F.min("Temperature_C").alias("Min_Temperature"),
    F.avg("Humidity_Percent").alias("Avg_Humidity"),
)
aggregated_df.show()

+----------+---------------+---------------+-----------------+
|Fire_Alarm|Max_Temperature|Min_Temperature|     Avg_Humidity|
+----------+---------------+---------------+-----------------+
|         0|         20.146|           20.0|54.39181818181819|
+----------+---------------+---------------+-----------------+



In [15]:
new_df.createOrReplaceTempView("sensor_data")


result_df = spark.sql("""
    SELECT `Temperature_C`, COUNT(*) as `count`
    FROM sensor_data
    GROUP BY `Temperature_C`
    ORDER BY `Temperature_C`
""")

result_df.show()

+-------------+-----+
|Temperature_C|count|
+-------------+-----+
|         20.0|    1|
|       20.015|    1|
|       20.029|    1|
|       20.044|    1|
|       20.059|    1|
|       20.073|    1|
|       20.088|    1|
|       20.103|    1|
|       20.117|    1|
|       20.132|    1|
|       20.146|    1|
+-------------+-----+



In [10]:
pivoting_df = new_df.groupBy("Fire_Alarm") \
                    .pivot("Fire_Alarm") \
                    .agg(F.max("Temperature_C").alias("Max_Temperature"),
                         F.min("Temperature_C").alias("Min_Temperature"),
                         F.avg("Humidity_Percent").alias("Avg_Humidity"))

pivoting_df.show()

+----------+-----------------+-----------------+-----------------+
|Fire_Alarm|0_Max_Temperature|0_Min_Temperature|   0_Avg_Humidity|
+----------+-----------------+-----------------+-----------------+
|         0|           20.146|             20.0|54.39181818181819|
+----------+-----------------+-----------------+-----------------+



In [12]:
rollup_df = new_df.rollup("Fire_Alarm") \
                  .agg(F.max("Temperature_C").alias("Max_Temperature"),
                       F.min("Temperature_C").alias("Min_Temperature"),
                       F.avg("Humidity_Percent").alias("Avg_Humidity"))
rollup_df.show()

+----------+---------------+---------------+-----------------+
|Fire_Alarm|Max_Temperature|Min_Temperature|     Avg_Humidity|
+----------+---------------+---------------+-----------------+
|      NULL|         20.146|           20.0|54.39181818181819|
|         0|         20.146|           20.0|54.39181818181819|
+----------+---------------+---------------+-----------------+



In [13]:
cubes_df = new_df.cube("Fire_Alarm") \
                  .agg(F.max("Temperature_C").alias("Max_Temperature"),
                       F.min("Temperature_C").alias("Min_Temperature"),
                       F.avg("Humidity_Percent").alias("Avg_Humidity"))

cubes_df.show()

+----------+---------------+---------------+-----------------+
|Fire_Alarm|Max_Temperature|Min_Temperature|     Avg_Humidity|
+----------+---------------+---------------+-----------------+
|      NULL|         20.146|           20.0|54.39181818181819|
|         0|         20.146|           20.0|54.39181818181819|
+----------+---------------+---------------+-----------------+



In [14]:
from pyspark.sql.window import Window
windowSpec = Window.partitionBy("Fire_Alarm").orderBy(F.desc("Temperature_C"))

ranking_df = new_df.withColumn("Rank", F.rank().over(windowSpec))

ranking_df.show()

+---+----------+-------------+----------------+--------+--------+------+-----------+------------+----+-----+-----+-----+-----+---+----------+----+
|_c0|       UTC|Temperature_C|Humidity_Percent|TVOC_ppb|eCO2_ppm|Raw_H2|Raw_Ethanol|Pressure_hPa| PM1|PM2_5|NC0_5|NC1_5|NC2_5|CNT|Fire_Alarm|Rank|
+---+----------+-------------+----------------+--------+--------+------+-----------+------------+----+-----+-----+-----+-----+---+----------+----+
| 10|1654733341|       20.146|           52.15|       0|     400| 12454|      19230|     939.757|0.89| 3.71|    0|4.289| 2.73| 10|         0|   1|
|  9|1654733340|       20.132|           52.46|       0|     400| 12453|      19195|     939.756| 0.9| 3.78|    0|4.369| 2.78|  9|         0|   2|
|  8|1654733339|       20.117|           52.81|       0|     400| 12448|      19155|     939.758| 0.0|  0.0|    0|  0.0|  0.0|  8|         0|   3|
|  7|1654733338|       20.103|            53.2|       0|     400| 12439|      19114|     939.758| 0.0|  0.0|    0|  0.

In [15]:
windowSpec = Window.partitionBy("Fire_Alarm").orderBy("UTC").rowsBetween(Window.unboundedPreceding, Window.currentRow)

analytic_df = new_df.withColumn("Cumulative_Humidity", F.sum("Humidity_Percent").over(windowSpec))

analytic_df.show()

+---+----------+-------------+----------------+--------+--------+------+-----------+------------+----+-----+-----+-----+-----+---+----------+-------------------+
|_c0|       UTC|Temperature_C|Humidity_Percent|TVOC_ppb|eCO2_ppm|Raw_H2|Raw_Ethanol|Pressure_hPa| PM1|PM2_5|NC0_5|NC1_5|NC2_5|CNT|Fire_Alarm|Cumulative_Humidity|
+---+----------+-------------+----------------+--------+--------+------+-----------+------------+----+-----+-----+-----+-----+---+----------+-------------------+
|  0|1654733331|         20.0|           57.36|       0|     400| 12306|      18520|     939.735| 0.0|  0.0|    0|  0.0|  0.0|  0|         0|              57.36|
|  1|1654733332|       20.015|           56.67|       0|     400| 12345|      18651|     939.744| 0.0|  0.0|    0|  0.0|  0.0|  1|         0|             114.03|
|  2|1654733333|       20.029|           55.96|       0|     400| 12374|      18764|     939.738| 0.0|  0.0|    0|  0.0|  0.0|  2|         0|             169.99|
|  3|1654733334|       20.04

In [11]:
from kafka import KafkaProducer
import json

In [12]:
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

In [13]:
kafka_topic = 'output'
def sendKafka(df):
    print("STarting to send the data to kafka")
    count = 0
    for row in df.collect():
        producer.send(kafka_topic, row.asDict())
        producer.flush()
        if count == 10:
            break
        count += 1
    print("data sent!")


In [16]:
sendKafka(result_df)

STarting to send the data to kafka
data sent!


In [17]:
sendKafka(pivoting_df)

STarting to send the data to kafka
data sent!


In [21]:
sendKafka(rollup_df)

STarting to send the data to kafka
data sent!


In [22]:
sendKafka(cubes_df)

STarting to send the data to kafka
data sent!


In [23]:
sendKafka(ranking_df)

STarting to send the data to kafka
data sent!


In [24]:
sendKafka(analytic_df)

STarting to send the data to kafka
data sent!


In [25]:
producer.close()

In [26]:
spark.stop()