# Project: Managing the Model Life Cycle with MLflow and GCP

This notebook walks through the process of:

    1. Training a PySpark model on Boston House Prices
    2. Saving the model with MLflow (MLeap flavor)
    3. Storing the model in GitHub

#### Author: 

**Nardini, Ivan - Sr. Customer Advisor | CI & Analytics Team | ModelOps & Decisioning**

## Setup

    1. Launch a Python 3 cluster running Databricks Runtime 5.0
    2. Install the MLeap Scala libraries (maven)
    3. Install MLflow and MLeap libraries


## Create a cluster and install MLflow and MLeap on the cluster

    1. Create a cluster specifying:
      - Databricks Runtime Version: Databricks Runtime 5.0 or above
      - Python Version: Python 3

    2. Install required libraries: 
      - Create a library with Source Maven Coordinate and the fully-qualified Maven artifact coordinate: ml.combust.mleap:mleap-spark_2.11:0.13.0
      - Install the library into the cluster.

    3. Install the required Python library:
      - Create a library with Source PyPI and enter mlflow[extras].
      - Install the library into the cluster.

    4. Attach this notebook to the cluster.
    
**Notice**: You can also install the mlflow and mleap libraries from the notebook with the following commands:

    - dbutils.library.installPyPI("mlflow", "1.7.0", extras="extras")
    - dbutils.library.installPyPI("mleap", "0.15.0", extras="extras")
    - dbutils.library.restartPython()

Boston House Prices
-------------------
[https://archive.ics.uci.edu/ml/machine-learning-databases/housing/](https://archive.ics.uci.edu/ml/machine-learning-databases/housing/)

Contains information collected by the U.S. Census Service regarding housing in the Boston, Massachusetts area.

Originally published by Harrison, D. and Rubinfeld, D.L., 'Hedonic prices and the demand for clean air', J. Environ. Economics & Management, vol. 5, 81-102, 1978.

Rows: 506  

|Column|Type|Description        |
|------| :---: |----------------|
|crim|float|per capita crime rate by town|
|zn|float|proportion of residential land zoned for lots over 25,000 sq.ft|
|indus|float|proportion of non-retail business acres per town|
|chas|int|Charles River dummy variable (= 1 if tract bounds river; 0 otherwise)|
|nox|float|nitric oxides concentration (parts per 10 million)|
|rm|float|average number of rooms per dwelling|
|age|float|proportion of owner-occupied units built prior to 1940|
|dis|float|weighted distances to five Boston employment centres|
|rad|float|index of accessibility to radial highways|
|tax|float|full-value property-tax rate per $10,000|
|ptratio|float|pupil-teacher ratio by town|
|b|float|1000(Bk - 0.63)^2 where Bk is the proportion of blacks by town|
|lstat|float|% lower status of the population|
|medv|float|median value of owner-occupied homes in $1000s|

## Spark session

In a Databricks notebook, the SparkSession is created for you when you create a cluster, and it is accessible through a variable called spark.

In [None]:
spark

In [None]:
dbutils.library.installPyPI("mlflow", "1.7.0", extras="extras")
dbutils.library.installPyPI("mleap", "0.15.0", extras="extras")
dbutils.library.restartPython()

## Import Libraries

In [None]:
#Starting libraries
import numpy as np
import pandas as pd

#Machine Learning libraries
import pyspark
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
from pyspark.sql.functions import avg
from pyspark.sql.functions import lit
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml import PipelineModel

#Charts library
import matplotlib.pyplot as plt
import seaborn as sns

#MLflow
import mlflow
from mlflow.tracking import MlflowClient
from mlflow import log_metric, log_artifact
import mlflow.spark
import mlflow.mleap
from mleap.pyspark.spark_support import SimpleSparkSerializer

#utils
import os
from urllib import request
import warnings
import tempfile

## Import Data

In [None]:
# Download the CSV to the driver's local disk, then move it into DBFS
request.urlretrieve("https://github.com/sassoftware/python-sasctl/raw/master/examples/data/boston_house_prices.csv", "/tmp/boston_house_prices.csv")
dbutils.fs.mv("file:/tmp/boston_house_prices.csv", "dbfs:/data/boston_house_prices.csv")

In [None]:
df = (spark.read
  .option("HEADER", True)
  .option("inferSchema", True)
  .csv("/data/boston_house_prices.csv")
)

display(df)

In [None]:
df.schema

## Data Exploration

In [None]:
display(df.describe())

In [None]:
# medv: median value of owner-occupied homes in $1000s
display(df[['medv']])

In [None]:
# medv: median value of owner-occupied homes in $1000s
# rm: average number of rooms per dwelling

display(df[['medv', 'rm']])

In [None]:
# Look at other relationships
# crim - per capita crime rate by town
# lstat - % lower status of the population

plotdf = df[["rm", "crim", "lstat", "medv", "rad", "tax"]].toPandas()

# scatter_matrix draws on its own figure and returns a grid of axes,
# so take the figure from the returned axes instead of an empty plt.subplots() one
axes = pd.plotting.scatter_matrix(plotdf, figsize=(10, 10))
fig = axes[0, 0].get_figure()

display(fig)

In [None]:
# Compute the Pearson correlation matrix across all columns (medv included)

assembler = VectorAssembler(inputCols=list(df.columns), outputCol="features")
df_ftz = assembler.transform(df)

pearsonCorr = Correlation.corr(df_ftz, 'features').collect()

corrdf = pd.DataFrame(pearsonCorr[0][0].toArray())
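Correlation.corr uses Pearson's coefficient by default (method="pearson"), so each cell of corrdf is the covariance of two columns divided by the product of their standard deviations. A plain-Python sketch of that computation, assuming small made-up toy columns rather than the Boston data:

```python
import math

def pearson(xs, ys):
    """Pearson correlation: covariance divided by the product of standard deviations."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # The 1/(n-1) factors cancel between numerator and denominator, so plain sums suffice
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

def correlation_matrix(columns):
    """Pairwise Pearson correlations for a dict of equally long numeric columns."""
    names = list(columns)
    return [[pearson(columns[a], columns[b]) for b in names] for a in names]

# Toy columns (made-up values, not the Boston data)
toy = {"rm": [5.0, 6.0, 7.0, 8.0],
       "medv": [10.0, 20.0, 30.0, 40.0],
       "lstat": [30.0, 20.0, 15.0, 5.0]}
matrix = correlation_matrix(toy)
```

The matrix is symmetric with ones on the diagonal, which is why the heatmap below is mirrored around its diagonal.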

In [None]:
corrdf.index, corrdf.columns = df.columns, df.columns
fig, ax = plt.subplots()
sns.heatmap(corrdf)
display(fig.figure)

## Model Development and Model Tracking with MLflow

We will fit: 

  1. a **Baseline Model** (predicting the average housing value of the training dataset for every house)

and then challenge it with 

  2. a **Linear Regression**

## Baseline Model

In [None]:
# Train and Test splitting
train, test = df.randomSplit([0.7, 0.3], seed=12345)

print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

train.schema
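randomSplit does not guarantee an exact 70/30 split: conceptually, each row gets one uniform random draw that is compared against the normalized cumulative weights. The sketch below mimics that idea with Python's random module; it is an illustration of the concept, not Spark's implementation, and it will not reproduce Spark's assignment for seed 12345.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split via a single uniform draw against cumulative weight bounds."""
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total              # normalize, e.g. [0.7, 0.3] gives bounds of roughly [0.7, 1.0]
        bounds.append(acc)
    rng = random.Random(seed)         # seeded, so the split is reproducible
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()              # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            # the last bucket also catches float round-off at the top end
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train_rows, test_rows = random_split(range(506), [0.7, 0.3], seed=12345)
print(len(train_rows), len(test_rows))   # close to, but rarely exactly, 354 / 152
```

This is why the printed counts above are only approximately 70% and 30% of the 506 rows.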

In [None]:
#Baseline model

fit = train.groupby().avg('medv').collect()[0][0]
print("Average home value: {}".format(fit))

predict = test.withColumn("prediction", lit(fit))
display(predict)

In [None]:
# Evaluate the baseline model

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="medv")
rmse = evaluator.evaluate(predict)
mse = evaluator.evaluate(predict, {evaluator.metricName: "mse"})
r2 = evaluator.evaluate(predict, {evaluator.metricName: "r2"})
mae = evaluator.evaluate(predict, {evaluator.metricName: "mae"})

print("rmse on the test set for the baseline model: {}".format(rmse))
print("mse on the test set for the baseline model: {}".format(mse))
print("r2 on the test set for the baseline model: {}".format(r2))
print("mae on the test set for the baseline model: {}".format(mae))
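The four numbers above follow the standard regression definitions. A plain-Python sketch of them, applied to a constant baseline on made-up toy labels, shows why r2 sits around zero for this model: predicting one constant explains no variance, and because the training mean differs from the test mean, r2 can even be slightly negative.

```python
import math

def regression_metrics(labels, predictions):
    """rmse, mse, r2 and mae computed from their standard definitions."""
    n = len(labels)
    errors = [y - p for y, p in zip(labels, predictions)]
    ss_res = sum(e * e for e in errors)              # residual sum of squares
    mean_y = sum(labels) / n
    ss_tot = sum((y - mean_y) ** 2 for y in labels)  # total sum of squares
    mse = ss_res / n
    return {
        "rmse": math.sqrt(mse),
        "mse": mse,
        "r2": 1 - ss_res / ss_tot,
        "mae": sum(abs(e) for e in errors) / n,
    }

# Baseline: predict the training mean for every test row (toy values)
train_medv = [20.0, 22.0, 27.0]
test_medv = [18.0, 22.0, 26.0, 30.0]
fit = sum(train_medv) / len(train_medv)      # 23.0
metrics = regression_metrics(test_medv, [fit] * len(test_medv))
```

Here the test mean is 24.0 while the baseline predicts 23.0, so ss_res (84) exceeds ss_tot (80) and r2 comes out at -0.05.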

In [None]:
# Track the Baseline experiment

with mlflow.start_run(run_name="Baseline Experiment") as run:
  
  # Log metrics
  log_metric("rmse", rmse)
  log_metric("mse", mse)
  log_metric("r2", r2)
  log_metric("mae", mae)
  
  # Log artefacts (Scored Test data)
  scored_df = predict.toPandas()
  scored_df.to_csv('scored_df.csv')
  log_artifact("scored_df.csv")

  runID = run.info.run_id
  experimentID = run.info.experiment_id
  
  print("Inside MLflow Run with run_id {} and experiment_id {}".format(runID, experimentID))

## Multivariate Linear Regression Model

### Prepare Data

In [None]:
features = df.schema.names[:-1]
assembler_features = VectorAssembler(inputCols=features, outputCol="features")
abt_train = assembler_features.transform(train)
abt_test = assembler_features.transform(test)

#display
display(abt_train)

### Fit Regression

In [None]:
lr = LinearRegression(featuresCol = 'features', labelCol = 'medv', maxIter=10)
lrModel = lr.fit(abt_train)

### Evaluate Model

In [None]:
# Interpret the Coefficients

beta = pd.DataFrame(np.array(lrModel.coefficients), columns=['betacoeff'])
beta['coeffnames'] = features
beta.sort_values(by='betacoeff', inplace=True)
display(beta)
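Each coefficient above multiplies one feature: a fitted LinearRegressionModel predicts intercept + sum(coefficient * feature). A minimal sketch with hypothetical coefficients for two features (the values are made up for illustration):

```python
def predict(intercept, coefficients, row):
    """Linear regression prediction: intercept plus the dot product of coefficients and features."""
    return intercept + sum(b * x for b, x in zip(coefficients, row))

# Hypothetical fitted values for two features (e.g. rm and lstat)
intercept = 1.0
coefficients = [5.0, -0.5]
row = [6.0, 10.0]
prediction = predict(intercept, coefficients, row)   # 1 + 30 - 5 = 26.0
```

Spark standardizes features internally by default but always returns the coefficients on the original scale, so their magnitudes depend on each feature's units (nox in parts per 10 million vs. tax per $10,000). Sorting by betacoeff therefore orders the effects per unit, not by importance.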

In [None]:
print("Model explains {}% of Total Variance".format(round(lrModel.summary.r2*100)))

# With fitIntercept=True, Spark returns the intercept's p-value as the LAST element of pValues
pValues = lrModel.summary.pValues
print("β0 (intercept pval): {}".format(pValues[-1]))
for i, (col, pval) in enumerate(zip(features, pValues[:-1])):
  # A coefficient is significant at 5% when its p-value is below 0.05
  if pval < 0.05:
    print("β{} (coefficient pval for {}): {}".format(i+1, col, pval))
  else:
    print("β{} ({}) coefficient not significant at 5%".format(i+1, col))
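Spark's LinearRegressionSummary.pValues lists the two-sided p-values of the coefficients in feature order and, when fitIntercept=True, appends the intercept's p-value as the last element (they are available when the normal-equation solver is used). A small pure-Python sketch of that bookkeeping; split_pvalues and significant are hypothetical helpers, and the p-values are made up:

```python
def split_pvalues(pvalues, features, fit_intercept=True):
    """Return (intercept p-value or None, {feature: p-value}) from Spark's pValues ordering."""
    coef_pvals = pvalues[:len(features)]                       # coefficients come first
    intercept_pval = pvalues[len(features)] if fit_intercept else None
    return intercept_pval, dict(zip(features, coef_pvals))

def significant(feature_pvals, alpha=0.05):
    """Features whose coefficient is significant at level alpha (p-value below alpha)."""
    return [name for name, p in feature_pvals.items() if p < alpha]

# Made-up p-values in Spark's order: coefficients first, intercept last
intercept_p, by_feature = split_pvalues([0.001, 0.40, 0.03, 0.0], ["rm", "age", "lstat"])
```

With these inputs, intercept_p is 0.0 and significant(by_feature) keeps rm and lstat while dropping age.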

In [None]:
# Make predictions
predictions = lrModel.transform(abt_test)
display(predictions.select('medv', 'prediction'))

In [None]:
fig, ax = plt.subplots()
predictionsDF = predictions.select('medv', 'prediction').toPandas()
# Newer seaborn versions require x and y as keyword arguments
sns.residplot(x='prediction', y='medv', data=predictionsDF, ax=ax)
ax.set_xlabel("Predicted values for medv")
ax.set_ylabel("Residual")
ax.set_title("Residual Plot")
display(fig)

In [None]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="medv")
rmse = evaluator.evaluate(predictions)
mse = evaluator.evaluate(predictions, {evaluator.metricName: "mse"})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})

print("rmse on the test set for the Linear model: {}".format(rmse))
print("mse on the test set for the Linear model: {}".format(mse))
print("r2 on the test set for the Linear model: {}".format(r2))
print("mae on the test set for the Linear model: {}".format(mae))

## Create a series of Multivariate Linear Regression Experiments with MLflow
Use MLflow to record the model and to log its parameters, metrics, and artifacts.

### Tracking different experiments

In [None]:
# Define tracking function
def log_lineareg(experimentID, run_name, params, abt_train, abt_test, debug=False):
  
  """
  Function to start a run within an existing experiment
  :param experimentID: unique ID associated to the original experiment
  :param run_name: label for the name of the run
  :param params: LinearRegression parameters used for the run, passed as keyword arguments
  :param abt_train: analytical base table for training 
  :param abt_test: analytical base table for testing
  :param debug: if True, print the estimator parameters
  :return: run ID
  """

  with mlflow.start_run(experiment_id=experimentID, run_name=run_name) as run:

    # Create Model Instance
    # e.g. params = {'featuresCol' : 'features', 'labelCol' : 'medv', 'maxIter' : 10}
    lr = LinearRegression(**params)
    
    if debug:
      print(lr.params)

    # Fit Model and Predict
    lrModel = lr.fit(abt_train)
    predictions = lrModel.transform(abt_test)

    # Evaluate the predictions of THIS run (not the metrics computed in earlier cells)
    evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="medv")
    metrics = {name: evaluator.evaluate(predictions, {evaluator.metricName: name})
               for name in ["rmse", "mse", "r2", "mae"]}

    # Log params and metrics using the MLflow APIs
    mlflow.log_params(params)
    mlflow.log_metrics(metrics)

    # Log artefacts (Scored Test data & Coefficients Summary)

    ## Scored Test data
    temp1 = tempfile.NamedTemporaryFile(prefix='scored_df_', suffix='.csv')
    temp1_name = temp1.name
    try: 
      scored_df = predictions.drop('features').toPandas()
      scored_df.to_csv(temp1_name, index=False)
      mlflow.log_artifact(temp1_name)
    except SystemError:
      print('Check the log!')
    finally:
      temp1.close()

    ## Coefficients Summary (features is the global list of predictor names)
    temp2 = tempfile.NamedTemporaryFile(prefix='Coefficients_summary_', suffix='.csv')
    temp2_name = temp2.name
    try: 
      summary = pd.DataFrame(features, columns=['features'])
      summary['betacoeff'] = np.array(lrModel.coefficients)
      # Coefficient p-values come first; the intercept's p-value (if any) is last
      summary['pvalues'] = [round(pval, 4) for pval in lrModel.summary.pValues[:len(features)]]
      summary.sort_values(by='pvalues', inplace=True)
      summary.to_csv(temp2_name, index=False)
      mlflow.log_artifact(temp2_name)
    except SystemError:
      print('Check the log!')
    finally:
      temp2.close()

    # Log residuals using a temporary file
    temp3 = tempfile.NamedTemporaryFile(prefix="residuals-", suffix=".png")
    temp3_name = temp3.name

    try:
      ## Create Residual plots
      fig, ax = plt.subplots()
      sns.residplot(x='prediction', y='medv', data=scored_df, ax=ax)
      ax.set_xlabel("Predicted values for medv")
      ax.set_ylabel("Residual")
      ax.set_title("Residual Plot")
      fig.savefig(temp3_name)
      # The second argument is the artifact directory, not the file name
      mlflow.log_artifact(temp3_name, "residuals")
      plt.close(fig)

    finally:
      temp3.close() # Delete the temp file

    # Log the model in the Spark flavor; passing sample_input also adds the MLeap flavor
    mlflow.spark.log_model(spark_model=lrModel, 
                           artifact_path="pyspark-multi-linear-model", 
                           sample_input=abt_test)

    runID = run.info.run_id

    return runID
  

### Experiment 1...2...n

In [None]:
params = {'featuresCol' : 'features', 'labelCol' : 'medv', 'maxIter' : 10}

log_lineareg(experimentID, '1st run', params, abt_train, abt_test)

In [None]:
params = {'featuresCol' : 'features', 'labelCol' : 'medv', 'maxIter' : 50}

log_lineareg(experimentID, '2nd run', params, abt_train, abt_test)

In [None]:
params = {'featuresCol' : 'features', 'labelCol' : 'medv', 'maxIter' : 50, 'fitIntercept': True, 'solver': 'normal'}

log_lineareg(experimentID, '3rd run', params, abt_train, abt_test, debug=True)
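Rather than copy-pasting one cell per run, the parameter dicts for runs 1...n can be generated. A minimal sketch with itertools.product; param_grid is a hypothetical helper, and the commented-out call assumes log_lineareg and the notebook variables defined above:

```python
from itertools import product

def param_grid(base, **options):
    """Expand option lists into one params dict per combination (one MLflow run each)."""
    keys = list(options)
    return [dict(base, **dict(zip(keys, values)))
            for values in product(*(options[k] for k in keys))]

base = {'featuresCol': 'features', 'labelCol': 'medv'}
grid = param_grid(base, maxIter=[10, 50], solver=['auto', 'normal'])

for i, params in enumerate(grid, start=1):
    run_name = 'run {}'.format(i)
    # In the notebook: log_lineareg(experimentID, run_name, params, abt_train, abt_test)
```

pyspark.ml.tuning.ParamGridBuilder serves a similar purpose for cross-validation, but it produces Param maps rather than the keyword dicts that log_lineareg expects.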

## Model Testing

Assume that:

    1. you have picked the champion model (the Model Registry is not available in the current Databricks Community Edition), and
    2. IT asks you to deploy it on GCP with the MLeap flavor.
    
Then you would like to test the model locally before passing it to ML engineering for implementation.
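Without a Model Registry, "picking the champion" boils down to comparing the logged metrics of the runs. A plain-Python sketch, assuming run records shaped as dicts; pick_champion is a hypothetical helper and the metric values are made up (in the notebook they could come from client.get_run(run_id).data.metrics):

```python
def pick_champion(runs, metric="rmse", lower_is_better=True):
    """Return the run record with the best value of `metric`."""
    def key(run):
        return run["metrics"][metric]
    return min(runs, key=key) if lower_is_better else max(runs, key=key)

# Made-up run records for illustration only
runs = [
    {"run_id": "a1", "metrics": {"rmse": 5.1, "r2": 0.70}},
    {"run_id": "b2", "metrics": {"rmse": 4.8, "r2": 0.73}},
    {"run_id": "c3", "metrics": {"rmse": 4.9, "r2": 0.72}},
]
champion = pick_champion(runs)   # lowest rmse wins
```

Error metrics such as rmse and mae are minimized, while r2 is maximized, hence the lower_is_better switch.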

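Because MLflow's Python API cannot load the MLeap model flavor (https://mlflow.org/docs/latest/python_api/mlflow.mleap.html#module-mlflow.mleap), we would have to use the Java API:

1. Download it using the Java API method downloadArtifacts(String runId) 
2. Load the model using the method MLeapLoader.loadPipeline(String modelRootPath)

In this notebook, we instead serialize the model to an MLeap bundle and deserialize it back with MLeap's PySpark support.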

For reference: https://docs.databricks.com/applications/machine-learning/model-export-import/mleap-model-export.html

In [None]:
client = MlflowClient()

# Get the list of all the runs of the experiment and store their info in a DataFrame
runs = pd.DataFrame([(run.run_uuid, run.start_time, run.artifact_uri) for run in client.list_run_infos(experimentID)],
                    columns=["run_uuid", "start_time", "artifact_uri"])

# Sort by start_time and pick the most recent run
last_run = runs.sort_values("start_time", ascending=False).iloc[0]

# Inspect the content of the logged model folder
dbutils.fs.ls(last_run["artifact_uri"] + "/pyspark-multi-linear-model/")

In [None]:
%sh 
rm -rf /tmp/mleap_python_model_export
mkdir /tmp/mleap_python_model_export
ls -la /tmp/mleap_python_model_export

In [None]:
# Serialize the model to an MLeap bundle (zip file), passing a transformed DataFrame
lrModel.serializeToBundle("jar:file:/tmp/mleap_python_model_export/lrModel.zip", predictions)

In [None]:
%sh 
ls -la /tmp/mleap_python_model_export/
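The jar:file: prefix makes serializeToBundle write a single zip archive. To peek inside it from plain Python, without Spark, a stdlib sketch with zipfile; describe_bundle is a hypothetical helper, and reading bundle.json assumes MLeap's usual layout with a top-level manifest (the helper returns None for the manifest when it is absent):

```python
import json
import zipfile

def describe_bundle(path):
    """List the entries of an MLeap bundle zip and parse its bundle.json manifest if present."""
    with zipfile.ZipFile(path) as bundle:
        names = bundle.namelist()
        manifest = json.loads(bundle.read("bundle.json")) if "bundle.json" in names else None
    return names, manifest

# In the notebook: describe_bundle("/tmp/mleap_python_model_export/lrModel.zip")
```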

In [None]:
dbutils.fs.cp("file:/tmp/mleap_python_model_export/lrModel.zip", "dbfs:/example/lrModel.zip")
display(dbutils.fs.ls("dbfs:/example"))

In [None]:
# Deserialize the model from the bundle
deserializedPipeline = PipelineModel.deserializeFromBundle("jar:file:/tmp/mleap_python_model_export/lrModel.zip")

In [None]:
exampleResults = deserializedPipeline.transform(abt_test)
display(exampleResults)

## Last but not least...

Everything works fine! 

At this point, a data scientist would generally **package the ML project**.

**BUT....**

The main assumption here is: ***batch model deployment in GCP with the MLeap flavor***.

The primary use of MLeap, however, is to import models into applications where Spark is not available, and such applications are typically implemented in Scala or Java.

This implies that the best architecture is one that ***consumes the MLeap model in a Scala job (or one using the mlflow Java package) on GCP Dataproc***.

That said, **you can do this in PySpark as well**.

So we need to **move the MLeap flavor from the local MLflow server to GCP**.
    
Because you're planning a deployment on GCP Dataproc and Databricks does not provide an out-of-the-box deployment mechanism, you have two possible ways to go: 
    
        a. Git 
        b. MLflow
        
About (b), the MLflow Tracking Server has two components for storage: a backend store and an artifact store. 
The backend store is where the MLflow Tracking Server stores experiment and run metadata as well as params, metrics, and tags for runs.
The artifact store is a location suitable for large data (such as an S3 bucket or a shared NFS file system) and is where clients log their artifact output (for example, models).

You can then create an MLflow Tracking Server on GCP (a SQL database instance as the backend store and Google Cloud Storage as the artifact store) and log information remotely by setting the tracking URI.
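For option (b), such a server could be started with the mlflow server CLI, pointing --backend-store-uri at the SQL database and --default-artifact-root at a GCS bucket. The sketch below only assembles that command line; mlflow_server_command is a hypothetical helper, the connection string, IP and bucket name are hypothetical placeholders, and the server host would also need the database driver and google-cloud-storage installed.

```python
def mlflow_server_command(backend_store_uri, artifact_root, host="0.0.0.0", port=5000):
    """Build the `mlflow server` CLI call for a remote tracking server."""
    return [
        "mlflow", "server",
        "--backend-store-uri", backend_store_uri,    # run metadata, params, metrics, tags
        "--default-artifact-root", artifact_root,    # models and other artifacts
        "--host", host,
        "--port", str(port),
    ]

cmd = mlflow_server_command(
    "postgresql://mlflow:secret@10.0.0.5:5432/mlflow",   # hypothetical SQL instance
    "gs://my-mlflow-artifacts/experiments",              # hypothetical GCS bucket
)
print(" ".join(cmd))
```

Clients would then point at the server with mlflow.set_tracking_uri("http://<server-ip>:5000") or the MLFLOW_TRACKING_URI environment variable.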
    
Cool solution. 

But it lacks CI/CD logic, so I decided to go with (a) and:

    - Download the MLeap flavor and push it into a Git repo (we're using the Databricks Community Edition, which has no GitHub integration)
    
    - Store it in Google Cloud Storage

Work in progress: consume the artefact in GitHub Actions (GitOps logic).

## Download the mleap flavor


In [None]:
MLpackagePath = "/FileStore/ModelProjects/Boston_ML"
dbutils.fs.rm(MLpackagePath, True)
dbutils.fs.mkdirs(MLpackagePath)
dbutils.fs.ls(MLpackagePath)

In [None]:
# Prepare the environment
# Copy data to score
dbutils.fs.cp("dbfs:/data/boston_house_prices.csv", MLpackagePath + "/boston_house_prices.csv")
# Copy model to consume for scoring
dbutils.fs.cp("dbfs:/example/lrModel.zip", MLpackagePath + "/lrModel.zip")
# Check the content
dbutils.fs.ls(MLpackagePath)

Finally, the model artefact can be downloaded from a URL like the following example:

https://community.cloud.databricks.com/files/ModelProjects/Boston_ML/lrModel.zip?o=5798411837794065
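Files under dbfs:/FileStore are served by the workspace under /files/, and the example URL above follows that pattern, with ?o= carrying the workspace ID. A small hypothetical helper that reproduces the mapping, so the link can be generated instead of typed:

```python
def filestore_download_url(path, host, workspace_id=None):
    """Map a /FileStore (or dbfs:/FileStore) path to its browser download URL under /files/."""
    for prefix in ("dbfs:/FileStore/", "/FileStore/"):
        if path.startswith(prefix):
            relative = path[len(prefix):]
            break
    else:
        # Only FileStore content is exposed through /files/
        raise ValueError("Only files under /FileStore are downloadable: " + path)
    url = "https://{}/files/{}".format(host, relative)
    return url + "?o={}".format(workspace_id) if workspace_id else url

url = filestore_download_url("/FileStore/ModelProjects/Boston_ML/lrModel.zip",
                             "community.cloud.databricks.com", "5798411837794065")
```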
