# Real-time Data Processing with Azure Databricks (and Event Hubs)

This notebook demonstrates the architecture below for building real-time data pipelines.
![Solution Architecture](https://raw.githubusercontent.com/malvik01/Real-Time-Streaming-with-Azure-Databricks/main/Azure%20Solution%20Architecture.png)


- Data Sources: Streaming data from IoT devices or social media feeds. (Simulated in Event Hubs)
- Ingestion: Azure Event Hubs for capturing real-time data.
- Processing: Azure Databricks for stream processing using Structured Streaming.
- Storage: Processed data stored in Azure Data Lake Storage (Delta format).
- Visualisation: Data visualised using Power BI.


### Azure Services Required
- Databricks Workspace (Unity Catalog enabled)
- Azure Data Lake Storage (Premium)
- Azure Event Hubs (Basic Tier)

### Azure Databricks Configuration Required
- Single Node Compute Cluster: `12.2 LTS (includes Apache Spark 3.3.2, Scala 2.12)`
- Maven Library installed on Compute Cluster: `com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22`

Importing the libraries.

In [None]:
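# Import only the names this notebook uses; wildcard imports from
# pyspark.sql.functions shadow Python built-ins such as sum, min, and max.
from pyspark.sql.functions import avg, col, from_json, window
from pyspark.sql.types import IntegerType, StringType, StructField, StructType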

The code block below creates the catalog and schemas for our solution. 

The approach utilises a multi-hop data storage architecture (medallion), consisting of bronze, silver, and gold schemas within a 'streaming' catalog. 

In [None]:
# Create the catalog first
try:
    spark.sql("CREATE CATALOG IF NOT EXISTS streaming")
    print("Catalog 'streaming' created successfully or already exists.")
except Exception as e:
    print(f"Error creating catalog: {e}")

# Create one schema per medallion layer
for layer in ("bronze", "silver", "gold"):
    schema = f"streaming.{layer}"
    try:
        spark.sql(f"CREATE SCHEMA IF NOT EXISTS {schema}")
        print(f"Schema '{schema}' created successfully or already exists.")
    except Exception as e:
        print(f"Error creating schema '{schema}': {e}")

Error creating catalog: [RequestId=3e6d84c2-77e5-4344-8d48-8ef4ebf81607 ErrorClass=INVALID_STATE] Metastore storage root URL does not exist. Please provide a storage location for the catalog (for example 'CREATE CATALOG myCatalog MANAGED LOCATION '<location-path>'). Alternatively set up a metastore root storage location to provide a storage location for all catalogs in the metastore.
Schema 'streaming.bronze' created successfully or already exists.
Schema 'streaming.silver' created successfully or already exists.
Schema 'streaming.gold' created successfully or already exists.
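
The run recorded above reports that the catalog could not be created because the metastore has no root storage location, and the error message itself suggests adding a `MANAGED LOCATION` clause. The following is a minimal sketch of that fix, not a definitive implementation. The helper name `create_catalog` is hypothetical, and the storage path shown in the comment is a placeholder that must be replaced with a location your workspace is allowed to use.

```python
def create_catalog(spark, catalog, managed_location=None):
    """Create a Unity Catalog catalog, optionally with its own storage location.

    `spark` is the notebook's SparkSession. `managed_location` is only needed
    when the metastore has no root storage location (the case reported above).
    Returns the SQL statement that was executed.
    """
    statement = f"CREATE CATALOG IF NOT EXISTS {catalog}"
    if managed_location:
        statement += f" MANAGED LOCATION '{managed_location}'"
    spark.sql(statement)
    return statement


# Example call from a notebook cell (placeholder path, an assumption):
# create_catalog(
#     spark,
#     "streaming",
#     managed_location="abfss://<container>@<storage-account>.dfs.core.windows.net/<path>",
# )
```

Passing `spark` in as an argument keeps the helper easy to exercise outside Databricks with a stand-in object.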


In [None]:
# List all catalogs to verify if the 'streaming' catalog was created
catalogs = spark.sql("SHOW CATALOGS")
display(catalogs)

catalog
adb_de_pipeline_demo
hive_metastore
samples
streaming
system


#### Bronze Layer

Set up the Azure Event Hubs connection string.

In [None]:
# Config
# Replace with your Event Hub namespace, name, and key
connectionString = "$myconnectionstring"
eventHubName = "eh-demo-01"

ehConf = {
  'eventhubs.connectionString' : sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString),
  'eventhubs.eventHubName': eventHubName
}
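
The connector also accepts an optional starting position, passed as a JSON string. The sketch below assumes you want to read from the beginning of the retained stream. The option key `eventhubs.startingPosition` and the field names are taken from the connector's PySpark documentation as I understand it, so check them against the library version you install. The name `extra_eh_conf` is hypothetical.

```python
import json

# Position descriptor for the Event Hubs connector.
# An offset of "-1" denotes the start of the stream; seqNo and
# enqueuedTime are left unset because only one selector is used.
starting_event_position = {
    "offset": "-1",
    "seqNo": -1,
    "enqueuedTime": None,
    "isInclusive": True,
}

# Extra options to merge into the ehConf dictionary defined above.
extra_eh_conf = {
    "eventhubs.startingPosition": json.dumps(starting_event_position),
}

print(extra_eh_conf["eventhubs.startingPosition"])
```

In the notebook, `ehConf.update(extra_eh_conf)` before calling `spark.readStream` would apply the setting.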

Reading the stream from Event Hubs and writing it to the bronze layer.

In [None]:
# Reading stream: Load data from Azure Event Hub into DataFrame 'df' using the previously configured settings
df = spark.readStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .load()

# Displaying stream: Show the incoming streaming data for visualization and debugging purposes
df.display()


body,partition,offset,sequenceNumber,enqueuedTime,publisher,partitionKey,properties,systemProperties


In [None]:
# Reading stream: Load data from Azure Event Hub into DataFrame 'df' using the previously configured settings
df = spark.readStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .load()

# Displaying stream: Show the incoming streaming data for visualization and debugging purposes
df.display()

# Writing stream: Persist the streaming data to a Delta table 'streaming.bronze.weather' in 'append' mode with checkpointing
df.writeStream\
    .option("checkpointLocation", "/mnt/streaming/bronze/weather")\
    .outputMode("append")\
    .format("delta")\
    .toTable("streaming.bronze.weather")

body,partition,offset,sequenceNumber,enqueuedTime,publisher,partitionKey,properties,systemProperties
ew0KICAgICJ0ZW1wZXJhdHVyZSI6IDIwLA0KICAgICJodW1pZGl0eSI6IDYwLA0KICAgICJ3aW5kU3BlZWQiOiAxMCwNCiAgICAid2luZERpcmVjdGlvbiI6ICJOVyIsDQogICAgInByZWNpcGl0YXRpb24iOiAwLA0KICAgICI= (truncated),1,0,0,2025-01-28T21:15:38.705+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, correlation-id -> dcfd1a80-099d-483d-a282-8bc4104d1e0f, message-id -> EHExplorer-85f50c36-bd4c-4655-9518-3a92d4e2105f, content-type -> application/json)"
ew0KICAgICJ0ZW1wZXJhdHVyZSI6IDIyLA0KICAgICJodW1pZGl0eSI6IDYwLA0KICAgICJ3aW5kU3BlZWQiOiAxMCwNCiAgICAid2luZERpcmVjdGlvbiI6ICJOVyIsDQogICAgInByZWNpcGl0YXRpb24iOiAwLA0KICAgICI= (truncated),1,376,1,2025-01-28T21:16:14.066+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, correlation-id -> e61ed747-bb22-421e-8100-771274186211, message-id -> EHExplorer-a51d5d8b-8c50-4b60-b3d9-7fa08ef85b77, content-type -> application/json)"
ew0KICAgICJ0ZW1wZXJhdHVyZSI6IDMyLA0KICAgICJodW1pZGl0eSI6IDY1LA0KICAgICJ3aW5kU3BlZWQiOiAxMCwNCiAgICAid2luZERpcmVjdGlvbiI6ICJTVyIsDQogICAgInByZWNpcGl0YXRpb24iOiAwLA0KICAgICI= (truncated),0,1096,3,2025-01-28T21:26:01.814+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, correlation-id -> aef19a37-afa3-41ee-a349-eee2c3d48756, message-id -> EHExplorer-f9893fbf-251c-401c-9ca3-b506f92c09f4, content-type -> application/json)"
ew0KICAgICJ0ZW1wZXJhdHVyZSI6IDE4LA0KICAgICJodW1pZGl0eSI6IDQwLA0KICAgICJ3aW5kU3BlZWQiOiAyNSwNCiAgICAid2luZERpcmVjdGlvbiI6ICJTIiwNCiAgICAicHJlY2lwaXRhdGlvbiI6IDAsDQogICAgImM= (truncated),1,752,2,2025-01-28T21:39:16.609+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, correlation-id -> 5da9eae5-f1f3-40e3-9211-cfb1fb4806b3, message-id -> EHExplorer-5703cd83-63cc-4582-a28b-923c18d05000, content-type -> application/json)"
ew0KICAgICJ0ZW1wZXJhdHVyZSI6IDI4LA0KICAgICJodW1pZGl0eSI6IDUwLA0KICAgICJ3aW5kU3BlZWQiOiAyMCwNCiAgICAid2luZERpcmVjdGlvbiI6ICJXIiwNCiAgICAicHJlY2lwaXRhdGlvbiI6IDAsDQogICAgImM= (truncated),1,1120,3,2025-01-28T21:47:43.988+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, correlation-id -> 07fd3b73-0e34-4ed5-bff1-06f2b4a42dfd, message-id -> EHExplorer-dca7f56e-e218-4471-8f2e-2dfe2b4d0d44, content-type -> application/json)"
ew0KICAgICJ0ZW1wZXJhdHVyZSI6IDEwLA0KICAgICJodW1pZGl0eSI6IDEwLA0KICAgICJ3aW5kU3BlZWQiOiA0MCwNCiAgICAid2luZERpcmVjdGlvbiI6ICJOIiwNCiAgICAicHJlY2lwaXRhdGlvbiI6IDAsDQogICAgImM= (truncated),0,1472,4,2025-01-28T21:51:40.714+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, correlation-id -> 45feea42-663c-4645-8b7d-2f920030ffeb, message-id -> EHExplorer-e64912aa-33d0-48d6-933c-6c61dfa79828, content-type -> application/json)"
ew0KICAgICJ0ZW1wZXJhdHVyZSI6IDE1LA0KICAgICJodW1pZGl0eSI6IDYwLA0KICAgICJ3aW5kU3BlZWQiOiAxMCwNCiAgICAid2luZERpcmVjdGlvbiI6ICJOVyIsDQogICAgInByZWNpcGl0YXRpb24iOiAxMCwNCiAgICA= (truncated),1,1488,4,2025-01-28T22:07:12.279+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, correlation-id -> 243e6a0d-7cbb-4c4e-a3ac-cf5724bf14df, message-id -> EHExplorer-8028d215-4f09-496d-814e-619cb7d26afe, content-type -> application/json)"
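
In the output above, `body` is a binary column that the notebook renders as Base64, which is why it is not readable as JSON yet. As a small illustration in plain Python, the visible prefix of the first row can be decoded by hand. The notebook output truncates the payload, so only the first 36 characters are decoded here.

```python
import base64

# First 36 characters of the `body` value in the first output row above.
# The rest of the payload is truncated in the notebook output.
body_prefix = "ew0KICAgICJ0ZW1wZXJhdHVyZSI6IDIwLA0K"

# Base64 -> bytes -> UTF-8 text
decoded = base64.b64decode(body_prefix).decode("utf-8")
print(repr(decoded))  # '{\r\n    "temperature": 20,\r\n'
```

The decoded text is the start of a JSON object, which is what the silver layer recovers by casting `body` to a string and parsing it with `from_json`.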


#### Silver Layer

Defining the schema for the JSON object.

In [None]:
# Defining the schema for the JSON object

json_schema = StructType([
    StructField("temperature", IntegerType()),
    StructField("humidity", IntegerType()),
    StructField("windSpeed", IntegerType()),
    StructField("windDirection", StringType()),
    StructField("precipitation", IntegerType()),
    StructField("conditions", StringType())
])

Reading, transforming and writing the stream from the bronze to the silver layer.

In [None]:
# Static preview: batch-read the bronze table to confirm the JSON parses as expected before starting the stream
temp_df = spark.read.table("streaming.bronze.weather")\
    .withColumn("body", col("body").cast("string"))\
    .withColumn("body", from_json(col("body"), json_schema))\
    .select("body.temperature", "body.humidity", "body.windSpeed", col("enqueuedTime").alias('timestamp'))
temp_df.display()

temperature,humidity,windSpeed,timestamp
22,60,10,2025-01-28T21:16:14.066+0000
20,60,10,2025-01-28T21:15:38.705+0000
32,65,10,2025-01-28T21:26:01.814+0000


In [None]:
# Reading and Transforming: Load streaming data from the 'streaming.bronze.weather' Delta table, cast 'body' to string, parse JSON, and select specific fields
df = spark.readStream\
    .format("delta")\
    .table("streaming.bronze.weather")\
    .withColumn("body", col("body").cast("string"))\
    .withColumn("body",from_json(col("body"), json_schema))\
    .select("body.temperature", "body.humidity", "body.windSpeed", "body.windDirection", "body.precipitation", "body.conditions", col("enqueuedTime").alias('timestamp'))

# Displaying stream: Visualize the transformed data in the DataFrame for verification and analysis
df.display()

# Writing stream: Save the transformed data to the 'streaming.silver.weather' Delta table in 'append' mode with checkpointing for data reliability
df.writeStream\
    .option("checkpointLocation", "/mnt/streaming/silver/weather")\
    .outputMode("append")\
    .format("delta")\
    .toTable("streaming.silver.weather")

temperature,humidity,windSpeed,windDirection,precipitation,conditions,timestamp
22,60,10,NW,0,Partly Cloudy,2025-01-28T21:16:14.066+0000
20,60,10,NW,0,Partly Cloudy,2025-01-28T21:15:38.705+0000
32,65,10,SW,0,Partly Cloudy,2025-01-28T21:26:01.814+0000
18,40,25,S,0,Cloudy,2025-01-28T21:39:16.609+0000
28,50,20,W,0,Sunny,2025-01-28T21:47:43.988+0000
10,10,40,N,0,Cloudy,2025-01-28T21:51:40.714+0000
15,60,10,NW,10,Rainy,2025-01-28T22:07:12.279+0000


#### Gold Layer

Reading, aggregating and writing the stream from the silver to the gold layer.

In [None]:
# Aggregating Stream: Read from 'streaming.silver.weather', apply watermarking and windowing, and calculate average weather metrics
df = spark.readStream\
    .format("delta")\
    .table("streaming.silver.weather")\
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(window("timestamp", "5 minutes")) \
    .agg(avg("temperature").alias('temperature'), avg("humidity").alias('humidity'), avg("windSpeed").alias('windSpeed'), avg("precipitation").alias('precipitation'))\
    .select('window.start', 'window.end', 'temperature', 'humidity', 'windSpeed', 'precipitation')

# Displaying Aggregated Stream: Visualize aggregated data for insights into weather trends
df.display()

start,end,temperature,humidity,windSpeed,precipitation
2025-01-28T21:15:00.000+0000,2025-01-28T21:20:00.000+0000,21.0,60.0,10.0,0.0
2025-01-28T21:45:00.000+0000,2025-01-28T21:50:00.000+0000,28.0,50.0,20.0,0.0
2025-01-28T21:50:00.000+0000,2025-01-28T21:55:00.000+0000,10.0,10.0,40.0,0.0
2025-01-28T21:25:00.000+0000,2025-01-28T21:30:00.000+0000,32.0,65.0,10.0,0.0
2025-01-28T21:35:00.000+0000,2025-01-28T21:40:00.000+0000,18.0,40.0,25.0,0.0


In [None]:
# Aggregating Stream: Read from 'streaming.silver.weather', apply watermarking and windowing, and calculate average weather metrics
df = spark.readStream\
    .format("delta")\
    .table("streaming.silver.weather")\
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(window("timestamp", "5 minutes")) \
    .agg(avg("temperature").alias('temperature'), avg("humidity").alias('humidity'), avg("windSpeed").alias('windSpeed'), avg("precipitation").alias('precipitation'))\
    .select('window.start', 'window.end', 'temperature', 'humidity', 'windSpeed', 'precipitation')

# Displaying Aggregated Stream: Visualize aggregated data for insights into weather trends
df.display()

# Writing Aggregated Stream: Store the aggregated data in 'streaming.gold.weather_summary' with checkpointing for data integrity
df.writeStream\
    .option("checkpointLocation", "/mnt/streaming/gold/weather_summary")\
    .outputMode("append")\
    .format("delta")\
    .toTable("streaming.gold.weather_summary")

start,end,temperature,humidity,windSpeed,precipitation
2025-01-28T21:15:00.000+0000,2025-01-28T21:20:00.000+0000,21.0,60.0,10.0,0.0
2025-01-28T21:45:00.000+0000,2025-01-28T21:50:00.000+0000,28.0,50.0,20.0,0.0
2025-01-28T22:05:00.000+0000,2025-01-28T22:10:00.000+0000,15.0,60.0,10.0,10.0
2025-01-28T21:50:00.000+0000,2025-01-28T21:55:00.000+0000,10.0,10.0,40.0,0.0
2025-01-28T21:25:00.000+0000,2025-01-28T21:30:00.000+0000,32.0,65.0,10.0,0.0
2025-01-28T21:35:00.000+0000,2025-01-28T21:40:00.000+0000,18.0,40.0,25.0,0.0
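
The 5-minute tumbling windows used in the gold layer can be reproduced in plain Python from the silver rows shown earlier. This is only a sketch of the windowing arithmetic: it assumes windows are aligned to the Unix epoch in UTC (Spark's default when no start offset is given), it covers the temperature column only, and it does not model the watermark. The names `silver_rows`, `window_start`, and `averages` are illustrative.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

# (timestamp, temperature) pairs copied from the silver output above.
silver_rows = [
    ("2025-01-28T21:16:14.066+0000", 22),
    ("2025-01-28T21:15:38.705+0000", 20),
    ("2025-01-28T21:26:01.814+0000", 32),
    ("2025-01-28T21:39:16.609+0000", 18),
    ("2025-01-28T21:47:43.988+0000", 28),
    ("2025-01-28T21:51:40.714+0000", 10),
    ("2025-01-28T22:07:12.279+0000", 15),
]

WINDOW = timedelta(minutes=5)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_start(event_time):
    """Floor an event time to the start of its 5-minute tumbling window."""
    return event_time - (event_time - EPOCH) % WINDOW


# Group temperatures by the window each event falls into.
temps_by_window = defaultdict(list)
for raw_ts, temperature in silver_rows:
    event_time = datetime.strptime(raw_ts, "%Y-%m-%dT%H:%M:%S.%f%z")
    temps_by_window[window_start(event_time)].append(temperature)

# Average per window, ordered by window start.
averages = {
    start: sum(temps) / len(temps)
    for start, temps in sorted(temps_by_window.items())
}

for start, avg_temp in averages.items():
    end = start + WINDOW
    print(start.strftime("%H:%M"), end.strftime("%H:%M"), avg_temp)
```

The two events at 21:15:38 and 21:16:14 fall into the same 21:15 to 21:20 window, which is why that window's temperature is the mean of 20 and 22, as in the `temperature` column of the output above.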
