# An End-to-End ML Demo with Azure Databricks and MLflow Integrated with Azure ML
### Read Data, Build ML Models (Spark ML & scikit-learn), Track with MLflow, Compare Models, Model Registry, Deploy to Production as Batch with a UDF & as a REST Endpoint with AML

### ML Objective:
This demo uses time series data from five sensors. The goal is to build an ML model that predicts the Sensor5 value from the readings of the other four sensors.

### Import Training Data
<img src="https://mcg1stanstor00.blob.core.windows.net/images/demos/Ignite/delta.jpg" alt="Delta" width="600">
</br></br>
The training data for this notebook is simply some time series data from devices that includes a collection of sensor readings.  
The data is stored in the Delta Lake format.  The data can be downloaded in CSV [here](https://mcg1stanstor00.blob.core.windows.net/publicdata/sensors/sensordata.csv).
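
For readers working from the CSV rather than the Delta table, the sketch below shows one way to inspect rows with Python's standard library. It assumes the CSV uses the same header as the table displayed later in this notebook (Device, Time, Sensor1 through Sensor5). The two sample rows are copied from that output, and `parse_sensor_rows` is a hypothetical helper name, not part of the original notebook.

```python
import csv
import io

# Two rows copied from the notebook's displayed output; the header is assumed
# to match the downloadable CSV.
SAMPLE_CSV = """Device,Time,Sensor1,Sensor2,Sensor3,Sensor4,Sensor5
Device001,2016-02-14T10:10:42.000+0000,64.69588,20140.758,96.5266,62.397552,4.0006847
Device001,2016-02-14T11:26:30.000+0000,63.9157,20210.74,217.18486,91.76922,4.780864
"""

SENSOR_COLUMNS = ["Sensor1", "Sensor2", "Sensor3", "Sensor4", "Sensor5"]


def parse_sensor_rows(text, device=None):
    """Parse CSV text into dicts, converting sensor readings to floats.

    If `device` is given, keep only the rows for that device.
    """
    rows = []
    for record in csv.DictReader(io.StringIO(text)):
        if device is not None and record["Device"] != device:
            continue
        for name in SENSOR_COLUMNS:
            record[name] = float(record[name])
        rows.append(record)
    return rows


rows = parse_sensor_rows(SAMPLE_CSV, device="Device001")
print(len(rows), rows[0]["Sensor5"])
```

To parse the downloaded file instead of the inline sample, read its contents into a string and pass that to the same function.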

### Initial Setup

In [0]:
#Install required modules
dbutils.library.installPyPI("azureml-mlflow")
dbutils.library.restartPython()

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import mlflow
import mlflow.spark
import mlflow.sklearn
import mlflow.azureml
import azureml
import azureml.core
from azureml.core import Workspace

### Review Data

In [0]:
%sh

ls -la

In [0]:
# The data is already in Delta Lake and registered as a table within Databricks.
# To reproduce this, download the CSV mentioned above, upload it via the 'Data' tab in the left sidebar, and register it as a table named 'sensor'.
dataDf = spark.table("sensor").where(col('Device') == 'Device001')
display(dataDf)

Device,Time,Sensor1,Sensor2,Sensor3,Sensor4,Sensor5
Device001,2016-02-14T10:10:42.000+0000,64.69588,20140.758,96.5266,62.397552,4.0006847
Device001,2016-02-14T11:26:30.000+0000,63.9157,20210.74,217.18486,91.76922,4.780864
Device001,2016-02-14T11:16:33.000+0000,65.18122,20280.723,291.51035,129.75934,3.565236
Device001,2016-02-14T09:15:48.000+0000,66.40139,19902.82,89.35606,60.260178,2.3450725
Device001,2016-02-14T09:17:15.000+0000,66.582825,19902.82,108.316635,68.46494,2.086525
Device001,2016-02-14T09:25:57.000+0000,65.58492,19944.809,218.01222,94.80291,3.0572126
Device001,2016-02-14T09:48:15.000+0000,65.7709,20056.78,115.28034,71.084946,2.8984551
Device001,2016-02-14T10:01:15.000+0000,64.75031,20056.78,96.04397,63.500713,3.9326458
Device001,2016-02-14T10:14:36.000+0000,64.34208,20056.78,147.75465,83.7713,4.354487
Device001,2016-02-14T11:04:33.000+0000,65.27648,20238.732,98.59503,62.397552,3.4064786


# Experiment Tracking and Model Deployment
## with MLflow and Azure Machine Learning
<img src="https://raw.githubusercontent.com/iheartdatascience/ignite2020/master/aml_adb.jpg" alt="Better Together" width="800">
</br></br>
This notebook walks through a basic machine learning example. Training runs will be logged to Azure Machine Learning using MLflow's open-source APIs.  </br> One of the resulting models will then be deployed using MLflow APIs as a) a Spark Pandas UDF for batch scoring and b) a web service in Azure Machine Learning.

## Basic Setup
<img src="https://raw.githubusercontent.com/iheartdatascience/ignite2020/master/notebookimage1.JPG" alt="Basic Setup" width="600">
</br></br>

Basic setup requires that the Databricks workspace be linked with the AML workspace.

## Experiment Tracking with MLflow and AML
<img src="https://raw.githubusercontent.com/iheartdatascience/ignite2020/master/experiment.jpg" alt="Experiment Tracking" width="750">
</br>
MLflow logging APIs will be used to log training experiments, metrics, and artifacts to AML.

In [0]:
#Set MLFlow Experiment
experimentName = "/AML-MLFLOW-DBX-DEMO/ML_with_ADB_and_AML-LINGARO"
mlflow.set_experiment(experimentName)

### ML Model with Spark ML
<img src="https://mcg1stanstor00.blob.core.windows.net/images/demos/Ignite/spark.jpg" alt="Spark" width="150">

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
# Split the data into training and test sets (30% held out for testing)
# A fixed seed keeps the split the same across runs so metrics are comparable
(train_data, test_data) = dataDf.randomSplit([0.7, 0.3], seed=42)

In [0]:
# Incorporate all input fields as vector for regression pipeline
assembler = VectorAssembler(
    inputCols=["Sensor1", "Sensor2", "Sensor3", "Sensor4"],
    outputCol="features")

In [0]:
def regresionModel(stages, params, train, test):
  pipeline = Pipeline(stages=stages)
  
  with mlflow.start_run(run_name="Sensor Regression") as ml_run:
    for k,v in params.items():
      mlflow.log_param(k, v)

    model = pipeline.fit(train)
    predictions = model.transform(test)

    # Select (prediction, true label) and compute test error
    evaluator = RegressionEvaluator(
        labelCol="Sensor5", predictionCol="prediction", metricName="mse")
    mse = evaluator.evaluate(predictions)

    evaluator = RegressionEvaluator(
        labelCol="Sensor5", predictionCol="prediction", metricName="r2")
    r2 = evaluator.evaluate(predictions)

    #Log MLFlow Metrics and Model
    mlflow.log_metric("mse", mse)
    mlflow.log_metric("r2", r2)
    mlflow.spark.log_model(model, "model")

    # run_id replaces the deprecated run_uuid attribute
    print("Documented with MLflow Run id %s" % ml_run.info.run_id)
  
  return mse, r2, ml_run.info
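
The two metrics logged above have simple definitions. As a sanity check, here is a minimal pure-Python sketch of mean squared error and R² (the coefficient of determination, computed as one minus the ratio of residual to total sum of squares). The function names `mse_by_hand` and `r2_by_hand` are illustrative, and the input values are toy numbers chosen for easy arithmetic, not sensor data.

```python
def mse_by_hand(y_true, y_pred):
    """Mean of the squared differences between labels and predictions."""
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


def r2_by_hand(y_true, y_pred):
    """1 - SS_res / SS_tot, where SS_tot is measured around the label mean."""
    mean_true = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)
    return 1.0 - ss_res / ss_tot


# Toy values for illustration only
toy_true = [1.0, 2.0, 3.0]
toy_pred = [1.0, 2.0, 4.0]

print(mse_by_hand(toy_true, toy_pred))
print(r2_by_hand(toy_true, toy_pred))
```

A lower MSE and an R² closer to 1 both indicate a better fit, which is how the runs are compared in the tracking UI.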

In [0]:
numTreesList = [10, 25]
maxDepthList = [5, 10]
for numTrees, maxDepth in [(numTrees,maxDepth) for numTrees in numTreesList for maxDepth in maxDepthList]:
  params = {"numTrees":numTrees, "maxDepth":maxDepth, "model": "Random Forest Regressor - SparkML"}
  rf = RandomForestRegressor(featuresCol="features", labelCol="Sensor5", numTrees=numTrees, maxDepth=maxDepth)
  mse, r2, ml_run_info = regresionModel([assembler, rf], params, train_data, test_data)
  print("Trees: %s, Depth: %s, MSE: %s, R2: %s\n" % (numTrees, maxDepth, mse, r2))
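
The nested comprehension above enumerates every combination of the two hyperparameter lists. As a possible alternative, `itertools.product` from the standard library yields the same pairs in the same order and scales more readably if further hyperparameters are added. This is only a sketch of the enumeration step; the variable name `grid` is an assumption, and no model is trained here.

```python
from itertools import product

numTreesList = [10, 25]
maxDepthList = [5, 10]

# product iterates the first list in the outer loop, matching the comprehension
grid = list(product(numTreesList, maxDepthList))

for numTrees, maxDepth in grid:
    print("numTrees=%s, maxDepth=%s" % (numTrees, maxDepth))
```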

### ML Model with Scikit Learn
<img src="https://mcg1stanstor00.blob.core.windows.net/images/demos/Ignite/skl.jpg" alt="SciKit Learn" width="150">

In [0]:
import pandas as pd
from sklearn.linear_model import LinearRegression, Lasso
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.ensemble import RandomForestRegressor

#Setup Test/Train datasets
data = dataDf.toPandas()

x = data.drop(["Device", "Time", "Sensor5"], axis=1)
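# Use a 1-D Series as the target; a single-column DataFrame makes scikit-learn warn about a column-vector y
y = data["Sensor5"]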
train_x, test_x, train_y, test_y = train_test_split(x,y,test_size=0.20, random_state=30)

#Train Models
device = "Device001"

resultsPdf = pd.DataFrame()
for numTrees, maxDepth in [(numTrees,maxDepth) for numTrees in numTreesList for maxDepth in maxDepthList]:
  with mlflow.start_run(run_name="Sensor Regression"):
    
    mlflow.log_param("maxDepth", maxDepth)
    mlflow.log_param("numTrees", numTrees)
    mlflow.log_param("model", "Random Forest Regressor - scikit")
    
    # Fit, train, and score the model
    model = RandomForestRegressor(max_depth = maxDepth, n_estimators = numTrees)
    model.fit(train_x, train_y)
    preds = model.predict(test_x)

    # Get Metrics
    mse = mean_squared_error(test_y, preds)
    r2 = r2_score(test_y, preds)

    # Log Metrics and Model
    mlflow.log_metric('mse', mse)
    mlflow.log_metric('r2', r2)
    mlflow.sklearn.log_model(model, "model")

    # Build Metrics Table
    results = [[device, maxDepth, numTrees, mse, r2]]
    runResultsPdf = pd.DataFrame(results, columns =['Device', 'MaxDepth', 'NumTrees', 'MSE', 'r2'])
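    # DataFrame.append was removed in pandas 2.0; pd.concat is the supported equivalent
    resultsPdf = pd.concat([resultsPdf, runResultsPdf], ignore_index=True)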

    last_run_id = mlflow.active_run().info.run_id
  
display(resultsPdf)

Device,MaxDepth,NumTrees,MSE,r2
Device001,5,10,0.0108029270131244,0.995850167699943
Device001,10,10,0.0041177568865175,0.9984182064258424
Device001,5,25,0.0069467140941208,0.9973314918732652
Device001,10,25,0.0042928883260452,0.99835093150085
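
Note that `last_run_id` refers to the final run of the loop (NumTrees 25, MaxDepth 10), which in the table above is not the run with the lowest MSE. A minimal sketch of picking the best run from these results follows. The rows are copied from the table, and `RunResult` is a hypothetical container. In practice the run ID would need to be stored with each row so the winning run could be deployed, which this sketch does not do.

```python
from collections import namedtuple

RunResult = namedtuple("RunResult", ["max_depth", "num_trees", "mse", "r2"])

# Rows copied from the results table above
results = [
    RunResult(5, 10, 0.0108029270131244, 0.995850167699943),
    RunResult(10, 10, 0.0041177568865175, 0.9984182064258424),
    RunResult(5, 25, 0.0069467140941208, 0.9973314918732652),
    RunResult(10, 25, 0.0042928883260452, 0.99835093150085),
]

# Lowest test MSE wins
best = min(results, key=lambda run: run.mse)
print("Best run: max_depth=%s, num_trees=%s, mse=%s" % (best.max_depth, best.num_trees, best.mse))
```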


## Model Deployment
<img src="https://raw.githubusercontent.com/iheartdatascience/ignite2020/master/model_deployment.jpg" alt="Model Deployment" width="800">
</br></br>
Using MLflow APIs, models can be deployed to AML and turned into web services, or they can be deployed as MLflow model objects 
</br>and used in streaming or batch pipelines as Python functions or Pandas UDFs.

### Deploy Model for Batch Scoring
<img src="https://mcg1stanstor00.blob.core.windows.net/images/demos/Ignite/deploylake.jpg" alt="Model Deployment" width="800">
</br></br>
Using MLflow APIs, the scikit-learn MLflow model can be exported out of AML and placed in the Data Lake, where it can be more widely accessed. The cell below references the model directly through its run URI.

In [0]:
model_uri = "runs:/"+last_run_id+"/model"
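
The `runs:/<run_id>/<artifact_path>` scheme points MLflow at an artifact logged under a specific run. The `"model"` segment matches the artifact path passed to `log_model` earlier. The helper below is a small sketch of building that URI with basic input checking. `build_runs_uri` is a hypothetical name, not an MLflow function, and `"abc123"` is a placeholder run ID.

```python
def build_runs_uri(run_id, artifact_path="model"):
    """Return a runs:/ URI for an artifact logged under the given run."""
    if not run_id:
        raise ValueError("run_id must be a non-empty string")
    # Strip stray slashes so the URI has exactly one separator per segment
    return "runs:/{}/{}".format(run_id, artifact_path.strip("/"))


print(build_runs_uri("abc123"))
```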

#### Use Apache Spark for Batch Scoring
<img src="https://raw.githubusercontent.com/iheartdatascience/ignite2020/master/batch_scoring.jpg" alt="Model Deployment" width="800">
</br></br>
The MLflow model will be loaded and used as a Spark Pandas UDF to score new data.

In [0]:
import mlflow.pyfunc
from pyspark.sql.types import ArrayType, FloatType

#Create a Spark UDF for the MLFlow model
pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri)

#Load Scoring Data into Spark Dataframe
scoreDf = spark.table("sensor").where(col('Device') == 'Device100')

#Make Prediction
preds = (scoreDf
           .withColumn('Sensor5-prediction', pyfunc_udf('Sensor1', 'Sensor2', 'Sensor3', 'Sensor4'))
        )
display(preds)

Device,Time,Sensor1,Sensor2,Sensor3,Sensor4,Sensor5,Sensor5-prediction
Device100,2016-02-14T09:11:21.000+0000,66.433136,19846.834,371.21375,166.30154,2.249818,2.2576666357508683
Device100,2016-02-14T09:25:24.000+0000,65.512344,19944.809,276.27292,138.44673,3.206898,3.1960942203979723
Device100,2016-02-14T09:31:09.000+0000,65.7845,19888.824,177.60895,82.11656,2.902991,2.897775554438775
Device100,2016-02-14T09:56:51.000+0000,64.85918,20084.773,100.80135,65.50019,3.8419273,3.839833536104482
Device100,2016-02-14T09:57:48.000+0000,65.131325,20014.791,99.83608,63.84545,3.565236,3.560113397311342
Device100,2016-02-14T10:36:18.000+0000,65.3173,20126.762,236.28333,96.18186,3.3974068,3.388259711994757
Device100,2016-02-14T10:37:51.000+0000,65.33998,20140.758,101.008194,63.569664,3.3565836,3.3485159751992013
Device100,2016-02-14T10:58:51.000+0000,66.10655,20196.744,99.28451,63.08703,2.5809405,2.579870296023809
Device100,2016-02-14T11:34:51.000+0000,67.01374,20336.707,96.59555,59.432808,1.5739655,1.7622244935999998
Device100,2016-02-14T09:00:12.000+0000,65.43977,19832.838,173.47209,74.18759,3.2295778,3.243645210583316
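
The scored rows above come from Device100, while the model was trained on Device001, so a quick look at the prediction error is worthwhile. The sketch below computes the absolute error for the ten displayed rows only. It is a spot check on the visible sample, not an evaluation of the full scoring set, and the variable names are assumptions.

```python
# (actual Sensor5, predicted Sensor5) pairs copied from the output above
pairs = [
    (2.249818, 2.2576666357508683),
    (3.206898, 3.1960942203979723),
    (2.902991, 2.897775554438775),
    (3.8419273, 3.839833536104482),
    (3.565236, 3.560113397311342),
    (3.3974068, 3.388259711994757),
    (3.3565836, 3.3485159751992013),
    (2.5809405, 2.579870296023809),
    (1.5739655, 1.7622244935999998),
    (3.2295778, 3.243645210583316),
]

abs_errors = [abs(actual - predicted) for actual, predicted in pairs]
mean_abs_error = sum(abs_errors) / len(abs_errors)
# Zero-based index of the row with the largest absolute error
worst_index = max(range(len(abs_errors)), key=abs_errors.__getitem__)

print("MAE over displayed rows: %.4f" % mean_abs_error)
print("Largest error at row %d: %.4f" % (worst_index, abs_errors[worst_index]))
```

The largest error in this sample falls on the row with the lowest Sensor5 reading (11:34:51), which may be worth checking against the range of values seen in training.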


### Deploy Model as a Web Service in AML
<img src="https://mcg1stanstor00.blob.core.windows.net/images/demos/Ignite/deploywebservice.jpg" alt="Model Deployment" width="800">
</br></br>
The MLflow model will be containerized and deployed as a web service with AML and Azure Container Instances.

In [0]:
workspace_name = ""
workspace_location=""
resource_group = ""
subscription_id = ""

workspace = Workspace.create(name = workspace_name,
                             subscription_id = subscription_id,
                             resource_group = resource_group,
                             location = workspace_location,
                             exist_ok=True)


In [0]:
workspace.get_details()

In [0]:
experimentName = "lingaro-wed-morning"

azure_service, azure_model = mlflow.azureml.deploy(model_uri=model_uri,
                                                   service_name=experimentName + "-service",
                                                   workspace=workspace,
                                                   synchronous=True)

##### Score Using Web Service URI

In [0]:
# Create input data for the API
sample_json = {
    "columns": [
        "Sensor1",
        "Sensor2",
        "Sensor3",
        "Sensor4"
    ],
    "data": [
        [65.7845, 16613.676, 101.69767, 60.329124]
    ]
}

print(sample_json)
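
The payload above has a `columns` list plus a `data` list of rows, which appears to follow the pandas "split" orientation. Because a malformed row only fails once it reaches the service, a local check before sending can save a round trip. The sketch below is one possible validation. `validate_split_payload` is a hypothetical helper, and the exact input format the deployed service accepts is an assumption based on the example in this notebook.

```python
import json


def validate_split_payload(payload):
    """Check a columns/data payload and return it serialized as JSON."""
    columns = payload.get("columns")
    data = payload.get("data")
    if not isinstance(columns, list) or not columns:
        raise ValueError("'columns' must be a non-empty list")
    if not isinstance(data, list) or not data:
        raise ValueError("'data' must be a non-empty list of rows")
    for index, row in enumerate(data):
        # Every row needs exactly one value per declared column
        if len(row) != len(columns):
            raise ValueError(
                "row %d has %d values, expected %d" % (index, len(row), len(columns)))
    return json.dumps(payload)


sample_json = {
    "columns": ["Sensor1", "Sensor2", "Sensor3", "Sensor4"],
    "data": [[65.7845, 16613.676, 101.69767, 60.329124]],
}

body = validate_split_payload(sample_json)
print(body)
```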

In [0]:
# Get the web service URI; uncomment the next line to read it from the deployed service
# uri = azure_service.scoring_uri

uri = "http://90d6b311-766d-4cc6-824a-aa32893d914a.westeurope.azurecontainer.io/score"

In [0]:
import requests
import json

# Function for calling the API
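def service_query(input_data):
  # POST the JSON payload to the scoring endpoint
  response = requests.post(
              url=uri, data=json.dumps(input_data),
              headers={"Content-type": "application/json"},
              timeout=30)
  # Surface HTTP errors instead of treating an error body as a prediction
  response.raise_for_status()
  prediction = response.text
  print(prediction)
  return prediction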



# API Call
service_query(sample_json)