Importing the libraries.

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

The code block below creates the catalog and schemas for our solution. 

The approach utilises a multi-hop data storage architecture (medallion), consisting of bronze, silver, and gold schemas within a 'streaming' catalog. 

In [None]:
# Create the 'streaming' catalog and its bronze / silver / gold schemas.
# `IF NOT EXISTS` makes this cell idempotent: re-running it against objects
# that already exist is a no-op instead of an error.
managed_location = 'abfss://streamingcontainer@streamingstorageaccount.dfs.core.windows.net/'

ddl_statements = [
    ("catalog", f"CREATE CATALOG IF NOT EXISTS streaming MANAGED LOCATION '{managed_location}'"),
    ("bronze schema", f"CREATE SCHEMA IF NOT EXISTS streaming.bronze MANAGED LOCATION '{managed_location}'"),
    ("silver schema", f"CREATE SCHEMA IF NOT EXISTS streaming.silver MANAGED LOCATION '{managed_location}'"),
    ("gold schema", f"CREATE SCHEMA IF NOT EXISTS streaming.gold MANAGED LOCATION '{managed_location}'"),
]

for object_label, ddl in ddl_statements:
    try:
        spark.sql(ddl)
    except Exception as exc:
        # Stay best-effort (keep going with the remaining objects), but report
        # the actual failure instead of guessing that the object already exists.
        # `Exception` rather than a bare `except` so that interrupting the cell
        # still works.
        print(f"could not create {object_label}: {exc}")

check if catalog already exists
check if bronze schema already exists
check if silver schema already exists
check if gold schema already exists


#### Bronze Layer

Set up the Azure Event Hubs connection string.

In [None]:
# Event Hubs connection settings.
# The connection string is fetched from the 'TestScope111' secret scope rather
# than being written into the notebook; only the hub name is set here.
eventHubName = "streamingeventhubs"
connectionString = dbutils.secrets.get('TestScope111','eventhubs-connectionstr')

# Options handed to the "eventhubs" stream reader. The connection string is
# passed through EventHubsUtils.encrypt before it is stored in the config.
ehConf = {}
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
ehConf['eventhubs.eventHubName'] = eventHubName

Reading and writing the stream to the bronze layer.

In [None]:
# Checkpoint directory used by the bronze-layer streaming write.
checkpoint_path = "/volumescheckpoint//Streaming/bronze/checkpointing/"



In [None]:
# Source: open a streaming read against Event Hubs using the options in ehConf.
df = (
    spark.readStream
    .format("eventhubs")
    .options(**ehConf)
    .load()
)

# Show the incoming stream in the notebook for inspection.
df.display()

# Sink: append the stream to the Delta table 'streaming.bronze.weather',
# with checkpoint state kept under checkpoint_path.
(
    df.writeStream
    .option("checkpointLocation", checkpoint_path)
    .outputMode("append")
    .format("delta")
    .toTable("streaming.bronze.weather")
)

body,partition,offset,sequenceNumber,enqueuedTime,publisher,partitionKey,properties,systemProperties


#### Silver Layer

Defining the schema for the JSON object.

In [None]:
# Schema of the JSON payload: three nullable string fields.
json_schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("temperature", "time", "skycondition")
])

Reading, transforming and writing the stream from the bronze to the silver layer.

In [None]:
# Checkpoint directory used by the silver-layer streaming write.
checkpoint_path1 = "/volumescheckpoint//Streaming/silver/checkpointing/"

In [None]:
# Source + transform: stream from the bronze Delta table, cast 'body' to a
# string, parse it as JSON with json_schema, then keep the three parsed fields
# plus 'enqueuedTime' renamed to 'timestamp'.
df = (
    spark.readStream
    .format("delta")
    .table("streaming.bronze.weather")
    .withColumn("body", col("body").cast("string"))
    .withColumn("body", from_json(col("body"), json_schema))
    .select(
        "body.temperature",
        "body.time",
        "body.skycondition",
        col("enqueuedTime").alias('timestamp'),
    )
)

# Show the transformed stream in the notebook for verification.
df.display()

# Sink: append the result to the Delta table 'streaming.silver.weather',
# with checkpoint state kept under checkpoint_path1.
(
    df.writeStream
    .option("checkpointLocation", checkpoint_path1)
    .outputMode("append")
    .format("delta")
    .toTable("streaming.silver.weather")
)

temperature,time,skycondition,timestamp


#### Gold Layer

Reading, aggregating and writing the stream from the silver to the gold layer.