Import the PySpark SQL functions and types used throughout the notebook.

In [None]:
import os

from pyspark.sql.functions import *
from pyspark.sql.types import *

The cell below creates the catalog and schemas for our solution.

The approach uses a multi-hop (medallion) data storage architecture, consisting of bronze, silver, and gold schemas within a `streaming` catalog.

In [None]:
# Create the `streaming` catalog and its medallion-layer schemas (bronze, silver, gold).
# `IF NOT EXISTS` makes this cell idempotent, so it can be re-run safely. Any other
# failure (e.g. missing privileges) now surfaces as an error instead of being swallowed
# by a bare `except:` with a misleading "already exists" message.
spark.sql("CREATE CATALOG IF NOT EXISTS streaming")

for layer in ("bronze", "silver", "gold"):
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS streaming.{layer}")

#### Bronze Layer

Set up the Azure Event Hubs connection configuration.

In [None]:
# Config
# Event Hubs connection settings. The connection string is a credential, so it is read
# from the environment rather than hardcoded in the notebook. The literals below are
# only placeholder fallbacks.
# NOTE(review): on Databricks, a secret scope (dbutils.secrets.get) is the usual home for this value.
connectionString = os.environ.get("EVENTHUB_CONNECTION_STRING", "key")
eventHubName = os.environ.get("EVENTHUB_NAME", "eh-demo")

# Options passed to the `eventhubs` stream source. The connector expects the
# connection string to be encrypted with its EventHubsUtils helper.
ehConf = {
    'eventhubs.connectionString': sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString),
    'eventhubs.eventHubName': eventHubName,
}

In [None]:
# Reading stream: load raw events from Azure Event Hubs using the settings in `ehConf`.
# Each row carries the message payload in `body` (binary) plus Event Hubs metadata
# such as partition, offset, sequenceNumber and enqueuedTime.
df = (
    spark.readStream
    .format("eventhubs")
    .options(**ehConf)
    .load()
)

# Print the schema of the incoming stream. This shows column names/types only, not data.
df.printSchema()

# Writing stream: append the raw events unchanged to the Delta table 'streaming.bronze.patient'.
# The checkpoint location records stream progress so a restarted query resumes where it left off.
bronze_query = (
    df.writeStream
    .option("checkpointLocation", "/mnt/streaming/bronze/patient")
    .outputMode("append")
    .format("delta")
    .toTable("streaming.bronze.patient")
)
bronze_query

root
 |-- body: binary (nullable = true)
 |-- partition: string (nullable = true)
 |-- offset: string (nullable = true)
 |-- sequenceNumber: long (nullable = true)
 |-- enqueuedTime: timestamp (nullable = true)
 |-- publisher: string (nullable = true)
 |-- partitionKey: string (nullable = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- systemProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

Out[4]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f8428882610>

The cell above reads the stream and writes it to the bronze layer; the query below inspects the resulting bronze table.

In [None]:
%sql
-- Inspect the raw Event Hubs records landed in the bronze table.
SELECT *
FROM streaming.bronze.patient;


body,partition,offset,sequenceNumber,enqueuedTime,publisher,partitionKey,properties,systemProperties


#### Silver Layer

Define the schema of the JSON payload carried in each event body.

In [None]:
# Defining the schema for the JSON object
# Field names and Spark types of the JSON payload parsed from each event body.
# Every field uses the StructField default of nullable=True.
json_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("PatientID", IntegerType()),
        ("HeartRate", IntegerType()),
        ("Temperature", FloatType()),
        ("BloodPressure", StringType()),
        ("Event", StringType()),
    )
])

Read the stream from the bronze layer, parse and flatten the JSON payload, and write the result to the silver layer.

In [None]:
# Reading and transforming: stream new rows from the bronze table, decode the binary
# `body` into a string, and parse it with `json_schema`. Then flatten the patient
# fields and keep the Event Hubs `enqueuedTime` as `timestampnew`.
# from_json can yield nulls for fields that are missing or do not match the schema.
df = (
    spark.readStream
    .format("delta")
    .table("streaming.bronze.patient")
    .withColumn("body", col("body").cast("string"))
    .withColumn("body", from_json(col("body"), json_schema))
    .select(
        "body.PatientID",
        "body.HeartRate",
        "body.Temperature",
        "body.BloodPressure",
        "body.Event",
        col("enqueuedTime").alias("timestampnew"),
    )
)

# Preview the parsed stream.
# NOTE(review): display() on a streaming DataFrame runs as a separate streaming query
# in Databricks; consider removing it outside of debugging.
df.display()

# Writing stream: append the flattened records to the Delta table 'streaming.silver.patient',
# checkpointing progress so a restarted query resumes where it left off.
silver_query = (
    df.writeStream
    .option("checkpointLocation", "/mnt/streaming/silver/patient")
    .outputMode("append")
    .format("delta")
    .toTable("streaming.silver.patient")
)
silver_query

PatientID,Timestamp,HeartRate,Temperature,BloodPressure,Event,timestampnew
11.0,1649396215,80.0,36.6,120/75,normal,2024-04-28T21:53:40.495+0000
,1649396216,,,130/80,normal,2024-04-28T21:54:13.831+0000


#### Gold Layer

Read the stream from the silver layer, filter it to warning events, and write them to the gold layer.

In [None]:
# Filtering stream: read the silver patient stream and keep only rows whose Event is
# "warning", projecting the patient ID and event. No watermarking, windowing, or
# aggregation is applied here.
df = (
    spark.readStream
    .format("delta")
    .table("streaming.silver.patient")
    .filter(col("Event") == "warning")
    .select("PatientID", "Event")
)

# Preview the warning events. The label is printed first so it heads the displayed rows
# rather than appearing as an unconditional alert after them.
print("Needs immediate attention")
df.display()

# Writing stream: append the warning events to the Delta table 'streaming.gold.patient_summary'.
# NOTE(review): this checkpoint path does not follow the /mnt/streaming/<layer>/<table> layout
# used by the bronze and silver cells. It is left unchanged because pointing an existing query
# at a new checkpoint would restart it from scratch and re-append rows already written.
gold_query = (
    df.writeStream
    .option("checkpointLocation", "/mnt/streaming/patient_summary")
    .outputMode("append")
    .format("delta")
    .toTable("streaming.gold.patient_summary")
)
gold_query

PatientID,Event
