# Real-time Data Processing with Azure Databricks (and Event Hubs)

This notebook demonstrates the architecture below for building real-time data pipelines.
![Solution Architecture](https://raw.githubusercontent.com/malvik01/Real-Time-Streaming-with-Azure-Databricks/main/Azure%20Solution%20Architecture.png)


- Data Sources: Streaming data from IoT devices or social media feeds (simulated here in Event Hubs).
- Ingestion: Azure Event Hubs for capturing real-time data.
- Processing: Azure Databricks for stream processing using Structured Streaming.
- Storage: Processed data stored in Azure Data Lake (Delta format).
- Visualisation: Data visualised using Power BI.


### Azure Services Required
- Databricks Workspace (Unity Catalog enabled)
- Azure Data Lake Storage (Premium)
- Azure Event Hub (Basic Tier)

### Azure Databricks Configuration Required
- Single Node Compute Cluster: `12.2 LTS (includes Apache Spark 3.3.2, Scala 2.12)`
- Maven Library installed on Compute Cluster: `com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22`

Import the PySpark functions and types used throughout the notebook.

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

The code blocks below mount the storage container and then create the catalog and schemas for our solution.

The approach uses a multi-hop (medallion) data storage architecture, consisting of bronze, silver, and gold schemas within a `streaming` catalog.

In [0]:
# Configurations for OAuth2 authentication
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "your-client-id",  # Replace with your actual Client ID
    "fs.azure.account.oauth2.client.secret": "your-client-secret",  # Replace with your actual Client Secret
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/your-tenant-id/oauth2/token"  # Replace with your actual OAuth2 token endpoint
}

# Mount the storage container
dbutils.fs.mount(
    source = "abfss://databricks@damasmartstorage.dfs.core.windows.net/",
    mount_point = "/mnt/tokyomountM",
    extra_configs = configs
)
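
The mount cell above embeds the tenant ID directly in the token endpoint URL. As a minimal sketch, the same dictionary can be built from three values so that the endpoint is derived in one place. `build_oauth_configs` is a hypothetical helper, not part of Databricks, and the argument values are the same placeholders used above.

```python
def build_oauth_configs(client_id: str, client_secret: str, tenant_id: str) -> dict:
    """Return the ABFS OAuth settings used by the mount cell (hypothetical helper)."""
    return {
        "fs.azure.account.auth.type": "OAuth",
        "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        "fs.azure.account.oauth2.client.id": client_id,
        "fs.azure.account.oauth2.client.secret": client_secret,
        # The tenant ID only appears inside the token endpoint URL.
        "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
    }

# Placeholder values, as in the cell above; replace them with real ones.
configs = build_oauth_configs("your-client-id", "your-client-secret", "your-tenant-id")
print(configs["fs.azure.account.oauth2.client.endpoint"])
```

In a real workspace the client secret would normally be read from a secret store rather than typed into the notebook.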


In [0]:
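# Create the catalog and the three medallion schemas.
# Each statement has its own try block so that one failure does not skip the rest.
# Catch Exception rather than using a bare except, which would also swallow KeyboardInterrupt.
try:
    spark.sql("create catalog streaming")
except Exception:
    print('check if catalog already exists')

try:
    spark.sql("create schema streaming.bronze")
except Exception:
    print('check if bronze schema already exists')

try:
    spark.sql("create schema streaming.silver")
except Exception:
    print('check if silver schema already exists')

try:
    spark.sql("create schema streaming.gold")
except Exception:
    print('check if gold schema already exists')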

check if catalog already exists
check if bronze schema already exists
check if silver schema already exists
check if gold schema already exists
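
All four messages above were printed, so every statement raised an exception, most likely because the objects already existed. An alternative sketch, assuming `IF NOT EXISTS` is accepted for these statements in the workspace, generates idempotent DDL so that no exception handling is needed. `medallion_ddl` is a hypothetical helper; on a cluster each statement would be passed to `spark.sql`.

```python
def medallion_ddl(catalog: str, schemas=("bronze", "silver", "gold")) -> list:
    """Build idempotent CREATE statements for a catalog and its schemas (hypothetical helper)."""
    statements = [f"CREATE CATALOG IF NOT EXISTS {catalog}"]
    statements += [f"CREATE SCHEMA IF NOT EXISTS {catalog}.{schema}" for schema in schemas]
    return statements

ddl = medallion_ddl("streaming")
for statement in ddl:
    print(statement)  # on Databricks: spark.sql(statement)
```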


#### Bronze Layer

Set up the Azure Event Hubs connection string.

In [0]:
# Config
# Replace with your Event Hub namespace, name, and key.
# The key below is a placeholder: never commit a real shared access key to a notebook.
connectionString = "Endpoint=sb://eh-namespace-project.servicebus.windows.net/;SharedAccessKeyName=databricks;SharedAccessKey=<your-shared-access-key>;EntityPath=eh-project"
eventHubName = "eh-project"

ehConf = {
  'eventhubs.connectionString' : sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString),
  'eventhubs.eventHubName': eventHubName
}
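
An Event Hubs connection string is a semicolon-separated list of `key=value` pairs. The sketch below splits one into its parts, which can help confirm that `EntityPath` matches `eventHubName` before the stream is started. `parse_connection_string` is a hypothetical helper, and the key in the example is a made-up placeholder.

```python
def parse_connection_string(conn: str) -> dict:
    """Split an Event Hubs connection string into a dict (hypothetical helper)."""
    parts = {}
    for segment in conn.split(";"):
        if not segment:
            continue  # tolerate a trailing semicolon
        # Split on the first '=' only: base64 keys often end with '=' padding.
        key, _, value = segment.partition("=")
        parts[key] = value
    return parts

example = (
    "Endpoint=sb://eh-namespace-project.servicebus.windows.net/;"
    "SharedAccessKeyName=databricks;"
    "SharedAccessKey=placeholder-key=;"
    "EntityPath=eh-project"
)
parsed = parse_connection_string(example)
print(parsed["EntityPath"])
```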

Reading and writing the stream to the bronze layer.

In [0]:

# Reading stream: Load data from Azure Event Hub into DataFrame 'df' using the previously configured settings
df = spark.readStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .load()

# Displaying stream: Show the incoming streaming data for visualization and debugging purposes
df.display()

# Writing stream: Persist the streaming data to a Delta table 'streaming.bronze.weather' in 'append' mode with checkpointing
df.writeStream\
    .option("checkpointLocation", "/mnt/streaming/bronze/weather")\
    .outputMode("append")\
    .format("delta")\
    .toTable("streaming.bronze.weather")

body,partition,offset,sequenceNumber,enqueuedTime,publisher,partitionKey,properties,systemProperties
ewogICJ0ZW1wZXJhdHVyZSI6IDEwLAogICJodW1pZGl0eSI6IDE1LAogICJ3aW5kU3BlZWQiOiAyMCwKICAid2luZERpcmVjdGlvbiI6ICJORSIsCiAgInByZWNpcGl0YXRpb24iOiAyLAogICJjb25kaXRpb25zIjogIlJhaW4= (truncated),0,4294970600,17,2024-08-19T05:56:14.577+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, content-type -> application/json)"
ewogICJ0ZW1wZXJhdHVyZSI6IDE1LAogICJodW1pZGl0eSI6IDIwLAogICJ3aW5kU3BlZWQiOiAzMCwKICAid2luZERpcmVjdGlvbiI6ICJXRSIsCiAgInByZWNpcGl0YXRpb24iOiAyLAogICJjb25kaXRpb25zIjogIkNvbGQ= (truncated),0,4294970848,18,2024-08-19T05:56:37.343+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, content-type -> application/json)"
ewogICJ0ZW1wZXJhdHVyZSI6IDIyLAogICJodW1pZGl0eSI6IDIwLAogICJ3aW5kU3BlZWQiOiAzMCwKICAid2luZERpcmVjdGlvbiI6ICJXRSIsCiAgInByZWNpcGl0YXRpb24iOiAyLAogICJjb25kaXRpb25zIjogIlN1bm4= (truncated),0,4294971096,19,2024-08-19T05:56:49.426+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, content-type -> application/json)"
ewogICJ0ZW1wZXJhdHVyZSI6IDMzLAogICJodW1pZGl0eSI6IDMwLAogICJ3aW5kU3BlZWQiOiAzMCwKICAid2luZERpcmVjdGlvbiI6ICJORSIsCiAgInByZWNpcGl0YXRpb24iOiAyLAogICJjb25kaXRpb25zIjogIlBhcnQ= (truncated),0,4294971344,20,2024-08-19T05:59:01.318+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, content-type -> application/json)"
ewogICJ0ZW1wZXJhdHVyZSI6IDcwLAogICJodW1pZGl0eSI6IDEwMCwKICAid2luZFNwZWVkIjogMTAsCiAgIndpbmREaXJlY3Rpb24iOiAiTlciLAogICJwcmVjaXBpdGF0aW9uIjogMCwKICAiY29uZGl0aW9ucyI6ICJWZXI= (truncated),0,4294971600,21,2024-08-19T06:16:29.775+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, content-type -> application/json)"


In [0]:
%sql
-- The data stored in the bronze layer can also be viewed with SQL

select * from streaming.bronze.weather;


body,partition,offset,sequenceNumber,enqueuedTime,publisher,partitionKey,properties,systemProperties
ewogICJ0ZW1wZXJhdHVyZSI6IDIwLAogICJodW1pZGl0eSI6IDMwLAogICJ3aW5kU3BlZWQiOiAzMCwKICAid2luZERpcmVjdGlvbiI6ICJOVyIsCiAgInByZWNpcGl0YXRpb24iOiAwLAogICJjb25kaXRpb25zIjogIkhhcnM= (truncated),0,4294968064,7,2024-08-19T05:28:34.582+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, content-type -> application/json)"
ewogICJ0ZW1wZXJhdHVyZSI6IDMwLAogICJodW1pZGl0eSI6IDQwLAogICJ3aW5kU3BlZWQiOiAzMCwKICAid2luZERpcmVjdGlvbiI6ICJOVyIsCiAgInByZWNpcGl0YXRpb24iOiAwLAogICJjb25kaXRpb25zIjogIkhhcnM= (truncated),0,4294968320,8,2024-08-19T05:28:44.113+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, content-type -> application/json)"
ewogICJ0ZW1wZXJhdHVyZSI6IDIyLAogICJodW1pZGl0eSI6IDQwLAogICJ3aW5kU3BlZWQiOiA1MCwKICAid2luZERpcmVjdGlvbiI6ICJOVyIsCiAgInByZWNpcGl0YXRpb24iOiAwLAogICJjb25kaXRpb25zIjogIkhhcnM= (truncated),0,4294968576,9,2024-08-19T05:29:00.488+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, content-type -> application/json)"
ewogICJ0ZW1wZXJhdHVyZSI6IDIzLAogICJodW1pZGl0eSI6IDMzLAogICJ3aW5kU3BlZWQiOiA1MCwKICAid2luZERpcmVjdGlvbiI6ICJOVyIsCiAgInByZWNpcGl0YXRpb24iOiAwLAogICJjb25kaXRpb25zIjogIkhhcnM= (truncated),0,4294969088,11,2024-08-19T05:30:51.365+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, content-type -> application/json)"
ewogICJ0ZW1wZXJhdHVyZSI6IDE1LAogICJodW1pZGl0eSI6IDIxLAogICJ3aW5kU3BlZWQiOiA0MCwKICAid2luZERpcmVjdGlvbiI6ICJFVyIsCiAgInByZWNpcGl0YXRpb24iOiAyLAogICJjb25kaXRpb25zIjogIkNvbGQ= (truncated),0,4294969600,13,2024-08-19T05:45:32.550+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, content-type -> application/json)"
ewogICJ0ZW1wZXJhdHVyZSI6IDIxLAogICJodW1pZGl0eSI6IDMyLAogICJ3aW5kU3BlZWQiOiA1MCwKICAid2luZERpcmVjdGlvbiI6ICJOVyIsCiAgInByZWNpcGl0YXRpb24iOiAwLAogICJjb25kaXRpb25zIjogIkhhcnM= (truncated),0,4294968832,10,2024-08-19T05:30:42.052+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, content-type -> application/json)"
ewogICJ0ZW1wZXJhdHVyZSI6IDQwLAogICJodW1pZGl0eSI6IDQ5LAogICJ3aW5kU3BlZWQiOiAyMCwKICAid2luZERpcmVjdGlvbiI6ICJOVyIsCiAgInByZWNpcGl0YXRpb24iOiAwLAogICJjb25kaXRpb25zIjogIkhhcnM= (truncated),0,4294969344,12,2024-08-19T05:31:05.318+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, content-type -> application/json)"
ewogICJ0ZW1wZXJhdHVyZSI6IDkwLAogICJodW1pZGl0eSI6IDgwLAogICJ3aW5kU3BlZWQiOiAzMCwKICAid2luZERpcmVjdGlvbiI6ICJOVyIsCiAgInByZWNpcGl0YXRpb24iOiAwLAogICJjb25kaXRpb25zIjogIkhhcnM= (truncated),0,4294967808,6,2024-08-19T05:20:21.449+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, content-type -> application/json)"
ewogICJ0ZW1wZXJhdHVyZSI6IC0wMywKICAiaHVtaWRpdHkiOiA0LAogICJ3aW5kU3BlZWQiOiAxMCwKICAid2luZERpcmVjdGlvbiI6ICJORSIsCiAgInByZWNpcGl0YXRpb24iOiAyLAogICJjb25kaXRpb25zIjogIlNub3c= (truncated),0,4294969856,14,2024-08-19T05:46:05.879+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, content-type -> application/json)"
ewogICJ0ZW1wZXJhdHVyZSI6IDAsCiAgImh1bWlkaXR5IjogMTAsCiAgIndpbmRTcGVlZCI6IDEwLAogICJ3aW5kRGlyZWN0aW9uIjogIk5FIiwKICAicHJlY2lwaXRhdGlvbiI6IDIsCiAgImNvbmRpdGlvbnMiOiAiRHJpeno= (truncated),0,4294970104,15,2024-08-19T05:47:06.506+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, content-type -> application/json)"
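
The `body` column holds the raw event payload as binary, which the table output renders as (truncated) base64 text. As a quick check outside Spark, the sketch below decodes the first 32 base64 characters of the first `body` value in the table above; the result is the start of a JSON document. Only the standard library is used, and the prefix length was chosen as a multiple of four so it decodes without padding.

```python
import base64

# First 32 base64 characters of the first `body` value shown in the table above.
prefix = "ewogICJ0ZW1wZXJhdHVyZSI6IDIwLAog"

# Decode base64 to bytes, then interpret the bytes as UTF-8 text.
decoded = base64.b64decode(prefix).decode("utf-8")
print(repr(decoded))
```

Inside the pipeline the same conversion is done by casting `body` to a string before parsing it.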


#### Silver Layer

Defining the schema for the JSON object.

In [0]:
# Defining the schema for the JSON object

json_schema = StructType([
    StructField("temperature", IntegerType()),
    StructField("humidity", IntegerType()),
    StructField("windSpeed", IntegerType()),
    StructField("windDirection", StringType()),
    StructField("precipitation", IntegerType()),
    StructField("conditions", StringType())
])

In [0]:
# Before transforming the bronze data, display it to inspect the raw records

temp = spark.read.table('streaming.bronze.weather')

temp.display()

body,partition,offset,sequenceNumber,enqueuedTime,publisher,partitionKey,properties,systemProperties
ewogICJ0ZW1wZXJhdHVyZSI6IDIwLAogICJodW1pZGl0eSI6IDMwLAogICJ3aW5kU3BlZWQiOiAzMCwKICAid2luZERpcmVjdGlvbiI6ICJOVyIsCiAgInByZWNpcGl0YXRpb24iOiAwLAogICJjb25kaXRpb25zIjogIkhhcnM= (truncated),0,4294968064,7,2024-08-19T05:28:34.582+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, content-type -> application/json)"
ewogICJ0ZW1wZXJhdHVyZSI6IDMwLAogICJodW1pZGl0eSI6IDQwLAogICJ3aW5kU3BlZWQiOiAzMCwKICAid2luZERpcmVjdGlvbiI6ICJOVyIsCiAgInByZWNpcGl0YXRpb24iOiAwLAogICJjb25kaXRpb25zIjogIkhhcnM= (truncated),0,4294968320,8,2024-08-19T05:28:44.113+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, content-type -> application/json)"
ewogICJ0ZW1wZXJhdHVyZSI6IDIyLAogICJodW1pZGl0eSI6IDQwLAogICJ3aW5kU3BlZWQiOiA1MCwKICAid2luZERpcmVjdGlvbiI6ICJOVyIsCiAgInByZWNpcGl0YXRpb24iOiAwLAogICJjb25kaXRpb25zIjogIkhhcnM= (truncated),0,4294968576,9,2024-08-19T05:29:00.488+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, content-type -> application/json)"
ewogICJ0ZW1wZXJhdHVyZSI6IDIzLAogICJodW1pZGl0eSI6IDMzLAogICJ3aW5kU3BlZWQiOiA1MCwKICAid2luZERpcmVjdGlvbiI6ICJOVyIsCiAgInByZWNpcGl0YXRpb24iOiAwLAogICJjb25kaXRpb25zIjogIkhhcnM= (truncated),0,4294969088,11,2024-08-19T05:30:51.365+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, content-type -> application/json)"
ewogICJ0ZW1wZXJhdHVyZSI6IDE1LAogICJodW1pZGl0eSI6IDIxLAogICJ3aW5kU3BlZWQiOiA0MCwKICAid2luZERpcmVjdGlvbiI6ICJFVyIsCiAgInByZWNpcGl0YXRpb24iOiAyLAogICJjb25kaXRpb25zIjogIkNvbGQ= (truncated),0,4294969600,13,2024-08-19T05:45:32.550+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, content-type -> application/json)"
ewogICJ0ZW1wZXJhdHVyZSI6IDIxLAogICJodW1pZGl0eSI6IDMyLAogICJ3aW5kU3BlZWQiOiA1MCwKICAid2luZERpcmVjdGlvbiI6ICJOVyIsCiAgInByZWNpcGl0YXRpb24iOiAwLAogICJjb25kaXRpb25zIjogIkhhcnM= (truncated),0,4294968832,10,2024-08-19T05:30:42.052+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, content-type -> application/json)"
ewogICJ0ZW1wZXJhdHVyZSI6IDQwLAogICJodW1pZGl0eSI6IDQ5LAogICJ3aW5kU3BlZWQiOiAyMCwKICAid2luZERpcmVjdGlvbiI6ICJOVyIsCiAgInByZWNpcGl0YXRpb24iOiAwLAogICJjb25kaXRpb25zIjogIkhhcnM= (truncated),0,4294969344,12,2024-08-19T05:31:05.318+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, content-type -> application/json)"
ewogICJ0ZW1wZXJhdHVyZSI6IDkwLAogICJodW1pZGl0eSI6IDgwLAogICJ3aW5kU3BlZWQiOiAzMCwKICAid2luZERpcmVjdGlvbiI6ICJOVyIsCiAgInByZWNpcGl0YXRpb24iOiAwLAogICJjb25kaXRpb25zIjogIkhhcnM= (truncated),0,4294967808,6,2024-08-19T05:20:21.449+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, content-type -> application/json)"
ewogICJ0ZW1wZXJhdHVyZSI6IC0wMywKICAiaHVtaWRpdHkiOiA0LAogICJ3aW5kU3BlZWQiOiAxMCwKICAid2luZERpcmVjdGlvbiI6ICJORSIsCiAgInByZWNpcGl0YXRpb24iOiAyLAogICJjb25kaXRpb25zIjogIlNub3c= (truncated),0,4294969856,14,2024-08-19T05:46:05.879+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, content-type -> application/json)"
ewogICJ0ZW1wZXJhdHVyZSI6IDAsCiAgImh1bWlkaXR5IjogMTAsCiAgIndpbmRTcGVlZCI6IDEwLAogICJ3aW5kRGlyZWN0aW9uIjogIk5FIiwKICAicHJlY2lwaXRhdGlvbiI6IDIsCiAgImNvbmRpdGlvbnMiOiAiRHJpeno= (truncated),0,4294970104,15,2024-08-19T05:47:06.506+0000,,,Map(),"Map(x-opt-sequence-number-epoch -> -1, content-type -> application/json)"


Reading, transforming and writing the stream from the bronze to the silver layer.

In [0]:
# Reading and Transforming: Load streaming data from the 'streaming.bronze.weather' Delta table, cast 'body' to string, parse JSON, and select specific fields
df = spark.readStream\
    .format("delta")\
    .table("streaming.bronze.weather")\
    .withColumn("body", col("body").cast("string"))\
    .withColumn("body",from_json(col("body"), json_schema))\
    .select("body.temperature", "body.humidity", "body.windSpeed", "body.windDirection", "body.precipitation", "body.conditions", col("enqueuedTime").alias('timestamp'))

# Displaying stream: Visualize the transformed data in the DataFrame for verification and analysis
df.display()

# Writing stream: Save the transformed data to the 'streaming.silver.weather' Delta table in 'append' mode with checkpointing for data reliability
df.writeStream\
    .option("checkpointLocation", "/mnt/streaming/silver/weather")\
    .outputMode("append")\
    .format("delta")\
    .toTable("streaming.silver.weather")

temperature,humidity,windSpeed,windDirection,precipitation,conditions,timestamp
20.0,30.0,30.0,NW,0.0,Harsh Climate,2024-08-19T05:28:34.582+0000
30.0,40.0,30.0,NW,0.0,Harsh Climate,2024-08-19T05:28:44.113+0000
22.0,40.0,50.0,NW,0.0,Harsh Climate,2024-08-19T05:29:00.488+0000
23.0,33.0,50.0,NW,0.0,Harsh Climate,2024-08-19T05:30:51.365+0000
15.0,21.0,40.0,EW,2.0,Cold Cliamte,2024-08-19T05:45:32.550+0000
21.0,32.0,50.0,NW,0.0,Harsh Climate,2024-08-19T05:30:42.052+0000
40.0,49.0,20.0,NW,0.0,Harsh Climate,2024-08-19T05:31:05.318+0000
90.0,80.0,30.0,NW,0.0,Harsh Climate,2024-08-19T05:20:21.449+0000
,,,,,,2024-08-19T05:46:05.879+0000
0.0,10.0,10.0,NE,2.0,Drizzling,2024-08-19T05:47:06.506+0000
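
The row at `05:46:05` has a timestamp but no values. Its bronze `body` decodes to a payload that begins with `"temperature": -03`, and a number with a leading zero is not valid JSON, so the most likely explanation is that `from_json` could not parse the record and produced null fields. The sketch below reproduces the parse failure with Python's standard `json` module, used here only as a stand-in for Spark's parser; the sample payloads are shortened for illustration.

```python
import json

def try_parse(payload: str):
    """Return the parsed dict, or None when the payload is not valid JSON."""
    try:
        return json.loads(payload)
    except json.JSONDecodeError:
        return None

valid = try_parse('{"temperature": -3, "humidity": 4}')
invalid = try_parse('{"temperature": -03, "humidity": 4}')  # leading zero is rejected

print(valid)
print(invalid)
```

Fixing the producer to emit `-3` is the simplest remedy. Spark's JSON options also include `allowNumericLeadingZeros`, though whether it suits this pipeline has not been tested here.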


#### Gold Layer

Reading, aggregating and writing the stream from the silver to the gold layer.

In [0]:
# Aggregating Stream: Read from 'streaming.silver.weather', apply watermarking and windowing, and calculate average weather metrics
df = spark.readStream\
    .format("delta")\
    .table("streaming.silver.weather")\
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(window("timestamp", "5 minutes")) \
    .agg(avg("temperature").alias('temperature'), avg("humidity").alias('humidity'), avg("windSpeed").alias('windSpeed'), avg("precipitation").alias('precipitation'))\
    .select('window.start', 'window.end', 'temperature', 'humidity', 'windSpeed', 'precipitation')

# Displaying Aggregated Stream: Visualize aggregated data for insights into weather trends
df.display()

# Writing Aggregated Stream: Store the aggregated data in 'streaming.gold.weather_summary' with checkpointing for data integrity
df.writeStream\
    .option("checkpointLocation", "/mnt/streaming/gold/weather_summary")\
    .outputMode("append")\
    .format("delta")\
    .toTable("streaming.gold.weather_summary")

start,end,temperature,humidity,windSpeed,precipitation
2024-08-19T05:30:00.000+0000,2024-08-19T05:35:00.000+0000,28.0,38.0,40.0,0.0
2024-08-19T05:20:00.000+0000,2024-08-19T05:25:00.000+0000,90.0,80.0,30.0,0.0
2024-08-19T05:45:00.000+0000,2024-08-19T05:50:00.000+0000,8.333333333333334,13.666666666666666,20.0,2.0
2024-08-19T06:15:00.000+0000,2024-08-19T06:20:00.000+0000,70.0,100.0,10.0,0.0
2024-08-19T05:55:00.000+0000,2024-08-19T06:00:00.000+0000,20.0,21.25,27.5,2.0
2024-08-19T05:25:00.000+0000,2024-08-19T05:30:00.000+0000,24.0,36.66666666666666,36.66666666666666,0.0
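
`window("timestamp", "5 minutes")` assigns each row to a fixed, non-overlapping five-minute bucket, and `avg` is then computed per bucket. As a plain-Python sketch of that arithmetic (not how Spark executes it, and ignoring the watermark), six temperature readings taken from the silver output reproduce the `24.0` and `28.0` averages shown above. `window_start` is a hypothetical helper.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

def window_start(ts: datetime, minutes: int = 5) -> datetime:
    """Round a timestamp down to the start of its tumbling window (hypothetical helper)."""
    size = minutes * 60
    seconds = int(ts.timestamp())
    return datetime.fromtimestamp(seconds - seconds % size, tz=timezone.utc)

# (timestamp, temperature) pairs copied from the silver layer output.
rows = [
    ("2024-08-19T05:28:34.582+0000", 20.0),
    ("2024-08-19T05:28:44.113+0000", 30.0),
    ("2024-08-19T05:29:00.488+0000", 22.0),
    ("2024-08-19T05:30:42.052+0000", 21.0),
    ("2024-08-19T05:30:51.365+0000", 23.0),
    ("2024-08-19T05:31:05.318+0000", 40.0),
]

# Group temperatures by the window each timestamp falls into.
buckets = defaultdict(list)
for ts_text, temperature in rows:
    ts = datetime.strptime(ts_text, "%Y-%m-%dT%H:%M:%S.%f%z")
    buckets[window_start(ts)].append(temperature)

# Average per window, printed as start, end, average temperature.
averages = {start: sum(values) / len(values) for start, values in buckets.items()}
for start in sorted(averages):
    print(start.isoformat(), (start + timedelta(minutes=5)).isoformat(), averages[start])
```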


In [0]:
%sql

select * from streaming.gold.weather_summary;

start,end,temperature,humidity,windSpeed,precipitation
2024-08-19T05:20:00.000+0000,2024-08-19T05:25:00.000+0000,90.0,80.0,30.0,0.0
2024-08-19T05:20:00.000+0000,2024-08-19T05:25:00.000+0000,90.0,80.0,30.0,0.0
2024-08-19T05:30:00.000+0000,2024-08-19T05:35:00.000+0000,28.0,38.0,40.0,0.0
2024-08-19T05:45:00.000+0000,2024-08-19T05:50:00.000+0000,8.333333333333334,13.666666666666666,20.0,2.0
2024-08-19T05:25:00.000+0000,2024-08-19T05:30:00.000+0000,24.0,36.66666666666666,36.66666666666666,0.0
2024-08-19T05:30:00.000+0000,2024-08-19T05:35:00.000+0000,28.0,38.0,40.0,0.0
2024-08-19T05:25:00.000+0000,2024-08-19T05:30:00.000+0000,24.0,36.66666666666666,36.66666666666666,0.0
