# MNIST distributed training  

The **SageMaker Python SDK** helps you deploy your models for training and hosting in optimized, productions ready containers in SageMaker. The SageMaker Python SDK is easy to use, modular, extensible and compatible with TensorFlow and MXNet. This tutorial focuses on how to create a convolutional neural network model to train the [MNIST dataset](http://yann.lecun.com/exdb/mnist/) using **TensorFlow distributed training**.



### Set up the environment

In [1]:
import os
import sagemaker
from sagemaker import get_execution_role

sagemaker_session = sagemaker.Session()

role = get_execution_role()

### Download the MNIST dataset

In [2]:
import utils
from tensorflow.contrib.learn.python.learn.datasets import mnist
import tensorflow as tf

data_sets = mnist.read_data_sets('data', dtype=tf.uint8, reshape=False, validation_size=5000)

utils.convert_to(data_sets.train, 'train', 'data')
utils.convert_to(data_sets.validation, 'validation', 'data')
utils.convert_to(data_sets.test, 'test', 'data')

Successfully downloaded train-images-idx3-ubyte.gz 9912422 bytes.
Extracting data/train-images-idx3-ubyte.gz
Successfully downloaded train-labels-idx1-ubyte.gz 28881 bytes.
Extracting data/train-labels-idx1-ubyte.gz
Successfully downloaded t10k-images-idx3-ubyte.gz 1648877 bytes.
Extracting data/t10k-images-idx3-ubyte.gz
Successfully downloaded t10k-labels-idx1-ubyte.gz 4542 bytes.
Extracting data/t10k-labels-idx1-ubyte.gz
('Writing', 'data/train.tfrecords')
('Writing', 'data/validation.tfrecords')
('Writing', 'data/test.tfrecords')


### Upload the data
We use the ```sagemaker.Session.upload_data``` function to upload our datasets to an S3 location. The return value inputs identifies the location -- we will use this later when we start the training job.

In [3]:
inputs = sagemaker_session.upload_data(path='data', key_prefix='data/mnist')

INFO:sagemaker:Created S3 bucket: sagemaker-us-east-1-152015771822


# Construct a script for distributed training 

## 1. Define neural network 

In [5]:
import os
import tensorflow as tf
from tensorflow.python.estimator.model_fn import ModeKeys as Modes

INPUT_TENSOR_NAME = 'inputs'
SIGNATURE_NAME = 'predictions'

LEARNING_RATE = 0.001


def model_fn(features, labels, mode, params):
    # Input Layer
    input_layer = tf.reshape(features[INPUT_TENSOR_NAME], [-1, 28, 28, 1])

    # Convolutional Layer #1
    conv1 = tf.layers.conv2d(
        inputs=input_layer,
        filters=32,
        kernel_size=[5, 5],
        padding='same',
        activation=tf.nn.relu)

    # Pooling Layer #1
    pool1 = tf.layers.max_pooling2d(inputs=conv1, pool_size=[2, 2], strides=2)

    # Convolutional Layer #2 and Pooling Layer #2
    conv2 = tf.layers.conv2d(
        inputs=pool1,
        filters=64,
        kernel_size=[5, 5],
        padding='same',
        activation=tf.nn.relu)
    pool2 = tf.layers.max_pooling2d(inputs=conv2, pool_size=[2, 2], strides=2)

    # Dense Layer
    pool2_flat = tf.reshape(pool2, [-1, 7 * 7 * 64])
    dense = tf.layers.dense(inputs=pool2_flat, units=1024, activation=tf.nn.relu)
    dropout = tf.layers.dropout(
        inputs=dense, rate=0.4, training=(mode == Modes.TRAIN))

    # Logits Layer
    logits = tf.layers.dense(inputs=dropout, units=10)

    # Define operations
    if mode in (Modes.PREDICT, Modes.EVAL):
        predicted_indices = tf.argmax(input=logits, axis=1)
        probabilities = tf.nn.softmax(logits, name='softmax_tensor')

    if mode in (Modes.TRAIN, Modes.EVAL):
        global_step = tf.train.get_or_create_global_step()
        label_indices = tf.cast(labels, tf.int32)
        loss = tf.losses.softmax_cross_entropy(
            onehot_labels=tf.one_hot(label_indices, depth=10), logits=logits)
        tf.summary.scalar('OptimizeLoss', loss)

    if mode == Modes.PREDICT:
        predictions = {
            'classes': predicted_indices,
            'probabilities': probabilities
        }
        export_outputs = {
            SIGNATURE_NAME: tf.estimator.export.PredictOutput(predictions)
        }
        return tf.estimator.EstimatorSpec(
            mode, predictions=predictions, export_outputs=export_outputs)

    if mode == Modes.TRAIN:
        optimizer = tf.train.AdamOptimizer(learning_rate=0.001)
        train_op = optimizer.minimize(loss, global_step=global_step)
        return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

    if mode == Modes.EVAL:
        eval_metric_ops = {
            'accuracy': tf.metrics.accuracy(label_indices, predicted_indices)
        }
        return tf.estimator.EstimatorSpec(
            mode, loss=loss, eval_metric_ops=eval_metric_ops)


## 2. Applies the features in the neural network

In [6]:
def serving_input_fn(params):
    inputs = {INPUT_TENSOR_NAME: tf.placeholder(tf.float32, [None, 784])}
    return tf.estimator.export.ServingInputReceiver(inputs, inputs)


def read_and_decode(filename_queue):
    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(filename_queue)

    features = tf.parse_single_example(
        serialized_example,
        features={
            'image_raw': tf.FixedLenFeature([], tf.string),
            'label': tf.FixedLenFeature([], tf.int64),
        })

    image = tf.decode_raw(features['image_raw'], tf.uint8)
    image.set_shape([784])
    image = tf.cast(image, tf.float32) * (1. / 255)
    label = tf.cast(features['label'], tf.int32)

    return image, label


def train_input_fn(training_dir, params):
    return _input_fn(training_dir, 'train.tfrecords', batch_size=100)


def eval_input_fn(training_dir, params):
    return _input_fn(training_dir, 'test.tfrecords', batch_size=100)


def _input_fn(training_dir, training_filename, batch_size=100):
    test_file = os.path.join(training_dir, training_filename)
    filename_queue = tf.train.string_input_producer([test_file])

    image, label = read_and_decode(filename_queue)
    images, labels = tf.train.batch(
        [image, label], batch_size=batch_size,
        capacity=1000 + 3 * batch_size)

    return {INPUT_TENSOR_NAME: images}, labels

## 3. Create a training job using the sagemaker.TensorFlow estimator

In [7]:
from sagemaker.tensorflow import TensorFlow

mnist_estimator = TensorFlow(entry_point='mnist.py',
                             role=role,
                             training_steps=1000, 
                             evaluation_steps=100,
                             train_instance_count=2,
                             train_instance_type='ml.c4.xlarge')

mnist_estimator.fit(inputs)

INFO:sagemaker:Created S3 bucket: sagemaker-us-east-1-152015771822
INFO:sagemaker:Creating training-job with name: sagemaker-tensorflow-py2-cpu-2018-03-09-01-45-40-437


...................................................................
[31mexecuting startup script (first run)[0m
[31m2018-03-09 01:51:07,564 INFO - root - running container entrypoint[0m
[31m2018-03-09 01:51:07,564 INFO - root - starting train task[0m
[31m2018-03-09 01:51:09,117 INFO - botocore.vendored.requests.packages.urllib3.connectionpool - Starting new HTTP connection (1): 169.254.170.2[0m
[31m2018-03-09 01:51:10,024 INFO - botocore.vendored.requests.packages.urllib3.connectionpool - Starting new HTTPS connection (1): s3.amazonaws.com[0m
[31m2018-03-09 01:51:10,095 INFO - botocore.vendored.requests.packages.urllib3.connectionpool - Starting new HTTPS connection (1): s3.amazonaws.com[0m
[31mINFO:tensorflow:----------------------TF_CONFIG--------------------------[0m
[31mINFO:tensorflow:{"environment": "cloud", "cluster": {"worker": ["algo-2:2222"], "ps": ["algo-1:2223", "algo-2:2223"], "master": ["algo-1:2222"]}, "task": {"index": 0, "type": "worker"}}[0m
[31mINFO:

[31m2018-03-09 01:51:18.221138: I tensorflow/core/distributed_runtime/master_session.cc:1004] Start master session be1d0b6476d77f61 with config: gpu_options { per_process_gpu_memory_fraction: 1 } allow_soft_placement: true[0m
[31mINFO:tensorflow:Saving checkpoints for 1 into s3://sagemaker-us-east-1-152015771822/sagemaker-tensorflow-py2-cpu-2018-03-09-01-45-40-437/checkpoints/model.ckpt.[0m
[31mINFO:tensorflow:loss = 2.2869756, step = 0[0m
[32m2018-03-09 01:51:46.806274: I tensorflow/core/distributed_runtime/master_session.cc:1004] Start master session ebe0d7e4a4e3ea6d with config: gpu_options { per_process_gpu_memory_fraction: 1 } allow_soft_placement: true[0m
[32mINFO:tensorflow:loss = 0.2025531, step = 58[0m
[31mINFO:tensorflow:global_step/sec: 4.24912[0m
[31mINFO:tensorflow:loss = 0.033198502, step = 143 (30.194 sec)[0m
[31mINFO:tensorflow:global_step/sec: 6.56009[0m
[32mINFO:tensorflow:loss = 0.033573892, step = 254 (29.474 sec)[0m
[31mINFO:tensorflow:global_ste

The **```fit```** method will create a training job in two **ml.c4.xlarge** instances. The logs above will show the instances doing training, evaluation, and incrementing the number of **training steps**. 

In the end of the training, the training job will generate a saved model for TF serving.

# Deploy the trained model to prepare for predictions

The deploy() method creates an endpoint which serves prediction requests in real-time.

In [8]:
mnist_predictor = mnist_estimator.deploy(initial_instance_count=1,
                                             instance_type='ml.m4.xlarge')

INFO:sagemaker:Creating model with name: sagemaker-tensorflow-py2-cpu-2018-03-09-01-45-40-437
INFO:sagemaker:Creating endpoint with name sagemaker-tensorflow-py2-cpu-2018-03-09-01-45-40-437


----------------------------------------------------------------------------------------------------------------!

# Invoking the endpoint

In [9]:
import numpy as np
from tensorflow.examples.tutorials.mnist import input_data

mnist = input_data.read_data_sets("/tmp/data/", one_hot=True)

for i in range(10):
    data = mnist.test.images[i].tolist()
    tensor_proto = tf.make_tensor_proto(values=np.asarray(data), shape=[1, len(data)], dtype=tf.float32)
    predict_response = mnist_predictor.predict(tensor_proto)
    
    print("========================================")
    label = np.argmax(mnist.test.labels[i])
    print("label is {}".format(label))
    prediction = predict_response['outputs']['classes']['int64Val'][0]
    print("prediction is {}".format(prediction))

Successfully downloaded train-images-idx3-ubyte.gz 9912422 bytes.
Extracting /tmp/data/train-images-idx3-ubyte.gz
Successfully downloaded train-labels-idx1-ubyte.gz 28881 bytes.
Extracting /tmp/data/train-labels-idx1-ubyte.gz
Successfully downloaded t10k-images-idx3-ubyte.gz 1648877 bytes.
Extracting /tmp/data/t10k-images-idx3-ubyte.gz
Successfully downloaded t10k-labels-idx1-ubyte.gz 4542 bytes.
Extracting /tmp/data/t10k-labels-idx1-ubyte.gz
label is 7
prediction is 7
label is 2
prediction is 2
label is 1
prediction is 1
label is 0
prediction is 0
label is 4
prediction is 4
label is 1
prediction is 1
label is 4
prediction is 4
label is 9
prediction is 9
label is 5
prediction is 5
label is 9
prediction is 9


# Deleting the endpoint

In [10]:
sagemaker.Session().delete_endpoint(mnist_predictor.endpoint)

INFO:sagemaker:Deleting endpoint with name: sagemaker-tensorflow-py2-cpu-2018-03-09-01-45-40-437
