In [1]:
# Importing libraries

import os
import io
import time
import json
import struct
import requests 

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.streaming import StreamingContext

In [2]:
# Environment set-up
# PySpark reads PYSPARK_SUBMIT_ARGS when it launches the JVM, so this cell must
# run before the SparkSession is created. Every Spark artifact listed here must
# share the Scala binary version (_2.12) and the release of the installed Spark.
# Only the Structured Streaming Kafka source (format("kafka")) is used in this
# notebook. Do not add the DStream `spark-streaming-kafka` artifacts: they have
# no Python API in Spark 3. A mismatched Scala suffix (e.g. _2.11) would put
# binary-incompatible classes on the classpath.
SCALA_BINARY_VERSION = "2.12"
SPARK_VERSION = "3.0.1"
KAFKA_CLIENTS_VERSION = "2.6.0"
SPARK_PACKAGES = [
    "org.apache.spark:spark-sql-kafka-0-10_" + SCALA_BINARY_VERSION + ":" + SPARK_VERSION,
    "org.apache.kafka:kafka-clients:" + KAFKA_CLIENTS_VERSION,
]
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages ' + ','.join(SPARK_PACKAGES) + ' pyspark-shell'

In [3]:
# Build (or reuse) a local Spark session for this notebook; the bare `spark`
# on the last line renders the session summary as the cell output.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Electricity Consumption App")
    .getOrCreate()
)
spark

In [4]:
# Kafka connection settings. Both must match the Stream Generator. Either value
# can be overridden through an environment variable without editing the
# notebook; the defaults are the values the generator uses.
topic = os.environ.get("ELEC_CONS_TOPIC", "Elec_Cons_Topic")  # Same as topic in Stream Generator
servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092")  # Same as server in Stream Generator

In [5]:
# Open the Kafka topic as a streaming source. readStream (not read) makes the
# result an unbounded streaming DataFrame, and startingOffsets="earliest"
# replays the topic from its earliest available offset.
Electricity_Cons_sdf = (spark.readStream
                        .format("kafka")
                        .options(**{
                            "kafka.bootstrap.servers": servers,
                            "subscribe": topic,
                            "startingOffsets": "earliest",
                        })
                        .load())

In [6]:
# DEBUG - We want a streaming data frame.
# The assert stops Restart & Run All here if the source was accidentally built
# with spark.read (batch). The last line still displays the flag.
assert Electricity_Cons_sdf.isStreaming, "expected a streaming DataFrame (use spark.readStream)"
Electricity_Cons_sdf.isStreaming

True

In [7]:
# DEBUG: sdf types - display the raw Kafka columns and their types
# (the output shows key/value arriving as binary).
Electricity_Cons_sdf

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [8]:
# Kafka hands over key and value as binary; cast both to strings so the JSON
# payload in `value` can be parsed later. Column names stay `key` and `value`.
Electricity_Cons_sdf = Electricity_Cons_sdf.select(
    col("key").cast("string"),
    col("value").cast("string"),
)

In [9]:
# DEBUG: sdf types - confirm key and value are now strings.
Electricity_Cons_sdf.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



In [10]:
# Expected fields of the JSON payload in `value`: a numeric consumption reading
# and its timestamp. Both are declared nullable.
Electricity_Cons_schema = (StructType()
                           .add("ElectricityConsumption", DoubleType(), True)
                           .add("ts", TimestampType(), True))

In [11]:
# Applying the schema
Electricity_Cons_sdf = (Electricity_Cons_sdf
                      .select(col("key").cast("string"), from_json(col("value"), Electricity_Cons_schema).alias("value"))
                      .select("key","value.*"))

In [12]:
# DEBUG: sdf types - confirm the JSON payload was flattened into typed columns.
Electricity_Cons_sdf.printSchema()

root
 |-- key: string (nullable = true)
 |-- ElectricityConsumption: double (nullable = true)
 |-- ts: timestamp (nullable = true)



In [13]:
# Checking Kafka-Spark connection: stream the parsed rows into an in-memory
# table named "sinkTable" that can be queried with spark.sql.
# The memory sink is for debugging only - DO NOT USE IN PRODUCTION.
basic_query = (Electricity_Cons_sdf.writeStream
               .queryName("sinkTable")
               .format("memory")
               .start())

In [22]:
spark.sql("SELECT * FROM sinkTable").show(n=10)  # peek at 10 rows captured by the debug sink

+---+----------------------+-------------------+
|key|ElectricityConsumption|                 ts|
+---+----------------------+-------------------+
|EC1|                 950.0|2022-10-31 14:39:40|
|EC1|                 939.0|2022-10-31 14:39:51|
|EC1|                 943.0|2022-10-31 14:40:01|
|EC1|                 971.0|2022-10-31 14:40:11|
|EC1|                1014.0|2022-10-31 14:40:21|
|EC1|                1041.0|2022-10-31 14:40:31|
|EC1|                1023.0|2022-10-31 14:40:41|
|EC1|                1030.0|2022-10-31 14:40:51|
|EC1|                1004.0|2022-10-31 14:41:01|
|EC1|                 995.0|2022-10-31 14:41:11|
+---+----------------------+-------------------+
only showing top 10 rows



In [15]:
# Expose the streaming DataFrame to Spark SQL; the query in the next cell reads
# FROM this view name.
# NOTE(review): the same name is reused as q1's memory-sink queryName, and the
# lookup further down returns averages from that name - so after q1 starts the
# name refers to the sink's results, not this source view. Re-running this
# cell then fails because the name already exists.
Electricity_Cons_sdf.createTempView(name="AVG_Electricity_Consumption")

In [16]:
query_string = """
SELECT key, AVG(ElectricityConsumption) 
FROM AVG_Electricity_Consumption
GROUP BY key
"""

# Landmark (all-time) aggregate: each key's average covers every record received
# since the query started; old data is never dropped from the result.
# There is no window and no watermark, so Spark keeps aggregation state for
# every key it has ever seen. Memory therefore grows with the number of
# distinct keys and is not bounded in time.
# "complete" output mode rewrites the whole result table on every trigger. The
# memory sink exposes that table under the queryName below.
q1 = (spark.sql(query_string)
      .writeStream
      .format("memory")
      .outputMode("complete")
      .queryName("AVG_Electricity_Consumption")
      .start())

In [23]:
# Show the latest all-time averages written by q1's memory sink
# (the default 20 rows, long values truncated).
spark.sql("SELECT * FROM AVG_Electricity_Consumption").show(n=20, truncate=True)

+---+---------------------------+
|key|avg(ElectricityConsumption)|
+---+---------------------------+
|EC1|                    1033.65|
+---+---------------------------+



In [18]:
# Register the streaming DataFrame under a second SQL view name.
# NOTE(review): q2 in the next cell is built with the DataFrame API directly on
# Electricity_Cons_sdf and never reads this view. Its memory-sink queryName
# reuses the same name, so this view appears unused - confirm before relying on it.
Electricity_Cons_sdf.createTempView(name="AVG_Electricity_Consumption_2")

In [19]:
# Logical (time-based) tumbling window: average electricity consumption per key
# over consecutive, non-overlapping 30-second windows of event time `ts`
# (was 4 seconds in EPL).
# The watermark is NOT the window length. It tolerates events arriving up to
# 30 seconds late, and lets Spark finalize windows (and drop their state) once
# event time has moved past them.
q2 = (Electricity_Cons_sdf
      .withWatermark("ts", "30 seconds")              # allowed lateness
      .groupBy(window("ts", "30 seconds"), "key")     # 30-second tumbling windows
      .avg("ElectricityConsumption")
      .writeStream
      .format("memory")
      .outputMode("append")  # Spark's default: a window is emitted once the watermark passes its end
      .queryName("AVG_Electricity_Consumption_2")
      .start())

In [24]:
# Show the five most recent completed windows from q2's memory sink, with full
# (untruncated) column values.
spark.sql("SELECT * FROM AVG_Electricity_Consumption_2 ORDER BY window DESC").show(n=5, truncate=False)

+------------------------------------------+---+---------------------------+
|window                                    |key|avg(ElectricityConsumption)|
+------------------------------------------+---+---------------------------+
|[2022-10-31 14:45:00, 2022-10-31 14:45:30]|EC1|1080.3333333333333         |
|[2022-10-31 14:44:30, 2022-10-31 14:45:00]|EC1|1087.3333333333333         |
|[2022-10-31 14:44:00, 2022-10-31 14:44:30]|EC1|1067.6666666666667         |
|[2022-10-31 14:43:30, 2022-10-31 14:44:00]|EC1|1035.3333333333333         |
|[2022-10-31 14:43:00, 2022-10-31 14:43:30]|EC1|1039.6666666666667         |
+------------------------------------------+---+---------------------------+
only showing top 5 rows



In [21]:
# Stopping queries.
# Opt-in so that Run All leaves the streams running for inspection; set to True
# once you are done. This stops every active streaming query on the session,
# including the debug `basic_query` memory sink, not just q1 and q2.
STOP_QUERIES = False
if STOP_QUERIES:
    for active_query in spark.streams.active:
        active_query.stop()