# Exercise05 : Distributed Training

Here we change our sample (see "[Exercise03 : Just Train in Your Working Machine](/notebooks/exercise03_train_simple.ipynb)") for distributed training using multiple machines.

In this exercise we use the Horovod framework (https://github.com/horovod/horovod) with the built-in ```azureml.train.dnn.TensorFlow``` estimator, but you can also configure training with TensorFlow and Horovod using the lower-level ```azureml.core.ScriptRunConfig```. (See [here](https://tsmatz.wordpress.com/2019/01/17/azure-machine-learning-service-custom-amlcompute-and-runconfig-for-mxnet-distributed-training/) for a sample script with TensorFlow and Horovod using ```azureml.core.ScriptRunConfig```.)

*back to [index](/Readme.md)*

## Save your training script as file (train.py)

Create the ```script``` directory.

In [1]:
import os
# Local folder that will hold the training script written by the next cell.
script_folder = './script'
# exist_ok=True keeps this cell re-runnable when the folder already exists.
os.makedirs(script_folder, exist_ok=True)

Change our original source code ```train.py``` (see "[Exercise03 : Just Train in Your Working Machine](/notebooks/exercise03_train_simple.ipynb)") as follows. The lines marked with the comment "##### modified" are the modified lines.    
After that, please add the following ```%%writefile``` at the beginning of the source code and run this cell.    
This source code is saved as ```./script/train_horovod.py```.

In [2]:
%%writefile script/train_horovod.py
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import sys
import os
import shutil
import argparse
import math

import tensorflow as tf
import horovod.tensorflow as hvd ##### modified

# Parsed command-line arguments; assigned by the argument parser in the
# "Main" section below and read inside _my_model_fn.
FLAGS = None
# Number of examples per mini-batch; also used below to derive the
# training / evaluation step counts.
batch_size = 100

#
# define functions for Estimator
#

def _my_input_fn(filepath, num_epochs):
    """Build the queue-based input pipeline for one TFRecord file.

    Each record holds an 'image_raw' byte string (784 uint8 pixels, i.e. a
    28 x 28 grey-scale image) and an int64 'label' (digit 0, 1, ..., 9).

    Args:
        filepath: Path of the TFRecord file to read.
        num_epochs: How many times the file is replayed; OutOfRange is
            raised once the data is exhausted.

    Returns:
        A tuple ``({'inputs': images}, labels)`` of batched tensors, where
        images are float32 pixels scaled by 1/255 and labels are int32.
    """
    feature_spec = {
        'image_raw': tf.FixedLenFeature([], tf.string),
        'label': tf.FixedLenFeature([], tf.int64)
    }

    # Read and parse a single serialized example from the file queue.
    filename_queue = tf.train.string_input_producer(
        [filepath],
        num_epochs=num_epochs)
    _, serialized = tf.TFRecordReader().read(filename_queue)
    example = tf.parse_single_example(serialized, features=feature_spec)

    # Decode the raw bytes and rescale the pixels to [0, 1].
    pixels = tf.decode_raw(example['image_raw'], tf.uint8)
    pixels.set_shape([784])
    pixels = tf.cast(pixels, tf.float32) * (1. / 255)
    digit = tf.cast(example['label'], tf.int32)

    # Group single examples into mini-batches of `batch_size`.
    images, labels = tf.train.batch([pixels, digit], batch_size=batch_size)
    return {'inputs': images}, labels

def _get_input_fn(filepath, num_epochs):
    """Return a zero-argument callable that builds the input pipeline.

    The file path and epoch count are captured in a closure so the result
    can be passed as ``input_fn`` to TrainSpec / EvalSpec.
    """
    def _input_fn():
        return _my_input_fn(filepath, num_epochs)

    return _input_fn

def _my_model_fn(features, labels, mode):
    """Model function for the fully connected digit classifier.

    Builds two ReLU hidden layers (784 -> FLAGS.first_layer ->
    FLAGS.second_layer) followed by a linear layer producing 10 logits, and
    returns the EstimatorSpec matching the requested mode.

    Args:
        features: Dict with key 'inputs'; that tensor is multiplied by a
            [784, FLAGS.first_layer] weight matrix, so it needs 784 columns.
        labels: Class indices (fed to sparse softmax cross entropy).
            Not read in PREDICT mode.
        mode: One of tf.estimator.ModeKeys.TRAIN / EVAL / PREDICT.

    Returns:
        tf.estimator.EstimatorSpec with loss and train_op (TRAIN), loss and
        eval_metric_ops (EVAL), or predictions and export_outputs (PREDICT).
    """
    # with tf.device(...): # You can set device if using GPUs

    # define network and inference
    # (simple 2 fully connected hidden layer : 784->128->64->10)
    # Weights are drawn from a truncated normal whose stddev is
    # 1 / sqrt(fan_in); biases start at zero.
    with tf.name_scope('hidden1'):
        weights = tf.Variable(
            tf.truncated_normal(
                [784, FLAGS.first_layer],
                stddev=1.0 / math.sqrt(float(784))),
            name='weights')
        biases = tf.Variable(
            tf.zeros([FLAGS.first_layer]),
            name='biases')
        hidden1 = tf.nn.relu(tf.matmul(features['inputs'], weights) + biases)
    with tf.name_scope('hidden2'):
        weights = tf.Variable(
            tf.truncated_normal(
                [FLAGS.first_layer, FLAGS.second_layer],
                stddev=1.0 / math.sqrt(float(FLAGS.first_layer))),
            name='weights')
        biases = tf.Variable(
            tf.zeros([FLAGS.second_layer]),
            name='biases')
        hidden2 = tf.nn.relu(tf.matmul(hidden1, weights) + biases)
    with tf.name_scope('softmax_linear'):
        weights = tf.Variable(
            tf.truncated_normal(
                [FLAGS.second_layer, 10],
                stddev=1.0 / math.sqrt(float(FLAGS.second_layer))),
        name='weights')
        biases = tf.Variable(
            tf.zeros([10]),
            name='biases')
        # No activation here: the output layer produces raw logits.
        logits = tf.matmul(hidden2, weights) + biases

    # compute evaluation metrics
    predicted_indices = tf.argmax(input=logits, axis=1)
    # Labels are only used for TRAIN and EVAL, so accuracy and loss are
    # skipped in PREDICT mode.
    if mode != tf.estimator.ModeKeys.PREDICT:
        label_indices = tf.cast(labels, tf.int32)
        accuracy = tf.metrics.accuracy(label_indices, predicted_indices)
        tf.summary.scalar('accuracy', accuracy[1]) # output to TensorBoard

        loss = tf.losses.sparse_softmax_cross_entropy(
            labels=labels,
            logits=logits)

    # define operations
    if mode == tf.estimator.ModeKeys.TRAIN:
        global_step = tf.train.get_or_create_global_step()
        optimizer = tf.train.GradientDescentOptimizer(
            learning_rate=FLAGS.learning_rate)
        # Horovod wrapper around the base optimizer for distributed training.
        optimizer = hvd.DistributedOptimizer(optimizer) ##### modified
        train_op = optimizer.minimize(
            loss=loss,
            global_step=global_step)
        return tf.estimator.EstimatorSpec(
            mode,
            loss=loss,
            train_op=train_op)
    if mode == tf.estimator.ModeKeys.EVAL:
        eval_metric_ops = {
            'accuracy': accuracy
        }
        return tf.estimator.EstimatorSpec(
            mode,
            loss=loss,
            eval_metric_ops=eval_metric_ops)
    if mode == tf.estimator.ModeKeys.PREDICT:
        probabilities = tf.nn.softmax(logits, name='softmax_tensor')
        predictions = {
            'classes': predicted_indices,
            'probabilities': probabilities
        }
        # The 'prediction' signature is what the exported SavedModel serves.
        export_outputs = {
            'prediction': tf.estimator.export.PredictOutput(predictions)
        }
        return tf.estimator.EstimatorSpec(
            mode,
            predictions=predictions,
            export_outputs=export_outputs)

def _my_serving_input_fn():
    """Describe the input expected by the exported SavedModel.

    Returns:
        A ServingInputReceiver whose features and receiver tensors are the
        same dict: a float32 placeholder of shape [None, 784] under 'inputs'.
    """
    image_placeholder = tf.placeholder(tf.float32, [None, 784])
    receiver_tensors = {'inputs': image_placeholder}
    return tf.estimator.export.ServingInputReceiver(
        receiver_tensors, receiver_tensors)

#
# Main
#

def _clean_folder(folder):
    """Delete everything inside ``folder`` while keeping the folder itself.

    Does nothing when ``folder`` is not an existing directory.

    Args:
        folder: Path of the directory to empty.
    """
    if not os.path.isdir(folder):
        return
    for entry_name in os.listdir(folder):
        entry_path = os.path.join(folder, entry_name)
        # Symbolic links are unlinked rather than followed: shutil.rmtree
        # refuses to operate on a symlink to a directory.
        if os.path.isdir(entry_path) and not os.path.islink(entry_path):
            shutil.rmtree(entry_path)
        else:
            os.remove(entry_path)


# Command-line options of the training script. Unknown options are tolerated
# (parse_known_args) and collected in `unparsed`.
parser = argparse.ArgumentParser()
parser.add_argument(
    '--data_folder',
    type=str,
    default='./data',
    help='Folder path for input data')
parser.add_argument(
    '--chkpoint_folder',
    type=str,
    default='./logs',  # AML experiments logs folder
    help='Folder path for checkpoint files')
parser.add_argument(
    '--model_folder',
    type=str,
    default='./outputs',  # AML experiments outputs folder
    help='Folder path for model output')
parser.add_argument(
    '--learning_rate',
    type=float,
    default=0.07,
    help='Learning Rate')
parser.add_argument(
    '--first_layer',
    type=int,
    default=128,
    help='Neuron number for the first hidden layer')
parser.add_argument(
    '--second_layer',
    type=int,
    default=64,
    help='Neuron number for the second hidden layer')
FLAGS, unparsed = parser.parse_known_args()

# clean checkpoint and model folder if exists
_clean_folder(FLAGS.chkpoint_folder)
_clean_folder(FLAGS.model_folder)

# Initialize Horovod first: hvd.rank() and hvd.size() are used below.
hvd.init() ##### modified

# read TF_CONFIG
run_config = tf.contrib.learn.RunConfig()

# Step budgets, kept as ints with floor division (the Estimator specs take
# integer step counts; true division would hand them floats).
# 60000 * 2 matches the 2 epochs requested from the training input_fn and
# 10000 * 1 the single epoch of the evaluation input_fn (presumably the sizes
# of the train / test record files).
total_train_steps = 60000 * 2 // batch_size
eval_steps = 10000 * 1 // batch_size

# create Estimator
mnist_fullyconnected_classifier = tf.estimator.Estimator(
    model_fn=_my_model_fn,
    # Only rank 0 is given the checkpoint folder; the other ranks pass None.
    model_dir=FLAGS.chkpoint_folder if hvd.rank() == 0 else None, ##### modified
    config=run_config)
train_spec = tf.estimator.TrainSpec(
    input_fn=_get_input_fn(os.path.join(FLAGS.data_folder, 'train.tfrecords'), 2),
    #max_steps=60000 * 2 / batch_size)
    # The total step budget is divided by the number of Horovod processes.
    max_steps=total_train_steps // hvd.size(), ##### modified
    hooks=[hvd.BroadcastGlobalVariablesHook(0)]) ##### modified
eval_spec = tf.estimator.EvalSpec(
    input_fn=_get_input_fn(os.path.join(FLAGS.data_folder, 'test.tfrecords'), 1),
    steps=eval_steps,
    start_delay_secs=0)

# run !
tf.estimator.train_and_evaluate(
    mnist_fullyconnected_classifier,
    train_spec,
    eval_spec
)

# save model and variables
# Export from rank 0 only, the one rank that owns the checkpoint folder.
if hvd.rank() == 0 : ##### modified
    model_dir = mnist_fullyconnected_classifier.export_savedmodel(
        export_dir_base = FLAGS.model_folder,
        serving_input_receiver_fn = _my_serving_input_fn)
    print('current working directory is ', os.getcwd())
    print('model is saved ', model_dir)

Writing script/train_horovod.py


## Train on multiple machines (Horovod)

### Step 1 : Get workspace setting

Before starting, you must read your configuration settings. (See "[Exercise01 : Prepare Config Settings](/notebooks/exercise01_prepare_config.ipynb)".)

In [3]:
from azureml.core import Workspace
import azureml.core

# Load the workspace described by the saved config file (see Exercise01).
ws = Workspace.from_config()

Found the config file in: /data/home/username/azure-ml-tensorflow-complete-sample/notebooks/aml_config/config.json


### Step 2 : Create multiple virtual machines (cluster)

Create a new AML compute cluster for distributed training. By enabling auto-scaling from 0 to 4 nodes, you save money because all nodes are terminated while the cluster is inactive. See https://docs.microsoft.com/en-us/azure/architecture/best-practices/auto-scaling for details.  
If the cluster already exists, this script gets the existing one. Otherwise it creates a cluster of D2_v2 machines (```vm_size='Standard_D2_v2'```).

In [4]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

# Reuse the cluster when it is already registered in the workspace;
# otherwise provision a new auto-scaling one (0 to 4 nodes).
cluster_name = 'mycluster01'
try:
    compute_target = ComputeTarget(workspace=ws, name=cluster_name)
except ComputeTargetException:
    print('creating new.')
    compute_config = AmlCompute.provisioning_configuration(
        vm_size='Standard_D2_v2',
        min_nodes=0,
        max_nodes=4)
    compute_target = ComputeTarget.create(ws, cluster_name, compute_config)
    # Block until provisioning has finished.
    compute_target.wait_for_completion(show_output=True)
else:
    print('found existing:', compute_target.name)

creating new.
Creating
Succeeded
AmlCompute wait for completion finished
Minimum number of nodes requested have been provisioned


In [5]:
# Print the current status of the cluster (allocation state, node counts,
# scale settings, VM size).
print(compute_target.status.serialize())

{'allocationState': 'Steady', 'allocationStateTransitionTime': '2018-12-10T23:58:49.117000+00:00', 'creationTime': '2018-12-10T23:58:14.928393+00:00', 'currentNodeCount': 0, 'errors': None, 'modifiedTime': '2018-12-10T23:59:03.412210+00:00', 'nodeStateCounts': {'idleNodeCount': 0, 'leavingNodeCount': 0, 'preemptedNodeCount': 0, 'preparingNodeCount': 0, 'runningNodeCount': 0, 'unusableNodeCount': 0}, 'provisioningState': 'Succeeded', 'provisioningStateTransitionTime': None, 'scaleSettings': {'minNodeCount': 0, 'maxNodeCount': 4, 'nodeIdleTimeBeforeScaleDown': 'PT120S'}, 'targetNodeCount': 0, 'vmPriority': 'Dedicated', 'vmSize': 'STANDARD_D2_V2'}


### Step 3 : Prepare datastore

You can mount your datastore (see "[Exercise02 : Prepare Datastore](/notebooks/exercise02_prepare_datastore.ipynb)") into your AML compute.

In [6]:
from azureml.core import Datastore

# get your datastore (See "Exercise 02 : Prepare Datastore")
ds = Datastore.get(ws, datastore_name="myblob01")
# Reference to the 'tfdata' path inside the datastore; it is passed to the
# training script as --data_folder in the estimator below.
ds_data = ds.path('tfdata')

### Step 4 : Generate estimator **

Run distributed training by Horovod using built-in ```azureml.train.dnn.TensorFlow``` estimator.    
If you want to customize more detailed settings (other frameworks, custom images, etc), please use base ```azureml.train.estimator.Estimator``` (parent class).

** Note : This estimator (```azureml.train.dnn.TensorFlow```) is an estimator in the AML SDK, and is not the same as ```tf.estimator.Estimator``` in TensorFlow. Do not be confused by the shared term "Estimator".

In [7]:
from azureml.train.dnn import TensorFlow

# Command-line arguments handed to train_horovod.py.
script_params = {'--data_folder': ds_data}

# Two nodes with one process each, launched through the MPI backend.
estimator = TensorFlow(
    source_directory='./script',
    entry_script='train_horovod.py',
    script_params=script_params,
    compute_target=compute_target,
    distributed_backend='mpi',
    node_count=2,
    process_count_per_node=1,
    use_gpu=False)

### Step 5 : Run script and wait for completion

In [8]:
from azureml.core import Experiment

# Submit the estimator as a run of the 'tf_distribued' experiment.
exp = Experiment(workspace=ws, name='tf_distribued')
run = exp.submit(estimator)
# Block until the run ends, streaming its logs into the cell output.
run.wait_for_completion(show_output=True)

RunId: tf_distribued_1544486640614

Streaming azureml-logs/20_image_build_log.txt

2018/12/11 00:04:08 Using acb_vol_86c50ced-41fd-4ff6-9367-dfb09b8c9363 as the home volume
2018/12/11 00:04:08 Creating Docker network: acb_default_network, driver: 'bridge'
2018/12/11 00:04:09 Successfully set up Docker network: acb_default_network
2018/12/11 00:04:09 Setting up Docker configuration...
2018/12/11 00:04:10 Successfully set up Docker configuration
2018/12/11 00:04:10 Logging in to registry: ws016106599079.azurecr.io
2018/12/11 00:04:12 Successfully logged in
2018/12/11 00:04:12 Executing step ID: acb_step_0. Working directory: '', Network: 'acb_default_network'
2018/12/11 00:04:12 Obtaining source code and scanning for dependencies...
2018/12/11 00:04:13 Successfully obtained source code and scanned for dependencies
Sending build context to Docker daemon    150kB

Step 1/13 : FROM mcr.microsoft.com/azureml/base:0.2.0
0.2.0: Pulling from azureml/base
3b37166ec614: Pulling fs layer
504facff2

  Downloading https://files.pythonhosted.org/packages/14/2c/cd551d81dbe15200be1cf41cd03869a46fe7226e7450af7a6545bfc474c9/idna-2.8-py2.py3-none-any.whl (58kB)
Collecting chardet<3.1.0,>=3.0.2 (from requests>=2.19.1->azureml-core==1.0.2.*->azureml-defaults->-r /azureml-setup/condaenv.hc_rqzxj.requirements.txt (line 1))
  Downloading https://files.pythonhosted.org/packages/bc/a9/01ffebfb562e4274b6487b4bb1ddec7ca55ec7510b22e4c51f14098443b8/chardet-3.0.4-py2.py3-none-any.whl (133kB)
Collecting requests-oauthlib>=0.5.0 (from msrest>=0.5.1->azureml-core==1.0.2.*->azureml-defaults->-r /azureml-setup/condaenv.hc_rqzxj.requirements.txt (line 1))
  Downloading https://files.pythonhosted.org/packages/94/e7/c250d122992e1561690d9c0f7856dadb79d61fd4bdd0e598087dce607f6c/requests_oauthlib-1.0.0-py2.py3-none-any.whl
Collecting isodate>=0.6.0 (from msrest>=0.5.1->azureml-core==1.0.2.*->azureml-defaults->-r /azureml-setup/condaenv.hc_rqzxj.requirements.txt (line 1))
  Downloading https://files.pythonhoste

  Complete output from command /azureml-envs/azureml_c41295f9cf92073d57b356b8eef29378/bin/python -u -c "import setuptools, tokenize;__file__='/tmp/pip-install-82if505l/horovod/setup.py';f=getattr(tokenize, 'open', open)(__file__);code=f.read().replace('\r\n', '\n');f.close();exec(compile(code, __file__, 'exec'))" bdist_wheel -d /tmp/pip-wheel-openme11 --python-tag cp36:
  zip_safe flag not set; analyzing archive contents...
  pycparser.ply.__pycache__.lex.cpython-36: module references __file__
  pycparser.ply.__pycache__.lex.cpython-36: module MAY be using inspect.getsourcefile
  pycparser.ply.__pycache__.yacc.cpython-36: module references __file__
  pycparser.ply.__pycache__.yacc.cpython-36: module MAY be using inspect.getsourcefile
  pycparser.ply.__pycache__.yacc.cpython-36: module MAY be using inspect.stack
  pycparser.ply.__pycache__.ygen.cpython-36: module references __file__
  
  Installed /tmp/pip-install-82if505l/horovod/.eggs/pycparser-2.19-py3.6.egg
  running bdist_wheel
  r

    Uninstalling setuptools-40.6.2:
      Successfully uninstalled setuptools-40.6.2
  Running setup.py install for horovod: started
    Running setup.py install for horovod: finished with status 'done'
Successfully installed PyJWT-1.7.1 PyOpenSSL-18.0.0 SecretStorage-2.3.1 absl-py-0.6.1 adal-1.2.0 antlr4-python3-runtime-4.7.1 applicationinsights-0.11.7 argcomplete-1.9.4 asn1crypto-0.24.0 astor-0.7.1 azure-cli-command-modules-nspkg-2.0.2 azure-cli-core-2.0.52 azure-cli-nspkg-3.0.3 azure-cli-profile-2.1.2 azure-cli-telemetry-1.0.0 azure-common-1.1.16 azure-graphrbac-0.53.0 azure-mgmt-authorization-0.51.1 azure-mgmt-containerregistry-2.5.0 azure-mgmt-keyvault-1.1.0 azure-mgmt-nspkg-3.0.2 azure-mgmt-resource-2.0.0 azure-mgmt-storage-3.1.0 azure-nspkg-3.0.2 azure-storage-blob-1.4.0 azure-storage-common-1.4.0 azure-storage-nspkg-3.1.0 azureml-core-1.0.2 azureml-defaults-1.0.2 backports.tempfile-1.0 backports.weakref-1.0.post1 bcrypt-3.1.4 cffi-1.11.5 chardet-3.0.4 colorama-0.4.1 contextlib2

{'runId': 'tf_distribued_1544486640614',
 'target': 'mycluster01',
 'status': 'Finalizing',
 'startTimeUtc': '2018-12-11T00:14:13.697925Z',
 'properties': {'azureml.runsource': 'experiment',
  'ContentSnapshotId': 'a956a4e2-8f1b-4d44-8581-30bbefb4d480'},
 'runDefinition': {'Script': 'train_horovod.py',
  'Arguments': ['--data_folder',
   '$AZUREML_DATAREFERENCE_fc58254fa9c942f7a384af1aa8253e07'],
  'SourceDirectoryDataStore': None,
  'Framework': 0,
  'Communicator': 3,
  'Target': 'mycluster01',
  'DataReferences': {'fc58254fa9c942f7a384af1aa8253e07': {'DataStoreName': 'myblob01',
    'Mode': 'Mount',
    'PathOnDataStore': 'tfdata',
    'PathOnCompute': None,
    'Overwrite': False}},
  'JobName': None,
  'AutoPrepareEnvironment': True,
  'MaxRunDurationSeconds': None,
  'NodeCount': 2,
  'Environment': {'Python': {'InterpreterPath': 'python',
    'UserManagedDependencies': False,
    'CondaDependencies': {'name': 'project_environment',
     'dependencies': ['python=3.6.2',
      {'p

### Step 6 : Check results

In [9]:
# List the files stored for the run (logs, checkpoints, exported model).
run.get_file_names()

['azureml-logs/20_image_build_log.txt',
 'azureml-logs/60_control_log_rank_0.txt',
 'azureml-logs/60_control_log_rank_1.txt',
 'azureml-logs/80_driver_log_rank_0.txt',
 'logs/checkpoint',
 'logs/model.ckpt-0.meta',
 'logs/events.out.tfevents.1544487468.86ceebae67124cd6be30443940e3e727000000',
 'logs/graph.pbtxt',
 'logs/model.ckpt-0.data-00000-of-00001',
 'logs/model.ckpt-0.index',
 'azureml-logs/80_driver_log_rank_1.txt',
 'logs/model.ckpt-600.index',
 'logs/model.ckpt-600.meta',
 'logs/model.ckpt-600.data-00000-of-00001',
 'logs/eval/events.out.tfevents.1544487483.86ceebae67124cd6be30443940e3e727000000',
 'outputs/1544487483/saved_model.pb',
 'outputs/1544487483/variables/variables.data-00000-of-00001',
 'outputs/1544487483/variables/variables.index',
 'driver_log',
 'azureml-logs/azureml.log',
 'azureml-logs/56_batchai_stderr.txt',
 'azureml-logs/55_batchai_execution-tvm-829305193_1-20181211t001024z.txt']

**Please change ```1544487483``` to match the export folder name shown in the previous output.**

In [14]:
# Timestamp-named folder created by export_savedmodel under outputs/.
# Change this single value to the folder listed by run.get_file_names().
export_id = '1544487483'

# Files that make up the SavedModel, relative to the export folder.
saved_model_files = (
    'saved_model.pb',
    'variables/variables.data-00000-of-00001',
    'variables/variables.index',
)

# Download each file into ./distributed_model, keeping the relative layout.
for relative_path in saved_model_files:
    run.download_file(
        name='outputs/{}/{}'.format(export_id, relative_path),
        output_file_path='distributed_model/{}'.format(relative_path))

In [15]:
import tensorflow as tf

# Read data by tensor: one serialized record at a time from the test file.
record = (
    tf.data.TFRecordDataset('./data/test.tfrecords')
    .make_one_shot_iterator()
    .get_next())
feature_spec = {
    'image_raw': tf.FixedLenFeature([], tf.string),
    'label': tf.FixedLenFeature([], tf.int64)
}
parsed = tf.parse_single_example(record, features=feature_spec)

# Same preprocessing as the training input pipeline.
pixels = tf.decode_raw(parsed['image_raw'], tf.uint8)
pixels.set_shape([784])
pixels = tf.cast(pixels, tf.float32) * (1. / 255)
digit = tf.cast(parsed['label'], tf.int32)

# Run tensor and generate data (the first 3 test examples)
image_arr, label_arr = [], []
with tf.Session() as sess:
    for _ in range(3):
        image, label = sess.run([pixels, digit])
        image_arr.append(image)
        label_arr.append(label)

# Predict with the downloaded SavedModel
pred_fn = tf.contrib.predictor.from_saved_model('./distributed_model')
pred = pred_fn({'inputs': image_arr})

print('Predicted: ', pred['classes'].tolist())
print('Actual   : ', label_arr)

INFO:tensorflow:Restoring parameters from ./distributed_model/variables/variables
Predicted:  [7, 2, 1]
Actual   :  [7, 2, 1]


### Step 7 : Remove AML compute

**You don't need to remove your AML compute** to save money, because the nodes are automatically terminated when the cluster is inactive.    
But if you want to clean up, please run the following.

In [16]:
# Delete cluster (nodes) and remove from AML workspace
mycompute = AmlCompute(workspace=ws, name='mycluster01')
mycompute.delete()