# Distributed training with Keras

The `tf.distribute.Strategy` API provides a user-friendly abstraction for distributing training across multiple computational resources.
This notebook uses `MirroredStrategy`, which performs synchronous training on multiple GPUs on a single machine.

In this notebook, `MirroredStrategy` is combined with the `tf.keras` API, so training runs through the usual `model.fit` method.

If you have multiple machines, each with one or more GPUs, use `MultiWorkerMirroredStrategy` instead.
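Conceptually, synchronous mirrored training keeps one copy of the model on every device, gives each replica a different slice of the global batch, and combines the replicas' gradients with an all-reduce (here, an average) so that every replica applies the same update. The pure-Python sketch below illustrates that idea on a one-parameter linear model. It is a hypothetical illustration of the technique, not how `MirroredStrategy` is implemented internally; `mse_grad`, `split_batch`, and `sync_step` are made-up names.

```python
# Hypothetical sketch: one synchronous data-parallel step on a 1-D linear model
# y_hat = w * x with mean-squared-error loss. No TensorFlow involved.

def mse_grad(w, xs, ys):
    """Gradient of mean((w*x - y)^2) with respect to w over one shard."""
    n = len(xs)
    return sum(2.0 * (w * x - y) * x for x, y in zip(xs, ys)) / n


def split_batch(xs, ys, num_replicas):
    """Split a global batch into equal, contiguous per-replica shards."""
    per_replica = len(xs) // num_replicas
    return [
        (xs[i * per_replica:(i + 1) * per_replica],
         ys[i * per_replica:(i + 1) * per_replica])
        for i in range(num_replicas)
    ]


def sync_step(w, xs, ys, num_replicas, lr):
    """Each replica computes a local gradient; the gradients are averaged
    (the all-reduce) and every replica applies the identical update."""
    shards = split_batch(xs, ys, num_replicas)
    local_grads = [mse_grad(w, sx, sy) for sx, sy in shards]
    mean_grad = sum(local_grads) / num_replicas  # the "all-reduce"
    return w - lr * mean_grad


xs = [1.0, 2.0, 3.0, 4.0]
ys = [2.0, 4.0, 6.0, 8.0]  # generated by the true weight w = 2

# With equal shard sizes, averaging per-replica gradients matches the
# single-device full-batch gradient, so 1 and 2 replicas agree.
w1 = sync_step(0.0, xs, ys, num_replicas=1, lr=0.01)
w2 = sync_step(0.0, xs, ys, num_replicas=2, lr=0.01)
print(w1, w2)
```

Because every replica starts from the same weights and applies the same averaged gradient, the copies never drift apart; that is what "synchronous" means here.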

## Setup

```python
import tensorflow_datasets as tfds
import tensorflow as tf

import os

# Load the TensorBoard notebook extension.
%load_ext tensorboard
```

```python
print(tf.__version__)
```

```
2.8.2
```

## Download the dataset

```python
datasets, info = tfds.load(name="mnist", with_info=True, as_supervised=True)
mnist_train, mnist_test = datasets["train"], datasets["test"]
```

## Define the distribution strategy

```python
strategy = tf.distribute.MirroredStrategy()
```

```
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)
```

```python
print("Number of devices: {}".format(strategy.num_replicas_in_sync))
```

```
Number of devices: 1
```


## Set up the input pipeline

```python
num_train_examples = info.splits["train"].num_examples
num_test_examples = info.splits["test"].num_examples

BUFFER_SIZE = 10000

# Global batch size: the per-replica batch size times the number of replicas.
BATCH_SIZE_PER_REPLICA = 64
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
```
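`BATCH_SIZE` is the global batch size: on every step, `MirroredStrategy` splits each batch of `BATCH_SIZE` examples evenly across the replicas, so each device still processes `BATCH_SIZE_PER_REPLICA` examples. The plain-Python arithmetic below shows how the global batch and the number of steps per epoch change with the replica count for MNIST's 60,000 training images. The replica counts 2, 4, and 8 are hypothetical; only the single-replica case matches this notebook's run.

```python
import math

NUM_TRAIN_EXAMPLES = 60_000  # size of the MNIST "train" split
BATCH_SIZE_PER_REPLICA = 64


def steps_per_epoch(num_examples, global_batch):
    # The final partial batch is kept (batch() defaults to drop_remainder=False),
    # so the step count rounds up.
    return math.ceil(num_examples / global_batch)


table = {}
for replicas in (1, 2, 4, 8):
    global_batch = BATCH_SIZE_PER_REPLICA * replicas
    table[replicas] = (global_batch, steps_per_epoch(NUM_TRAIN_EXAMPLES, global_batch))
    print(f"{replicas} replica(s): global batch {global_batch}, "
          f"{table[replicas][1]} steps per epoch")
```

The single-replica value, 938 steps, is the step count that appears in the training log. Adding replicas shortens each epoch in steps, not in examples seen.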

```python
def scale(image, label):
    # Convert uint8 pixel values in [0, 255] to float32 values in [0, 1].
    image_fl = tf.cast(image, tf.float32)
    image_fl /= 255.0

    return image_fl, label
```

```python
train_ds = mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
test_ds = mnist_test.map(scale).batch(BATCH_SIZE)
```
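`shuffle(BUFFER_SIZE)` does not shuffle the whole dataset at once. According to the `tf.data` documentation, it fills a buffer with `BUFFER_SIZE` elements, samples from that buffer at random, and refills the freed slot with the next input element. Because `cache()` comes before `shuffle()`, `scale` runs only once while the order is still reshuffled every epoch. The generators below (`buffered_shuffle` and `batch`, both hypothetical names) are a pure-Python re-creation of the shuffle-then-batch semantics for illustration only; they are not the `tf.data` implementation.

```python
import random


def buffered_shuffle(elements, buffer_size, rng):
    """Yield elements in a partially shuffled order using a fixed-size buffer."""
    buffer = []
    for element in elements:
        if len(buffer) < buffer_size:
            buffer.append(element)
            continue
        # Buffer is full: emit a random element and put the new one in its slot.
        index = rng.randrange(buffer_size)
        yield buffer[index]
        buffer[index] = element
    # Input exhausted: drain what is left in random order.
    rng.shuffle(buffer)
    yield from buffer


def batch(elements, batch_size):
    """Group elements into lists of batch_size; the last batch may be smaller."""
    current = []
    for element in elements:
        current.append(element)
        if len(current) == batch_size:
            yield current
            current = []
    if current:
        yield current


rng = random.Random(0)
batches = list(batch(buffered_shuffle(range(10), buffer_size=4, rng=rng), batch_size=4))
print(batches)
```

A buffer of size 1 leaves the order unchanged, while a buffer at least as large as the dataset gives a full uniform shuffle; `BUFFER_SIZE = 10000` sits between those extremes for the 60,000 training images.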

## Create the model

```python
# Create and compile the model inside strategy.scope() so that its variables
# are mirrored across all replicas.
with strategy.scope():
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, 3, activation="relu", input_shape=(28, 28, 1)),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(64, activation="relu"),
        tf.keras.layers.Dense(10),
    ])

    model.compile(
        optimizer=tf.keras.optimizers.Adam(),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=["accuracy"]
    )
```
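The last `Dense(10)` layer has no activation, so the model outputs raw logits; that is why the loss is built with `from_logits=True`. For one example with integer label `y`, sparse categorical cross-entropy on logits equals `logsumexp(logits) - logits[y]`, the negative log of the softmax probability of the true class. The pure-Python sketch below computes that quantity with the usual max-subtraction for numerical stability; `sparse_ce_from_logits` and `softmax` are illustrative helpers, not Keras' actual kernel.

```python
import math


def sparse_ce_from_logits(logits, label):
    """Cross-entropy of one example, given unnormalized scores and an integer label."""
    m = max(logits)  # subtract the max so exp() cannot overflow
    log_sum_exp = m + math.log(sum(math.exp(z - m) for z in logits))
    return log_sum_exp - logits[label]


def softmax(logits):
    m = max(logits)
    exps = [math.exp(z - m) for z in logits]
    total = sum(exps)
    return [e / total for e in exps]


logits = [2.0, 1.0, 0.1]
loss = sparse_ce_from_logits(logits, label=0)
# Same value as taking the softmax first and then -log(p[label]).
print(loss, -math.log(softmax(logits)[0]))
```

Working directly on logits avoids computing a softmax probability that could round to zero and then taking its logarithm.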

## Define the callbacks

```python
# Define the checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
# Define the name of the checkpoint files.
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")
```
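`ModelCheckpoint` treats `filepath` as a Python format string and fills in `{epoch}` at save time with the 1-based epoch number, so this configuration should write one set of weight files per epoch (`ckpt_1`, `ckpt_2`, and so on). The plain-Python lines below only mimic that substitution with `str.format`; they do not call Keras.

```python
import os

checkpoint_dir = "./training_checkpoints"
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

# Mimic the substitution for the first three epochs (epoch indices 0, 1, 2
# inside the training loop become 1, 2, 3 in the file names).
paths = [checkpoint_prefix.format(epoch=index + 1) for index in range(3)]
for path in paths:
    print(path)
```

To restore the newest weights later, `tf.train.latest_checkpoint(checkpoint_dir)` returns the most recent checkpoint prefix, which can be passed to `model.load_weights`.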

```python
# Define a function for decaying the learning rate.
# You can define any decay function you need.
def decay(epoch):
    if epoch < 3:
        return 1e-3
    elif epoch < 7:
        return 1e-4
    else:
        return 1e-5
```
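`LearningRateScheduler` calls the schedule with the 0-based epoch index at the start of every epoch. Evaluating `decay` over this notebook's 12 epochs shows the step-down pattern: three epochs at `1e-3`, four at `1e-4`, and the remaining five at `1e-5`. The snippet repeats `decay` so that it runs on its own, without TensorFlow.

```python
def decay(epoch):
    if epoch < 3:
        return 1e-3
    elif epoch < 7:
        return 1e-4
    else:
        return 1e-5


EPOCHS = 12
schedule = [decay(epoch) for epoch in range(EPOCHS)]
for epoch, lr in enumerate(schedule):
    # Printed 1-based, matching what PrintLR reports.
    print(f"epoch {epoch + 1:2d}: lr = {lr:g}")
```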

```python
# Define a callback for printing the learning rate at the end of each epoch.
class PrintLR(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
        # Keras sets self.model to the model being trained.
        print("\nLearning rate for epoch {} is {}".format(
            epoch + 1, self.model.optimizer.lr.numpy()))
```

```python
# Put all the callbacks together.
callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir='./logs'),
    tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
                                       save_weights_only=True),
    tf.keras.callbacks.LearningRateScheduler(decay),
    PrintLR()
]
```
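The order of the hooks matters here: `LearningRateScheduler` sets the rate in `on_epoch_begin`, while `PrintLR` reads it in `on_epoch_end` (which is also where `ModelCheckpoint` saves, given its default `save_freq='epoch'`), so `PrintLR` reports the rate that was actually used during that epoch. The toy loop below only mimics that call order in plain Python; `ToyOptimizer`, `ToyScheduler`, `ToyPrintLR`, `toy_fit`, and `step_decay` (a restatement of `decay`) are hypothetical stand-ins, not Keras classes.

```python
class ToyOptimizer:
    def __init__(self):
        self.lr = None


class ToyScheduler:
    """Sets the learning rate before each epoch, like LearningRateScheduler."""
    def __init__(self, schedule, optimizer):
        self.schedule = schedule
        self.optimizer = optimizer

    def on_epoch_begin(self, epoch):
        self.optimizer.lr = self.schedule(epoch)

    def on_epoch_end(self, epoch):
        pass


class ToyPrintLR:
    """Records and prints the learning rate after each epoch, like PrintLR."""
    def __init__(self, optimizer):
        self.optimizer = optimizer
        self.seen = []

    def on_epoch_begin(self, epoch):
        pass

    def on_epoch_end(self, epoch):
        self.seen.append((epoch + 1, self.optimizer.lr))
        print(f"Learning rate for epoch {epoch + 1} is {self.optimizer.lr}")


def toy_fit(epochs, callbacks):
    for epoch in range(epochs):
        for cb in callbacks:
            cb.on_epoch_begin(epoch)
        # One epoch of training would run here.
        for cb in callbacks:
            cb.on_epoch_end(epoch)


def step_decay(epoch):
    return 1e-3 if epoch < 3 else 1e-4 if epoch < 7 else 1e-5


opt = ToyOptimizer()
printer = ToyPrintLR(opt)
toy_fit(4, [ToyScheduler(step_decay, opt), printer])
```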

## Train and test

Because `train_ds` is a `tf.data.Dataset` that yields `(image, label)` pairs, it is passed to `model.fit` as a single argument; no separate `y` is needed. The call to `model.fit` is the same as it would be without a distribution strategy.

```python
EPOCHS = 12

model.fit(train_ds, epochs=EPOCHS, callbacks=callbacks)
```

```
Epoch 1/12
156/938 [===>..........................] - ETA: 21s - loss: 0.5372 - accuracy: 0.8476

KeyboardInterrupt: ignored
```