# Model Building and Training with Databricks and Azure ML Services

This notebook builds a machine learning model that predicts component failure in a machine. <br> It runs the `2_Feature_Engineering` notebook, which takes the raw data as it would arrive from the machines we're interested in <br> and transforms the raw data sets into a training data set that we then use to train the model to predict the outcome of interest. <br> You must have already run the `1_data_ingestion` notebook to download the raw predictive maintenance scenario data before running this notebook.

## Setup

The `2_Feature_Engineering` notebook takes parameters for which model to build (model), where to store the training data (features_table), and the start (start_date) and end (to_date) dates to use when creating the training data. 

Using these parameters, this notebook creates the training data by calling the `./notebooks/2_Feature_Engineering` notebook. When that notebook completes, we can run the remaining cells to build our model and start tracking our experiment on Azure Machine Learning Services.

In [0]:
# Set up our environment by importing required libraries and specifying what data we want to examine using Databricks parameters.

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

#Used data files
training_table= 'training_data'
model_type = 'RandomForest' 

# Databricks parameters to customize the runs
# Input widgets allow you to add parameters to your notebooks and dashboards
dbutils.widgets.removeAll()
dbutils.widgets.text("features_table", training_table)
dbutils.widgets.text("model", model_type)


dbutils.widgets.text("start_date", '2000-01-01')
dbutils.widgets.text("to_date", '2015-10-30')

### Feature Engineering

The `2_Feature_Engineering` notebook run below creates a labeled training data set using the parameters `start_date` and `to_date` to select the time period for training. This data set is stored in the `features_table` specified. After this cell completes, you should see the dataset named `training_data` under the Databricks `Data` icon.

In [0]:
# Run the feature engineering notebook

dbutils.notebook.run("2_Feature_Engineering", 600, {"features_table": dbutils.widgets.get("features_table"), 
                                                     "start_date": dbutils.widgets.get("start_date"), 
                                                     "to_date": dbutils.widgets.get("to_date")})

### Load the Azure ML Workspace

Before the model can be deployed and tracked on Azure ML, you must first create or define your Azure ML Workspace object. 

This defines the workspace you will be deploying your model to. 

Creating the object can be done by passing the name of your workspace, your Azure subscription ID and the resource group where your Azure ML workspace is located.

For more information about creating an Azure ML workspace, see the [Azure ML Workspace management documentation](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-manage-workspace?tabs=python).

In [0]:
# Load an Azure ML workspace to link the Databricks workspace to Azure Machine Learning Service

import mlflow
import mlflow.azureml
from azureml.core import Workspace

subscription_id = '52cbf6c7-01f2-4df2-bae9-c80cee4db7eb'

# Azure Machine Learning resource group NOT the managed resource group
resource_group = 'Peak-MLTemplates-RG' 

#Azure Machine Learning workspace name, NOT Azure Databricks workspace
workspace_name = 'peak-PM-ws'  

# Instantiate Azure Machine Learning workspace
ws = Workspace.get(name=workspace_name,
                   subscription_id=subscription_id,
                   resource_group=resource_group)
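
The identifiers in the cell above can also be kept out of the notebook source. The following is a minimal sketch, assuming a small JSON file with the keys `subscription_id`, `resource_group` and `workspace_name`; the helper name `load_workspace_settings` and the file layout are illustrative assumptions, not part of the original notebook.

```python
import json
from pathlib import Path

# Keys expected in the settings file (an assumed layout for this sketch).
REQUIRED_KEYS = ("subscription_id", "resource_group", "workspace_name")


def load_workspace_settings(path):
    """Read workspace identifiers from a JSON file (hypothetical helper).

    Returns a dict whose keys match the keyword arguments passed to
    Workspace.get in the cell above.
    """
    settings = json.loads(Path(path).read_text())
    missing = [key for key in REQUIRED_KEYS if not settings.get(key)]
    if missing:
        raise ValueError("missing workspace settings: " + ", ".join(missing))
    return {
        "name": settings["workspace_name"],
        "subscription_id": settings["subscription_id"],
        "resource_group": settings["resource_group"],
    }
```

With such a file in place, the workspace could be loaded with `ws = Workspace.get(**load_workspace_settings("config.json"))`, where the file name is again only an example.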

Load the training data and add Databricks parameters.

In [0]:
# Databricks parameters to customize the runs

spark = SparkSession.builder.getOrCreate()
dbutils.widgets.text("training_table",training_table)
dbutils.widgets.text("Model", model_type)

In [0]:
spark.catalog.refreshTable(dbutils.widgets.get("training_table")) 
train_data = spark.table(dbutils.widgets.get("training_table"))

### Prepare the Training Data

A fundamental practice in machine learning is to calibrate and test your model parameters on data that has not been used to train the model. <br> Evaluation of the model requires splitting the available data into a training portion, a calibration portion and an evaluation portion.<br> Typically, 80% of data is used to train the model and 10% each to calibrate any parameter selection and evaluate your model.

In general, random splitting can be used. However, time series data have an inherent correlation between observations, so for predictive maintenance problems <br> a time-dependent splitting strategy is often a better way to estimate performance. <br> For a time-dependent split, a single point in time is chosen, the model is trained on examples up to that point in time, and validated on the examples after that point. <br> This simulates training on current data and scoring data collected in the future, where data after the splitting point is not yet known. <br> However, care must be taken with labels near the split point. <br> In this case, feature records within 7 days of the split point cannot be labeled as a failure, since that is unobserved data.
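
The time-dependent split described above can be sketched in plain Python. This is an illustration only: the function name `time_split`, the toy records, and the choice to drop training records inside the 7-day window are assumptions made for the example. The notebook itself selects its training and test periods through the `start_date` and `to_date` parameters.

```python
from datetime import date, timedelta


def time_split(records, split_date, label_window_days=7):
    """Split (record_date, payload) pairs around split_date.

    Training keeps records whose label window closes on or before the
    split point; testing keeps records dated after the split point.
    Records inside the window before the split are dropped, because
    their labels would depend on data after the split.
    """
    cutoff = split_date - timedelta(days=label_window_days)
    train = [r for r in records if r[0] <= cutoff]
    test = [r for r in records if r[0] > split_date]
    return train, test


# Toy data: one record per day from 2015-10-01 to 2015-10-30.
records = [(date(2015, 10, 1) + timedelta(days=i), i) for i in range(30)]
train, test = time_split(records, split_date=date(2015, 10, 20))
print(len(train), len(test))  # 13 10
```

The seven records dated 14 to 20 October fall inside the label window and appear in neither set.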

In [0]:
# define list of input columns for downstream modeling

# We'll use the known label, and key variables.
label_var = ['label_e']
key_cols =['machineID','dt_truncated']

# Then get the remaining feature names from the data
input_features = train_data.columns

# Remove the known label, key variables and a few extra columns we won't need.
remove_names = label_var + key_cols + ['failure','model_encoded','model' ]

# Create the input features 
input_features = [x for x in input_features if x not in set(remove_names)]

Spark models require a vectorized data frame. We transform the dataset here and then split the data into a training and test set. <br>
We use this split data to train the model on 9 months of data (training data), and evaluate on the remaining 3 months (test data) going forward.

In [0]:
# Import the libraries 
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier

# for creating pipelines and model
from pyspark.ml.feature import StringIndexer, VectorAssembler, VectorIndexer

# assemble features
va = VectorAssembler(inputCols=(input_features), outputCol='features')
train_data = va.transform(train_data).select('machineID','dt_truncated','label_e','features')

# set maxCategories so features with > 10 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", 
                               outputCol="indexedFeatures", 
                               maxCategories=10).fit(train_data)

# fit on whole dataset to include all labels in index
labelIndexer = StringIndexer(inputCol="label_e", outputCol="indexedLabel").fit(train_data)

training = train_data

### Prepare the Testing Data

To evaluate this model, we predict the component failures over the test data set.<br> Since the test set has been created from data the model has not seen before, it simulates future data. <br> The evaluation can then be generalized to assess how the model could perform when operationalized and used to score new data.

In [0]:
# Add Databricks parameters

testing_table = 'testing_data'
dbutils.widgets.removeAll()
dbutils.widgets.text("Testing_table",testing_table)
dbutils.widgets.text("Model", model_type)
dbutils.widgets.text("start_date", '2015-11-30')
dbutils.widgets.text("to_date", '2016-02-01')

In [0]:
spark.catalog.setCurrentDatabase("default")
exists = False
for tbl in spark.catalog.listTables():
  if tbl.name == dbutils.widgets.get("Testing_table"):
    exists = True
    break

In [0]:
if not exists:
  dbutils.notebook.run("2_Feature_Engineering", 600, {"features_table": dbutils.widgets.get("Testing_table"), 
                                                       "start_date": dbutils.widgets.get("start_date"), 
                                                       "to_date": dbutils.widgets.get("to_date")})

In [0]:
#Load the data

test_data = spark.table(dbutils.widgets.get("Testing_table"))

# Testing data is prepared using the same steps used for the training data 

# define list of input columns for downstream modeling

# We'll use the known label, and key variables.
label_var = ['label_e']
key_cols =['machineID','dt_truncated']

# Then get the remaining feature names from the data
input_features = test_data.columns

# Remove the known label, key variables and a few extra columns we won't need.
remove_names = label_var + key_cols + ['failure','model_encoded','model' ]

# Create the input features 
input_features = [x for x in input_features if x not in set(remove_names)]

# assemble features
va = VectorAssembler(inputCols=(input_features), outputCol='features')

# transform and keep only the key, label and feature columns
test_data = va.transform(test_data).select('machineID','dt_truncated','label_e','features')

# Reuse the featureIndexer and labelIndexer fitted on the training data above.
# Refitting them on the test data would build the feature and label indexes
# from data the model should not see before evaluation.

testing = test_data

## Classification Models

A particular problem in predictive maintenance is that machine failures are usually rare occurrences compared to normal operation. This is fortunate for the business, as maintenance and safety issues are few, but it causes an imbalance in the label distribution. This imbalance leads to poor performance, as algorithms tend to classify majority class examples correctly at the expense of the minority class, since the total misclassification error is much improved when the majority class is labeled correctly. This causes low recall or precision rates, although accuracy can be high. It becomes a larger problem when the cost of false alarms is very high. To help with this problem, sampling techniques such as oversampling of the minority examples can be used. These methods are not covered in this notebook. Because of this, it is also important to look at evaluation metrics other than accuracy alone.
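
A small example shows why accuracy alone is misleading on imbalanced labels. The numbers below are invented for illustration: a classifier that always predicts "no failure" on data with 2% failures scores 98% accuracy yet detects no failures at all.

```python
def accuracy_and_recall(y_true, y_pred, positive=1):
    """Return (accuracy, recall for the positive class)."""
    pairs = list(zip(y_true, y_pred))
    correct = sum(t == p for t, p in pairs)
    true_positives = sum(t == positive and p == positive for t, p in pairs)
    positives = sum(t == positive for t in y_true)
    accuracy = correct / len(y_true)
    recall = true_positives / positives if positives else 0.0
    return accuracy, recall


# 98 healthy records, 2 failures; the "model" always predicts healthy (0).
y_true = [0] * 98 + [1] * 2
y_pred = [0] * 100
print(accuracy_and_recall(y_true, y_pred))  # (0.98, 0.0)
```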

We will build a Random Forest Classifier:

- **Random Forest Classifier**: A random forest is an ensemble of decision trees. Random forests combine many decision trees in order to reduce the risk of overfitting. Tree ensemble algorithms such as random forests and boosting are among the top performers for classification and regression tasks.

The next code block creates the model. A series of model hyperparameters have also been included to guide your exploration of the model space.

In [0]:
# import the libraries for creating pipelines and model

import mlflow
import mlflow.spark
from pyspark.sql import SparkSession
import numpy as np
import mlflow.pyfunc
from pyspark.ml import PipelineModel
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorAssembler, VectorIndexer

# Start the experiment, which we can now track on Azure ML Service, and build the model

with mlflow.start_run(): # Naming it will allow you to register the model
  rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", 
                              # Maximum depth of the tree. (>= 0) 
                              # E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.
                              maxDepth=15,
                              # Max number of bins for discretizing continuous features. 
                              # Must be >=2 and >= number of categories for any categorical feature.
                              maxBins=32,
                              # Minimum number of instances each child must have after split. 
                              # If a split causes the left or right child to have fewer than 
                              # minInstancesPerNode, the split will be discarded as invalid. Should be >= 1.
                              minInstancesPerNode=1,
                              # Minimum information gain for a split to be considered at a tree node.
                              minInfoGain=0.0,
                              # Criterion used for information gain calculation (case-insensitive). 
                              # Supported options: entropy, gini
                              impurity="gini",
                              # Number of trees to train (>= 1)
                              numTrees=200, 
                              # The number of features to consider for splits at each tree node. 
                              # Supported options: auto, all, onethird, sqrt, log2, (0.0-1.0], [1-n].
                              featureSubsetStrategy="sqrt", 
                              # Fraction of the training data used for learning each  
                              # decision tree, in range (0, 1].
                              subsamplingRate = 0.632)
  
  # chain indexers and model in a Pipeline
  pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf])
  
  # train model.  This also runs the indexers.
  model = pipeline.fit(training)
  
  
  # Evaluate the model 
  
  # make predictions. The Pipeline does all the same operations on the test data
  predictions = model.transform(testing)
  
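  # Create the confusion matrix for the multiclass prediction results
  # This result assumes a decision boundary of p = 0.5
  conf_table = predictions.stat.crosstab('indexedLabel', 'prediction')
  # Sort rows by true label so that row position i corresponds to label i,
  # which the positional lookups below rely on
  confuse = conf_table.toPandas().sort_values('indexedLabel_prediction').reset_index(drop=True)
  confuse.head()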
  
  # Log MLflow Metrics and Model
  # select (prediction, true label) and compute test error
  # True positives - diagonal failure terms
  tp = confuse['1.0'][1]+confuse['2.0'][2]+confuse['3.0'][3]+confuse['4.0'][4]
  # False positives - all failure terms - true positives
  fp = np.sum(np.sum(confuse[['1.0', '2.0','3.0','4.0']])) - tp
  # True negatives 
  tn = confuse['0.0'][0]
  # False negatives total of non-failure column - TN
  fn = np.sum(np.sum(confuse[['0.0']])) - tn
  
  # Accuracy is diagonal/total 
  acc_n = tn + tp
  acc_d = np.sum(np.sum(confuse[['0.0','1.0', '2.0','3.0','4.0']]))
  acc = acc_n/acc_d
  
  # Calculate precision and recall.
  prec = tp/(tp+fp)
  rec = tp/(tp+fn)
  
  # Calculate F1
  FOne = 2.0 * prec * rec/(prec + rec)
  
  # These metrics are logged onto Azure ML Service where you can track them
  
  # Log the evaluation metrics and model
  mlflow.log_metric("acc", acc)
  mlflow.log_metric("pre", prec)
  mlflow.log_metric("rec", rec)
  mlflow.log_metric("f1", FOne)
  
  # Log and register the model
  mlflow.spark.log_model(model, artifact_path = "model",
                        registered_model_name = "PM-RandomForest")
  
  last_run_id = mlflow.active_run().info.run_id

  # Print the evaluation metrics to the notebook
  print("Accuracy = %g" % acc)
  print("Precision = %g" % prec)
  print("Recall = %g" % rec )
  print("F1 = %g" % (2.0 * prec * rec/(prec + rec)))
  print("")
  
  mlflow.end_run()
  

After running the cell above, go back to your Azure ML Workspace, where you should be able to see your runs. The runs are also logged in the MLflow UI in your Azure Databricks workspace. If you click on the `Experiment` symbol above, followed by the arrow showing `View Run Detail`, you will be able to view the run details in your Databricks workspace.


![](https://github.com/felicity-borg/BatchSparkScoringPredictiveMaintenance/blob/master/images/AzureML_Experiments.PNG?raw=true)
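
The metric definitions used in the training cell can be written for any number of failure classes. The sketch below follows the same definitions (class 0 is "no failure", rows are true labels, columns are predictions); the function name `failure_metrics` and the example matrix are assumptions made for illustration, not results from this notebook.

```python
def failure_metrics(confusion):
    """Compute accuracy, precision, recall and F1 from a confusion matrix.

    confusion[i][j] is the number of examples with true class i that were
    predicted as class j. Class 0 is 'no failure'; all others are failures.
    """
    n = len(confusion)
    total = sum(sum(row) for row in confusion)
    tn = confusion[0][0]
    # True positives: diagonal failure terms.
    tp = sum(confusion[k][k] for k in range(1, n))
    # False negatives: true failures predicted as 'no failure'.
    fn = sum(confusion[i][0] for i in range(1, n))
    # False positives: everything in the failure columns minus true positives.
    fp = sum(confusion[i][j] for i in range(n) for j in range(1, n)) - tp
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2.0 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {"tp": tp, "fp": fp, "tn": tn, "fn": fn,
            "accuracy": (tn + tp) / total,
            "precision": precision, "recall": recall, "f1": f1}


# Invented 3-class example: one healthy class and two failure classes.
example = [[90, 5, 5],
           [2, 8, 0],
           [1, 1, 8]]
print(failure_metrics(example))
```

Unlike the positional lookups in the cell, this version guards against division by zero when no failures are predicted.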

### Build an Azure Container Image for model deployment

### Use MLflow to build a Container Image for the trained model

Use the `mlflow.azureml.build_image` function to build an Azure Container Image for the trained MLflow model. This function also registers the MLflow model with a specified Azure ML workspace. The resulting image can be deployed to Azure Container Instances (ACI) or Azure Kubernetes Service (AKS) for real-time serving.

Specify the ID of the last run associated with the model's training. You can find the run ID and model path in the run details of the experiment run.

![](https://docs.azuredatabricks.net/_static/images/mlflow/mlflow-deployment-example-run-info.png)

In [0]:
# The latest run ID
run_id1 = "b05532fdd06e41be9d9c31ae3425c631"
# Builds URI for the model associated with the latest run
model_uri = "runs:/" + run_id1 + "/model"
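
Instead of copying the run ID by hand, the `last_run_id` captured in the training cell could be passed to a small helper. This is a sketch only; `runs_model_uri` is a hypothetical function name, and `"model"` matches the `artifact_path` used when the model was logged.

```python
def runs_model_uri(run_id, artifact_path="model"):
    """Build an MLflow 'runs:/<run_id>/<artifact_path>' URI."""
    if not run_id:
        raise ValueError("run_id must be a non-empty string")
    return "runs:/{}/{}".format(run_id, artifact_path.strip("/"))


print(runs_model_uri("b05532fdd06e41be9d9c31ae3425c631"))
# runs:/b05532fdd06e41be9d9c31ae3425c631/model
```

In the notebook this would read `model_uri = runs_model_uri(last_run_id)`, provided the training cell has been run in the same session.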

Use the `mlflow.azureml.build_image` function to build an Azure Container Image for the trained MLflow model and register the MLflow model with the specified Azure ML workspace.

In [0]:
# Build the image with mlflow.azureml.build_image (note: this option is being deprecated)

import mlflow.azureml

azure_image, azure_model = mlflow.azureml.build_image(model_uri=model_uri, 
                                                      workspace=ws,
                                                      image_name = "pm-randomforest",
                                                      model_name= "predictive-maintenance-model",
                                                      #service_name = "predictive-maintenance",
                                                      description="RandomForest for predicting machine failure",
                                                      synchronous=False)


Once the cell has run successfully, you can go back to your Azure ML workspace and click on `Models` on the right hand side. <br>
Here you can select the model you have just registered and you will be presented with information about the model itself. <br>


![](https://github.com/felicity-borg/BatchSparkScoringPredictiveMaintenance/blob/master/images/azure_ml_model.PNG?raw=true)

Under `Artifacts` you will find the model stored in MLflow's common model format. Here you will see the pickle file (if there is one) and the format of the model (its configuration).

![](https://github.com/felicity-borg/BatchSparkScoringPredictiveMaintenance/blob/master/images/Artificat.PNG?raw=true)

### Conclusion

This container image can then be deployed to Azure ML for staging and development model deployments using Azure Container Instances (ACI), <br>
or for real-time serving using Azure Kubernetes Service (AKS).