# Real-time Data Processing with Azure Databricks


Importing the libraries.

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

The code block below creates the catalog and schemas for our solution. 

The approach utilises a multi-hop data storage architecture (medallion), consisting of bronze, silver, and gold schemas within a 'streaming' catalog. 

In [0]:
try:
    spark.sql("create catalog streaming;")
except:
    print('check if catalog already exists')

try:
    spark.sql("create schema streaming.bronze;")
except:
    print('check if bronze schema already exists')

try:
    spark.sql("create schema streaming.silver")
except:
    print('check if silver schema already exists')

try:
    spark.sql("create schema streaming.gold;")
except:
    print('check if gold schema already exists')

check if catalog already exists
check if bronze schema already exists
check if silver schema already exists
check if gold schema already exists


#### Bronze Layer

Set up Azure Event hubs connection string By key vault.

In [0]:
%python
connectionString = dbutils.secrets.get("ConnectionScope", key="ConnectionStringEventHub")
eventHubName = "firsteventhub"

ehConf = {
  'eventhubs.connectionString' : sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString),
  'eventhubs.eventHubName': eventHubName
}

Reading and writing the stream to the bronze layer.

In [0]:
df = spark.readStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .load() \

df.display()

df.writeStream\
    .option("checkpointLocation", "/mnt/streaming/bronze/weather")\
    .outputMode("append")\
    .format("delta")\
    .toTable("streaming.bronze.weather")

body,partition,offset,sequenceNumber,enqueuedTime,publisher,partitionKey,properties,systemProperties
eyJ0ZW1wZXJhdHVyZSI6IDIwLjcsICJodW1pZGl0eSI6IDk1LCAid2luZFNwZWVkIjogMjIuMCwgIndpbmREaXJlY3Rpb24iOiAiTkUiLCAicHJlY2lwaXRhdGlvbiI6IDYuNSwgImNvbmRpdGlvbnMiOiAiUGFydGx5IENsb3U= (truncated),0,12884909920,808,2024-12-25T19:53:41.475Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJ0ZW1wZXJhdHVyZSI6IDMzLjQsICJodW1pZGl0eSI6IDI4LCAid2luZFNwZWVkIjogMTguNywgIndpbmREaXJlY3Rpb24iOiAiTkUiLCAicHJlY2lwaXRhdGlvbiI6IDQ0LjQsICJjb25kaXRpb25zIjogIk92ZXJjYXN0In0=,0,12884910120,809,2024-12-25T19:53:46.615Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJ0ZW1wZXJhdHVyZSI6IDcuOCwgImh1bWlkaXR5IjogMzgsICJ3aW5kU3BlZWQiOiAxOS4wLCAid2luZERpcmVjdGlvbiI6ICJOIiwgInByZWNpcGl0YXRpb24iOiAzLjIsICJjb25kaXRpb25zIjogIlJhaW4ifQ==,0,12884910320,810,2024-12-25T19:53:51.756Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJ0ZW1wZXJhdHVyZSI6IDAuOSwgImh1bWlkaXR5IjogNzcsICJ3aW5kU3BlZWQiOiAxOC43LCAid2luZERpcmVjdGlvbiI6ICJOIiwgInByZWNpcGl0YXRpb24iOiAyMi41LCAiY29uZGl0aW9ucyI6ICJQYXJ0bHkgQ2xvdWQ= (truncated),0,12884910512,811,2024-12-25T19:53:56.912Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJ0ZW1wZXJhdHVyZSI6IDE3LjgsICJodW1pZGl0eSI6IDYzLCAid2luZFNwZWVkIjogMjQuMSwgIndpbmREaXJlY3Rpb24iOiAiTlciLCAicHJlY2lwaXRhdGlvbiI6IDM1LjksICJjb25kaXRpb25zIjogIlBhcnRseSBDbG8= (truncated),0,12884910712,812,2024-12-25T19:54:02.069Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJ0ZW1wZXJhdHVyZSI6IDIyLjcsICJodW1pZGl0eSI6IDM4LCAid2luZFNwZWVkIjogNC42LCAid2luZERpcmVjdGlvbiI6ICJORSIsICJwcmVjaXBpdGF0aW9uIjogMzguNiwgImNvbmRpdGlvbnMiOiAiT3ZlcmNhc3QifQ==,0,12884910920,813,2024-12-25T19:54:07.209Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJ0ZW1wZXJhdHVyZSI6IDI5LjYsICJodW1pZGl0eSI6IDE1LCAid2luZFNwZWVkIjogMTYuOSwgIndpbmREaXJlY3Rpb24iOiAiTiIsICJwcmVjaXBpdGF0aW9uIjogNDIuOCwgImNvbmRpdGlvbnMiOiAiT3ZlcmNhc3QifQ==,0,12884911120,814,2024-12-25T19:54:12.35Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJ0ZW1wZXJhdHVyZSI6IDIwLjksICJodW1pZGl0eSI6IDk2LCAid2luZFNwZWVkIjogMjMuMSwgIndpbmREaXJlY3Rpb24iOiAiVyIsICJwcmVjaXBpdGF0aW9uIjogMjEuMSwgImNvbmRpdGlvbnMiOiAiT3ZlcmNhc3QifQ==,0,12884911320,815,2024-12-25T19:54:17.491Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJ0ZW1wZXJhdHVyZSI6IDkuNywgImh1bWlkaXR5IjogMzYsICJ3aW5kU3BlZWQiOiAyNy40LCAid2luZERpcmVjdGlvbiI6ICJXIiwgInByZWNpcGl0YXRpb24iOiA1MC4wLCAiY29uZGl0aW9ucyI6ICJSYWluIn0=,0,12884911520,816,2024-12-25T19:54:22.647Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJ0ZW1wZXJhdHVyZSI6IDE2LjAsICJodW1pZGl0eSI6IDYxLCAid2luZFNwZWVkIjogMTYuOCwgIndpbmREaXJlY3Rpb24iOiAiTlciLCAicHJlY2lwaXRhdGlvbiI6IDkuNywgImNvbmRpdGlvbnMiOiAiUmFpbiJ9,0,12884911712,817,2024-12-25T19:54:27.788Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)


#### Silver Layer

Defining the schema for the JSON object.

In [0]:
json_schema = StructType([
    StructField("temperature", IntegerType()),
    StructField("humidity", IntegerType()),
    StructField("windSpeed", IntegerType()),
    StructField("windDirection", StringType()),
    StructField("precipitation", IntegerType()),
    StructField("conditions", StringType())
])

Reading, transforming and writing the stream from the bronze to the silver layer.

In [0]:
df = spark.readStream\
    .format("delta")\
    .table("streaming.bronze.weather")\
    .withColumn("body", col("body").cast("string"))\
    .withColumn("body",from_json(col("body"), json_schema))\
    .select("body.temperature", "body.humidity", "body.windSpeed", "body.windDirection", "body.precipitation", "body.conditions", col("enqueuedTime").alias('timestamp'))

df.display()

df.writeStream\
    .option("checkpointLocation", "/mnt/streaming/silver/weather")\
    .outputMode("append")\
    .format("delta")\
    .toTable("streaming.silver.weather")

temperature,humidity,windSpeed,windDirection,precipitation,conditions,timestamp
,47,,NW,,Partly Cloudy,2024-12-25T19:45:28.015Z
,92,,W,,Rain,2024-12-25T19:45:33.202Z
,11,,W,,Partly Cloudy,2024-12-25T19:45:38.359Z
,60,,NW,,Partly Cloudy,2024-12-25T19:45:43.515Z
,72,,NW,,Overcast,2024-12-25T19:45:48.671Z
,18,,SE,,Snow,2024-12-25T19:45:53.812Z
,65,,N,,Rain,2024-12-25T19:45:58.968Z
,51,,NE,,Rain,2024-12-25T19:46:04.124Z
,18,,E,,Snow,2024-12-25T19:46:09.265Z
,60,,S,,Partly Cloudy,2024-12-25T19:46:14.422Z


#### Gold Layer

Reading, aggregating and writing the stream from the silver to the gold layer.

In [0]:
df = spark.readStream\
    .format("delta")\
    .table("streaming.silver.weather")\
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(window("timestamp", "5 minutes")) \
    .agg(avg("temperature").alias('temperature'), avg("humidity").alias('humidity'), avg("windSpeed").alias('windSpeed'), avg("precipitation").alias('precipitation'))\
	.select('window.start', 'window.end', 'temperature', 'humidity', 'windSpeed', 'precipitation')

df.display()

df.writeStream\
    .option("checkpointLocation", "/mnt/streaming/weather_summary")\
    .outputMode("append")\
    .format("delta")\
    .toTable("streaming.gold.weather_summary")

start,end,temperature,humidity,windSpeed,precipitation
2024-12-25T19:45:00Z,2024-12-25T19:50:00Z,,53.09756097560975,,
2024-12-25T19:50:00Z,2024-12-25T19:55:00Z,,53.3125,,
