# Data Parallelism with SageMaker TensorFlow Container

This tutorial shows how to train a ResNet50 model on the CIFAR-10 dataset using the SageMaker TensorFlow container. It uses SageMaker's distributed data parallel library, which extends SageMaker's training capabilities for deep learning models with near-linear scaling efficiency, achieving fast time-to-train with minimal code changes.


### Set Up the Environment

We'll begin with some necessary imports, and get an Amazon SageMaker session to help perform certain tasks, as well as an IAM role with the necessary permissions.

In [1]:
#pip install sagemaker --upgrade matplotlib

In [2]:
import numpy as np
import os
import sagemaker
from sagemaker import get_execution_role

In [3]:
sagemaker_session = sagemaker.Session()
role = get_execution_role()
bucket = sagemaker_session.default_bucket()
prefix = 'sagemaker/DEMO-tf-2-sdp-cifar10'
print('Bucket:\n{}'.format(bucket))

Bucket:
sagemaker-us-east-1-367158743199


In [4]:
sagemaker.__version__

'2.32.0'

# Data

Keras ships a small collection of ready-to-use datasets in `tensorflow.keras.datasets`. We download the CIFAR-10 dataset from there.

CIFAR-10 consists of 60,000 32x32 colour images in 10 classes, with 6,000 images per class. There are 50,000 training images and 10,000 test images.

In [5]:
from tensorflow.keras import datasets

(train_images, train_labels), (test_images, test_labels) = datasets.cifar10.load_data()
print('Data type:\n{}'.format(type(train_images)))
print('Shapes:\n{}\n{}\n{}\n{}'\
      .format(train_images.shape, train_labels.shape, test_images.shape, test_labels.shape))

Data type:
<class 'numpy.ndarray'>
Shapes:
(50000, 32, 32, 3)
(50000, 1)
(10000, 32, 32, 3)
(10000, 1)


The next step is to normalize pixel values to be between 0 and 1:

In [6]:
train_images, test_images = train_images / 255.0, test_images / 255.0

Now that we have normalized the data, we will save it locally.

In [7]:
import numpy as np
import os

data_dir = os.path.join(os.getcwd(), 'data')
os.makedirs(data_dir, exist_ok=True)

train_dir = os.path.join(data_dir, 'train')
os.makedirs(train_dir, exist_ok=True)

test_dir = os.path.join(data_dir, 'test')
os.makedirs(test_dir, exist_ok=True)

np.save(os.path.join(train_dir, 'train_images.npy'), train_images)
np.save(os.path.join(train_dir, 'train_labels.npy'), train_labels)
np.save(os.path.join(test_dir, 'test_images.npy'), test_images)
np.save(os.path.join(test_dir, 'test_labels.npy'), test_labels)

With the dataset saved locally, the next step is to upload it to Amazon S3.

For Amazon SageMaker hosted training on a cluster separate from the hardware serving this notebook, training data must be stored in Amazon S3, Amazon EFS, or Amazon FSx for Lustre.  We'll upload the data to S3 now.

In [8]:
inputs = sagemaker_session.upload_data(path='data', key_prefix='data/tf-2-sdp-cifar10')
display(inputs)

's3://sagemaker-us-east-1-367158743199/data/tf-2-sdp-cifar10'


# A Primer on Data Parallelism

If you’re training a model on a single GPU, its full internal state is available locally: model parameters, optimizer parameters, gradients (parameter updates computed by backpropagation), and so on. However, things are different when you distribute a training job to a cluster of GPUs.

Using a technique named “data parallelism,” the training set is split into mini-batches that are evenly distributed across GPUs. Thus, each GPU only trains the model on a fraction of the total data set. This means that the model state will be slightly different on each GPU, as each one processes different batches. To ensure training convergence, the model state needs to be regularly updated on all nodes. This can be done synchronously or asynchronously:

* Synchronous training: all GPUs report their gradient updates either to all other GPUs (many-to-many communication), or to a central parameter server that redistributes them (many-to-one, followed by one-to-many). As all updates are applied simultaneously, the model state is in sync on all GPUs, and the next mini-batch can be processed.

* Asynchronous training: gradient updates are sent to all other nodes, or to a central server. However, they are applied immediately, meaning that model state will differ from one GPU to the next.
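
The synchronous case can be sketched in a few lines of plain Python. This is only an illustration under stated assumptions: the 1-D linear model, the toy data, and the function names `gradient` and `synchronous_step` are hypothetical and are not part of SageMaker or of this tutorial's training script. It shows that when shards have equal size, averaging the per-worker gradients gives the same update as a single worker processing the whole batch.

```python
def gradient(w, batch):
    """Gradient of the mean squared error for the 1-D model y = w * x."""
    return sum(2 * (w * x - y) * x for x, y in batch) / len(batch)


def synchronous_step(w, shards, lr):
    """One synchronous data-parallel step.

    Each simulated worker computes a gradient on its own shard, the gradients
    are averaged, and the same update is applied everywhere.
    """
    local_grads = [gradient(w, shard) for shard in shards]
    averaged = sum(local_grads) / len(local_grads)
    return w - lr * averaged


# Toy data drawn from y = 2x, split evenly across two simulated workers.
data = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0), (4.0, 8.0)]
shards = [data[0:2], data[2:4]]

w_parallel = synchronous_step(0.0, shards, lr=0.01)
w_single = 0.0 - 0.01 * gradient(0.0, data)
print(w_parallel, w_single)
```

In the asynchronous variant, each worker would apply its own gradient as soon as it is ready, so the two values above would no longer be guaranteed to match.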

Unfortunately, these techniques don’t scale very well. As the number of GPUs increases, a parameter server will inevitably become a bottleneck. Even without a parameter server, network congestion soon becomes a problem, as n GPUs need to exchange `n*(n-1)` messages after each iteration, for a total amount of `n*(n-1)*model size` bytes. For example, ResNet-50 is a popular model used in computer vision applications. With its 26 million parameters, each 32-bit gradient update takes about 100 megabytes. With 8 GPUs, each iteration requires sending and receiving 56 updates, for a total of 5.6 gigabytes. Even with a fast network, this will cause some overhead, and slow down training.
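
The arithmetic above can be reproduced with a short calculation. The function name `all_to_all_traffic` is hypothetical; the inputs (8 GPUs, 26 million parameters, 4 bytes per 32-bit gradient) come from the paragraph above. Note that the text rounds each update to 100 megabytes, which is why it arrives at 5.6 gigabytes rather than the unrounded figure.

```python
def all_to_all_traffic(num_gpus, num_params, bytes_per_param=4):
    """Return (bytes per update, messages per iteration, total bytes per iteration)."""
    update_bytes = num_params * bytes_per_param
    messages = num_gpus * (num_gpus - 1)
    return update_bytes, messages, messages * update_bytes


update_bytes, messages, total_bytes = all_to_all_traffic(num_gpus=8, num_params=26_000_000)
print(f"{update_bytes / 1e6:.0f} MB per gradient update")   # 104 MB
print(f"{messages} messages per iteration")                 # 56 messages
print(f"{total_bytes / 1e9:.2f} GB exchanged per iteration")  # 5.82 GB
```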

As datasets and models keep growing, this network bottleneck becomes harder to avoid. Enter SageMaker and its new AllReduce algorithm.
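
To make the term concrete: an all-reduce leaves every worker holding the element-wise sum (or mean) of all workers' vectors. The sketch below simulates a generic ring all-reduce in plain Python. It is an assumption chosen for illustration, not SageMaker's implementation, whose internals are not described in this tutorial; the function name `ring_allreduce` and the sample vectors are hypothetical.

```python
def ring_allreduce(vectors):
    """Simulate a ring all-reduce (sum) over equally sized per-worker vectors."""
    n = len(vectors)
    length = len(vectors[0])
    # Split every vector into n contiguous chunks (some may be empty).
    bounds = [(i * length) // n for i in range(n + 1)]
    data = [list(v) for v in vectors]

    def chunk(c):
        return range(bounds[c], bounds[c + 1])

    # Phase 1, reduce-scatter: at step s, worker r sends chunk (r - s) % n to
    # its right-hand neighbour, which adds it to its own copy.
    for s in range(n - 1):
        sends = [(r, (r - s) % n) for r in range(n)]
        payloads = [[data[r][i] for i in chunk(c)] for r, c in sends]
        for (r, c), payload in zip(sends, payloads):
            dst = (r + 1) % n
            for i, value in zip(chunk(c), payload):
                data[dst][i] += value

    # Worker r now holds the fully reduced chunk (r + 1) % n.
    # Phase 2, all-gather: pass the finished chunks around the ring.
    for s in range(n - 1):
        sends = [(r, (r + 1 - s) % n) for r in range(n)]
        payloads = [[data[r][i] for i in chunk(c)] for r, c in sends]
        for (r, c), payload in zip(sends, payloads):
            dst = (r + 1) % n
            for i, value in zip(chunk(c), payload):
                data[dst][i] = value

    return data


grads = [[1, 2, 3, 4], [10, 20, 30, 40], [100, 200, 300, 400]]
reduced = ring_allreduce(grads)
print(reduced[0])
```

In this ring variant each worker sends `2 * (n - 1)` chunks of roughly `1/n` of the vector, so per-worker traffic stays close to twice the model size regardless of the number of workers, instead of growing with `n - 1` full copies as in the all-to-all exchange.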

# Distributed Training with SageMaker Data Parallelism

[Amazon SageMaker](https://aws.amazon.com/sagemaker/) now supports a new data parallelism library that makes it easier to train models on datasets that may be as large as hundreds or thousands of gigabytes.


Amazon SageMaker now helps ML teams reduce distributed training time and cost, thanks to the SageMaker data parallelism library. Available for TensorFlow and PyTorch, the data parallelism library implements a more efficient distribution of computation, optimizes network communication, and fully utilizes our fastest [p3](https://aws.amazon.com/ec2/instance-types/p3/) and [p4](https://aws.amazon.com/ec2/instance-types/p4/) GPU instances.

   
![title](https://docs.aws.amazon.com/sagemaker/latest/dg/images/distributed/data-parallel/sdp-pytorch.png)


In [9]:
import sys
!{sys.executable} -m pip install --upgrade sagemaker



The SageMaker data parallelism API is designed for ease of use. A TensorFlow training job is defined by using the TensorFlow estimator class. It lets you run your training script on SageMaker infrastructure in a containerized environment. For more information on how to instantiate it, check out this example on setting up a [basic training job](https://sagemaker-examples.readthedocs.io/en/latest/frameworks/tensorflow/get_started_mnist_train.html#TensorFlow-Estimator).
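
The training script itself is not shown in this notebook. As a rough idea of what such an entry point might contain, here is a hypothetical sketch, not the actual `train_resnet_sdp_debug.py`. It assumes the `smdistributed.dataparallel.tensorflow` API as shipped with the TensorFlow 2.3 container, the `.npy` file names written earlier in this notebook, and the standard `SM_CHANNEL_TRAIN` and `SM_MODEL_DIR` environment variables. The helper `scaled_learning_rate`, the linear scaling heuristic, and the default batch size are illustrative assumptions.

```python
import os


def scaled_learning_rate(base_lr, world_size):
    """Linear scaling heuristic (an assumption): grow the rate with the worker count."""
    return base_lr * world_size


def train(epochs=2, batch_size=256, base_lr=0.001):
    # Imports are deferred: these packages are only guaranteed to be present
    # inside the SageMaker training container.
    import numpy as np
    import tensorflow as tf
    import smdistributed.dataparallel.tensorflow as sdp

    sdp.init()  # set up communication between all worker processes

    # Pin this process to one GPU, selected by its rank on the local host.
    gpus = tf.config.experimental.list_physical_devices("GPU")
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], "GPU")

    # SageMaker exposes the 'train' input channel through this variable.
    train_dir = os.environ["SM_CHANNEL_TRAIN"]
    images = np.load(os.path.join(train_dir, "train_images.npy")).astype("float32")
    labels = np.load(os.path.join(train_dir, "train_labels.npy"))
    dataset = (
        tf.data.Dataset.from_tensor_slices((images, labels))
        .shard(sdp.size(), sdp.rank())  # each worker reads a distinct slice
        .shuffle(10_000)
        .batch(batch_size)
    )

    model = tf.keras.applications.ResNet50(
        weights=None, input_shape=(32, 32, 3), classes=10
    )
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy()
    optimizer = tf.keras.optimizers.Adam(scaled_learning_rate(base_lr, sdp.size()))

    @tf.function
    def training_step(x, y, first_batch):
        with tf.GradientTape() as tape:
            loss = loss_fn(y, model(x, training=True))
        # Wrapping the tape makes gradient() all-reduce gradients across workers.
        tape = sdp.DistributedGradientTape(tape)
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
        if first_batch:
            # Start every worker from identical model and optimizer state.
            sdp.broadcast_variables(model.variables, root_rank=0)
            sdp.broadcast_variables(optimizer.variables(), root_rank=0)
        return loss

    for epoch in range(epochs):
        for step, (x, y) in enumerate(dataset):
            training_step(x, y, epoch == 0 and step == 0)

    if sdp.rank() == 0:
        # Save once, from the leader only; TensorFlow Serving expects a
        # numbered version subdirectory.
        model.save(os.path.join(os.environ["SM_MODEL_DIR"], "1"))
```

A real entry point would also parse the `epochs`, `batch_size`, and `learning_rate` hyperparameters, which SageMaker passes as command-line arguments, and then call `train(...)`. With two `ml.p3.16xlarge` instances (8 GPUs each), `sdp.size()` would be 16.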

In [12]:
from sagemaker.tensorflow import TensorFlow

resource_config = {'volume_size_in_gb': 1024}
train_instance_type = 'ml.p3.16xlarge'
hyperparameters = {
    'epochs': 2,
    'batch_size': 2048,
    'learning_rate': 0.001,
}
distributions = {
    'smdistributed':{
        'dataparallel':{
            'enabled': True
        }
    }
}

framework_version = '2.3.1'

estimator_dp = TensorFlow(
                        base_job_name='tf2-resnet-dist',
                        source_dir='../src',
                        entry_point='train_resnet_sdp_debug.py',
                        role=role,
                        py_version='py37',
                        framework_version=framework_version,
                        # For training with multinode distributed training, set this count. Example: 2
                        instance_count=2,
                        # For training with p3dn instance use - ml.p3dn.24xlarge
                        instance_type=train_instance_type,
                        sagemaker_session=sagemaker_session,
                        resource_config=resource_config,
                        hyperparameters=hyperparameters,
                        # Training using SMDataParallel Distributed Training Framework
                        distribution=distributions,
                        debugger_hook_config=False
                        )

In [13]:
# inputs = 's3://sagemaker-us-east-1-367158743199/data/tf-2-sdp-cifar10'
# Set up input channels for training and testing data
remote_inputs = {'train': inputs + '/train'
                 }
estimator_dp.fit(remote_inputs)

2021-03-30 17:20:18 Starting - Starting the training job...
2021-03-30 17:20:24 Starting - Launching requested ML instancesProfilerReport-1617124817: InProgress
.........
2021-03-30 17:22:08 Starting - Preparing the instances for training.........
2021-03-30 17:23:45 Downloading - Downloading input data...
2021-03-30 17:24:12 Training - Downloading the training image...........
2021-03-30 17:25:57.303341: W tensorflow/core/profiler/internal/smprofiler_timeline.cc:460] Initializing the SageMaker Profiler.
2021-03-30 17:25:57.307193: W tensorflow/core/profiler/internal/smprofiler_timeline.cc:105] SageMaker Profiler is not enabled. The timeline writer thread will not be started, future recorded events will be dropped.
2021-03-30 17:25:57.486445: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.11.0
2021-03-30 17:25:57.568029: W tensorflow/core/profiler/internal/smprofiler_timeline.cc:460] Initia

In [14]:
model_data = estimator_dp.model_data
print("Storing {} as model_data".format(model_data))

Storing s3://sagemaker-us-east-1-367158743199/tf2-resnet-dist-2021-03-30-17-20-17-230/output/model.tar.gz as model_data


# Deployment

You can define the model object with the SageMaker SDK's `TensorFlowModel`, passing in the model artifact produced by the estimator; that route is shown commented out below. Here we deploy directly from the estimator instead, which creates the model and the endpoint in one call.

In [15]:
# from sagemaker.tensorflow import TensorFlowModel

# model_data = estimator_dp.model_data
# print("Storing {} as model_data".format(model_data))
# %store model_data

# # create model group - The function loads the model and sets it to use a GPU, if available.
# model = TensorFlowModel(model_data=model_data, role=role, framework_version=framework_version)

# # Deploy the model on an endpoint
# # You can optionally change both the instance count and instance type.

# predictor = model.deploy(initial_instance_count=1,
#                          instance_type="ml.c5.xlarge",
# #                          accelerator_type='ml.eia1.medium'
#                         )

In [None]:
predictor = estimator_dp.deploy(initial_instance_count=1,
                                instance_type="ml.c5.xlarge"
                               )

update_endpoint is a no-op in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


-----------

## Evaluate

Now that we have a Predictor object wrapping a real-time Amazon SageMaker hosted endpoint, we can evaluate the model by invoking the endpoint with the CIFAR-10 test set.

We'll define the label names and look at a sample of 5 images.

In [None]:
import matplotlib.pyplot as plt

In [None]:
test_uri = inputs + '/test'

# download test data from s3
os.system(f"aws s3 cp {test_uri} ../data/sample-img --recursive --quiet")

In [None]:
test_images = np.load('../data/sample-img/test_images.npy')
test_labels = np.load('../data/sample-img/test_labels.npy')

cifar_classes = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
print('Example test images and their labels: ' + str([x[0] for x in test_labels[0:5]]))
print('Corresponding classes for the labels: ' + str([cifar_classes[x[0]] for x in test_labels[0:5]]))

f, axarr = plt.subplots(1, 5)
f.set_size_inches(16, 6)

images = []
for i in range(5):
    img = test_images[i]
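    # test_images.npy was saved after scaling to [0, 1]; keep this second division
    # only if the training script applies the same scaling to its inputs.
    images.append(img/255.0)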
    axarr[i].imshow(img)
plt.show()

images = np.array(images, dtype=np.float32)


In [None]:
predictions = predictor.predict(images)['predictions']
# convert per-class scores to predicted class indices
predictions = np.array(predictions, dtype=np.float32)
predictions = np.argmax(predictions, axis=1)

print("Predictions: ", *[cifar_classes[i] for i in predictions])
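
Printing class names is useful for a spot check; to score more than a handful of images you would compare predicted indices against the labels. A minimal sketch follows, assuming labels arrive as an `(N, 1)`-shaped sequence like `test_labels` above. The function name `accuracy` and the sample values are hypothetical and are not outputs of the endpoint.

```python
def accuracy(predicted_classes, labels):
    """Fraction of predictions that match the labels.

    Labels may be flat or nested one level deep, as in CIFAR-10's (N, 1) arrays.
    """
    flat_labels = [int(l[0]) if hasattr(l, "__len__") else int(l) for l in labels]
    correct = sum(int(p) == t for p, t in zip(predicted_classes, flat_labels))
    return correct / len(flat_labels)


sample_predictions = [3, 8, 8, 0, 4]           # hypothetical predicted indices
sample_labels = [[3], [8], [8], [0], [6]]      # hypothetical labels, shape (5, 1)
print(accuracy(sample_predictions, sample_labels))  # 0.8
```

With the variables from the cells above, the call would be `accuracy(predictions, test_labels[0:5])`.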


# Clean up

If you don't intend to try out inference or to do anything else with the endpoint, you should delete the endpoint.



In [None]:
# predictor.delete_endpoint()