## Convert training script to use the Horovod API

You'll need to make the following modifications to your training script to use Horovod for distributed training:

1. Run hvd.init()
2. Scale the learning rate by the number of workers.
3. Wrap the optimizer in hvd.DistributedOptimizer.
4. Add hvd.callbacks.BroadcastGlobalVariablesCallback(0) to broadcast initial variable states from rank 0 to all other processes.
5. Modify your code to save checkpoints only on worker 0 to prevent other workers from corrupting them.

**Confirm that the script still runs after introducing the Horovod API**

#### Import horovod and keras backend

In [1]:
import horovod.tensorflow.keras as hvd
import tensorflow.keras.backend as K






In [2]:
from datetime import datetime
import argparse
import os
import numpy as np
import codecs
import json
import tensorflow as tf

from tensorflow import keras
from tensorflow.keras.layers import Input, Dense, Flatten
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam, SGD
from tensorflow.keras.callbacks import TensorBoard, ModelCheckpoint
from model_def import get_model
    
# Per-image dimensions used when decoding and augmenting the records
# (height x width x channels).
HEIGHT = 32
WIDTH  = 32
DEPTH  = 3
# Width of the one-hot label vector produced by the record parser.
NUM_CLASSES = 10
# Example counts per split; divided by the batch size (and, for training and
# validation, the Horovod worker count) to derive the number of steps.
NUM_TRAIN_IMAGES = 40000
NUM_VALID_IMAGES = 10000
NUM_TEST_IMAGES  = 10000

In [3]:
def train_preprocess_fn(image):
    """Apply random augmentation to a single image tensor.

    The image is padded/cropped to (HEIGHT + 8, WIDTH + 8), a random
    [HEIGHT, WIDTH, DEPTH] window is cut back out of it, and the result is
    randomly mirrored left-to-right.

    Args:
        image: Image tensor to augment.

    Returns:
        The augmented image tensor.
    """
    # For a HEIGHT x WIDTH input this adds four extra pixels on each side.
    padded = tf.image.resize_image_with_crop_or_pad(image, HEIGHT + 8, WIDTH + 8)

    # Pick a random window of the target size from the enlarged image.
    cropped = tf.random_crop(padded, [HEIGHT, WIDTH, DEPTH])

    # Mirror horizontally at random.
    return tf.image.random_flip_left_right(cropped)

In [4]:
def make_batch(filenames, batch_size):
    """Build an endlessly repeating, batched input pipeline over TFRecords.

    Every record is decoded with `single_example_parser`.

    Args:
        filenames: TFRecord path(s) handed to `tf.data.TFRecordDataset`.
        batch_size: Number of examples per batch; incomplete batches are
            dropped.

    Returns:
        An `(image_batch, label_batch)` pair of tensors taken from a one-shot
        iterator over the dataset.
    """
    records = tf.data.TFRecordDataset(filenames)

    # repeat() without a count cycles through the records indefinitely;
    # each record is then parsed into an (image, label) pair.
    examples = records.repeat().map(single_example_parser, num_parallel_calls=1)

    # Group into fixed-size batches.
    batches = examples.batch(batch_size, drop_remainder=True)

    image_batch, label_batch = batches.make_one_shot_iterator().get_next()
    return image_batch, label_batch

In [5]:
def single_example_parser(serialized_example):
    """Decode one serialized tf.Example into an image and a one-hot label.

    The record must carry a raw-byte 'image' feature and an int64 'label'
    feature. The image is always passed through `train_preprocess_fn`.

    Args:
        serialized_example: Scalar string tensor holding one serialized
            tf.Example.

    Returns:
        An `(image, label)` pair: a float32 image tensor and a one-hot label
        of length NUM_CLASSES.
    """
    # Dimensions of the images in the CIFAR-10 dataset.
    # See http://www.cs.toronto.edu/~kriz/cifar.html for a description of the
    # input format.
    feature_spec = {
        'image': tf.FixedLenFeature([], tf.string),
        'label': tf.FixedLenFeature([], tf.int64),
    }
    parsed = tf.parse_single_example(serialized_example, features=feature_spec)

    raw_pixels = tf.decode_raw(parsed['image'], tf.uint8)
    raw_pixels.set_shape([DEPTH * HEIGHT * WIDTH])

    # Bytes are stored as [depth, height, width]; reorder the axes to
    # [height, width, depth] and convert to float32.
    depth_major = tf.reshape(raw_pixels, [DEPTH, HEIGHT, WIDTH])
    depth_last = tf.transpose(depth_major, [1, 2, 0])
    image = tf.cast(depth_last, tf.float32)
    class_index = tf.cast(parsed['label'], tf.int32)

    image = train_preprocess_fn(image)
    label = tf.one_hot(class_index, NUM_CLASSES)

    return image, label

In [6]:
# Hyper-parameters
epochs = 1            # number of passes handed to model.fit
lr = 0.01             # base (single-worker) learning rate
batch_size = 128      # per-worker batch size used by every input pipeline
momentum = 0.9
weight_decay = 2e-4
optimizer = 'sgd'     # optimizer name handed to get_model

# Data directories and other options
checkpoint_dir = '../ckpt_dir'
# exist_ok=True makes directory creation idempotent: when several Horovod
# worker processes start on the same host, a separate "exists?" check followed
# by makedirs can race and raise FileExistsError in the slower process.
os.makedirs(checkpoint_dir, exist_ok=True)

train_dir = '../dataset/train'
validation_dir = '../dataset/validation'
eval_dir = '../dataset/eval'

# Each *_dataset is the (image_batch, label_batch) tensor pair returned by
# make_batch for the corresponding TFRecord file.
train_dataset = make_batch(train_dir + '/train.tfrecords', batch_size)
val_dataset = make_batch(validation_dir + '/validation.tfrecords', batch_size)
eval_dataset = make_batch(eval_dir + '/eval.tfrecords', batch_size)





Instructions for updating:
Use `for ... in dataset:` to iterate over a dataset. If using `tf.estimator`, return the `Dataset` object directly from your input function. As a last resort, you can use `tf.compat.v1.data.make_one_shot_iterator(dataset)`.


#### Initialize horovod and get the size of the cluster

In [7]:
# Initialise Horovod and record how many worker processes take part; `size`
# is used further down to scale the learning rate and the per-worker steps.
hvd.init()
size = hvd.size()

# Register a session built from a default ConfigProto as the Keras backend
# session.
# NOTE(review): no GPU options are set on `config`. Horovod's Keras guide pins
# one GPU per process via
# `config.gpu_options.visible_device_list = str(hvd.local_rank())` -- confirm
# whether that is needed for the target (GPU) environment.
config = tf.ConfigProto()
K.set_session(tf.Session(config=config))

In [8]:
# Build the network with get_model from model_def (implementation not shown).
# NOTE(review): the unscaled `lr` is passed here; the model is compiled again
# below with the Horovod-wrapped optimizer, which presumably supersedes any
# optimizer get_model sets up -- confirm against model_def.
model = get_model(lr, weight_decay, optimizer, momentum)

Instructions for updating:
If using Keras pass *_constraint arguments to layers.


#### How will you update the learning rate for distributed training? 
You need to scale the learning rate by the size of the cluster (the total number of workers).

In [9]:
# Scale the base learning rate by the number of Horovod workers.
# NOTE(review): SGD is hard-coded here, so the `optimizer` hyper-parameter is
# not consulted for this optimizer.
# NOTE(review): in Keras' SGD, `decay` is a per-update learning-rate decay,
# not L2 weight decay -- confirm that passing `weight_decay` here is intended.
opt = SGD(lr=lr * size, decay=weight_decay, momentum=momentum)

#### How will you convert the optimizer to distributed optimizer?

In [10]:
# Wrap the optimizer in Horovod's distributed optimizer, then (re)compile the
# model so that training uses the wrapped optimizer.
opt = hvd.DistributedOptimizer(opt)
model.compile(
    optimizer=opt,
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

#### Add callbacks for syncing initial state, and save checkpoints only on the first worker (rank 0)

In [11]:
# Callbacks run on every worker, in list order.
callbacks = [
    # Broadcast initial variable states from rank 0 to all other processes so
    # that every worker starts from the same weights.
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    # Per Horovod's docs this must come before metric-driven callbacks such as
    # ReduceLROnPlateau so they see metrics averaged across workers.
    hvd.callbacks.MetricAverageCallback(),
    # NOTE(review): warmup spans 5 epochs while `epochs` is currently 1, so a
    # run with the default settings ends before warmup completes.
    hvd.callbacks.LearningRateWarmupCallback(warmup_epochs=5, verbose=1),
    tf.keras.callbacks.ReduceLROnPlateau(patience=10, verbose=1),
]
# Save checkpoints only on worker 0 to prevent other workers from corrupting
# them. The path is derived from `checkpoint_dir` (the directory created in
# the setup cell) instead of repeating the literal.
if hvd.rank() == 0:
    callbacks.append(ModelCheckpoint(checkpoint_dir + '/checkpoint-{epoch}.h5'))

#### Update the number of steps/epoch

In [12]:
# make_batch returns an (images, labels) tensor pair.
train_images, train_labels = train_dataset

# Each worker runs only its share of the steps: the single-worker step count
# divided by the number of Horovod workers.
steps_per_epoch = (NUM_TRAIN_IMAGES // batch_size) // size
validation_steps = (NUM_VALID_IMAGES // batch_size) // size

history = model.fit(
    x=train_images,
    y=train_labels,
    steps_per_epoch=steps_per_epoch,
    validation_data=val_dataset,
    validation_steps=validation_steps,
    epochs=epochs,
    callbacks=callbacks,
)

Train on 312 samples, validate on 128 samples


In [13]:
# Evaluate model performance
score = model.evaluate(eval_dataset[0],
                       eval_dataset[1],
                       steps=NUM_TEST_IMAGES // batch_size,
                       verbose=0)
print('Test loss    :', score[0])
print('Test accuracy:', score[1])

Test loss    : 1.9416303176146288
Test accuracy: 0.2877604
