
## Real-time Streaming Data Leveraging the Medallion Architecture


![Image](https://raw.githubusercontent.com/born2begrait1/Real-time-Streaming-Data-in-Azure-Databricks-Notebook/5c1242c17cce48880e7de78f9e17ae3f85458cf6/realtime%20in%20databricks.png)


### Setup and Configuration:
- Azure Subscription: Ensure you have an active Azure subscription.
- Azure Event Hub: Create an Event Hub namespace and an event hub for capturing real-time data.
- Azure Databricks: Set up an Azure Databricks workspace with Unity Catalog enabled.
- Azure Data Lake Storage: Configure Azure Data Lake Storage for storing processed data.

### Required Azure Databricks Cluster Configuration:
- Single-node compute cluster running Databricks Runtime `12.2 LTS (includes Apache Spark 3.3.2, Scala 2.12)`
- Maven library installed on the cluster: `com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22`
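If you prefer to script the cluster instead of using the UI, the configuration above can be expressed as request bodies for the Databricks Clusters API (`clusters/create`) and Libraries API (`libraries/install`). This is a sketch under assumptions: the cluster name and the `Standard_DS3_v2` node type are hypothetical, the single-node settings follow the pattern Databricks documents for single-node clusters, and the Unity Catalog access mode is not shown.

```python
"""Hypothetical request bodies for creating the single-node streaming cluster."""
import json

EVENTHUBS_CONNECTOR = "com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22"

# Body for POST /api/2.0/clusters/create (name and node type are placeholders)
cluster_spec = {
    "cluster_name": "weather-streaming",   # hypothetical name
    "spark_version": "12.2.x-scala2.12",    # Databricks Runtime 12.2 LTS
    "node_type_id": "Standard_DS3_v2",      # hypothetical VM size
    "num_workers": 0,                       # single node: driver only
    "spark_conf": {
        "spark.databricks.cluster.profile": "singleNode",
        "spark.master": "local[*, 4]",
    },
    "custom_tags": {"ResourceClass": "SingleNode"},
}


def library_install_body(cluster_id: str) -> dict:
    """Body for POST /api/2.0/libraries/install that attaches the Event Hubs connector."""
    return {
        "cluster_id": cluster_id,
        "libraries": [{"maven": {"coordinates": EVENTHUBS_CONNECTOR}}],
    }


if __name__ == "__main__":
    print(json.dumps(cluster_spec, indent=2))
    print(json.dumps(library_install_body("<cluster-id>"), indent=2))
```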

### Development Environment:
- VS Code: Install Visual Studio Code and the extensions you need for Python and Azure development.
- Maven Libraries: Add the following Maven libraries to your project:
  - `com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22`
  - `com.azure:azure-messaging-eventhubs:5.18.0`
  - `com.azure:azure-identity:1.11.2`
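The libraries above are Java client libraries for sending events to the hub. As a Python equivalent, the sketch below assumes the `azure-eventhub` package instead (a swapped-in SDK, not what the original project uses). The helpers `build_weather_event` and `send_events` are hypothetical; the JSON field names match the schema parsed in the Silver layer, and `timestamp` is sent as an ISO-8601 string because that schema reads it as a string.

```python
"""Sketch of a weather-event producer, assuming the Python `azure-eventhub` package."""
import json
from datetime import datetime, timezone
from typing import Iterable

# Field names mirror the JSON schema used in the Silver layer
WEATHER_FIELDS = (
    "city", "latitude", "longitude", "temperature", "humidity",
    "wind_speed", "pressure", "precipitation", "cloud_cover", "weather_condition",
)


def build_weather_event(reading: dict) -> str:
    """Serialize one reading to JSON, stamping the current UTC time if no timestamp is given."""
    missing = [field for field in WEATHER_FIELDS if field not in reading]
    if missing:
        raise ValueError(f"missing fields: {missing}")
    event = {field: reading[field] for field in WEATHER_FIELDS}
    event["timestamp"] = reading.get("timestamp") or datetime.now(timezone.utc).isoformat()
    return json.dumps(event)


def send_events(connection_str: str, eventhub_name: str, payloads: Iterable[str]) -> None:
    """Send JSON payloads to the event hub in a single batch."""
    # Imported lazily so build_weather_event works without the SDK installed
    from azure.eventhub import EventData, EventHubProducerClient

    producer = EventHubProducerClient.from_connection_string(
        conn_str=connection_str, eventhub_name=eventhub_name
    )
    with producer:
        batch = producer.create_batch()
        for payload in payloads:
            batch.add(EventData(payload))  # raises ValueError if the batch is full
        producer.send_batch(batch)
```

Usage: `send_events("<connection string>", "<event hub name>", [build_weather_event(reading)])`.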

### Implementation Steps:
- Create Event Hub Connection:
  - Obtain the connection string for your Event Hub namespace.
  - Configure the connection string and Event Hub name in your Databricks notebook.
- Ingest Data from Event Hub:
  - Use Spark Structured Streaming to read data from Azure Event Hub.
  - Define the schema for the incoming data.

### Process Data in Databricks:
- Perform necessary transformations and aggregations on the streaming data.
- Use the Medallion Architecture (Bronze, Silver, Gold layers) to organize and process data.

### Store Processed Data:
- Write the processed data to Azure Data Lake Storage in Delta format.
- Use Delta Lake for efficient storage and querying.

### Visualize Data:
- Visualize the data in Power BI using DirectQuery.

## Importing Required Libraries

```python
# Note: `round` here is pyspark.sql.functions.round and shadows Python's built-in round
from pyspark.sql.functions import avg, col, count, from_json, round
from pyspark.sql.types import DoubleType, StringType, StructField, StructType
```

## Azure Event Hubs Connection

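```python
# Event Hubs connection string and event hub name (replace the placeholders).
# The azure-eventhubs-spark connector expects the connection string to contain
# EntityPath=<event hub name>; a namespace-level connection string does not include it.
CONNECTION_STR = "<Event Hub Primary Connection String>"
EVENTHUB_NAME = "<Event Hub Name>"

# The connector expects the connection string to be encrypted before it is passed in
connectionConf = {
  'eventhubs.connectionString' : sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(CONNECTION_STR),
  'eventhubs.name': EVENTHUB_NAME
}
```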
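If you start from the namespace-level connection string, it must be pointed at a specific event hub before the connector can use it. A minimal sketch with a hypothetical helper `with_entity_path`, assuming the standard `Key=Value;Key=Value` connection-string format, appends `EntityPath` only when it is missing and refuses a string that already targets a different hub.

```python
"""Hypothetical helper: make sure the connection string names the event hub."""


def with_entity_path(connection_str: str, eventhub_name: str) -> str:
    """Append EntityPath=<eventhub_name> unless the string already carries one."""
    parts = [part for part in connection_str.strip().split(";") if part]
    for part in parts:
        # Split on the first '=' only: values such as SharedAccessKey may end in '='
        key, _, value = part.partition("=")
        if key.strip().lower() == "entitypath":
            if value.strip() != eventhub_name:
                raise ValueError(f"connection string targets {value!r}, not {eventhub_name!r}")
            return ";".join(parts)
    parts.append(f"EntityPath={eventhub_name}")
    return ";".join(parts)
```

In the notebook this would run before building `connectionConf`, e.g. `CONNECTION_STR = with_entity_path(CONNECTION_STR, EVENTHUB_NAME)`.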

## Create Medallion Architecture Catalog, Schemas and Volumes

```python
# Storage location for the catalog (an abfss:// path in ADLS Gen2)
storage_location = '<storage-location-abfss-path>'

# Create the Unity Catalog catalog
spark.sql(f"CREATE CATALOG IF NOT EXISTS weather MANAGED LOCATION '{storage_location}'")

# Create one schema per Medallion layer
spark.sql("CREATE SCHEMA IF NOT EXISTS weather.bronze")
spark.sql("CREATE SCHEMA IF NOT EXISTS weather.silver")
spark.sql("CREATE SCHEMA IF NOT EXISTS weather.gold")

# Create one volume per layer to hold streaming checkpoints.
# IF NOT EXISTS keeps the cell safe to re-run.
spark.sql("CREATE VOLUME IF NOT EXISTS weather.bronze.bronze_volume")
spark.sql("CREATE VOLUME IF NOT EXISTS weather.silver.silver_volume")
spark.sql("CREATE VOLUME IF NOT EXISTS weather.gold.gold_volume")
```
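The same objects can also be generated in a loop, which keeps the names consistent across layers. In the hypothetical helpers below, each layer's checkpoints live under `/Volumes/<catalog>/<schema>/<volume>/`, the path under which Unity Catalog exposes volumes; the `checkpoint` sub-folder name is an assumption. Each streaming query needs its own checkpoint directory, so Bronze, Silver and Gold must not share one.

```python
"""Hypothetical helpers that generate the Medallion schemas, volumes and checkpoint paths."""
from typing import List, Sequence

CATALOG = "weather"
LAYERS = ("bronze", "silver", "gold")


def medallion_ddl(catalog: str = CATALOG, layers: Sequence[str] = LAYERS) -> List[str]:
    """Return CREATE SCHEMA / CREATE VOLUME statements for each layer, safe to re-run."""
    statements = []
    for layer in layers:
        statements.append(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{layer}")
        statements.append(f"CREATE VOLUME IF NOT EXISTS {catalog}.{layer}.{layer}_volume")
    return statements


def checkpoint_path(layer: str, catalog: str = CATALOG) -> str:
    """Checkpoint folder inside the layer's volume (the 'checkpoint' sub-folder is an assumption)."""
    return f"/Volumes/{catalog}/{layer}/{layer}_volume/checkpoint"
```

In the notebook, each statement would be run with `spark.sql(statement)`, and a value such as `checkpoint_path("bronze")` could stand in for the Bronze `checkpointLocation` placeholder.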

## Bronze Layer

```python
# Read the Event Hub stream into the df Spark DataFrame
df = spark.readStream \
    .format("eventhubs") \
    .options(**connectionConf) \
    .load()

# Display the stream for inspection
df.display()

# Write the raw events to the Bronze Delta table in append mode.
# Replace <volume-path> with a checkpoint folder in weather.bronze.bronze_volume.
df.writeStream\
    .option("checkpointLocation", "<volume-path>")\
    .outputMode("append")\
    .format("delta")\
    .toTable("weather.bronze.bronze_data")
```
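By default the azure-eventhubs-spark connector starts reading at the end of the stream, so events sent before the query starts are not picked up. The connector's `eventhubs.startingPosition` option takes a JSON-encoded position; the sketch below, with a hypothetical `with_starting_position` helper, starts from the earliest retained event. The position only applies to a fresh query: once a checkpoint exists, Structured Streaming resumes from the checkpointed offsets.

```python
"""Sketch: start the Event Hubs stream from the earliest retained event."""
import json

# Position format as shown in the azure-eventhubs-spark PySpark guide:
# offset "-1" means start of stream; "@latest" means end of stream.
starting_position = {
    "offset": "-1",
    "seqNo": -1,            # not used when offset is set
    "enqueuedTime": None,   # not used when offset is set
    "isInclusive": True,
}


def with_starting_position(conf: dict, position: dict) -> dict:
    """Return a copy of the connector options with eventhubs.startingPosition set."""
    updated = dict(conf)
    updated["eventhubs.startingPosition"] = json.dumps(position)
    return updated
```

Usage in the Bronze cell: `.options(**with_starting_position(connectionConf, starting_position))`.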

## Silver Layer

```python
# Define the JSON schema of the event body
json_schema = StructType([
    StructField("city", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("temperature", DoubleType(), True),
    StructField("humidity", DoubleType(), True),
    StructField("wind_speed", DoubleType(), True),
    StructField("pressure", DoubleType(), True),
    StructField("precipitation", DoubleType(), True),
    StructField("cloud_cover", DoubleType(), True),
    StructField("weather_condition", StringType(), True),
    StructField("timestamp", StringType(), True)
])
```

```python
# Read the Bronze table as a stream, decode the binary body and parse it as JSON
df = spark.readStream\
    .format("delta")\
    .table("weather.bronze.bronze_data")\
    .withColumn("body", col("body").cast("string"))\
    .withColumn("body", from_json(col("body"), json_schema))

# Flatten the parsed struct into top-level columns, casting each to string
df_string = df.select(
    col("body.city").cast("string").alias("city"),
    col("body.latitude").cast("string").alias("latitude"),
    col("body.longitude").cast("string").alias("longitude"),
    col("body.temperature").cast("string").alias("temperature"),
    col("body.humidity").cast("string").alias("humidity"),
    col("body.wind_speed").cast("string").alias("wind_speed"),
    col("body.pressure").cast("string").alias("pressure"),
    col("body.precipitation").cast("string").alias("precipitation"),
    col("body.cloud_cover").cast("string").alias("cloud_cover"),
    col("body.weather_condition").cast("string").alias("weather_condition"),
    col("body.timestamp").cast("string").alias("timestamp")
)

display(df_string)

# Write the flattened stream to the Silver Delta table in append mode.
# Use a checkpoint folder in weather.silver.silver_volume, distinct from the Bronze one.
df_string.writeStream\
    .option("checkpointLocation", "<volume-path>")\
    .outputMode("append")\
    .format("delta")\
    .toTable("weather.silver.silver_data")
```
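When `from_json` cannot parse a body, or a field is absent, the corresponding Silver columns end up null rather than failing the stream. The plain-Python sketch below mimics that behaviour for intuition only (it does not model Spark's type handling), and its helper names are hypothetical. In the notebook, a comparable clean-up step would be a filter such as `df_string.where(col("city").isNotNull())`; that filter is a suggested addition, not part of the original pipeline.

```python
"""Plain-Python model of how the Silver step treats event bodies (illustration only)."""
import json

# Same field names as json_schema in the Silver layer
SILVER_FIELDS = (
    "city", "latitude", "longitude", "temperature", "humidity", "wind_speed",
    "pressure", "precipitation", "cloud_cover", "weather_condition", "timestamp",
)


def parse_body(body: bytes) -> dict:
    """Decode a binary body and keep the schema fields; missing or unparseable values become None."""
    try:
        parsed = json.loads(body.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        parsed = {}
    if not isinstance(parsed, dict):  # e.g. a bare JSON array or number
        parsed = {}
    return {field: parsed.get(field) for field in SILVER_FIELDS}


def silver_rows(bodies):
    """Parse every body and drop rows whose city could not be recovered."""
    return [row for row in map(parse_body, bodies) if row["city"] is not None]
```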

## Gold Layer

```python
# Read the Silver table as a stream
df_silver = spark.readStream \
    .format("delta") \
    .table("weather.silver.silver_data")

# Aggregate per city: average temperature, humidity and wind speed (rounded to
# 2 decimal places) plus a record count. The Silver columns are strings, so they
# are cast to double before averaging.
df_aggregated = df_silver.groupBy(col("city")).agg(
    round(avg(col("temperature").cast("double")), 2).alias("avg_temperature"),
    round(avg(col("humidity").cast("double")), 2).alias("avg_humidity"),
    round(avg(col("wind_speed").cast("double")), 2).alias("avg_wind_speed"),
    count("*").alias("record_count")
)

display(df_aggregated)

# Write the aggregates to the Gold Delta table in complete mode: each trigger
# rewrites the whole result table (append mode would require a watermark).
# Use a checkpoint folder in weather.gold.gold_volume.
df_aggregated.writeStream \
    .option("checkpointLocation", "<volume-path>") \
    .outputMode("complete") \
    .format("delta") \
    .toTable("weather.gold.gold_data")
```
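Because the Gold query has no watermark, Spark keeps aggregation state for every city and, in complete mode, emits the whole result table on each trigger. The pure-Python class below is a hypothetical model of that behaviour, not how Spark stores state internally. It reflects three details of the query above: `avg` skips nulls while `count("*")` counts every row, values arrive from Silver as strings, and Python's `round` uses round-half-to-even whereas Spark's `round` uses HALF_UP, so exact ties can differ.

```python
"""Plain-Python model of the Gold aggregation in complete output mode (illustration only)."""
from collections import defaultdict

METRICS = ("temperature", "humidity", "wind_speed")


class CityAggregator:
    """Running per-city state, loosely mirroring groupBy("city").agg(avg(...), count("*"))."""

    def __init__(self):
        # city -> metric -> [running sum, number of non-null values]
        self._sums = defaultdict(lambda: {metric: [0.0, 0] for metric in METRICS})
        # city -> total rows seen, like count("*")
        self._rows = defaultdict(int)

    def process_batch(self, rows):
        """Fold one micro-batch into the state and return the full result table."""
        for row in rows:
            city = row["city"]
            self._rows[city] += 1
            for metric in METRICS:
                value = row.get(metric)
                if value is not None:  # avg() ignores nulls
                    acc = self._sums[city][metric]
                    acc[0] += float(value)  # Silver stores values as strings
                    acc[1] += 1
        # Complete mode: every city seen so far is emitted, not just the updated ones
        return {city: self._summarize(city) for city in self._rows}

    def _summarize(self, city):
        result = {}
        for metric in METRICS:
            total, n = self._sums[city][metric]
            # Python's round is half-to-even; Spark's round is HALF_UP
            result[f"avg_{metric}"] = round(total / n, 2) if n else None
        result["record_count"] = self._rows[city]
        return result
```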