# End to End Industrial IoT (IIoT) on Azure Databricks
This notebook demonstrates the following architecture for IIoT Ingest, Processing and Analytics on Azure. The following architecture is implemented for the demo. 
<img src="https://sguptasa.blob.core.windows.net/random/iiot_blog/end_to_end_architecture.png" width=800>

The notebook is broken into sections following these steps:
1. **Data Ingest** - stream real-time raw sensor data from Azure IoT Hubs into the Delta format in Azure Storage
2. **Data Processing** - stream process sensor data from raw (Bronze) to silver (aggregated) to gold (enriched) Delta tables on Azure Storage
3. **Machine Learning** - train XGBoost regression models using distributed ML to predict power output and asset remaining life on historical sensor data
4. **Model Deployment** - deploy trained models for real-time serving in Azure ML services 
5. **Model Inference** - score real data instantly against hosted models via REST API

In [2]:
# AzureML Workspace info (name, region, resource group and subscription ID) for model deployment
dbutils.widgets.text("Subscription ID","3f2e4d32-8e8d-46d6-82bc-5bb8d962328b","Subscription ID")
dbutils.widgets.text("Resource Group","sgupta_rg","Resource Group")
dbutils.widgets.text("Region","eastus2","Region")
dbutils.widgets.text("Storage Account","sguptaadlsg2","Storage Account")

## Step 1 - Environment Setup

The pre-requisites are listed below:

### Azure Services Required
* Azure IoT Hub 
* [Azure IoT Simulator](https://azure-samples.github.io/raspberry-pi-web-simulator/) running with the code provided in [this github repo] and configured for your IoT Hub
* ADLS Gen 2 Storage account with a container called `iot`
* Azure Synapse SQL Pool call `iot`
* Azure Machine Learning Workspace called `iot`

### Azure Databricks Configuration Required
* 3-node (min) Databricks Cluster running **DBR 7.0ML+** and the following libraries:
 * **MLflow[AzureML]** - PyPI library `azureml-mlflow`
 * **Azure Event Hubs Connector for Databricks** - Maven coordinates `com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.16`
* The following Secrets defined in scope `iot`
 * `iothub-cs` - Connection string for your IoT Hub **(Important - use the [Event Hub Compatible](https://devblogs.microsoft.com/iotdev/understand-different-connection-strings-in-azure-iot-hub/) connection string)**
 * `adls_key` - Access Key to ADLS storage account **(Important - use the [Access Key](bricks.com/blog/2020/03/27/data-exfiltration-protection-with-azure-databricks.html))**
 * `synapse_cs` - JDBC connect string to your Synapse SQL Pool **(Important - use the [SQL Authentication](https://docs.microsoft.com/en-us/azure/databricks/data/data-sources/azure/synapse-analytics#spark-driver-to-azure-synapse) with username/password connection string)**
* The following notebook widgets populated:
 * `Subscription ID` - subscription ID of your Azure ML Workspace
 * `Resource Group` - resource group name of your Azure ML Workspace
 * `Region` - Azure region of your Azure ML Workspace
 * `Storage Account` - Name of your storage account

In [4]:
# Setup access to storage account for temp data when pushing to Synapse
storage_account = dbutils.widgets.get("Storage Account")
spark.conf.set(f"fs.azure.account.key.{storage_account}.dfs.core.windows.net", dbutils.secrets.get("iot","adls_key"))

# Setup storage locations for all data
ROOT_PATH = f"abfss://iot@{storage_account}.dfs.core.windows.net/"
BRONZE_PATH = ROOT_PATH + "bronze/"
SILVER_PATH = ROOT_PATH + "silver/"
GOLD_PATH = ROOT_PATH + "gold/"
SYNAPSE_PATH = ROOT_PATH + "synapse/"
CHECKPOINT_PATH = ROOT_PATH + "checkpoints/"

# Other initializations
IOT_CS = dbutils.secrets.get('iot','iothub-cs') # IoT Hub connection string
ehConf = { 'eventhubs.connectionString':sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(IOT_CS) }

# Enable auto compaction and optimized writes in Delta
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled","true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled","true")

# Pyspark and ML Imports
import os, json, requests
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
import numpy as np 
import pandas as pd
import xgboost as xgb
import mlflow.xgboost
import mlflow.azureml
from azureml.core import Workspace
from azureml.core.webservice import AciWebservice, Webservice

In [5]:
# Make sure root path is empty
dbutils.fs.rm(ROOT_PATH, True)

In [6]:
%sql
-- Clean up tables & views
DROP TABLE IF EXISTS turbine_raw;
DROP TABLE IF EXISTS weather_raw;
DROP TABLE IF EXISTS turbine_agg;
DROP TABLE IF EXISTS weather_agg;
DROP TABLE IF EXISTS turbine_enriched;
DROP TABLE IF EXISTS turbine_power;
DROP TABLE IF EXISTS turbine_maintenance;
DROP VIEW IF EXISTS turbine_combined;
DROP VIEW IF EXISTS feature_view;
DROP TABLE IF EXISTS turbine_life_predictions;
DROP TABLE IF EXISTS turbine_power_predictions;

## Step 2 - Data Ingest from IoT Hubs
Azure Databricks provides a native connector to IoT and Event Hubs. Below, we will use PySpark Structured Streaming to read from an IoT Hub stream of data and write the data in it's raw format directly into Delta. 

Make sure that your IoT Simulator is sending payloads to IoT Hub as shown below.

<img src="https://sguptasa.blob.core.windows.net/random/iiot_blog/iot_simulator.gif" width=800>

We have two separate types of data payloads in our IoT Hub:
1. **Turbine Sensor readings** - this payload contains `date`,`timestamp`,`deviceid`,`rpm` and `angle` fields
2. **Weather Sensor readings** - this payload contains `date`,`timestamp`,`temperature`,`humidity`,`windspeed`, and `winddirection` fields

We split out the two payloads into separate streams and write them both into Delta locations on Azure Storage. We are able to query these two Bronze tables *immediately* as the data streams in.

In [8]:
# Schema of incoming data from IoT hub
schema = "timestamp timestamp, deviceId string, temperature double, humidity double, windspeed double, winddirection string, rpm double, angle double"

# Read directly from IoT Hub using the EventHubs library for Databricks
iot_stream = (
  spark.readStream.format("eventhubs")                                               # Read from IoT Hubs directly
    .options(**ehConf)                                                               # Use the Event-Hub-enabled connect string
    .load()                                                                          # Load the data
    .withColumn('reading', F.from_json(F.col('body').cast('string'), schema))        # Extract the "body" payload from the messages
    .select('reading.*', F.to_date('reading.timestamp').alias('date'))               # Create a "date" field for partitioning
)

# Split our IoT Hub stream into separate streams and write them both into their own Delta locations
write_turbine_to_delta = (
  iot_stream.filter('temperature is null')                                           # Filter out turbine telemetry from other data streams
    .select('date','timestamp','deviceId','rpm','angle')                             # Extract the fields of interest
    .writeStream.format('delta')                                                     # Write our stream to the Delta format
    .partitionBy('date')                                                             # Partition our data by Date for performance
    .option("checkpointLocation", CHECKPOINT_PATH + "turbine_raw")                   # Checkpoint so we can restart streams gracefully
    .start(BRONZE_PATH + "turbine_raw")                                              # Stream the data into an ADLS Path
)

write_weather_to_delta = (
  iot_stream.filter(iot_stream.temperature.isNotNull())                              # Filter out weather telemetry only
    .select('date','deviceid','timestamp','temperature','humidity','windspeed','winddirection') 
    .writeStream.format('delta')                                                     # Write our stream to the Delta format
    .partitionBy('date')                                                             # Partition our data by Date for performance
    .option("checkpointLocation", CHECKPOINT_PATH + "weather_raw")                   # Checkpoint so we can restart streams gracefully
    .start(BRONZE_PATH + "weather_raw")                                              # Stream the data into an ADLS Path
)

# Create the external tables once data starts to stream in
while True:
  try:
    spark.sql(f'CREATE TABLE IF NOT EXISTS turbine_raw USING DELTA LOCATION "{BRONZE_PATH + "turbine_raw"}"')
    spark.sql(f'CREATE TABLE IF NOT EXISTS weather_raw USING DELTA LOCATION "{BRONZE_PATH + "weather_raw"}"')
    break
  except:
    pass

In [9]:
%sql 
-- We can query the data directly from storage immediately as it streams into Delta 
SELECT * FROM turbine_raw WHERE deviceid = 'WindTurbine-1'

date,timestamp,deviceId,rpm,angle
2020-06-29,2020-06-29T14:05:42.000+0000,WindTurbine-1,7.682084857957655,6.721824250712947
2020-06-29,2020-06-29T14:06:02.000+0000,WindTurbine-1,8.226280050385688,6.197995044087477
2020-06-29,2020-06-29T14:06:22.000+0000,WindTurbine-1,7.868842904253821,6.885237541222094


## Step 2 - Data Processing in Delta
While our raw sensor data is being streamed into Bronze Delta tables on Azure Storage, we can create streaming pipelines on this data that flow it through Silver and Gold data sets.

We will use the following schema for Silver and Gold data sets:

<img src="https://sguptasa.blob.core.windows.net/random/iiot_blog/iot_delta_bronze_to_gold.png" width=800>

### 2a. Delta Bronze (Raw) to Delta Silver (Aggregated)
The first step of our processing pipeline will clean and aggregate the measurements to 1 hour intervals. 

Since we are aggregating time-series values and there is a likelihood of late-arriving data and data changes, we will use the [**MERGE**](https://docs.microsoft.com/en-us/azure/databricks/spark/latest/spark-sql/language-manual/merge-into?toc=https%3A%2F%2Fdocs.microsoft.com%2Fen-us%2Fazure%2Fazure-databricks%2Ftoc.json&bc=https%3A%2F%2Fdocs.microsoft.com%2Fen-us%2Fazure%2Fbread%2Ftoc.json) functionality of Delta to upsert records into target tables. 

MERGE allows us to upsert source records into a target storage location. This is useful when dealing with time-series data as:
1. Data often arrives late and requires aggregation states to be updated
2. Historical data needs to be backfilled while streaming data is feeding into the table

When streaming source data, `foreachBatch()` can be used to perform a merges on micro-batches of data.

In [12]:
# Create functions to merge turbine and weather data into their target Delta tables
def merge_delta(incremental, target): 
  incremental.dropDuplicates(['date','window','deviceid']).createOrReplaceTempView("incremental")
  
  try:
    # MERGE records into the target table using the specified join key
    incremental._jdf.sparkSession().sql(f"""
      MERGE INTO delta.`{target}` t
      USING incremental i
      ON i.date=t.date AND i.window = t.window AND i.deviceId = t.deviceid
      WHEN MATCHED THEN UPDATE SET *
      WHEN NOT MATCHED THEN INSERT *
    """)
  except:
    # If the †arget table does not exist, create one
    incremental.write.format("delta").partitionBy("date").save(target)
    
turbine_b_to_s = (
  spark.readStream.format('delta').table("turbine_raw")                        # Read data as a stream from our source Delta table
    .groupBy('deviceId','date',F.window('timestamp','5 minutes'))              # Aggregate readings to hourly intervals
    .agg(F.avg('rpm').alias('rpm'), F.avg("angle").alias("angle"))
    .writeStream                                                               # Write the resulting stream
    .foreachBatch(lambda i, b: merge_delta(i, SILVER_PATH + "turbine_agg"))    # Pass each micro-batch to a function
    .outputMode("update")                                                      # Merge works with update mode
    .option("checkpointLocation", CHECKPOINT_PATH + "turbine_agg")             # Checkpoint so we can restart streams gracefully
    .start()
)

weather_b_to_s = (
  spark.readStream.format('delta').table("weather_raw")                        # Read data as a stream from our source Delta table
    .groupBy('deviceid','date',F.window('timestamp','5 minutes'))              # Aggregate readings to hourly intervals
    .agg({"temperature":"avg","humidity":"avg","windspeed":"avg","winddirection":"last"})
    .selectExpr('date','window','deviceid','`avg(temperature)` as temperature','`avg(humidity)` as humidity',
                '`avg(windspeed)` as windspeed','`last(winddirection)` as winddirection')
    .writeStream                                                               # Write the resulting stream
    .foreachBatch(lambda i, b: merge_delta(i, SILVER_PATH + "weather_agg"))    # Pass each micro-batch to a function
    .outputMode("update")                                                      # Merge works with update mode
    .option("checkpointLocation", CHECKPOINT_PATH + "weather_agg")             # Checkpoint so we can restart streams gracefully
    .start()
)

# Create the external tables once data starts to stream in
while True:
  try:
    spark.sql(f'CREATE TABLE IF NOT EXISTS turbine_agg USING DELTA LOCATION "{SILVER_PATH + "turbine_agg"}"')
    spark.sql(f'CREATE TABLE IF NOT EXISTS weather_agg USING DELTA LOCATION "{SILVER_PATH + "weather_agg"}"')
    break
  except:
    pass

In [13]:
%sql
-- As data gets merged in real-time to our hourly table, we can query it immediately
SELECT * FROM turbine_agg t JOIN weather_agg w ON (t.date=w.date AND t.window=w.window) WHERE t.deviceid='WindTurbine-1' ORDER BY t.window DESC

deviceId,date,window,rpm,angle,date.1,window.1,deviceid,temperature,humidity,windspeed,winddirection
WindTurbine-1,2020-06-29,"List(2020-06-29T14:30:00.000+0000, 2020-06-29T14:35:00.000+0000)",8.220723769563222,8.193133298367822,2020-06-29,"List(2020-06-29T14:30:00.000+0000, 2020-06-29T14:35:00.000+0000)",WeatherCapture,26.077478912739,69.42049020271926,6.942049020271925,SW
WindTurbine-1,2020-06-29,"List(2020-06-29T14:25:00.000+0000, 2020-06-29T14:30:00.000+0000)",7.839844911378133,6.926530964122536,2020-06-29,"List(2020-06-29T14:25:00.000+0000, 2020-06-29T14:30:00.000+0000)",WeatherCapture,26.322607823416583,70.01732251583414,7.001732251583413,SW
WindTurbine-1,2020-06-29,"List(2020-06-29T14:20:00.000+0000, 2020-06-29T14:25:00.000+0000)",7.887520014905126,7.151580013041986,2020-06-29,"List(2020-06-29T14:20:00.000+0000, 2020-06-29T14:25:00.000+0000)",WeatherCapture,26.3941664032214,70.43385077859313,7.043385077859312,W
WindTurbine-1,2020-06-29,"List(2020-06-29T14:15:00.000+0000, 2020-06-29T14:20:00.000+0000)",8.142472584324846,6.924663511284241,2020-06-29,"List(2020-06-29T14:15:00.000+0000, 2020-06-29T14:20:00.000+0000)",WeatherCapture,26.30710090869518,70.862603087636,7.0862603087636025,NW
WindTurbine-1,2020-06-29,"List(2020-06-29T14:10:00.000+0000, 2020-06-29T14:15:00.000+0000)",7.794775063111931,7.353761513556274,2020-06-29,"List(2020-06-29T14:10:00.000+0000, 2020-06-29T14:15:00.000+0000)",WeatherCapture,25.71527190652245,70.38294876472042,7.038294876472042,SW
WindTurbine-1,2020-06-29,"List(2020-06-29T14:05:00.000+0000, 2020-06-29T14:10:00.000+0000)",7.984654461209084,6.755803422788716,2020-06-29,"List(2020-06-29T14:05:00.000+0000, 2020-06-29T14:10:00.000+0000)",WeatherCapture,26.38987832627805,68.16561638410347,6.816561638410348,NW


### 2b. Delta Silver (Aggregated) to Delta Gold (Enriched)
Next we perform a streaming join of weather and turbine readings to create one enriched dataset we can use for data science and model training.

In [15]:
dbutils.fs.rm(GOLD_PATH, True)
dbutils.fs.rm(CHECKPOINT_PATH + "turbine_enriched", True)

In [16]:
# Read streams from Delta Silver tables and join them together on common columns (date & window)
turbine_agg = spark.readStream.format('delta').option("ignoreChanges", True).table('turbine_agg')
weather_agg = spark.readStream.format('delta').option("ignoreChanges", True).table('weather_agg').drop('deviceid')
turbine_enriched = turbine_agg.join(weather_agg, ['date','window'])

# Write the stream to a foreachBatch function which performs the MERGE as before
merge_gold_stream = (
  turbine_enriched
    .selectExpr('date','deviceid','window.start as window','rpm','angle','temperature','humidity','windspeed','winddirection')
    .writeStream 
    .foreachBatch(lambda i, b: merge_delta(i, GOLD_PATH + "turbine_enriched"))
    .option("checkpointLocation", CHECKPOINT_PATH + "turbine_enriched")         
    .start()
)

# Create the external tables once data starts to stream in
while True:
  try:
    spark.sql(f'CREATE TABLE IF NOT EXISTS turbine_enriched USING DELTA LOCATION "{GOLD_PATH + "turbine_enriched"}"')
    break
  except:
    pass

In [17]:
%sql SELECT * FROM turbine_enriched WHERE deviceid='WindTurbine-1'

date,deviceid,window,rpm,angle,temperature,humidity,windspeed,winddirection
2020-06-29,WindTurbine-1,2020-06-29T14:30:00.000+0000,8.220723769563222,8.193133298367822,26.077478912739,69.42049020271926,6.942049020271925,SW
2020-06-29,WindTurbine-1,2020-06-29T14:05:00.000+0000,7.984654461209084,6.755803422788716,26.38987832627805,68.16561638410347,6.816561638410348,NW
2020-06-29,WindTurbine-1,2020-06-29T14:10:00.000+0000,7.794775063111931,7.353761513556274,25.71527190652245,70.38294876472042,7.038294876472042,SW
2020-06-29,WindTurbine-1,2020-06-29T14:25:00.000+0000,7.839844911378133,6.926530964122536,26.322607823416583,70.01732251583414,7.001732251583413,SW
2020-06-29,WindTurbine-1,2020-06-29T14:15:00.000+0000,8.142472584324846,6.924663511284241,26.30710090869518,70.862603087636,7.0862603087636025,NW
2020-06-29,WindTurbine-1,2020-06-29T14:20:00.000+0000,7.887520014905126,7.151580013041986,26.3941664032214,70.43385077859313,7.043385077859312,W


### 2c: Stream Delta GOLD Table to Synapse
Synapse Analytics provides on-demand SQL directly on Data Lake source formats. Databricks can also directly stream data to Synapse SQL Pools for Data Warehousing workloads like BI dashboarding and reporting. 

<img src="https://sguptasa.blob.core.windows.net/random/iiot_blog/synapse_databricks_delta.png" width=800>

In [19]:
spark.conf.set("spark.databricks.sqldw.writeSemantics", "copy")                           # Use COPY INTO for faster loads to Synapse from Databricks

write_to_synapse = (
  spark.readStream.format('delta').option('ignoreChanges',True).table('turbine_enriched') # Read in Gold turbine readings from Delta as a stream
    .writeStream.format("com.databricks.spark.sqldw")                                     # Write to Synapse (SQL DW connector)
    .option("url",dbutils.secrets.get("iot","synapse_cs"))                                # SQL Pool JDBC connection (with SQL Auth) string
    .option("tempDir", SYNAPSE_PATH)                                                      # Temporary ADLS path to stage the data (with forwarded permissions)
    .option("forwardSparkAzureStorageCredentials", "true")
    .option("dbTable", "turbine_enriched")                                                # Table in Synapse to write to
    .option("checkpointLocation", CHECKPOINT_PATH+"synapse")                              # Checkpoint for resilient streaming
    .start()
)

### 2d. Backfill Historical Data
In order to train a model, we will need to backfill our streaming data with historical data. The cell below generates 1 year of historical hourly turbine and weather data and inserts it into our Gold Delta table.

In [21]:
# Function to simulate generating time-series data given a baseline, slope, and some seasonality
def generate_series(time_index, baseline, slope=0.01, period=365*24*12):
  rnd = np.random.RandomState(time_index)
  season_time = (time_index % period) / period
  seasonal_pattern = np.where(season_time < 0.4, np.cos(season_time * 2 * np.pi), 1 / np.exp(3 * season_time))
  return baseline * (1 + 0.1 * seasonal_pattern + 0.1 * rnd.randn(len(time_index)))
  
# Get start and end dates for our historical data
dates = spark.sql('select max(date)-interval 365 days as start, max(date) as end from turbine_enriched').toPandas()
  
# Get the baseline readings for each sensor for backfilling data
turbine_enriched_pd = spark.table('turbine_enriched').toPandas()
baselines = turbine_enriched_pd.min()[3:8]
devices = turbine_enriched_pd['deviceid'].unique()

# Iterate through each device to generate historical data for that device
print("---Generating Historical Enriched Turbine Readings---")
for deviceid in devices:
  print(f'Backfilling device {deviceid}')
  windows = pd.date_range(start=dates['start'][0], end=dates['end'][0], freq='5T') # Generate a list of hourly timestamps from start to end date
  historical_values = pd.DataFrame({
    'date': windows.date,
    'window': windows, 
    'winddirection': np.random.choice(['N','NW','W','SW','S','SE','E','NE'], size=len(windows)),
    'deviceId': deviceid
  })
  time_index = historical_values.index.to_numpy()                                 # Generate a time index

  for sensor in baselines.keys():
    historical_values[sensor] = generate_series(time_index, baselines[sensor])    # Generate time-series data from this sensor

  # Write dataframe to enriched_readings Delta table
  spark.createDataFrame(historical_values).write.format("delta").mode("append").saveAsTable("turbine_enriched")
  
# Create power readings based on weather and operating conditions
print("---Generating Historical Turbine Power Readings---")
spark.sql(f'CREATE TABLE turbine_power USING DELTA PARTITIONED BY (date) LOCATION "{GOLD_PATH + "turbine_power"}" AS SELECT date, window, deviceId, 0.1 * (temperature/humidity) * (3.1416 * 25) * windspeed * rpm AS power FROM turbine_enriched')

# Create a maintenance records based on peak power usage
print("---Generating Historical Turbine Maintenance Records---")
spark.sql(f'CREATE TABLE turbine_maintenance USING DELTA LOCATION "{GOLD_PATH + "turbine_maintenance"}" AS SELECT DISTINCT deviceid, FIRST(date) OVER (PARTITION BY deviceid, year(date), month(date) ORDER BY power) AS date, True AS maintenance FROM turbine_power')

In [22]:
%sql
-- Optimize all 3 tables for querying and model training performance
OPTIMIZE turbine_enriched WHERE date<current_date() ZORDER BY deviceid, window;
OPTIMIZE turbine_power ZORDER BY deviceid, window;
OPTIMIZE turbine_maintenance ZORDER BY deviceid;

path,metrics
,"List(0, 0, List(null, null, 0.0, 0, 0), List(null, null, 0.0, 0, 0), 0, List(minCubeSize(107374182400), List(0, 0), List(1, 1091), 0, List(0, 0), 0), 0)"


Our Delta Gold tables are now ready for predictive analytics! We now have hourly weather, turbine operating and power measurements, and daily maintenance logs going back one year. We can see that there is significant correlation between most of the variables.

In [24]:
%sql
-- Query all 3 tables
CREATE OR REPLACE VIEW gold_readings AS
SELECT r.*, 
  p.power, 
  ifnull(m.maintenance,False) as maintenance
FROM turbine_enriched r 
  JOIN turbine_power p ON (r.date=p.date AND r.window=p.window AND r.deviceid=p.deviceid)
  LEFT JOIN turbine_maintenance m ON (r.date=m.date AND r.deviceid=m.deviceid);
  
SELECT * FROM gold_readings ORDER BY deviceid, window

date,deviceid,window,rpm,angle,temperature,humidity,windspeed,winddirection,power,maintenance
2019-06-30,WindTurbine-0,2019-06-30T00:00:00.000+0000,8.250487390592669,7.054875549321888,28.18322793204275,72.79787651648205,7.279787651648206,SE,182.62542299009337,True
2019-06-30,WindTurbine-0,2019-06-30T00:05:00.000+0000,7.772962085571003,6.646550387535558,26.552026782630765,68.58444929179473,6.858444929179473,NW,162.0970546780076,True
2019-06-30,WindTurbine-0,2019-06-30T00:10:00.000+0000,8.55287466139928,7.313442645087909,29.216166832914013,75.46597964278163,7.546597964278164,NE,196.25749009683216,True
2019-06-30,WindTurbine-0,2019-06-30T00:15:00.000+0000,8.241577299795244,7.047256656190718,28.152791533809435,72.71925865320756,7.271925865320756,S,182.231184353318,True
2019-06-30,WindTurbine-0,2019-06-30T00:20:00.000+0000,8.310985428976894,7.106606569754141,28.389886026832176,73.33167876588683,7.333167876588684,NW,185.31350351460355,True
2019-06-30,WindTurbine-0,2019-06-30T00:25:00.000+0000,8.38799943837572,7.172460164352993,28.652961803824752,74.01120908704671,7.401120908704671,SW,188.76384301285952,True
2019-06-30,WindTurbine-0,2019-06-30T00:30:00.000+0000,8.44198512312269,7.218622562924629,28.837374043521777,74.48754982012252,7.448754982012253,W,191.2014537653543,True
2019-06-30,WindTurbine-0,2019-06-30T00:35:00.000+0000,8.532447994106658,7.295976089620997,29.14639041935857,75.28574568413006,7.528574568413006,SE,195.3211734924356,True
2019-06-30,WindTurbine-0,2019-06-30T00:40:00.000+0000,9.003206250554442,7.698514843494093,30.754475689264776,79.4394641009445,7.94394641009445,E,217.46853244522217,True
2019-06-30,WindTurbine-0,2019-06-30T00:45:00.000+0000,9.294727498221263,7.947790555934729,31.75029683061886,82.01169126582421,8.201169126582423,NE,231.7796744099802,True


## Step 3 - Machine Learning
Now that our data is flowing reliably from our sensor devices into an enriched Delta table in Data Lake storage, we can start to build ML models to predict power output and remaining life of our assets using historical sensor, weather, power and maintenance data. 

We create two models ***for each Wind Turbine***:
1. Turbine Power Output - using current readings for turbine operating parameters (angle, RPM) and weather (temperature, humidity, etc.), predict the expected power output 6 hours from now
2. Turbine Remaining Life - predict the remaining life in days until the next maintenance event

<img src="https://sguptasa.blob.core.windows.net/random/iiot_blog/turbine_models.png" width=800>

We will use the XGBoost framework to train regression models. Due to the size of the data and number of Wind Turbines, we will use Spark UDFs to distribute training across all the nodes in our cluster.

### 3a. Feature Engineering
In order to predict power output 6 hours ahead, we need to first time-shift our data to create our label column. We can do this easily using Spark Window partitioning. 

In order to predict remaining life, we need to backtrace the remaining life from the maintenance events. We can do this easily using cross joins. The following diagram illustrates the ML Feature Engineering pipeline:

<img src="https://sguptasa.blob.core.windows.net/random/iiot_blog/ml_pipeline.png" width=800>

In [27]:
%sql
-- Calculate the age of each turbine and the remaining life in days
CREATE OR REPLACE VIEW turbine_age AS
WITH reading_dates AS (SELECT distinct date, deviceid FROM turbine_power),
  maintenance_dates AS (
    SELECT d.*, datediff(nm.date, d.date) as datediff_next, datediff(d.date, lm.date) as datediff_last 
    FROM reading_dates d LEFT JOIN turbine_maintenance nm ON (d.deviceid=nm.deviceid AND d.date<=nm.date)
    LEFT JOIN turbine_maintenance lm ON (d.deviceid=lm.deviceid AND d.date>=lm.date ))
SELECT date, deviceid, ifnull(min(datediff_last),0) AS age, ifnull(min(datediff_next),0) AS remaining_life
FROM maintenance_dates 
GROUP BY deviceid, date;

-- Calculate the power 6 hours ahead using Spark Windowing and build a feature_table to feed into our ML models
CREATE OR REPLACE VIEW feature_table AS
SELECT r.*, age, remaining_life,
  LEAD(power, 72, power) OVER (PARTITION BY r.deviceid ORDER BY window) as power_6_hours_ahead
FROM gold_readings r JOIN turbine_age a ON (r.date=a.date AND r.deviceid=a.deviceid)
WHERE r.date < CURRENT_DATE();

In [28]:
%sql
SELECT window, power, power_6_hours_ahead FROM feature_table WHERE deviceid='WindTurbine-1' AND date=CURRENT_DATE() - INTERVAL 1 DAYS

window,power,power_6_hours_ahead
2020-06-28T00:00:00.000+0000,167.5188686326256,121.35164292376794
2020-06-28T00:05:00.000+0000,175.0479544514008,120.23979663176787
2020-06-28T00:10:00.000+0000,141.1197270710557,171.15077686906142
2020-06-28T00:15:00.000+0000,197.6314345624256,149.74897646959562
2020-06-28T00:20:00.000+0000,139.29452159804313,116.61264103609422
2020-06-28T00:25:00.000+0000,138.72656239393712,145.0410838353922
2020-06-28T00:30:00.000+0000,174.09608081770722,153.30762362186732
2020-06-28T00:35:00.000+0000,199.46480250374472,145.8707601745569
2020-06-28T00:40:00.000+0000,165.68681062384812,189.27479465225207
2020-06-28T00:45:00.000+0000,195.57528652237303,165.34019181151467


In [29]:
%sql
SELECT date, avg(age) as age, avg(remaining_life) as life FROM feature_table WHERE deviceid='WindTurbine-1' GROUP BY date ORDER BY date

date,age,life
2019-06-30,0.0,0.0
2019-07-01,1.0,8.0
2019-07-02,2.0,7.0
2019-07-03,3.0,6.0
2019-07-04,4.0,5.0
2019-07-05,5.0,4.0
2019-07-06,6.0,3.0
2019-07-07,7.0,2.0
2019-07-08,8.0,1.0
2019-07-09,0.0,0.0


### 3b. Distributed Model Training - Predict Power Output
[Pandas UDFs](https://docs.microsoft.com/en-us/azure/databricks/spark/latest/spark-sql/udf-python-pandas?toc=https%3A%2F%2Fdocs.microsoft.com%2Fen-us%2Fazure%2Fazure-databricks%2Ftoc.json&bc=https%3A%2F%2Fdocs.microsoft.com%2Fen-us%2Fazure%2Fbread%2Ftoc.json) allow us to vectorize Pandas code across multiple nodes in a cluster. Here we create a UDF to train an XGBoost Regressor model against all the historic data for a particular Wind Turbine. We use a Grouped Map UDF as we perform this model training on the Wind Turbine group level.

In [31]:
# Create a function to train a XGBoost Regressor on a turbine's data
def train_distributed_xgb(readings_pd, model_type, label_col, prediction_col):
  mlflow.xgboost.autolog()
  with mlflow.start_run():
    # Log the model type and device ID
    mlflow.log_param('deviceid', readings_pd['deviceid'][0])
    mlflow.log_param('model', model_type)

    # Train an XGBRegressor on the data for this Turbine
    alg = xgb.XGBRegressor() 
    train_dmatrix = xgb.DMatrix(data=readings_pd[feature_cols].astype('float'),label=readings_pd[label_col])
    params = {'learning_rate': 0.5, 'alpha':10, 'colsample_bytree': 0.5, 'max_depth': 5}
    model = xgb.train(params=params, dtrain=train_dmatrix, evals=[(train_dmatrix, 'train')])

    # Make predictions on the dataset and return the results
    readings_pd[prediction_col] = model.predict(train_dmatrix)
  return readings_pd

# Create a Spark Dataframe that contains the features and labels we need
non_feature_cols = ['date','window','deviceid','winddirection','remaining_life']
feature_cols = ['angle','rpm','temperature','humidity','windspeed','power','age']
label_col = 'power_6_hours_ahead'
prediction_col = label_col + '_predicted'

# Read in our feature table and select the columns of interest
feature_df = spark.table('feature_table').selectExpr(non_feature_cols + feature_cols + [label_col] + [f'0 as {prediction_col}'])

# Register a Pandas UDF to distribute XGB model training using Spark
@pandas_udf(feature_df.schema, PandasUDFType.GROUPED_MAP)
def train_power_models(readings_pd):
  return train_distributed_xgb(readings_pd, 'power_prediction', label_col, prediction_col)

# Run the Pandas UDF against our feature dataset
power_predictions = feature_df.groupBy('deviceid').apply(train_power_models)

# Save predictions to storage
power_predictions.write.format("delta").mode("overwrite").partitionBy("date").saveAsTable("turbine_power_predictions")

In [32]:
%sql 
-- Plot actuals vs. predicted
SELECT date, deviceid, avg(power_6_hours_ahead) as actual, avg(power_6_hours_ahead_predicted) as predicted FROM turbine_power_predictions GROUP BY date, deviceid

date,deviceid,actual,predicted
2019-08-05,WindTurbine-0,188.89791095078505,167.13541666666666
2019-07-02,WindTurbine-3,196.4190089458926,173.12847222222223
2019-10-07,WindTurbine-6,156.77513730249547,165.24305555555554
2020-05-06,WindTurbine-0,164.41021100962382,165.0625
2020-03-26,WindTurbine-0,165.02576822965,165.47222222222223
2019-11-15,WindTurbine-4,136.85110344798477,160.61805555555554
2019-09-16,WindTurbine-2,165.51413944326302,165.17708333333334
2019-07-28,WindTurbine-1,191.02691012495745,167.97569444444446
2020-02-20,WindTurbine-7,163.2832069943679,165.25
2019-11-04,WindTurbine-1,142.87533290654252,162.37152777777777


#### Automated Model Tracking in Databricks
As you train the models, notice how Databricks-managed MLflow automatically tracks each run in the "Runs" tab of the notebook. You can open each run and view the parameters, metrics, models and model artifacts that are captured by MLflow Autologging. For XGBoost Regression models, MLflow tracks: 
1. Any model parameters (alpha, colsample, learning rate, etc.) passed to the `params` variable
2. Metrics specified in `evals` (RMSE by default)
3. The trained XGBoost model file
4. Feature importances

<img src="https://sguptasa.blob.core.windows.net/random/iiot_blog/iiot_mlflow_tracking.gif" width=800>

### 3c. Distributed Model Training - Predict Remaining Life
Our second model predicts the remaining useful life of each Wind Turbine based on the current operating conditions. We have historical maintenance data that indicates when a replacement activity occured - this will be used to calculate the remaining life as our training label. 

Once again, we train an XGBoost model for each Wind Turbine to predict the remaining life given a set of operating parameters and weather conditions

In [35]:
# Create a Spark Dataframe that contains the features and labels we need
non_feature_cols = ['date','window','deviceid','winddirection','power_6_hours_ahead_predicted']
label_col = 'remaining_life'
prediction_col = label_col + '_predicted'

# Read in our feature table and select the columns of interest
feature_df = spark.table('turbine_power_predictions').selectExpr(non_feature_cols + feature_cols + [label_col] + [f'0 as {prediction_col}'])

# Register a Pandas UDF to distribute XGB model training using Spark
@pandas_udf(feature_df.schema, PandasUDFType.GROUPED_MAP)
def train_life_models(readings_pd):
  return train_distributed_xgb(readings_pd, 'life_prediction', label_col, prediction_col)

# Run the Pandas UDF against our feature dataset
life_predictions = feature_df.groupBy('deviceid').apply(train_life_models)

# Save predictions to storage
life_predictions.write.format("delta").mode("overwrite").partitionBy("date").saveAsTable("turbine_life_predictions")

In [36]:
%sql 
SELECT date, avg(remaining_life) as Actual_Life, avg(remaining_life_predicted) as Predicted_Life 
FROM turbine_life_predictions 
WHERE deviceid='WindTurbine-1' 
GROUP BY date

date,Actual_Life,Predicted_Life
2019-11-18,2.0,4.486111111111111
2019-11-01,19.0,19.02430555555556
2020-01-21,4.0,4.298611111111111
2019-09-22,5.0,7.46875
2019-11-21,26.0,28.375
2020-04-30,22.0,18.61458333333333
2020-03-07,4.0,22.08680555555556
2020-03-13,33.0,27.82638888888889
2020-02-04,23.0,21.538194444444443
2020-02-15,12.0,13.96875


The models to predict remaining useful life have been trained and logged by MLflow. We can now move on to model deployment in AzureML.

## Step 4 - Model Deployment to AzureML
Now that our models have been trained, we can deploy them in an automated way directly to a model serving environment like Azure ML. Below, we connect to an AzureML workspace, build a container image for the model, and deploy that image to Azure Container Instances (ACI) to be hosted for REST API calls. 

**Note:** This step requires authentication to Azure - open the link provided in the output of the cell in a new browser tab and use the code provided.

In [39]:
# AML Workspace Information - replace with your workspace info
aml_resource_group = dbutils.widgets.get("Resource Group")
aml_subscription_id = dbutils.widgets.get("Subscription ID")
aml_region = dbutils.widgets.get("Region")
aml_workspace_name = "iot"
turbine = "WindTurbine-1"
power_model = "power_prediction"
life_model = "life_prediction"

# Connect to a workspace (replace widgets with your own workspace info)
workspace = Workspace.create(name = aml_workspace_name,
                             subscription_id = aml_subscription_id,
                             resource_group = aml_resource_group,
                             location = aml_region,
                             exist_ok=True)

# Retrieve the remaining_life and power_output experiments on WindTurbine-1, and get the best performing model (min RMSE)
best_life_model = mlflow.search_runs(filter_string=f'params.deviceid="{turbine}" and params.model="{life_model}"')\
  .dropna().sort_values("metrics.train-rmse")['artifact_uri'].iloc[0] + '/model'
best_power_model = mlflow.search_runs(filter_string=f'params.deviceid="{turbine}" and params.model="{power_model}"')\
  .dropna().sort_values("metrics.train-rmse")['artifact_uri'].iloc[0] + '/model'

scoring_uris = {}
for model, path in [('life',best_life_model),('power',best_power_model)]:
  # Build images for each of our two models in Azure Container Instances
  print(f"-----Building image for {model} model-----")
  model_image, azure_model = mlflow.azureml.build_image(model_uri=path, 
                                                        workspace=workspace, 
                                                        model_name=model,
                                                        image_name=model,
                                                        description=f"XGBoost model to predict {model} of a turbine", 
                                                        synchronous=True)
  model_image.wait_for_creation(show_output=True)

  # Deploy web services to host each model as a REST API
  print(f"-----Deploying image for {model} model-----")
  dev_webservice_name = model 
  dev_webservice_deployment_config = AciWebservice.deploy_configuration()
  dev_webservice = Webservice.deploy_from_image(name=dev_webservice_name, image=model_image, deployment_config=dev_webservice_deployment_config, workspace=workspace)
  dev_webservice.wait_for_deployment()

  # Get the URI for sending REST requests to
  scoring_uris[model] = dev_webservice.scoring_uri

print(f"-----Model URIs for Scoring:-----")
scoring_uris

You can view your model, it's deployments and URL endpoints by navigating to https://ml.azure.com/.

<img src="https://sguptasa.blob.core.windows.net/random/iiot_blog/iiot_azureml.gif" width=800>

## Step 5 - Model Inference: Real-time Scoring
We can now make HTTP REST calls from a web app, PowerBI, or directly from Databricks to the hosted model URI to score data directly

In [42]:
# Retrieve the Scoring URL provided by AzureML
power_uri = scoring_uris['power'] 
life_uri = scoring_uris['life'] 

# Construct a payload to send with the request
payload = {
  'angle':8,
  'rpm':6,
  'temperature':25,
  'humidity':50,
  'windspeed':5,
  'power':150,
  'age':10
}

def score_data(uri, payload):
  rest_payload = json.dumps({"data": [list(payload.values())]})
  response = requests.post(uri, data=rest_payload, headers={"Content-Type": "application/json"})
  return json.loads(response.text)

print(f'Current Operating Parameters: {payload}')
print(f'Predicted power (in kwh) from model: {score_data(power_uri, payload)}')
print(f'Predicted remaining life (in days) from model: {score_data(life_uri, payload)}')

### Step 6: Asset Optimization
We can now identify the optimal operating conditions for maximizing power output while also maximizing asset useful life. 

\\(Revenue = Price\displaystyle\sum_1^{365} Power_t\\)

\\(Cost = {365 \over Life_{rpm}} Price \displaystyle\sum_1^{24} Power_t \\)

Price\displaystyle\sum_{t=1}^{24})\\)

\\(Profit = Revenue - Cost\\)

\\(Power_t\\) and \\(Life\\) will be calculated by scoring many different RPM values in AzureML. The results can be visualized to identify the RPM that yields the highest profit.

In [44]:
# Construct a payload to send with the request
payload = {
  'angle':8,
  'rpm':6,
  'temperature':25,
  'humidity':50,
  'windspeed':5,
  'power':150,
  'age':10
}

# Iterate through 50 different RPM configurations and capture the predicted power and remaining life at each RPM
results = []
for rpm in range(1,15):
  payload['rpm'] = rpm
  expected_power = score_data(power_uri, payload)[0]
  payload['power'] = expected_power
  expected_life = -score_data(life_uri, payload)[0]
  results.append((rpm, expected_power, expected_life))
  
# Calculalte the Revenue, Cost and Profit generated for each RPM configuration
optimization_df = pd.DataFrame(results, columns=['RPM', 'Expected Power', 'Expected Life'])
optimization_df['Revenue'] = optimization_df['Expected Power'] * 24 * 365
optimization_df['Cost'] = optimization_df['Expected Power'] * 24 * 365 / optimization_df['Expected Life']
optimization_df['Profit'] = optimization_df['Revenue'] + optimization_df['Cost']

display(optimization_df)

RPM,Expected Power,Expected Life,Revenue,Cost,Profit
1,162.70347595214844,-13.420654296875,1425282.4493408203,-106200.66785214022,1319081.78148868
2,162.70347595214844,-13.420654296875,1425282.4493408203,-106200.66785214022,1319081.78148868
3,162.70347595214844,-13.420654296875,1425282.4493408203,-106200.66785214022,1319081.78148868
4,162.70347595214844,-13.420654296875,1425282.4493408203,-106200.66785214022,1319081.78148868
5,162.70347595214844,-18.624601364135746,1425282.4493408203,-76526.87010447374,1348755.5792363463
6,162.70347595214844,-20.62579345703125,1425282.4493408203,-69101.94520807378,1356180.5041327465
7,165.82839965820312,-20.977718353271484,1452656.7810058594,-69247.60627169527,1383409.174734164
8,169.49905395507812,-21.40289688110352,1484811.7126464844,-69374.333806066,1415437.3788404183
9,171.86351013183594,-22.02106666564941,1505524.3487548828,-68367.45792624773,1437156.890828635
10,174.05905151367188,-22.27279281616211,1524757.2912597656,-68458.28917123206,1456299.0020885337


The optimal operating parameters for **WindTurbine-1** given the specified weather conditions is **37 RPM** for generating a maximum profit of **$1.2M**!