# MNIST Training using PyTorch and Horovod

## Contents

1. [Background](#Background)
1. [Setup](#Setup)
1. [Data](#Data)
1. [Train](#Train)
1. [Hyperparameter Tuning](#Hyperparameter-Tuning)
1. [Host](#Host)

---

## Background

Horovod is a distributed deep learning training framework for TensorFlow, Keras, PyTorch, and MXNet. This notebook shows how to use Horovod with PyTorch in SageMaker, using the MNIST dataset.

For more information about PyTorch in SageMaker, see the [sagemaker-pytorch-containers](https://github.com/aws/sagemaker-pytorch-containers) and [sagemaker-python-sdk](https://github.com/aws/sagemaker-python-sdk) GitHub repositories.


---

## Setup

_This notebook was created and tested on an ml.p2.xlarge notebook instance._

Let's start by creating a SageMaker session and specifying:

- The S3 bucket and prefix that you want to use for training and model data. The bucket should be in the same region as the notebook instance, training, and hosting.
- The IAM role ARN used to give training and hosting access to your data. See [Amazon SageMaker Roles](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-roles.html) for how to create these. Note: if more than one role is required for notebook instances, training, and/or hosting, replace `sagemaker.get_execution_role()` with the appropriate full IAM role ARN string(s).


In [None]:
from datetime import datetime

start_time = datetime.now()

In [None]:
import sagemaker
import time
import numpy as np
import boto3
import pandas as pd

from torchvision import datasets, transforms
from sagemaker.pytorch import PyTorch
from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner
from IPython.display import HTML

sagemaker_session = sagemaker.Session()

bucket = 'sagemaker-pytorch-dist-demo' # or use `sagemaker_session.default_bucket()`
prefix = 'sagemaker/DEMO-pytorch-mnist'

role = sagemaker.get_execution_role()

## Data
### Getting the data

In this example, we will use the MNIST dataset, a widely used dataset for handwritten digit classification. It consists of 70,000 labeled 28x28 pixel grayscale images of handwritten digits, split into 60,000 training images and 10,000 test images. There are 10 classes (one for each of the 10 digits).

In [None]:
data_start_time = datetime.now()

In [None]:
datasets.MNIST('data', download=True, transform=transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
]))
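The transform pipeline above converts each 28x28 image to a tensor scaled to [0, 1] and then standardizes it with the MNIST mean (0.1307) and standard deviation (0.3081). The transform only applies when samples are read from this dataset object; the files written to `data` (and uploaded to S3 below) are stored unmodified. The NumPy sketch below reproduces the arithmetic on a synthetic image to show the resulting value range. It illustrates the math only and is not torchvision's implementation; `to_tensor_and_normalize` is a hypothetical helper.

```python
import numpy as np

# MNIST mean and standard deviation, as used in transforms.Normalize above
MNIST_MEAN = 0.1307
MNIST_STD = 0.3081

def to_tensor_and_normalize(image_uint8: np.ndarray) -> np.ndarray:
    """Approximate ToTensor() followed by Normalize() for one 28x28 grayscale image."""
    # ToTensor: scale uint8 pixels from [0, 255] to floats in [0.0, 1.0]
    scaled = image_uint8.astype(np.float32) / 255.0
    # ToTensor also adds a leading channel dimension: (28, 28) -> (1, 28, 28)
    scaled = scaled[np.newaxis, :, :]
    # Normalize: subtract the mean and divide by the standard deviation
    return (scaled - MNIST_MEAN) / MNIST_STD

# Synthetic image: black background with a single white pixel
synthetic = np.zeros((28, 28), dtype=np.uint8)
synthetic[14, 14] = 255
normalized = to_tensor_and_normalize(synthetic)
print(normalized.shape)  # (1, 28, 28)
print(f"min {normalized.min():.4f}, max {normalized.max():.4f}")
```

Black pixels map to about -0.42 and white pixels to about 2.82, so the network sees roughly zero-centered inputs.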

### Uploading the data to S3
We use the `sagemaker.Session.upload_data` function to upload our dataset to an S3 location. The return value, `inputs`, identifies that location; we will use it later when we start the training job.


In [None]:
inputs = sagemaker_session.upload_data(path='data', bucket=bucket, key_prefix=prefix)
# To reuse data uploaded in an earlier run, you can instead set: inputs = f's3://{bucket}/{prefix}'
print(f'input spec (in this case, just an S3 path): {inputs}')
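When `path` is a directory, `upload_data` uploads every file under it, keeping each file's path relative to `path` under `key_prefix`, and returns `s3://<bucket>/<key_prefix>`. The sketch below computes that key layout locally without uploading anything, which can help when checking what the training channel will contain. The mapping is an assumption that mirrors the SDK's behavior, and `planned_s3_keys` is a hypothetical helper.

```python
import os
import tempfile

def planned_s3_keys(local_dir: str, key_prefix: str) -> list:
    """Hypothetical helper: list the S3 keys a directory upload would produce,
    assuming each key is '<key_prefix>/<path relative to local_dir>'."""
    keys = []
    for dirpath, _dirnames, filenames in os.walk(local_dir):
        for name in filenames:
            rel_path = os.path.relpath(os.path.join(dirpath, name), start=local_dir)
            # S3 keys always use forward slashes, regardless of the local OS
            keys.append(f"{key_prefix}/{rel_path.replace(os.sep, '/')}")
    return sorted(keys)

# Demo with a throwaway directory that mimics a nested dataset layout
with tempfile.TemporaryDirectory() as tmp:
    os.makedirs(os.path.join(tmp, "MNIST", "raw"))
    open(os.path.join(tmp, "MNIST", "raw", "example-file"), "w").close()
    keys = planned_s3_keys(tmp, "sagemaker/DEMO-pytorch-mnist")
    print(keys)  # ['sagemaker/DEMO-pytorch-mnist/MNIST/raw/example-file']
```

In the notebook you could call `planned_s3_keys('data', prefix)` before uploading.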

## Train
### Training script
The `mnist.py` script provides the code we need for training a SageMaker model.
The training script is very similar to a training script you might run outside of SageMaker, but you can access useful properties about the training environment through various environment variables, such as:

* `SM_MODEL_DIR`: A string representing the path to the directory to write model artifacts to.
  These artifacts are uploaded to S3 for model hosting.
* `SM_NUM_GPUS`: The number of GPUs available in the current container.
* `SM_CURRENT_HOST`: The name of the current container on the container network.
* `SM_HOSTS`: A JSON-encoded list containing all the hosts.

If one input channel, `training`, is passed to the PyTorch estimator's `fit()` method, the following variable is set, following the format `SM_CHANNEL_[CHANNEL_NAME]`:

* `SM_CHANNEL_TRAINING`: A string representing the path to the directory containing data in the 'training' channel.

For more information about training environment variables, please visit [SageMaker Containers](https://github.com/aws/sagemaker-containers).

A typical training script loads data from the input channels, configures training with hyperparameters, trains a model, and saves a model to `model_dir` so that it can be hosted later. Hyperparameters are passed to your script as arguments and can be retrieved with an `argparse.ArgumentParser` instance.
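SageMaker passes the `hyperparameters` dict to the script as command-line arguments (for example `--epochs 5 --backend gloo`), while details of the training environment arrive through the `SM_*` variables listed above. A common pattern, sketched below, declares both with `argparse` and uses the environment variables as defaults. The argument names and defaults are hypothetical and `mnist.py` may name them differently; the fake environment exists only so the sketch runs outside SageMaker.

```python
import argparse
import json
import os

def parse_args(argv=None, env=None):
    """Sketch: read hyperparameters from argv and training-environment values from SM_* variables."""
    env = os.environ if env is None else env
    parser = argparse.ArgumentParser()
    # Hyperparameters arrive as command-line arguments
    parser.add_argument("--epochs", type=int, default=10)
    parser.add_argument("--batch-size", type=int, default=64)
    parser.add_argument("--lr", type=float, default=0.01)
    parser.add_argument("--backend", type=str, default=None)
    # Environment values become defaults; argparse applies `type` to string defaults
    parser.add_argument("--model-dir", type=str, default=env.get("SM_MODEL_DIR"))
    parser.add_argument("--data-dir", type=str, default=env.get("SM_CHANNEL_TRAINING"))
    parser.add_argument("--num-gpus", type=int, default=env.get("SM_NUM_GPUS", "0"))
    parser.add_argument("--current-host", type=str, default=env.get("SM_CURRENT_HOST"))
    parser.add_argument("--hosts", type=json.loads, default=env.get("SM_HOSTS", "[]"))
    return parser.parse_args(argv)

# Example values shaped like a two-instance SageMaker job
fake_env = {
    "SM_MODEL_DIR": "/opt/ml/model",
    "SM_CHANNEL_TRAINING": "/opt/ml/input/data/training",
    "SM_NUM_GPUS": "1",
    "SM_CURRENT_HOST": "algo-1",
    "SM_HOSTS": '["algo-1", "algo-2"]',
}
args = parse_args(["--epochs", "5", "--backend", "gloo"], env=fake_env)
print(args.epochs, args.backend, args.hosts, args.data_dir)
```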

This script uses the Horovod framework for distributed training. Horovod-related lines are marked with comments starting with `Horovod:`, for example the calls to `hvd.broadcast_parameters` and `hvd.DistributedOptimizer`.

Here is the script run by this notebook:

In [None]:
!pygmentize code/mnist.py
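Horovod training scripts commonly follow two patterns. First, each worker reads a distinct shard of the training data, often through `torch.utils.data.distributed.DistributedSampler` with `num_replicas=hvd.size()` and `rank=hvd.rank()`. Second, `hvd.DistributedOptimizer` averages gradients across workers with an allreduce before each update. Whether `mnist.py` shards its data this way is an assumption; check the script above. The pure-Python sketch below mimics the index assignment of `DistributedSampler` with `shuffle=False` and the averaging step. `shard_indices` and `allreduce_average` are hypothetical helpers, not Horovod APIs.

```python
import math

def shard_indices(dataset_len: int, num_replicas: int, rank: int) -> list:
    """Mimic DistributedSampler(shuffle=False): pad so the index list divides evenly,
    then give each rank every num_replicas-th index starting at its own rank."""
    num_samples = math.ceil(dataset_len / num_replicas)
    total_size = num_samples * num_replicas
    indices = list(range(dataset_len))
    # Pad by repeating the first indices so every rank gets the same number of samples
    indices += indices[: total_size - len(indices)]
    return indices[rank:total_size:num_replicas]

def allreduce_average(per_worker_grads: list) -> list:
    """Conceptually what the distributed optimizer does: average each gradient across workers."""
    n = len(per_worker_grads)
    return [sum(values) / n for values in zip(*per_worker_grads)]

# Two workers (for example one process per GPU on two single-GPU instances)
print(len(shard_indices(60000, 2, 0)))              # 30000 training images per worker
print(shard_indices(5, 2, 0), shard_indices(5, 2, 1))  # [0, 2, 4] [1, 3, 0]
print(allreduce_average([[1.0, 2.0], [3.0, 4.0]]))  # [2.0, 3.0]
```

The padding keeps every worker running the same number of steps, which matters because an allreduce waits for all workers.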

### Run training in SageMaker

The `PyTorch` class allows us to run our training function as a training job on SageMaker infrastructure. We need to configure it with our training script, an IAM role, the number of training instances, the training instance type, and hyperparameters. In this case we are going to run our training job on 2 `ml.p3.2xlarge` instances with managed spot training (`train_use_spot_instances=True`), but this example can be run on one or multiple CPU or GPU instances ([full list of available instances](https://aws.amazon.com/sagemaker/pricing/instance-types/)). The `hyperparameters` parameter is a dict of values that will be passed to your training script; you can see how to access these values in the `mnist.py` script above. We also pass `metric_definitions`, a list of regular expressions that SageMaker applies to the training logs to extract metrics such as test accuracy and loss.
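For managed spot training, SageMaker requires the maximum wait time (time spent waiting for spot capacity plus run time) to be at least the maximum run time. The estimator below sets `train_max_wait=25*60*60` and leaves `train_max_run` at its default. The sketch checks that arithmetic, assuming the default `train_max_run` in the SDK version used here is 24 hours; `check_spot_limits` is a hypothetical helper.

```python
# Assumption: train_max_run defaults to 24 hours in the SageMaker Python SDK v1
DEFAULT_TRAIN_MAX_RUN = 24 * 60 * 60   # 86,400 seconds
train_max_wait = 25 * 60 * 60          # 90,000 seconds, as in the estimator below

def check_spot_limits(max_run: int, max_wait: int) -> None:
    """Raise if the spot-training wait limit is shorter than the run limit."""
    if max_wait < max_run:
        raise ValueError(f"train_max_wait ({max_wait}s) must be >= train_max_run ({max_run}s)")

check_spot_limits(DEFAULT_TRAIN_MAX_RUN, train_max_wait)
headroom_min = (train_max_wait - DEFAULT_TRAIN_MAX_RUN) // 60
print(f"Time left for waiting on spot capacity at full run time: {headroom_min} minutes")  # 60 minutes
```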


In [None]:
train_start_time = datetime.now()

In [None]:
metrics = [
    {
        "Name": "Test Accuracy",
        "Regex": r"Test set:.+Accuracy: (\d+(?:\.\d+))%"
    },
    {
        "Name": "Test Loss",
        "Regex": r"Test set:.+Average loss: (\d+(?:\.\d+)),.+"
    },
    {
        "Name": "Epoch",
        "Regex": r"Train Epoch: (\d+)"
    },
    {
        # Capture only the number: a value such as "43%" is not numeric
        "Name": "Epoch completion",
        "Regex": r"Train Epoch: \d+ \[\d+/\d+ \((\d+)%\)\]"
    },
    {
        # \s+ matches either a space or a tab before "Loss:"
        "Name": "Train loss",
        "Regex": r"Train Epoch: \d+ \[\d+/\d+ \(\d+%\)\]\s+Loss: (\d+\.\d+)"
    },
]

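estimator = PyTorch(entry_point='mnist.py',
                    source_dir='code',
                    role=role,
                    framework_version='1.3.1',
                    train_instance_type='ml.p3.2xlarge',
                    metric_definitions=metrics,
                    # Managed spot training; train_max_wait (seconds) covers waiting for capacity plus run time
                    train_use_spot_instances=True,
                    train_max_wait=25*60*60,
                    train_instance_count=2,
                    # Passed to mnist.py as command-line arguments (--epochs 5 --backend gloo)
                    hyperparameters={
                        'epochs': 5,
                        'backend': 'gloo'
                    })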
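SageMaker applies each `Regex` in `metric_definitions` to the training logs and uses the captured group as the metric value, so the patterns can be checked locally before launching a job. The sketch below runs a few of the patterns against hypothetical log lines; the exact text `mnist.py` prints is an assumption, and `extract_metrics` is a hypothetical helper, not a SageMaker API.

```python
import re

# A few of the patterns from `metrics`, written as raw strings
metric_patterns = {
    "Test Accuracy": r"Test set:.+Accuracy: (\d+(?:\.\d+))%",
    "Test Loss": r"Test set:.+Average loss: (\d+(?:\.\d+)),.+",
    "Epoch": r"Train Epoch: (\d+)",
}

# Hypothetical log lines in the format these patterns expect (not real output)
sample_log = [
    "Train Epoch: 2 [12800/30000 (43%)]\tLoss: 0.245100",
    "Test set: Average loss: 0.0512, Accuracy: 98.37%",
]

def extract_metrics(lines, patterns):
    """Return the last value captured by group 1 of each pattern, parsed as a float."""
    found = {}
    for line in lines:
        for name, pattern in patterns.items():
            match = re.search(pattern, line)
            if match:
                found[name] = float(match.group(1))
    return found

found_metrics = extract_metrics(sample_log, metric_patterns)
print(found_metrics)

# A capture group that includes the "%" sign does not yield a number
print(re.search(r"\((\d+%)\)", sample_log[0]).group(1))         # 43%
print(float(re.search(r"\((\d+)%\)", sample_log[0]).group(1)))  # 43.0
```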

After we've constructed our `PyTorch` object, we can fit it using the data we uploaded to S3. SageMaker makes sure our data is available in the local filesystem, so our training script can simply read the data from disk.


In [None]:
estimator.fit({'training': inputs}, wait=False)
currjob = estimator.jobs[-1]

In [None]:
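from datetime import timezone

# Poll the job every 30 seconds while it runs, calling describe() once per iteration
train_status = currjob.describe()
while train_status['TrainingJobStatus'] == 'InProgress':
    currstep = train_status['SecondaryStatus']
    if currstep == "Training" and 'TrainingStartTime' in train_status:
        # TrainingTimeInSeconds may not be reported until the job ends, so compute elapsed time here
        elapsed = int((datetime.now(timezone.utc) - train_status['TrainingStartTime']).total_seconds())
        msg = f"{currjob.name} training (currently training for {elapsed} seconds)..."
    else:
        msg = f"{currjob.name} training (currently {currstep})..."
    print(msg)
    time.sleep(30)
    train_status = currjob.describe()

train_result = train_status
if train_result['TrainingJobStatus'] == "Completed":
    jobname = train_result['TrainingJobName']
    jobtime = train_result['TrainingTimeInSeconds']
    jobaccuracy = [x['Value'] for x in train_result['FinalMetricDataList'] if x['MetricName'] == 'Test Accuracy'][0]
    jobloss = [x['Value'] for x in train_result['FinalMetricDataList'] if x['MetricName'] == 'Test Loss'][0]
    print(f"Training job {jobname} finished successfully in {jobtime} seconds!\n"
          f"\tAccuracy: {jobaccuracy:.3f}\n\tLoss: {jobloss:.3f}")
else:
    # Failed or Stopped: report the reason instead of silently passing
    print(f"Training job {train_result['TrainingJobName']} ended with status "
          f"{train_result['TrainingJobStatus']}: {train_result.get('FailureReason', 'no reason given')}")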
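The cell above looks up each final metric by filtering `FinalMetricDataList` and taking `[0]`, which raises `IndexError` if a metric was never emitted. A dictionary keyed by `MetricName` makes missing metrics explicit instead. A minimal sketch; `final_metrics` is a hypothetical helper, and the response below is a trimmed placeholder with made-up values, not real training results.

```python
def final_metrics(describe_response: dict) -> dict:
    """Map MetricName -> Value from a DescribeTrainingJob response's FinalMetricDataList."""
    return {m["MetricName"]: m["Value"] for m in describe_response.get("FinalMetricDataList", [])}

# Placeholder response containing only the keys used here (values are not real results)
example_response = {
    "TrainingJobStatus": "Completed",
    "FinalMetricDataList": [
        {"MetricName": "Test Accuracy", "Value": 1.0},
        {"MetricName": "Test Loss", "Value": 2.0},
    ],
}
metrics_by_name = final_metrics(example_response)
print(metrics_by_name.get("Test Accuracy"), metrics_by_name.get("Train loss"))  # 1.0 None
```

In the notebook, `final_metrics(train_result)` would give the same lookups without the risk of an `IndexError`.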

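## Hyperparameter Tuning

Next, we use SageMaker automatic model tuning to search over the learning rate (`lr`) and batch size (`batch-size`). The `HyperparameterTuner` launches up to 9 training jobs, at most 3 at a time, and selects the job with the lowest average test loss, which it extracts from the training logs using the regular expression in `metric_definitions`.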

In [None]:
hpo_start_time = datetime.now()

In [None]:
hyperparameter_ranges = {
    'lr': ContinuousParameter(0.001, 0.1),
    'batch-size': CategoricalParameter([32,64,128])
}
objective_metric_name = 'average test loss'
objective_type = 'Minimize'
metric_definitions = [{'Name': 'average test loss',
                       'Regex': 'Test set: Average loss: ([0-9\\.]+)'}]
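The ranges above define the search space: `lr` is continuous in [0.001, 0.1] and `batch-size` is one of {32, 64, 128}. SageMaker's default tuning strategy is Bayesian optimization. The sketch below instead uses plain random search, a different and simpler strategy, only to show what sampling from this space looks like and how `max_jobs=9` with `max_parallel_jobs=3` translates into rounds of jobs. `sample_config` and `plan_waves` are hypothetical helpers.

```python
import math
import random

# Search space mirroring hyperparameter_ranges above
LR_RANGE = (0.001, 0.1)
BATCH_SIZES = [32, 64, 128]

def sample_config(rng: random.Random) -> dict:
    """Draw one configuration uniformly at random (random search, for illustration only)."""
    return {"lr": rng.uniform(*LR_RANGE), "batch-size": rng.choice(BATCH_SIZES)}

def plan_waves(max_jobs: int, max_parallel_jobs: int) -> int:
    """Minimum number of sequential rounds needed to run max_jobs with limited parallelism."""
    return math.ceil(max_jobs / max_parallel_jobs)

rng = random.Random(0)
configs = [sample_config(rng) for _ in range(9)]
for cfg in configs:
    print(f"lr={cfg['lr']:.4f} batch-size={cfg['batch-size']}")
print("rounds:", plan_waves(9, 3))  # rounds: 3
```

With jobs of similar length, the tuning step therefore takes at least about three times as long as a single training job.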

In [None]:
tuner = HyperparameterTuner(estimator,
                            objective_metric_name,
                            hyperparameter_ranges,
                            metric_definitions,
                            max_jobs=9,
                            max_parallel_jobs=3,
                            early_stopping_type="Off",
                            objective_type=objective_type)

In [None]:
tuner.fit({'training': inputs}, wait=False)

In [None]:
tuner.wait()

## Host
### Create endpoint
After tuning, we use the best estimator found by the tuner to create a `PyTorchModel` object with a different `entry_point`; otherwise, the training script `mnist.py` would be used for inference. (Note that the new `entry_point` must be under the same `source_dir` as `mnist.py`.) Then we use the `PyTorchModel` object to deploy a `PyTorchPredictor`. This creates a SageMaker endpoint, a hosted prediction service that we can use to perform inference.

The inference script must implement `model_fn`. We are going to use the default implementations of `input_fn`, `predict_fn`, `output_fn` and `transform_fn` defined in [sagemaker-pytorch-containers](https://github.com/aws/sagemaker-pytorch-containers).

Here is the inference script used by this notebook:

In [None]:
!pygmentize code/inference.py
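When the endpoint serves requests, the container calls `model_fn` once to load the model and then, for each request, deserializes the input (`input_fn`), runs the model (`predict_fn`), and serializes the result (`output_fn`); `transform_fn` chains those three calls. The NumPy sketch below mimics that flow with hypothetical stand-in handlers and a random linear "model" so it runs without PyTorch or an endpoint. The real `model_fn` is in `code/inference.py`, the real defaults live in the serving container, and JSON is only this sketch's choice of wire format.

```python
import json
import numpy as np

# Hypothetical stand-ins for the four handlers (signatures follow the container's conventions)
def model_fn(model_dir):
    """Load the model. Here: a fixed random linear map from 784 pixels to 10 class scores."""
    rng = np.random.default_rng(0)
    return rng.normal(size=(784, 10)).astype(np.float32)

def input_fn(request_body, content_type):
    """Deserialize the request body into a float32 array."""
    if content_type != "application/json":
        raise ValueError(f"Unsupported content type: {content_type}")
    return np.asarray(json.loads(request_body), dtype=np.float32)

def predict_fn(data, model):
    """Flatten each 28x28 image and compute one score per digit class."""
    return data.reshape(len(data), -1) @ model

def output_fn(prediction, accept):
    """Serialize the prediction for the response."""
    if accept != "application/json":
        raise ValueError(f"Unsupported accept type: {accept}")
    return json.dumps(prediction.tolist())

# model_fn runs once at startup; input_fn -> predict_fn -> output_fn run per request
toy_model = model_fn("/opt/ml/model")
toy_request = json.dumps(np.zeros((1, 28, 28)).tolist())
toy_response = output_fn(predict_fn(input_fn(toy_request, "application/json"), toy_model), "application/json")
scores = np.asarray(json.loads(toy_response))
print(scores.shape)  # (1, 10)
```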

The arguments to the `deploy` function allow us to set the number and type of instances that will be used for the endpoint. These do not need to be the same as the values we used for the training job. Here we will deploy the model to a single `ml.p3.2xlarge` instance.

In [None]:
deploy_start_time = datetime.now()

In [None]:
# Create a PyTorchModel object with a different entry_point
model = tuner.best_estimator().create_model(entry_point='inference.py', source_dir='code')

In [None]:
# Deploy the model to an instance
predictor_gpu = model.deploy(initial_instance_count=1, instance_type='ml.p3.2xlarge', wait=True)

### Evaluate
We can now use this predictor to classify handwritten digits. Drawing into the image box loads the pixel data into a `data` variable in this notebook, which we can then pass to `predictor_gpu`.

In [None]:
eval_start_time = datetime.now()

In [None]:
HTML(open("input.html").read())

In [None]:
%%time
image = np.array([data], dtype=np.float32)
response = predictor_gpu.predict(image)
prediction = response.argmax(axis=1)[0]
print(prediction)
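Wrapping `data` in a list adds a batch dimension, and the endpoint returns one row of class scores per image, so `argmax(axis=1)[0]` picks the highest-scoring digit for the first (and only) image. The sketch below shows those shapes with placeholder arrays, assuming `data` is a 28x28 grid of pixel values; the scores are made up for illustration, and the `fake_` names avoid overwriting the notebook's `image` and `response` variables.

```python
import numpy as np

# Stand-in for the 28x28 pixel grid that the drawing widget stores in `data` (assumed layout)
fake_data = np.zeros((28, 28)).tolist()
fake_batch = np.array([fake_data], dtype=np.float32)
print(fake_batch.shape)  # (1, 28, 28): a batch containing one image

# Stand-in for the endpoint response: one row of 10 class scores per image
fake_response = np.array([[0.1, 0.0, 0.2, 0.0, 0.0, 0.0, 0.0, 3.5, 0.0, 0.1]])
print(fake_response.argmax(axis=1)[0])  # 7: the index of the highest score
```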

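#### Predictor Timing

The cells below time 100 sequential predictions on the GPU endpoint, then delete it, redeploy the same model to a CPU `ml.m5.2xlarge` instance, and repeat the measurement.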

In [None]:
%%time
results = [predictor_gpu.predict(image) for _ in range(100)]

In [None]:
predictor_gpu.delete_endpoint()
time.sleep(5)
predictor_cpu = model.deploy(initial_instance_count=1, instance_type='ml.m5.2xlarge', wait=True)

In [None]:
%%time
results = [predictor_cpu.predict(image) for _ in range(100)]
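`%%time` reports only the wall-clock total for 100 sequential calls, which includes the network round trip from the notebook to the endpoint. Timing each call separately gives a per-request distribution, which is more informative when comparing the GPU and CPU endpoints. A minimal sketch; `measure_latency` is a hypothetical helper, demonstrated with a local stub instead of a live endpoint.

```python
import statistics
import time

def measure_latency(predict, payload, n=100):
    """Call predict(payload) n times sequentially and return per-call latencies in milliseconds."""
    latencies_ms = []
    for _ in range(n):
        start = time.perf_counter()
        predict(payload)
        latencies_ms.append((time.perf_counter() - start) * 1000.0)
    return latencies_ms

# Demo with a local stub; with a live endpoint you could pass, e.g., predictor_cpu.predict and image
stub_latencies = measure_latency(lambda x: sum(x), [1, 2, 3], n=20)
print(f"median {statistics.median(stub_latencies):.3f} ms, max {max(stub_latencies):.3f} ms")
```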

In [None]:
predictor_cpu.delete_endpoint()

In [None]:
end_time = datetime.now()

In [None]:
total_duration = end_time - start_time
data_duration = train_start_time - data_start_time
train_duration = hpo_start_time - train_start_time
hpo_duration = deploy_start_time - hpo_start_time
deploy_duration = eval_start_time - deploy_start_time
# Evaluation time also includes redeploying the model to the CPU endpoint
eval_duration = end_time - eval_start_time

print(f"Total: {total_duration}\nData Load: {data_duration}\nTraining: {train_duration}\n"
      f"HPO: {hpo_duration}\nDeploy: {deploy_duration}\nEvaluate: {eval_duration}")
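The printout gives absolute durations. Expressing each phase as a share of `total_duration` makes it easier to see where the time went, for example how much of the run was spent on HPO compared with a single training job. A minimal sketch; `phase_shares` is a hypothetical helper and the durations in the demo are placeholders.

```python
from datetime import timedelta

def phase_shares(phases: dict, total: timedelta) -> dict:
    """Return each phase's share of the total wall-clock time, in percent."""
    # Dividing one timedelta by another returns a float ratio
    return {name: (duration * 100) / total for name, duration in phases.items()}

# Placeholder durations; in the notebook, pass data_duration, train_duration, etc. and total_duration
example_shares = phase_shares(
    {"Data Load": timedelta(minutes=1), "Training": timedelta(minutes=9)},
    total=timedelta(minutes=20),
)
for name, pct in example_shares.items():
    print(f"{name}: {pct:.1f}%")
```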