## Distributed Training with Chainer and ChainerMN

Chainer can train in two modes: on a single machine, or distributed across several machines. Unlike the single-machine example notebook, which trains an image classification model on the CIFAR-10 dataset on one instance, this notebook uses a Chainer script that relies on `chainermn` to distribute training across multiple instances.

[VGG](https://arxiv.org/pdf/1409.1556v6.pdf) is an architecture for deep convolutional networks. In this example, we train a VGG-style convolutional network to perform image classification on the CIFAR-10 dataset using multiple instances. CIFAR-10 consists of 60,000 32x32 colour images in 10 classes, with 6,000 images per class: 50,000 training images and 10,000 test images. We'll train the model on Amazon SageMaker, deploy it to a SageMaker endpoint, and then classify images using the deployed model.

The Chainer script runs inside a Docker container on SageMaker. For more on the Chainer container, see the sagemaker-chainer-containers and sagemaker-python-sdk repositories:

* https://github.com/aws/sagemaker-chainer-containers
* https://github.com/aws/sagemaker-python-sdk

In [None]:
# Setup
from sagemaker import get_execution_role
import sagemaker

sagemaker_session = sagemaker.Session()

# This role retrieves the SageMaker-compatible role used by this Notebook Instance.
role = get_execution_role()
help(sagemaker_session.upload_data)

## Downloading training and test data

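We use helper functions provided by `chainer` to download and preprocess the CIFAR-10 data. `get_cifar10` returns training and test datasets of `(image, label)` pairs, where each image is a float32 array of shape `(3, 32, 32)` scaled to [0, 1].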

In [None]:
import chainer

from chainer.datasets import get_cifar10

train, test = get_cifar10()

## Uploading the data

We save the preprocessed data to the local filesystem, and then use the `sagemaker.Session.upload_data` function to upload our datasets to an S3 location. The return values `train_input` and `test_input` identify the S3 locations, which we will use when we start the training job.

In [None]:
import os
import shutil

import numpy as np

train_data = [element[0] for element in train]
train_labels = [element[1] for element in train]

test_data = [element[0] for element in test]
test_labels = [element[1] for element in test]


try:
    # Write each split to its own local directory, then upload it to S3.
    os.makedirs('/tmp/data/train', exist_ok=True)
    os.makedirs('/tmp/data/test', exist_ok=True)
    np.savez('/tmp/data/train/train.npz', data=train_data, labels=train_labels)
    np.savez('/tmp/data/test/test.npz', data=test_data, labels=test_labels)
    train_input = sagemaker_session.upload_data(path=os.path.join('/tmp', 'data', 'train'),
                                                key_prefix='notebook/chainer_cifar/train')
    test_input = sagemaker_session.upload_data(path=os.path.join('/tmp', 'data', 'test'),
                                               key_prefix='notebook/chainer_cifar/test')
finally:
    # Remove the local copies whether or not the upload succeeded.
    shutil.rmtree('/tmp/data')
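
The training script has to read these files back from its input channels. Below is a minimal sketch of that round trip, assuming the script loads the `data` and `labels` arrays with `np.load`. The helper `load_npz_channel` is hypothetical, and small synthetic arrays stand in for CIFAR-10 so the example runs offline.

```python
import os
import tempfile

import numpy as np


def load_npz_channel(channel_dir, filename):
    """Hypothetical helper: load the 'data' and 'labels' arrays saved with np.savez."""
    with np.load(os.path.join(channel_dir, filename)) as npz:
        # Indexing an NpzFile reads the full array into memory,
        # so the arrays stay valid after the file is closed.
        return npz['data'], npz['labels']


# Synthetic stand-in for CIFAR-10: four 3x32x32 float32 images.
images = np.random.rand(4, 3, 32, 32).astype(np.float32)
labels = np.array([0, 1, 2, 3], dtype=np.int32)

with tempfile.TemporaryDirectory() as tmp:
    np.savez(os.path.join(tmp, 'train.npz'), data=images, labels=labels)
    data, loaded_labels = load_npz_channel(tmp, 'train.npz')

print(data.shape, data.dtype, loaded_labels.tolist())
```

Inside the container, `channel_dir` would be the path found in `channel_input_dirs['train']` rather than a temporary directory.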

## Writing the Chainer training script to run on Amazon SageMaker

We need to provide a training script that can run on the SageMaker platform. It is essentially the same as a script you would write for local training, except that it must provide a function `train` that returns a trained model.

Since we will use the same script to host the Chainer model, the script also needs a function `model_fn` that loads the model -- by default, Chainer models are saved to disk as `model.npz`. When SageMaker calls your `train` and `model_fn` functions, it will pass in arguments that describe the training environment.

While the `train` and `model_fn` functions are required, the Chainer container provides default implementations for a few other functions. The function hooks recognized by the container are listed below, with required functions in bold:

### Training

* **`train`**: This function is passed arguments read from the Training Job's environment and returns a trained model. The return value of `train` is saved and uploaded to S3 as a model artifact by `save`.

  `train` can accept the following arguments by name:
  * `hyperparameters (dict)`: The hyperparameters map passed from the SageMaker Python SDK.
  * `channel_input_dirs (dict of str: str)`: A map of input channel names (like 'train' and 'test') to filesystem paths to data in those input channels. 
  * `output_data_dir (str)`: The filesystem path to write output artifacts to. Output artifacts may include checkpoints, graphs, and other files you might like to save, not including model artifacts. These artifacts are uploaded to S3 along with your model artifacts.
  * `num_gpus (int)`: The number of GPUs available to the host.
  * `num_cpus (int)`: The number of CPUs available to the host.
  * `hosts (list of str)`: The list of hostnames for all Training Job instances.
  * `current_host (str)`: The hostname of the current host.
  
  For more on the arguments to `train` and others, please visit https://github.com/aws/sagemaker-containers.
  
  
* `save(model, model_dir)`: Writes the return value from `train` (`model`) to `model_dir`. These model artifacts are uploaded to S3 so that they can be hosted behind a SageMaker Endpoint.

  The default implementation saves `model` to a file named `model.npz` by invoking `chainer.serializers.save_npz`.
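
To illustrate how the named arguments above might be consumed, the sketch below turns them into concrete settings. `resolve_training_setup` is a hypothetical helper, not part of the container. The default hyperparameter values are assumptions, the `train.npz`/`test.npz` filenames come from the upload step earlier in this notebook, and the device numbering follows Chainer's convention of `-1` for the CPU.

```python
import os


def resolve_training_setup(hyperparameters, channel_input_dirs, num_gpus, hosts, current_host):
    """Hypothetical helper: derive concrete settings from the arguments passed to `train`."""
    return {
        # Cast explicitly in case values arrive as strings; defaults are assumptions.
        'epochs': int(hyperparameters.get('epochs', 30)),
        'batch_size': int(hyperparameters.get('batch_size', 64)),
        # Filenames match the .npz files uploaded earlier in this notebook.
        'train_file': os.path.join(channel_input_dirs['train'], 'train.npz'),
        'test_file': os.path.join(channel_input_dirs['test'], 'test.npz'),
        # Chainer device convention: 0 is the first GPU, -1 is the CPU.
        'device': 0 if num_gpus > 0 else -1,
        'num_hosts': len(hosts),
        'host_index': hosts.index(current_host),
    }


# Example call with values resembling a two-instance training job.
setup = resolve_training_setup(
    hyperparameters={'epochs': 30, 'batch_size': 256},
    channel_input_dirs={'train': '/opt/ml/input/data/train',
                        'test': '/opt/ml/input/data/test'},
    num_gpus=1,
    hosts=['algo-1', 'algo-2'],
    current_host='algo-2',
)
print(setup['batch_size'], setup['train_file'], setup['host_index'])
```

Keeping this argument handling in a small function makes it easy to check locally before launching a training job.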

### Hosting and Inference

* **`model_fn(model_dir)`**: This function is invoked to load the model from the artifacts that `save` wrote into `model_dir`.
* `input_fn(input_data, content_type)`: This function is invoked to deserialize prediction data when a prediction request is made. Its return value is passed to `predict_fn`.
  
  The default implementation deserializes [npy-formatted](https://docs.scipy.org/doc/numpy-1.14.0/neps/npy-format.html) data with content type 'application/x-npy' into a NumPy array, and it can also handle CSV data with content type 'text/csv' and JSON data with content type 'application/json'.
  
  `input_fn` accepts the following arguments:
  
  * `input_data`: serialized input data in the body of the prediction request.
  * `content_type`: MIME type of the data. By default, the Chainer predictor sends prediction requests with content type 'application/x-npy'.
  
  
* `predict_fn(input_data, model)`: This function accepts the return value of `input_fn` (as `input_data`) and the return value of `model_fn`, `model`, and returns inferences obtained from the model.

  The default implementation calls `model(input_data)` and returns the result as a NumPy array.
  
  
* `output_fn(prediction, accept)`: This function is invoked to serialize the return value from `predict_fn`, passed in via `prediction`, and send it back to the SageMaker client in response to prediction requests.

  The default implementation serializes NumPy arrays returned by `predict_fn`, which the SageMaker Python SDK can deserialize back into a NumPy array, but the default handler can also respond with JSON or CSV, depending on the `accept` MIME type given in the prediction request.
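
If you need to override the defaults, a custom `input_fn`/`output_fn` pair might look like the sketch below. It is not the container's default implementation. It assumes the container passes the raw request body as `input_data` and accepts a serialized body as the return value of `output_fn`; confirm both against the sagemaker-chainer-containers README before relying on them.

```python
import io
import json

import numpy as np


def input_fn(input_data, content_type):
    """Sketch of a custom deserializer for prediction requests."""
    if content_type == 'application/x-npy':
        return np.load(io.BytesIO(input_data))
    if content_type == 'application/json':
        return np.array(json.loads(input_data), dtype=np.float32)
    if content_type == 'text/csv':
        text = input_data.decode('utf-8') if isinstance(input_data, bytes) else input_data
        # ndmin=2 keeps a single CSV row as a (1, n) batch.
        return np.loadtxt(io.StringIO(text), delimiter=',', dtype=np.float32, ndmin=2)
    raise ValueError('Unsupported content type: {}'.format(content_type))


def output_fn(prediction, accept):
    """Sketch of a custom serializer for the return value of predict_fn."""
    prediction = np.asarray(prediction)
    if accept == 'application/x-npy':
        buffer = io.BytesIO()
        np.save(buffer, prediction)
        return buffer.getvalue()
    if accept == 'application/json':
        return json.dumps(prediction.tolist())
    raise ValueError('Unsupported accept type: {}'.format(accept))
```

Raising on unknown MIME types makes an unsupported request fail loudly instead of returning a silently misparsed array.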

Look at the script below, which uses `chainer` and `chainermn` to train across multiple instances, to see how this works. The script implements `train` and `model_fn`, but relies on the default `save`, `input_fn`, `predict_fn`, and `output_fn`.

For more on implementing these functions, see the documentation at https://github.com/aws/sagemaker-python-sdk.

For more on the functions provided by the Chainer container, see https://github.com/aws/sagemaker-chainer-containers.

The script below uses a `chainermn` Communicator to distribute training to multiple nodes. The Communicator depends on MPI (Message Passing Interface), so the Chainer container on SageMaker runs this script with `mpirun` when the `Chainer` estimator specifies a `train_instance_count` of two or greater, or when the estimator is created with `use_mpi=True`.

By default, one process is created per GPU (on GPU instances), or one per host (on CPU instances, which are not recommended for this notebook).

See https://github.com/chainer/chainermn for more on running Chainer on multiple nodes.  
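
The process count follows directly from the rule above: with one process per GPU, two single-GPU `ml.p3.2xlarge` instances give two MPI processes, and in data-parallel training each process typically works on its own share of the training set. The sketch below shows that arithmetic with a simple contiguous sharding scheme. Both helpers are hypothetical, and ChainerMN's `chainermn.scatter_dataset` has its own splitting logic.

```python
import numpy as np


def num_mpi_processes(instance_count, gpus_per_instance):
    """One process per GPU on GPU instances, one per host on CPU instances."""
    return instance_count * max(gpus_per_instance, 1)


def shard_indices(num_examples, size, rank):
    """Illustrative contiguous shard of example indices for the process with this rank."""
    return np.array_split(np.arange(num_examples), size)[rank]


# Two ml.p3.2xlarge instances with one GPU each -> two processes.
size = num_mpi_processes(instance_count=2, gpus_per_instance=1)
shards = [shard_indices(50000, size, rank) for rank in range(size)]
print(size, [len(s) for s in shards])  # 2 [25000, 25000]
```

Because the shards are disjoint, adding processes reduces the number of training examples each one visits per epoch.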

In [None]:
!cat 'code/chainer_cifar_vgg_distributed.py'

## Running the training script on SageMaker

To train with a Chainer script, we construct a `Chainer` estimator using the [sagemaker-python-sdk](https://github.com/aws/sagemaker-python-sdk). We pass in an `entry_point`, the name of a script that contains a couple of functions with certain signatures (`train` and `model_fn`), and a `source_dir`, a directory containing all code to run inside the Chainer container. This script will be run on SageMaker in a container that invokes these functions to train and load Chainer models.

The `Chainer` class allows us to run our training function as a training job on SageMaker infrastructure. We need to configure it with our training script, an IAM role, the number of training instances, and the training instance type. In this case we will run our training job on two `ml.p3.2xlarge` instances.

This script uses the `chainermn` package, which distributes training with MPI. Your script is run with `mpirun`, so a ChainerMN Communicator object can be used to distribute training. Arguments to `mpirun` are set to sensible defaults, but you can configure how your script is run in distributed mode. See the `Chainer` class documentation for more on configuring MPI.

In [None]:
from sagemaker.chainer.estimator import Chainer

chainer_estimator = Chainer(entry_point='chainer_cifar_vgg_distributed.py', source_dir="code", role=role,
                            use_mpi=True, sagemaker_session=sagemaker_session,
                            train_instance_count=2, train_instance_type='ml.p3.2xlarge',
                            hyperparameters={'epochs': 30, 'batch_size': 256})

chainer_estimator.fit({'train': train_input, 'test': test_input})

Our Chainer script writes various artifacts, such as plots, to a directory `output_data_dir`, whose contents SageMaker uploads to S3. Now we download and extract these artifacts.

In [None]:
from s3_util import retrieve_output_from_s3

chainer_training_job = chainer_estimator.latest_training_job.name

desc = sagemaker_session.sagemaker_client.describe_training_job(TrainingJobName=chainer_training_job)
output_data = desc['ModelArtifacts']['S3ModelArtifacts'].replace('model.tar.gz', 'output.tar.gz')

retrieve_output_from_s3(output_data, 'output/distributed_cifar')
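
`s3_util` is a local helper module used by this notebook, and its implementation is not shown here. A rough sketch of what such a helper could do, assuming it splits the S3 URI into bucket and key, downloads the archive (for example with boto3's `download_file`), and extracts it. `split_s3_uri` and `extract_tarball` are hypothetical names, and the demo at the end builds a throwaway archive so the example runs without AWS access.

```python
import os
import tarfile
import tempfile
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split 's3://bucket/some/key' into ('bucket', 'some/key')."""
    parsed = urlparse(uri)
    if parsed.scheme != 's3':
        raise ValueError('Not an S3 URI: {}'.format(uri))
    return parsed.netloc, parsed.path.lstrip('/')


def extract_tarball(archive_path, destination):
    """Extract a downloaded .tar.gz archive into `destination`."""
    os.makedirs(destination, exist_ok=True)
    with tarfile.open(archive_path, 'r:gz') as archive:
        archive.extractall(destination)


# With AWS credentials, the download step might be:
#   bucket, key = split_s3_uri(output_data)
#   boto3.client('s3').download_file(bucket, key, 'output.tar.gz')

# Offline demo with a throwaway archive.
with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, 'demo')
    os.makedirs(src)
    with open(os.path.join(src, 'plot.png'), 'wb') as f:
        f.write(b'placeholder')
    archive_path = os.path.join(tmp, 'output.tar.gz')
    with tarfile.open(archive_path, 'w:gz') as archive:
        archive.add(src, arcname='demo')
    extract_tarball(archive_path, os.path.join(tmp, 'extracted'))
    extracted_ok = os.path.exists(os.path.join(tmp, 'extracted', 'demo', 'plot.png'))
```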

These plots show the accuracy and loss over epochs:

In [None]:
from IPython.display import Image
from IPython.display import display

accuracy_graph = Image(filename="output/distributed_cifar/algo-1/accuracy.png", width=800, height=800)
loss_graph = Image(filename="output/distributed_cifar/algo-1/loss.png", width=800, height=800)

display(accuracy_graph, loss_graph)

## Deploying the Trained Model

After training, we use the Chainer estimator object to create and deploy a hosted prediction endpoint. We can use a CPU-based instance for inference (in this case an `ml.m4.xlarge`), even though we trained on GPU instances.

The predictor object returned by `deploy` lets us call the new endpoint and perform inference on our sample images. 

In [None]:
predictor = chainer_estimator.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge')

### CIFAR-10 sample images

We'll use these CIFAR-10 sample images to test the service:

<img style="display: inline; height: 32px; margin: 0.25em" src="images/airplane1.png" />
<img style="display: inline; height: 32px; margin: 0.25em" src="images/automobile1.png" />
<img style="display: inline; height: 32px; margin: 0.25em" src="images/bird1.png" />
<img style="display: inline; height: 32px; margin: 0.25em" src="images/cat1.png" />
<img style="display: inline; height: 32px; margin: 0.25em" src="images/deer1.png" />
<img style="display: inline; height: 32px; margin: 0.25em" src="images/dog1.png" />
<img style="display: inline; height: 32px; margin: 0.25em" src="images/frog1.png" />
<img style="display: inline; height: 32px; margin: 0.25em" src="images/horse1.png" />
<img style="display: inline; height: 32px; margin: 0.25em" src="images/ship1.png" />
<img style="display: inline; height: 32px; margin: 0.25em" src="images/truck1.png" />



## Predicting using SageMaker Endpoint

We batch the images together into a single NumPy array to obtain multiple inferences with a single prediction request.

In [None]:
from skimage import io
import numpy as np

def read_image(filename):
    # Load a 32x32 PNG as a float32 CHW array scaled to [0, 1],
    # the same layout and range as the get_cifar10 training data.
    img = io.imread(filename)
    img = img[:, :, :3]  # drop the alpha channel, if the PNG has one
    img = img.transpose(2, 0, 1)  # HWC -> CHW
    return img.astype(np.float32) / 255.


def read_images(filenames):
    return np.array([read_image(f) for f in filenames])

filenames = ['images/airplane1.png',
             'images/automobile1.png',
             'images/bird1.png',
             'images/cat1.png',
             'images/deer1.png',
             'images/dog1.png',
             'images/frog1.png',
             'images/horse1.png',
             'images/ship1.png',
             'images/truck1.png']

image_data = read_images(filenames)

The predictor runs inference on our input data and returns an array of class scores with one row per image; the argmax of each row is the predicted label.

In [None]:
response = predictor.predict(image_data)

for i, prediction in enumerate(response):
    print('image {}: prediction: {}'.format(i, prediction.argmax(axis=0)))
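
To make the output easier to read, the predicted indices can be mapped to class names. The sketch below assumes the standard CIFAR-10 label order and, for the accuracy check, that image `i` in `filenames` belongs to class `i`, which holds for the list above. `decode_predictions` and `sample_accuracy` are hypothetical helpers; with the endpoint running you could call `decode_predictions(response)`.

```python
import numpy as np

# Standard CIFAR-10 label order: index 0 is airplane, ..., index 9 is truck.
CIFAR10_CLASSES = ['airplane', 'automobile', 'bird', 'cat', 'deer',
                   'dog', 'frog', 'horse', 'ship', 'truck']


def decode_predictions(scores):
    """Map an (N, 10) array of class scores to predicted class names."""
    return [CIFAR10_CLASSES[i] for i in np.asarray(scores).argmax(axis=1)]


def sample_accuracy(scores):
    """Fraction of correct predictions, assuming image i belongs to class i."""
    predicted = np.asarray(scores).argmax(axis=1)
    return float(np.mean(predicted == np.arange(len(predicted))))
```

Ten images is far too few to estimate model accuracy; this check only confirms that the endpoint returns sensible labels.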

## Cleanup

After you have finished with this example, remember to delete the prediction endpoint to release the instance(s) associated with it.

In [None]:
chainer_estimator.delete_endpoint()