# Build Machine Learning Pipeline using SageMaker, EMR, S3, ECS, API Gateway and Lambda 




Preprocess data, build, train and deploy a Spark pipeline:

Let's start by adding a few required policies to this notebook instance. Refer to "Step-1: Setup permission" of [this documentation](https://collaborate-corp.amazon.com/nuxeo/nxpath/default/default-domain/workspaces/ProServe%20Goldemine%20Workspace/Tutorial%3A%20Build%20an%20End%20t@view_documents?tabIds=MAIN_TABS%3Adocuments%2C%3A&conversationId=0NXMAIN1) in case you need help adding policies to the notebook. Add `AmazonElasticMapReduceFullAccess`, `AmazonSageMakerFullAccess` and `AmazonS3FullAccess` if they're not already added.

Now, let's define a few variables. We need one variable for the name of the S3 bucket that holds the bootstrap scripts and datasets, and another for the name of the notebook instance.

In [None]:
%%local
import os

os.environ['BUCKET'] =  '<bucket name here>'
os.environ['NOTEBOOK_NAME'] =  '<input your notebook instance name here>'

Before we spin up the EMR Spark cluster, we'll save the MLeap bootstrap script to an S3 bucket. The cluster runs this script at startup to install MLeap on its nodes. MLeap is a serializer and an execution engine for ML pipelines. Here, we'll train the pipeline in EMR Spark and export it to an MLeap bundle. To learn more about MLeap, see https://github.com/combust/mleap

We'll read the *Subnet* from the current notebook instance and look up the IDs of the `ElasticMapReduce-master` and `ElasticMapReduce-slave` *Security groups*. This makes sure the EMR cluster is launched in the same VPC and subnet as this notebook, with Spark, Livy, boto3 and MLeap installed.
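
The cluster-creation cell passes these values to `--ec2-attributes` as a JSON string assembled by shell quoting. As a minimal sketch of the same structure, the JSON can also be built with `json.dumps`. The helper name `build_ec2_attributes` and the IDs used here are placeholders for illustration, not values from this tutorial.

```python
import json


def build_ec2_attributes(subnet_id, master_sg, slave_sg,
                         instance_profile="EMR_EC2_DefaultRole"):
    """Return the JSON string passed to `aws emr create-cluster --ec2-attributes`."""
    return json.dumps({
        "InstanceProfile": instance_profile,
        "SubnetId": subnet_id,
        "EmrManagedMasterSecurityGroup": master_sg,
        "EmrManagedSlaveSecurityGroup": slave_sg,
    })


# Placeholder IDs (assumptions), not real resources.
attrs = build_ec2_attributes("subnet-0123", "sg-master", "sg-slave")
print(attrs)
```

Building the string this way avoids the nested quote juggling and fails loudly if a value is missing.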

After the EMR cluster is ready, we'll copy the private IP of the master node into the notebook's *config.json* file in place of 'localhost'. At this point, your notebook should be connected to your EMR cluster.
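
The `sed` command in the cell below replaces every occurrence of 'localhost' in *config.json*. A narrower variant, sketched here in Python under the assumption that the Livy URLs live in the `url` field of keys ending in `_credentials`, rewrites only those fields. The function name `point_config_at_master`, the sample config and the IP address are all made up for illustration.

```python
import json


def point_config_at_master(config_text, master_ip):
    """Replace 'localhost' in the URL of every '*_credentials' entry."""
    config = json.loads(config_text)
    for key, value in config.items():
        if key.endswith("_credentials") and isinstance(value, dict) and "url" in value:
            value["url"] = value["url"].replace("localhost", master_ip)
    return json.dumps(config, indent=2)


# Trimmed-down stand-in for a sparkmagic config (assumed layout).
sample = json.dumps({
    "kernel_python_credentials": {"username": "", "password": "",
                                  "url": "http://localhost:8998"},
    "session_configs": {"driverMemory": "1000M"},
})

patched = json.loads(point_config_at_master(sample, "10.0.0.12"))
print(patched["kernel_python_credentials"]["url"])
```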

Lastly, we'll save the dataset to the S3 bucket and we'll use it later for model training.

Note: If you get a "The supplied ecSubnetId is not valid" error, this could be because network information was missing when the SageMaker notebook instance was created. Refer to [this documentation](https://collaborate-corp.amazon.com/nuxeo/nxpath/default/default-domain/workspaces/ProServe%20Goldemine%20Workspace/Tutorial%3A%20Build%20an%20End%20t.1546036770584@view_documents?tabIds=MAIN_TABS%3Adocuments%2C%3A&conversationId=0NXMAIN1) and follow the instructions to set up this notebook instance correctly.

In [None]:
%%bash
echo \#!/bin/bash > emr_model_bootstrap.sh
echo sudo pip install boto3 mleap >> emr_model_bootstrap.sh
echo wget http://s3-us-west-2.amazonaws.com/sparkml-mleap/0.9.6/jar/mleap_spark_assembly.jar >> emr_model_bootstrap.sh
echo sudo mkdir -p /usr/lib/spark/jars >> emr_model_bootstrap.sh
echo sudo mv mleap_spark_assembly.jar /usr/lib/spark/jars >> emr_model_bootstrap.sh

aws s3 cp emr_model_bootstrap.sh s3://$BUCKET/scripts/

# Save bucket info for after restart
echo $BUCKET > bucket.tmp

# parse the SUBNET, Security group info from the notebook instance
SUBNET=$(aws sagemaker describe-notebook-instance --notebook-instance-name $NOTEBOOK_NAME | jq -r .SubnetId)

SLAVE_SECURITY_GROUP=$(aws ec2 describe-security-groups --group-names ElasticMapReduce-slave | jq -r .SecurityGroups[0].GroupId)
MASTER_SECURITY_GROUP=$(aws ec2 describe-security-groups --group-names ElasticMapReduce-master | jq -r .SecurityGroups[0].GroupId)


# Create EMR Cluster
CLUSTER_ID=$(aws emr create-cluster --name emr-model-demo --release-label emr-5.10.0 \
    --applications Name=SPARK Name=LIVY  --instance-type m4.xlarge --instance-count 3 \
    --ebs-root-volume-size 100 \
    --ec2-attributes '{"InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"'$SUBNET'","EmrManagedSlaveSecurityGroup":"'$SLAVE_SECURITY_GROUP'","EmrManagedMasterSecurityGroup":"'$MASTER_SECURITY_GROUP'"}' \
    --service-role EMR_DefaultRole --bootstrap Path=s3://$BUCKET/scripts/emr_model_bootstrap.sh | jq -r .ClusterId)

echo Cluster ID is... $CLUSTER_ID

# Wait for Cluster Ready
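CLUSTER_STATUS=$(aws emr describe-cluster --cluster-id $CLUSTER_ID | jq -r .Cluster.Status.State)
while [ "$CLUSTER_STATUS" != 'WAITING' ]; do
  # Stop polling if the cluster failed or was shut down; otherwise this loop would never end
  case "$CLUSTER_STATUS" in
    TERMINATING|TERMINATED|TERMINATED_WITH_ERRORS) echo "Cluster did not start: $CLUSTER_STATUS"; exit 1 ;;
  esac
  sleep 20
  CLUSTER_STATUS=$(aws emr describe-cluster --cluster-id $CLUSTER_ID | jq -r .Cluster.Status.State)
  echo $CLUSTER_STATUS
done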

privateip=$(aws emr list-instances --cluster-id $CLUSTER_ID --instance-group-types MASTER | jq -r .Instances[0].PrivateIpAddress)

# Sparkmagic reads its config from the home directory, not from the notebook's working directory
cd ~/.sparkmagic

wget https://raw.githubusercontent.com/jupyter-incubator/sparkmagic/master/sparkmagic/example_config.json
mv example_config.json config.json

sed -i "s/localhost/$privateip/g" config.json

# download and save the dataset to s3
wget https://archive.ics.uci.edu/ml/machine-learning-databases/car/car.data

aws s3 cp car.data s3://$BUCKET/data/

## Restart Notebook

After the cluster is up and running in EMR, you need to restart the notebook kernel in order for Sparkmagic to reload the config settings and connect to the cluster over Livy. You can do this by clicking `Kernel` and then `Restart` in the notebook. ***Warning***: You should _not_ run the cells above this point again after restarting the notebook. Just continue from this point.


Begin by running the following cell to confirm that this notebook is connected to EMR:

In [None]:
%%info

Obtain the bucket name from the temporary file.

In [None]:
%%local
import os

# Obtain the bucket name saved before the restart (bucket.tmp holds only the bucket name)
with open('bucket.tmp', 'r') as f:
    os.environ['BUCKET'] = f.read().strip()

print('Please set this value in the next cell in order to send the bucket to the Spark cluster:\n{}'.format(os.environ['BUCKET']))

Now that you have your bucket name, set the same bucket name again in Spark.

In [None]:
bucket_name = '<bucket name here>'


Let's import the necessary dependencies.


In [None]:
from __future__ import print_function

import time
import sys
import os
import shutil
import csv
import boto3

import pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import *
from pyspark.ml.classification import RandomForestClassifier
from mleap.pyspark.spark_support import SimpleSparkSerializer


Once the S3 bucket name is saved, we'll define a few S3 prefixes inside this bucket: one for the input data location where the dataset resides, one for the output prefix where the transformed training and validation data will be saved, and one for the model prefix where model artifacts will be saved.

We'll start by reading the data from S3. In the car dataset, the target variable, car acceptability, is a string. We'll convert it to a number and split the data into training and validation datasets. We'll then build a random forest model and a pipeline that combines the preprocessing steps with the model, and use this pipeline to train the model.
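
To make the preprocessing steps concrete, here is a plain-Python sketch of what string indexing and one-hot encoding do to a categorical column. It mirrors Spark's defaults as far as they are documented (`StringIndexer` orders labels by descending frequency, `OneHotEncoder` drops the last category), but it is not Spark's implementation. The alphabetical tie-break and the helper names `fit_string_index` and `one_hot` are assumptions of this sketch.

```python
from collections import Counter


def fit_string_index(values):
    """Map each distinct string to a float index, most frequent value first.

    Ties are broken alphabetically (an assumption of this sketch).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a 0/1 vector; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec


labels = ["unacc", "unacc", "unacc", "acc", "acc", "good"]
mapping = fit_string_index(labels)
print(mapping)
print(one_hot(mapping["acc"], len(mapping)))
print(one_hot(mapping["good"], len(mapping)))
```

The most frequent class gets index 0.0, and the least frequent category is represented by the all-zero vector once the last slot is dropped.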

Save the transformed training and validation data to two separate CSVs in S3.

Serialize the trained model as an MLeap bundle, then convert it to a .tar.gz file, since SageMaker expects that format. We'll then do the same for the postprocessing step. Both artifacts are used later in an inference pipeline.
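
The zip-to-tar.gz conversion is done twice in the cell below, once per bundle. As a sketch of how that step could be factored out, the hypothetical helper `zip_to_targz` repacks any .zip archive into a .tar.gz. It assumes Python 3 (for `tempfile.TemporaryDirectory`), and the demo archive is a stand-in, not a real MLeap bundle.

```python
import os
import tarfile
import tempfile
import zipfile


def zip_to_targz(zip_path, targz_path):
    """Repack every top-level entry of a .zip archive into a .tar.gz archive."""
    with tempfile.TemporaryDirectory() as workdir:
        with zipfile.ZipFile(zip_path) as zf:
            zf.extractall(workdir)
        with tarfile.open(targz_path, "w:gz") as tar:
            for entry in sorted(os.listdir(workdir)):
                # arcname keeps paths relative, e.g. 'bundle.json' and 'root/...'
                tar.add(os.path.join(workdir, entry), arcname=entry)


# Demo with a stand-in archive that has the same top-level layout as a bundle.
demo_dir = tempfile.mkdtemp()
demo_zip = os.path.join(demo_dir, "model.zip")
demo_tar = os.path.join(demo_dir, "model.tar.gz")
with zipfile.ZipFile(demo_zip, "w") as zf:
    zf.writestr("bundle.json", "{}")
    zf.writestr("root/model.json", "{}")

zip_to_targz(demo_zip, demo_tar)
with tarfile.open(demo_tar) as tar:
    names = sorted(tar.getnames())
print(names)
```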



In [None]:
def toCSVLine(data):
    r = ','.join(str(d) for d in data[1])
    return str(data[0]) + "," + r


# Set S3 bucket locations as variables
args = {
    's3_input_data_location' : 's3://{}/data/car.data'.format(bucket_name), 
    's3_output_bucket': bucket_name,  
    's3_output_bucket_prefix': 'output' , 
    's3_model_bucket' : bucket_name,  
    's3_model_bucket_prefix' : 'model'}


# This is needed to write RDDs to file which is the only way to write nested Dataframes into CSV.
spark.sparkContext._jsc.hadoopConfiguration().set("mapred.output.committer.class",
                                                  "org.apache.hadoop.mapred.FileOutputCommitter")

train = spark.read.csv(args['s3_input_data_location'], header=False)


# Rename the default columns (_c0 ... _c6) to the dataset's attribute names
newColumns = ['buying', 'maint', 'doors', 'persons', 'lug_boot', 'safety', 'cat']

train = train.toDF(*newColumns)

# dropping null values
train = train.dropna()

# Target label
catIndexer = StringIndexer(inputCol="cat", outputCol="label")

labelIndexModel = catIndexer.fit(train)
train = labelIndexModel.transform(train)

converter = IndexToString(inputCol="label", outputCol="cat")

# Splitting into training and validation sets. Beware: it sorts the dataset
(traindf, validationdf) = train.randomSplit([0.8, 0.2])

# Index the categorical feature columns.
# These indexers are fitted on the training split when the pipeline is fitted below.
buyingIndexer = StringIndexer(inputCol="buying", outputCol="indexedBuying")
maintIndexer = StringIndexer(inputCol="maint", outputCol="indexedMaint")
doorsIndexer = StringIndexer(inputCol="doors", outputCol="indexedDoors")
personsIndexer = StringIndexer(inputCol="persons", outputCol="indexedPersons")
lug_bootIndexer = StringIndexer(inputCol="lug_boot", outputCol="indexedLug_boot")
safetyIndexer = StringIndexer(inputCol="safety", outputCol="indexedSafety")


# One Hot Encoder on indexed features
buyingEncoder = OneHotEncoder(inputCol="indexedBuying", outputCol="buyingVec")
maintEncoder = OneHotEncoder(inputCol="indexedMaint", outputCol="maintVec")
doorsEncoder = OneHotEncoder(inputCol="indexedDoors", outputCol="doorsVec")
personsEncoder = OneHotEncoder(inputCol="indexedPersons", outputCol="personsVec")
lug_bootEncoder = OneHotEncoder(inputCol="indexedLug_boot", outputCol="lug_bootVec")
safetyEncoder = OneHotEncoder(inputCol="indexedSafety", outputCol="safetyVec")


# Create the vector structured data (label,features(vector))
assembler = VectorAssembler(inputCols=["buyingVec", "maintVec", "doorsVec", "personsVec", "lug_bootVec", "safetyVec"], outputCol="features")

# rf model 
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10)

# Chain featurizers in a Pipeline
pipeline = Pipeline(stages=[buyingIndexer, maintIndexer, doorsIndexer, personsIndexer, lug_bootIndexer, safetyIndexer, buyingEncoder, maintEncoder, doorsEncoder, personsEncoder, lug_bootEncoder, safetyEncoder, assembler, rf])


# Train model.  This also runs the indexers.
model = pipeline.fit(traindf)

# Delete previous data from output
s3 = boto3.resource('s3')
bucket = s3.Bucket(args['s3_output_bucket'])

bucket.objects.filter(Prefix=args['s3_output_bucket_prefix']).delete()    

# Save transformed training data to CSV in S3 by converting to RDD.
transformed_traindf = model.transform(traindf)
transformed_train_rdd = transformed_traindf.rdd.map(lambda x: (x.label, x.features))
lines = transformed_train_rdd.map(toCSVLine)
lines.saveAsTextFile('s3a://' + args['s3_output_bucket'] + '/' +args['s3_output_bucket_prefix'] + '/' + 'train')

# Similar data processing for validation dataset.
predictions = model.transform(validationdf)
transformed_validation_rdd = predictions.rdd.map(lambda x: (x.label, x.features))
lines = transformed_validation_rdd.map(toCSVLine)
lines.saveAsTextFile('s3a://' + args['s3_output_bucket'] + '/' +args['s3_output_bucket_prefix'] + '/' + 'validation')

# Serialize and store via MLeap  
SimpleSparkSerializer().serializeToBundle(model, "jar:file:/tmp/model.zip", predictions)

# Unzipping as SageMaker expects a .tar.gz file but MLeap produces a .zip file.
import zipfile
with zipfile.ZipFile("/tmp/model.zip") as zf:
    zf.extractall("/tmp/model")

## Writing back the content as a .tar.gz file
import tarfile
with tarfile.open("/tmp/model.tar.gz", "w:gz") as tar:
    tar.add("/tmp/model/bundle.json", arcname='bundle.json')
    tar.add("/tmp/model/root", arcname='root')

s3 = boto3.resource('s3')
file_name = args['s3_model_bucket_prefix'] + '/' + 'model.tar.gz'
s3.Bucket(args['s3_model_bucket']).upload_file('/tmp/model.tar.gz', file_name)

os.remove('/tmp/model.zip')
os.remove('/tmp/model.tar.gz')
shutil.rmtree('/tmp/model')

# Save postprocessor          
SimpleSparkSerializer().serializeToBundle(converter, "jar:file:/tmp/postprocess.zip", predictions)

with zipfile.ZipFile("/tmp/postprocess.zip") as zf:
    zf.extractall("/tmp/postprocess")

# Writing back the content as a .tar.gz file (tarfile is already imported above)
with tarfile.open("/tmp/postprocess.tar.gz", "w:gz") as tar:
    tar.add("/tmp/postprocess/bundle.json", arcname='bundle.json')
    tar.add("/tmp/postprocess/root", arcname='root')

file_name = args['s3_model_bucket_prefix'] + '/' + 'postprocess.tar.gz'
s3.Bucket(args['s3_model_bucket']).upload_file('/tmp/postprocess.tar.gz', file_name)

os.remove('/tmp/postprocess.zip')
os.remove('/tmp/postprocess.tar.gz')
shutil.rmtree('/tmp/postprocess')
    


Create a SageMaker session, then create a SageMaker endpoint that serves the pipeline.
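
Each container in the model definition below receives its input and output schema as a hand-written JSON string in `SAGEMAKER_SPARKML_SCHEMA`. A minimal sketch of generating such a string with `json.dumps` follows. The helper name `sparkml_schema` is hypothetical, and the demo reproduces only the postprocessing schema used in this notebook.

```python
import json


def sparkml_schema(input_columns, output_name, output_type, output_struct=None):
    """Build the JSON string for SAGEMAKER_SPARKML_SCHEMA.

    input_columns is a list of (name, type) tuples in payload order.
    output_struct is optional, e.g. "vector" for vector-valued outputs.
    """
    output = {"name": output_name, "type": output_type}
    if output_struct is not None:
        output["struct"] = output_struct
    schema = {
        "input": [{"name": name, "type": col_type} for name, col_type in input_columns],
        "output": output,
    }
    return json.dumps(schema)


# Schema of the postprocessing container: numeric label in, class string out.
postprocess_schema = sparkml_schema([("label", "double")], "cat", "string")
print(postprocess_schema)
```

The same helper can generate the first container's schema from the six feature names, which keeps the column order in one place.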

In [None]:
%%local

import boto3
import botocore
from botocore.exceptions import ClientError

from sagemaker import Session as Sess

# SageMaker session
sess = Sess()

# Boto3 session   
session = boto3.session.Session()
region = session.region_name

from sagemaker import get_execution_role

role = get_execution_role()


In [None]:
%%local
### Create SageMaker endpoint with pipeline
from botocore.exceptions import ClientError

sagemaker = boto3.client('sagemaker')

bucket_name = os.environ['BUCKET']

# Image locations are published at: https://github.com/aws/sagemaker-sparkml-serving-container
sparkml_images = {
    'us-west-1': '746614075791.dkr.ecr.us-west-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'us-west-2': '246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'us-east-1': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'us-east-2': '257758044811.dkr.ecr.us-east-2.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'ap-northeast-1': '354813040037.dkr.ecr.ap-northeast-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'ap-northeast-2': '366743142698.dkr.ecr.ap-northeast-2.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'ap-southeast-1': '121021644041.dkr.ecr.ap-southeast-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'ap-southeast-2': '783357654285.dkr.ecr.ap-southeast-2.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'ap-south-1': '720646828776.dkr.ecr.ap-south-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'eu-west-1': '141502667606.dkr.ecr.eu-west-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'eu-west-2': '764974769150.dkr.ecr.eu-west-2.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'eu-central-1': '492215442770.dkr.ecr.eu-central-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'ca-central-1': '341280168497.dkr.ecr.ca-central-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'us-gov-west-1': '414596584902.dkr.ecr.us-gov-west-1.amazonaws.com/sagemaker-sparkml-serving:2.2'
}

model_data_url_0 = 's3://{}/model/model.tar.gz'.format(bucket_name)
model_data_url_1 = 's3://{}/model/postprocess.tar.gz'.format(bucket_name)

try:
    sparkml_image = sparkml_images[region]

    response = sagemaker.create_model(
        ModelName='pipeline-rf',
        Containers=[
            {
                'Image': sparkml_image,
                'ModelDataUrl': model_data_url_0,
                'Environment': {
                    # Return the classifier's numeric prediction so the second container can map it to a class string
                    'SAGEMAKER_SPARKML_SCHEMA': '{"input":[{"type":"string","name":"buying"},{"type":"string","name":"maint"},{"type":"string","name":"doors"},{"type":"string","name":"persons"},{"type":"string","name":"lug_boot"},{"type":"string","name":"safety"}],"output":{"type":"double","name":"prediction"}}'
                }
            },
            {
                'Image': sparkml_image,
                'ModelDataUrl': model_data_url_1,
                'Environment': {
                    'SAGEMAKER_SPARKML_SCHEMA': '{"input": [{"type": "double", "name": "label"}], "output": {"type": "string", "name": "cat"}}'
                }

            },
        ],
        ExecutionRoleArn=role
    )

    print('{}\n'.format(response))
    
except ClientError as e:
    print(e)


try:
    response = sagemaker.create_endpoint_config(
        EndpointConfigName='pipeline-rf',
        ProductionVariants=[
            {
                'VariantName': 'DefaultVariant',
                'ModelName': 'pipeline-rf',
                'InitialInstanceCount': 1,
                'InstanceType': 'ml.m4.xlarge',
            },
        ],
    )
    print('{}\n'.format(response))

except ClientError as e:
    print(e)


try:
    response = sagemaker.create_endpoint(
        EndpointName='pipeline-rf',
        EndpointConfigName='pipeline-rf',
    )
    print('{}\n'.format(response))

except ClientError as e:
    print(e)

import time
    

# Monitor the status until completed
endpoint_status = sagemaker.describe_endpoint(EndpointName='pipeline-rf')['EndpointStatus']
while endpoint_status not in ('OutOfService','InService','Failed'):
    endpoint_status = sagemaker.describe_endpoint(EndpointName='pipeline-rf')['EndpointStatus']
    print(endpoint_status)
    time.sleep(30)
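
The polling loop above runs until the endpoint reaches a final state, with no upper bound on the wait. One possible variant, sketched here with a hypothetical helper `wait_for_status`, adds a timeout and takes the status lookup as a function so it can be tried without AWS access. The canned statuses in the demo are made up.

```python
import time


def wait_for_status(describe, terminal_states, poll_seconds=30,
                    timeout_seconds=3600, sleep=time.sleep):
    """Call describe() until it returns a terminal state or the timeout elapses."""
    waited = 0
    status = describe()
    while status not in terminal_states:
        if waited >= timeout_seconds:
            raise RuntimeError("Timed out while status was {}".format(status))
        sleep(poll_seconds)
        waited += poll_seconds
        status = describe()
    return status


# Demo with canned statuses and a no-op sleep instead of a real endpoint.
statuses = iter(["Creating", "Creating", "InService"])
final = wait_for_status(lambda: next(statuses),
                        ("OutOfService", "InService", "Failed"),
                        sleep=lambda seconds: None)
print(final)
```

With a real client, `describe` could be a lambda that wraps the `describe_endpoint` call from the cell above and returns its `EndpointStatus` field.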

Once the model endpoint is created successfully, we can start predicting. Let's test this with a sample payload, e.g. ['vhigh','vhigh',2,2,'small','low']
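
The endpoint expects the row as one line of CSV bytes. A small sketch of turning a Python list into that payload is shown here. The helper name `to_csv_payload` is hypothetical, and the code assumes Python 3.

```python
import csv
import io


def to_csv_payload(row):
    """Serialize one feature row as a single CSV line encoded to bytes."""
    buffer = io.StringIO()
    csv.writer(buffer).writerow(row)
    # Drop the line terminator added by the csv writer.
    return buffer.getvalue().rstrip("\r\n").encode("utf-8")


payload = to_csv_payload(['vhigh', 'vhigh', 2, 2, 'small', 'low'])
print(payload)
```

Using the `csv` module rather than `','.join(...)` keeps the payload valid if a value ever contains a comma or a quote.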

In [None]:
%%local
runtime = boto3.client('sagemaker-runtime')

response = runtime.invoke_endpoint(EndpointName='pipeline-rf',
    Body=b'vhigh,vhigh,2,2,small,low',
    ContentType='text/csv',
)

print(response['Body'].read())

Congratulations! We've successfully trained a model in EMR Spark and deployed it to a SageMaker endpoint with an inference pipeline. Using the SageMaker endpoint, we were able to run inference.

Now, let's delete the SageMaker resources we created for this exercise.

# Clean Up 

In [None]:
%%local
os.environ['SAGEMAKER_MODEL_NAME'] = 'pipeline-rf'


In [None]:
%%bash

aws sagemaker delete-endpoint --endpoint-name $SAGEMAKER_MODEL_NAME
aws sagemaker delete-endpoint-config --endpoint-config-name $SAGEMAKER_MODEL_NAME
aws sagemaker delete-model --model-name $SAGEMAKER_MODEL_NAME
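
The commands above remove only the SageMaker resources. The EMR cluster keeps running (and billing) until it is terminated. A minimal sketch for terminating it by name follows. The helper names are hypothetical, the cluster name `emr-model-demo` comes from the creation cell, and only the first page of `list_clusters` results is inspected, which is an assumption that the account has few active clusters.

```python
ACTIVE_STATES = ["STARTING", "BOOTSTRAPPING", "RUNNING", "WAITING"]


def find_cluster_ids(emr_client, cluster_name):
    """Return the IDs of active clusters whose name equals cluster_name."""
    response = emr_client.list_clusters(ClusterStates=ACTIVE_STATES)
    return [c["Id"] for c in response["Clusters"] if c["Name"] == cluster_name]


def terminate_clusters(emr_client, cluster_name):
    """Terminate every active cluster with the given name and return their IDs."""
    cluster_ids = find_cluster_ids(emr_client, cluster_name)
    if cluster_ids:
        emr_client.terminate_job_flows(JobFlowIds=cluster_ids)
    return cluster_ids


# Usage against AWS (requires boto3 and credentials):
#   import boto3
#   terminate_clusters(boto3.client("emr"), "emr-model-demo")
```

Passing the client in as an argument makes it easy to check the selection logic with a stub before running it against a real account.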
