# Structured Streaming in Synapse

## Configuration

In [1]:
# Names of the Azure resources used by this notebook; replace each placeholder before running.
storage_account = "<your-storage-account-name>"  # ADLS account that holds every layer and the checkpoints
event_hub = "<your-iot-hub-name>"                # passed to the Event Hubs connector as 'ehName'
key_vault = "<your-key-vault-name>"              # Key Vault the secrets are read from

StatementMeta(spark002v3, 14, 1, Finished, Available)

In [2]:
# Give Spark the storage account key (Key Vault secret "adls-key"); the account is also used
# for temp data when pushing to Synapse
spark.conf.set(
    "fs.azure.account.key." + storage_account + ".dfs.core.windows.net",
    mssparkutils.credentials.getSecret(key_vault, "adls-key"),
)

# Storage layout: every layer lives in its own folder under the "iot" container
ROOT_PATH = "abfss://iot@" + storage_account + ".dfs.core.windows.net/"
BRONZE_PATH = f"{ROOT_PATH}bronze/"
SILVER_PATH = f"{ROOT_PATH}silver/"
GOLD_PATH = f"{ROOT_PATH}gold/"
SYNAPSE_PATH = f"{ROOT_PATH}synapse/"
CHECKPOINT_PATH = f"{ROOT_PATH}checkpoints/"

# IoT Hub connection string (Event Hub compatible), read from Key Vault secret "hub-cs"
IOT_CS = mssparkutils.credentials.getSecret(key_vault, "hub-cs")

# Options handed to the Event Hubs connector; the connection string goes through EventHubsUtils.encrypt
ehConf = {
    "eventhubs.connectionString": sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(IOT_CS),
    "ehName": event_hub,
}

# Ask Delta for optimized writes and auto compaction
# NOTE(review): these are the Databricks-prefixed keys; Synapse documents
# "spark.microsoft.delta.optimizeWrite.enabled" for optimize write - confirm these take effect on this pool
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")

# Pyspark and ML Imports
import json
import os
import time

import requests
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType

StatementMeta(spark002v3, 14, 2, Finished, Available)

In [3]:
%%sql
-- Drop the tables registered by a previous run so the notebook starts from a clean metastore.
-- Only tables are dropped here; this cell contains no DROP VIEW statements.
DROP TABLE IF EXISTS turbine_raw;
DROP TABLE IF EXISTS weather_raw;
DROP TABLE IF EXISTS turbine_agg;
DROP TABLE IF EXISTS weather_agg;
DROP TABLE IF EXISTS turbine_enriched;

StatementMeta(, 14, -1, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [4]:
# Make sure root path is empty.
# WARNING: destructive - removes ROOT_PATH itself, i.e. all bronze/silver/gold/synapse data and
# every streaming checkpoint, so the streams below start from scratch.
# NOTE(review): the second argument (True) is presumably the recurse flag - confirm against mssparkutils.fs.rm.
mssparkutils.fs.rm(ROOT_PATH, True)

StatementMeta(spark002v3, 14, 8, Finished, Available)

True

## Set up streams from IoT Hub 

### Bronze

In [5]:
# DDL schema of the JSON payload carried in each IoT Hub message body
schema = (
    "timestamp timestamp, deviceId string, temperature double, humidity double, "
    "windspeed double, winddirection string, rpm double, angle double"
)

# Streaming source: read IoT Hub through the Event Hubs connector for Spark
iot_stream = (
    spark.readStream
    .format("eventhubs")
    # Connection settings (encrypted connection string + hub name) built in the configuration cell
    .options(**ehConf)
    .load()
    # Parse the message "body" as JSON into a struct column named "reading"
    .withColumn("reading", F.from_json(F.col("body").cast("string"), schema))
    # Flatten the struct and derive a "date" column, which the writers partition by
    .select("reading.*", F.to_date(F.col("reading.timestamp")).alias("date"))
)

# Fan the single IoT Hub stream out into two bronze Delta locations, one per telemetry type.
# Rows are told apart by the presence of a temperature reading.

# Turbine telemetry: rows with no temperature value
# NOTE(review): rows whose body failed to parse presumably also have a null temperature and
# would land here - confirm whether that is acceptable.
write_turbine_to_delta = (
    iot_stream
    .where(F.col("temperature").isNull())
    .select("date", "timestamp", "deviceId", "rpm", "angle")
    .writeStream
    .format("delta")
    # The checkpoint lets the stream be restarted where it left off
    .option("checkpointLocation", f"{CHECKPOINT_PATH}turbine_raw")
    .partitionBy("date")
    .start(f"{BRONZE_PATH}turbine_raw")
)

# Weather telemetry: rows that carry a temperature value
write_weather_to_delta = (
    iot_stream
    .where("temperature is not null")
    .select("date", "deviceid", "timestamp", "temperature", "humidity", "windspeed", "winddirection")
    .writeStream
    .format("delta")
    # The checkpoint lets the stream be restarted where it left off
    .option("checkpointLocation", f"{CHECKPOINT_PATH}weather_raw")
    .partitionBy("date")
    .start(f"{BRONZE_PATH}weather_raw")
)

# Create the external tables once data starts to stream in.
# CREATE TABLE is retried until both statements succeed; it fails while the bronze writers
# have not yet produced a Delta table at the target location.
while True:
  try:
    spark.sql(f'CREATE TABLE IF NOT EXISTS turbine_raw USING DELTA LOCATION "{BRONZE_PATH + "turbine_raw"}"')
    spark.sql(f'CREATE TABLE IF NOT EXISTS weather_raw USING DELTA LOCATION "{BRONZE_PATH + "weather_raw"}"')
    break
  except Exception:
    # Catch Exception rather than everything, so interrupting/cancelling the cell still works.
    # If a writer has stopped, its location will never become a Delta table: fail instead of spinning forever.
    if not (write_turbine_to_delta.isActive and write_weather_to_delta.isActive):
      raise RuntimeError("A bronze stream stopped before its Delta table could be registered")
    # Pause between attempts instead of hammering the metastore in a tight loop
    time.sleep(1)

StatementMeta(spark002v3, 14, 9, Finished, Available)

DataFrame[]

DataFrame[]

In [6]:
%%sql 
-- The data can be queried directly from storage as soon as it starts streaming into Delta
SELECT * FROM turbine_raw LIMIT 10

StatementMeta(spark002v3, 14, 10, Finished, Available)

<Spark SQL result set with 0 rows and 5 fields>

### Silver

In [7]:
# Create functions to merge turbine and weather data into their target Delta tables
def merge_delta(incremental, target):
  """Upsert one micro-batch into the Delta table stored at `target`.

  The batch is de-duplicated on (date, windowStart, windowEnd, deviceid), exposed as the
  temp view "incremental", and merged into the target on the same key. If the MERGE fails,
  the target is assumed not to exist yet and is created from the de-duplicated batch.

  Args:
    incremental: micro-batch DataFrame handed over by `foreachBatch`.
    target: storage path of the target Delta table.

  Raises:
    Whatever the fallback write raises. No save mode is set on it, so if the MERGE failed
    for a reason other than a missing table, the write is expected to fail as well.
  """
  # One row per merge key. The same frame feeds both the MERGE and the fallback write, so a
  # newly created table can never receive duplicates that the MERGE path would have dropped.
  deduped = incremental.dropDuplicates(['date','windowStart', 'windowEnd','deviceid'])
  deduped.createOrReplaceTempView("incremental")
  try:
    # MERGE records into the target table using the specified join key.
    # The SQL runs on the batch's own session, where the temp view was registered.
    incremental._jdf.sparkSession().sql(f"""
      MERGE INTO delta.`{target}` t
      USING incremental i
      ON i.date=t.date AND i.windowStart = t.windowStart AND i.windowEnd = t.windowEnd AND i.deviceId = t.deviceid
      WHEN MATCHED THEN UPDATE SET *
      WHEN NOT MATCHED THEN INSERT *
    """)
  except Exception:
    # If the target table does not exist, create one.
    # `Exception` (not a bare except) so KeyboardInterrupt/SystemExit are not swallowed.
    deduped.write.format("delta").partitionBy("date").save(target)
    

# Bronze -> silver for turbine telemetry: average rpm and angle per device over 5-minute windows
turbine_b_to_s = (
    spark.readStream.format("delta").table("turbine_raw")
    # One group per device, date and 5-minute window of the reading timestamp
    .groupBy("deviceId", "date", F.window("timestamp", "5 minutes"))
    .agg(F.avg("rpm").alias("rpm"), F.avg("angle").alias("angle"))
    # Replace the window struct with explicit start/end columns
    .select(
        "deviceId", "date", "rpm", "angle",
        F.col("window").getField("start").alias("windowStart"),
        F.col("window").getField("end").alias("windowEnd"),
    )
    .writeStream
    # Update mode emits the groups changed by each micro-batch; merge_delta upserts them
    .outputMode("update")
    # The checkpoint lets the stream be restarted where it left off
    .option("checkpointLocation", f"{CHECKPOINT_PATH}turbine_agg")
    .foreachBatch(lambda batch_df, batch_id: merge_delta(batch_df, SILVER_PATH + "turbine_agg"))
    .start()
)

# Bronze -> silver for weather telemetry: 5-minute averages plus the last wind direction per window
weather_b_to_s = (
    spark.readStream.format("delta").table("weather_raw")
    # One group per device, date and 5-minute window of the reading timestamp
    .groupBy("deviceid", "date", F.window("timestamp", "5 minutes"))
    .agg(
        F.avg("temperature").alias("temperature"),
        F.avg("humidity").alias("humidity"),
        F.avg("windspeed").alias("windspeed"),
        F.last("winddirection").alias("winddirection"),
    )
    # Replace the window struct with explicit start/end columns and fix the column order
    .select(
        "date",
        F.col("window").getField("start").alias("windowStart"),
        F.col("window").getField("end").alias("windowEnd"),
        "deviceid", "temperature", "humidity", "windspeed", "winddirection",
    )
    .writeStream
    # Update mode emits the groups changed by each micro-batch; merge_delta upserts them
    .outputMode("update")
    # The checkpoint lets the stream be restarted where it left off
    .option("checkpointLocation", f"{CHECKPOINT_PATH}weather_agg")
    .foreachBatch(lambda batch_df, batch_id: merge_delta(batch_df, SILVER_PATH + "weather_agg"))
    .start()
)

# Create the external tables once data starts to stream in.
# CREATE TABLE is retried until both statements succeed; it fails while the silver streams
# have not yet produced a Delta table at the target location.
while True:
  try:
    spark.sql(f'CREATE TABLE IF NOT EXISTS turbine_agg USING DELTA LOCATION "{SILVER_PATH + "turbine_agg"}"')
    spark.sql(f'CREATE TABLE IF NOT EXISTS weather_agg USING DELTA LOCATION "{SILVER_PATH + "weather_agg"}"')
    break
  except Exception:
    # Catch Exception rather than everything, so interrupting/cancelling the cell still works.
    # If a stream has stopped, its location will never become a Delta table: fail instead of spinning forever.
    if not (turbine_b_to_s.isActive and weather_b_to_s.isActive):
      raise RuntimeError("A silver stream stopped before its Delta table could be registered")
    # Pause between attempts instead of hammering the metastore in a tight loop
    time.sleep(1)

StatementMeta(spark002v3, 14, 11, Finished, Available)

DataFrame[]

DataFrame[]

### Gold

In [8]:
# Stream both silver tables and join them on the columns they share (date and window bounds).
# "ignoreChanges" is set on both readers; the silver tables are rewritten by MERGE.
# NOTE(review): neither side defines a watermark, so the join state presumably grows without bound - confirm.
turbine_agg = (
    spark.readStream.format("delta")
    .option("ignoreChanges", True)
    .table("turbine_agg")
)
weather_agg = (
    spark.readStream.format("delta")
    .option("ignoreChanges", True)
    .table("weather_agg")
    # Drop the weather device id so only the turbine's device id remains after the join
    .drop("deviceid")
)
turbine_enriched = turbine_agg.join(
    weather_agg,
    on=["date", "windowStart", "windowEnd"],
    how="inner",
)

# Silver -> gold: upsert every micro-batch of the joined stream with the same merge_delta helper
merge_gold_stream = (
    turbine_enriched
    # Fix the column set and order of the gold table
    .select("date", "deviceid", "windowStart", "windowEnd", "rpm", "angle",
            "temperature", "humidity", "windspeed", "winddirection")
    .writeStream
    # The checkpoint lets the stream be restarted where it left off
    .option("checkpointLocation", f"{CHECKPOINT_PATH}turbine_enriched")
    .foreachBatch(lambda batch_df, batch_id: merge_delta(batch_df, GOLD_PATH + "turbine_enriched"))
    .start()
)

# Create the external table once data starts to stream in.
# CREATE TABLE is retried until it succeeds; it fails while the gold stream has not yet
# produced a Delta table at the target location.
while True:
  try:
    spark.sql(f'CREATE TABLE IF NOT EXISTS turbine_enriched USING DELTA LOCATION "{GOLD_PATH + "turbine_enriched"}"')
    break
  except Exception:
    # Catch Exception rather than everything, so interrupting/cancelling the cell still works.
    # If the stream has stopped, its location will never become a Delta table: fail instead of spinning forever.
    if not merge_gold_stream.isActive:
      raise RuntimeError("The gold stream stopped before its Delta table could be registered")
    # Pause between attempts instead of hammering the metastore in a tight loop
    time.sleep(1)

StatementMeta(spark002v3, 14, 12, Finished, Available)

DataFrame[]

In [11]:
%%sql 
-- The data can be queried directly from storage as soon as it starts streaming into Delta
SELECT * FROM turbine_enriched LIMIT 10

StatementMeta(spark002v3, 14, 15, Finished, Available)

<Spark SQL result set with 10 rows and 10 fields>