# Real-Time IoT Data Processing with Azure Databricks

- **Data sources:** streaming telemetry from IoT devices.
- **Ingestion:** Azure Event Hubs captures the real-time data.
- **Processing:** Azure Databricks processes the stream with Structured Streaming.
- **Storage:** processed data is stored in Azure Data Lake Storage in Delta format.
- **Visualization:** data is visualized in Power BI.


## Tools and Technologies Used
- Raspberry Pi Azure IoT Online Simulator (https://azure-samples.github.io/raspberry-pi-web-simulator/)
- Azure IoT Hub
- Azure Event Hubs
- Azure Databricks (Unity Catalog enabled)
- Azure Data Lake Storage Gen2
- Power BI


## Azure Databricks Configuration

- **Single-node compute cluster**: Databricks Runtime 15.4 LTS (includes Apache Spark 3.5.0, Scala 2.12)
- **Maven library installed on the cluster**: `com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22`

```python
# Import PySpark SQL functions (col, from_json, window, avg, ...) and data types
from pyspark.sql.functions import *
from pyspark.sql.types import *
```

```python
# Medallion architecture: bronze, silver, and gold schemas inside the
# 'db_iot_data_2538456515664749' Unity Catalog catalog.
# IF NOT EXISTS makes the cell safe to re-run; unlike a bare `except`, it does not
# hide real failures such as a missing catalog or insufficient privileges.
catalog = "db_iot_data_2538456515664749"

for schema in ["bronze", "silver", "gold"]:
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{schema}")
```
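The catalog name is repeated in every table name and checkpoint path in this notebook. A minimal sketch in plain Python of deriving both from one place follows; `CATALOG`, `table_name`, and `checkpoint_path` are hypothetical names, and the `/mnt/<catalog>/<layer>/<table>` layout simply mirrors the checkpoint locations the notebook already uses.

```python
# Hypothetical helpers: build Unity Catalog table names (catalog.schema.table)
# and per-query checkpoint paths for each medallion layer from one catalog name.
CATALOG = "db_iot_data_2538456515664749"
LAYERS = ("bronze", "silver", "gold")


def table_name(layer: str, table: str, catalog: str = CATALOG) -> str:
    """Return the three-level Unity Catalog name for a table in a layer."""
    if layer not in LAYERS:
        raise ValueError(f"unknown layer: {layer}")
    return f"{catalog}.{layer}.{table}"


def checkpoint_path(layer: str, table: str, catalog: str = CATALOG) -> str:
    """Return a checkpoint path following the notebook's /mnt layout."""
    if layer not in LAYERS:
        raise ValueError(f"unknown layer: {layer}")
    return f"/mnt/{catalog}/{layer}/{table}"


print(table_name("silver", "weather"))
print(checkpoint_path("gold", "weather_summary"))
```

Each streaming query keeps its own checkpoint directory, so deriving the path from the same layer and table arguments as the target table helps keep them from drifting apart.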

### Bronze Layer

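```python
# Set up the Azure Event Hubs connection configuration.
# Replace the placeholder with your Event Hubs connection string
# (preferably read from a Databricks secret scope rather than hard-coded).
connectionString = "add connection string here"

eventHubName = "eh-iot-data"

# The connector expects the connection string to be encrypted with EventHubsUtils.encrypt
ehConf = {
    'eventhubs.connectionString': sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString),
    'eventhubs.eventHubName': eventHubName
}
```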
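The connector's documented examples use a connection string that ends with `EntityPath=<event hub name>`, while a namespace-level connection string copied from the Azure portal does not include it. A minimal sketch, assuming the connector reads the target hub from `EntityPath`: the hypothetical helper `with_entity_path` appends it when it is missing. The namespace and key in the example are placeholders.

```python
# Hypothetical helper: ensure an Event Hubs connection string names the target
# event hub via EntityPath. Assumes the standard "Key=Value;Key=Value" format.
def with_entity_path(connection_string: str, event_hub_name: str) -> str:
    """Append EntityPath=<event_hub_name> unless an EntityPath is already present."""
    parts = [p for p in connection_string.strip().split(";") if p]
    # split on the first '=' only: SharedAccessKey values may end in '=' padding
    keys = {p.split("=", 1)[0] for p in parts}
    if "EntityPath" not in keys:
        parts.append(f"EntityPath={event_hub_name}")
    return ";".join(parts)


# Placeholder namespace-level connection string (no EntityPath)
conn = ("Endpoint=sb://example-ns.servicebus.windows.net/;"
        "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123=")
print(with_entity_path(conn, "eh-iot-data"))
```

In the notebook, the result would be passed to `EventHubsUtils.encrypt` in place of the raw `connectionString`.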

```python
# Bronze layer: ingest raw events from Azure Event Hubs and persist them unchanged.
# Read stream: load events from Azure Event Hubs into a streaming DataFrame
df = spark.readStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .load()

# Display stream: show incoming events for debugging (runs as its own streaming query)
df.display()

# Write stream: append the raw events to the Delta table
# 'db_iot_data_2538456515664749.bronze.weather', with a checkpoint for fault tolerance
df.writeStream \
    .option("checkpointLocation", "/mnt/db_iot_data_2538456515664749/bronze/weather") \
    .outputMode("append") \
    .format("delta") \
    .toTable("db_iot_data_2538456515664749.bronze.weather")
```

### Silver Layer

```python
# Schema of the JSON payload carried in the Event Hubs message body
json_schema = StructType([
    StructField("temperature", IntegerType()),
    StructField("humidity", IntegerType()),
    StructField("windSpeed", IntegerType()),
    StructField("windDirection", StringType()),
    StructField("precipitation", IntegerType()),
    StructField("conditions", StringType())
])
```
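Fields whose JSON values don't match the declared types (for example, a decimal temperature such as `24.5` against `IntegerType`) can end up null after `from_json`. A minimal plain-Python sketch for checking a sample message against the schema before streaming; `EXPECTED_FIELDS` and `schema_mismatches` are hypothetical names, and the sample values are made up, since the real payload is defined in the simulator code.

```python
import json

# Expected payload fields and Python types, mirroring json_schema
# (IntegerType -> int, StringType -> str).
EXPECTED_FIELDS = {
    "temperature": int,
    "humidity": int,
    "windSpeed": int,
    "windDirection": str,
    "precipitation": int,
    "conditions": str,
}


def schema_mismatches(payload: str) -> list[str]:
    """Return the names of fields that are missing, null, or of an unexpected type."""
    record = json.loads(payload)
    bad = []
    for name, expected in EXPECTED_FIELDS.items():
        value = record.get(name)
        # bool is a subclass of int in Python, so reject it explicitly
        if value is None or isinstance(value, bool) or not isinstance(value, expected):
            bad.append(name)
    return bad


# Hypothetical messages for illustration only
sample = json.dumps({"temperature": 24, "humidity": 61, "windSpeed": 12,
                     "windDirection": "NW", "precipitation": 0, "conditions": "Cloudy"})
print(schema_mismatches(sample))  # []
print(schema_mismatches('{"temperature": 24.5, "humidity": 61}'))
# ['temperature', 'windSpeed', 'windDirection', 'precipitation', 'conditions']
```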

```python
# Silver layer: read the bronze Delta table as a stream, cast the binary 'body'
# column to a string, parse it with json_schema, and flatten the fields.
# 'enqueuedTime' (set by Event Hubs on arrival) becomes the 'timestamp' column.
df = spark.readStream \
    .format("delta") \
    .table("db_iot_data_2538456515664749.bronze.weather") \
    .withColumn("body", col("body").cast("string")) \
    .withColumn("body", from_json(col("body"), json_schema)) \
    .select(
        "body.temperature", "body.humidity", "body.windSpeed",
        "body.windDirection", "body.precipitation", "body.conditions",
        col("enqueuedTime").alias("timestamp"),
    )

# Display stream: inspect the transformed data for verification
df.display()

# Write stream: append the transformed data to 'db_iot_data_2538456515664749.silver.weather',
# with a checkpoint so the query can resume where it left off
df.writeStream \
    .option("checkpointLocation", "/mnt/db_iot_data_2538456515664749/silver/weather") \
    .outputMode("append") \
    .format("delta") \
    .toTable("db_iot_data_2538456515664749.silver.weather")
```
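The silver transformation decodes each binary message body, parses it as JSON, and flattens the fields alongside the enqueue time. A minimal plain-Python sketch of the same per-row logic, for intuition only: `bronze_row`, `FIELDS`, and `to_silver` are hypothetical, and the row values are invented rather than taken from the actual Event Hubs output.

```python
import json
from datetime import datetime, timezone

FIELDS = ["temperature", "humidity", "windSpeed", "windDirection", "precipitation", "conditions"]

# Hypothetical bronze row: the body arrives as bytes and enqueuedTime is a timestamp
bronze_row = {
    "body": b'{"temperature": 22, "humidity": 55, "windSpeed": 8, '
            b'"windDirection": "SE", "precipitation": 1, "conditions": "Rain"}',
    "enqueuedTime": datetime(2024, 1, 1, 12, 3, 10, tzinfo=timezone.utc),
}


def to_silver(row: dict) -> dict:
    """Decode the binary body, parse JSON, and flatten fields plus a timestamp."""
    text = row["body"].decode("utf-8")         # analogous to col("body").cast("string")
    try:
        payload = json.loads(text)             # analogous to from_json(...)
    except json.JSONDecodeError:
        payload = {}                           # malformed JSON: every selected field is None
    flat = {name: payload.get(name) for name in FIELDS}  # analogous to select("body.<field>")
    flat["timestamp"] = row["enqueuedTime"]    # analogous to col("enqueuedTime").alias("timestamp")
    return flat


print(to_silver(bronze_row))
```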

### Gold Layer

```python
# Gold layer: read the silver table as a stream, apply a 1-minute watermark,
# group events into 5-minute tumbling windows, and average the weather metrics
df = spark.readStream \
    .format("delta") \
    .table("db_iot_data_2538456515664749.silver.weather") \
    .withWatermark("timestamp", "1 minute") \
    .groupBy(window("timestamp", "5 minutes")) \
    .agg(
        avg("temperature").alias("temperature"),
        avg("humidity").alias("humidity"),
        avg("windSpeed").alias("windSpeed"),
        avg("precipitation").alias("precipitation"),
    ) \
    .select("window.start", "window.end", "temperature", "humidity", "windSpeed", "precipitation")

# Display aggregated stream: inspect the windowed averages
df.display()

# Write aggregated stream: store the results in 'db_iot_data_2538456515664749.gold.weather_summary'.
# In append mode, a window's row is written only after the watermark passes the end of that window.
df.writeStream \
    .option("checkpointLocation", "/mnt/db_iot_data_2538456515664749/gold/weather_summary") \
    .outputMode("append") \
    .format("delta") \
    .toTable("db_iot_data_2538456515664749.gold.weather_summary")
```
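To see why gold rows appear with a delay, the sketch below mimics 5-minute tumbling windows with a 1-minute watermark in plain Python. It is a simplification under stated assumptions: all events are treated as one batch, late-data dropping is omitted, a window counts as final once its end is at or before the watermark (max event time minus the delay), and only temperature is averaged. `finalized_windows` and the sample events are hypothetical; Spark's exact boundary semantics may differ.

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)           # window("timestamp", "5 minutes")
WATERMARK_DELAY = timedelta(minutes=1)  # withWatermark("timestamp", "1 minute")
EPOCH = datetime(1970, 1, 1)


def window_start(ts: datetime) -> datetime:
    """Align a naive timestamp to the start of its 5-minute tumbling window."""
    return ts - (ts - EPOCH) % WINDOW


def finalized_windows(events):
    """Average temperature per window, keeping only windows closed by the watermark.

    events: iterable of (timestamp, temperature) pairs.
    Returns {(window_start, window_end): average_temperature}.
    """
    sums = defaultdict(lambda: [0.0, 0])
    max_ts = None
    for ts, temp in events:
        max_ts = ts if max_ts is None else max(max_ts, ts)
        start = window_start(ts)
        sums[start][0] += temp
        sums[start][1] += 1
    watermark = max_ts - WATERMARK_DELAY
    return {
        (start, start + WINDOW): total / count
        for start, (total, count) in sorted(sums.items())
        if start + WINDOW <= watermark
    }


events = [
    (datetime(2024, 1, 1, 12, 1), 20.0),
    (datetime(2024, 1, 1, 12, 3), 22.0),
    (datetime(2024, 1, 1, 12, 6), 25.0),
    (datetime(2024, 1, 1, 12, 7), 27.0),
]
print(finalized_windows(events))
```

Here the latest event is at 12:07, so the watermark is 12:06: the 12:00 to 12:05 window (average 21.0) is final, while the 12:05 to 12:10 window stays open until a later event pushes the watermark past 12:10.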

```sql
%sql
SELECT * FROM db_iot_data_2538456515664749.gold.weather_summary
```