# Model Deployment using PySpark

In this notebook we are going to deploy as microservice the model built in the previous [notebook](./model.ipynb).

## Load libraries and data


In [133]:
from mleap import pyspark
from pyspark.ml.linalg import Vectors
from mleap.pyspark.spark_support import SimpleSparkSerializer
from pyspark.sql import functions as f
from pyspark.ml.feature import VectorAssembler, MaxAbsScaler, OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import NaiveBayes

In [136]:
file_path = "/sparta/dibesa/german_credit_data_labels.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

Remove unusable columns

In [137]:
df = df.drop('Sex', 'Job', 'ID')
df.show()

+---+-------+---------------+----------------+-------------+--------+-------------------+----+
|Age|Housing|Saving accounts|Checking account|Credit amount|Duration|            Purpose|Risk|
+---+-------+---------------+----------------+-------------+--------+-------------------+----+
| 67|    own|             NA|          little|         1169|       6|           radio/TV|good|
| 22|    own|         little|        moderate|         5951|      48|           radio/TV| bad|
| 49|    own|         little|              NA|         2096|      12|          education|good|
| 45|   free|         little|          little|         7882|      42|furniture/equipment|good|
| 53|   free|         little|          little|         4870|      24|                car| bad|
| 35|   free|             NA|              NA|         9055|      36|          education|good|
| 53|    own|     quite rich|              NA|         2835|      24|furniture/equipment|good|
| 35|   rent|         little|        moderate|    

In [138]:
encoder = StringIndexer(inputCol='Risk', outputCol='Binary_Risk')
df = encoder.fit(df).transform(df)
df = df.drop('Risk')
df.show()

+---+-------+---------------+----------------+-------------+--------+-------------------+-----------+
|Age|Housing|Saving accounts|Checking account|Credit amount|Duration|            Purpose|Binary_Risk|
+---+-------+---------------+----------------+-------------+--------+-------------------+-----------+
| 67|    own|             NA|          little|         1169|       6|           radio/TV|        0.0|
| 22|    own|         little|        moderate|         5951|      48|           radio/TV|        1.0|
| 49|    own|         little|              NA|         2096|      12|          education|        0.0|
| 45|   free|         little|          little|         7882|      42|furniture/equipment|        0.0|
| 53|   free|         little|          little|         4870|      24|                car|        1.0|
| 35|   free|             NA|              NA|         9055|      36|          education|        0.0|
| 53|    own|     quite rich|              NA|         2835|      24|furniture/equ

## Transform data for our use case

In [139]:
df = df.withColumn('Checking_little', f.when(f.col('Checking account') == "little", 1).otherwise(0))
df = df.withColumn('Checking_null', f.when(f.col('Checking account') == "NA", 1.0).otherwise(0))
df = df.withColumn('Checking_moderate', f.when(f.col('Checking account') == "moderate", 1).otherwise(0))
df = df.withColumn('Savings_little', f.when(f.col('Saving accounts') == "little", 1).otherwise(0))
df = df.withColumn('Savings_null', f.when(f.col('Saving accounts') == "little", 1).otherwise(0))
df = df.withColumn('Purpose_radio/TV', f.when(f.col('Purpose') == "radio/TV", 1).otherwise(0))
df = df.withColumn('Housing_own', f.when(f.col('Housing') == "own", 1).otherwise(0))
df = df.withColumn('Credit_big', f.when(f.col('Credit amount') > 10000, 1).otherwise(0))
df = df.withColumn('Duration_short', f.when(f.col('Duration') < 12, 1).otherwise(0))
df = df.withColumn('Age_young', f.when(f.col('Age') < 27, 1).otherwise(0))

df = df.drop('Age', 'Housing', 'Saving accounts', 'Checking account', 'Credit amount', 'Duration', 'Purpose')
df.show()

+-----------+---------------+-------------+-----------------+--------------+------------+----------------+-----------+----------+--------------+---------+
|Binary_Risk|Checking_little|Checking_null|Checking_moderate|Savings_little|Savings_null|Purpose_radio/TV|Housing_own|Credit_big|Duration_short|Age_young|
+-----------+---------------+-------------+-----------------+--------------+------------+----------------+-----------+----------+--------------+---------+
|        0.0|              1|          0.0|                0|             0|           0|               1|          1|         0|             1|        0|
|        1.0|              0|          0.0|                1|             1|           1|               1|          1|         0|             0|        1|
|        0.0|              0|          1.0|                0|             1|           1|               0|          1|         0|             0|        0|
|        0.0|              1|          0.0|                0|         

# Define features

In [140]:
features = df.columns
features = features[1:]
features

['Checking_little',
 'Checking_null',
 'Checking_moderate',
 'Savings_little',
 'Savings_null',
 'Purpose_radio/TV',
 'Housing_own',
 'Credit_big',
 'Duration_short',
 'Age_young']

## Feature Pipeline

In [143]:
continuous_feature_assembler = VectorAssembler(inputCols=features, outputCol="all_features")

## Assemble our features and feature pipeline


In [145]:
featurePipeline = Pipeline(stages=[continuous_feature_assembler])

sparkFeaturePipelineModel = featurePipeline.fit(df)

print("Finished constructing the pipeline")

Finished constructing the pipeline


In [111]:
df.schema

StructType(List(StructField(ID,IntegerType,true),StructField(Age,IntegerType,true),StructField(Sex,StringType,true),StructField(Job,IntegerType,true),StructField(Housing,StringType,true),StructField(Saving accounts,StringType,true),StructField(Checking account,StringType,true),StructField(Credit amount,IntegerType,true),StructField(Duration,IntegerType,true),StructField(Purpose,StringType,true),StructField(Risk,StringType,true)))

## Train a Naive Bayes Model

In [147]:
# Step 3.2 Create our model

nb = NaiveBayes(featuresCol="all_features", labelCol="Binary_Risk", smoothing=0.0001, modelType="multinomial")

pipeline_nb = [sparkFeaturePipelineModel] + [nb]

sparkPipelineEstimatornb = Pipeline(stages = pipeline_nb)

sparkPipelinenb = sparkPipelineEstimatornb.fit(df)

print("Complete: Training Naive Bayes")

Complete: Training Naive Bayes


In [14]:
%listPipelines

[
  {
    "marathonState": {
      "deployments": [],
      "tasksStaged": 0,
      "appStatus": "not_deployed",
      "tasksRunning": 0,
      "tasksUnhealthy": 0,
      "tasksStatus": [],
      "tasksHealthy": 0
    },
    "modelName": "testboth",
    "versions": [
      {
        "files": [
          {
            "serializationLibVersion": "2.2.0.5",
            "serializationLib": "spark",
            "modelPath": "/intellmodelrep1/models/testboth/v0/testboth-spark-v0.zip",
            "type": "pipelinemodel"
          },
          {
            "serializationLibVersion": "0.7.0",
            "serializationLib": "mleap",
            "modelPath": "/intellmodelrep1/models/testboth/v0/testboth-mleap-v0.zip",
            "type": "pipelinemodel"
          }
        ],
        "user": "formacion1",
        "id": "2b77709f-f5e7-4b54-9584-5244e6dadae2",
        "additionalInfo": null,
        "modelDescription": "This is a sample model",
        "modelVersion": 1,
        "timestamp": "20

## Save Models in HDFS

In [45]:
%savePipeline -h

usage: savePipeline [-h] [--pipelineName PIPELINENAME]
                    [--pipelineModelObject PIPELINEMODELOBJECT]
                    [--dataframe DATAFRAME] [--description DESCRIPTION]
                    [--additionalInfo ADDITIONALINFO]
                    [--serializationLib {mleap,spark}]

Serializes a PipelineModel to a zip file and upload it to the configured model
repository.

optional arguments:
  -h, --help            show this help message and exit
  --pipelineName PIPELINENAME
  --pipelineModelObject PIPELINEMODELOBJECT
  --dataframe DATAFRAME
  --description DESCRIPTION
  --additionalInfo ADDITIONALINFO
  --serializationLib {mleap,spark}


In [46]:
%deletePipeline -h

usage: deletePipeline [-h] [--pipelineName PIPELINENAME]

Deletes a PipelineModel stored in a model repository.

optional arguments:
  -h, --help            show this help message and exit
  --pipelineName PIPELINENAME


In [47]:
%deployPipeline -h

usage: deployPipeline [-h] [--pipelineName PIPELINENAME] [--cpus CPUS]
                      [--mem MEM] [--instances INSTANCES]

Deploy a PipelineModel stored in a model repository.

optional arguments:
  -h, --help            show this help message and exit
  --pipelineName PIPELINENAME
  --cpus CPUS           Number of cpus per deployed model microservice.
  --mem MEM             Memory per deployed model microservice instance (in
                        Megabytes).
  --instances INSTANCES
                        Number of instances that will be deployed.


In [24]:
%savePipeline   --pipelineName dibesa-credit \
                --pipelineModelObject sparkPipelinenb \
                --dataframe dataset_imputed \
                --description "Linear regression model trained with the data provided by AirBnb"

{"message":"Model 'airbnb' correctly uploaded."}


In [22]:
%deletePipeline --pipelineName dibesa-credit

Error while deleting 'airbnb' model from repository. Response from server:
 {"error":"ModelMetadataNotExistsException","reason":"Model 'airbnb' not found in the metadata repository. "}


In [34]:
%deployPipeline --pipelineName dibesa-credit

Model 'airbnb' correctly deployed from repository.


In [28]:
dataset_imputed.write.mode("overwrite").parquet("/tmp/airbnb_input.parquet")

In [62]:
import requests
import decimal

def get_mleap_request_body(df, num_entries=10):
    schema = df.schema.fields
    fields = [{k:v if 'decimal' not in v else 'double' for k,v in x.jsonValue().items() if k in ('name', 'type')} for x in schema]
    
    field_names = [field['name'] for field in fields]
    data = df.limit(num_entries).collect()
    rows = [
        [row[key] if not isinstance(row[key], decimal.Decimal) else float(row[key]) for key in field_names]
        for row in data
    ]
    body = {'schema': {'fields': fields}, 'rows': rows}
    return body

def request_to_ms(model_name, df):
    url = 'https://microservice-mleap-{0}.microservice.intelligence.marathon.mesos:8080/{0}/transform'.format(model_name)
    request_body = get_mleap_request_body(df, num_entries=1)
    response = requests.post(url, json=request_body, verify=False)
    return response

In [66]:
aa = get_mleap_request_body(df, 1)

In [67]:
aa

{'rows': [['1949687',
   ' London',
   'Other',
   'A unique Victorian development of special architectural and historic interest as it combines housing and workshops. This is a thriving artist community, friendly and close to the centre of the city.',
   80.0,
   1.0,
   1.0,
   'Entire home/apt',
   0.0,
   'moderate',
   100.0,
   80.0,
   8.0,
   10.0,
   0.0,
   20.0,
   94.0,
   380.0,
   0.0]],
 'schema': {'fields': [{'name': 'id', 'type': 'string'},
   {'name': 'city', 'type': 'string'},
   {'name': 'state', 'type': 'string'},
   {'name': 'space', 'type': 'string'},
   {'name': 'price', 'type': 'double'},
   {'name': 'bathrooms', 'type': 'double'},
   {'name': 'bedrooms', 'type': 'double'},
   {'name': 'room_type', 'type': 'string'},
   {'name': 'host_is_superhost', 'type': 'double'},
   {'name': 'cancellation_policy', 'type': 'string'},
   {'name': 'security_deposit', 'type': 'double'},
   {'name': 'price_per_bedroom', 'type': 'double'},
   {'name': 'number_of_reviews', 'type'

In [63]:
rr = request_to_ms('examplemodel',df)



In [88]:
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

In [64]:
rr.json()

{'rows': [['1949687',
   ' London',
   'Other',
   'A unique Victorian development of special architectural and historic interest as it combines housing and workshops. This is a thriving artist community, friendly and close to the centre of the city.',
   80.0,
   1.0,
   1.0,
   'Entire home/apt',
   0.0,
   'moderate',
   100.0,
   80.0,
   8.0,
   10.0,
   0.0,
   20.0,
   94.0,
   380.0,
   0.0,
   {'dimensions': [8],
    'values': [1.0, 1.0, 100.0, 20.0, 10.0, 8.0, 380.0, 94.0]},
   {'dimensions': [8],
    'values': [2.0701404784672404,
     1.18111353148471,
     0.4089700538272752,
     0.4690169961895324,
     0.5352011985607764,
     0.28585910572404577,
     1.045684307820422,
     10.959305553807575]},
   0.0,
   0.0,
   1.0,
   0.0,
   0.0,
   {'dimensions': [2], 'indices': [[0]], 'values': [1.0]},
   {'dimensions': [1], 'indices': [[0]], 'values': [1.0]},
   {'dimensions': [6], 'indices': [[1]], 'values': [1.0]},
   {'dimensions': [1], 'indices': [[0]], 'values': [1.0]},
 

In [95]:
type(model), type(df)

(pyspark.ml.pipeline.PipelineModel, pyspark.sql.dataframe.DataFrame)

In [96]:
%savePipeline   --pipelineName examplemodel \
                --pipelineModelObject model \
                --dataframe df \
                --description "Example model to show how to upload and deploy a model"

{"error":"ModelAlreadyExistsException","reason":"Model 'examplemodel' already exists in repository."}


In [97]:
%deployPipeline --pipelineName examplemodel

Error while deploying 'examplemodel' model from repository.  Response from server:
 {"error":"MarathonException","reason":"Error in Marathon. Code: 409  Body: {\"message\":\"An app with id [/intelligence/microservice/microservice-mleap-examplemodel] already exists.\"}"}


In [98]:
df.schema.fields

[StructField(id,StringType,true),
 StructField(city,StringType,true),
 StructField(state,StringType,true),
 StructField(space,StringType,true),
 StructField(price,DoubleType,true),
 StructField(bathrooms,DoubleType,true),
 StructField(bedrooms,DoubleType,true),
 StructField(room_type,StringType,true),
 StructField(host_is_superhost,DoubleType,true),
 StructField(cancellation_policy,StringType,true),
 StructField(security_deposit,DoubleType,true),
 StructField(price_per_bedroom,DoubleType,true),
 StructField(number_of_reviews,DecimalType(11,1),true),
 StructField(extra_people,DoubleType,true),
 StructField(instant_bookable,DoubleType,true),
 StructField(cleaning_fee,DoubleType,true),
 StructField(review_scores_rating,DoubleType,true),
 StructField(square_feet,DoubleType,true),
 StructField(n_bathrooms_more_than_two,DecimalType(2,1),true)]

In [99]:
schema = {'fields': [
    {'name': 'id', 'type': 'string'},
    {'name': 'city', 'type': 'string'},
    {'name': 'state', 'type': 'string'},
    {'name': 'space', 'type': 'string'},
    {'name': 'price', 'type': 'double'},
    {'name': 'bathrooms', 'type': 'double'},
    {'name': 'bedrooms', 'type': 'double'},
    {'name': 'room_type', 'type': 'string'},
    {'name': 'host_is_superhost', 'type': 'double'},
    {'name': 'cancellation_policy', 'type': 'string'},
    {'name': 'security_deposit', 'type': 'double'},
    {'name': 'price_per_bedroom', 'type': 'double'},
    {'name': 'number_of_reviews', 'type': 'double'},
    {'name': 'extra_people', 'type': 'double'},
    {'name': 'instant_bookable', 'type': 'double'},
    {'name': 'cleaning_fee', 'type': 'double'},
    {'name': 'review_scores_rating', 'type': 'double'},
    {'name': 'square_feet', 'type': 'double'},
    {'name': 'n_bathrooms_more_than_two', 'type': 'double'}
]}

In [100]:
example_to_predict = [
    '1949687',
    ' London',
    'Other',
    'A unique Victorian development of special architectural and historic interest as it combines housing and workshops. This is a thriving artist community, friendly and close to the centre of the city.',
    80.0,
    1.0,
    1.0,
    'Entire home/apt',
    0.0,
    'moderate',
    100.0,
    80.0,
    8.0,
    10.0,
    0.0,
    20.0,
    94.0,
    380.0,
    0.0
]

In [101]:
request_body = {'schema': schema, 'rows': [example_to_predict]}

In [102]:
url = 'https://microservice-mleap-examplemodel.microservice.intelligence.marathon.mesos:8080/examplemodel/transform'

In [103]:
import requests
response = requests.post(url, json=request_body, verify=False)

In [106]:
response.json()

{'rows': [['1949687',
   ' London',
   'Other',
   'A unique Victorian development of special architectural and historic interest as it combines housing and workshops. This is a thriving artist community, friendly and close to the centre of the city.',
   80.0,
   1.0,
   1.0,
   'Entire home/apt',
   0.0,
   'moderate',
   100.0,
   80.0,
   8.0,
   10.0,
   0.0,
   20.0,
   94.0,
   380.0,
   0.0,
   {'dimensions': [8],
    'values': [1.0, 1.0, 100.0, 20.0, 10.0, 8.0, 380.0, 94.0]},
   {'dimensions': [8],
    'values': [2.0701404784672404,
     1.18111353148471,
     0.4089700538272752,
     0.4690169961895324,
     0.5352011985607764,
     0.28585910572404577,
     1.045684307820422,
     10.959305553807575]},
   0.0,
   0.0,
   1.0,
   0.0,
   0.0,
   {'dimensions': [2], 'indices': [[0]], 'values': [1.0]},
   {'dimensions': [1], 'indices': [[0]], 'values': [1.0]},
   {'dimensions': [6], 'indices': [[1]], 'values': [1.0]},
   {'dimensions': [1], 'indices': [[0]], 'values': [1.0]},
 