# End to End Industrial IoT (IIoT) on Azure Databricks 
## Part 2 - Machine Learning
This notebook implements the Machine Learning portion of the following architecture for IIoT ingest, processing, and analytics on Azure.
<img src="https://sguptasa.blob.core.windows.net/random/iiot_blog/end_to_end_architecture.png" width=800>

This notebook covers the following step:
**Machine Learning** - train XGBoost regression models using distributed ML to predict power output and asset remaining life on historical sensor data

In [0]:
# Widgets for user specific inputs
dbutils.widgets.text("Subscription ID","","Subscription ID")
dbutils.widgets.text("Resource Group","","Resource Group")
dbutils.widgets.text("Region","","Region")
dbutils.widgets.text("Storage Account","","Storage Account")
dbutils.widgets.text("ADLS Key", "", "ADLS Key")


## Environment Setup

The pre-requisites are listed below:

### Azure Services Required
* ADLS Gen 2 Storage account with a container called `iot`
* Azure Machine Learning Workspace called `iot`

### Azure Databricks Configuration Required
* 3-node (min) Databricks Cluster running **DBR 10.4 ML+** and the following libraries:
 * **MLflow[AzureML]** - PyPI library `azureml-mlflow` version
 * **Azure Event Hubs Connector for Databricks** - Maven coordinates `com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.21`
 * **Azure ML Package client library for Python** - PyPI library `azure-ai-ml` version 1.5.0
 * **MLflow** - PyPI library `mlflow` version 1.30.0
 * **Azure Machine Learning core packages, modules, and classes** - PyPI library `azureml.core` version 1.47.0

* The following notebook widgets populated:
 * `Subscription ID` - subscription ID of your Azure ML Workspace
 * `Resource Group` - resource group name of your Azure ML Workspace
 * `Region` - Azure region of your Azure ML Workspace
 * `Storage Account` - Name of your storage account
 * `ADLS Key` - Access key for the ADLS storage account **(Important - use the [Access Key](https://raw.githubusercontent.com/tomatoTomahto/azure_databricks_iot/master/bricks.com/blog/2020/03/27/data-exfiltration-protection-with-azure-databricks.html))**. Note that the original notebook handled this value as a secret.

* **Part 1 Notebook Run to generate and process the data**. 
* Ensure the following tables have been created:
 * **turbine_maintenance** - Maintenance dates for each Wind Turbine
 * **turbine_power** - Hourly power output for each Wind Turbine
 * **turbine_enriched** - Hourly turbine sensor readings (RPM, Angle) enriched with weather readings (temperature, wind speed/direction, humidity)
 * **gold_readings** - Combined view containing all 3 tables

In [0]:
# Verify azureml.core is installed
!pip show azureml.core

In [0]:
# Verify azureml-mlflow is installed
!pip show azureml-mlflow

In [0]:
# Verify azure-ai-ml is installed
!pip show azure-ai-ml

In [0]:
# Verify azureml-mlflow is installed
!pip show azureml-mlflow

In [0]:
# This is automatically installed
!pip show azure.core

In [0]:
# This cell needs to be run each time
# Import libraries and print out the versions
import xgboost as xgb
import azureml.mlflow
import azureml.core
import mlflow
import pandas as pd
import sklearn
import numpy as np
import matplotlib


# cell to check for versions
print("XGBoost: {}".format(xgb.__version__))
print("Pandas: {}".format(pd.__version__))
print("MLFlow: {}".format(mlflow.__version__))
print("Matplotlib: {}".format(matplotlib.__version__))
print("Scikit-Learn: {}".format(sklearn.__version__))
print("azureml-mlflow: {}".format(azureml.mlflow.__version__))
print("azureml.core: {}".format(azureml.core.__version__))
print("NumPy: {}".format(np.__version__))

In [0]:
# If there is an issue with typing_extensions, uncomment this cell and run it
# from https://docs.microsoft.com/en-us/azure/machine-learning/how-to-use-mlflow
# import typing_extensions
# from importlib import reload
# reload(typing_extensions)

In [0]:
# This cell needs to run each time
# Set important id's and keys from widgets

storage_account = dbutils.widgets.get("Storage Account")
adls_key = dbutils.widgets.get("ADLS Key")

BLOB_CONTAINER_NAME = "iot"
spark.conf.set(f"fs.azure.account.key.{storage_account}.dfs.core.windows.net", adls_key)

# Setup storage locations for all data
ROOT_PATH = f"abfss://{BLOB_CONTAINER_NAME}@{storage_account}.dfs.core.windows.net/"

# Pyspark and ML Imports
import os, json, requests
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
import numpy as np 
import pandas as pd
import xgboost as xgb
import mlflow
import mlflow.xgboost
import mlflow.azureml
import azureml.core
from azureml.core.webservice import AciWebservice, Webservice
import random, string

# Random String generator for ML models served in AzureML
random_string = lambda length: ''.join(random.SystemRandom().choice(string.ascii_lowercase) for _ in range(length))

## Machine Learning
Now that our data is flowing reliably from our sensor devices into an enriched Delta table in Data Lake storage, we can start to build ML models to predict power output and remaining life of our assets using historical sensor, weather, power and maintenance data. 

We create two models ***for each Wind Turbine***:
1. Turbine Power Output - using current readings for turbine operating parameters (angle, RPM) and weather (temperature, humidity, etc.), predict the expected power output 6 hours from now
2. Turbine Remaining Life - predict the remaining life in days until the next maintenance event

<img src="https://sguptasa.blob.core.windows.net/random/iiot_blog/turbine_models.png" width=800>

We will use the XGBoost framework to train regression models. Due to the size of the data and number of Wind Turbines, we will use Spark UDFs to distribute training across all the nodes in our cluster.

### Feature Engineering
In order to predict power output 6 hours ahead, we need to first time-shift our data to create our label column. We can do this easily using Spark Window partitioning. 
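
As a rough illustration of the time shift (a plain-Python sketch, not the Spark implementation used in this notebook): assuming one reading every 5 minutes, as the sample `gold_readings` output suggests, a 6-hour horizon corresponds to 72 rows. The helper name `lead` and the sample values are hypothetical. The helper mimics SQL `LEAD(value, offset, value)`, where a row with no future reading falls back to its own value.

```python
from typing import List, Sequence

READING_INTERVAL_MINUTES = 5   # assumption: inferred from the 5-minute windows in the sample output
HORIZON_HOURS = 6
OFFSET_ROWS = HORIZON_HOURS * 60 // READING_INTERVAL_MINUTES  # number of rows to look ahead


def lead(values: Sequence[float], offset: int) -> List[float]:
    """Return the value `offset` rows ahead, or the row's own value when no such row exists."""
    n = len(values)
    return [values[i + offset] if i + offset < n else values[i] for i in range(n)]


# Hypothetical power readings for one turbine, already ordered by time
power = [10.0, 11.0, 12.0, 13.0, 14.0]
print(OFFSET_ROWS)     # 72
print(lead(power, 2))  # [12.0, 13.0, 14.0, 13.0, 14.0]
```

The last `offset` rows of each turbine keep their own value as the label, so they carry no real look-ahead information.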

In order to predict remaining life, we need to backtrace the remaining life from the maintenance events. We can do this by joining each reading date to that turbine's maintenance events with date-range conditions and keeping the smallest gap in days. The following diagram illustrates the ML Feature Engineering pipeline:

<img src="https://sguptasa.blob.core.windows.net/random/iiot_blog/ml_pipeline.png" width=800>
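
The remaining-life backtrace can be sketched in plain Python as follows. This is an illustrative sketch only: the function name `age_and_remaining_life` is hypothetical, and the two maintenance dates are the first two shown for WindTurbine-0 in this notebook's sample output. A missing previous or next event falls back to 0, which is how the notebook's SQL treats a null gap.

```python
from datetime import date
from typing import List, Tuple


def age_and_remaining_life(reading_date: date, maintenance_dates: List[date]) -> Tuple[int, int]:
    """Days since the last maintenance (age) and days until the next one (remaining life)."""
    since_last = [(reading_date - m).days for m in maintenance_dates if m <= reading_date]
    until_next = [(m - reading_date).days for m in maintenance_dates if m >= reading_date]
    # Every matching event yields one candidate gap; keep the smallest, or 0 if there is none
    age = min(since_last) if since_last else 0
    remaining_life = min(until_next) if until_next else 0
    return age, remaining_life


maintenance = [date(2021, 9, 29), date(2021, 10, 7)]
for d in (date(2021, 9, 28), date(2021, 9, 30), date(2021, 10, 7)):
    print(d, age_and_remaining_life(d, maintenance))
# 2021-09-28 (0, 1)
# 2021-09-30 (1, 7)
# 2021-10-07 (0, 0)
```

Each reading date matches every later maintenance event, not just the next one, which is why the intermediate join result contains several rows per date until the minimum is taken.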

In [0]:
%sql
-- examine the SQL - piece by piece - note SQL from original Pt 2 notebook
-- once a view is created, it doesn't need to be created again upon re-entry of notebook unless it is dropped
CREATE OR REPLACE VIEW ex001_rd AS
SELECT distinct date, deviceid FROM turbine_power



In [0]:
%sql
-- test query against ex001_rd
select deviceid, date from ex001_rd
ORDER by deviceid, date
LIMIT 500

deviceid,date
WindTurbine-0,2021-09-28
WindTurbine-0,2021-09-29
WindTurbine-0,2021-09-30
WindTurbine-0,2021-10-01
WindTurbine-0,2021-10-02
WindTurbine-0,2021-10-03
WindTurbine-0,2021-10-04
WindTurbine-0,2021-10-05
WindTurbine-0,2021-10-06
WindTurbine-0,2021-10-07


In [0]:
%sql
-- deviceid, date, maintenance (true/false)
-- dates maintenance was performed ... 
SELECT deviceid, date, maintenance from turbine_maintenance
ORDER by deviceid, date
limit 500

deviceid,date,maintenance
WindTurbine-0,2021-09-29,True
WindTurbine-0,2021-10-07,True
WindTurbine-0,2021-11-22,True
WindTurbine-0,2021-12-26,True
WindTurbine-0,2022-01-16,True
WindTurbine-0,2022-02-18,True
WindTurbine-0,2022-03-17,True
WindTurbine-0,2022-04-25,True
WindTurbine-0,2022-05-28,True
WindTurbine-0,2022-06-10,True


In [0]:
%sql
-- examine the SQL - piece by piece
CREATE OR REPLACE VIEW ex001_md AS
    SELECT d.*, datediff(nm.date, d.date) as datediff_next, datediff(d.date, lm.date) as datediff_last 
    FROM ex001_rd d LEFT JOIN turbine_maintenance nm ON (d.deviceid=nm.deviceid AND d.date<=nm.date)
    LEFT JOIN turbine_maintenance lm ON (d.deviceid=lm.deviceid AND d.date>=lm.date )
    


In [0]:
%sql
SELECT deviceid, date, datediff_next from ex001_md 
ORDER by deviceid, date, datediff_next, datediff_last
limit 500

deviceid,date,datediff_next
WindTurbine-0,2021-09-28,1
WindTurbine-0,2021-09-28,9
WindTurbine-0,2021-09-28,55
WindTurbine-0,2021-09-28,89
WindTurbine-0,2021-09-28,110
WindTurbine-0,2021-09-28,143
WindTurbine-0,2021-09-28,170
WindTurbine-0,2021-09-28,209
WindTurbine-0,2021-09-28,242
WindTurbine-0,2021-09-28,255


In [0]:
%sql
-- Now we have reviewed all the elements composing the SQL for turbine_age
-- Calculate the age of each turbine and the remaining life in days
-- This is the original view analyzed in preceding cells
CREATE OR REPLACE VIEW turbine_age AS
WITH reading_dates AS (SELECT distinct date, deviceid FROM turbine_power),
  maintenance_dates AS (
    SELECT d.*, datediff(nm.date, d.date) as datediff_next, datediff(d.date, lm.date) as datediff_last 
    FROM reading_dates d LEFT JOIN turbine_maintenance nm ON (d.deviceid=nm.deviceid AND d.date<=nm.date)
    LEFT JOIN turbine_maintenance lm ON (d.deviceid=lm.deviceid AND d.date>=lm.date ))
SELECT date, deviceid, ifnull(min(datediff_last),0) AS age, ifnull(min(datediff_next),0) AS remaining_life
FROM maintenance_dates 
GROUP BY deviceid, date;

In [0]:
%sql
--Review gold_readings used to create feature_table
SELECT deviceid, date, window, rpm, angle, temperature, humidity, windspeed, winddirection, power, maintenance from gold_readings
ORDER BY deviceid, date, window
limit 500

deviceid,date,window,rpm,angle,temperature,humidity,windspeed,winddirection,power,maintenance
WindTurbine-0,2021-09-28,2021-09-28T00:00:00.000+0000,8.082760247888954,7.024481258389998,27.265490780187687,73.29382790537916,7.329382790537912,NE,173.08678580857463,False
WindTurbine-0,2021-09-28,2021-09-28T00:05:00.000+0000,7.614942727533601,6.617915270621038,25.68740682163027,69.05169579005361,6.905169579005357,N,153.63062668867892,False
WindTurbine-0,2021-09-28,2021-09-28T00:10:00.000+0000,8.379000178481432,7.281934377944932,28.264793846043467,75.9801080653304,7.598010806533035,NE,186.0068417394524,False
WindTurbine-0,2021-09-28,2021-09-28T00:15:00.000+0000,8.074031293550496,7.016895189488368,27.23604548962645,73.2146744407884,7.321467444078837,N,172.71313849614816,False
WindTurbine-0,2021-09-28,2021-09-28T00:20:00.000+0000,8.142028399766238,7.075989407749016,27.465419418302485,73.83126679886831,7.383126679886828,W,175.63446624848436,False
WindTurbine-0,2021-09-28,2021-09-28T00:25:00.000+0000,8.217476763508863,7.141559287447561,27.719928596221266,74.51542656834047,7.451542656834044,N,178.90459241122502,False
WindTurbine-0,2021-09-28,2021-09-28T00:30:00.000+0000,8.270364953741772,7.187522806058454,27.8983357763121,74.99501247640575,7.499501247640572,SW,181.2148852680123,False
WindTurbine-0,2021-09-28,2021-09-28T00:35:00.000+0000,8.35898877229737,7.2645430730709935,28.19728957843233,75.79864622358116,7.579864622358112,N,185.1194295221348,False
WindTurbine-0,2021-09-28,2021-09-28T00:40:00.000+0000,8.820176813857216,7.665347582319781,29.753010385364224,79.98066275177582,7.998066275177578,E,206.11001841453967,False
WindTurbine-0,2021-09-28,2021-09-28T00:45:00.000+0000,9.105771620625005,7.91354934831399,30.716404366130924,82.57041377443856,8.257041377443851,SW,219.6736807096,False


In [0]:
%sql
-- CURRENT_DATE is used to filter rows in creating feature_table
SELECT CURRENT_DATE()


current_date()
2023-05-17


In [0]:
%sql
-- Fields from turbine_age used in feature_table
SELECT deviceid, date, age, remaining_life FROM turbine_age
ORDER BY deviceid, date 
LIMIT 500


deviceid,date,age,remaining_life
WindTurbine-0,2021-09-28,0,1
WindTurbine-0,2021-09-29,0,0
WindTurbine-0,2021-09-30,1,7
WindTurbine-0,2021-10-01,2,6
WindTurbine-0,2021-10-02,3,5
WindTurbine-0,2021-10-03,4,4
WindTurbine-0,2021-10-04,5,3
WindTurbine-0,2021-10-05,6,2
WindTurbine-0,2021-10-06,7,1
WindTurbine-0,2021-10-07,0,0


In [0]:
%sql
-- Calculate the power 6 hours ahead using Spark Windowing and build a feature_table to feed into our ML models
-- This is the original view analyzed in preceding cells
CREATE OR REPLACE VIEW feature_table AS
SELECT r.*, age, remaining_life,
  LEAD(power, 72, power) OVER (PARTITION BY r.deviceid ORDER BY window) as power_6_hours_ahead
FROM gold_readings r JOIN turbine_age a ON (r.date=a.date AND r.deviceid=a.deviceid)
WHERE r.date < CURRENT_DATE();


In [0]:
%sql
SELECT deviceid, date, window, rpm, angle, temperature, humidity, windspeed, winddirection, power, maintenance, age, remaining_life, power_6_hours_ahead
FROM feature_table WHERE deviceid = 'WindTurbine-1'
ORDER BY deviceid, date, window

deviceid,date,window,rpm,angle,temperature,humidity,windspeed,winddirection,power,maintenance,age,remaining_life,power_6_hours_ahead
WindTurbine-1,2021-09-28,2021-09-28T00:00:00.000+0000,8.082760247888954,7.024481258389998,27.265490780187687,73.29382790537916,7.329382790537912,E,173.08678580857463,False,0,1,189.3184804443683
WindTurbine-1,2021-09-28,2021-09-28T00:05:00.000+0000,7.614942727533601,6.617915270621038,25.68740682163027,69.05169579005361,6.905169579005357,W,153.63062668867892,False,0,1,259.56169697015656
WindTurbine-1,2021-09-28,2021-09-28T00:10:00.000+0000,8.379000178481432,7.281934377944932,28.264793846043467,75.9801080653304,7.598010806533035,W,186.0068417394524,False,0,1,151.480818208873
WindTurbine-1,2021-09-28,2021-09-28T00:15:00.000+0000,8.074031293550496,7.016895189488368,27.23604548962645,73.2146744407884,7.321467444078837,E,172.71313849614816,False,0,1,202.1526608995715
WindTurbine-1,2021-09-28,2021-09-28T00:20:00.000+0000,8.142028399766238,7.075989407749016,27.465419418302485,73.83126679886831,7.383126679886828,S,175.63446624848436,False,0,1,125.06154135885734
WindTurbine-1,2021-09-28,2021-09-28T00:25:00.000+0000,8.217476763508863,7.141559287447561,27.719928596221266,74.51542656834047,7.451542656834044,NW,178.90459241122502,False,0,1,190.6211087015269
WindTurbine-1,2021-09-28,2021-09-28T00:30:00.000+0000,8.270364953741772,7.187522806058454,27.8983357763121,74.99501247640575,7.499501247640572,NW,181.2148852680123,False,0,1,228.6683622382571
WindTurbine-1,2021-09-28,2021-09-28T00:35:00.000+0000,8.35898877229737,7.2645430730709935,28.19728957843233,75.79864622358116,7.579864622358112,NW,185.1194295221348,False,0,1,234.4633655105253
WindTurbine-1,2021-09-28,2021-09-28T00:40:00.000+0000,8.820176813857216,7.665347582319781,29.753010385364224,79.98066275177582,7.998066275177578,W,206.11001841453967,False,0,1,169.04765533721894
WindTurbine-1,2021-09-28,2021-09-28T00:45:00.000+0000,9.105771620625005,7.91354934831399,30.716404366130924,82.57041377443856,8.257041377443851,NE,219.6736807096,False,0,1,175.97316346677007


In [0]:
%sql
-- power from piece-by-piece feature_table
SELECT window, power, power_6_hours_ahead FROM feature_table WHERE deviceid='WindTurbine-1'
ORDER BY window

window,power,power_6_hours_ahead
2021-09-28T00:00:00.000+0000,173.08678580857463,189.3184804443683
2021-09-28T00:05:00.000+0000,153.63062668867892,259.56169697015656
2021-09-28T00:10:00.000+0000,186.0068417394524,151.480818208873
2021-09-28T00:15:00.000+0000,172.71313849614816,202.1526608995715
2021-09-28T00:20:00.000+0000,175.63446624848436,125.06154135885734
2021-09-28T00:25:00.000+0000,178.90459241122502,190.6211087015269
2021-09-28T00:30:00.000+0000,181.2148852680123,228.6683622382571
2021-09-28T00:35:00.000+0000,185.1194295221348,234.4633655105253
2021-09-28T00:40:00.000+0000,206.11001841453967,169.04765533721894
2021-09-28T00:45:00.000+0000,219.6736807096,175.97316346677007


In [0]:
%sql
SELECT window, power, power_6_hours_ahead FROM feature_table WHERE deviceid='WindTurbine-1'
order by window
LIMIT 1000

window,power,power_6_hours_ahead
2021-09-28T00:00:00.000+0000,173.08678580857463,189.3184804443683
2021-09-28T00:05:00.000+0000,153.63062668867892,259.56169697015656
2021-09-28T00:10:00.000+0000,186.0068417394524,151.480818208873
2021-09-28T00:15:00.000+0000,172.71313849614816,202.1526608995715
2021-09-28T00:20:00.000+0000,175.63446624848436,125.06154135885734
2021-09-28T00:25:00.000+0000,178.90459241122502,190.6211087015269
2021-09-28T00:30:00.000+0000,181.2148852680123,228.6683622382571
2021-09-28T00:35:00.000+0000,185.1194295221348,234.4633655105253
2021-09-28T00:40:00.000+0000,206.11001841453967,169.04765533721894
2021-09-28T00:45:00.000+0000,219.6736807096,175.97316346677007


Databricks visualization. Run in Databricks to view.


In [0]:
%sql
SELECT date, avg(age) as age, avg(remaining_life) as life FROM feature_table WHERE deviceid='WindTurbine-1' GROUP BY date ORDER BY date

date,age,life
2021-09-28,0.0,1.0
2021-09-29,0.0,0.0
2021-09-30,1.0,7.0
2021-10-01,2.0,6.0
2021-10-02,3.0,5.0
2021-10-03,4.0,4.0
2021-10-04,5.0,3.0
2021-10-05,6.0,2.0
2021-10-06,7.0,1.0
2021-10-07,0.0,0.0


Databricks visualization. Run in Databricks to view.


### Distributed Model Training - Predict Power Output
[Pandas UDFs](https://docs.microsoft.com/en-us/azure/databricks/spark/latest/spark-sql/udf-python-pandas?toc=https%3A%2F%2Fdocs.microsoft.com%2Fen-us%2Fazure%2Fazure-databricks%2Ftoc.json&bc=https%3A%2F%2Fdocs.microsoft.com%2Fen-us%2Fazure%2Fbread%2Ftoc.json) allow us to vectorize Pandas code across multiple nodes in a cluster. Here we create a UDF to train an XGBoost Regressor model against all the historic data for a particular Wind Turbine. We use a Grouped Map UDF as we perform this model training on the Wind Turbine group level.
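
The split-apply-combine pattern behind a grouped-map UDF can be illustrated without Spark. The sketch below is a simplification under stated assumptions: `apply_per_group` and `fit_and_predict` are hypothetical names, the "model" is just a per-turbine mean standing in for XGBoost training, and the groups run sequentially here whereas Spark would distribute them across workers.

```python
from collections import defaultdict
from statistics import mean
from typing import Callable, Dict, List

Row = Dict[str, object]


def apply_per_group(rows: List[Row], key: str, fn: Callable[[List[Row]], List[Row]]) -> List[Row]:
    """Split rows by `key`, run `fn` on each group independently, and concatenate the results."""
    groups: Dict[object, List[Row]] = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)
    out: List[Row] = []
    for group_rows in groups.values():  # sequential here; one task per group in Spark
        out.extend(fn(group_rows))
    return out


def fit_and_predict(group_rows: List[Row]) -> List[Row]:
    """Stand-in for model training: 'fit' a per-turbine mean and attach it as the prediction."""
    fitted = mean(r["power"] for r in group_rows)
    return [{**r, "power_predicted": fitted} for r in group_rows]


# Hypothetical readings for two turbines
readings = [
    {"deviceid": "WindTurbine-0", "power": 100.0},
    {"deviceid": "WindTurbine-0", "power": 200.0},
    {"deviceid": "WindTurbine-1", "power": 50.0},
]
for row in apply_per_group(readings, "deviceid", fit_and_predict):
    print(row)
```

As in the grouped-map UDF, the function receives all rows for one turbine and returns rows with the same columns plus the prediction column, so the output schema is known up front.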

In [0]:
# Create a function to train a XGBoost Regressor on a turbine's data
def train_distributed_xgb(readings_pd, model_type, label_col, prediction_col):
  mlflow.xgboost.autolog()
  with mlflow.start_run():
    # Log the model type and device ID
    mlflow.log_param('deviceid', readings_pd['deviceid'].iloc[0])
    mlflow.log_param('model', model_type)

    # Train an XGBoost regression model (native Booster API) on the data for this Turbine
    # Note: feature_cols is a notebook-level variable, not a function argument
    train_dmatrix = xgb.DMatrix(data=readings_pd[feature_cols].astype('float'),label=readings_pd[label_col])
    params = {'learning_rate': 0.5, 'alpha':10, 'colsample_bytree': 0.5, 'max_depth': 5}
    model = xgb.train(params=params, dtrain=train_dmatrix, evals=[(train_dmatrix, 'train')])

    # Make predictions on the same (training) data and return the results
    readings_pd[prediction_col] = model.predict(train_dmatrix)
  return readings_pd

# Create a Spark Dataframe that contains the features and labels we need
non_feature_cols = ['date','window','deviceid','winddirection','remaining_life']
feature_cols = ['angle','rpm','temperature','humidity','windspeed','power','age']
label_col = 'power_6_hours_ahead'
prediction_col = label_col + '_predicted'

# Read in our feature table and select the columns of interest
feature_df = spark.table('feature_table').selectExpr(non_feature_cols + feature_cols + [label_col] + [f'0 as {prediction_col}'])

# Register a Pandas UDF to distribute XGB model training using Spark
@pandas_udf(feature_df.schema, PandasUDFType.GROUPED_MAP)
def train_power_models(readings_pd):
  return train_distributed_xgb(readings_pd, 'power_prediction', label_col, prediction_col)

# Run the Pandas UDF against our feature dataset - this will train 1 model for each turbine
power_predictions = feature_df.groupBy('deviceid').apply(train_power_models)

# Save predictions to storage
power_predictions.write.format("delta").mode("overwrite").partitionBy("date").saveAsTable("turbine_power_predictions")

In [0]:
%sql 
-- Plot actuals vs. predicted
SELECT date, deviceid, avg(power_6_hours_ahead) as actual, avg(power_6_hours_ahead_predicted) as predicted FROM turbine_power_predictions GROUP BY date, deviceid
LIMIT 5000

date,deviceid,actual,predicted
2022-02-05,WindTurbine-8,135.98429809601365,153.20486111111111
2021-10-10,WindTurbine-0,184.90707020970305,164.17708333333334
2022-05-30,WindTurbine-6,155.68573913559624,158.61458333333334
2022-07-17,WindTurbine-0,158.07608040633474,158.84722222222223
2022-02-20,WindTurbine-4,138.07740839769855,154.57291666666666
2022-01-20,WindTurbine-0,141.88007880478605,158.67013888888889
2021-09-30,WindTurbine-7,186.1599243605557,164.49305555555554
2022-04-17,WindTurbine-2,159.4204969919252,156.05902777777777
2021-10-06,WindTurbine-3,187.61774880113447,163.48611111111111
2022-06-04,WindTurbine-9,151.6305253008507,159.07638888888889


Databricks visualization. Run in Databricks to view.

In [0]:
# This cell has to be run when restarting processing after power prediction model is already complete
# Create a Spark Dataframe that contains the features and labels we need
# use this cell if not running power prediction above
non_feature_cols = ['date','window','deviceid','winddirection','remaining_life']
feature_cols = ['angle','rpm','temperature','humidity','windspeed','power','age']
label_col = 'power_6_hours_ahead'
prediction_col = label_col + '_predicted'

colexpr = non_feature_cols + feature_cols + [label_col] + [f'0 as {prediction_col}']
print(colexpr)

In [0]:
# This cell has to be run when restarting processing after power prediction model is already complete
# Use this function definition, if not running power prediction above before life prediction
# Create a function to train a XGBoost Regressor on a turbine's data
def train_distributed_xgb(readings_pd, model_type, label_col, prediction_col):
  mlflow.xgboost.autolog()
  with mlflow.start_run():
    # Log the model type and device ID
    mlflow.log_param('deviceid', readings_pd['deviceid'].iloc[0])
    mlflow.log_param('model', model_type)

    # Train an XGBoost regression model (native Booster API) on the data for this Turbine
    # Note: feature_cols is a notebook-level variable, not a function argument
    train_dmatrix = xgb.DMatrix(data=readings_pd[feature_cols].astype('float'),label=readings_pd[label_col])
    params = {'learning_rate': 0.5, 'alpha':10, 'colsample_bytree': 0.5, 'max_depth': 5}
    model = xgb.train(params=params, dtrain=train_dmatrix, evals=[(train_dmatrix, 'train')])

    # Make predictions on the same (training) data and return the results
    readings_pd[prediction_col] = model.predict(train_dmatrix)
  return readings_pd

### Distributed Model Training - Predict Remaining Life
Our second model predicts the remaining useful life of each Wind Turbine based on the current operating conditions. We have historical maintenance data that indicates when a replacement activity occurred - this will be used to calculate the remaining life as our training label. 

Once again, we train an XGBoost model for each Wind Turbine to predict the remaining life given a set of operating parameters and weather conditions.

In [0]:
# Create a Spark Dataframe that contains the features and labels we need
non_feature_cols = ['date','window','deviceid','winddirection','power_6_hours_ahead_predicted']
label_col = 'remaining_life'
prediction_col = label_col + '_predicted'

# Read in our feature table and select the columns of interest
feature_df = spark.table('turbine_power_predictions').selectExpr(non_feature_cols + feature_cols + [label_col] + [f'0 as {prediction_col}'])

# Register a Pandas UDF to distribute XGB model training using Spark
@pandas_udf(feature_df.schema, PandasUDFType.GROUPED_MAP)
def train_life_models(readings_pd):
  return train_distributed_xgb(readings_pd, 'life_prediction', label_col, prediction_col)

# Run the Pandas UDF against our feature dataset - this will train 1 model per turbine and write the predictions to a table
# Note: groupBy().apply() with a GROUPED_MAP pandas_udf is deprecated in Spark 3.x; applyInPandas is the replacement (see SPARK-28264)
# saveAsTable returns None, so the result is not assigned to a variable
(
  feature_df.groupBy('deviceid').apply(train_life_models)
    .write.format("delta").mode("overwrite")
    .partitionBy("date")
    .saveAsTable("turbine_life_predictions")
)

In [0]:
%sql 
SELECT date, avg(remaining_life) as Actual_Life, avg(remaining_life_predicted) as Predicted_Life 
FROM turbine_life_predictions 
WHERE deviceid='WindTurbine-1' 
GROUP BY date ORDER BY date
LIMIT 500

date,Actual_Life,Predicted_Life
2021-09-28,1.0,0.0243055555555555
2021-09-29,0.0,0.0659722222222222
2021-09-30,7.0,27.631944444444443
2021-10-01,6.0,27.20486111111111
2021-10-02,5.0,26.24305555555556
2021-10-03,4.0,25.47222222222222
2021-10-04,3.0,24.4375
2021-10-05,2.0,23.59375
2021-10-06,1.0,22.756944444444443
2021-10-07,0.0,0.0138888888888888


Databricks visualization. Run in Databricks to view.

#### Automated Model Tracking in Databricks
As you train the models, notice how Databricks-managed MLflow automatically tracks each run in the "Experiments" tab of the notebook. You can open each run and view the parameters, metrics, models and model artifacts that are captured by MLflow Autologging. For XGBoost Regression models, MLflow tracks: 
1. Any model parameters (alpha, colsample, learning rate, etc.) passed to the `params` variable
2. Metrics specified in `evals` (RMSE by default)
3. The trained XGBoost model file
4. Feature importances
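
For reference, the RMSE metric mentioned above can be computed by hand as in the sketch below. This is a plain-Python illustration with made-up numbers, not the code MLflow or XGBoost runs internally; the function name `rmse` is hypothetical.

```python
import math
from typing import Sequence


def rmse(actual: Sequence[float], predicted: Sequence[float]) -> float:
    """Root mean squared error between two equally sized, non-empty sequences."""
    if len(actual) != len(predicted) or len(actual) == 0:
        raise ValueError("actual and predicted must be non-empty and the same length")
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Made-up values: every prediction is off by exactly 2, so the RMSE is 2.0
print(rmse([0.0, 0.0, 0.0, 0.0], [2.0, 2.0, 2.0, 2.0]))  # 2.0
```

Because the models in this notebook are evaluated on the same data they were trained on, the logged RMSE describes in-sample fit rather than performance on unseen readings.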

<img src="https://sguptasa.blob.core.windows.net/random/iiot_blog/iiot_mlflow_tracking.gif" width=800>