# Experiment Tracking and Model Deployment with MLflow and Azure Machine Learning
<br>
This notebook walks through a basic machine learning example. Training runs are logged to Azure Machine Learning using MLflow's open-source APIs.<br>
The resulting model is then deployed as a web service in Azure Machine Learning using MLflow APIs.

### In this problem, we build a machine learning model that predicts active energy burned (in calories) from the date.

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import mlflow
import mlflow.spark
import mlflow.sklearn
import mlflow.azureml
import azureml
import azureml.mlflow
import azureml.core
from azureml.core import Workspace
from azureml.mlflow import get_portal_url
from azureml.core.authentication import ServicePrincipalAuthentication
# Credentials are read from the Databricks secret scope so nothing sensitive lives in the notebook.
SECRET_SCOPE = "Secret_Scope"


def _read_secret(secret_key):
    """Return one value from the notebook's Databricks secret scope."""
    return dbutils.secrets.get(scope=SECRET_SCOPE, key=secret_key)


service_principal_clientid = _read_secret("Service Principal ID")
service_principal_secret = _read_secret("Service Principal Secret")
subscription_id = _read_secret("subscription-id")
tenant_id = _read_secret("tenant-id")

# Placeholders: replace with the resource group and workspace that host your AML workspace.
AML_RESOURCE_GROUP = 'resource_group_name'
AML_WORKSPACE_NAME = 'workspace_name'

# Authenticate as the service principal, then connect to the Azure Machine Learning workspace.
sp_auth = ServicePrincipalAuthentication(
    tenant_id=tenant_id,
    service_principal_id=service_principal_clientid,
    service_principal_password=service_principal_secret,
)
ws = Workspace(
    subscription_id=subscription_id,
    resource_group=AML_RESOURCE_GROUP,
    workspace_name=AML_WORKSPACE_NAME,
    auth=sp_auth,
)


## Setup
Setup requires that (a) the Databricks workspace is linked to the AML workspace and (b) the MLflow tracking URI points to AML.

## Data Import and Cleaning
The training data for this notebook is the active-energy-burned data exported from the Apple Health app.  
The data can be downloaded as CSV from the `data` folder of the GitHub repo.

In [None]:
from pyspark.sql.functions import to_date
from pyspark.sql.types import FloatType
import pyspark.sql.functions as f
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col

# Load the Apple Health ActivitySummary export; with header=True every column is read as a string.
activitysummarydf = spark.read.options(header='True').csv("/mnt/datalake/Bronze/apple_health_export/source/csv/ActivitySummary.csv")

display(activitysummarydf.limit(10))

# Keep only the feature (date) and the label (active energy burned).
activitysummarydf2 = (
    activitysummarydf
    .select("dateComponents", "activeEnergyBurned")
    .withColumnRenamed('dateComponents', 'date')
)

# Parse the date string and make the label numeric; values that fail to parse/cast become null.
df1 = (
    activitysummarydf2
    .withColumn('date', to_date(col('date'), 'yyyy-MM-dd'))
    .withColumn("activeEnergyBurned", col("activeEnergyBurned").cast('Float'))
)
df1.printSchema()

# Encode the date as a yyyyMMdd number so the regressor can use it as a numeric feature.
# Cast to double, NOT FloatType: float32 represents integers exactly only up to 2**24
# (16,777,216), so 8-digit values such as 20210807 would be rounded onto neighbouring dates.
df2 = df1.withColumn("date", f.date_format(col("date"), "yyyyMMdd").cast('double'))

# A bare count() is only displayed when it is the cell's last expression, so print it explicitly.
print("Rows before filtering:", df2.count())

# Drop rows with no recorded energy, plus rows whose date failed to parse: a null date becomes
# NaN after toPandas(), and scikit-learn's fit() rejects NaN features.
dataDf = df2.filter((col("activeEnergyBurned") != 0) & col("date").isNotNull())
dataDf.show()
dataDf.printSchema()

## Experiment Tracking with MLflow and AML

MLflow logging APIs are used to log training experiments, metrics, and artifacts to AML.

In [None]:
# Send MLflow tracking calls (runs, metrics, artifacts) to the AML workspace's tracking server.
aml_tracking_uri = ws.get_mlflow_tracking_uri()
mlflow.set_tracking_uri(aml_tracking_uri)

# Every run in this notebook is grouped under this experiment; the name is also used
# as the prefix for the AML model, image and service names further down.
experimentName = "kdactiveenergyburned"
mlflow.set_experiment(experimentName)

<img src="https://mcg1stanstor00.blob.core.windows.net/images/demos/Ignite/skl.jpg" alt="SciKit Learn" width="150">

In [None]:
import pandas as pd
from sklearn.linear_model import LinearRegression, Lasso
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.ensemble import RandomForestRegressor

# One seed for both the split and the forest so re-running the notebook reproduces the same run.
RANDOM_SEED = 30

# Setup Test/Train datasets: every non-label column (here just the numeric date) is a feature.
data = dataDf.toPandas()

x = data.drop(["activeEnergyBurned"], axis=1)
y = data[["activeEnergyBurned"]]
train_x, test_x, train_y, test_y = train_test_split(x, y, test_size=0.20, random_state=RANDOM_SEED)

# Train Models
Activity = "activeEnergyBurned"
maxDepth = 10

# The `with` block ends the MLflow run on exit (also when an exception is raised),
# so no explicit mlflow.end_run() is needed inside it.
with mlflow.start_run() as run:
    # Fit, train, and score the model
    model = RandomForestRegressor(max_depth=maxDepth, random_state=RANDOM_SEED)
    model.fit(train_x, train_y.values.ravel())
    preds = model.predict(test_x)

    # Get Metrics
    mse = mean_squared_error(test_y, preds)
    r2 = r2_score(test_y, preds)

    # Log the hyperparameter alongside the metrics so runs can be compared in AML, then the model.
    mlflow.log_param('max_depth', maxDepth)
    mlflow.log_metric('mse', mse)
    mlflow.log_metric('r2', r2)
    mlflow.sklearn.log_model(model, "model")

    run_id = run.info.run_id
    print('Run ID: ', run_id)
    # URI of the logged model; the deployment cells build the AML image from it.
    model_uri = "runs:/" + run_id + "/model"
    print('model_uri: ', model_uri)

# Build Metrics Table
results = [[Activity, maxDepth, mse, r2]]
runResults = pd.DataFrame(results, columns=['Activity', 'MaxDepth', 'MSE', 'r2'])

display(runResults)

#### Deploy Model as a Web Service in AML

In [None]:
# Register the logged MLflow model in AML and build a Docker image that serves it.
# NOTE(review): mlflow.azureml.build_image is a legacy API; confirm it exists in the installed MLflow version.
aml_model_name = experimentName + "-model"
aml_image_name = experimentName + "-image"

model_image, azure_model = mlflow.azureml.build_image(
    model_uri=model_uri,
    workspace=ws,
    model_name=aml_model_name,
    image_name=aml_image_name,
    synchronous=False,
)

# The build was started asynchronously; block here (printing progress) until the image exists.
model_image.wait_for_creation(show_output=True)

In [None]:
from azureml.core.webservice import AciWebservice, Webservice

# Deploy the image built above to Azure Container Instances (1 CPU core, 1 GB memory) as a test endpoint.
webservice_name = experimentName + "-service"
webservice_deployment_config = AciWebservice.deploy_configuration(
    cpu_cores=1,
    memory_gb=1,
)
aci_webservice = Webservice.deploy_from_image(
    workspace=ws,
    name=webservice_name,
    image=model_image,
    deployment_config=webservice_deployment_config,
)
aci_webservice.wait_for_deployment(show_output=True)

In [None]:
## Get the Web Service URI
# Always read the endpoint from the deployed service; do not keep hard-coded host names in the notebook.
uri = aci_webservice.scoring_uri
print(uri)

##### Score Using Web Service URI with existing dataset

In [None]:
# Create a sample data 
from sklearn import datasets
import pandas as pd
import numpy as np
import requests
import json

# Use the last 10 rows of the training features as the scoring payload.
sample = data.drop(["activeEnergyBurned"], axis=1).tail(10)

# orient='split' produces {"columns": [...], "index": [...], "data": [[...], ...]}.
# Parse it with json.loads: eval() would execute arbitrary code and cannot parse the JSON
# literals null/true/false (pandas writes NaN values as null).
query_input = json.loads(sample.to_json(orient='split'))
# The row index is not a model input, so remove it from the payload.
query_input.pop('index', None)


# sending an HTTP request
def query_endpoint_example(scoring_uri, inputs, service_key=None, timeout=60):
    """POST ``inputs`` as JSON to a scoring endpoint and return the decoded response.

    Args:
        scoring_uri: URL of the web service's scoring endpoint.
        inputs: JSON-serialisable payload (here a pandas "split"-style dict).
        service_key: optional key; when given it is sent as a Bearer token.
        timeout: seconds to wait for the service before giving up.

    Returns:
        The JSON-decoded response body (the model's predictions).

    Raises:
        requests.HTTPError: if the service responds with a 4xx/5xx status.
    """
    headers = {
        "Content-Type": "application/json",
    }
    if service_key is not None:
        headers["Authorization"] = "Bearer {service_key}".format(service_key=service_key)

    print("Sending batch prediction request with inputs: {}".format(inputs))
    # Without a timeout, requests can wait forever on an unresponsive endpoint.
    response = requests.post(scoring_uri, data=json.dumps(inputs), headers=headers, timeout=timeout)
    # Surface the HTTP status instead of a confusing JSON decode error on error responses.
    response.raise_for_status()
    preds = json.loads(response.text)
    print("Received response: {}".format(preds))
    return preds

In [None]:
# Score the sampled rows against the ACI test endpoint.
prediction = query_endpoint_example(scoring_uri=aci_webservice.scoring_uri, inputs=query_input)

##### Score Using Web Service URI using new data

In [None]:
# Create input data for the API: a single row in pandas "split" layout (columns + data rows),
# with the date written as a yyyyMMdd number, matching how the training feature was encoded.
sample_json = {"columns": ["date"], "data": [[20210806]]}

print(sample_json)

In [None]:
import requests
import json

# Function for calling the API
def service_query(input_data, scoring_uri=None, timeout=60):
    """POST ``input_data`` as JSON to the scoring endpoint and return the raw response text.

    Args:
        input_data: JSON-serialisable payload, e.g. {"columns": [...], "data": [[...]]}.
        scoring_uri: endpoint to call; defaults to the notebook-level ``uri`` (the ACI service).
        timeout: seconds to wait for the service before giving up.

    Returns:
        The response body as text (it is also printed).

    Raises:
        requests.HTTPError: if the service responds with a 4xx/5xx status.
    """
    target_uri = uri if scoring_uri is None else scoring_uri
    response = requests.post(
        url=target_uri,
        data=json.dumps(input_data),
        headers={"Content-type": "application/json"},
        timeout=timeout,
    )
    # Fail loudly on HTTP errors instead of printing an error page as if it were a prediction.
    response.raise_for_status()
    prediction = response.text
    print(prediction)
    return prediction

# API Call
service_query(sample_json)

### 4. Deploy the model to production using [Azure Kubernetes Service (AKS)](https://azure.microsoft.com/en-us/services/kubernetes-service/)

#### Create an AKS cluster

If you do not have an existing AKS cluster for model deployment, create one using the Azure ML SDK.
<br><br> https://docs.microsoft.com/en-us/azure/machine-learning/how-to-deploy-azure-kubernetes-service?tabs=python
<br><br> https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.compute.aks.akscompute?view=azure-ml-py

In [None]:
from azureml.core.compute import AksCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException

aks_cluster_name = "aks-cluster"

# Reuse the cluster if it is already registered in the workspace, so re-running this cell
# does not try to provision a second cluster under the same name.
try:
    aks_target = ComputeTarget(workspace=ws, name=aks_cluster_name)
    print("Using existing AKS cluster:", aks_cluster_name)
except ComputeTargetException:
    # Default provisioning configuration (pass parameters to customize node count, VM size, ...).
    prov_config = AksCompute.provisioning_configuration()
    # Create the cluster
    aks_target = ComputeTarget.create(workspace=ws,
                                      name=aks_cluster_name,
                                      provisioning_configuration=prov_config)

# Wait for the create process to complete
aks_target.wait_for_completion(show_output=True)
print(aks_target.provisioning_state)
print(aks_target.provisioning_errors)

#### Connect to AKS cluster in your workspace

In [None]:
from azureml.core.compute import AksCompute, ComputeTarget

# Attach to an AKS cluster that is already registered in the workspace under this name.
aks_cluster_name = "aks-cluster"
aks_target = ComputeTarget(workspace=ws, name=aks_cluster_name)

# Show the cluster's provisioning state and any provisioning errors, one per line.
print(aks_target.provisioning_state, aks_target.provisioning_errors, sep="\n")

### Deploy the model's image to the AKS cluster

In [None]:
from azureml.core.webservice import Webservice, AksWebservice

# Production endpoint: default AKS deployment settings, serving the same image as the ACI test service.
aks_webservice_name = "activeenergy-prod"
aks_webservice_deployment_config = AksWebservice.deploy_configuration()

aks_deploy_kwargs = dict(
    workspace=ws,
    name=aks_webservice_name,
    image=model_image,
    deployment_config=aks_webservice_deployment_config,
    deployment_target=aks_target,
)
# Start the deployment from the image; waiting for it to finish happens in the next cell.
aks_webservice = Webservice.deploy_from_image(**aks_deploy_kwargs)

In [None]:
# Block until the AKS deployment started above has finished, printing its progress output.
aks_webservice.wait_for_deployment(show_output=True)

Query the AKS web service's scoring endpoint by sending an HTTP POST request with the input data. If the service requires authentication, include its authorization token (service key) in the HTTP request header.

In [None]:
import requests
import json

# query_endpoint_example and query_input come from the ACI scoring cell; reuse them rather
# than redefining the function here (a second definition silently shadows the first).
aks_scoring_uri = aks_webservice.scoring_uri
# get_keys() is a service call: fetch once, then send the first key (if any) as the Bearer token.
aks_keys = aks_webservice.get_keys()
aks_service_key = aks_keys[0] if aks_keys else None
aks_prediction1 = query_endpoint_example(scoring_uri=aks_scoring_uri, service_key=aks_service_key, inputs=query_input)

### Clean up the deployments

In [None]:
# Delete the ACI service, then the AKS service, then the AKS cluster (in that order).
# NOTE(review): this also removes the AKS cluster itself; skip it if the cluster is shared.
for deployed_resource in (aci_webservice, aks_webservice, aks_target):
    deployed_resource.delete()