## Packaging the Champion Model (MLeap Flavor) for GCP Deployment

Something to keep in mind:

Add the following jar files to `$SPARK_HOME/jars`:
1. mleap-spark-base_2.11-0.7.0.jar
2. mleap-core_2.11-0.7.0.jar
3. mleap-runtime_2.11-0.7.0.jar
4. mleap-spark_2.11-0.7.0.jar
5. bundle-ml_2.11-0.7.0.jar
6. config-0.3.0.jar
7. scalapb-runtime_2.11-0.6.1.jar
8. mleap-tensor_2.11-0.7.0.jar

Then:

9. Install the MLeap Python API with pip: `mleap` (0.7.0)
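
Before starting Spark, it can help to confirm that the jars listed above are actually in place. The following is a minimal sketch and not part of the original notebook: `missing_jars` is a hypothetical helper, and the script assumes the `SPARK_HOME` environment variable points at the Spark installation.

```python
import os
from pathlib import Path

# Jar names copied from the list above.
REQUIRED_JARS = [
    "mleap-spark-base_2.11-0.7.0.jar",
    "mleap-core_2.11-0.7.0.jar",
    "mleap-runtime_2.11-0.7.0.jar",
    "mleap-spark_2.11-0.7.0.jar",
    "bundle-ml_2.11-0.7.0.jar",
    "config-0.3.0.jar",
    "scalapb-runtime_2.11-0.6.1.jar",
    "mleap-tensor_2.11-0.7.0.jar",
]


def missing_jars(jars_dir, required=REQUIRED_JARS):
    """Return the required jar names not found in jars_dir (hypothetical helper)."""
    jars_dir = Path(jars_dir)
    present = {p.name for p in jars_dir.glob("*.jar")} if jars_dir.is_dir() else set()
    return [name for name in required if name not in present]


if __name__ == "__main__":
    # Assumption: SPARK_HOME is set; fall back to the current directory otherwise.
    spark_home = os.environ.get("SPARK_HOME", ".")
    for name in missing_jars(Path(spark_home) / "jars"):
        print("Missing jar:", name)
```

An empty result means every jar in the list was found.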

### References

- https://docs.azuredatabricks.net/_static/notebooks/mleap-model-export-demo-scala.html
- https://github.com/combust/mleap/wiki/Setting-up-a-Spark-2.0-notebook-with-MLeap-and-Toree
- https://github.com/combust/mleap/issues/172
- https://cloud.google.com/dataproc/docs/tutorials/spark-scala

Let's create a simple MLflow project programmatically:

1. Create a working directory

2. Create a Scala job

3. Create score.py

<!-- 3. Create the .sh to run the score.py


2. Create the ML project:
  - MLProject file
  - Conda environment
  - Basic machine learning script

3. Create the scoring script
4. Test the scoring script
5. Create the entrypoint file:
  - execute .sh (Create a Spark cluster, Install Mlflow, Run Batch Scoring Job based on score python code in cloud bucket) -->

In [28]:
# %%bash
# spark-shell --packages ml.combust.mleap:mleap-spark_2.11:0.15.0

Spark context Web UI available at http://6f23552da701:4040
Spark context available as 'sc' (master = local[*], app id = local-1587031675237).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/
         
Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_242)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :quit


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
:: loading settings :: url = jar:file:/usr/local/spark-2.4.5-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
ml.combust.mleap#mleap-spark_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f163985b-3f0a-46ed-87fe-76debbf8df00;1.0
	confs: [default]
	found ml.combust.mleap#mleap-spark_2.11;0.15.0 in central
	found ml.combust.mleap#mleap-spark-base_2.11;0.15.0 in central
	found ml.combust.mleap#mleap-runtime_2.11;0.15.0 in central
	found ml.combust.mleap#mleap-core_2.11;0.15.0 in central
	found ml.combust.mleap#mleap-base_2.11;0.15.0 in central
	found ml.combust.mleap#mleap-tensor_2.11;0.15.0 in central
	found io.spray#spray-json_2.11;1.3.2 in central
	found com.github.rwl#jtransforms;2.4.0 in central
	found ml.combust.bundle#bundle-ml_2.11;0.15.0 in central
	found com.google.protobuf#protobuf-java;3.5.1 in

In [3]:
!pip install mleap==0.15.0

Collecting mleap==0.15.0
  Downloading mleap-0.15.0-py3-none-any.whl (45 kB)
Installing collected packages: mleap
Successfully installed mleap-0.15.0
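
The Spark banner above reports Scala 2.11.12, the resolved artifacts carry the `_2.11` suffix, and both the jars and the pip package are at 0.15.0. Keeping these aligned is generally a safe default. The sketch below checks that from a jar file name; `parse_artifact` and `versions_match` are hypothetical helpers, not MLeap functions.

```python
import re

# Matches names such as "mleap-spark_2.11-0.15.0.jar" (artifact_scalaVersion-version.jar).
_JAR_PATTERN = re.compile(
    r"^(?P<artifact>.+)_(?P<scala>\d+\.\d+)-(?P<version>\d+(?:\.\d+)*)\.jar$"
)


def parse_artifact(jar_name):
    """Split a Scala jar file name into (artifact, scala_version, version), or None."""
    match = _JAR_PATTERN.match(jar_name)
    if match is None:
        return None
    return match.group("artifact"), match.group("scala"), match.group("version")


def versions_match(jar_name, scala_version, python_mleap_version):
    """True when the jar targets the given Scala line and MLeap version."""
    parsed = parse_artifact(jar_name)
    if parsed is None:
        return False
    _, jar_scala, jar_version = parsed
    # Spark reports a full Scala version (e.g. 2.11.12); jar names carry major.minor only.
    scala_line = ".".join(scala_version.split(".")[:2])
    return jar_scala == scala_line and jar_version == python_mleap_version


print(versions_match("mleap-spark_2.11-0.15.0.jar", "2.11.12", "0.15.0"))
print(versions_match("mleap-spark_2.11-0.7.0.jar", "2.11.12", "0.15.0"))
```

The first call matches the setup used in this notebook; the second shows a 0.7.0 jar being flagged against the 0.15.0 Python package.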


## 1. Create a working directory

In [4]:
# MLpackagePath = "/FileStore/ModelProjects/Boston_ML"
# dbutils.fs.rm(MLpackagePath, True)
# dbutils.fs.mkdirs(MLpackagePath)
# dbutils.fs.ls(MLpackagePath)

In [5]:
# # Prepare the environment
# # Copy data to score
# dbutils.fs.cp("dbfs:/data/boston_house_prices.csv", "dbfs:/FileStore/ModelProjects/Boston_ML")
# # Copy model to consume for scoring
# dbutils.fs.cp("dbfs:/example/lrModel.zip","dbfs:/FileStore/ModelProjects/Boston_ML")
# # Check the content
# dbutils.fs.ls(MLpackagePath)
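
The commented cells above rely on `dbutils`, which is only available on Databricks. Outside Databricks, the same preparation could be sketched with the standard library. `prepare_workdir` is a hypothetical helper, and any paths passed to it are placeholders.

```python
import shutil
from pathlib import Path


def prepare_workdir(workdir, files_to_copy, clean=True):
    """Create workdir (optionally wiping it first) and copy the given files into it.

    Hypothetical local stand-in for the dbutils.fs.rm / mkdirs / cp / ls calls.
    Returns the sorted names of the files now present in workdir.
    """
    workdir = Path(workdir)
    if clean and workdir.exists():
        shutil.rmtree(workdir)  # like dbutils.fs.rm(path, True)
    workdir.mkdir(parents=True, exist_ok=True)  # like dbutils.fs.mkdirs(path)
    for src in files_to_copy:
        shutil.copy2(src, workdir / Path(src).name)  # like dbutils.fs.cp(src, path)
    return sorted(p.name for p in workdir.iterdir())  # like dbutils.fs.ls(path)
```

The returned list plays the role of the final `dbutils.fs.ls` check.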

## 2. Download the folder programmatically

In [6]:
# # Copy model to consume for scoring
# dbutils.fs.cp("dbfs:/example/lrModel.zip","/tmp")
# # Check the content
# dbutils.fs.ls("/tmp")
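
After copying the model, a quick check that the archive looks like an MLeap bundle can save a failed Spark run later. This is a sketch under one assumption: that the serialized bundle keeps its metadata in a `bundle.json` entry at the archive root. `looks_like_mleap_bundle` is a hypothetical helper.

```python
import zipfile


def looks_like_mleap_bundle(zip_path):
    """Return True if zip_path is a readable zip archive with a top-level bundle.json.

    Assumption: the bundle stores its metadata in bundle.json at the archive root.
    """
    if not zipfile.is_zipfile(zip_path):
        return False
    with zipfile.ZipFile(zip_path) as archive:
        return "bundle.json" in archive.namelist()
```

The check only inspects the archive listing; it does not validate that the pipeline inside can be deserialized.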

## 3. Create the score.py job

In [42]:
%%writefile score.py
#!/usr/bin/python

import numpy as np
import pandas as pd
import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType
from pyspark.ml.feature import VectorAssembler
import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer
from pyspark.ml import PipelineModel
from pyspark.ml.evaluation import RegressionEvaluator


import os
import sys
import argparse
import tempfile
import warnings


def read_data_csv(spark, inputPath_CSV):
    
    '''
    Function to load data in the Spark Session 
    :param spark: spark session 
    :param inputPath_CSV: path of the CSV file to read 
    :return: df
    '''
    
    print('Trying to read the data...')
    
    try:
        schema = StructType([
          StructField('crim',DoubleType(),True),
          StructField('zn',DoubleType(),True),
          StructField('indus',DoubleType(),True),
          StructField('chas',IntegerType(),True),
          StructField('nox',DoubleType(),True),
          StructField('rm',DoubleType(),True),
          StructField('age',DoubleType(),True),
          StructField('dis',DoubleType(),True),
          StructField('rad',IntegerType(),True),
          StructField('tax',IntegerType(),True),
          StructField('ptratio',DoubleType(),True),
          StructField('b',DoubleType(),True),
          StructField('lstat',DoubleType(),True),
          StructField('medv',DoubleType(),True)]
        )
        
        df = (spark.read
          .option("HEADER", True)
          .schema(schema)
          .csv(inputPath_CSV))
    
    except ValueError:
        print('At least, one variable format is wrong! Please check the data')
      
    else:
        print('Data to score have been read successfully!')
        return df

def preprocessing(df):

    '''
    Function to preprocess data 
    :param df: A pyspark DataFrame 
    :return: abt_to_score
    '''
    
    print('Data preprocessing...')

    features = df.schema.names[:-1]
    assembler_features = VectorAssembler(inputCols=features, outputCol="features")
    abt_to_score = assembler_features.transform(df)
    
    print('Data have been processed successfully!')
    return abt_to_score

def score_data(abt_to_score, modelPath):
    
    '''
    Function to score data 
    :param abt_to_score: A pyspark DataFrame to score
    :param modelPath: The modelpath associated to .zip mleap flavor
    :return: scoredData
    '''
    print('Scoring process starts...')
    
    deserializedPipeline = PipelineModel.deserializeFromBundle("jar:file:{}".format(modelPath))
    scoredData = deserializedPipeline.transform(abt_to_score)
    return scoredData  
  
def write_output_csv(scoredData, outputPath_CSV):
    '''
    Function to write predictions
    :param scoredData: A pyspark DataFrame of predictions
    :param outputPath_CSV: The path to write the output table
    :return: the predictions as a dictionary
    '''
    print('Writing Prediction in {}'.format(outputPath_CSV))
    # Convert once: every toPandas() call collects the whole DataFrame on the driver
    scored_pdf = scoredData.toPandas()
    scored_pdf.to_csv(outputPath_CSV, sep=',', index=False)
    return scored_pdf.to_dict()

def evaluator(predictions):
    
    '''
    Function to produce some evaluation stats
    :param predictions: A pyspark DataFrame of predictions
    :return: rmse, mse, r2, mae
    '''
    # Use a distinct local name so the evaluator() function itself is not shadowed
    reg_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="medv")
    rmse = reg_evaluator.evaluate(predictions)  # the default metric is rmse
    mse = reg_evaluator.evaluate(predictions, {reg_evaluator.metricName: "mse"})
    r2 = reg_evaluator.evaluate(predictions, {reg_evaluator.metricName: "r2"})
    mae = reg_evaluator.evaluate(predictions, {reg_evaluator.metricName: "mae"})
    
    return rmse, mse, r2, mae

def main():
    
    parser = argparse.ArgumentParser(description='Score')
    
    parser.add_argument('--input', dest="inputpath_CSV",
                        required=True, help='Provide the input path of data to score')
    
    parser.add_argument('--model', dest="modelPath",
                        required=True, help='Provide the model path to score')
    
    parser.add_argument('--output', dest="outputpath_CSV",
                        required=True, help='Provide the output path for the scored data')

    args = parser.parse_args()
    input_path_CSV = args.inputpath_CSV
    modelPath = args.modelPath
    output_path_CSV = args.outputpath_CSV
  
    try:
#         spark = SparkSession \
#         .builder \
#         .master(SPARK_MASTER) \
#         .config('spark.executor.memory', TOTAL_MEMORY) \
#         .config('spark.cores.max', TOTAL_CORES) \
#         .config('spark.jars.packages',
#                 'ml.combust.mleap:mleap-spark-base_2.11:0.9.3,ml.combust.mleap:mleap-spark_2.11:0.9.3') \
#         .appName("ClassifierTraining") \
#         .getOrCreate()
        spark = SparkSession.builder.appName('MyApp').getOrCreate()
        spark.sparkContext.setLogLevel("OFF")
        print('Created a SparkSession')
    
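    except ValueError:
        warnings.warn('Could not create a SparkSession; check the Spark configuration')
        # Re-raise: the steps below cannot run without a session
        raise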
  
    #Read data
    data_to_process = read_data_csv(spark, input_path_CSV)
    #Preprocessing
    abt = preprocessing(data_to_process)
    #Scoring
    abt_scored = score_data(abt, modelPath)
    #Write data
    write_output_csv(abt_scored, output_path_CSV)
    #Evaluate Model
    evalstats = evaluator(abt_scored)
    return evalstats
    
    
if __name__=="__main__":
    
    stats = main()
    print('-'*20)
    print('Process Log')
    print('-'*20)
    print('Scoring Job ends successfully!')
    print("RMSE for the model: {}".format(stats[0]))
    print("MSE for the model: {}".format(stats[1]))
    print("R2 for the model: {}".format(stats[2]))
    print("MAE for the model: {}".format(stats[3]))
    print('Look at the Storage Bucket to get predictions!')
    

Overwriting score.py
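
`score_data` builds the bundle location by prefixing the model path with `jar:file:`, which works when the path is absolute, as in the run that follows. A minimal sketch of a helper that also copes with relative paths is shown here; `bundle_uri` is hypothetical and not part of MLeap.

```python
from pathlib import Path


def bundle_uri(model_path):
    """Build the 'jar:file:' URI passed to deserializeFromBundle from a local path.

    Hypothetical helper: resolves relative paths so the URI always carries an absolute path.
    """
    absolute = Path(model_path).expanduser().resolve()
    return "jar:file:{}".format(absolute.as_posix())


print(bundle_uri("/tmp/lrModel.zip"))
```

Resolving the path up front means the script behaves the same regardless of the directory it is launched from.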


In [43]:
%%bash
python score.py --input "/home/jovyan/work/1_data/boston_house_prices.csv" \
    --model "/home/jovyan/work/2_notebooks/output/ModelProjects_Boston_ML_lrModel.zip" \
    --output "/home/jovyan/work/1_data/boston_house_prices_scored.csv"

Created a SparkSession
Trying to read the data...
Data to score have been read successfully!
Data preprocessing...
Data have been processed successfully!
Scoring process starts...
Writing Prediction in /home/jovyan/work/1_data/boston_house_prices_scored.csv
--------------------
Process Log
--------------------
Scoring Job ends successfully!
RMSE for the model: 4.696684029858866
MSE for the model: 22.05884087633132
R2 for the model: 0.7386998714429953
MAE for the model: 3.3284024432759862
Look at the Storage Bucket to get predictions!
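
The four figures above come from `RegressionEvaluator`. To make their meaning concrete, the sketch below computes the same quantities from plain Python lists using the textbook definitions, with R2 taken as 1 - SS_res / SS_tot. It is an illustration, not Spark's implementation, and `regression_stats` is a hypothetical helper.

```python
import math


def regression_stats(labels, predictions):
    """Return (rmse, mse, r2, mae) for two equally long sequences of numbers."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("labels and predictions must be non-empty and of equal length")
    n = len(labels)
    errors = [y - p for y, p in zip(labels, predictions)]
    mse = sum(e * e for e in errors) / n
    mae = sum(abs(e) for e in errors) / n
    mean_label = sum(labels) / n
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    ss_res = sum(e * e for e in errors)
    r2 = 1.0 - ss_res / ss_tot  # undefined when all labels are identical
    return math.sqrt(mse), mse, r2, mae


# Consistency check on the log above: RMSE squared should equal MSE.
print(math.isclose(4.696684029858866 ** 2, 22.05884087633132, rel_tol=1e-9))
```

The final line checks that the logged RMSE is the square root of the logged MSE.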


20/04/16 12:20:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
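
The run above passes three options to score.py. When testing a command-line interface like this, it helps to hand the parser an explicit argument list instead of relying on `sys.argv`. The sketch below mirrors the script's three options; `build_parser` is a hypothetical refactoring, and the file names are placeholders.

```python
import argparse


def build_parser():
    """Build a parser with the same options as score.py (hypothetical refactoring)."""
    parser = argparse.ArgumentParser(description="Score")
    parser.add_argument("--input", dest="inputpath_CSV", required=True,
                        help="Provide the input path of data to score")
    parser.add_argument("--model", dest="modelPath", required=True,
                        help="Provide the model path to score")
    parser.add_argument("--output", dest="outputpath_CSV", required=True,
                        help="Provide the output path for the scored data")
    return parser


# Passing a list to parse_args avoids touching sys.argv, which keeps tests isolated.
args = build_parser().parse_args(
    ["--input", "in.csv", "--model", "model.zip", "--output", "out.csv"]
)
print(args.inputpath_CSV, args.modelPath, args.outputpath_CSV)
```

Because all three options are required, calling `parse_args([])` exits with a usage error, which is the behavior a test would want to assert.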


## 4. Create a Spark Scala job to run on Cloud Dataproc for deploying the model in batch

In [None]:
dbutils.fs.put(f"{MLpackagePath}/score.py",
"""
#!/usr/bin/python

import numpy as np
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType
from pyspark.ml import PipelineModel
from pyspark.ml.evaluation import RegressionEvaluator


import os
import sys
import argparse
import tempfile
import warnings

# Read Data

# def read_data_csv(spark, inputPath_CSV):
  
#   '''
#   Function to load data in the Spark Session 
#   :param spark: spark session 
#   :param inputPath: path to get the data 
#   :return: df
#   '''
  
#   print('trying to read the data...')
  
#   try:
#     # define the schema
#     schema = StructType([
#       StructField('crim',DoubleType(),True),
#       StructField('zn',DoubleType(),True),
#       StructField('indus',DoubleType(),True),
#       StructField('chas',IntegerType(),True),
#       StructField('nox',DoubleType(),True),
#       StructField('rm',DoubleType(),True),
#       StructField('age',DoubleType(),True),
#       StructField('dis',DoubleType(),True),
#       StructField('rad',IntegerType(),True),
#       StructField('tax',IntegerType(),True),
#       StructField('ptratio',DoubleType(),True),
#       StructField('b',DoubleType(),True),
#       StructField('lstat',DoubleType(),True),
#       StructField('medv',DoubleType(),True)]
#     )

#     df = (spark.read
#           .option("HEADER", True)
#           .schema(schema)
#           .csv(inputPath_CSV))
    
#   except ValueError:
#     print('At least, one variable format is wrong! \
#     Please check the data')
      
#   else:
#     print('Data to score have been read successfully!')
#     return df
  
# #Preprocessing

# def preprocessing(df):
  
#   '''
#   Function to preprocess data 
#   :param df: A pyspark DataFrame 
#   :return: abt_to_score
#   '''
  
#   print('Data preprocessing...')
  
#   features = df.schema.names[:-1]
#   assembler_features = VectorAssembler(inputCols=features, outputCol="features")
#   abt_to_score = assembler_features.transform(df)
#   return abt_to_score

# #Scoring
# def score_data(abt_to_score, modelPath):
  
#   '''
#   Function to score data 
#   :param abt_to_score: A pyspark DataFrame to score
#   :param modelPath: The modelpath associated to .zip mleap flavor
#   :return: scoredData
#   '''
  
#   print('Scoring process starts...')
  
#   deserializedPipeline = PipelineModel.deserializeFromBundle("jar:file:{}".format(modelPath))
#   scoredData = deserializedPipeline.transform(abt_to_score)
#   return scoredData  
  
# def write_output_csv(scoredData, outputPath_CSV):
  
#   '''
#   Function to write predictions
#   :param scoredData: A pyspark DataFrame of predictions
#   :param outputPath: The path to write the ouput table
#   :return: scoredData
#   '''

#   scoredData.toPandas().to_csv(outputPath_CSV, sep=',', index=False)
#   return scoredData.toPandas().to_dict()
  
def main():

  parser = argparse.ArgumentParser(description='Score')
  
  parser.add_argument('-i', dest="inputpath_CSV",
                        required=True, help='Provide the input path of data to score')

  args = parser.parse_args()
  input_path_CSV = args.inputpath_CSV
  
#   #Create a Spark Session
#   spark = SparkSession.builder.appName('MyApp').config("spark.master", "local").getOrCreate()
  
  #Read data
  #read_data_csv(spark, input_path_CSV)
  
  
if __name__=="__main__":

  from pyspark import SparkContext
  from pyspark.sql import SQLContext

  try:
    conf = pyspark.SparkConf().setMaster("local").setAppName("My app")
    sc = SparkContext.getOrCreate(conf)
   # sqlContext = SQLContext.getOrCreate(sc)
#     sc = pyspark.SparkContext()
#     sc.setLogLevel("ERROR")
#     sqlContext = pyspark.sql.SQLContext(sc)
    print('Created a SparkContext')
      
  except ValueError:
      warnings.warn('SparkContext already exists in this scope')
      
  sys.exit(main())

""".strip(), True)

In [None]:
import subprocess
# errors in the created process are raised here too
try:
  output = subprocess.check_output(["python","/dbfs/FileStore/ModelProjects/Boston_ML/score.py", "-i", "/dbfs/FileStore/ModelProjects/Boston_ML/boston_house_prices.csv"], stderr=subprocess.STDOUT, universal_newlines=True)
except subprocess.CalledProcessError as exc:
    print("Status : FAIL", exc.returncode, exc.output)
else:
    print("Output: \n{}\n".format(output))
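
The cell above runs the script in a child process and prints its combined output. A sketch of the same pattern with `subprocess.run` follows; it uses `sys.executable` so the child process runs under the same interpreter as the notebook. `run_script` is a hypothetical helper.

```python
import subprocess
import sys


def run_script(script_path, *script_args):
    """Run a Python script with the current interpreter; return (exit_code, combined_output)."""
    completed = subprocess.run(
        [sys.executable, script_path, *script_args],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr into stdout, as in the cell above
        universal_newlines=True,
    )
    return completed.returncode, completed.stdout
```

Unlike `check_output`, this returns the exit code instead of raising, so the caller decides how to report a failure.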

## Test score.py

In [None]:
#!/usr/bin/python
import click


import numpy as np
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType
from pyspark.ml import PipelineModel
from pyspark.ml.evaluation import RegressionEvaluator


import os
import argparse
import tempfile
import warnings

# Read Data

@click.command()
@click.option("--datapath", type=str, required=True, help="Path of the CSV data to score")
def read_data_csv(datapath):
  
  """
  Function to load data in the Spark Session 
  :param datapath: path to get the data 
  :return: df
  """
  
  print('trying to read the data...')
  
  # click passes only the CLI options, so the SparkSession is created here
  spark = SparkSession.builder.appName('MyApp').getOrCreate()
  
  try:
    # define the schema
    schema = StructType([
      StructField('crim',DoubleType(),True),
      StructField('zn',DoubleType(),True),
      StructField('indus',DoubleType(),True),
      StructField('chas',IntegerType(),True),
      StructField('nox',DoubleType(),True),
      StructField('rm',DoubleType(),True),
      StructField('age',DoubleType(),True),
      StructField('dis',DoubleType(),True),
      StructField('rad',IntegerType(),True),
      StructField('tax',IntegerType(),True),
      StructField('ptratio',DoubleType(),True),
      StructField('b',DoubleType(),True),
      StructField('lstat',DoubleType(),True),
      StructField('medv',DoubleType(),True)]
    )

    df = (spark.read
          .option("HEADER", True)
          .schema(schema)
          .csv(datapath))
    
  except ValueError:
    print('At least, one variable format is wrong! Please check the data')
      
  else:
    print('Data to score have been read successfully!')
    return df
  
# #Preprocessing

# def preprocessing(df):
  
#   """
#   Function to preprocess data 
#   :param df: A pyspark DataFrame 
#   :return: abt_to_score
#   """
  
#   print('Data preprocessing...')
  
#   features = df.schema.names[:-1]
#   assembler_features = VectorAssembler(inputCols=features, outputCol="features")
#   abt_to_score = assembler_features.transform(df)
#   return abt_to_score

# #Scoring
# def score_data(abt_to_score, modelPath):
  
#   """
#   Function to score data 
#   :param abt_to_score: A pyspark DataFrame to score
#   :param modelPath: The modelpath associated to .zip mleap flavor
#   :return: scoredData
#   """
  
#   print('Scoring process starts...')
  
#   deserializedPipeline = PipelineModel.deserializeFromBundle("jar:file:{}".format(modelPath))
#   scoredData = deserializedPipeline.transform(abt_to_score)
#   return scoredData  
  
# def write_output_csv(scoredData, outputPath_CSV):
  
#   """
#   Function to write predictions
#   :param scoredData: A pyspark DataFrame of predictions
#   :param outputPath: The path to write the ouput table
#   :return: scoredData
#   """

#   scoredData.toPandas().to_csv(outputPath_CSV, sep=',', index=False)
#   return scoredData.toPandas().to_dict()
  
# def main():

#   parser = argparse.ArgumentParser(description='Score')

#   parser.add_argument('-s', dest="Spark_Session",
#                       help='Provide the name of Spark Session')
  
#   parser.add_argument('-i', dest="inputpath_CSV",
#                         required=True, help='Provide the input path of data to score')

#   args = parser.parse_args()
#   spark_session = args.Spark_Session
#   input_path_CSV = args.inputpath_CSV
  
#   #Create a Spark Session
#   spark = SparkSession.builder.appName(spark_session).getOrCreate()
  
#   #Read data
#   read_data_csv(spark, input_path_CSV)
  
# if __name__=="__main__":
#   sys.exit(main())

In [None]:
from click.testing import CliRunner

runner = CliRunner()
result1 = runner.invoke(read_data_csv, ['--datapath', '/data/boston_house_prices.csv'], catch_exceptions=True)

assert result1.exit_code == 0, "Code failed" # Check to see that it worked

print("Success!")

In [None]:
print(result1.output)

In [None]:
dbutils.fs.put(f"{MLpackagePath}/score.py", 

"""
#!/usr/bin/python

print('score.py smoke test')

""".strip(), True)
               

In [None]:
import subprocess

# errors in the created process are raised here too
output = subprocess.check_output(["python","/dbfs/FileStore/ModelProjects/Boston_ML/score.py"], universal_newlines=True)

print(output)