# Running Spark ML Pipelines on an Amazon EMR Cluster

Notebook [reference](https://github.com/aws/amazon-sagemaker-examples/blob/master/sagemaker-python-sdk/sparkml_serving_emr_mleap_abalone/sparkml_serving_emr_mleap_abalone.ipynb). 

Please ensure you use the following configuration when setting 

In [1]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2,application_1631487410552_0003,pyspark,busy,Link,Link,,
3,application_1631487410552_0004,pyspark,idle,Link,Link,,


In [2]:
from __future__ import print_function

import os
import shutil
import boto3
import numpy as np
import time

import pyspark
import pyspark.sql.functions as func
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.functions import *
from pyspark.sql.types import (
    StructField, 
    StructType, 
    StringType, 
    DoubleType,
    FloatType
)

from pyspark.ml.feature import (
    StringIndexer,
    VectorIndexer,
    OneHotEncoderEstimator,  # MLeap <=1.17.0 only supports OneHotEncoderEstimator for OHE
    VectorAssembler,
    IndexToString,
    MinMaxScaler
)

from pyspark.ml.classification import (
    LogisticRegression,
    DecisionTreeClassifier,
    RandomForestClassifier,
    GBTClassifier
)

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from mleap.pyspark.spark_support import SimpleSparkSerializer

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
5,application_1631487410552_0006,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


In [3]:
spark = SparkSession \
    .builder \
    .appName("Classifying if a customer's income is >$50k or lesser.") \
    .getOrCreate()

In [4]:
# Load data from a public S3 bucket using the object URI.
# The data does not include a header row.
# The leading whitespace in each value also needs to be removed.

rawData = spark.read.format('csv') \
                .option('header', 'false') \
                .option('ignoreLeadingWhiteSpace', 'true') \
                .load('s3://layer-guide/dataset/adult-dataset.csv')

In [5]:
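## Another, simpler way to read the dataset.
## Note: `schema` must be defined first (for example, as a StructType);
## it is not defined at this point in the notebook.

# rawData = spark.read.csv(
#     "s3://layer-guide/dataset/adult-dataset.csv", header=False, schema=schema
# )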

In [6]:
# Specify the features and label you want to use for the dataset

dataset = rawData.toDF('Age',
               'WorkClass',
               'FnlWgt',
               'Education',
               'EducationNum',
               'MaritalStatus',
               'Occupation',
               'Relationship',
               'Race',
               'Gender',
               'CapitalGain',
               'CapitalLoss',
               'HoursPerWeek',
               'NativeCountry',
               'Income'
                )

In [7]:
# Validate data structure

print(f'Total Columns: {len(dataset.dtypes)}')
dataset.printSchema()

Total Columns: 15
root
 |-- Age: string (nullable = true)
 |-- WorkClass: string (nullable = true)
 |-- FnlWgt: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- EducationNum: string (nullable = true)
 |-- MaritalStatus: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Relationship: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- CapitalGain: string (nullable = true)
 |-- CapitalLoss: string (nullable = true)
 |-- HoursPerWeek: string (nullable = true)
 |-- NativeCountry: string (nullable = true)
 |-- Income: string (nullable = true)

In [8]:
# WARNING: This can be inefficient for large datasets
dataset.count()

48842

In [9]:
# Drop the column that's not useful

dataset = dataset.drop('FnlWgt')

In [10]:
dataset.limit(10).show()

+---+----------------+---------+------------+--------------------+-----------------+-------------+-----+------+-----------+-----------+------------+-------------+------+
|Age|       WorkClass|Education|EducationNum|       MaritalStatus|       Occupation| Relationship| Race|Gender|CapitalGain|CapitalLoss|HoursPerWeek|NativeCountry|Income|
+---+----------------+---------+------------+--------------------+-----------------+-------------+-----+------+-----------+-----------+------------+-------------+------+
| 39|       State-gov|Bachelors|          13|       Never-married|     Adm-clerical|Not-in-family|White|  Male|       2174|          0|          40|United-States| <=50K|
| 50|Self-emp-not-inc|Bachelors|          13|  Married-civ-spouse|  Exec-managerial|      Husband|White|  Male|          0|          0|          13|United-States| <=50K|
| 38|         Private|  HS-grad|           9|            Divorced|Handlers-cleaners|Not-in-family|White|  Male|          0|          0|          40|Un

In [11]:
# Replace `?` placeholders with null (None)

dataset = dataset.replace('?', None)

In [12]:
# Drop rows that contain any null value

dataset = dataset.dropna(how='any')
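
As a plain-Python illustration of what the two cleaning cells above do (this is not the Spark API; the sample rows and the helper name `drop_incomplete` are hypothetical), `?` placeholders are first turned into `None`, and then any row that contains a `None` is dropped:

```python
# Plain-Python sketch of replace('?', None) followed by dropna(how='any').
# The sample rows are made up for illustration only.

def drop_incomplete(rows, placeholder="?"):
    """Replace placeholder values with None, then drop rows containing None."""
    cleaned = []
    for row in rows:
        row = {k: (None if v == placeholder else v) for k, v in row.items()}
        if all(v is not None for v in row.values()):
            cleaned.append(row)
    return cleaned

rows = [
    {"Age": "39", "WorkClass": "State-gov", "Occupation": "Adm-clerical"},
    {"Age": "54", "WorkClass": "?", "Occupation": "?"},
    {"Age": "38", "WorkClass": "Private", "Occupation": "Handlers-cleaners"},
]
kept = drop_incomplete(rows)
print(len(kept))  # 2
```

Because `how='any'` drops a row when a single column is null, comparing `dataset.count()` before and after this step shows how many rows were lost.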

In [13]:
# Observe the target column "Income" is a string

dataset.limit(10).show()

+---+----------------+---------+------------+--------------------+-----------------+-------------+-----+------+-----------+-----------+------------+-------------+------+
|Age|       WorkClass|Education|EducationNum|       MaritalStatus|       Occupation| Relationship| Race|Gender|CapitalGain|CapitalLoss|HoursPerWeek|NativeCountry|Income|
+---+----------------+---------+------------+--------------------+-----------------+-------------+-----+------+-----------+-----------+------------+-------------+------+
| 39|       State-gov|Bachelors|          13|       Never-married|     Adm-clerical|Not-in-family|White|  Male|       2174|          0|          40|United-States| <=50K|
| 50|Self-emp-not-inc|Bachelors|          13|  Married-civ-spouse|  Exec-managerial|      Husband|White|  Male|          0|          0|          13|United-States| <=50K|
| 38|         Private|  HS-grad|           9|            Divorced|Handlers-cleaners|Not-in-family|White|  Male|          0|          0|          40|Un

In [14]:
# Convert the target column (better to do this without a transformer to avoid schema complications)
mapping = {
    '<=50K': '0',
    '>50K': '1'
}
dataset = dataset.replace(to_replace=mapping, subset=['Income'])

In [15]:
dataset.printSchema()

root
 |-- Age: string (nullable = true)
 |-- WorkClass: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- EducationNum: string (nullable = true)
 |-- MaritalStatus: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Relationship: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- CapitalGain: string (nullable = true)
 |-- CapitalLoss: string (nullable = true)
 |-- HoursPerWeek: string (nullable = true)
 |-- NativeCountry: string (nullable = true)
 |-- Income: string (nullable = true)

In [16]:
# Convert string-encoded numeric columns (and the label) to doubles

dataset = dataset.withColumn('Age', 
                             dataset['Age'].cast(DoubleType()))
dataset = dataset.withColumn('EducationNum', 
                             dataset['EducationNum'].cast(DoubleType()))
dataset = dataset.withColumn('CapitalGain', 
                             dataset['CapitalGain'].cast(DoubleType()))
dataset = dataset.withColumn('CapitalLoss', 
                             dataset['CapitalLoss'].cast(DoubleType()))
dataset = dataset.withColumn('HoursPerWeek', 
                             dataset['HoursPerWeek'].cast(DoubleType()))
dataset = dataset.withColumn('Income', 
                             dataset['Income'].cast(DoubleType()))

dataset.limit(10).show()

+----+----------------+---------+------------+--------------------+-----------------+-------------+-----+------+-----------+-----------+------------+-------------+------+
| Age|       WorkClass|Education|EducationNum|       MaritalStatus|       Occupation| Relationship| Race|Gender|CapitalGain|CapitalLoss|HoursPerWeek|NativeCountry|Income|
+----+----------------+---------+------------+--------------------+-----------------+-------------+-----+------+-----------+-----------+------------+-------------+------+
|39.0|       State-gov|Bachelors|        13.0|       Never-married|     Adm-clerical|Not-in-family|White|  Male|     2174.0|        0.0|        40.0|United-States|   0.0|
|50.0|Self-emp-not-inc|Bachelors|        13.0|  Married-civ-spouse|  Exec-managerial|      Husband|White|  Male|        0.0|        0.0|        13.0|United-States|   0.0|
|38.0|         Private|  HS-grad|         9.0|            Divorced|Handlers-cleaners|Not-in-family|White|  Male|        0.0|        0.0|        4

In [17]:
# Validate the data types...

#dataset.schema.fields

# You can also do this with:
#dataset.dtypes

dataset.printSchema()

root
 |-- Age: double (nullable = true)
 |-- WorkClass: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- EducationNum: double (nullable = true)
 |-- MaritalStatus: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Relationship: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- CapitalGain: double (nullable = true)
 |-- CapitalLoss: double (nullable = true)
 |-- HoursPerWeek: double (nullable = true)
 |-- NativeCountry: string (nullable = true)
 |-- Income: double (nullable = true)

In [18]:
dataset.limit(10).show(n=1, truncate=False, vertical=True)

-RECORD 0----------------------
 Age           | 39.0          
 WorkClass     | State-gov     
 Education     | Bachelors     
 EducationNum  | 13.0          
 MaritalStatus | Never-married 
 Occupation    | Adm-clerical  
 Relationship  | Not-in-family 
 Race          | White         
 Gender        | Male          
 CapitalGain   | 2174.0        
 CapitalLoss   | 0.0           
 HoursPerWeek  | 40.0          
 NativeCountry | United-States 
 Income        | 0.0           
only showing top 1 row

In [19]:
# Data segregation: split into training (80%) and test (20%) sets

(trainingData, testData) = dataset.randomSplit([0.8,0.2], seed=42)

## Feature Transformation

In [20]:
# Select categorical features

categoricalFeatures = [item[0] for item in dataset.dtypes if \
                     item[1].startswith('string')]

print(categoricalFeatures)

['WorkClass', 'Education', 'MaritalStatus', 'Occupation', 'Relationship', 'Race', 'Gender', 'NativeCountry']

In [21]:
# Create an array of StringIndexers to convert the categorical values to indices

indexers = [StringIndexer(
    inputCol=column, 
    outputCol=column + '_index', 
    handleInvalid='keep') for column in categoricalFeatures]

In [22]:
# Create an array of OneHotEncoders to encode the categorical values
# Using OneHotEncoderEstimator here because OneHotEncoder transformer is...
# ... not compatible with MLeap (model serialization) as of this time.
encoders = [OneHotEncoderEstimator(
    inputCols=[column + '_index'], 
    outputCols= [column + '_encoded']) for column in categoricalFeatures]
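
The indexing and encoding steps above can be sketched in plain Python. This is an illustration, not the Spark implementation: the helper names (`fit_string_index`, `transform_keep`, `one_hot_drop_last`) and the sample values are hypothetical, and it assumes the defaults of frequency-descending index order and `dropLast=True`. Ties are broken alphabetically here, which may differ from what your Spark version does.

```python
from collections import Counter

def fit_string_index(values):
    """Map each label to an index, most frequent label first."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def transform_keep(index, value):
    """Send unseen labels to one extra bucket, mimicking handleInvalid='keep'."""
    return index.get(value, float(len(index)))

def one_hot_drop_last(idx, num_categories):
    """One-hot encode idx; the last category is encoded as all zeros."""
    size = num_categories - 1
    vec = [0.0] * size
    if int(idx) < size:
        vec[int(idx)] = 1.0
    return vec

train = ["Private", "Private", "State-gov", "Private", "Local-gov", "State-gov"]
index = fit_string_index(train)
num_categories = len(index) + 1  # +1 for the "unseen label" bucket

print(index)  # {'Private': 0.0, 'State-gov': 1.0, 'Local-gov': 2.0}
print(one_hot_drop_last(transform_keep(index, "State-gov"), num_categories))     # [0.0, 1.0, 0.0]
print(one_hot_drop_last(transform_keep(index, "Never-worked"), num_categories))  # [0.0, 0.0, 0.0]
```

In this sketch a label that was never seen during fitting ends up as an all-zero vector instead of raising an error, which is the behavior you generally want from a pipeline that serves live traffic.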

In [23]:
# Select numeric features

numFeatures = [item[0] for item in dataset.dtypes \
               if item[1].startswith('double')]

# Only select features for scaling; not the label/target "Income"
numFeatures = numFeatures[:-1] 

print(numFeatures)

['Age', 'EducationNum', 'CapitalGain', 'CapitalLoss', 'HoursPerWeek']

In [24]:
# Each numeric feature must be wrapped in its own single-element vector before it can be scaled.

vectorFeatures = [VectorAssembler(inputCols=[col], \
                                  outputCol=col + "_vec") for col in numFeatures]

# Scale the numeric vectors (it is also worth training models with and without scaling and observing performance)
numScaler = [MinMaxScaler(
    inputCol= column + "_vec", 
    outputCol= column + '_scaled') for column in numFeatures]
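
For reference, min-max scaling maps each value to `(x - min) / (max - min)`, stretched to the output range, which is `[0, 1]` by default. The following plain-Python sketch shows the arithmetic only; the function name `min_max_scale` and the sample values are hypothetical, and it is not the Spark implementation.

```python
def min_max_scale(values, out_min=0.0, out_max=1.0):
    """Rescale values linearly so the smallest maps to out_min and the largest to out_max."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # A constant column has no spread, so map it to the midpoint of the output range.
        return [0.5 * (out_min + out_max) for _ in values]
    return [(v - lo) / (hi - lo) * (out_max - out_min) + out_min for v in values]

hours = [13.0, 40.0, 40.0, 67.0]
print(min_max_scale(hours))  # [0.0, 0.5, 0.5, 1.0]
```

The minimum and maximum are learned from the data the scaler is fitted on, so values in new data that fall outside that range are scaled to values outside `[0, 1]`.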

In [25]:
# Create a pipeline to observe transformed features

pipeline = Pipeline(stages=indexers + encoders + vectorFeatures \
                    + numScaler)

In [26]:
transformedDF = pipeline.fit(trainingData).transform(trainingData)

In [27]:
transformedDF.limit(10).show(n=1, truncate=False, vertical=True)

-RECORD 0-------------------------------------
 Age                   | 17.0                 
 WorkClass             | Federal-gov          
 Education             | 11th                 
 EducationNum          | 7.0                  
 MaritalStatus         | Never-married        
 Occupation            | Adm-clerical         
 Relationship          | Not-in-family        
 Race                  | Black                
 Gender                | Female               
 CapitalGain           | 0.0                  
 CapitalLoss           | 1602.0               
 HoursPerWeek          | 40.0                 
 NativeCountry         | United-States        
 Income                | 0.0                  
 WorkClass_index       | 5.0                  
 Education_index       | 5.0                  
 MaritalStatus_index   | 1.0                  
 Occupation_index      | 3.0                  
 Relationship_index    | 1.0                  
 Race_index            | 1.0                  
 Gender_index

In [28]:
# Select required features

requiredFeatures = [
    'Age_scaled',
    'EducationNum_scaled',
    'CapitalGain_scaled',
    'CapitalLoss_scaled',
    'HoursPerWeek_scaled',
    'WorkClass_encoded',
    'Education_encoded',
    'MaritalStatus_encoded',
    'Occupation_encoded',
    'Relationship_encoded',
    'Race_encoded',
    'Gender_encoded',
    'NativeCountry_encoded'
]

In [29]:
# Create a vector assembler that combines the required features into a single column

assembler = VectorAssembler(inputCols=requiredFeatures, outputCol='features')

transformedDF = assembler.transform(transformedDF)

transformedDF.limit(10).show(n=1, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------------------------------------------------------------------------------------
 Age                   | 17.0                                                                                                              
 WorkClass             | Federal-gov                                                                                                       
 Education             | 11th                                                                                                              
 EducationNum          | 7.0                                                                                                               
 MaritalStatus         | Never-married                                                                                                     
 Occupation            | Adm-clerical                                                                                                      
 Relationship       

In [30]:
transformedDF.select('features').limit(10).show()

+--------------------+
|            features|
+--------------------+
|(103,[1,3,4,10,17...|
|(103,[1,4,7,17,29...|
|(103,[1,4,7,17,29...|
|(103,[1,4,7,17,29...|
|(103,[1,4,7,17,29...|
|(103,[1,4,7,17,29...|
|(103,[1,4,7,17,29...|
|(103,[1,4,7,17,29...|
|(103,[1,4,7,17,29...|
|(103,[1,4,7,17,29...|
+--------------------+

## Selecting Model Estimators

- [Binomial logistic regression](https://spark.apache.org/docs/latest/ml-classification-regression.html#binomial-logistic-regression): For predicting a binary outcome.

 - Reason for selecting this estimator: to establish a baseline performance for the classifiers on the dataset.
 - Learn more about LR in the [docs](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.LogisticRegression.html).

- [Decision tree classifier](https://spark.apache.org/docs/latest/ml-classification-regression.html#decision-tree-classifier): Decision trees can improve on the baseline, but they risk overfitting and tend to be unstable in production.

 - Learn more about this estimator in the [docs](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.DecisionTreeClassifier.html#pyspark.ml.classification.DecisionTreeClassifier).


- [Random forest classifier](https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier): Reduces the risk of overfitting on the dataset and is a strong candidate for a production model.
 - If deployed to production, it is expected to cope better with missing values in the new data it needs to predict on (verify this for your Spark version, since null inputs may still need handling before they reach the pipeline).
 - Learn more about this estimator in the [docs](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.ml.classification.RandomForestClassifier.html).


- (Optional) [Gradient-boosted tree classifier](https://spark.apache.org/docs/latest/ml-classification-regression.html#gradient-boosted-tree-classifier): An alternative to random forest, if it performs better.
 - Learn more about this estimator in the [docs](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.GBTClassifier.html).



In [31]:
# lr = LogisticRegression(labelCol='Income', \
#                         featuresCol='features')

# dt = DecisionTreeClassifier(labelCol="Income", \
#                             featuresCol="features")

# rf = RandomForestClassifier(labelCol='Income', 
#                              featuresCol='features')

gbt = GBTClassifier(labelCol="Income", featuresCol="features")


Combine the transformed features and model estimator into a single pipeline.

In [33]:


# pipeline_lr = Pipeline(
#     stages=indexers + encoders + vectorFeatures + numScaler \
#      + [assembler, lr]
# )

# pipeline_dt = Pipeline(
#     stages=indexers + encoders + vectorFeatures + numScaler \
#      + [assembler, dt]
# )

# pipeline_rf = Pipeline(
#     stages=indexers + encoders + vectorFeatures + numScaler \
#      + [assembler, rf]
# )

pipeline_gbt = Pipeline(
    stages=indexers + encoders + vectorFeatures + numScaler \
     + [assembler, gbt]
)

Define a set of parameters to be used for each estimator. If you want to save time, you may want to use only one model, as running the parameter grids below can get quite compute-intensive and may cost you a few more cents.


In [34]:

# paramGrid_lr = ParamGridBuilder().addGrid(
#     lr.maxIter, [10,50,100]).addGrid(
#     lr.regParam, [0.1, 0.3, 1.0]).addGrid(
#     lr.elasticNetParam, [0.0, 0.7, 1.0]).addGrid(
#     lr.threshold, [0.5, 0.75]
#     ).build()


# paramGrid_dt = ParamGridBuilder().addGrid(
#     dt.maxDepth, [5, 10, 15, 20]).build()


# paramGrid_rf = ParamGridBuilder().addGrid(
#     rf.maxDepth, [5, 10, 15]).addGrid(
#     rf.numTrees, [50, 100, 150]).build()


# paramGrid_gbt = ParamGridBuilder().addGrid(
#     gbt.maxDepth, [5, 10, 15]).addGrid(
#     gbt.maxIter, [20, 25, 50]).build()


# For demonstration purposes, and to save time,
# the following single-combination grid is used instead.
# Comment it out if you use one of the larger grids above.
paramGrid_gbt = ParamGridBuilder().addGrid(
    gbt.maxDepth, [10]).addGrid(
    gbt.maxIter, [50]).build()
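
To get a feel for the cost, it helps to count the model fits a grid implies: cross-validation fits one model per parameter combination per fold, then refits the best combination once on the full training set. The sketch below is plain Python, not the Spark API. The helper `build_grid` is hypothetical, and the values mirror the logistic regression grid and the demonstration grid from the cell above, with the three folds used in this notebook.

```python
from itertools import product

def build_grid(params):
    """Return every combination of the given parameter values as a list of dicts."""
    names = list(params)
    return [dict(zip(names, combo)) for combo in product(*(params[n] for n in names))]

lr_grid = build_grid({
    "maxIter": [10, 50, 100],
    "regParam": [0.1, 0.3, 1.0],
    "elasticNetParam": [0.0, 0.7, 1.0],
    "threshold": [0.5, 0.75],
})
demo_grid = build_grid({"maxDepth": [10], "maxIter": [50]})

num_folds = 3

def total_fits(grid, folds):
    """One fit per (combination, fold), plus one final refit on all training data."""
    return len(grid) * folds + 1

print(len(lr_grid), total_fits(lr_grid, num_folds))      # 54 163
print(len(demo_grid), total_fits(demo_grid, num_folds))  # 1 4
```

Even the single-combination demonstration grid trains the full pipeline four times, which is worth keeping in mind when reading the training time reported further down.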


The class imbalance is not severe with this data, so accuracy can be used to evaluate the estimators.

In [35]:
# Define the estimator evaluators

evaluator = MulticlassClassificationEvaluator(
    labelCol='Income', 
    predictionCol='prediction', 
    metricName='accuracy')
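
Accuracy is simply the fraction of rows whose prediction equals the label. A quick sanity check on whether it is a fair metric is to compare it against the accuracy of always predicting the most common class. The sketch below is plain Python with made-up labels and predictions, and the helper names `accuracy` and `majority_baseline` are hypothetical.

```python
def accuracy(labels, predictions):
    """Fraction of predictions that match their labels."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

def majority_baseline(labels):
    """Accuracy obtained by always predicting the most common label."""
    return max(labels.count(v) for v in set(labels)) / len(labels)

labels      = [0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 1.0, 0.0]
predictions = [0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0]

print(accuracy(labels, predictions))  # 0.75
print(majority_baseline(labels))      # 0.625
```

If a model's accuracy is barely above the majority baseline, a metric such as F1 may be more informative than accuracy.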

In [36]:
crossVal = CrossValidator(estimator=pipeline_gbt,
                          estimatorParamMaps=paramGrid_gbt,
                          evaluator=evaluator,
                          numFolds=3)  # More folds (e.g., 5 or more) are possible at a higher compute cost

Train the model with the CrossValidator estimator...

In [37]:
# Fit CrossVal estimator with the pipeline, parameters, and evaluator
start = time.time()
model_gbt = crossVal.fit(trainingData)
end = time.time()
print(end - start)

587.4561674594879

Alternatively, the parameters can be defined directly within the model estimator `gbt = GBTClassifier(labelCol="Income", featuresCol="features", maxDepth=10, maxIter=50)` and the pipeline fitted directly on the training data.

In [None]:
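## IF YOU ARE NOT USING CROSSVALIDATOR ##
# Fit the full pipeline (feature stages + estimator), i.e. `pipeline_gbt`,
# not the feature-only `pipeline` defined earlier.
# start = time.time()
# model = pipeline_gbt.fit(trainingData)
# end = time.time()
# print(end - start)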


## Make predictions

In [38]:
# transform() appends new columns (including "prediction") to the training/test DataFrames
predictions_train = model_gbt.transform(trainingData)
predictions = model_gbt.transform(testData)

In [39]:
# Evaluate training and test performance for overfitting

accuracy_train = evaluator.evaluate(predictions_train)
accuracy_test = evaluator.evaluate(predictions)
print('Train Accuracy = ', accuracy_train)
print('Test Accuracy = ', accuracy_test)

Train Accuracy =  0.892994122678734
Test Accuracy =  0.8584790112459637

The model overfits slightly on this run (train accuracy of about 0.893 versus test accuracy of about 0.858). This can be improved further, for example by reducing `maxDepth`, adding regularization, or applying other techniques.

In [56]:
# View new columns that have been appended.
predictions.columns 

['Age', 'WorkClass', 'Education', 'EducationNum', 'MaritalStatus', 'Occupation', 'Relationship', 'Race', 'Gender', 'CapitalGain', 'CapitalLoss', 'HoursPerWeek', 'NativeCountry', 'Income', 'WorkClass_index', 'Education_index', 'MaritalStatus_index', 'Occupation_index', 'Relationship_index', 'Race_index', 'Gender_index', 'NativeCountry_index', 'WorkClass_encoded', 'Education_encoded', 'MaritalStatus_encoded', 'Occupation_encoded', 'Relationship_encoded', 'Race_encoded', 'Gender_encoded', 'NativeCountry_encoded', 'Age_vec', 'EducationNum_vec', 'CapitalGain_vec', 'CapitalLoss_vec', 'HoursPerWeek_vec', 'Age_scaled', 'EducationNum_scaled', 'CapitalGain_scaled', 'CapitalLoss_scaled', 'HoursPerWeek_scaled', 'features', 'rawPrediction', 'probability', 'prediction']

## (Optional) Save model

Optionally save your model to a local directory or upload to an S3 bucket.

In [46]:
# import tempfile

# model = model_gbt.bestModel

# path = tempfile.mkdtemp()
# model_path = path + "/model"
# model.write().save(model_path)


## Serialize the pipeline for deployment

Use [MLeap](https://github.com/combust/mleap) to serialize the entire pipeline. It provides an easy-to-use pipeline serialization format and an execution engine for online inference.

In [47]:
# `model` is the best fitted PipelineModel found by the CrossValidator
model = model_gbt.bestModel

SimpleSparkSerializer().serializeToBundle(
    model, "jar:file:/tmp/model.zip", predictions
)

To deploy the model to SageMaker, it must be in tar.gz format, but MLeap produces a zip file.

In [48]:
# Unzip the model artifacts and store them in tar.gz format.

import zipfile

with zipfile.ZipFile("/tmp/model.zip") as zf:
    zf.extractall("/tmp/model")

import tarfile

with tarfile.open("/tmp/model.tar.gz", "w:gz") as tar:
    tar.add("/tmp/model/bundle.json", arcname="bundle.json")
    tar.add("/tmp/model/root", arcname="root")
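
The same repackaging can be wrapped in a reusable function that works in a temporary directory, so nothing is left behind in `/tmp`. This is a sketch under stated assumptions: the function name `zip_to_targz` is hypothetical, it repacks every top-level entry of the zip (the cell above adds only `bundle.json` and `root`), and the demonstration uses a stand-in archive, not a real MLeap bundle.

```python
import os
import tarfile
import tempfile
import zipfile

def zip_to_targz(zip_path, targz_path):
    """Extract zip_path and repack its top-level entries into a gzip-compressed tar archive."""
    with tempfile.TemporaryDirectory() as workdir:
        with zipfile.ZipFile(zip_path) as zf:
            zf.extractall(workdir)
        with tarfile.open(targz_path, "w:gz") as tar:
            for entry in sorted(os.listdir(workdir)):
                # arcname keeps paths inside the archive relative (no /tmp/... prefix)
                tar.add(os.path.join(workdir, entry), arcname=entry)
    return targz_path

# Demonstration with a stand-in bundle (not a real MLeap model)
demo_dir = tempfile.mkdtemp()
demo_zip = os.path.join(demo_dir, "model.zip")
with zipfile.ZipFile(demo_zip, "w") as zf:
    zf.writestr("bundle.json", "{}")
    zf.writestr("root/model.json", "{}")

demo_tar = zip_to_targz(demo_zip, os.path.join(demo_dir, "model.tar.gz"))
with tarfile.open(demo_tar) as tar:
    names = sorted(tar.getnames())
print(names)  # ['bundle.json', 'root', 'root/model.json']
```

Listing the archive members before uploading is a cheap way to confirm that `bundle.json` and `root` sit at the top level of the archive, as in the cell above.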

Upload to an S3 bucket... Change the folder and file names if needed.

In [59]:
s3 = boto3.resource("s3")
file_name = os.path.join("emr/sparkml-layer/mleap", "model.tar.gz")
s3.Bucket("layer-bootstrap-script").upload_file("/tmp/model.tar.gz", file_name)

Optionally, delete the model from local storage.

In [60]:
# os.remove("/tmp/model.zip")
# os.remove("/tmp/model.tar.gz")
# shutil.rmtree("/tmp/model")

# Deploying the pipeline on Amazon SageMaker

In [61]:
# Optional
# predictions.printSchema()

root
 |-- Age: double (nullable = true)
 |-- WorkClass: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- EducationNum: double (nullable = true)
 |-- MaritalStatus: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Relationship: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- CapitalGain: double (nullable = true)
 |-- CapitalLoss: double (nullable = true)
 |-- HoursPerWeek: double (nullable = true)
 |-- NativeCountry: string (nullable = true)
 |-- Income: double (nullable = true)
 |-- WorkClass_index: double (nullable = false)
 |-- Education_index: double (nullable = false)
 |-- MaritalStatus_index: double (nullable = false)
 |-- Occupation_index: double (nullable = false)
 |-- Relationship_index: double (nullable = false)
 |-- Race_index: double (nullable = false)
 |-- Gender_index: double (nullable = false)
 |-- NativeCountry_index: double (nullable = false)
 |-- WorkClass_encoded: vect

In [49]:
import json

# Define the raw data schema (the raw data that the pipeline
# will transform in production)

schema = {
    "input": [
        {"name": "Age", "type": "double"},
        {"name": "WorkClass", "type": "string"},
        {"name": "Education", "type": "string"},
        {"name": "EducationNum", "type": "double"},
        {"name": "MaritalStatus", "type": "string"},
        {"name": "Occupation", "type": "string"},
        {"name": "Relationship", "type": "string"},
        {"name": "Race", "type": "string"},
        {"name": "Gender", "type": "string"},
        {"name": "CapitalGain", "type": "double"},
        {"name": "CapitalLoss", "type": "double"},
        {"name": "HoursPerWeek", "type": "double"},
        {"name": "NativeCountry", "type": "string"},
    ],
    "output": {"name": "prediction", "type": "double"},
}

# Serialize the schema to a JSON string
schema_json = json.dumps(schema)
print(schema_json)

{"input": [{"name": "Age", "type": "double"}, {"name": "WorkClass", "type": "string"}, {"name": "Education", "type": "string"}, {"name": "EducationNum", "type": "double"}, {"name": "MaritalStatus", "type": "string"}, {"name": "Occupation", "type": "string"}, {"name": "Relationship", "type": "string"}, {"name": "Race", "type": "string"}, {"name": "Gender", "type": "string"}, {"name": "CapitalGain", "type": "double"}, {"name": "CapitalLoss", "type": "double"}, {"name": "HoursPerWeek", "type": "double"}, {"name": "NativeCountry", "type": "string"}], "output": {"name": "prediction", "type": "double"}}
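
Since the endpoint expects values in exactly the order and types given by the schema, a small client-side check can catch mistakes before a request is sent. The sketch below is plain Python: the helper `check_row` is hypothetical, it only understands the `double` and `string` types used in this schema, and the schema is rebuilt here so that the example is self-contained.

```python
# Rebuild the same schema as above in a compact form
double_cols = {"Age", "EducationNum", "CapitalGain", "CapitalLoss", "HoursPerWeek"}
columns = [
    "Age", "WorkClass", "Education", "EducationNum", "MaritalStatus",
    "Occupation", "Relationship", "Race", "Gender", "CapitalGain",
    "CapitalLoss", "HoursPerWeek", "NativeCountry",
]
schema = {
    "input": [
        {"name": c, "type": "double" if c in double_cols else "string"}
        for c in columns
    ],
    "output": {"name": "prediction", "type": "double"},
}

def check_row(schema, row):
    """Return a list of problems found in row; an empty list means the row matches."""
    fields = schema["input"]
    if len(row) != len(fields):
        return ["expected %d values, got %d" % (len(fields), len(row))]
    problems = []
    for field, value in zip(fields, row):
        if field["type"] == "double":
            ok = isinstance(value, (int, float)) and not isinstance(value, bool)
        else:
            ok = isinstance(value, str)
        if not ok:
            problems.append("%s: expected %s, got %r" % (field["name"], field["type"], value))
    return problems

row = [17.0, "Local-gov", "10th", 6.0, "Never-married", "Protective-serv",
       "Own-child", "White", "Female", 0.0, 1602.0, 40.0, "United-States"]

print(check_row(schema, row))       # []
print(check_row(schema, row[:-1]))  # ['expected 13 values, got 12']
```

Note that the label column `Income` is deliberately absent from the input schema, because the endpoint receives only the raw features.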

In [50]:
# Set the region to deploy the pipeline on AWS

os.environ["AWS_DEFAULT_REGION"] = 'us-east-2'

In [51]:
from time import gmtime, strftime
import time

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

import sagemaker
from sagemaker import get_execution_role
from sagemaker.sparkml.model import SparkMLModel

sess = sagemaker.Session()
role = get_execution_role()

# S3 location of where you uploaded your trained and serialized SparkML model
sparkml_data = "s3://{}/{}/{}".format(
    "layer-bootstrap-script", "emr/sparkml-layer/mleap", "model.tar.gz"
)
model_name = "sparkml-layer-" + timestamp_prefix
sparkml_model = SparkMLModel(
    model_data=sparkml_data,
    role=role,
    sagemaker_session=sess,
    name=model_name,
    # passing the schema defined above by using an environment
    # variable that sagemaker-sparkml-serving understands
    env={"SAGEMAKER_SPARKML_SCHEMA": schema_json},
)


endpoint_name = "sparkml-layer-ep-" + timestamp_prefix

# You can deploy to multiple instances for high availability and distributed ML
sparkml_model.deploy(
    initial_instance_count=1, instance_type="ml.c4.xlarge", endpoint_name=endpoint_name
)

-------------!<sagemaker.sparkml.model.SparkMLPredictor object at 0x7ff611792518>

## Test the deployed pipeline

Send payload (data) to the pipeline endpoint in CSV format...

In [52]:
from sagemaker.predictor import (
    json_serializer,
    csv_serializer,
    json_deserializer,
    Predictor,
)

payload = "17.0, Local-gov, 10th, 6.0, Never-married, Protective-serv, \
            Own-child, White, Female, 0.0, 1602.0, 40.0, United-States"

predictor = Predictor(
    endpoint_name=endpoint_name,
    sagemaker_session=sess,
    serializer=csv_serializer,
#     deserializer=csv_serializer
#     content_type="text/csv",
#     accept="text/csv",
)
print(predictor.predict(payload))

b'0.0'
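
Instead of typing the CSV payload by hand, it can be built from a list of values, which keeps the column order explicit and avoids stray whitespace from line continuations. This is a minimal sketch using the standard `csv` module. The helper name `to_csv_payload` is hypothetical, and it assumes the endpoint accepts a row without padding after the commas.

```python
import csv
import io

def to_csv_payload(row):
    """Serialize one row of values as a single CSV line without a trailing newline."""
    buf = io.StringIO()
    csv.writer(buf).writerow(row)
    return buf.getvalue().rstrip("\r\n")

row = [17.0, "Local-gov", "10th", 6.0, "Never-married", "Protective-serv",
       "Own-child", "White", "Female", 0.0, 1602.0, 40.0, "United-States"]

payload = to_csv_payload(row)
print(payload)
# 17.0,Local-gov,10th,6.0,Never-married,Protective-serv,Own-child,White,Female,0.0,1602.0,40.0,United-States
```

Using the `csv` module also quotes any value that itself contains a comma, which manual string concatenation would not do.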

Send payload (data) to the pipeline as a JSON object.

In [73]:
payload = {"data": [17.0, "Local-gov", "10th", 6.0, "Never-married", "Protective-serv", \
                    "Own-child", "White", "Female", 0.0, 1602.0, 40.0, "United-States"]}
predictor = Predictor(
    endpoint_name=endpoint_name,
    sagemaker_session=sess,
    serializer=json_serializer
)

print(predictor.predict(payload))

b'0.0'

Send a payload that carries its own schema... The endpoint lets a request override the schema set at deployment time and use the data schema supplied by the client application instead.

In [74]:
payload = {
    "schema" : {
        "input": [
            {"name": "Age", "type": "double"},
            {"name": "WorkClass", "type": "string"},
            {"name": "Education", "type": "string"},
            {"name": "EducationNum", "type": "double"},
            {"name": "MaritalStatus", "type": "string"},
            {"name": "Occupation", "type": "string"},
            {"name": "Relationship", "type": "string"},
            {"name": "Race", "type": "string"},
            {"name": "Gender", "type": "string"},
            {"name": "CapitalGain", "type": "double"},
            {"name": "CapitalLoss", "type": "double"},
            {"name": "HoursPerWeek", "type": "double"},
            {"name": "NativeCountry", "type": "string"},
        ],
        "output": {"name": "prediction", "type": "double"},
    },
    "data": [17.0, "Local-gov", "10th", 6.0, "Never-married", "Protective-serv", \
                    "Own-child", "White", "Female", 0.0, 1602.0, 40.0, "United-States"],
}

predictor = Predictor(
    endpoint_name=endpoint_name,
    sagemaker_session=sess,
    serializer=json_serializer
)

print(predictor.predict(payload))

b'0.0'

Learn how to invoke the endpoint from your application, backend server, or computer in [this reference](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_runtime_InvokeEndpoint.html). 
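
As a rough sketch of what such a call can look like without the SageMaker Python SDK, the function below wraps the `InvokeEndpoint` operation of a boto3 `sagemaker-runtime` client. The function name `invoke_csv` and the stub client are hypothetical. The stub stands in for boto3 so that the example runs offline, and the real call (shown in the comments) needs AWS credentials and a live endpoint.

```python
def invoke_csv(runtime_client, endpoint_name, csv_row):
    """Send one CSV row to the endpoint and return the response body as text."""
    response = runtime_client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType="text/csv",
        Accept="text/csv",
        Body=csv_row,
    )
    return response["Body"].read().decode("utf-8")

class StubBody:
    """Stand-in for the streaming body returned by the real client."""
    def read(self):
        return b"0.0"

class StubRuntimeClient:
    """Offline stand-in for boto3.client('sagemaker-runtime'); records the last request."""
    def invoke_endpoint(self, **kwargs):
        self.last_request = kwargs
        return {"Body": StubBody()}

csv_row = ("17.0,Local-gov,10th,6.0,Never-married,Protective-serv,"
           "Own-child,White,Female,0.0,1602.0,40.0,United-States")

stub = StubRuntimeClient()
result = invoke_csv(stub, "my-endpoint-name", csv_row)  # endpoint name is a placeholder
print(result)  # 0.0

# With a real endpoint (requires credentials and network access):
# import boto3
# runtime = boto3.client("sagemaker-runtime", region_name="us-east-2")
# print(invoke_csv(runtime, endpoint_name, csv_row))
```

Passing the client in as an argument keeps the function easy to test and lets the caller decide how credentials and the region are configured.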