# TensorFlow Distributed Training & Distributed Inference

For use cases involving large datasets, particularly those where the data is images, it often is necessary to perform distributed training on a cluster of multiple machines. Similarly, when it is time to set up an inference workflow, it also may be necessary to perform highly performant batch inference using a cluster.  In this notebook, we'll examine distributed training and distributed inference with TensorFlow in Amazon SageMaker. 

The model used for this notebook is a basic Convolutional Neural Network (CNN) based on [the Keras examples](https://github.com/keras-team/keras/blob/master/examples/cifar10_cnn.py).  We'll train the CNN to classify images using the [CIFAR-10 dataset](https://www.cs.toronto.edu/~kriz/cifar.html), a well-known computer vision dataset. It consists of 60,000 32x32 images belonging to 10 different classes (6,000 images per class). Here is a graphic of the classes in the dataset, as well as 10 random images from each:

![cifar10](https://maet3608.github.io/nuts-ml/_images/cifar10.png)


## Setup 

We'll begin with some necessary imports, and get an Amazon SageMaker session to help perform certain tasks, as well as an IAM role with the necessary permissions.

In [None]:
%matplotlib inline
import numpy as np
import os
import sagemaker
from sagemaker import get_execution_role

os.system("aws s3 cp s3://sagemaker-workshop-pdx/cifar-10-module . --recursive")

sagemaker_session = sagemaker.Session()
role = get_execution_role()

bucket = sagemaker_session.default_bucket()
prefix = 'sagemaker/DEMO-tf-horovod-inference'
print('Bucket:\n{}'.format(bucket))

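Now we'll run a script that fetches the dataset and converts it to TFRecord, TensorFlow's record-oriented binary file format. TFRecord files can be read efficiently by `tf.data` input pipelines, and they can also be streamed to the training job with Pipe Mode (see **Extensions** below).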

In [None]:
!python generate_cifar10_tfrecords.py --data-dir ./data
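If you're curious about what the conversion produces, a TFRecord file is a sequence of framed records. Each record is stored as a little-endian 64-bit length, a masked CRC-32C of those length bytes, the payload, and a masked CRC-32C of the payload. The pure-Python sketch below writes and reads that framing so the layout is visible. It is only an illustration: the helper names (`crc32c`, `masked_crc`, `write_record`, `read_records`) are hypothetical, and each payload is treated as opaque bytes. In `generate_cifar10_tfrecords.py` the payloads are presumably serialized `tf.train.Example` protos. Real training code should read these files with `tf.data.TFRecordDataset`.

```python
import io
import struct

# Illustrative TFRecord framing only; use tf.data.TFRecordDataset in real pipelines.


def _crc32c_table():
    # Lookup table for CRC-32C (Castagnoli), reflected polynomial 0x82F63B78.
    table = []
    for i in range(256):
        crc = i
        for _ in range(8):
            crc = (crc >> 1) ^ 0x82F63B78 if crc & 1 else crc >> 1
        table.append(crc)
    return table


_CRC32C_TABLE = _crc32c_table()


def crc32c(data):
    crc = 0xFFFFFFFF
    for byte in data:
        crc = _CRC32C_TABLE[(crc ^ byte) & 0xFF] ^ (crc >> 8)
    return crc ^ 0xFFFFFFFF


def masked_crc(data):
    # TFRecord stores a "masked" CRC: rotate right by 15 bits, then add a constant.
    crc = crc32c(data)
    rotated = ((crc >> 15) | (crc << 17)) & 0xFFFFFFFF
    return (rotated + 0xA282EAD8) & 0xFFFFFFFF


def write_record(stream, payload):
    header = struct.pack("<Q", len(payload))             # uint64 length
    stream.write(header)
    stream.write(struct.pack("<I", masked_crc(header)))   # CRC of the length bytes
    stream.write(payload)
    stream.write(struct.pack("<I", masked_crc(payload)))  # CRC of the payload


def read_records(stream):
    while True:
        header = stream.read(8)
        if not header:
            return  # clean end of file
        (length,) = struct.unpack("<Q", header)
        (header_crc,) = struct.unpack("<I", stream.read(4))
        if header_crc != masked_crc(header):
            raise ValueError("corrupt length header")
        payload = stream.read(length)
        (payload_crc,) = struct.unpack("<I", stream.read(4))
        if payload_crc != masked_crc(payload):
            raise ValueError("corrupt payload")
        yield payload


buf = io.BytesIO()
for record in (b"first record", b"second record"):
    write_record(buf, record)
buf.seek(0)
print(list(read_records(buf)))  # [b'first record', b'second record']
```

Checking the CRC on the length header lets a reader detect corruption before it trusts the length field and attempts a possibly huge read.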

For Amazon SageMaker hosted training on a cluster separate from this notebook instance, training data must be stored in Amazon S3, so we'll upload the data to S3 now.

In [None]:
inputs = sagemaker_session.upload_data(path='data', key_prefix='data/DEMO-cifar10-tf')
display(inputs)

Finally, we'll perform some setup that will be common to all of the training jobs in this notebook. During training, it will be helpful to review graphs of metrics such as validation accuracy. If we provide a set of metric definitions (a metric name plus a regular expression whose capture group matches the metric value), Amazon SageMaker extracts the metrics directly from the training job logs and sends them to CloudWatch Metrics.

In [None]:
training_metric_definitions = [
    {'Name': 'train:loss', 'Regex': '.*loss: ([0-9\\.]+) - acc: [0-9\\.]+.*'},
    {'Name': 'train:accuracy', 'Regex': '.*loss: [0-9\\.]+ - acc: ([0-9\\.]+).*'},
    {'Name': 'validation:accuracy', 'Regex': '.*step - loss: [0-9\\.]+ - acc: [0-9\\.]+ - val_loss: [0-9\\.]+ - val_acc: ([0-9\\.]+).*'},
    {'Name': 'validation:loss', 'Regex': '.*step - loss: [0-9\\.]+ - acc: [0-9\\.]+ - val_loss: ([0-9\\.]+) - val_acc: [0-9\\.]+.*'},
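    # Captures the per-step time Keras prints (in ms or us, depending on the [mu] match);
    # backslashes are doubled like the other patterns so Python does not treat \d as an escape.
    {'Name': 'sec/steps', 'Regex': '.* - \\d+s (\\d+)[mu]s/step - loss: [0-9\\.]+ - acc: [0-9\\.]+ - val_loss: [0-9\\.]+ - val_acc: [0-9\\.]+'}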
]
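Before launching a job, it can be worth checking these regular expressions against sample log lines. The sketch below applies the same patterns with Python's `re.search`. The two log lines are hypothetical examples in the tf.keras progress-bar format, `extract_metrics` is a hypothetical helper, and SageMaker's own regex matching may differ in small details.

```python
import re

# The same patterns as training_metric_definitions, written as raw strings.
METRIC_PATTERNS = {
    "train:loss": r".*loss: ([0-9\.]+) - acc: [0-9\.]+.*",
    "train:accuracy": r".*loss: [0-9\.]+ - acc: ([0-9\.]+).*",
    "validation:accuracy": r".*step - loss: [0-9\.]+ - acc: [0-9\.]+ - val_loss: [0-9\.]+ - val_acc: ([0-9\.]+).*",
    "validation:loss": r".*step - loss: [0-9\.]+ - acc: [0-9\.]+ - val_loss: ([0-9\.]+) - val_acc: [0-9\.]+.*",
    "sec/steps": r".* - \d+s (\d+)[mu]s/step - loss: [0-9\.]+ - acc: [0-9\.]+ - val_loss: [0-9\.]+ - val_acc: [0-9\.]+",
}


def extract_metrics(log_line):
    """Return {metric name: value} for every pattern that matches the log line."""
    values = {}
    for name, pattern in METRIC_PATTERNS.items():
        match = re.search(pattern, log_line)
        if match:
            values[name] = float(match.group(1))
    return values


# Hypothetical tf.keras log lines: a mid-epoch progress update and an end-of-epoch summary.
progress = "120/195 [=================>............] - ETA: 7s - loss: 1.2107 - acc: 0.5642"
epoch_end = ("195/195 [==============================] - 21s 108ms/step - loss: 1.0542 "
             "- acc: 0.6261 - val_loss: 0.9813 - val_acc: 0.6532")

print(extract_metrics(progress))   # only the two train:* metrics match
print(extract_metrics(epoch_end))  # all five metrics match
```

The validation and timing patterns are anchored on the `step - loss:` text that appears only in end-of-epoch lines. As a result, mid-epoch progress updates feed only the training metrics.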

## (Optional) Hosted training on a single machine

Amazon SageMaker provides a variety of model training options: besides training on a single machine, training can be done on a cluster of multiple machines using either parameter servers or Ring-AllReduce with Horovod. We'll begin by training a model on a single machine. This will be followed by distributed training on multiple machines to allow comparison; however, if you prefer, you may skip ahead to the **Distributed training with Horovod** section of this notebook.

Initially we'll set up an Amazon SageMaker TensorFlow Estimator object with the details of the training job, such as type and number of instances, hyperparameters, etc.

In [None]:
from sagemaker.tensorflow import TensorFlow

single_machine_instance_type = 'ml.p3.2xlarge'
hyperparameters = {'epochs': 60, 'batch-size' : 256}

estimator_single = TensorFlow(base_job_name='cifar10-tf',
                       entry_point='train.py',
                       role=role,
                       framework_version='1.12.0',
                       py_version='py3',
                       hyperparameters=hyperparameters,
                       train_instance_count=1, 
                       train_instance_type=single_machine_instance_type,
                       tags = [{'Key' : 'Project', 'Value' : 'cifar10'},{'Key' : 'TensorBoard', 'Value' : 'file'}],
                       metric_definitions=training_metric_definitions)
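For orientation on the script side: in TensorFlow script mode, SageMaker passes each entry of `hyperparameters` to the entry point as a command-line argument (here `--epochs 60 --batch-size 256`). It also exposes the local path of each input channel given to `fit` in an `SM_CHANNEL_<NAME>` environment variable. Below is a minimal sketch of how an entry point might read them. The defaults and the exact argument set are assumptions for illustration, and `train.py` may parse its arguments differently.

```python
import argparse
import os


def parse_args(argv=None):
    parser = argparse.ArgumentParser()
    # Hyperparameters from the Estimator arrive as "--name value" pairs (defaults are hypothetical).
    parser.add_argument("--epochs", type=int, default=10)
    parser.add_argument("--batch-size", type=int, default=128)
    # TensorFlow script mode typically also passes an S3 location for model output.
    parser.add_argument("--model_dir", type=str, default=None)
    # Channel names match the keys of the dict passed to fit(): train, validation, eval.
    parser.add_argument("--train", default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--validation", default=os.environ.get("SM_CHANNEL_VALIDATION"))
    parser.add_argument("--eval", default=os.environ.get("SM_CHANNEL_EVAL"))
    # parse_known_args ignores any extra arguments instead of exiting with an error.
    args, _unknown = parser.parse_known_args(argv)
    return args


args = parse_args(["--epochs", "60", "--batch-size", "256"])
print(args.epochs, args.batch_size)  # 60 256
```

Note that argparse turns `--batch-size` into the attribute `batch_size`.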

Now we can call the `fit` method of the Estimator object to start training.  During training, you can view the metrics we set up above by going to the Amazon SageMaker console, clicking the **Training jobs** link in the left panel, clicking the job name, then scrolling down to the **Monitor** section to view the metric graphs.

In [None]:
remote_inputs = {'train' : inputs+'/train', 'validation' : inputs+'/validation', 'eval' : inputs+'/eval'}
estimator_single.fit(remote_inputs, wait=True)

Sometimes it makes sense to perform training on a single machine. For large datasets, however, it may be necessary to perform distributed training on a cluster of multiple machines. In fact, distributed training on several machines may be not only faster but also cheaper than training on one machine. Training is billed by instance time, so if a job scales well across machines, it finishes in a fraction of the wall-clock time and the total cost can be similar or even lower. Fortunately, Amazon SageMaker makes it easy to run distributed training without having to manage cluster setup and teardown.

## Distributed training with Horovod

Horovod is an open source distributed training framework for TensorFlow, Keras, PyTorch, and MXNet. It is an alternative to the more "traditional" parameter server method of performing distributed training.  In Amazon SageMaker, Horovod is only available with TensorFlow version 1.12 or newer. Only a few lines of code are necessary to use Horovod for distributed training of a Keras model defined by the tf.keras API.  For details, see the `train.py` script included with this notebook; the changes primarily relate to:

- importing Horovod.
- initializing Horovod.
- configuring GPU options (typically pinning each Horovod process to one GPU) and setting the Keras session (`tf.Session`) with those options.
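To make the Ring-AllReduce idea concrete, here is a single-process simulation in plain Python. It illustrates the idea under simplifying assumptions and is not Horovod's implementation, which exchanges data between real processes over MPI or NCCL. Each worker's gradient vector is split into N chunks. A reduce-scatter phase then passes partial sums around the ring for N - 1 steps, and an all-gather phase circulates the finished chunks for another N - 1 steps. Every worker ends up with the same summed vector, while sending only about 2(N - 1)/N times its own buffer size, however many workers join the ring. The function name `ring_allreduce` is hypothetical.

```python
def ring_allreduce(worker_vectors):
    """Simulate Ring-AllReduce (sum) over equal-length vectors, one per worker."""
    n = len(worker_vectors)
    length = len(worker_vectors[0])
    # Split each vector into n contiguous chunks (some are empty if length < n).
    bounds = [(k * length // n, (k + 1) * length // n) for k in range(n)]
    buffers = [list(v) for v in worker_vectors]

    # Phase 1, reduce-scatter: in step s, worker i sends chunk (i - s) % n to its
    # right-hand neighbour, which adds it to its own copy of that chunk.
    for step in range(n - 1):
        messages = []
        for i in range(n):
            c = (i - step) % n
            lo, hi = bounds[c]
            messages.append(((i + 1) % n, c, buffers[i][lo:hi]))
        for dst, c, data in messages:  # deliver after all sends, as if simultaneous
            lo, _ = bounds[c]
            for j, value in enumerate(data):
                buffers[dst][lo + j] += value
    # Now worker i holds the fully summed chunk (i + 1) % n.

    # Phase 2, all-gather: pass the finished chunks around the ring, overwriting.
    for step in range(n - 1):
        messages = []
        for i in range(n):
            c = (i + 1 - step) % n
            lo, hi = bounds[c]
            messages.append(((i + 1) % n, c, buffers[i][lo:hi]))
        for dst, c, data in messages:
            lo, hi = bounds[c]
            buffers[dst][lo:hi] = data
    return buffers


grads = [[1, 2, 3, 4, 5], [10, 20, 30, 40, 50], [100, 200, 300, 400, 500]]
result = ring_allreduce(grads)
print(result[0])  # [111, 222, 333, 444, 555]
# Horovod averages gradients, i.e. divides the sum by the number of workers.
print([x / len(grads) for x in result[0]])
```

In a parameter-server setup, by contrast, every worker sends its full gradient to a server. The servers' network links can then become a bottleneck as workers are added.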

Once we have a training script, the next step is to set up an Amazon SageMaker TensorFlow Estimator object with the details of the training job. It is very similar to an Estimator for training on a single machine, except that we specify a `distributions` parameter describing Horovod attributes such as the number of processes per host. Here that is set to the number of GPUs per machine; an ml.p3.8xlarge instance has 4 GPUs. Beyond these few parameters and the few lines of code in the training script, there is nothing else you need to do to use distributed training with Horovod; Amazon SageMaker handles the heavy lifting and manages the underlying cluster setup.

In [None]:
from sagemaker.tensorflow import TensorFlow

hvd_instance_type = 'ml.p3.8xlarge'
hvd_processes_per_host = 4
hvd_instance_count = 2

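# MPI launches processes_per_host Horovod processes on each instance:
# hvd_instance_count * hvd_processes_per_host = 8 processes in total.
distributions = {'mpi': {'enabled': True,
                         'processes_per_host': hvd_processes_per_host}}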
hyperparameters = {'epochs': 60, 'batch-size' : 256}

estimator_dist = TensorFlow(base_job_name='dist-cifar10-tf',
                       entry_point='train.py', 
                       role=role,
                       framework_version='1.12.0',
                       py_version='py3',
                       hyperparameters=hyperparameters,
                       train_instance_count=hvd_instance_count, 
                       train_instance_type=hvd_instance_type,
                       tags = [{'Key' : 'Project', 'Value' : 'cifar10'},{'Key' : 'TensorBoard', 'Value' : 'dist'}],
                       metric_definitions=training_metric_definitions,
                       distributions=distributions)

In [None]:
remote_inputs = {'train' : inputs+'/train', 'validation' : inputs+'/validation', 'eval' : inputs+'/eval'}
estimator_dist.fit(remote_inputs, wait=True)
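With `distributions` set as above, MPI starts `processes_per_host` Horovod processes on each of the `train_instance_count` instances, so this job runs 2 x 4 = 8 processes. The sketch below derives that number and the matching `distributions` dict from an instance type. The `GPUS_PER_INSTANCE` table and the `horovod_plan` helper are hypothetical. The global batch size also assumes that `batch-size` is a per-process value, which is common in Horovod scripts but not confirmed for this notebook's `train.py`. Horovod's documentation additionally recommends scaling the learning rate by the number of workers.

```python
# GPUs per instance for a few P3 instance types (hypothetical lookup table).
GPUS_PER_INSTANCE = {"ml.p3.2xlarge": 1, "ml.p3.8xlarge": 4, "ml.p3.16xlarge": 8}


def horovod_plan(instance_type, instance_count, per_worker_batch_size):
    """Return the MPI distributions dict plus the resulting Horovod world size."""
    processes_per_host = GPUS_PER_INSTANCE[instance_type]  # one process per GPU
    world_size = instance_count * processes_per_host
    return {
        "distributions": {"mpi": {"enabled": True,
                                  "processes_per_host": processes_per_host}},
        "world_size": world_size,
        # Assumes batch-size is per Horovod process, as in typical Horovod scripts.
        "global_batch_size": world_size * per_worker_batch_size,
    }


plan = horovod_plan("ml.p3.8xlarge", 2, 256)
print(plan["world_size"], plan["global_batch_size"])  # 8 2048
```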

## Model Deployment with Amazon Elastic Inference

Amazon SageMaker provides both real-time inference and batch inference. Although we will focus on batch inference below, let's start with a quick overview of setting up an Amazon SageMaker hosted endpoint for real-time inference with TensorFlow Serving and image data. The processes for setting up hosted endpoints and Batch Transform jobs differ significantly. Additionally, we will discuss why and how to use Amazon Elastic Inference with the hosted endpoint.

### Deploying the Model

When considering the overall cost of a machine learning workload, inference often is the largest part, up to 90% of the total. If a GPU instance type is used for real-time inference, it typically is not fully utilized because, unlike training, real-time inference does not involve continuously feeding large batches of data to the model. Elastic Inference provides GPU acceleration suited for inference, allowing you to add inference acceleration to a hosted endpoint for a fraction of the cost of using a full GPU instance.

The `deploy` method of the Estimator object creates an endpoint which serves prediction requests in near real time.  To utilize Elastic Inference with the SageMaker TensorFlow Serving container, simply provide an `accelerator_type` parameter, which determines the type of accelerator that is attached to your endpoint. Refer to the **Inference Acceleration** section of the [instance types chart](https://aws.amazon.com/sagemaker/pricing/instance-types) for a listing of the supported types of accelerators. 

Here we'll use a general-purpose CPU instance type (ml.m5.xlarge) along with an Elastic Inference accelerator: together they are much cheaper than the smallest P3 GPU instance type.

In [None]:
predictor = estimator_dist.deploy(initial_instance_count=1,
                                  instance_type='ml.m5.xlarge',
                                  accelerator_type='ml.eia1.medium')

### Real-time inference
  
Now that we have a Predictor object wrapping a real-time Amazon SageMaker hosted endpoint, we'll define the label names and look at a sample of 10 images, one from each class.

In [None]:
from IPython.display import Image, display

labels = ['airplane','automobile','bird','cat','deer','dog','frog','horse','ship','truck']
images = []
for entry in os.scandir('sample-img'):
    if entry.is_file() and entry.name.endswith("png"):
        images.append('sample-img/' + entry.name)

for image in images:
    display(Image(image))

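Next we'll configure the Predictor object created by the `deploy` call above. The TensorFlow Serving container in Amazon SageMaker uses the TensorFlow Serving REST API, which expects a JSON request body of the form `{"instances": [...]}` (one entry per input) and returns a JSON body of the form `{"predictions": [...]}`.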

In [None]:
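import PIL.Image  # import the submodule explicitly; `import PIL` alone does not guarantee PIL.Image is loaded
from sagemaker.predictor import json_serializer, json_deserializer

# TensorFlow Serving's request and response format is JSON
predictor.accept = 'application/json'
predictor.content_type = 'application/json'
predictor.serializer = json_serializer
predictor.deserializer = json_deserializer

def get_prediction(file_path):
    # Load the PNG as a float array, e.g. shape (32, 32, 3) for an RGB CIFAR-10 image
    image = PIL.Image.open(file_path)
    pixels = np.asarray(image).astype(float)
    # Add a batch dimension, e.g. (1, 32, 32, 3), and wrap it in the TFS request format
    instance = np.expand_dims(pixels, axis=0)
    data = {'instances': instance.tolist()}

    # The response holds one score vector per instance; return the highest-scoring class
    return labels[np.argmax(predictor.predict(data)['predictions'], axis=1)[0]]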

In [None]:
predictions = [get_prediction(image) for image in images]
print(predictions)
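You can try the request/response handling that `get_prediction` relies on without an endpoint. The sketch below builds the same `{"instances": [...]}` body for one image and picks the top label from a `{"predictions": [...]}` body. The helper names, the blank test image, and the response scores are invented for illustration, and only the Python standard library is used.

```python
import json

LABELS = ['airplane', 'automobile', 'bird', 'cat', 'deer',
          'dog', 'frog', 'horse', 'ship', 'truck']


def make_tfs_request(pixels):
    """Wrap one HxWxC image (nested lists) as a single-instance TFS REST predict body."""
    return json.dumps({"instances": [pixels]})


def decode_tfs_response(body, labels=LABELS):
    """Return the highest-scoring label for each instance in a TFS predict response."""
    predictions = json.loads(body)["predictions"]
    return [labels[max(range(len(scores)), key=scores.__getitem__)]
            for scores in predictions]


# A blank 32x32 RGB image and a made-up response body, for illustration only.
blank = [[[0.0, 0.0, 0.0] for _ in range(32)] for _ in range(32)]
request_body = make_tfs_request(blank)
fake_response = json.dumps(
    {"predictions": [[0.01, 0.02, 0.05, 0.6, 0.1, 0.1, 0.05, 0.03, 0.02, 0.02]]})
print(decode_tfs_response(fake_response))  # ['cat']
```

Because `instances` is a list, several images can be sent in one request. The response then contains one score vector per image, in the same order.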

##  Batch Transform with Inference Pipelines

If a use case does not require individual predictions in near real-time, an Amazon SageMaker Batch Transform job is likely a better alternative. Although hosted endpoints also can be used for pseudo-batch prediction, the process is more involved than using Batch Transform, which is designed for large-scale, asynchronous batch inference.

A typical problem in working with batch inference is how to convert data into tensors that can be input to the model. For example, image data in .png or .jpg format cannot be input directly to the model, but must be converted first. Batch Transform can handle this with an inference pipeline, in which a preprocessing container converts each input before passing it to the model container.

### Build a container for transforming image input

As mentioned above, the TensorFlow Serving container in Amazon SageMaker uses the REST API to serve prediction requests, which requires the input data to be converted to JSON format. One way to do this is to create a container that performs the conversion, then create an overall Amazon SageMaker model that links the conversion container to the TensorFlow Serving container that serves the model.

In the next step, we'll create a container to transform payloads in .png image format into JSON objects that can be forwarded to the TensorFlow Serving container. To do this, we've created a simple Python Flask app that performs the transformation; the code for this container is available in the `./image-transformer-container/` directory. First, we'll build a Docker image for the container:

In [None]:
!docker build -t image-transformer ./image-transformer-container/
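The core of the transformer's job is to turn PNG bytes into the TensorFlow Serving `{"instances": [...]}` body. The sketch below shows that conversion under stated assumptions. It omits the Flask routing entirely, and the actual app in `./image-transformer-container/` may work differently (for example, by decoding with an image library). To stay dependency-free, it decodes only 8-bit, non-interlaced RGB or RGBA PNGs (CIFAR-10 PNGs are assumed to be 8-bit RGB) and skips chunk CRC checks. All function names are hypothetical, and `make_png` exists only to build a small demo image.

```python
import json
import struct
import zlib

PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"


def _paeth(a, b, c):
    # Paeth predictor from the PNG specification.
    p = a + b - c
    pa, pb, pc = abs(p - a), abs(p - b), abs(p - c)
    if pa <= pb and pa <= pc:
        return a
    return b if pb <= pc else c


def decode_png_rgb(data):
    """Decode an 8-bit non-interlaced RGB/RGBA PNG into rows of [r, g, b] pixels."""
    if data[:8] != PNG_SIGNATURE:
        raise ValueError("not a PNG file")
    header, idat, pos = None, b"", 8
    while pos < len(data):
        length, ctype = struct.unpack(">I4s", data[pos:pos + 8])
        body = data[pos + 8:pos + 8 + length]
        pos += 12 + length  # 4-byte length + 4-byte type + body + 4-byte CRC
        if ctype == b"IHDR":
            header = struct.unpack(">IIBBBBB", body)
        elif ctype == b"IDAT":
            idat += body
        elif ctype == b"IEND":
            break
    if header is None:
        raise ValueError("missing IHDR chunk")
    width, height, depth, color, _, _, interlace = header
    if depth != 8 or color not in (2, 6) or interlace != 0:
        raise ValueError("only 8-bit, non-interlaced RGB/RGBA PNGs are handled")
    bpp = 3 if color == 2 else 4  # bytes per pixel
    stride = width * bpp
    raw = zlib.decompress(idat)
    prev, rows = bytearray(stride), []
    for y in range(height):
        start = y * (stride + 1)
        ftype, line = raw[start], bytearray(raw[start + 1:start + 1 + stride])
        for x in range(stride):  # undo the per-scanline filter, left to right
            a = line[x - bpp] if x >= bpp else 0
            b = prev[x]
            c = prev[x - bpp] if x >= bpp else 0
            if ftype == 1:
                line[x] = (line[x] + a) & 0xFF
            elif ftype == 2:
                line[x] = (line[x] + b) & 0xFF
            elif ftype == 3:
                line[x] = (line[x] + (a + b) // 2) & 0xFF
            elif ftype == 4:
                line[x] = (line[x] + _paeth(a, b, c)) & 0xFF
            elif ftype != 0:
                raise ValueError("unknown filter type %d" % ftype)
        rows.append([list(line[x:x + 3]) for x in range(0, stride, bpp)])  # drop alpha
        prev = line
    return rows


def png_to_tfs_json(png_bytes):
    """Convert one PNG into a single-instance TensorFlow Serving request body."""
    pixels = [[[float(v) for v in px] for px in row] for row in decode_png_rgb(png_bytes)]
    return json.dumps({"instances": [pixels]})


def make_png(width, height, scanlines, color_type=2):
    """Demo helper: wrap already-filtered scanline bytes in a minimal PNG."""
    def chunk(ctype, body):
        crc = zlib.crc32(ctype + body) & 0xFFFFFFFF
        return struct.pack(">I", len(body)) + ctype + body + struct.pack(">I", crc)
    ihdr = struct.pack(">IIBBBBB", width, height, 8, color_type, 0, 0, 0)
    return (PNG_SIGNATURE + chunk(b"IHDR", ihdr)
            + chunk(b"IDAT", zlib.compress(scanlines)) + chunk(b"IEND", b""))


# 2x2 RGB image: row 0 stored with the Sub filter, row 1 with the Up filter.
demo = make_png(2, 2, b"\x01" + bytes([10, 20, 30, 30, 30, 30]) + b"\x02" + bytes(6))
print(png_to_tfs_json(demo))
```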

### Push container to ECR

Next, we'll push the Docker image to an ECR repository in your account. In order to push the container to ECR, the execution role attached to this notebook should have permissions to create a repository, set a repository policy, and upload a Docker image.

In [None]:
import boto3

account_id = boto3.client('sts').get_caller_identity().get('Account')
region = boto3.session.Session().region_name

ecr_repository = 'image-transformer'
tag = ':latest'
transformer_repository_uri = '{}.dkr.ecr.{}.amazonaws.com/{}'.format(account_id, region, ecr_repository + tag)

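# docker login (AWS CLI v1 syntax; `get-login` was removed in AWS CLI v2, which uses
# `aws ecr get-login-password --region $region | docker login --username AWS --password-stdin $account_id.dkr.ecr.$region.amazonaws.com`)
!$(aws ecr get-login --region $region --registry-ids $account_id --no-include-email)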
# create ecr repository
!aws ecr create-repository --repository-name $ecr_repository
# attach policy allowing sagemaker to pull this image
!aws ecr set-repository-policy --repository-name $ecr_repository --policy-text "$( cat ./image-transformer-container/ecr_policy.json )"

!docker tag {ecr_repository + tag} $transformer_repository_uri
!docker push $transformer_repository_uri

### Create a Model with an Inference Pipeline

Now that we have two separate containers for transforming the data and serving predictions, we'll create an Amazon SageMaker Model with the two containers chained together (image transformer -> TensorFlow Serving).  The Model conveniently packages together the functionality required for an Inference Pipeline.

In [None]:
from sagemaker.tensorflow.serving import Model
from time import gmtime, strftime

client = boto3.client('sagemaker')

model_name = "image-to-tfserving-{}".format(strftime("%d-%H-%M-%S", gmtime()))

transform_container = {
    "Image": transformer_repository_uri
}

estimator = estimator_dist
tf_serving_model = Model(model_data=estimator.model_data,
                         role=sagemaker.get_execution_role(),
                         image=estimator.image_name,
                         framework_version=estimator.framework_version,
                         sagemaker_session=estimator.sagemaker_session)

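# The instance type is used to select the matching (CPU or GPU) TensorFlow Serving image;
# keep it consistent with the instance type of the Batch Transform job below.
batch_instance_type = 'ml.p3.2xlarge'
tf_serving_container = tf_serving_model.prepare_container_def(batch_instance_type)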

model_params = {
    "ModelName": model_name,
    "Containers": [
        transform_container,
        tf_serving_container
    ],
    "ExecutionRoleArn": sagemaker.get_execution_role()
}

client.create_model(**model_params)

### Run a Batch Transform job

Next, we'll run a Batch Transform job using our inference pipeline model. 

In [None]:
input_data_path = 's3://sagemaker-workshop-pdx/cifar-10-png' 
output_data_path = 's3://{}/{}/{}'.format(bucket, prefix, 'batch-predictions')
batch_instance_type = 'ml.p3.2xlarge'
batch_instance_count = 2

transformer = sagemaker.transformer.Transformer(
    model_name = model_name,
    instance_count = batch_instance_count,
    instance_type = batch_instance_type,
    strategy = 'MultiRecord',
    output_path = output_data_path,
    assemble_with= 'Line',
    base_transform_job_name='cifar-10-image-transform',
    sagemaker_session=sagemaker_session,
)

transformer.transform(data = input_data_path, content_type = 'application/x-image')
transformer.wait()

### Inspect Batch Transform output

Finally, we can inspect the output files of our Batch Transform job to see the predictions.  First we'll download the prediction files locally, then extract the predictions from them.

In [None]:
!aws s3 cp --quiet --recursive $transformer.output_path ./batch_predictions

In [None]:
import json
import re

total = 0
correct = 0
predicted = []
actual = []

for entry in os.scandir('batch_predictions'):
    if not (entry.is_file() and entry.name.endswith('.out')):
        continue
    try:
        with open(entry.path, 'r') as f:
            response = json.load(f)
        # Flatten the single score vector returned for this image (no rounding, to avoid artificial ties)
        scores = [score for sublist in response['predictions'] for score in sublist]
        predicted_label = labels[int(np.argmax(scores))]
        # The true class is the run of letters just before '.png.out' in the file name
        actual_label = re.search(r'([a-zA-Z]+)\.png\.out$', entry.name).group(1)
    except Exception as e:
        print(entry.name, e)
        continue
    # Append only after both labels are known, so `actual` and `predicted` stay aligned
    predicted.append(predicted_label)
    actual.append(actual_label)
    # Compare labels exactly rather than searching for the label inside the file name
    if predicted_label == actual_label:
        correct += 1
    total += 1

Let's calculate the accuracy of the predictions.  

In [None]:
print('Out of {} total images, accurate predictions were returned for {}'.format(total, correct))
accuracy = correct / total
print('Accuracy is {:.1%}'.format(accuracy))

The accuracy of the Batch Transform job on 10,000 test images never seen during training is fairly close to the accuracy achieved on the validation set during training. This indicates that the model is not overfitting and should generalize fairly well to other unseen data.

Next we'll plot a confusion matrix, which is a tool for visualizing the performance of a multiclass model. It has entries for all possible combinations of correct and incorrect predictions, and shows how often each one was made by our model. Ours will be row-normalized: each row sums to one, so that entries along the diagonal correspond to recall. 

In [None]:
import pandas as pd
import seaborn as sns

confusion_matrix = pd.crosstab(pd.Series(actual), pd.Series(predicted), rownames=['Actuals'], colnames=['Predictions'], normalize='index')
sns.heatmap(confusion_matrix, annot=True, fmt='.2f', cmap="YlGnBu").set_title('Confusion Matrix')  

If our model had 100% accuracy, and therefore 100% recall in every class, then all of the predictions would fall along the diagonal of the confusion matrix.  Here our model definitely is not 100% accurate, but manages to achieve good recall for most of the classes, though it performs worse for some classes, such as cats.  
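Row normalization is easy to check by hand. The plain-Python sketch below builds the same row-normalized values that `pd.crosstab(..., normalize='index')` produces, stored as nested dicts, and reads per-class recall off the diagonal. The helper names and the six-image example are made up for illustration.

```python
from collections import Counter, defaultdict


def row_normalized_confusion(actual, predicted):
    """Map each actual class to {predicted class: fraction of that class's images}."""
    counts = defaultdict(Counter)
    for a, p in zip(actual, predicted):
        counts[a][p] += 1
    return {a: {p: n / sum(row.values()) for p, n in row.items()}
            for a, row in counts.items()}


def per_class_recall(actual, predicted):
    """Recall of each class is the diagonal entry of its normalized row."""
    matrix = row_normalized_confusion(actual, predicted)
    return {a: row.get(a, 0.0) for a, row in matrix.items()}


actual_demo = ["cat", "cat", "cat", "cat", "dog", "dog"]
predicted_demo = ["cat", "dog", "cat", "cat", "dog", "dog"]
print(row_normalized_confusion(actual_demo, predicted_demo))
print(per_class_recall(actual_demo, predicted_demo))  # {'cat': 0.75, 'dog': 1.0}
```

Each row sums to one because it is divided by the number of images of that actual class. The diagonal entry is therefore the fraction of that class the model labeled correctly, which is exactly the class's recall.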

## Extensions

Although we did not demonstrate them in this notebook, Amazon SageMaker provides additional ways to make distributed training more efficient for very large datasets:
- **VPC training**:  performing Horovod training inside a VPC improves the network latency between nodes, leading to higher performance and stability of Horovod training jobs.
- **Pipe Mode**: using [Pipe Mode](https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-training-algo.html#your-algorithms-training-algo-running-container-inputdataconfig) reduces startup and training times. Pipe Mode streams training data from S3 as a Linux FIFO directly to the algorithm, without saving it to disk. To use Pipe Mode, set `input_mode='Pipe'` for the Estimator object, and enable Pipe Mode within the training script by importing `PipeModeDataset` from the `sagemaker_tensorflow` package (the SageMaker TensorFlow extensions, available in the SageMaker TensorFlow containers) and setting `dataset = PipeModeDataset(channel=channel_name, record_format='TFRecord')` (see the train.py script for this example). For a small dataset such as CIFAR-10, Pipe Mode does not provide any advantage, but for very large datasets it can substantially reduce startup and training times.

## Cleanup

To avoid incurring charges due to a stray endpoint, delete the Amazon SageMaker endpoint if you no longer need it:

In [None]:
sagemaker_session.delete_endpoint(predictor.endpoint)