# Real-time Data Processing with Azure Databricks (and Event Hubs)

This notebook demonstrates the architecture below for building real-time data pipelines.
![Solution Architecture](https://raw.githubusercontent.com/malvik01/Real-Time-Streaming-with-Azure-Databricks/main/Azure%20Solution%20Architecture.png)


- Data Sources: Streaming data from IoT devices or social media feeds. (Simulated in Event Hubs)
- Ingestion: Azure Event Hubs for capturing real-time data.
- Processing: Azure Databricks for stream processing using Structured Streaming.
- Storage: Processed data stored in Azure Data Lake Storage (Delta format).
- Visualisation: Data visualised using Power BI.


### Azure Services Required
- Databricks Workspace (Unity Catalog enabled)
- Azure Data Lake Storage (Premium)
- Azure Event Hubs (Basic tier)

### Azure Databricks Configuration Required
- Single Node Compute Cluster: `12.2 LTS (includes Apache Spark 3.3.2, Scala 2.12)`
- Maven Library installed on Compute Cluster: `com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22`

Importing the libraries.

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

The code block below creates the catalog and schemas for the solution.

The approach uses a multi-hop (medallion) storage architecture, with bronze, silver, and gold schemas in the `hive_metastore` catalog.

In [0]:
# Each statement is wrapped so that re-running the cell does not stop the notebook.
# Note: any failure (not only "already exists") ends up in the except branch.
try:
    spark.sql("create catalog hive_metastore")
except Exception:
    print('check: catalog already exists')

try:
    spark.sql("create schema hive_metastore.traffic_flow_bronze")
except Exception:
    print('check: bronze schema already exists')

try:
    spark.sql("create schema hive_metastore.traffic_flow_silver")
except Exception:
    print('check: silver schema already exists')

try:
    spark.sql("create schema hive_metastore.traffic_flow_gold")
except Exception:
    print('check: gold schema already exists')

check: catalog already exists
check: bronze schema already exists
check: silver schema already exists
check: gold schema already exists
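
The cell above reports "already exists" for every kind of failure. As a minimal sketch of an alternative, assuming the `IF NOT EXISTS` clause of Spark SQL's `CREATE SCHEMA` is acceptable here, the statements can be generated once and run through any executor. The helper names `schema_ddl` and `create_schemas` are hypothetical, and `run_sql` stands in for `spark.sql`.

```python
from typing import Callable, List

CATALOG = "hive_metastore"            # catalog used throughout this notebook
LAYERS = ["bronze", "silver", "gold"]  # medallion layers


def schema_ddl(catalog: str = CATALOG, prefix: str = "traffic_flow") -> List[str]:
    """Build one idempotent CREATE SCHEMA statement per medallion layer."""
    return [
        f"CREATE SCHEMA IF NOT EXISTS {catalog}.{prefix}_{layer}"
        for layer in LAYERS
    ]


def create_schemas(run_sql: Callable[[str], object]) -> None:
    """Run each statement through the supplied executor (e.g. spark.sql)."""
    for statement in schema_ddl():
        run_sql(statement)


# In the notebook this would be: create_schemas(spark.sql)
# Here the statements are only printed, so the sketch runs without a cluster.
create_schemas(print)
```

With this shape, a genuine error such as a missing permission surfaces as an exception instead of being reported as an existing schema.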


#### Bronze Layer

Set up the Azure Event Hubs connection string.

In [0]:
# Config
# Replace with your Event Hub namespace, name, and key.
# The SharedAccessKey below is a placeholder; do not commit a real key to the notebook.
connectionString = "Endpoint=sb://labb2-eh-namespace.servicebus.windows.net/;SharedAccessKeyName=databricks;SharedAccessKey=<your-shared-access-key>;EntityPath=labb2-event-hub"
eventHubName = "labb2-event-hub"

ehConf = {
  'eventhubs.connectionString' : sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString),
  'eventhubs.eventHubName': eventHubName,
  'eventhubs.consumerGroup': '$Default'
}
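
The connection string follows the pattern visible above: endpoint, key name, key, and entity path separated by semicolons. A minimal sketch of assembling it from its parts, so that the key can be supplied separately; `build_connection_string` is a hypothetical helper, and the secret scope and key names in the comment are assumptions.

```python
def build_connection_string(namespace: str, key_name: str, key: str, event_hub: str) -> str:
    """Assemble an Event Hubs connection string from its four components."""
    return (
        f"Endpoint=sb://{namespace}.servicebus.windows.net/;"
        f"SharedAccessKeyName={key_name};"
        f"SharedAccessKey={key};"
        f"EntityPath={event_hub}"
    )


# On Databricks the key could be read from a secret scope instead, for example:
#   key = dbutils.secrets.get(scope="<scope>", key="<key-name>")  # names are assumptions
key = "<your-shared-access-key>"

connection_string = build_connection_string(
    namespace="labb2-eh-namespace",
    key_name="databricks",
    key=key,
    event_hub="labb2-event-hub",
)
print(connection_string)
```

The result can be passed to `EventHubsUtils.encrypt` exactly as `connectionString` is in the cell above.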

Reading the stream from Event Hubs and writing it to the bronze layer.

In [0]:
# Reading stream: Load data from Azure Event Hub into DataFrame 'df' using the previously configured settings
df = spark.readStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .load()

# Displaying stream: Show the incoming streaming data for visualization and debugging purposes
df.display()

# Writing stream: Persist the streaming data to a Delta table in 'append' mode with checkpointing
df.writeStream \
  .option("checkpointLocation", "/tmp/checkpoints/traffic_flow_bronze/traffic_flow") \
  .outputMode("append") \
  .format("delta") \
  .toTable("hive_metastore.traffic_flow_bronze.traffic_flow")

body,partition,offset,sequenceNumber,enqueuedTime,publisher,partitionKey,properties,systemProperties
eyJSRVNQT05TRSI6eyJSRVNVTFQiOlt7IlRyYWZmaWNGbG93IjpbeyJTaXRlSWQiOjQwLCJNZWFzdXJlbWVudFRpbWUiOiIyMDI1LTEwLTEwVDE3OjE1OjAwLjAwMCswMjowMCIsIk1lYXN1cmVtZW50T3JDYWxjdWxhdGlvblA= (truncated),0,979252953304,13457,2025-10-10T15:15:53.968Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJSRVNQT05TRSI6eyJSRVNVTFQiOlt7IlRyYWZmaWNGbG93IjpbeyJTaXRlSWQiOjQwLCJNZWFzdXJlbWVudFRpbWUiOiIyMDI1LTEwLTEwVDE3OjE1OjAwLjAwMCswMjowMCIsIk1lYXN1cmVtZW50T3JDYWxjdWxhdGlvblA= (truncated),0,979252957712,13458,2025-10-10T15:16:00.171Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJSRVNQT05TRSI6eyJSRVNVTFQiOlt7IlRyYWZmaWNGbG93IjpbeyJTaXRlSWQiOjQwLCJNZWFzdXJlbWVudFRpbWUiOiIyMDI1LTEwLTEwVDE3OjE2OjAwLjAwMCswMjowMCIsIk1lYXN1cmVtZW50T3JDYWxjdWxhdGlvblA= (truncated),0,979252962120,13459,2025-10-10T15:17:00.344Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJSRVNQT05TRSI6eyJSRVNVTFQiOlt7IlRyYWZmaWNGbG93IjpbeyJTaXRlSWQiOjQwLCJNZWFzdXJlbWVudFRpbWUiOiIyMDI1LTEwLTEwVDE3OjE3OjAwLjAwMCswMjowMCIsIk1lYXN1cmVtZW50T3JDYWxjdWxhdGlvblA= (truncated),0,979252966528,13460,2025-10-10T15:18:00.611Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJSRVNQT05TRSI6eyJSRVNVTFQiOlt7IlRyYWZmaWNGbG93IjpbeyJTaXRlSWQiOjQwLCJNZWFzdXJlbWVudFRpbWUiOiIyMDI1LTEwLTEwVDE3OjE4OjAwLjAwMCswMjowMCIsIk1lYXN1cmVtZW50T3JDYWxjdWxhdGlvblA= (truncated),0,979252970936,13461,2025-10-10T15:19:01.096Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJSRVNQT05TRSI6eyJSRVNVTFQiOlt7IlRyYWZmaWNGbG93IjpbeyJTaXRlSWQiOjQwLCJNZWFzdXJlbWVudFRpbWUiOiIyMDI1LTEwLTEwVDE3OjE5OjAwLjAwMCswMjowMCIsIk1lYXN1cmVtZW50T3JDYWxjdWxhdGlvblA= (truncated),0,983547510784,13462,2025-10-10T15:20:00.972Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)


In [0]:
display(df)


body,partition,offset,sequenceNumber,enqueuedTime,publisher,partitionKey,properties,systemProperties
eyJSRVNQT05TRSI6eyJSRVNVTFQiOlt7IlRyYWZmaWNGbG93IjpbeyJTaXRlSWQiOjQwLCJNZWFzdXJlbWVudFRpbWUiOiIyMDI1LTEwLTEwVDE3OjIwOjAwLjAwMCswMjowMCIsIk1lYXN1cmVtZW50T3JDYWxjdWxhdGlvblA= (truncated),0,983547515192,13463,2025-10-10T15:21:00.769Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
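
The `body` column arrives as binary, and the output above appears to render it as Base64 text. As a small sketch using only the Python standard library, decoding the first 52 characters of the displayed value shows the JSON envelope that the silver layer parses later. The variable names are illustrative.

```python
import base64

# First 52 characters of the `body` value shown in the output above.
# (52 is a multiple of 4, so this prefix decodes cleanly on its own.)
body_prefix_b64 = "eyJSRVNQT05TRSI6eyJSRVNVTFQiOlt7IlRyYWZmaWNGbG93Ijpb"

decoded = base64.b64decode(body_prefix_b64).decode("utf-8")
print(decoded)
```

The decoded prefix is the opening of a JSON document, which is why the silver layer casts `body` to a string before parsing it.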


#### Silver Layer

Defining the schema for the JSON object.

In [0]:
# Defining the schema for the JSON object

json_schema = StructType([
    StructField("RESPONSE", StructType([
        StructField("RESULT", ArrayType(StructType([
            StructField("TrafficFlow", ArrayType(StructType([
                StructField("SiteId", IntegerType(), True),
                StructField("MeasurementTime", StringType(), True),
                StructField("MeasurementOrCalculationPeriod", IntegerType(), True),
                StructField("VehicleType", StringType(), True),
                StructField("VehicleFlowRate", IntegerType(), True),
                StructField("AverageVehicleSpeed", DoubleType(), True),
                StructField("CountyNo", IntegerType(), True),
                StructField("Deleted", BooleanType(), True),
                StructField("Geometry", StructType([
                    StructField("SWEREF99TM", StringType(), True),
                    StructField("WGS84", StringType(), True)
                ]), True),
                StructField("RegionId", IntegerType(), True),
                StructField("DataQuality", StringType(), True),
                StructField("SpecificLane", StringType(), True),
                StructField("MeasurementSide", StringType(), True),
                StructField("ModifiedTime", StringType(), True)
            ])), True)
        ])), True)
    ]), True)
])

Reading, transforming and writing the stream from the bronze to the silver layer.

In [0]:
# Reading and Transforming: Load streaming data from the 'traffic_flow_bronze.traffic_flow' Delta table, cast 'body' to string, parse JSON, and select specific fields
df = spark.readStream\
    .format("delta")\
    .table("hive_metastore.traffic_flow_bronze.traffic_flow")\
    .withColumn("body", col("body").cast("string"))\
    .withColumn("body",from_json(col("body"), json_schema))\
    .select(
        explode_outer(col("body.RESPONSE.RESULT")[0].TrafficFlow).alias("TrafficFlow"),
        col("enqueuedTime")
    ).select(
        col("TrafficFlow.SiteId"),
        col("TrafficFlow.MeasurementTime"),
        col("TrafficFlow.VehicleType"),
        col("TrafficFlow.VehicleFlowRate"),
        col("TrafficFlow.AverageVehicleSpeed"),
        col("TrafficFlow.CountyNo"),
        col("TrafficFlow.RegionId"),
        col("TrafficFlow.DataQuality"),
        col("TrafficFlow.SpecificLane"),
        col("TrafficFlow.MeasurementSide"),
        col("TrafficFlow.ModifiedTime"),
        col("TrafficFlow.Geometry.SWEREF99TM").alias("SWEREF99TM"),
        col("TrafficFlow.Geometry.WGS84").alias("WGS84"),
        col("enqueuedTime").alias("timestamp"))
    
# Displaying stream: Visualize the transformed data in the DataFrame for verification and analysis
df.display()

# Writing stream: Save the transformed data to the 'traffic_flow_silver.traffic_flow' Delta table in 'append' mode with checkpointing for data reliability
df.writeStream\
    .option("checkpointLocation", "/mnt/checkpoints/traffic_flow_silver/traffic_flow")\
    .outputMode("append")\
    .format("delta")\
    .toTable("hive_metastore.traffic_flow_silver.traffic_flow")

SiteId,MeasurementTime,VehicleType,VehicleFlowRate,AverageVehicleSpeed,CountyNo,RegionId,DataQuality,SpecificLane,MeasurementSide,ModifiedTime,SWEREF99TM,WGS84,timestamp
40,2025-10-10T15:52:00.000+02:00,anyVehicle,1440,72.21,1,4,good,lane1,unknown,2025-10-10T13:52:07.060Z,POINT (677754.96 6578623.19),POINT (18.122711 59.308964),2025-10-10T13:53:00.125Z
4306,2025-10-10T15:52:00.000+02:00,anyVehicle,660,68.01,1,4,good,lane2,unknown,2025-10-10T13:52:07.000Z,POINT (674788.02 6583302.84),POINT (18.074469 59.352165),2025-10-10T13:53:00.125Z
861,2025-10-10T15:52:00.000+02:00,anyVehicle,1380,72.86,1,4,good,lane2,unknown,2025-10-10T13:52:07.415Z,POINT (651018.96 6563771.14),POINT (17.64332 59.186134),2025-10-10T13:53:00.125Z
1471,2025-10-10T15:52:00.000+02:00,anyVehicle,1200,80.84,1,4,good,lane2,unknown,2025-10-10T13:52:07.863Z,POINT (652805.94 6565755.17),POINT (17.675951 59.203293),2025-10-10T13:53:00.125Z
174,2025-10-10T15:52:00.000+02:00,anyVehicle,660,74.42,1,4,good,lane1,unknown,2025-10-10T13:52:07.059Z,POINT (656154.96 6566753.11),POINT (17.735239 59.211025),2025-10-10T13:53:00.125Z
1183,2025-10-10T15:52:00.000+02:00,anyVehicle,240,59.75,1,4,good,lane1,unknown,2025-10-10T13:52:06.882Z,POINT (673561 6576993.92),POINT (18.047848 59.296097),2025-10-10T13:53:00.125Z
661,2025-10-10T15:52:00.000+02:00,anyVehicle,1620,37.89,1,4,good,lane3,unknown,2025-10-10T13:52:06.708Z,POINT (673560.02 6577001.1),POINT (18.047836 59.29616),2025-10-10T13:53:00.125Z
1155,2025-10-10T15:52:00.000+02:00,anyVehicle,1080,63.34,1,4,good,lane1,unknown,2025-10-10T13:52:07.187Z,POINT (674026.99 6576474.18),POINT (18.0556 59.291245),2025-10-10T13:53:00.125Z
797,2025-10-10T15:52:00.000+02:00,anyVehicle,1440,40.58,1,4,good,lane2,unknown,2025-10-10T13:52:07.187Z,POINT (673338.96 6577008.01),POINT (18.043966 59.296314),2025-10-10T13:53:00.125Z
614,2025-10-10T15:52:00.000+02:00,anyVehicle,1560,44.77,1,4,good,lane2,unknown,2025-10-10T13:52:07.748Z,POINT (672453 6577193.9),POINT (18.028582 59.298344),2025-10-10T13:53:00.125Z
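
To make the flattening step easier to follow, here is a plain-Python sketch of what the two `select` calls do to one message: take the `TrafficFlow` array of the first `RESULT` entry, emit one row per element, and lift the nested `Geometry` fields to top-level columns. It is an emulation for illustration, not the Spark implementation. `flatten_body` is a hypothetical helper, only a subset of the columns is shown, and the sample values are copied from the first row of the output above.

```python
import json
from typing import Any, Dict, List

# Illustrative payload: values copied from the first silver row above;
# fields not shown in that output are omitted.
sample_body = json.dumps({
    "RESPONSE": {"RESULT": [{"TrafficFlow": [{
        "SiteId": 40,
        "MeasurementTime": "2025-10-10T15:52:00.000+02:00",
        "VehicleFlowRate": 1440,
        "AverageVehicleSpeed": 72.21,
        "Geometry": {
            "SWEREF99TM": "POINT (677754.96 6578623.19)",
            "WGS84": "POINT (18.122711 59.308964)",
        },
    }]}]}
})


def flatten_body(body: str, enqueued_time: str) -> List[Dict[str, Any]]:
    """Emit one flat dict per TrafficFlow element of the first RESULT entry."""
    parsed = json.loads(body)
    results = parsed.get("RESPONSE", {}).get("RESULT") or [{}]
    # Like explode_outer, a missing or empty array still yields one row of nulls.
    flows = results[0].get("TrafficFlow") or [None]
    rows = []
    for flow in flows:
        flow = flow or {}
        geometry = flow.get("Geometry") or {}
        rows.append({
            "SiteId": flow.get("SiteId"),
            "MeasurementTime": flow.get("MeasurementTime"),
            "VehicleFlowRate": flow.get("VehicleFlowRate"),
            "AverageVehicleSpeed": flow.get("AverageVehicleSpeed"),
            "SWEREF99TM": geometry.get("SWEREF99TM"),
            "WGS84": geometry.get("WGS84"),
            "timestamp": enqueued_time,
        })
    return rows


rows = flatten_body(sample_body, "2025-10-10T13:53:00.125Z")
print(rows[0]["SiteId"], rows[0]["WGS84"])
```

Note that only `RESULT[0]` is read, matching the `[0]` index in the cell above; any further `RESULT` entries in a message would be ignored by both versions.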


#### Gold Layer

Reading, aggregating and writing the stream from the silver to the gold layer.

In [0]:
from pyspark.sql.functions import (
    window, avg, max, min, count, first, col, to_timestamp, expr
)

# Aggregating Stream: Read from 'traffic_flow_silver.traffic_flow', apply watermarking and windowing
df = (
    spark.readStream
    .format("delta")
    .table("hive_metastore.traffic_flow_silver.traffic_flow")
    .withColumn("MeasurementTime", to_timestamp("MeasurementTime"))
    .withWatermark("MeasurementTime", "1 minute")
    .groupBy(
        window("MeasurementTime", "1 minute"),
        col("SiteId")
    )
    .agg(
        avg("VehicleFlowRate").alias("avgFlowRate"),
        max("VehicleFlowRate").alias("maxFlowRate"),
        min("VehicleFlowRate").alias("minFlowRate"),
        avg("AverageVehicleSpeed").alias("avgSpeed"),
        max("AverageVehicleSpeed").alias("maxSpeed"),
        min("AverageVehicleSpeed").alias("minSpeed"),
        count("SiteId").alias("siteCount"),
        # Alias aggregated columns to avoid ambiguity
        first("SiteId").alias("SiteId_first"),
        first("RegionId").alias("RegionId"),
        first("CountyNo").alias("CountyNo"),
        first("SpecificLane").alias("Lane"),
        first("MeasurementSide").alias("Side"),
        first("VehicleType").alias("VehicleType"),
        first("DataQuality").alias("DataQuality"),
        first("WGS84").alias("WGS84")
    )
    .withColumn("speedPerFlowUnit", expr("avgSpeed / avgFlowRate"))
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("SiteId"),
        "avgFlowRate",
        "maxFlowRate",
        "minFlowRate",
        "avgSpeed",
        "maxSpeed",
        "minSpeed",
        "speedPerFlowUnit",
        "siteCount",
        "RegionId",
        "CountyNo",
        "Lane",
        "Side",
        "VehicleType",
        "DataQuality",
        "WGS84"
    )
)

# Displaying Stream: Visualize data for insights and aggregated trends
df.display()

# Writing Aggregated Stream: Store the summary data in 'traffic_flow_gold.traffic_flow_2' with checkpointing for data integrity
df.writeStream \
    .option("checkpointLocation", "/mnt/checkpoints/traffic_flow_gold/traffic_flow_2") \
    .outputMode("append") \
    .format("delta") \
    .toTable("hive_metastore.traffic_flow_gold.traffic_flow_2")

window_start,window_end,SiteId,avgFlowRate,maxFlowRate,minFlowRate,avgSpeed,maxSpeed,minSpeed,speedPerFlowUnit,siteCount,RegionId,CountyNo,Lane,Side,VehicleType,DataQuality,WGS84
2025-10-10T13:53:00Z,2025-10-10T13:54:00Z,4306,660.0,660,660,67.12,67.12,67.12,0.1016969696969697,1,4,1,lane2,unknown,anyVehicle,good,POINT (18.074469 59.352165)
2025-10-10T14:03:00Z,2025-10-10T14:04:00Z,661,1500.0,1500,1500,43.16,43.16,43.16,0.0287733333333333,1,4,1,lane3,unknown,anyVehicle,good,POINT (18.047836 59.29616)
2025-10-10T14:04:00Z,2025-10-10T14:05:00Z,614,1020.0,1020,1020,33.76,33.76,33.76,0.0330980392156862,1,4,1,lane2,unknown,anyVehicle,good,POINT (18.028582 59.298344)
2025-10-10T14:07:00Z,2025-10-10T14:08:00Z,614,1200.0,1200,1200,33.75,33.75,33.75,0.028125,1,4,1,lane2,unknown,anyVehicle,good,POINT (18.028582 59.298344)
2025-10-10T14:32:00Z,2025-10-10T14:33:00Z,174,780.0,780,780,60.12,60.12,60.12,0.077076923076923,1,4,1,lane1,unknown,anyVehicle,good,POINT (17.735239 59.211025)
2025-10-10T14:45:00Z,2025-10-10T14:46:00Z,40,1620.0,1620,1620,66.0,66.0,66.0,0.0407407407407407,1,4,1,lane1,unknown,anyVehicle,good,POINT (18.122711 59.308964)
2025-10-10T14:53:00Z,2025-10-10T14:54:00Z,1183,240.0,240,240,61.5,61.5,61.5,0.25625,1,4,1,lane1,unknown,anyVehicle,good,POINT (18.047848 59.296097)
2025-10-10T15:05:00Z,2025-10-10T15:06:00Z,1471,1200.0,1200,1200,83.21,83.21,83.21,0.0693416666666666,1,4,1,lane2,unknown,anyVehicle,good,POINT (17.675951 59.203293)
2025-10-10T15:07:00Z,2025-10-10T15:08:00Z,391,840.0,840,840,90.44,90.44,90.44,0.1076666666666666,1,4,1,lane4,unknown,anyVehicle,good,POINT (18.002157 59.387173)
2025-10-10T15:07:00Z,2025-10-10T15:08:00Z,620,240.0,240,240,67.41,67.41,67.41,0.280875,1,4,1,lane1,unknown,anyVehicle,good,POINT (18.057972 59.294197)
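
The aggregation groups rows into one-minute tumbling windows on `MeasurementTime` and, in append mode, writes a window only after the watermark has moved past it. The sketch below shows both ideas in plain Python under simplified assumptions. `tumbling_window` and `window_is_final` are hypothetical helpers, and the finality rule approximates Spark's behaviour, which also depends on micro-batch boundaries.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumbling_window(ts: datetime, size: timedelta = timedelta(minutes=1)) -> Tuple[datetime, datetime]:
    """Return the [start, end) window of length `size` that contains `ts`, in UTC."""
    ts_utc = ts.astimezone(timezone.utc)
    start = EPOCH + ((ts_utc - EPOCH) // size) * size
    return start, start + size


def window_is_final(window_end: datetime, max_event_time: datetime,
                    delay: timedelta = timedelta(minutes=1)) -> bool:
    """Approximate rule: a window is final once (max event time - delay) reaches its end."""
    watermark = max_event_time - delay
    return watermark >= window_end


# MeasurementTime from the silver output: 2025-10-10T15:52:00.000+02:00
measurement_time = datetime(2025, 10, 10, 15, 52, tzinfo=timezone(timedelta(hours=2)))
start, end = tumbling_window(measurement_time)
print(start.isoformat(), end.isoformat())

# With a 1 minute delay, the window ending 13:53 UTC is held back until
# an event stamped 13:54 UTC or later has been seen.
print(window_is_final(end, datetime(2025, 10, 10, 13, 53, 30, tzinfo=timezone.utc)))
print(window_is_final(end, datetime(2025, 10, 10, 13, 54, 0, tzinfo=timezone.utc)))
```

This delay is one reason the gold table can lag behind the silver table: rows for a window appear only after later events have advanced the watermark.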


In [0]:
%sql
select * from hive_metastore.traffic_flow_gold.traffic_flow_2

window_start,window_end,SiteId,avgFlowRate,maxFlowRate,minFlowRate,avgSpeed,maxSpeed,minSpeed,speedPerFlowUnit,siteCount,RegionId,CountyNo,Lane,Side,VehicleType,DataQuality,WGS84
2025-10-10T14:06:00Z,2025-10-10T14:07:00Z,4306,1200.0,1200,1200,59.65,59.65,59.65,0.0497083333333333,1,4,1,lane2,unknown,anyVehicle,good,POINT (18.074469 59.352165)
2025-10-10T14:07:00Z,2025-10-10T14:08:00Z,797,1020.0,1020,1020,61.0,61.0,61.0,0.0598039215686274,1,4,1,lane2,unknown,anyVehicle,good,POINT (18.043966 59.296314)
2025-10-10T14:11:00Z,2025-10-10T14:12:00Z,797,1500.0,1500,1500,47.44,47.44,47.44,0.0316266666666666,1,4,1,lane2,unknown,anyVehicle,good,POINT (18.043966 59.296314)
2025-10-10T14:13:00Z,2025-10-10T14:14:00Z,1471,1080.0,1080,1080,79.06,79.06,79.06,0.0732037037037037,1,4,1,lane2,unknown,anyVehicle,good,POINT (17.675951 59.203293)
2025-10-10T14:16:00Z,2025-10-10T14:17:00Z,797,960.0,960,960,46.63,46.63,46.63,0.0485729166666666,1,4,1,lane2,unknown,anyVehicle,good,POINT (18.043966 59.296314)
2025-10-10T14:17:00Z,2025-10-10T14:18:00Z,174,1380.0,1380,1380,27.44,27.44,27.44,0.0198840579710144,1,4,1,lane1,unknown,anyVehicle,good,POINT (17.735239 59.211025)
2025-10-10T14:24:00Z,2025-10-10T14:25:00Z,861,1620.0,1620,1620,77.0,77.0,77.0,0.0475308641975308,1,4,1,lane2,unknown,anyVehicle,good,POINT (17.64332 59.186134)
2025-10-10T14:32:00Z,2025-10-10T14:33:00Z,614,1380.0,1380,1380,37.26,37.26,37.26,0.027,1,4,1,lane2,unknown,anyVehicle,good,POINT (18.028582 59.298344)
2025-10-10T14:40:00Z,2025-10-10T14:41:00Z,1183,600.0,600,600,54.3,54.3,54.3,0.0905,1,4,1,lane1,unknown,anyVehicle,good,POINT (18.047848 59.296097)
2025-10-10T14:59:00Z,2025-10-10T15:00:00Z,797,960.0,960,960,57.13,57.13,57.13,0.0595104166666666,1,4,1,lane2,unknown,anyVehicle,good,POINT (18.043966 59.296314)
