# SageMaker PySpark Custom Estimator MNIST Example

1. [Introduction](#Introduction)
2. [Setup](#Setup)
3. [Bringing your own container](#Bringing-your-own-container)
4. [Loading the Data](#Loading-the-Data)
5. [Training](#Training)
6. [More on SageMaker Spark](#More-on-SageMaker-Spark)

## Introduction

- This notebook again uses the MNIST dataset.
- We bring a custom container, built on top of SageMaker's PyTorch image.
- The example showcases the flexibility of integrating Spark with deep learning workloads.
- Before running this notebook, add the `AmazonEC2ContainerRegistryFullAccess` policy to your notebook instance's IAM role.

## Setup

First, we import the necessary modules and create the `SparkSession` with the SageMaker-Spark dependencies attached. 

In [1]:
import os
import boto3

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

import sagemaker
from sagemaker import get_execution_role
import sagemaker_pyspark

role = get_execution_role()

# Configure Spark to use the SageMaker Spark dependency jars
jars = sagemaker_pyspark.classpath_jars()

classpath = ":".join(jars)

# See the SageMaker Spark GitHub repository to learn how to connect to EMR from a notebook instance
spark = SparkSession.builder.config("spark.driver.extraClassPath", classpath)\
    .master("local[*]").getOrCreate()
    
spark

## Bringing your own container

Let's start from SageMaker's PyTorch container.

In [57]:
!cat container/Dockerfile

# Copyright 2017-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

# For more information on creating a Dockerfile
# https://docs.docker.com/compose/gettingstarted/#step-2-create-a-dockerfile
# https://github.com/awslabs/amazon-sagemaker-examples/master/advanced_functionality/pytorch_extending_our_containers/pytorch_extending_our_containers.ipynb
# SageMaker PyTorch image
FROM 520713654638.dkr.ecr.us-west-2.amazonaws.com/sagemaker-pytorch:0.4.0-cpu-py3

RUN pip install sci

Build the container on our notebook instance and publish to ECR.

In [20]:
%%sh

# The name of our algorithm
algorithm_name=pytorch-mnist-byo

cd container

account=$(aws sts get-caller-identity --query Account --output text)

# Get the region defined in the current configuration (default to us-west-2 if none defined)
region=$(aws configure get region)
region=${region:-us-west-2}

fullname="${account}.dkr.ecr.${region}.amazonaws.com/${algorithm_name}:latest"

# If the repository doesn't exist in ECR, create it.

aws ecr describe-repositories --repository-names "${algorithm_name}" > /dev/null 2>&1

if [ $? -ne 0 ]
then
    aws ecr create-repository --repository-name "${algorithm_name}" > /dev/null
fi

# Get the login command from ECR and execute it directly
$(aws ecr get-login --region ${region} --no-include-email)

# Get the login command from ECR in order to pull down the SageMaker PyTorch image
$(aws ecr get-login --registry-ids 520713654638 --region ${region} --no-include-email)

# Build the docker image locally with the image name and then push it to ECR
# with the full name.

docker build  -t ${algorithm_name} .
docker tag ${algorithm_name} ${fullname}

docker push ${fullname}

Login Succeeded
Login Succeeded
Sending build context to Docker daemon    291MB
Step 1/7 : FROM 520713654638.dkr.ecr.us-west-2.amazonaws.com/sagemaker-pytorch:0.4.0-cpu-py3
 ---> 87fbc61d071a
Step 2/7 : RUN pip install scikit-learn
 ---> Using cache
 ---> c6d27f949294
Step 3/7 : RUN pip install scipy
 ---> Using cache
 ---> e2905b056794
Step 4/7 : ENV PATH="/opt/ml/code:${PATH}"
 ---> Using cache
 ---> 8e6e9db086b1
Step 5/7 : COPY /mnist /opt/ml/code
 ---> 57bad9e9b1eb
Step 6/7 : ENV SAGEMAKER_SUBMIT_DIRECTORY /opt/ml/code
 ---> Running in 841b78a58e3d
Removing intermediate container 841b78a58e3d
 ---> 72bf3d8d2e89
Step 7/7 : ENV SAGEMAKER_PROGRAM mnist.py
 ---> Running in d0fb74851ca2
Removing intermediate container d0fb74851ca2
 ---> bfe12e870d9c
Successfully built bfe12e870d9c
Successfully tagged pytorch-mnist-byo:latest
The push refers to repository [288940994276.dkr.ecr.us-west-2.amazonaws.com/pytorch-mnist-byo]
f26db7e52786: Preparing
3b2fd6bed648: Preparing
226199250d1d: Prepa



## Loading the Data

Now, we load the MNIST dataset into a Spark DataFrame. The dataset is available in LibSVM format at

`s3://sagemaker-sample-data-[region]/spark/mnist/`

where `[region]` is replaced with a supported AWS region, such as `us-east-1`.

To train and make inferences, our input DataFrame must have a column of Doubles (named "label" by default) and a column of Vectors of Doubles (named "features" by default).

Spark's LibSVM DataFrameReader loads a DataFrame already suitable for training and inference.
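To make the expected shape concrete, here is a minimal pure-Python sketch of how one LibSVM-formatted line maps to a label and a dense 784-element feature list. The helper name `parse_libsvm_line` and the sample line are hypothetical and used only for illustration; in the notebook, Spark's reader does this work for us.

```python
def parse_libsvm_line(line, num_features=784):
    """Parse one LibSVM line ("label index:value ...") into (label, dense list)."""
    parts = line.split()
    label = float(parts[0])
    features = [0.0] * num_features
    for item in parts[1:]:
        index, value = item.split(":")
        # LibSVM indices are 1-based, so shift them to 0-based positions.
        features[int(index) - 1] = float(value)
    return label, features


# Hypothetical sample line, not taken from the real dataset.
label, features = parse_libsvm_line("5 153:3 154:18")
print(label, len(features), features[152])
```

Any pixel that does not appear in the line stays at `0.0`, which is why the `features` column above is mostly zeros.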

Here, we load into a DataFrame in the SparkSession running on the local Notebook Instance, but you can connect your Notebook Instance to a remote Spark cluster for heavier workloads. Starting from EMR 5.11.0, SageMaker Spark is pre-installed on EMR Spark clusters. For more on connecting your SageMaker Notebook Instance to a remote EMR cluster, please see [this blog post](https://aws.amazon.com/blogs/machine-learning/build-amazon-sagemaker-notebooks-backed-by-spark-in-amazon-emr/).

In [11]:
import boto3

region = boto3.Session().region_name

trainingData = spark.read.format('libsvm')\
    .option('numFeatures', '784')\
    .option('vectorType', 'dense')\
    .load('s3a://sagemaker-sample-data-{}/spark/mnist/train/'.format(region))

testData = spark.read.format('libsvm')\
    .option('numFeatures', '784')\
    .option('vectorType', 'dense')\
    .load('s3a://sagemaker-sample-data-{}/spark/mnist/test/'.format(region))

trainingData.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  5.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,0.0,...|
|  4.0|[0.0,0.0,0.0,0.0,...|
|  1.0|[0.0,0.0,0.0,0.0,...|
|  9.0|[0.0,0.0,0.0,0.0,...|
|  2.0|[0.0,0.0,0.0,0.0,...|
|  1.0|[0.0,0.0,0.0,0.0,...|
|  3.0|[0.0,0.0,0.0,0.0,...|
|  1.0|[0.0,0.0,0.0,0.0,...|
|  4.0|[0.0,0.0,0.0,0.0,...|
|  3.0|[0.0,0.0,0.0,0.0,...|
|  5.0|[0.0,0.0,0.0,0.0,...|
|  3.0|[0.0,0.0,0.0,0.0,...|
|  6.0|[0.0,0.0,0.0,0.0,...|
|  1.0|[0.0,0.0,0.0,0.0,...|
|  7.0|[0.0,0.0,0.0,0.0,...|
|  2.0|[0.0,0.0,0.0,0.0,...|
|  8.0|[0.0,0.0,0.0,0.0,...|
|  6.0|[0.0,0.0,0.0,0.0,...|
|  9.0|[0.0,0.0,0.0,0.0,...|
+-----+--------------------+
only showing top 20 rows



MNIST images are 28x28, so each image has 784 pixels. The dataset consists of images of the digits 0 through 9, representing 10 classes.

In each row:
* The `label` column identifies the image's label. For example, if the image of the handwritten number is the digit 5, the label value is 5.
* The `features` column stores a vector (`org.apache.spark.ml.linalg.Vector`) of `Double` values. The length of the vector is 784, as each image consists of 784 pixels. Those pixels are the features we will use. 
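As a rough illustration of the 784 = 28 x 28 relationship, the sketch below splits a flat pixel list back into 28 rows of 28 values. The helper name `to_image_rows` is hypothetical, the row-major pixel order is an assumption, and the pixel values are stand-ins rather than real MNIST data.

```python
def to_image_rows(features, width=28):
    """Split a flat pixel list into rows of `width` pixels (row-major order assumed)."""
    if len(features) % width != 0:
        raise ValueError("feature length must be a multiple of width")
    return [features[i:i + width] for i in range(0, len(features), width)]


# Stand-in values; with a real row you could start from row.features.toArray().
pixels = [float(i) for i in range(784)]
rows = to_image_rows(pixels)
print(len(rows), len(rows[0]))
```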



As we are interested in classifying the images of digits, the 784 pixel values form the feature vector, and the 10 digit classes are the labels the model learns to predict.

## Training

### PyTorch script

We've added a custom PyTorch training script to our container.

In [85]:
!pygmentize container/mnist/mnist.py

import argparse
import json
import logging
import os
import sagemaker_containers
import sys
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data
import torch.utils.data.distributed
from torchvision import d

            if batch_idx % args.log_interval == 0:
                logger.info('Train Epoch: {} [{}/{} ({:.0f}%)] Loss: {:.6f}'.format(
                    epoch, batch_idx * len(data), len(train_loader.sampler),
                    100. * batch_idx / len(train_loader), loss.item()))
    save_model(model, args.model_dir)


def model_fn(model_dir):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = torch.nn.DataParallel(Net())
    with open(os.path.join(model_dir, 'model.pth'), 'rb')

### Custom SageMakerEstimator

SageMaker-Spark provides several classes that extend `SageMakerEstimator` to use SageMaker-provided algorithms, such as `KMeansSageMakerEstimator`, which runs the SageMaker-provided K-Means algorithm. Each of these classes is a `SageMakerEstimator` with certain default values passed in. By creating a `SageMakerEstimator` directly, you can use SageMaker-Spark with any algorithm that runs on Amazon SageMaker, whether it is provided by Amazon or is your own model.

This example uses our own PyTorch container to train on MNIST. For simplicity, we limit the example to training only.

In [62]:
account = boto3.client('sts').get_caller_identity()['Account']
region = boto3.Session().region_name

In [83]:
from sagemaker_pyspark import SageMakerEstimator
from sagemaker_pyspark import IAMRole
from sagemaker_pyspark import RandomNamePolicyFactory
from sagemaker_pyspark import EndpointCreationPolicy
from sagemaker_pyspark.transformation.serializers import LibSVMRequestRowSerializer
from sagemaker_pyspark.transformation.deserializers import XGBoostCSVRowDeserializer

# Create an Estimator from scratch
estimator = SageMakerEstimator(
    trainingImage='{}.dkr.ecr.{}.amazonaws.com/pytorch-mnist-byo:latest'.format(account, region),
    modelImage='{}.dkr.ecr.{}.amazonaws.com/pytorch-mnist-byo:latest'.format(account, region),
    sagemakerRole=IAMRole(role),
    trainingInstanceType="ml.m4.xlarge", # ml.p3.2xlarge could be better, but you might not have the limits for it
    trainingInstanceCount=1,
    trainingChannelName="training",
    trainingSparkDataFormat="libsvm",
    trainingSparkDataFormatOptions=None,
    trainingCompressionCodec=None,
    hyperParameters={"epochs": "6", "backend": "gloo"},
    endpointCreationPolicy=EndpointCreationPolicy.DO_NOT_CREATE,
    endpointInstanceType=None,
    endpointInitialInstanceCount=0,
    requestRowSerializer=None,
    responseRowDeserializer=None,
    namePolicyFactory=RandomNamePolicyFactory("sparksm-6-")
    )

The main parts of a `SageMakerEstimator` are:
* `trainingImage`: the Docker Registry path where the training image is hosted - can be a custom Docker image hosting your own model, or one of the [Amazon provided images](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-algo-docker-registry-paths.html)
* `modelImage`: the Docker Registry path where the inference image is hosted - can be a custom Docker image hosting your own model, or one of the [Amazon provided images](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-algo-docker-registry-paths.html)
* `hyperParameters`: the hyperparameters of the algorithm being created - the values passed in need to be of type string
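The two points above, the image path and the string-typed hyperparameter values, can be sketched with a pair of small helpers. Both function names are hypothetical, the account ID is a placeholder, and the URI pattern simply mirrors the one used in the estimator cell above; other AWS partitions may use a different domain suffix.

```python
def ecr_image_uri(account, region, repository, tag="latest"):
    """Build an ECR image path in the same pattern as the estimator cell."""
    return "{}.dkr.ecr.{}.amazonaws.com/{}:{}".format(account, region, repository, tag)


def stringify_hyperparameters(params):
    """Convert every key and value to str, as the estimator expects string values."""
    return {str(key): str(value) for key, value in params.items()}


# Placeholder account ID, for illustration only.
image = ecr_image_uri("123456789012", "us-west-2", "pytorch-mnist-byo")
hyper_parameters = stringify_hyperparameters({"epochs": 6, "backend": "gloo"})
print(image)
print(hyper_parameters)
```

Converting the values in one place avoids accidentally passing an integer such as `6` where the string `"6"` is required.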

Let's train this estimator by calling `fit` on it with the training data. Note that the code below takes several minutes to run, because it creates all the resources needed for this model.

In [22]:
customModel = estimator.fit(trainingData)

Confirm our job ran correctly.

In [56]:
job_name = customModel.modelPath.objectPath.split('/')[1]
boto3.client('sagemaker').describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']

'Completed'
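If you want to check on a job repeatedly, for example from another session, a polling loop along the following lines may help. This is a minimal sketch: the helper name `wait_for_training_job` and its parameters are hypothetical, and the `describe` argument is assumed to be a callable such as `boto3.client('sagemaker').describe_training_job`. A stand-in function is used here so the example runs without AWS access.

```python
import time

# Statuses after which a training job no longer changes state.
TERMINAL_STATUSES = {"Completed", "Failed", "Stopped"}


def wait_for_training_job(describe, job_name, poll_seconds=30, max_polls=120):
    """Poll `describe` until the job reaches a terminal status or polls run out.

    `describe` must accept TrainingJobName=... and return a dict containing
    a 'TrainingJobStatus' key. Returns the last status seen.
    """
    status = None
    for _ in range(max_polls):
        status = describe(TrainingJobName=job_name)["TrainingJobStatus"]
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(poll_seconds)
    return status


# Stand-in for the real client call, for illustration only.
_fake_statuses = iter(["InProgress", "InProgress", "Completed"])


def fake_describe(TrainingJobName):
    return {"TrainingJobStatus": next(_fake_statuses)}


print(wait_for_training_job(fake_describe, "example-job", poll_seconds=0))
```

With a real client, the call would look like `wait_for_training_job(boto3.client('sagemaker').describe_training_job, job_name)`.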

## More on SageMaker Spark

The SageMaker Spark GitHub repository has more about SageMaker Spark, including how to use it with the Scala SDK: https://github.com/aws/sagemaker-spark
