# Real-time data streaming from Azure Event Hubs

### Create catalog

In [0]:
%sql
-- Create the `streaming` catalog (no-op if it already exists) and point its
-- managed storage at the abfss path below.
create catalog if not exists streaming
    managed location 'abfss://data@dbextstore.dfs.core.windows.net/catalogs';

### Create bronze, silver and gold schemas

In [0]:
%sql
-- One schema per pipeline stage inside the `streaming` catalog.
-- Each statement is a no-op when the schema already exists.
create schema if not exists streaming.bronze;  -- raw events as read from the event hub
create schema if not exists streaming.silver;  -- JSON body parsed into columns
create schema if not exists streaming.gold;    -- aggregated results

### Create connection details

In [0]:
# Azure Event Hubs connection details.
#
# SECURITY: the connection string embeds a SharedAccessKey, so it must not be
# hard-coded in the notebook. It is read from a Databricks secret scope instead.
# The key that was previously committed in this cell should be treated as
# compromised and rotated in Azure.
# NOTE(review): the scope and key names below are placeholders - create the
# secret (holding the full "Endpoint=sb://...;EntityPath=..." string) or adjust
# the names to match the workspace.
connectionString = dbutils.secrets.get(
    scope="event-hubs",
    key="weather-data-hub-connection-string",
)
eventHubName = "weather-data-hub"

# Options handed to the "eventhubs" stream source via .options(**ehConf).
ehConf = {
    # The connection string is passed through the connector's JVM-side
    # EventHubsUtils.encrypt helper before being handed to the source.
    "eventhubs.connectionString": sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString),
    "eventhubs.eventHubName": eventHubName
}

### Test

In [0]:
# Smoke test: open a streaming read against the event hub described by ehConf.
df = (
    spark.readStream
    .format("eventhubs")
    .options(**ehConf)
    .load()
)

In [0]:
# Render the test stream in the notebook output.
# NOTE(review): this presumably keeps a streaming query running until the cell
# is cancelled; stop it before starting another reader on the same Event Hubs
# consumer group - TODO confirm.
display(df)

### Bronze layer

Read the raw events from Event Hubs as a stream and write them to a streaming table in the bronze schema.

In [0]:
# Source for the bronze layer: a streaming read from the event hub in ehConf.
df = (
    spark.readStream
    .format("eventhubs")
    .options(**ehConf)
    .load()
)

In [0]:
# Preview the raw event stream in the notebook output.
# NOTE(review): this presumably runs as its own streaming query alongside the
# bronze write below; two readers on one Event Hubs consumer group may
# conflict - TODO confirm, and cancel this cell before starting the write.
display(df)

In [0]:
# Append the raw event stream to the bronze Delta table.
# The stream's checkpoint is kept under /mnt/streaming/bronze/weather.
(
    df.writeStream
    .format("delta")
    .option("checkpointLocation", "/mnt/streaming/bronze/weather")
    .outputMode("append")
    .toTable("streaming.bronze.weather")
)

In [0]:
%sql
-- Ad-hoc check: inspect the rows landed in the bronze table
select *
from streaming.bronze.weather;
-- Author's note: works

### Silver layer
Transform the bronze data (parse the JSON string in the event body into typed columns) and store the result in a streaming table in the silver schema.

In [0]:
# Necessary imports to define JSON schema to decode JSON string
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Fields expected in the JSON payload, in output column order: (name, Spark type).
# NOTE(review): every numeric reading is declared as IntegerType; values that
# are not whole numbers would presumably not parse into these columns -
# confirm against what the producer actually sends.
_weather_fields = [
    ("temperature", IntegerType()),
    ("humidity", IntegerType()),
    ("windSpeed", IntegerType()),
    ("windDirection", StringType()),
    ("precipitation", IntegerType()),
    ("conditions", StringType()),
]

# Schema used to decode the JSON string into a struct.
json_schema = StructType([StructField(name, dtype) for name, dtype in _weather_fields])

In [0]:
from pyspark.sql.functions import col, from_json

# Stream the bronze table, turn the binary `body` into a string, parse that
# string as JSON using json_schema, then flatten the parsed fields into
# top-level columns. `enqueuedTime` is carried along renamed to `timestamp`.
bronze_stream = spark.readStream.format("delta").table("streaming.bronze.weather")
parsed_body = from_json(col("body").cast("string"), json_schema)

df = (
    bronze_stream
    .withColumn("body", parsed_body)
    .select(
        "body.temperature",
        "body.humidity",
        "body.windSpeed",
        "body.windDirection",
        "body.precipitation",
        "body.conditions",
        col("enqueuedTime").alias("timestamp"),
    )
)

In [0]:
# Preview the parsed silver stream in the notebook output.
display(df)

In [0]:
# Append the parsed stream to the silver Delta table.
# The stream's checkpoint is kept under /mnt/streaming/silver/weather.
(
    df.writeStream
    .option("checkpointLocation", "/mnt/streaming/silver/weather")
    .outputMode("append")
    .format("delta")
    .toTable("streaming.silver.weather")
)

In [0]:
%sql
-- Ad-hoc check: inspect the parsed rows in the silver table
select *
from streaming.silver.weather;
-- Author's note: works

### Gold layer

Aggregate the silver data and store the results in a streaming table in the gold schema.

In [0]:
from pyspark.sql.functions import window, avg

# Stream the silver table and average the numeric readings per 5-minute window
# of `timestamp`, with a 5-minute watermark on the same column. The result has
# the window bounds (`start`, `end`) followed by one averaged column per reading.
silver_stream = spark.readStream.format("delta").table("streaming.silver.weather")
averages = [
    avg(name).alias(name)
    for name in ("temperature", "humidity", "windSpeed", "precipitation")
]

df = (
    silver_stream
    .withWatermark("timestamp", "5 minutes")
    .groupBy(window("timestamp", "5 minutes"))
    .agg(*averages)
    .select("window.start", "window.end", "temperature", "humidity", "windSpeed", "precipitation")
)

In [0]:
# Preview the windowed averages in the notebook output.
display(df)

In [0]:
# Append the windowed averages to the gold Delta table.
# The stream's checkpoint is kept under /mnt/streaming/gold/weather_avgs.
(
    df.writeStream
    .option("checkpointLocation", "/mnt/streaming/gold/weather_avgs")
    .outputMode("append")
    .format("delta")
    .toTable("streaming.gold.weather_avgs")
)

In [0]:
%sql
-- Ad-hoc check: inspect the aggregated rows in the gold table
select *
from streaming.gold.weather_avgs;
-- Author's note: works