In [1]:
import os
import sys
import tarfile
from six.moves import urllib
from ipywidgets import FloatProgress
from IPython.display import display

DATA_URL = 'https://www.cs.toronto.edu/~kriz/cifar-10-binary.tar.gz'


def cifar10_download(data_dir='/tmp/cifar10_data', print_progress=True):
    """Download the CIFAR-10 binary tarball from Alex's website and extract it.

    Nothing is done (beyond creating ``data_dir``) when ``data_dir`` already
    contains a ``cifar-10-batches-bin`` directory. A tarball already present in
    ``data_dir`` is reused instead of being downloaded again.

    Args:
        data_dir (str): directory that receives the tarball and its extracted
            contents; created if it does not exist.
        print_progress (bool): when True, display an ipywidgets progress bar
            that is updated while the tarball downloads.
    """
    if not os.path.exists(data_dir):
        os.makedirs(data_dir)

    if os.path.exists(os.path.join(data_dir, 'cifar-10-batches-bin')):
        print('cifar dataset already downloaded')
        return

    filename = DATA_URL.split('/')[-1]
    filepath = os.path.join(data_dir, filename)

    if not os.path.exists(filepath):
        progress_bar = None
        if print_progress:
            progress_bar = FloatProgress(min=0, max=100)
            display(progress_bar)
        sys.stdout.write('\r>> Downloading %s ' % (filename))

        def _progress(count, block_size, total_size):
            # urlretrieve passes a non-positive total_size when the server
            # sends no Content-Length; skip the update instead of dividing.
            if progress_bar is not None and total_size > 0:
                progress_bar.value = min(
                    100.0, 100.0 * count * block_size / total_size)

        # Download under a temporary name so an interrupted transfer never
        # leaves a truncated tarball at `filepath`, which the existence check
        # above would otherwise reuse on the next call.
        partial_path = filepath + '.part'
        try:
            urllib.request.urlretrieve(DATA_URL, partial_path, _progress)
        except BaseException:
            if os.path.exists(partial_path):
                os.remove(partial_path)
            raise
        os.rename(partial_path, filepath)
        print()
        statinfo = os.stat(filepath)
        print('Successfully downloaded', filename, statinfo.st_size, 'bytes.')

    # Context manager guarantees the archive handle is closed.
    with tarfile.open(filepath, 'r:gz') as archive:
        # Where the running Python supports extraction filters, reject
        # members that would be written outside `data_dir`; older versions
        # fall back to plain extraction.
        if hasattr(tarfile, 'data_filter'):
            archive.extractall(data_dir, filter='data')
        else:
            archive.extractall(data_dir)

In [2]:
import os
import sagemaker
from sagemaker import get_execution_role

# A single session object is reused by every SageMaker call that follows.
sagemaker_session = sagemaker.Session()

# Role handed to the Estimator, and the session's default bucket, which the
# later cells use for both the training data and the model output location.
role = get_execution_role()
bucket = sagemaker_session.default_bucket()

In [None]:
# Fetch and unpack the CIFAR-10 dataset into the default local directory.
cifar10_download(data_dir='/tmp/cifar10_data')

In [None]:
# Copy the extracted dataset to S3 under the `cifar10_data` key prefix; the
# call's return value (presumably the S3 URI) is shown as the cell output.
sagemaker_session.upload_data(key_prefix='cifar10_data', path='/tmp/cifar10_data')

In [None]:
# Training job configuration (values provided by the instructor): container
# image, input channel, hyperparameters and model output location.
# NOTE(review): the image URI hardcodes an AWS account and the us-west-2
# region — confirm the training job runs where this image is reachable.
training_image = '500842391574.dkr.ecr.us-west-2.amazonaws.com/horovod:latest'
# Placeholder for a hosting image; not used by the training cells.
#hosting_image = '<<PROVIDED BY INSTRUCTOR>>'

# Training data channel: the S3 prefix that upload_data wrote to.
channels = {'training': 's3://{}/cifar10_data'.format(bucket)}

# Optimized training parameters
hyperparameters = {'learning-rate': .0001, 'epochs': 12}

# Output of trained model
output_location = 's3://{}'.format(bucket)

In [None]:
# Display the S3 training channel mapping defined above.
channels

In [None]:
from sagemaker.estimator import Estimator

# Two-instance ml.p3.2xlarge training job running the Horovod image configured
# above, writing model artifacts to `output_location`.
# NOTE(review): `train_instance_count` / `train_instance_type` are SageMaker
# Python SDK v1 argument names (v2 renamed them) — confirm the installed SDK.
horovod_estimator = Estimator(
    training_image,
    hyperparameters=hyperparameters,
    train_instance_type='ml.p3.2xlarge',
    train_instance_count=2,
    output_path=output_location,
    role=role,
    sagemaker_session=sagemaker_session,
)

In [None]:
# Start training.
# Disabled on purpose: fit() launches a billable multi-instance training job.
# The log shown below is output saved from an earlier run.
# NOTE(review): a later cell rebinds `channels` to a list of CLI flags; if this
# line is re-enabled, run it before that cell or re-run the config cell first.
#horovod_estimator.fit(channels)

__OUTPUT__

```
Creating SageMaker trainer environment:
TrainerEnvironment(input_dir='/opt/ml/input', input_config_dir='/opt/ml/input/config', model_dir='/opt/ml/model', output_dir='/opt/ml/output', hyperparameters={'epochs': '12', 'learning-rate': '0.0001'}, resource_config={'current_host': 'algo-1', 'network_interface_name': 'ethwe', 'hosts': ['algo-1', 'algo-2']}, input_data_config={'training': {'TrainingInputMode': 'File', 'RecordWrapperType': 'None', 'S3DistributionType': 'FullyReplicated'}}, output_data_dir='/opt/ml/output/data', hosts=['algo-1', 'algo-2'], channel_dirs={'training': '/opt/ml/input/data/training'}, current_host='algo-1', available_gpus=1, available_cpus=8)
Hyperparameters: 
{'epochs': '12', 'learning-rate': '0.0001'}
Creating SageMaker trainer environment:
TrainerEnvironment(input_dir='/opt/ml/input', input_config_dir='/opt/ml/input/config', model_dir='/opt/ml/model', output_dir='/opt/ml/output', hyperparameters={'epochs': '12', 'learning-rate': '0.0001'}, resource_config={'current_host': 'algo-2', 'network_interface_name': 'ethwe', 'hosts': ['algo-1', 'algo-2']}, input_data_config={'training': {'TrainingInputMode': 'File', 'RecordWrapperType': 'None', 'S3DistributionType': 'FullyReplicated'}}, output_data_dir='/opt/ml/output/data', hosts=['algo-1', 'algo-2'], channel_dirs={'training': '/opt/ml/input/data/training'}, current_host='algo-2', available_gpus=1, available_cpus=8)
Hyperparameters: 
{'epochs': '12', 'learning-rate': '0.0001'}
Billable seconds: 257
```

In [3]:
# Simulate the hyperparameters handed to the training container, plus an
# extra `sagemaker_process_slots_per_host` entry (presumably the number of
# MPI processes per host — confirm).
hyperparameters = {'learning-rate': .0001, 'epochs': 12}
hyperparameters['sagemaker_process_slots_per_host'] = 1
hyperparameters

{'epochs': 12, 'learning-rate': 0.0001, 'sagemaker_process_slots_per_host': 1}

In [4]:
# Scratch check: build the output-dir mapping from a path held in a variable.
tmp_variable = '/opt/ml/output'
output = dict(output_data_dir=tmp_variable)
output

{'output_data_dir': '/opt/ml/output'}

In [7]:
# Same mapping as the previous cell, spelled as a dict display.
{'output_data_dir': tmp_variable}

{'output_data_dir': '/opt/ml/output'}

In [None]:
# Stand-ins for trainer-environment values; compare with the
# TrainerEnvironment dump in the training log above.
available_gpus = 1
channel_dirs = dict(training='/opt/ml/input/data/training')
output_dir = dict(output_data_dir='/opt/ml/output')

In [None]:
def _decode(obj):  # type: (bytes or str or unicode or object) -> unicode
    """Decode an object to unicode.
    Args:
        obj (bytes or str or unicode or anything serializable): object to be decoded
    Returns:
        object decoded in unicode.
    """
    if obj is None:
        return u''
    if six.PY3 and isinstance(obj, six.binary_type):
        # transforms a byte string (b'') in unicode
        return obj.decode('latin1')
    elif six.PY3:
        # PY3 strings are unicode.
        return str(obj)
    elif isinstance(obj, six.text_type):
        # returns itself if it is unicode
        return obj
    else:
        # decodes pY2 string to unicode
        return str(obj).decode('utf-8')

In [None]:
def to_cmd_args(mapping):  # type: (dict) -> list
    """Transform a dictionary in a list of cmd arguments.

    Keys are processed in sorted order. Each key becomes ``--key`` (``-k`` for
    a single-character key; a key that decodes to an empty string yields an
    empty string) and each value is converted to unicode with ``_decode``.
    Values that have an ``items`` method are flattened into one
    ``k1=v1,k2=v2`` argument with their own keys sorted.

    Example:
        >>> to_cmd_args({'model_dir': '/opt/ml/model', 'batch_size': 25})
        ['--batch_size', '25', '--model_dir', '/opt/ml/model']

    Args:
        mapping (dict[str, object]): A Python mapping.
    Returns:
        (list): List of cmd arguments, alternating names and values.
    """
    # Imported locally: the notebook's `import itertools` is in a later cell.
    import itertools

    sorted_keys = sorted(mapping.keys())

    def arg_name(obj):
        string = _decode(obj)
        if string:
            return u'--%s' % string if len(string) > 1 else u'-%s' % string
        else:
            return u''

    arg_names = [arg_name(argument) for argument in sorted_keys]

    def arg_value(value):
        # Mapping-like values are flattened into a single "k=v,k=v" argument.
        if hasattr(value, 'items'):
            map_items = ['%s=%s' % (k, v) for k, v in sorted(value.items())]
            return ','.join(map_items)
        return _decode(value)

    arg_values = [arg_value(mapping[key]) for key in sorted_keys]

    # Interleave as [name1, value1, name2, value2, ...].
    return list(itertools.chain.from_iterable(zip(arg_names, arg_values)))

In [None]:
from __future__ import absolute_import
import collections
import itertools
import json
import six

# Render the simulated hyperparameters as command-line flags.
cmd_hyperparameters = to_cmd_args(mapping=hyperparameters)
cmd_hyperparameters

In [None]:
# Flatten the simulated channel and output mappings into CLI flags.
# NOTE(review): this rebinds `channels` (earlier the S3 training channel dict)
# and `output`; cells re-run after this one will see these lists instead.
channels = to_cmd_args(mapping=channel_dirs)
output = to_cmd_args(mapping=output_dir)

In [None]:
# CLI flags generated from `output_dir`.
output

In [None]:
# CLI flags generated from `channel_dirs` (no longer the S3 channel dict).
channels

In [None]:
import sys
import textwrap

try:
    from shlex import quote as _shell_quote
except ImportError:  # Python 2
    from pipes import quote as _shell_quote

_MPI_SCRIPT = "./mpi_script.sh"

# Full training command: interpreter, entry point, then the CLI flags built
# from the hyperparameters, input channels and output directory.
python_cmd = [sys.executable, 'train.py']
python_cmd.extend(cmd_hyperparameters)
python_cmd.extend(channels)
python_cmd.extend(output)

# Each argument is shell-quoted because the command is pasted into a bash
# script: a value containing spaces or shell metacharacters would otherwise
# be split into several words or interpreted by the shell. Plain values
# (paths, flags, numbers) are emitted unchanged.
content = textwrap.dedent("""#!/usr/bin/env bash
touch /mpi_is_running
%s
EXIT_CODE=$?
touch /mpi_is_finished
exit ${EXIT_CODE}
""" % ' '.join(_shell_quote(arg) for arg in python_cmd))

# build MPI script
with open(_MPI_SCRIPT, 'w') as w:
    w.write(content)

In [None]:
!cat ./mpi_script.sh