In [0]:
# Distributed training: ways of training a single model on multiple GPUs, computers or servers to reduce training time.
# There are 2 main categories of distribution strategies:
# 1. Synchronous - workers train on different slices of the data at the same time. At every training step, their gradients are aggregated
# and used to update one shared set of weights, so all copies of the model stay identical.
# 2. Asynchronous - all workers train independently, and each one updates the weights as soon as it finishes, without waiting for the others.
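The sketch below illustrates the two categories in NumPy. It assumes a one-parameter least-squares problem and 4 simulated workers. Both are illustrative choices and not part of TensorFlow.

- The synchronous step averages gradients that were all computed at the same weights. For equal-sized shards, this is the same as one full-batch gradient step.
- The asynchronous loop is simulated sequentially. Each worker applies its update immediately, and the next worker starts from the changed weights. On a real cluster, a worker may also compute its gradient on weights that are already out of date.

```python
import numpy as np

# Toy problem (an illustrative assumption): find w minimising mean((w*x - y)^2),
# where the data were generated with true w = 3.
rng = np.random.default_rng(0)
x = rng.normal(size=400)
y = 3.0 * x

def grad(w, xb, yb):
    # d/dw of mean((w*x - y)^2) = mean(2 * (w*x - y) * x)
    return np.mean(2 * (w * xb - yb) * xb)

# 4 hypothetical workers, each holding an equal-sized shard of the data
shards = list(zip(np.array_split(x, 4), np.array_split(y, 4)))
lr = 0.1

# Synchronous step: every worker computes its gradient at the SAME weights,
# the gradients are averaged, and one shared update is applied.
w_sync = 0.0
g_avg = np.mean([grad(w_sync, xb, yb) for xb, yb in shards])
w_sync -= lr * g_avg

# Asynchronous (simulated sequentially): each worker applies its own update as
# soon as it is done, so the next worker reads weights already changed by others.
w_async = 0.0
for xb, yb in shards:
    w_async -= lr * grad(w_async, xb, yb)

print(w_sync, w_async)
```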

COMMON DISTRIBUTED STRATEGIES IN TF 2.0

In [0]:
# 1. MIRRORED STRATEGY - we start by defining the model on a single machine (here, Google Colab). This strategy is used when one machine has
# multiple devices (multiple GPUs or CPUs). It uses all of them by creating a copy (replica) of the same model on each device. Each replica
# processes a different slice of every batch. At every training step, the gradients from all replicas are aggregated with an all-reduce (on
# GPUs, TensorFlow uses NCCL by default), so every replica applies the same update and the copies stay in sync. Sometimes, however, we have
# multiple servers or computers, each with multiple devices of its own. How do we scale the strategy to that level? (see the next strategy)

# 2. MULTI WORKER MIRRORED STRATEGY - the same synchronous idea, extended to several machines. Each machine in the cluster is called a worker,
# and the model is replicated on every device of every worker. At every training step, the gradients from all replicas on all workers are
# aggregated (all-reduced across the network), and the same update is applied everywhere. As a result, there is still effectively one model.

# 3. TPU STRATEGY - the synchronous strategy for Google's TPUs (Tensor Processing Units), which are available on GCP (Google Cloud Platform) and in Colab.

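# 4. PARAMETER SERVER STRATEGY - the asynchronous option. Some machines act as parameter servers that hold the model's variables. The workers
# independently pull the current weights, compute gradients on their own data, and push updates back to the servers.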
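For the multi-worker strategy (item 2), each machine typically learns its role from the `TF_CONFIG` environment variable. This is a JSON string that lists the cluster and the current machine's task, and it must be set before the strategy is created.

The sketch below is minimal and assumes two machines. The host names and port are placeholders. In TF 2.0, the strategy class is `tf.distribute.experimental.MultiWorkerMirroredStrategy`; later releases also expose it as `tf.distribute.MultiWorkerMirroredStrategy`.

```python
import json
import os

# Hypothetical two-machine cluster: the host names and port are placeholders.
cluster = {"worker": ["host1.example.com:12345", "host2.example.com:12345"]}

def tf_config_for(index):
    """Build the TF_CONFIG value for the worker at position `index` in the list."""
    return json.dumps({"cluster": cluster, "task": {"type": "worker", "index": index}})

# On the first machine, set this BEFORE creating the strategy
# (the second machine would use tf_config_for(1)):
os.environ["TF_CONFIG"] = tf_config_for(0)

# Then, on every machine, the model is built exactly as with MirroredStrategy, e.g.
# strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
# with strategy.scope():
#     ...build and compile the model...
```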

SETUP

In [0]:
# 1. select GPU as the hardware accelerator (in Colab: Runtime > Change runtime type)
# 2. install TF 2.0

In [0]:
!pip install tensorflow-gpu==2.0.0.alpha0 # GPU build of TF 2.0 alpha; from TF 2.1 on, the plain `tensorflow` package includes GPU support

IMPORT PROJECT DEPENDENCIES

In [0]:
import time
import numpy as np
import tensorflow as tf

In [4]:
tf.__version__

'2.0.0-alpha0'

DATASET PREPROCESSING

In [5]:
# load mnist dataset
(X_train, y_train), (X_test, y_test) = tf.keras.datasets.mnist.load_data()

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


In [0]:
# image normalization - scale pixels from 0-255 down to 0-1, which usually helps training converge faster
# only the images are scaled: the labels are class ids 0-9 and must stay integers for the sparse loss
X_train = X_train/255.0
X_test = X_test/255.0
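The small NumPy check below shows why the labels must not be scaled. It assumes, as we understand Keras' sparse losses to do, that labels are converted to integer class ids. After division by 255, every MNIST label truncates to class 0, so the model would be trained on a single class.

```python
import numpy as np

labels = np.arange(10)           # MNIST class ids 0..9
scaled = labels / 255.0          # what dividing the labels by 255 would do

# Converting the scaled values back to integer class ids truncates them all to 0.
print(scaled.astype(np.int64))   # [0 0 0 0 0 0 0 0 0 0]
```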

In [7]:
X_train.shape

(60000, 28, 28)

In [0]:
# dataset reshaping - we're using a fully connected network, so each 28x28 image is flattened into a 784-element vector.
X_train = X_train.reshape(-1, 28*28)
X_test = X_test.reshape(-1, 28*28)
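The following check uses two fake images (made-up data) to show what `reshape(-1, 28*28)` does. The `-1` lets NumPy infer the number of images. Each row of the result is one image, read row by row.

```python
import numpy as np

images = np.arange(2 * 28 * 28).reshape(2, 28, 28)   # two fake 28x28 "images"
flat = images.reshape(-1, 28 * 28)                    # -1: NumPy infers the batch size

print(flat.shape)   # (2, 784)
# Row i of the flat array is image i read row by row (C order).
assert np.array_equal(flat[1], images[1].ravel())
```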

# DEFINING A NON-DISTRIBUTED MODEL (NORMAL MODEL)

In [0]:
model_normal = tf.keras.Sequential()
model_normal.add(tf.keras.layers.Dense(units=128, activation='relu', input_shape=(784,)))
model_normal.add(tf.keras.layers.Dropout(rate=0.2))
model_normal.add(tf.keras.layers.Dense(units=10, activation='softmax'))

In [0]:
model_normal.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['sparse_categorical_accuracy'])
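The "sparse" in `sparse_categorical_crossentropy` means that the labels are integer class ids rather than one-hot vectors. The NumPy sketch below uses made-up softmax outputs to show that the sparse form gives the same per-example values as categorical cross-entropy on one-hot labels. The loss Keras reports is the mean over the batch.

```python
import numpy as np

probs = np.array([[0.7, 0.2, 0.1],
                  [0.1, 0.3, 0.6]])      # made-up softmax outputs: 2 examples, 3 classes
labels = np.array([0, 2])                # integer class ids ("sparse" labels)

# Sparse categorical cross-entropy: -log of the probability assigned to the true class.
sparse_ce = -np.log(probs[np.arange(len(labels)), labels])

# Categorical cross-entropy with one-hot labels gives the same per-example values.
one_hot = np.eye(3)[labels]
dense_ce = -(one_hot * np.log(probs)).sum(axis=1)

# sparse_categorical_accuracy: does the most probable class match the label?
accuracy = np.mean(probs.argmax(axis=1) == labels)

print(sparse_ce.mean(), accuracy)
```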

# DISTRIBUTED MODEL

Set up a distributed strategy - MirroredStrategy is used here
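One synchronous mirrored step can be sketched in NumPy as shown below. This illustrates the idea and is not TensorFlow's implementation. `NUM_REPLICAS`, the linear model and the data are all assumptions.

The step works as follows:

1. Each replica receives a shard of the global batch.
2. Each replica divides its loss by the global batch size.
3. The all-reduce sums the gradients, so every copy applies the same update.

```python
import numpy as np

NUM_REPLICAS = 2                      # hypothetical: stands in for the number of GPUs
rng = np.random.default_rng(1)
X = rng.normal(size=(8, 3))           # a global batch of 8 examples with 3 features
y = X @ np.array([1.0, -2.0, 0.5])    # linear targets (made-up data)
GLOBAL_BATCH = len(X)

# Every replica starts from an identical copy of the weights.
replicas = [np.zeros(3) for _ in range(NUM_REPLICAS)]

def replica_grad(w, Xb, yb):
    # Gradient of sum((Xb @ w - yb)^2) / GLOBAL_BATCH. Dividing by the GLOBAL batch
    # size means the sum over replicas equals the gradient of the global mean loss.
    return 2 * Xb.T @ (Xb @ w - yb) / GLOBAL_BATCH

# Split the global batch into one shard per replica and compute local gradients.
shards = list(zip(np.array_split(X, NUM_REPLICAS), np.array_split(y, NUM_REPLICAS)))
grads = [replica_grad(w, Xb, yb) for w, (Xb, yb) in zip(replicas, shards)]

# "All-reduce": sum the per-replica gradients; every replica receives the same total
# and applies the same update, so the copies stay identical.
total = np.sum(grads, axis=0)
replicas = [w - 0.1 * total for w in replicas]
```

Keras `fit` handles this loss scaling automatically. For custom training loops, later TF 2.x releases provide `tf.nn.compute_average_loss` for the same purpose.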

In [11]:
distribute = tf.distribute.MirroredStrategy()

INFO:tensorflow:Device is available but not used by distribute strategy: /device:CPU:0
INFO:tensorflow:Device is available but not used by distribute strategy: /device:XLA_GPU:0
INFO:tensorflow:Device is available but not used by distribute strategy: /device:XLA_CPU:0


Define a distributed model

In [12]:
with distribute.scope():
  model_dis = tf.keras.Sequential()
  model_dis.add(tf.keras.layers.Dense(units=128, activation='relu', input_shape=(784,)))
  model_dis.add(tf.keras.layers.Dropout(rate=0.2))
  model_dis.add(tf.keras.layers.Dense(units=10, activation='softmax'))
  model_dis.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['sparse_categorical_accuracy'])

INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).

# SPEED TEST - NORMAL VS DISTRIBUTED MODEL

In [13]:
start_time = time.time()
model_dis.fit(X_train, y_train, epochs=10, batch_size=25)
print("Time taken by distributed training : {}".format(time.time() - start_time))

INFO:tensorflow:batch_all_reduce invoked for batches size = 4 with algorithm = nccl, num_packs = 1, agg_small_grads_max_bytes = 0 and agg_small_grads_max_group = 10
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Time taken by distributed training : 129.4754638671875


In [14]:
start_time = time.time()
model_normal.fit(X_train, y_train, epochs=10, batch_size=25)
print("Time taken by normal training : {}".format(time.time() - start_time))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Time taken by normal training : 132.22743201255798
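Each time above is a single wall-clock measurement. It also includes one-off costs, such as building the training function during the first epoch. A fairer comparison repeats the measurement and keeps the fastest run.

The sketch below uses only the standard library. `best_of` is a hypothetical helper, not part of TensorFlow.

```python
import time

def best_of(fn, repeats=3):
    """Run fn() `repeats` times and return the fastest wall-clock time in seconds.

    Taking the minimum filters out one-off costs (tracing, memory allocation)
    and interference from other processes competing for the machine.
    """
    times = []
    for _ in range(repeats):
        start = time.perf_counter()        # monotonic, high-resolution clock
        fn()
        times.append(time.perf_counter() - start)
    return min(times)

# Hypothetical usage with the notebook's models (each call keeps training from the
# current weights, which does not matter for timing):
# best_of(lambda: model_dis.fit(X_train, y_train, epochs=1, batch_size=25, verbose=0))
# best_of(lambda: model_normal.fit(X_train, y_train, epochs=1, batch_size=25, verbose=0))
```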


In [0]:
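# Not much difference in time taken here (about 129 s distributed vs 132 s normal).
# The INFO log printed when the strategy was created shows that the CPU and XLA devices were not used. MirroredStrategy therefore ran on the single
# GPU: there was one replica, so there was no parallelism to gain. With 2 or more GPUs, each batch is split across them and distributed
# training can be faster. However, with a model this small and a batch size of 25, communication overhead may eat into the gain.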
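The arithmetic below sketches the last point. It assumes the Keras behaviour where, under a distribution strategy, `batch_size` is the global batch that is split across replicas.

With `batch_size=25`, an epoch over MNIST is 2400 sequential steps, however many GPUs share each step, so per-step overhead is not reduced. A common approach is to scale the global batch with the number of replicas. The GPU counts below are hypothetical.

```python
import math

def steps_per_epoch(num_examples, global_batch):
    # Keras runs ceil(N / batch) steps per epoch; the last batch may be partial.
    return math.ceil(num_examples / global_batch)

N = 60000                             # MNIST training examples
print(steps_per_epoch(N, 25))         # 2400

for replicas in (1, 2, 4):            # hypothetical GPU counts
    global_batch = 25 * replicas      # keep 25 examples per replica
    print(replicas, global_batch, steps_per_epoch(N, global_batch))
```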