In [None]:
import os
import sagemaker
from sagemaker import get_execution_role

# Execution role passed to every estimator created in this notebook.
role = get_execution_role()

# Session object used later for data upload, region lookup and endpoint deletion.
sagemaker_session = sagemaker.Session()

In [None]:
# Run the helper script with ./data as its --data-dir. Later cells read
# ./data/train, ./data/validation and ./data/eval, so this must run first.
# NOTE(review): presumably the script writes the CIFAR-10 TFRecords there; confirm in the script.
!python generate_cifar10_tfrecords.py --data-dir ./data

In [None]:
from sagemaker.tensorflow import TensorFlow

import subprocess

# Default to CPU-only local mode; switched to GPU local mode below when a GPU is usable.
instance_type = 'local'

try:
    # `nvidia-smi` returning 0 is taken as "a GPU is present".
    gpu_available = subprocess.call('nvidia-smi') == 0
except OSError:
    # The executable is missing or cannot be launched (typical on CPU-only hosts).
    # Previously this crashed the cell with FileNotFoundError instead of staying on 'local'.
    gpu_available = False

if gpu_available:
    instance_type = 'local_gpu'

# Small values: the local run only checks that the training script works end to end.
local_hyperparameters = {'epochs': 2, 'batch-size' : 64}

# Local-mode estimator: runs the training script on this machine with the short
# local hyperparameters and the instance type chosen above.
source_dir = os.path.join(os.getcwd(), 'source_dir')
estimator = TensorFlow(
    role=role,
    source_dir=source_dir,
    entry_point='cifar10_keras_main.py',
    py_version='py3',
    framework_version='1.12.0',
    train_instance_type=instance_type,
    train_instance_count=1,
    hyperparameters=local_hyperparameters,
)

In [None]:
# Each channel points at a sub-directory of ./data through a file:// URI.
local_data_root = 'file://' + os.getcwd() + '/data'
local_inputs = {
    channel: local_data_root + '/' + channel
    for channel in ('train', 'validation', 'eval')
}
estimator.fit(local_inputs)

In [None]:
# Upload the local `data` directory under the given key prefix and show the
# location returned by the session; remote training channels are built from it.
s3_key_prefix = 'data/DEMO-cifar10-tf'
dataset_location = sagemaker_session.upload_data(path='data', key_prefix=s3_key_prefix)
display(dataset_location)

In [None]:
# Regexes applied to the training log to extract metrics; each pattern's single
# capture group is the metric value.
# Raw strings are used so that `\d` and `\.` reach the regex engine unchanged
# (a bare '\d' in a normal string literal is an invalid escape sequence).
# NOTE: 'sec/steps' captures the number printed before "ms/step" or "us/step",
# so its unit follows whatever the log line uses.
keras_metric_definition = [
    {'Name': 'train:loss', 'Regex': r'.*loss: ([0-9\.]+) - acc: [0-9\.]+.*'},
    {'Name': 'train:accuracy', 'Regex': r'.*loss: [0-9\.]+ - acc: ([0-9\.]+).*'},
    {'Name': 'validation:accuracy', 'Regex': r'.*step - loss: [0-9\.]+ - acc: [0-9\.]+ - val_loss: [0-9\.]+ - val_acc: ([0-9\.]+).*'},
    {'Name': 'validation:loss', 'Regex': r'.*step - loss: [0-9\.]+ - acc: [0-9\.]+ - val_loss: ([0-9\.]+) - val_acc: [0-9\.]+.*'},
    {'Name': 'sec/steps', 'Regex': r'.* - \d+s (\d+)[mu]s/step - loss: [0-9\.]+ - acc: [0-9\.]+ - val_loss: [0-9\.]+ - val_acc: [0-9\.]+'}
]

In [None]:
# Hyperparameters handed to the remote single-instance training jobs below.
hyperparameters = {
    'epochs': 10,
    'batch-size': 256,
}

In [None]:
from sagemaker.tensorflow import TensorFlow

# Remote single-instance job (default input mode), tagged so its runs can be
# told apart from the Pipe-mode and Horovod jobs.
source_dir = os.path.join(os.getcwd(), 'source_dir')
file_mode_tags = [
    {'Key' : 'Project', 'Value' : 'cifar10'},
    {'Key' : 'TensorBoard', 'Value' : 'file'},
]
estimator = TensorFlow(
    base_job_name='cifar10-tf',
    role=role,
    source_dir=source_dir,
    entry_point='cifar10_keras_main.py',
    py_version='py3',
    framework_version='1.12.0',
    train_instance_type='ml.p3.2xlarge',
    train_instance_count=1,
    hyperparameters=hyperparameters,
    metric_definitions=keras_metric_definition,
    tags=file_mode_tags,
)

In [None]:
# One channel per sub-folder of the uploaded dataset; block until the job ends.
remote_inputs = {
    channel: dataset_location + '/' + channel
    for channel in ('train', 'validation', 'eval')
}
estimator.fit(remote_inputs, wait=True)

In [None]:
from IPython.core.display import Markdown

# Build a CloudWatch console URL scoped to the region and the latest training job
# of `estimator`, then render it (plus a usage hint) as Markdown.
region = sagemaker_session.boto_region_name
job_name = estimator.latest_training_job.job_name
link = (
    f'https://console.aws.amazon.com/cloudwatch/home?region={region}'
    f'#metricsV2:query=%7B/aws/sagemaker/TrainingJobs,TrainingJobName%7D%20{job_name}'
)
display(Markdown(f'CloudWatch metrics: [link]({link})'))
display(Markdown('After you choose a metric, change the period to 1 Minute (Graphed Metrics -> Period)'))

In [None]:
from sagemaker.tensorflow import TensorFlow

# Same job definition as the File-mode estimator, but with input_mode='Pipe'
# and its own job name / TensorBoard tag.
source_dir = os.path.join(os.getcwd(), 'source_dir')
pipe_mode_tags = [
    {'Key' : 'Project', 'Value' : 'cifar10'},
    {'Key' : 'TensorBoard', 'Value' : 'pipe'},
]
estimator_pipe = TensorFlow(
    base_job_name='pipe-cifar10-tf',
    role=role,
    source_dir=source_dir,
    entry_point='cifar10_keras_main.py',
    py_version='py3',
    framework_version='1.12.0',
    train_instance_type='ml.p3.2xlarge',
    train_instance_count=1,
    hyperparameters=hyperparameters,
    metric_definitions=keras_metric_definition,
    tags=pipe_mode_tags,
    input_mode='Pipe',
)

In [None]:
# The train channel targets the single train.tfrecords object; the other two
# channels keep pointing at their prefixes. wait=False returns without blocking.
remote_inputs = {
    'train': f'{dataset_location}/train/train.tfrecords',
    'validation': f'{dataset_location}/validation',
    'eval': f'{dataset_location}/eval',
}
estimator_pipe.fit(remote_inputs, wait=False)

In [None]:
from sagemaker.tensorflow import TensorFlow

# Cluster shape for the distributed (MPI) run.
# gpus_per_host becomes the MPI processes_per_host and the number of train_<idx> channels.
gpus_per_host = 4
train_instance_count = 2
train_instance_type = 'ml.p3.8xlarge'

In [None]:
# MPI launch settings: one process per GPU on every host.
distributions = dict(
    mpi=dict(enabled=True, processes_per_host=gpus_per_host),
)

# Regexes applied to the training log to extract metrics; each pattern's single
# capture group is the metric value.
# Raw strings are used so that `\d` and `\.` reach the regex engine unchanged
# (a bare '\d' in a normal string literal is an invalid escape sequence).
# NOTE(review): the patterns expect Keras to log `acc` / `val_acc`; TF 2.x often
# logs `accuracy` / `val_accuracy` -- confirm against the tf2 training script's output.
keras_metric_definition = [
    {'Name': 'train:loss', 'Regex': r'.*loss: ([0-9\.]+) - acc: [0-9\.]+.*'},
    {'Name': 'train:accuracy', 'Regex': r'.*loss: [0-9\.]+ - acc: ([0-9\.]+).*'},
    {'Name': 'validation:accuracy', 'Regex': r'.*step - loss: [0-9\.]+ - acc: [0-9\.]+ - val_loss: [0-9\.]+ - val_acc: ([0-9\.]+).*'},
    {'Name': 'validation:loss', 'Regex': r'.*step - loss: [0-9\.]+ - acc: [0-9\.]+ - val_loss: ([0-9\.]+) - val_acc: [0-9\.]+.*'},
    {'Name': 'sec/steps', 'Regex': r'.* - \d+s (\d+)[mu]s/step - loss: [0-9\.]+ - acc: [0-9\.]+ - val_loss: [0-9\.]+ - val_acc: [0-9\.]+'}
]

# Longer run than the single-instance jobs.
hyperparameters = {'epochs': 20, 'batch-size' : 256}

# First distributed run reads its channels in File mode.
input_mode = 'File'

In [None]:
%load_ext autoreload
%autoreload 2

from shard import do_shard
from sagemaker.session import s3_input

def shard_data_and_upload(local_data_dir, gpus_per_host, num_of_instances):
    """Shard the local dataset, upload it and build the training channel mapping.

    Args:
        local_data_dir: Local directory passed to ``do_shard`` and then uploaded.
        gpus_per_host: Number of per-process channels to create
            (``train_<idx>`` and ``validation_<idx>`` for each index).
        num_of_instances: Forwarded to ``do_shard`` together with ``gpus_per_host``.

    Returns:
        dict: channel name -> input. ``train_<idx>`` entries are ``s3_input``
        objects using ``ShardedByS3Key`` distribution and a fixed shuffle seed;
        ``validation_<idx>``, ``validation`` and ``eval`` are plain S3 URIs.
    """
    do_shard(local_data_dir, gpus_per_host, num_of_instances)

    # Upload the directory that was just sharded. This used to be hard-coded to
    # 'data', which silently ignored `local_data_dir` for any other location.
    dataset_location = sagemaker_session.upload_data(path=local_data_dir, key_prefix='data/DEMO-cifar10-tf')
    display(dataset_location)

    shuffle_config = sagemaker.session.ShuffleConfig(234)
    validation_uri = f'{dataset_location}/validation'

    remote_inputs = {}

    for idx in range(gpus_per_host):
        # One train channel per local process index, each reading its own shard folder.
        train_s3_uri = f'{dataset_location}/train/{idx}/'
        remote_inputs[f'train_{idx}'] = s3_input(
            train_s3_uri, shuffle_config=shuffle_config, distribution='ShardedByS3Key')

        # Every per-index validation channel points at the same validation prefix.
        remote_inputs[f'validation_{idx}'] = validation_uri

    remote_inputs['validation'] = validation_uri
    remote_inputs['eval'] = f'{dataset_location}/eval'

    return remote_inputs

In [None]:
# Shard ./data for the configured cluster shape and upload it; the result is the
# channel mapping passed to the distributed estimator's fit().
remote_inputs = shard_data_and_upload(
    local_data_dir='./data',
    gpus_per_host=gpus_per_host,
    num_of_instances=train_instance_count,
)

In [None]:
# Show the channel mapping returned by shard_data_and_upload as the cell output.
remote_inputs

In [None]:
# Distributed (MPI) job: uses the tf2 entry point on framework 2.1.0 with the
# cluster shape, distribution settings and input mode defined in the cells above.
source_dir = os.path.join(os.getcwd(), 'source_dir')
horovod_tags = [
    {'Key' : 'Project', 'Value' : 'cifar10'},
    {'Key' : 'TensorBoard', 'Value' : 'horovod'},
]
estimator_dist = TensorFlow(
    base_job_name='horovod-cifar10-tf',
    role=role,
    source_dir=source_dir,
    entry_point='cifar10_keras_main-tf2.py',
    py_version='py3',
    framework_version='2.1.0',
    train_instance_type=train_instance_type,
    train_instance_count=train_instance_count,
    hyperparameters=hyperparameters,
    metric_definitions=keras_metric_definition,
    tags=horovod_tags,
    distributions=distributions,
    input_mode=input_mode,
)

In [None]:
# Start the distributed job on the sharded channels and block until it completes.
estimator_dist.fit(inputs=remote_inputs, wait=True)

In [None]:
from sagemaker.tensorflow import TensorFlow

# Cluster shape for the distributed Pipe-mode run (same values as the File-mode run).
gpus_per_host = 4
train_instance_count = 2
train_instance_type = 'ml.p3.8xlarge'

In [None]:
# MPI launch settings: one process per GPU on every host.
distributions = dict(
    mpi=dict(enabled=True, processes_per_host=gpus_per_host),
)

# Regexes applied to the training log to extract metrics; each pattern's single
# capture group is the metric value.
# Raw strings are used so that `\d` and `\.` reach the regex engine unchanged
# (a bare '\d' in a normal string literal is an invalid escape sequence).
# NOTE(review): the patterns expect Keras to log `acc` / `val_acc`; TF 2.x often
# logs `accuracy` / `val_accuracy` -- confirm against the tf2 training script's output.
keras_metric_definition = [
    {'Name': 'train:loss', 'Regex': r'.*loss: ([0-9\.]+) - acc: [0-9\.]+.*'},
    {'Name': 'train:accuracy', 'Regex': r'.*loss: [0-9\.]+ - acc: ([0-9\.]+).*'},
    {'Name': 'validation:accuracy', 'Regex': r'.*step - loss: [0-9\.]+ - acc: [0-9\.]+ - val_loss: [0-9\.]+ - val_acc: ([0-9\.]+).*'},
    {'Name': 'validation:loss', 'Regex': r'.*step - loss: [0-9\.]+ - acc: [0-9\.]+ - val_loss: ([0-9\.]+) - val_acc: [0-9\.]+.*'},
    {'Name': 'sec/steps', 'Regex': r'.* - \d+s (\d+)[mu]s/step - loss: [0-9\.]+ - acc: [0-9\.]+ - val_loss: [0-9\.]+ - val_acc: [0-9\.]+'}
]

# Same training length as the File-mode distributed run.
hyperparameters = {'epochs': 20, 'batch-size' : 256}

# Second distributed run streams its channels in Pipe mode.
input_mode = 'Pipe'

In [None]:
# Re-shard and re-upload ./data, rebuilding the channel mapping for the Pipe-mode run.
remote_inputs = shard_data_and_upload(
    local_data_dir='./data',
    gpus_per_host=gpus_per_host,
    num_of_instances=train_instance_count,
)

In [None]:
# Show the channel mapping returned by shard_data_and_upload as the cell output.
remote_inputs

In [None]:
# Distributed (MPI) job again, this time with the Pipe input mode set in the
# configuration cell above; only the job name and TensorBoard tag differ.
source_dir = os.path.join(os.getcwd(), 'source_dir')
horovod_pipe_tags = [
    {'Key' : 'Project', 'Value' : 'cifar10'},
    {'Key' : 'TensorBoard', 'Value' : 'horovod-pipe'},
]
estimator_dist = TensorFlow(
    base_job_name='horovod-pipe-cifar10-tf',
    role=role,
    source_dir=source_dir,
    entry_point='cifar10_keras_main-tf2.py',
    py_version='py3',
    framework_version='2.1.0',
    train_instance_type=train_instance_type,
    train_instance_count=train_instance_count,
    hyperparameters=hyperparameters,
    metric_definitions=keras_metric_definition,
    tags=horovod_pipe_tags,
    distributions=distributions,
    input_mode=input_mode,
)

In [None]:
# Start the Pipe-mode distributed job and block until it completes.
estimator_dist.fit(inputs=remote_inputs, wait=True)

In [None]:
# Run the helper script in a subshell; its output appears below the cell.
# NOTE(review): presumably it prints a TensorBoard command for the tagged jobs; confirm in the script.
!python generate_tensorboard_command.py

In [None]:
# Host the model trained by `estimator` (the single-instance 'cifar10-tf' job)
# on one ml.m4.xlarge instance; `predictor` is used by the evaluation cells below.
predictor = estimator.deploy(
    instance_type='ml.m4.xlarge',
    initial_instance_count=1,
)

In [None]:
# Creating fake prediction data
import numpy as np
# One random 32x32x3 input is enough to check that the endpoint answers.
data = np.random.randn(1, 32, 32, 3)
response = predictor.predict(data)
predicted_class = np.argmax(response['predictions'])
print(f"Predicted class is {predicted_class}")

In [None]:
from keras.datasets import cifar10
from keras.preprocessing.image import ImageDataGenerator
from sklearn.metrics import confusion_matrix
# Load the CIFAR-10 splits; only the test split is used by the evaluation loop.
train_split, test_split = cifar10.load_data()
x_train, y_train = train_split
x_test, y_test = test_split

# Generator with default settings, used only to batch the test set.
datagen = ImageDataGenerator()

def predict(data):
    """Send `data` to the deployed endpoint and return the 'predictions' entry of its response."""
    return predictor.predict(data)['predictions']

In [None]:
# Score the test set batch by batch, collecting predicted and true class ids.
batch_size = 128
predicted, actual = [], []
batches = 0
for images, labels in datagen.flow(x_test, y_test, batch_size=batch_size):
    batch_predictions = predict(images)
    # Predicted class = index of the highest score for each sample.
    predicted.extend(np.argmax(scores) for scores in batch_predictions)
    # Each label is read from position 0 of its row.
    actual.extend(labels[row][0] for row in range(len(batch_predictions)))
    batches += 1
    # Stop explicitly once the batch count reaches len(x_test) / batch_size.
    if batches >= len(x_test) / batch_size:
        break

In [None]:
from sklearn.metrics import accuracy_score, confusion_matrix

# Overall accuracy across the collected predictions, shown as a percentage.
accuracy = accuracy_score(actual, predicted)  # (y_true, y_pred)
accuracy_pct = round(accuracy*100,2)
display(f'Average accuracy: {accuracy_pct}%')

In [None]:
%matplotlib inline
import seaborn as sn
import pandas as pd
import matplotlib.pyplot as plt

# Confusion matrix with each row (true class) divided by its row total.
cm = confusion_matrix(actual, predicted)  # (y_true, y_pred)
row_totals = cm.sum(axis=1, keepdims=True)
cm = cm.astype('float') / row_totals

# Figure size and label font scale, then the annotated heatmap.
sn.set(rc={'figure.figsize': (11.7, 8.27)})
sn.set(font_scale=1.4)
sn.heatmap(cm, annot=True, annot_kws={"size": 10})

In [None]:
# Delete the endpoint created by estimator.deploy once the evaluation is done.
endpoint_name = predictor.endpoint
sagemaker_session.delete_endpoint(endpoint_name)