# Scoring streaming data using AutoML

__Author       : Belvin Thomas__

__Problem__    : How to train a forecasting model using __Automated Machine Learning (Auto ML)__ and __Azure Machine Learning Python SDK__ to predict battery failure. The best trained model pipeline, as identified by Auto ML, can then be applied to incoming streaming telemetry. This will further help to determine if a battery replacement is needed within the next 30 days.

__Thanks__     : Hands on Azure machine learning workshop conducted by Microsoft, Auckland, Dec 2019

## Train Battery Life Forecasting Model

This section uses the Azure Machine Learning Python SDK to find a best performing model training pipeline using Auto ML.

### Connect to the Azure Machine Learning Workspace

In [None]:
import pandas as pd
import numpy as np
import logging

import azureml
from azureml.core import Run
from azureml.core import Workspace
from azureml.core.model import Model
from azureml.core.experiment import Experiment
from azureml.train.automl.run import AutoMLRun
from azureml.train.automl import AutoMLConfig
from azureml.core import Dataset

# Verify AML SDK Installed
# view version history at https://pypi.org/project/azureml-sdk/#history
#print("Pandas Version:", pd.__version__)
#print("Numpy Version:", np.__version__)
#print("SDK Version:", azureml.core.VERSION)

#### Configure access to the Azure Machine Learning resources
To begin, provide the following information about the Azure Subscription.

**If you are using your own Azure subscription, please provide names for subscription_id, resource_group, workspace_name and workspace_region to use.** Note that the workspace needs to be of type [Machine Learning Workspace](https://docs.microsoft.com/en-us/azure/machine-learning/service/setup-create-workspace).

In the following cell, be sure to set the values for `subscription_id`, `resource_group`, `workspace_name` and `workspace_region` as directed by the comments (*these values can be acquired from the Azure Portal*).

To get these values, do the following:
1. Navigate to the Azure Portal and login with the credentials provided.
2. From the left hand menu, under Favorites, select `Resource Groups`.
3. In the list, select the resource group with the name similar to `MCW-AI-Lab`.
4. From the Overview tab, capture the desired values.

In addition to these, be sure to set the `experiment_name` with the name of the experiment in training the model with Automated Machine Learning.


In [6]:
#Provide the Subscription ID of your existing Azure subscription
subscription_id = "281b526e-0f57-4142-ae7c-b89b634fd26e" # <- subscription id used for hands-on lab

#Provide values for the existing Resource Group 
resource_group = "MCW-AI-Lab"

#Provide the Workspace Name and Azure Region of the Azure Machine Learning Workspace
workspace_name = "AML-workspace-181384"
workspace_region = "westus2" # <- region of your resource group

#Provide the name of the Automated ML experiment you executed previously
experiment_name = "Battery-Cycles"

Connect to your **Azure Machine Learning Workspace**

**Important Note**: You will be prompted to login in the text that is output below the cell. Be sure to navigate to the URL displayed and enter the code that is provided. Once you have entered the code, return to this notebook and wait for the output to read `Workspace Provisioning complete`.

In [8]:
# By using the exist_ok param, if the worskpace already exists we get a reference to the existing workspace
ws = Workspace.create(
    name = "AML-workspace-181384",
    subscription_id = "281b526e-0f57-4142-ae7c-b89b634fd26e",
    resource_group = "ODL-ml-181384", 
    location = "westus2",
    exist_ok = True)

print("Workspace Provisioning complete")

### Retrieve and Review the Registered Training Dataset
The next cell will retrieve the training dataset that was registered in **Exercise 1** with the Azure Machine Learning Workspace.

In [10]:
dataset_name = 'daily-battery-time-series'

dataset = ws.datasets[dataset_name]

dataset.take(5).to_pandas_dataframe()

Unnamed: 0,Date,Battery_ID,Battery_Age_Days,Daily_Trip_Duration,Daily_Cycles_Used
0,2013-01-01,0,0,67.845608,0.16692
1,2013-01-02,0,1,53.450798,0.131505
2,2013-01-03,0,2,58.841433,0.144767
3,2013-01-04,0,3,60.638403,0.149188
4,2013-01-05,0,4,62.64691,0.15413


### Instantiate an Auto ML Config
Next, configure the forecasting Auto ML run. Some of the important configurations include, target column, time column, groupby attributes, maximum horizon, primary evaluation metric, training dataset, number of iterations, and number of cross-validations.

In [12]:
target_column_name = 'Daily_Cycles_Used'
time_column_name = 'Date'
groupby_column_names = ['Battery_ID']
forecast_horizon = 30
primary_metric = 'normalized_root_mean_squared_error'
project_folder = './automl-forecasting'

time_series_settings = {
    'time_column_name': time_column_name,
    'grain_column_names': groupby_column_names,
    'max_horizon': forecast_horizon
}

automl_config = AutoMLConfig(task='forecasting',
                             training_data = dataset, 
                             label_column_name=target_column_name, 
                             iterations=3, 
                             iteration_timeout_minutes = 5, 
                             max_cores_per_iteration = 1, 
                             primary_metric=primary_metric, 
                             preprocess=True, 
                             n_cross_validations = 3, 
                             debug_log = 'automl.log', 
                             verbosity = logging.DEBUG, 
                             path = project_folder, 
                             **time_series_settings)

### Create and Run Experiment

create and run the experiment on the Azure Databricks cluster. Note this will take a few minutes.

In [14]:
# create a new experiment
experiment = Experiment(ws, experiment_name)

# submit the run
automl_run = experiment.submit(automl_config, show_output=True)

### Get the best run and the best trained model

At this point you have multiple runs, each with a different trained model pipeline. How can you get the model pipeline that performed the best? Run the following cells to learn how.

In [16]:
best_run, best_model = automl_run.get_output()

## Understand how to score time-series data using a static DataFrame
Before implementing the scoring against a Spark Structured Streaming DataFrame, it is often useful to prototype the approach using a static data set. The following cells will do just that. It uses the above trained model and forecasts using that data loaded directly from disk.

The next cell downloads the sample data set that represents the incoming data you will process with the Spark streaming query. While this approach leverages a CSV file based source, this same approach used here could be applied to process data from IoT Hub, Event Hubs and Kafka sources.

In [19]:
import uuid

# Create a temporary folder to store locally relevant content for this notebook
session_id = uuid.uuid4()
tempFolderName = '/FileStore/temp_{0}'.format(session_id)
dbutils.fs.mkdirs(tempFolderName)
print('Content files will be saved to {0}'.format(tempFolderName))

import os
filesToDownload = ['daily-battery-time-series-v2.csv']

for fileToDownload in filesToDownload:
  downloadCommand = 'wget -O ''/dbfs{0}/{1}'' ''https://databricksdemostore.blob.core.windows.net/data/connected-car/{1}'''.format(tempFolderName, fileToDownload)
  print(downloadCommand)
  os.system(downloadCommand)
  
#List all downloaded files
dbutils.fs.ls(tempFolderName)

Now, the data is loaded from the provided CSV to createbthe test data sets. In this case I am using the last 500 rows as my test dataset.

In [21]:
data_path = '/dbfs{0}/daily-battery-time-series-v2.csv'.format(tempFolderName)
pandas_df = pd.read_csv(data_path, sep=',')
data = pandas_df

field_to_predict = 'Daily_Cycles_Used'
X_test = data.iloc[-500:][['Date','Battery_ID','Battery_Age_Days','Daily_Trip_Duration',field_to_predict]] 
y_test = X_test.pop(field_to_predict).values

Next, the best trained model pipeline is applied to forecast the battery cycles used for some rows of the test data.

In [23]:
y_predict = best_model.predict(X_test.iloc[0:30])
y_actual = y_test[0:30].flatten()

print("predicted: ", y_predict)
print("actual: ", y_actual)

## Create the Spark Streaming Query

With an idea of how to score against a static data set, the scoring is now incorporated into the processing of the streaming microbatches. 

The next cell defines the schema of the streaming data frame.

In [26]:
from pyspark.sql.types import StructField, StructType, StringType, Row
userSchema = StructType().add("Idx", "integer").add("Date", "date").add("Battery_ID", "integer").add("Battery_Age_Days", "integer") \
             .add("Number_Of_Trips", "integer").add("Daily_Trip_Duration", "float").add("Daily_Cycles_Used", "float") \
             .add("Lifetime_Cycles_Used", "float").add("Battery_Rated_Cycles", "float")

To leverage the model for scoring microbatches, a function need to be defined with the scoring logic that will write the scored results to the desired destination. 

In this case, the scored results will be written out to a Databricks Delta table. The following cells  will create the helper functions for scoring and saving.

In [28]:
deltaDataPath = '/tmp/{0}'.format(session_id)
deltaDataPath

def foreach_batch_scorerer(df, epoch_id):
    # Transform and write batchDF
    scored_df = score_batch(df)
    write_scored_results(scored_df)
    pass

def score_batch(df):
  pandas_df = df.toPandas()
  pandas_df = pandas_df[['Date','Battery_ID','Battery_Age_Days','Daily_Trip_Duration']] 
  y_predict = best_model.predict(pandas_df)
  pandas_df['forecast'] = y_predict
  pandas_df = pandas_df[['Date','Battery_ID','Battery_Age_Days','Daily_Trip_Duration', 'forecast']] 
  scored_df = spark.createDataFrame(pandas_df)
  return scored_df

def write_scored_results(scored_df):
  (scored_df
    .write
    .format("delta")
    .partitionBy("Battery_ID")
    .mode("append")
    .save(deltaDataPath)
  )

The next cell defines the streaming source.

In [30]:
csvDF = spark \
    .readStream \
    .option("sep", ",") \
    .schema(userSchema) \
    .csv(tempFolderName, header=True) 

The next cell starts the streaming query. Notice the use of `foreachBatch` as sink (the destination for the data).

In [32]:
spark.conf.set("spark.sql.shuffle.partitions", "2")  # keep the size of shuffles small

# csvDF.writeStream.format("memory").queryName("metrics").start()
# streaming_query = csvDF.writeStream.foreach(ForeachScorer()).start()
streaming_query = csvDF.writeStream.foreachBatch(foreach_batch_scorerer).start()   

After the stream has started processing (e.g., the above cell's output displays a GUID instead of the phrase `stream initializing`),  query the Delta table. The next cell shows the scored result.

In [34]:
reloaded_df = spark.read.format("delta").load(deltaDataPath)
display(reloaded_df)

Date,Battery_ID,Battery_Age_Days,Daily_Trip_Duration,forecast
2015-02-10T00:00:00.000+0000,0,770,49.45682,0.1389919886632906
2015-02-11T00:00:00.000+0000,0,771,58.761208,0.1424662690718341
2015-02-12T00:00:00.000+0000,0,772,58.99486,0.1425533108069138
2015-02-13T00:00:00.000+0000,0,773,51.401432,0.1397175233935393
2015-02-14T00:00:00.000+0000,0,774,56.159653,0.1414941516957995
2015-02-15T00:00:00.000+0000,0,775,64.706314,0.1446854786906007
2015-02-16T00:00:00.000+0000,0,776,61.473648,0.143478109661949
2015-02-17T00:00:00.000+0000,0,777,62.008133,0.1436774891715231
2015-02-18T00:00:00.000+0000,0,778,54.35206,0.1408183085608427
2015-02-19T00:00:00.000+0000,0,779,56.89744,0.141768607994534
