# Distributed Training with Amazon SageMaker and `gluon`

This lab demonstrates how to perform distributed training on multiple hosts using Gluon and Amazon SageMaker. 

A multiple regression model will be trained on a dataset of synthetic examples. A single dense layer will be used, optimised via the `gluon` stochastic gradient descent Trainer. The function to be learned is:

`y = 2*x0 - 3.4*x1 + 4.2`

Where y is in R and x = (x0, x1) is in R^2.

The synthetic dataset has 1 million examples, and 10% of them are held out as a test set.

There are two main steps to distribute this workload on multiple hosts using `gluon`:

1. Choose the type of kvstore to use when creating the Gluon Trainer. For distributed training it is one of 'dist_sync', 'dist_device_sync', or 'dist_async'. See the reference (https://mxnet.incubator.apache.org/api/python/kvstore/kvstore.html#mxnet.kvstore.create) for details.
2. Specify more than 1 instance when creating an Amazon SageMaker MXNet model.
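
As a rough illustration of step 1, the kvstore type can be derived from the shape of the cluster. The helper below is a minimal sketch: `choose_kvstore` and its parameters are hypothetical names, not part of MXNet or of the lab's training script. Only the returned strings are kvstore types that MXNet recognises.

```python
def choose_kvstore(num_hosts, num_gpus=0, sync=True):
    """Return a kvstore type string for a cluster shape (hypothetical helper)."""
    if num_hosts < 1:
        raise ValueError('num_hosts must be at least 1')
    if num_hosts == 1:
        # Single host: gradients never leave the machine.
        return 'device' if num_gpus > 0 else 'local'
    if not sync:
        # Hosts push updates without waiting for each other.
        return 'dist_async'
    # Synchronous distributed training; aggregate on GPU when one is available.
    return 'dist_device_sync' if num_gpus > 0 else 'dist_sync'


print(choose_kvstore(num_hosts=1, num_gpus=1))              # device
print(choose_kvstore(num_hosts=5, num_gpus=1))              # dist_device_sync
print(choose_kvstore(num_hosts=5, num_gpus=1, sync=False))  # dist_async
```

The resulting string would then be passed as the `kvstore` argument when the Gluon Trainer is constructed.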

### Import libraries and set up environment variables

MXNet, boto3, and sagemaker are imported. Fill in the name of a bucket that exists in your account and is accessible to the role that the Amazon SageMaker notebook instance assumes. Optionally change the prefix of the folder in the bucket.

In [1]:
import mxnet as mx
from mxnet import nd
import os
import boto3
from sagemaker.mxnet import MXNet
from sagemaker import get_execution_role

role = get_execution_role()

s3 = boto3.client('s3')

# specify your bucket name here which is accessible from the SageMaker notebook
bucket_name = 'eduthie-sagemaker-1'
# a prefix for the folder name for this lab, include the trailing slash
prefix = 'distributed_training_gluon_lab/'

local_dir = '/tmp'

### Define the function to learn and generate data

The function real_fn is defined and a synthetic dataset of 1 million examples is created. Random noise is added.

In [2]:
num_inputs = 2
num_outputs = 1
num_examples = 1000000

def real_fn(X):
    return 2 * X[:, 0] - 3.4 * X[:, 1] + 4.2

X = nd.random_normal(shape=(num_examples, num_inputs))
noise = 0.01 * nd.random_normal(shape=(num_examples,))
y = real_fn(X) + noise

### Define functions to split and upload the dataset to Amazon S3

The data is uploaded to Amazon S3 so it can be used to train in Amazon SageMaker. A full copy of the training set is uploaded as a single file to 'train/full' to be trained on a single host. The training set is also sharded into 5 parts and uploaded to 'train' in order to run on 5 hosts. A complete copy of the 10% test set is uploaded to 'test'.
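
The layout this produces can be sketched in plain Python. The snippet assumes the 900,000 training rows and the prefix used in this notebook; `shard_bounds` and `s3_key` are illustrative names that mirror, but are not, the upload functions of the lab.

```python
def shard_bounds(n, k):
    """Return (start, stop) row ranges for k equal shards of n rows."""
    if n % k != 0:
        raise ValueError('n must be divisible by k')
    size = n // k
    return [(i * size, (i + 1) * size) for i in range(k)]


def s3_key(prefix, target_folder, i):
    """Build the object key for file i; prefix must end with a slash."""
    return '{}{}/{}'.format(prefix, target_folder, i)


prefix = 'distributed_training_gluon_lab/'
bounds = shard_bounds(900000, 5)
for i, (start, stop) in enumerate(bounds):
    # One shard per host: rows [start, stop) go to train/<i>
    print(s3_key(prefix, 'train', i), start, stop)
print(s3_key(prefix, 'train/full', 0))
print(s3_key(prefix, 'test', 0))
```

Each shard holds 180,000 rows, and the shard file name doubles as the shard index.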

Data parallelism is used here. The dataset is split into 5 parts, and each of the 5 hosts works on one of these parts. During each epoch, on each host, the part is split up into batches. The data is propagated forwards and backwards through the network to calculate the gradients for each batch. At the end of each batch the gradients are summed over the 5 hosts and broadcast back using the kvstore.

This is achieved by setting the kvstore parameter of the gluon Trainer to 'dist_device_sync'. If 'dist_async' is used, the training on each host does not wait for the gradients on other hosts to complete before proceeding to the next batch.
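
The synchronous scheme can be simulated on one machine without MXNet. The sketch below is a toy model under stated assumptions: five "workers" are plain Python lists, the data is noise-free, and the helper names (`make_batch`, `batch_gradient`, `sync_step`) are invented for illustration. It is not how the kvstore is implemented; it only shows that summing per-worker gradients and applying one shared update recovers the target function.

```python
import random

TRUE_W = (2.0, -3.4)
TRUE_B = 4.2


def make_batch(rng, size):
    """Draw `size` noise-free examples (x0, x1, y) of the target function."""
    batch = []
    for _ in range(size):
        x0, x1 = rng.gauss(0, 1), rng.gauss(0, 1)
        batch.append((x0, x1, TRUE_W[0] * x0 + TRUE_W[1] * x1 + TRUE_B))
    return batch


def batch_gradient(params, batch):
    """Gradient of the summed loss 0.5 * (pred - y)^2 over one batch."""
    w0, w1, b = params
    g0 = g1 = gb = 0.0
    for x0, x1, y in batch:
        err = w0 * x0 + w1 * x1 + b - y
        g0 += err * x0
        g1 += err * x1
        gb += err
    return (g0, g1, gb)


def sync_step(params, worker_batches, lr):
    """One synchronous step: sum worker gradients, then update once."""
    grads = [batch_gradient(params, batch) for batch in worker_batches]
    total = [sum(g[i] for g in grads) for i in range(3)]
    n = sum(len(batch) for batch in worker_batches)
    # Every worker would receive these same updated parameters.
    return tuple(p - lr * t / n for p, t in zip(params, total))


rng = random.Random(0)
params = (0.0, 0.0, 0.0)
for _ in range(300):
    # Each of the 5 workers draws its own batch of 16 examples.
    worker_batches = [make_batch(rng, 16) for _ in range(5)]
    params = sync_step(params, worker_batches, lr=0.1)
print(params)
```

Because every worker applies the same summed gradient, the effective batch size in synchronous mode is the per-host batch size multiplied by the number of hosts.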

In [3]:
# Saves the ndarrays X and y in a dictionary in a file
# and then uploads them to the target_folder in Amazon S3 with a filename i
def save_and_upload(X,y,target_folder,i):
    file_name = '{}'.format(i)
    local_path = os.path.join(local_dir,file_name)
    mx.nd.save(local_path,{'X':X, 'y':y})
    print('Number of examples {}'.format(len(X)))
    print('Created local file {}'.format(local_path))
    upload_filename = '{}{}/{}'.format(prefix,target_folder,file_name)
    print('Uploading to {}'.format(upload_filename))
    s3.upload_file(local_path, bucket_name, upload_filename)

# Splits the ndarrays X and y into k equal shards and saves each to a local file
# each file is uploaded to Amazon S3 in the target_folder, named by index starting from 0
def split_and_upload(X,y,k,target_folder):
    n = len(X)
    assert (n//k)*k == n
    idx = list(range(0, n+1, n//k))
    X_shards = [X[idx[i]:idx[i+1]] for i in range(k)]
    y_shards = [y[idx[i]:idx[i+1]] for i in range(k)]
    
    for Xi,yi,i in zip(X_shards,y_shards,range(k)):
        save_and_upload(Xi,yi,target_folder,i)

The dataset is split into training and test, split into shards, and uploaded to Amazon S3.

In [4]:
train_frac = 0.9
num_splits = 5
split_index = int(num_examples*train_frac)
X_train = X[0:split_index]
X_test = X[split_index:]
y_train = y[0:split_index]
y_test = y[split_index:]
print(len(X_train))
print(len(X_test))

save_and_upload(X_train,y_train,'train/full',0)
split_and_upload(X_train,y_train,num_splits,'train')
save_and_upload(X_test,y_test,'test',0)

900000
100000
Number of examples 900000
Created local file /tmp/0
Uploading to distributed_training_gluon_lab/train/full/0
Number of examples 180000
Created local file /tmp/0
Uploading to distributed_training_gluon_lab/train/0
Number of examples 180000
Created local file /tmp/1
Uploading to distributed_training_gluon_lab/train/1
Number of examples 180000
Created local file /tmp/2
Uploading to distributed_training_gluon_lab/train/2
Number of examples 180000
Created local file /tmp/3
Uploading to distributed_training_gluon_lab/train/3
Number of examples 180000
Created local file /tmp/4
Uploading to distributed_training_gluon_lab/train/4
Number of examples 100000
Created local file /tmp/0
Uploading to distributed_training_gluon_lab/test/0


### Import the python module and test locally

To train a custom MXNet model in Amazon SageMaker, the model code is uploaded in a separate python module. This is specified using the 'entry_point' parameter referencing a local path. That module requires a train() function. See multiple_regression.py for more details.

If any additional code or libraries are referenced in the entry_point module they can also be uploaded to the training instances using the 'source_dir' parameter. All contents of source_dir are copied to the training server recursively, and the training script uses that directory as a working directory when training.
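
One thing a train() function has to do in a sharded setup is decide which file the current host reads. The contents of multiple_regression.py are not shown here, so the following is only a guess at one plausible approach: `shard_path_for_host` is a hypothetical helper that maps a host's position in the sorted host list to the shard file of the same index.

```python
def shard_path_for_host(channel_input_dirs, hosts, current_host, channel='train'):
    """Hypothetical: pick the shard file named after this host's index."""
    host_index = sorted(hosts).index(current_host)
    return '{}/{}'.format(channel_input_dirs[channel], host_index)


# Local test setup with a single host
print(shard_path_for_host({'train': './data'}, ['alg-1'], 'alg-1'))

# A 5-host cluster: the third host would read shard 2
hosts = ['alg-1', 'alg-2', 'alg-3', 'alg-4', 'alg-5']
print(shard_path_for_host({'train': '/opt/ml/input/data/train'}, hosts, 'alg-3'))
```

With a single host the index is always 0, which is consistent with the full dataset being stored as a single file named 0.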

In [5]:
from multiple_regression import train

In [None]:
channel_input_dirs = {'train':'./data'}
hyperparameters = {'batch_size':64, 'epochs':10, 'learning_rate':0.1}
train(hyperparameters=hyperparameters,channel_input_dirs=channel_input_dirs,num_gpus=1,hosts=['alg-1'],
      current_host='alg-1')

Train file path ./data/0
Number of examples 100
kvstore device


### Train on Amazon SageMaker

Two SageMaker estimators are created using the higher-level Python API MXNet class. The train_instance_count and train_instance_type parameters select the number of hosts in the cluster and the type of Amazon EC2 instance used:

1. A distributed version running on 5 hosts, each an ml.p3.2xlarge instance.
2. A single-host version on the same instance type.

The role is passed as a string containing the ARN of the IAM role that Amazon SageMaker assumes.

The hyperparameters are input as a dictionary. In this case a batch_size of 64 is used over 10 epochs, with a very small learning_rate. These are not optimal for this task; see what you can do to improve the convergence time.

Finally the fit() method is called on the estimator with the training and test data as parameters. wait is set to False so fit returns immediately after the job is created. 

After running the cell below, go to the 'Training Jobs' section of the Amazon SageMaker console to monitor the progress of the jobs.
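
As an alternative to the console, the job status can also be read programmatically. The sketch below assumes a boto3 SageMaker client is passed in; `training_job_status` is an illustrative wrapper and `FakeSageMakerClient` is a stand-in so the example runs without AWS credentials.

```python
def training_job_status(sagemaker_client, job_name):
    """Return the TrainingJobStatus string for one training job."""
    response = sagemaker_client.describe_training_job(TrainingJobName=job_name)
    return response['TrainingJobStatus']


class FakeSageMakerClient:
    """Stand-in that mimics the call shape; not a real AWS client."""

    def describe_training_job(self, TrainingJobName):
        return {'TrainingJobName': TrainingJobName,
                'TrainingJobStatus': 'InProgress'}


print(training_job_status(FakeSageMakerClient(),
                          'sagemaker-mxnet-2018-07-13-09-24-59-913'))
```

In the notebook, a real client created with `boto3.client('sagemaker')` would replace the stand-in, and the job name would be the one logged when fit() is called.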

l_estimator_1 on 1 machine takes approximately 45 minutes to complete. Feel free to cancel either of these jobs before they finish using the console.

l_estimator_5 completes in about 15 minutes on 5 machines. There is some communication overhead in distributed mode but still a substantial speedup.
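
To put numbers on that trade-off, the approximate timings quoted above (45 minutes on 1 host, 15 minutes on 5 hosts) give the following speedup and parallel efficiency. The function name is illustrative.

```python
def speedup_and_efficiency(single_host_minutes, distributed_minutes, num_hosts):
    """Speedup over one host, and the fraction of ideal linear scaling achieved."""
    speedup = single_host_minutes / distributed_minutes
    return speedup, speedup / num_hosts


speedup, efficiency = speedup_and_efficiency(45, 15, 5)
print('speedup {:.1f}x, efficiency {:.0%}'.format(speedup, efficiency))
# speedup 3.0x, efficiency 60%
```

An efficiency of 60% means roughly two of the five hosts' worth of compute goes to start-up, data loading, and parameter synchronisation rather than to training.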

#### Questions

1. How much overhead is there when running a distributed job on SageMaker for spinning up the machines and for syncing the parameters?
2. What else can be done to speed up training?
3. Why does the first epoch take twice as long?

In [7]:
l_estimator_5 = MXNet(entry_point='multiple_regression.py',
    role=role,
    train_instance_count=num_splits, 
    train_instance_type='ml.p3.2xlarge',
    hyperparameters={'batch_size':64, 'epochs':10, 'learning_rate':0.0000001, 'sync':True})

l_estimator_1 = MXNet(entry_point='multiple_regression.py',
    role=role,
    train_instance_count=1, 
    train_instance_type='ml.p3.2xlarge',
    hyperparameters={'batch_size':64, 'epochs':10, 'learning_rate':0.0000001})

train_data_location = 's3://{}/{}train'.format(bucket_name,prefix)
test_data_location = 's3://{}/{}test'.format(bucket_name,prefix)

# the single-host job trains on the full, unsharded copy under 'train/full'
l_estimator_1.fit({'train': train_data_location + '/full',
                   'test': test_data_location},wait=False)
# the distributed job reads the data under 'train'
l_estimator_5.fit({'train': train_data_location,
                   'test': test_data_location},wait=False)

INFO:sagemaker:Creating training-job with name: sagemaker-mxnet-2018-07-13-09-24-59-913
INFO:sagemaker:Creating training-job with name: sagemaker-mxnet-2018-07-13-09-25-01-355
