<a href="https://colab.research.google.com/github/Kerriea-star/Distributed-training/blob/main/Distributed_training_with_keras.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Overview
The `tf.distribute.Strategy` API provides an abstraction for distributing your training across multiple processing units. It allows you to carry out distributed training using existing models and training code with minimal changes.

This project demonstrates how to use the `tf.distribute.MirroredStrategy` to perform in-graph replication with synchronous training on many GPUs on one machine. The strategy essentially copies all of the model's variables to each processor. Then, it uses **all-reduce** to combine the gradients from all processors, and applies the combined value to all copies of the model.
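
To make the all-reduce step concrete, here is a minimal pure-Python sketch. It is not TensorFlow's implementation: it assumes two replicas, a single scalar weight, plain SGD, and summation as the reduction. The helper names `all_reduce_sum` and `apply_update` and the gradient values are hypothetical.

```python
# Conceptual sketch of synchronous data-parallel training with all-reduce.
# Not TensorFlow code: the replicas, gradients, and update rule are toy stand-ins.

def all_reduce_sum(per_replica_values):
    """Combine one value per replica and hand the same total back to every replica."""
    total = sum(per_replica_values)
    return [total for _ in per_replica_values]

def apply_update(weights, reduced_gradients, learning_rate):
    """Apply the (identical) reduced gradient to each replica's copy of the weight."""
    return [w - learning_rate * g for w, g in zip(weights, reduced_gradients)]

# Every replica starts with the same copy of the variable.
replica_weights = [1.0, 1.0]
# Each replica computes a gradient on its own slice of the batch (made-up numbers).
local_gradients = [0.5, 1.5]

reduced = all_reduce_sum(local_gradients)
replica_weights = apply_update(replica_weights, reduced, learning_rate=0.25)

print(reduced)          # [2.0, 2.0]
print(replica_weights)  # [0.5, 0.5]
```

Because every replica applies the same combined gradient to the same starting value, the copies stay identical after each step, which is what keeps the training synchronous.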

You will use the `tf.keras` APIs to build the model and `Model.fit` for training it.

`MirroredStrategy` trains your model on multiple GPUs on a single machine. For synchronous training on many GPUs on multiple workers, use `tf.distribute.MultiWorkerMirroredStrategy` with Keras `Model.fit` or a custom training loop.

## Setup

In [10]:
import tensorflow_datasets as tfds
import tensorflow as tf

import os

# Load the TensorBoard notebook extension
%load_ext tensorboard

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


In [11]:
print(tf.__version__)

2.13.0


## Download the dataset
Load the MNIST dataset from **TensorFlow Datasets**. This returns a dataset in the `tf.data` format.

Setting the `with_info` argument to `True` includes the metadata for the entire dataset, which is being saved here to `info`. Among other things, this metadata object includes the number of train and test examples.

In [12]:
datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)

mnist_train, mnist_test = datasets["train"], datasets["test"]

## Define the distribution strategy
Create a `MirroredStrategy` object. This will handle distribution and provide a context manager (`MirroredStrategy.scope`) to build your model inside.

In [13]:
strategy = tf.distribute.MirroredStrategy()

In [14]:
print("Number of devices: {}".format(strategy.num_replicas_in_sync))

Number of devices: 1


## Set up the input pipeline
When training a model with multiple GPUs, you can use the extra computing power effectively by increasing the batch size. In general, use the largest batch size that fits the GPU memory and tune the learning rate accordingly.

In [15]:
# You can also do info.splits.total_num_examples to get the total
# number of examples in the dataset.

num_train_examples = info.splits['train'].num_examples
num_test_examples = info.splits['test'].num_examples

BUFFER_SIZE = 10000

BATCH_SIZE_PER_REPLICA = 64
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
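
As a quick illustration of how the global batch size grows with the number of replicas, the sketch below works through the arithmetic in plain Python. The replica counts are hypothetical, the helper names are made up for this example, and 60,000 is the size of the MNIST training split.

```python
import math

def global_batch_size(per_replica_batch_size, num_replicas):
    """Each replica processes its own batch, so the global batch is the product."""
    return per_replica_batch_size * num_replicas

def steps_per_epoch(num_examples, batch_size):
    """Dataset.batch keeps the final partial batch by default, so round up."""
    return math.ceil(num_examples / batch_size)

NUM_TRAIN_EXAMPLES = 60000  # MNIST training split

for replicas in (1, 2, 4):
    batch = global_batch_size(64, replicas)
    print(replicas, batch, steps_per_epoch(NUM_TRAIN_EXAMPLES, batch))
# 1 64 938
# 2 128 469
# 4 256 235
```

More replicas mean fewer, larger steps per epoch, which is why the learning rate usually needs retuning when the device count changes.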

Define a function that normalizes the image pixel values from the `[0, 255]` range to the `[0, 1]` range (**feature scaling**):

In [16]:
def scale(image, label):
  image = tf.cast(image, tf.float32)
  image /= 255

  return image, label

Apply this scale function to the training and test data, and then use the `tf.data.Dataset` APIs to shuffle the training data (`Dataset.shuffle`), and batch it (`Dataset.batch`). Notice that you are also keeping an in-memory cache of the training data to improve performance (`Dataset.cache`).

In [17]:
train_dataset = mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
eval_dataset = mnist_test.map(scale).batch(BATCH_SIZE)
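
The role of `BUFFER_SIZE` is easier to see with a small model of buffer-based shuffling. The following is a pure-Python sketch of the idea described in the `Dataset.shuffle` documentation (fill a buffer, emit a random element, refill the freed slot from the input); it is not the `tf.data` implementation, and `buffered_shuffle` is a hypothetical name.

```python
import random

def buffered_shuffle(items, buffer_size, seed=None):
    """Yield items in shuffled order using a fixed-size buffer (buffer_size >= 1)."""
    if buffer_size < 1:
        raise ValueError("buffer_size must be at least 1")
    rng = random.Random(seed)
    buffer = []
    for item in items:
        if len(buffer) < buffer_size:
            buffer.append(item)  # still filling the buffer
            continue
        # Buffer is full: emit a random element and put the new item in its slot.
        index = rng.randrange(buffer_size)
        yield buffer[index]
        buffer[index] = item
    # Input exhausted: drain the remaining elements in random order.
    rng.shuffle(buffer)
    yield from buffer

shuffled = list(buffered_shuffle(range(10), buffer_size=3, seed=0))
print(sorted(shuffled) == list(range(10)))  # True: every element appears exactly once
print(shuffled[0] < 3)                      # True: the first output comes from the first 3 inputs
print(list(buffered_shuffle(range(5), buffer_size=1)))  # [0, 1, 2, 3, 4]
```

With a buffer smaller than the dataset, as with `BUFFER_SIZE = 10000` against the 60,000 MNIST training examples, each output can only be drawn from a sliding window of the input, so the shuffle is approximate rather than fully uniform.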

## Create the model and instantiate the optimizer
Within the context of `Strategy.scope`, create and compile the model using the Keras API:

In [18]:
with strategy.scope():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])

  model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
                metrics=["accuracy"])

For this toy example with the MNIST dataset, you will be using the Adam optimizer's default learning rate of 0.001.

For larger datasets, the key benefit of distributed training is to learn more in each training step, because each step processes more training data in parallel, which allows for a larger learning rate (within the limits of the model and dataset).
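
One common starting point for picking that larger learning rate is the linear scaling heuristic, which multiplies a single-device learning rate by the number of replicas. This is an assumption introduced here for illustration, not something this project applies, and the result still needs tuning; `scaled_learning_rate` is a hypothetical helper.

```python
def scaled_learning_rate(base_learning_rate, num_replicas):
    """Linear scaling heuristic: grow the learning rate with the global batch size."""
    return base_learning_rate * num_replicas

# Hypothetical replica counts, starting from the base rate used in this project.
for replicas in (1, 2, 8):
    print(replicas, scaled_learning_rate(0.001, replicas))
```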

## Define the callbacks
Define the following Keras Callbacks:

*   `tf.keras.callbacks.TensorBoard`: writes a log for TensorBoard, which allows you to visualize the graphs.
*   `tf.keras.callbacks.ModelCheckpoint`: saves the model at a certain frequency, such as after every epoch.
*   `tf.keras.callbacks.BackupAndRestore`: provides the fault tolerance functionality by backing up the model and current epoch number.
*   `tf.keras.callbacks.LearningRateScheduler`: schedules the learning rate to change after, for example, every epoch/batch.


For illustrative purposes, add a custom callback called `PrintLR` to display the learning rate in the notebook.

**Note**: *Use the `BackupAndRestore` callback instead of `ModelCheckpoint` as the main mechanism to restore the training state upon a restart from a job failure. Since `BackupAndRestore` only supports eager mode, in graph mode consider using `ModelCheckpoint`.*
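
The idea behind backing up the model and the current epoch number can be sketched without Keras. The toy loop below is only a conceptual illustration under stated assumptions (a JSON file as the backup, a single number standing in for the model weights, a simulated failure); it is not how `BackupAndRestore` is implemented, and `train_with_backup` is a hypothetical function.

```python
import json
import os
import tempfile

def train_with_backup(total_epochs, backup_path, fail_after=None):
    """Toy training loop that records its progress after every epoch.

    `fail_after` simulates a job failure once that many epochs have run.
    Returns the list of epoch indices that ran during this call.
    """
    state = {"next_epoch": 0, "weight": 0.0}
    if os.path.exists(backup_path):
        with open(backup_path) as f:
            state = json.load(f)  # resume after the last completed epoch

    ran = []
    for epoch in range(state["next_epoch"], total_epochs):
        if fail_after is not None and len(ran) == fail_after:
            raise RuntimeError("simulated job failure")
        state["weight"] += 1.0  # stand-in for one epoch of training
        state["next_epoch"] = epoch + 1
        with open(backup_path, "w") as f:
            json.dump(state, f)  # back up the weights and the epoch counter
        ran.append(epoch)

    if os.path.exists(backup_path):
        os.remove(backup_path)  # training finished, the backup is no longer needed
    return ran

backup_path = os.path.join(tempfile.mkdtemp(), "backup.json")
try:
    train_with_backup(5, backup_path, fail_after=2)
except RuntimeError:
    pass  # epochs 0 and 1 completed before the simulated failure

resumed = train_with_backup(5, backup_path)
print(resumed)  # [2, 3, 4]
```

The restarted run picks up at epoch 2 instead of repeating the completed work, which is the fault-tolerance behavior the callback is meant to provide.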


In [19]:
# Define the checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
# Define the name of the checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")
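
The `{epoch}` placeholder in the prefix is filled in with ordinary string formatting when a checkpoint is written. The short sketch below reproduces that substitution with `str.format`; the epoch numbers are examples, chosen on the understanding that `ModelCheckpoint` passes a 1-based epoch number.

```python
import os

checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

# str.format replaces the {epoch} placeholder with the given epoch number.
for epoch in (1, 2, 12):
    print(checkpoint_prefix.format(epoch=epoch))
```

Each epoch therefore gets its own file prefix, so earlier checkpoints are not overwritten.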

In [20]:
# Define a function for decaying the learning rate.
# You can define any decay function you need.
def decay(epoch):
  if epoch < 3:
    return 1e-3
  elif epoch < 7:
    return 1e-4
  else:
    return 1e-5
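
To see which learning rate each epoch receives, the schedule can be evaluated on its own. This sketch restates the `decay` function so that it runs standalone and assumes 12 epochs; `LearningRateScheduler` calls the function with a 0-based epoch index, so the printout adds 1 to match the epoch numbers Keras displays.

```python
def decay(epoch):
    """Step schedule: 1e-3 for epochs 0-2, 1e-4 for epochs 3-6, then 1e-5."""
    if epoch < 3:
        return 1e-3
    elif epoch < 7:
        return 1e-4
    else:
        return 1e-5

EPOCHS = 12
schedule = [decay(epoch) for epoch in range(EPOCHS)]

for epoch, learning_rate in enumerate(schedule):
    print(f"epoch {epoch + 1:2d}: learning rate {learning_rate}")
```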

In [21]:
# Define a callback for printing the learning rate at the end of each epoch
class PrintLR(tf.keras.callbacks.Callback):
  def on_epoch_end(self, epoch, logs=None):
    print('\nlearning rate for epoch {} is {}'.format(
        epoch + 1, self.model.optimizer.lr.numpy()))

In [22]:
# Put all the callbacks together
callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir='./logs'),
    tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
                                       save_weights_only=True),
    tf.keras.callbacks.LearningRateScheduler(decay),
    PrintLR()
]

## Train and evaluate
Now, train the model in the usual way by calling Keras `Model.fit` on the model and passing in the dataset created at the beginning of the project. This step is the same whether you are distributing the training or not.

In [23]:
EPOCHS = 12
model.fit(train_dataset, epochs=EPOCHS, callbacks=callbacks)

Epoch 1/12
learning rate for epoch 1 is 0.0010000000474974513
Epoch 2/12
learning rate for epoch 2 is 0.0010000000474974513
Epoch 3/12
learning rate for epoch 3 is 0.0010000000474974513
Epoch 4/12
learning rate for epoch 4 is 9.999999747378752e-05
Epoch 5/12
learning rate for epoch 5 is 9.999999747378752e-05
Epoch 6/12
learning rate for epoch 6 is 9.999999747378752e-05
Epoch 7/12
learning rate for epoch 7 is 9.999999747378752e-05
Epoch 8/12
learning rate for epoch 8 is 9.999999747378752e-06
Epoch 9/12
learning rate for epoch 9 is 9.999999747378752e-06
Epoch 10/12
learning rate for epoch 10 is 9.999999747378752e-06
Epoch 11/12
learning rate for epoch 11 is 9.999999747378752e-06
Epoch 12/12
learning rate for epoch 12 is 9.999999747378752e-06


<keras.src.callbacks.History at 0x7ff60cb7f2e0>