Importing the libraries.

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

The code block below creates the catalog and schemas for our solution. 

The approach utilises a multi-hop data storage architecture (medallion), consisting of bronze, silver, and gold schemas within a 'streaming' catalog. 

In [0]:
# Create the 'streaming' catalog and its bronze/silver/gold schemas.
# IF NOT EXISTS makes this cell idempotent: re-running the notebook no longer
# depends on catching an exception to detect objects that already exist.
ddl_statements = [
    "create catalog if not exists streaming",
    "create schema if not exists streaming.bronze",
    "create schema if not exists streaming.silver",
    "create schema if not exists streaming.gold",
]

for statement in ddl_statements:
    try:
        spark.sql(statement)
    except Exception as error:
        # Still best effort (the remaining statements are attempted), but the
        # actual cause is reported instead of being hidden behind a guess.
        print(f"'{statement}' failed: {error}")

#### Bronze Layer

Set up the Azure Event Hubs connection string.

In [0]:
# Azure Event Hub connection settings used by the streaming reader.
# NOTE(review): the connection string below is a placeholder; supply the real
# value at run time rather than saving it in the notebook.
connectionString = " Paste connection string here"
eventHubName = "eh-baggio"

# The connection string is passed through EventHubsUtils.encrypt (reached via the
# JVM gateway on the SparkContext) before it is stored in the options dict.
ehConf = {}
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
ehConf['eventhubs.eventHubName'] = eventHubName

Reading and writing the stream to the bronze layer.

In [0]:
# Source: open a streaming read on Azure Event Hub with the options held in ehConf
df = (
    spark.readStream
    .format("eventhubs")
    .options(**ehConf)
    .load()
)

# Live view of the incoming events, useful for visual checks and debugging
df.display()

# Sink: append the stream to the Delta table 'streaming.bronze.weather',
# using the checkpoint location given below
(
    df.writeStream
    .option("checkpointLocation", "/mnt/streaming/bronze/weather")
    .outputMode("append")
    .format("delta")
    .toTable("streaming.bronze.weather")
)

body,partition,offset,sequenceNumber,enqueuedTime,publisher,partitionKey,properties,systemProperties
ew0KICAgICJ0ZW1wZXJhdHVyZSI6IDIwLA0KICAgICJodW1pZGl0eSI6IDYwLA0KICAgICJ3aW5kU3BlZWQiOiAxMCwNCiAgICAid2luZERpcmVjdGlvbiI6ICJOVyIsDQogICAgInByZWNpcGl0YXRpb24iOiAwLA0KICAgICI= (truncated),1,4294967296,3,2025-01-03T23:11:33.176+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, correlation-id -> 3e3125ed-7319-4ded-b985-281489f88482, message-id -> EHExplorer-211dc059-4e81-4ecd-a580-f6ab5436d81f, content-type -> application/json)"
ew0KICAgICJ0ZW1wZXJhdHVyZSI6IDI1LA0KICAgICJodW1pZGl0eSI6IDUwLA0KICAgICJ3aW5kU3BlZWQiOiAxMCwNCiAgICAid2luZERpcmVjdGlvbiI6ICJTVyIsDQogICAgInByZWNpcGl0YXRpb24iOiAwLA0KICAgICI= (truncated),0,4294967296,3,2025-01-03T23:12:00.105+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, correlation-id -> 3d153e7e-6a95-4843-9d6f-327ae85dbac8, message-id -> EHExplorer-5a748037-81c7-4786-bb56-c3ccee3e810a, content-type -> application/json)"
ew0KICAgICJ0ZW1wZXJhdHVyZSI6IDI4LA0KICAgICJodW1pZGl0eSI6IDYwLA0KICAgICJ3aW5kU3BlZWQiOiAxMCwNCiAgICAid2luZERpcmVjdGlvbiI6ICJOVyIsDQogICAgInByZWNpcGl0YXRpb24iOiAwLA0KICAgICI= (truncated),0,4294967656,4,2025-01-03T23:13:23.434+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, correlation-id -> 67da3925-8e78-486b-8cff-cf2fea577935, message-id -> EHExplorer-fbfdb246-df21-43cc-ad91-cbbbe67991ae, content-type -> application/json)"


#### Silver Layer

Defining the schema for the JSON object.

In [0]:
# Schema of the JSON object: five weather readings plus a free-text 'conditions' field.
# Fields are added one by one; each keeps the default nullability.
json_schema = (
    StructType()
    .add("temperature", IntegerType())
    .add("humidity", IntegerType())
    .add("windSpeed", IntegerType())
    .add("windDirection", StringType())
    .add("precipitation", IntegerType())
    .add("conditions", StringType())
)

Reading, transforming and writing the stream from the bronze to the silver layer.

In [0]:
# Stream the bronze Delta table, turn 'body' into a string and parse it as JSON
# with json_schema, then flatten the parsed fields into top-level columns.
# 'enqueuedTime' is carried along under the name 'timestamp'.
df = (
    spark.readStream
    .format("delta")
    .table("streaming.bronze.weather")
    .withColumn("body", from_json(col("body").cast("string"), json_schema))
    .select(
        "body.temperature",
        "body.humidity",
        "body.windSpeed",
        "body.windDirection",
        "body.precipitation",
        "body.conditions",
        col("enqueuedTime").alias('timestamp'),
    )
)

# Live view of the transformed rows for verification
df.display()

temperature,humidity,windSpeed,windDirection,precipitation,conditions,timestamp
20,60,10,NW,0,Partly Cloudy,2025-01-03T22:50:06.581+0000
22,60,10,NW,0,Partly Cloudy,2025-01-03T22:50:36.363+0000
32,40,20,SW,0,Partly Cloudy,2025-01-03T22:52:40.194+0000
20,60,10,NW,0,Partly Cloudy,2025-01-03T23:11:33.176+0000
25,50,10,SW,0,Partly Cloudy,2025-01-03T23:12:00.105+0000
28,60,10,NW,0,Sunny,2025-01-03T23:13:23.434+0000


In [0]:
# Sink: append the transformed stream to the Delta table 'streaming.silver.weather',
# using the checkpoint location given below
(
    df.writeStream
    .option("checkpointLocation", "/mnt/streaming/silver/weather")
    .outputMode("append")
    .format("delta")
    .toTable("streaming.silver.weather")
)

Out[13]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f4eac8b7be0>

#### Gold Layer

Reading, aggregating and writing the stream from the silver to the gold layer.

In [0]:
# One average per numeric metric; each result keeps the name of its source column
averaged_metrics = [
    avg(metric).alias(metric)
    for metric in ("temperature", "humidity", "windSpeed", "precipitation")
]

# Stream 'streaming.silver.weather', apply a 5-minute watermark on 'timestamp',
# group rows into 5-minute windows and average the metrics per window
df = (
    spark.readStream
    .format("delta")
    .table("streaming.silver.weather")
    .withWatermark("timestamp", "5 minutes")
    .groupBy(window("timestamp", "5 minutes"))
    .agg(*averaged_metrics)
    .select('window.start', 'window.end', 'temperature', 'humidity', 'windSpeed', 'precipitation')
)

# Live view of the per-window averages
df.display()

# Sink: append the aggregated rows to the Delta table 'streaming.gold.weather_summary',
# using the checkpoint location given below
(
    df.writeStream
    .option("checkpointLocation", "/mnt/streaming/weather_summary")
    .outputMode("append")
    .format("delta")
    .toTable("streaming.gold.weather_summary")
)

start,end,temperature,humidity,windSpeed,precipitation
2025-01-03T22:50:00.000+0000,2025-01-03T22:55:00.000+0000,24.666666666666668,53.333333333333336,13.333333333333334,0.0
2025-01-03T23:10:00.000+0000,2025-01-03T23:15:00.000+0000,24.33333333333333,56.66666666666666,10.0,0.0
