Importing the libraries.

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

The code block below creates the catalog and schemas for our solution. 

The approach utilises a multi-hop data storage architecture (medallion), consisting of bronze, silver, and gold schemas within a 'streaming' catalog. 

In [None]:
# Create the 'streaming' catalog and its bronze/silver/gold schemas.
# `if not exists` makes the cell idempotent: re-running it against an existing
# catalog or schema is a no-op, so an exception now always signals a real
# problem (permissions, invalid storage location, ...) rather than "already exists".
MANAGED_LOCATION = 'abfss://streamingcontainer@streamingstorageaccount.dfs.core.windows.net/'

ddl_statements = [
    ('catalog', f"create catalog if not exists streaming MANAGED LOCATION '{MANAGED_LOCATION}';"),
    ('bronze schema', f"create schema if not exists streaming.bronze MANAGED LOCATION '{MANAGED_LOCATION}';"),
    ('silver schema', f"create schema if not exists streaming.silver MANAGED LOCATION '{MANAGED_LOCATION}';"),
    ('gold schema', f"create schema if not exists streaming.gold MANAGED LOCATION '{MANAGED_LOCATION}';"),
]

for object_label, ddl in ddl_statements:
    try:
        spark.sql(ddl)
    except Exception as exc:
        # Stay best-effort (the remaining objects are still attempted), but
        # surface the actual cause instead of guessing at it.
        print(f'could not create {object_label}: {exc}')

check if catalog already exists
check if bronze schema already exists
check if silver schema already exists


#### Bronze Layer

Set up the Azure Event Hubs connection string.

In [None]:
# Config
# Replace the placeholders with your own Event Hub namespace, name, and key
connectionString = "***"
eventHubName = "***"

# Options handed to the "eventhubs" stream reader.
# The connection string is passed through EventHubsUtils.encrypt (a JVM-side
# helper reached via the SparkContext gateway) before it is stored in the config.
ehConf = {}
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
ehConf['eventhubs.eventHubName'] = eventHubName

Reading and writing the stream to the bronze layer.

In [None]:
# Reading stream: open a streaming DataFrame over Azure Event Hubs using the options in 'ehConf'
df = (
    spark.readStream
    .format("eventhubs")
    .options(**ehConf)
    .load()
)

# Displaying stream: render the incoming events so they can be inspected while developing
df.display()

# Writing stream: append the raw events to the Delta table 'streaming.bronze.weather';
# progress is tracked in the checkpoint directory
(
    df.writeStream
    .option("checkpointLocation", "/mnt/streaming/bronze/weather")
    .outputMode("append")
    .format("delta")
    .toTable("streaming.bronze.weather")
)

body,partition,offset,sequenceNumber,enqueuedTime,publisher,partitionKey,properties,systemProperties
eyJ0ZW1wZXJhdHVyZSI6ICIyNlx1MDBiMEMiLCAidGltZSI6ICJNb25kYXkgMTA6NTdcdTIwMmZwLm0uIiwgInNreWNvbmRpdGlvbiI6ICJDbGVhciJ9,0,152,1,2024-07-30T02:58:25.463Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJ0ZW1wZXJhdHVyZSI6ICIyNlx1MDBiMEMiLCAidGltZSI6ICJNb25kYXkgMTE6MDhcdTIwMmZwLm0uIiwgInNreWNvbmRpdGlvbiI6ICJDbGVhciJ9,0,304,2,2024-07-30T03:09:07.561Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJ0ZW1wZXJhdHVyZSI6ICIyNlx1MDBiMEMiLCAidGltZSI6ICJNb25kYXkgMTE6MDlcdTIwMmZwLm0uIiwgInNreWNvbmRpdGlvbiI6ICJDbGVhciJ9,0,456,3,2024-07-30T03:10:18.235Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJ0ZW1wZXJhdHVyZSI6ICIyNlx1MDBiMEMiLCAidGltZSI6ICJNb25kYXkgMTE6MTBcdTIwMmZwLm0uIiwgInNreWNvbmRpdGlvbiI6ICJDbGVhciJ9,0,608,4,2024-07-30T03:11:28.439Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJ0ZW1wZXJhdHVyZSI6ICIyNlx1MDBiMEMiLCAidGltZSI6ICJNb25kYXkgMTE6MTFcdTIwMmZwLm0uIiwgInNreWNvbmRpdGlvbiI6ICJDbGVhciJ9,0,760,5,2024-07-30T03:12:38.301Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJ0ZW1wZXJhdHVyZSI6ICIyNlx1MDBiMEMiLCAidGltZSI6ICJNb25kYXkgMTE6MTJcdTIwMmZwLm0uIiwgInNreWNvbmRpdGlvbiI6ICJDbGVhciJ9,0,912,6,2024-07-30T03:13:47.943Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)


#### Silver Layer

Defining the schema for the JSON object.

In [None]:
# Schema of the JSON payload carried in each event body:
# three nullable string fields, in this order.
json_schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("temperature", "time", "skycondition")
])

Reading, transforming and writing the stream from the bronze to the silver layer.

In [None]:
# Reading and transforming: stream the 'streaming.bronze.weather' Delta table, decode the
# 'body' column to a string, parse it as JSON with 'json_schema', and keep the parsed
# fields plus the enqueue time (exposed as 'timestamp')
df = (
    spark.readStream
    .format("delta")
    .table("streaming.bronze.weather")
    .withColumn("body", from_json(col("body").cast("string"), json_schema))
    .select(
        "body.temperature",
        "body.time",
        "body.skycondition",
        col("enqueuedTime").alias('timestamp'),
    )
)

# Displaying stream: render the transformed rows for verification
df.display()

# Writing stream: append the transformed rows to the Delta table 'streaming.silver.weather';
# progress is tracked in the checkpoint directory
(
    df.writeStream
    .option("checkpointLocation", "/mnt/streaming/silver/weather")
    .outputMode("append")
    .format("delta")
    .toTable("streaming.silver.weather")
)

temperature,time,skycondition,timestamp
26°C,Monday 10:57 p.m.,Clear,2024-07-30T02:58:25.463Z
26°C,Monday 11:08 p.m.,Clear,2024-07-30T03:09:07.561Z
26°C,Monday 11:09 p.m.,Clear,2024-07-30T03:10:18.235Z
26°C,Monday 11:10 p.m.,Clear,2024-07-30T03:11:28.439Z
26°C,Monday 11:11 p.m.,Clear,2024-07-30T03:12:38.301Z
26°C,Monday 11:12 p.m.,Clear,2024-07-30T03:13:47.943Z


#### Gold Layer

Reading, aggregating and writing the stream from the silver to the gold layer.

In [None]:
# Aggregating stream: read 'streaming.silver.weather', apply a 5-minute watermark and
# 5-minute tumbling windows on 'timestamp', and compute the average temperature per window.
#
# The silver table only carries temperature, time, skycondition and timestamp, so
# temperature is the only metric that can be averaged; referencing columns that are
# not in the silver table makes the query fail during analysis.

# Silver stores temperature as text such as '26°C'. Pull out the numeric part and
# convert it to a double; values without a number become NULL, which avg() ignores.
temperature_text = regexp_extract(col("temperature"), r"(-?\d+(?:\.\d+)?)", 1)
temperature_value = when(temperature_text != "", temperature_text.cast("double"))

df = (
    spark.readStream
    .format("delta")
    .table("streaming.silver.weather")
    .withColumn("temperature", temperature_value)
    .withWatermark("timestamp", "5 minutes")
    .groupBy(window("timestamp", "5 minutes"))
    .agg(avg("temperature").alias('temperature'))
    .select('window.start', 'window.end', 'temperature')
)

# Displaying aggregated stream: render the per-window averages for inspection
df.display()

# Writing aggregated stream: append the per-window averages to the Delta table
# 'streaming.gold.weather_summary'; progress is tracked in the checkpoint directory
(
    df.writeStream
    .option("checkpointLocation", "/mnt/streaming/weather_summary")
    .outputMode("append")
    .format("delta")
    .toTable("streaming.gold.weather_summary")
)