# Assignment Module 4: Distributed Training with TensorFlow - Multi-GPU and Multi-Node Simulation

### Problem Statement
You are tasked with implementing and optimizing a neural network for image classification using TensorFlow's distributed strategies. First, you will use MirroredStrategy for distributed training across multiple GPUs (in Colab, where a runtime usually exposes at most one accelerator, this only simulates a multi-GPU setup). Then, you will extend the setup to a multi-node distributed system, using MultiWorkerMirroredStrategy to simulate multi-node training.

You will implement and optimize the training process and compare the performance between the multi-GPU and multi-node setups.

### Part 1: Multi-GPU Training using MirroredStrategy
1. Define a Distributed Strategy: Use tf.distribute.MirroredStrategy() to simulate multi-GPU training.

2. Dataset: Use the MNIST dataset, ensuring it is preprocessed and normalized.

3. Model: Build a simple CNN using TensorFlow’s Sequential API.

4. Training: Train the model using the distributed strategy and compare the performance with non-distributed training.

5. Evaluation: Evaluate the model on the test set and ensure that the training converges correctly with multiple GPUs.



### Part 2: Multi-Node Training using MultiWorkerMirroredStrategy
1. Simulate a Multi-Node Setup: Set up MultiWorkerMirroredStrategy with appropriate environment variables (TF_CONFIG) for node communication.

2. Training: Train the same model across simulated nodes and compare the performance.

3. Evaluation: Evaluate the model after training in the multi-node setup.

# Part 1

In [2]:
import tensorflow as tf
import time

In [4]:
# Use MirroredStrategy for multi-GPU training
strategy = tf.distribute.MirroredStrategy()
print("Number of devices in strategy:", strategy.num_replicas_in_sync)

# Load the MNIST dataset
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

# Reshape and normalize the images
x_train = x_train.reshape(-1, 28, 28, 1).astype('float32') / 255.0
x_test = x_test.reshape(-1, 28, 28, 1).astype('float32') / 255.0

# Batch sizes to experiment with
batch_sizes = [64, 128, 256]


def create_model():
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)),
        tf.keras.layers.MaxPooling2D((2, 2)),
        tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
        tf.keras.layers.MaxPooling2D((2, 2)),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10, activation='softmax')
    ])
    return model


# Baseline: train and evaluate the model without MirroredStrategy.
# Note: the log below shows this run used CPU:0, so "Single GPU" here means a single default device.


print("\nTraining on Single GPU:")
for batch_size in batch_sizes:
    print(f"\nBatch Size: {batch_size}")
    # Create a fresh model instance for each batch size
    model_single_gpu = create_model()
    model_single_gpu.compile(optimizer='adam',
                             loss='sparse_categorical_crossentropy',
                             metrics=['accuracy'])
    # Measure wall-clock training time
    start_time = time.time()
    model_single_gpu.fit(x_train, y_train, epochs=5, batch_size=batch_size, verbose=2)
    end_time = time.time()
    training_time = end_time - start_time
    print(f"Training Time: {training_time:.2f} seconds")
    # Evaluate the model on the test set
    test_loss, test_acc = model_single_gpu.evaluate(x_test, y_test, verbose=2)
    print(f"Test Accuracy: {test_acc:.4f}")



INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)
Number of devices in strategy: 1
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11490434/11490434 ━━━━━━━━━━━━━━━━━━━━ 1s 0us/step

Training on Single GPU:

Batch Size: 64
Epoch 1/5
938/938 - 3s - 4ms/step - accuracy: 0.9525 - loss: 0.1607
Epoch 2/5
938/938 - 3s - 3ms/step - accuracy: 0.9851 - loss: 0.0476
Epoch 3/5
938/938 - 3s - 3ms/step - accuracy: 0.9896 - loss: 0.0331
Epoch 4/5
938/938 - 3s - 4ms/step - accuracy: 0.9925 - loss: 0.0243
Epoch 5/5
938/938 - 3s - 3ms/step - accuracy: 0.9936 - loss: 0.0196
Training Time: 16.03 seconds
313/313 - 0s - 1ms/step - accuracy: 0.9909 - loss: 0.0281
Test Accuracy: 0.9909

Batch Size: 128
Epoch 1/5
469/469 - 3s - 7ms/step - accuracy: 0.9389 - loss: 0.2059
Epoch 2/5
469/469 - 3s - 6ms/step - accuracy: 0.9823 - loss: 0.0580
Epoch 3/5
469/469 - 3s - 5ms/step - accuracy: 0.9878 - loss: 0.0397
Epoch 4/5
469/469 - 2s - 5ms/step - accuracy: 0.9906 - loss: 0.0308
Epoch 5/5
469/469 - 2s - 5ms/step - accuracy: 0.9923 - loss: 0.0243
Training Time: 13.37 seconds
313/313 - 0s - 1ms/step - accuracy: 0.9899 - loss: 0.0292
Test Accuracy: 0.9899

Batch Size: 256
Epoch 1/5
235/235 - 3s - 14ms/step - accuracy: 0.9203 - loss: 0.2842
Epoch 2/5
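
The step counts in the log above (938, 469 and 235) follow from the 60,000 MNIST training images and the batch size passed to `fit`. The sketch below reproduces that arithmetic. It also shows how a global batch would be divided among replicas under a synchronous strategy. The helper names and the replica counts other than 1 are illustrative assumptions, since this run reported a single device.

```python
import math

NUM_TRAIN_EXAMPLES = 60_000  # size of the MNIST training split


def steps_per_epoch(global_batch_size, num_examples=NUM_TRAIN_EXAMPLES):
    """Number of batches per epoch, counting the final partial batch."""
    return math.ceil(num_examples / global_batch_size)


def per_replica_batch_size(global_batch_size, num_replicas):
    """Examples each replica would process per step (hypothetical helper)."""
    return global_batch_size // num_replicas


for global_batch_size in [64, 128, 256]:
    print(f"Batch size {global_batch_size}: "
          f"{steps_per_epoch(global_batch_size)} steps per epoch")

# Assumed replica counts, for illustration only
for num_replicas in [1, 2, 4]:
    print(f"{num_replicas} replica(s): "
          f"{per_replica_batch_size(64, num_replicas)} examples per replica per step")
```

With a single replica, as in this run, the per-replica batch equals the global batch, so no speedup from data parallelism should be expected.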

In [5]:
# Train and Evaluate the Model Using MirroredStrategy

print("\nTraining with MirroredStrategy:")
for batch_size in batch_sizes:
    print(f"\nBatch Size: {batch_size}")
    with strategy.scope():
        # Create and compile a fresh model inside the strategy scope
        # so that its variables are mirrored across replicas
        model_multi_gpu = create_model()
        model_multi_gpu.compile(optimizer='adam',
                                loss='sparse_categorical_crossentropy',
                                metrics=['accuracy'])
    # Measure wall-clock training time
    start_time = time.time()
    model_multi_gpu.fit(x_train, y_train, epochs=5, batch_size=batch_size, verbose=2)
    end_time = time.time()
    training_time = end_time - start_time
    print(f"Training Time: {training_time:.2f} seconds")
    # Evaluate the model on the test set
    test_loss, test_acc = model_multi_gpu.evaluate(x_test, y_test, verbose=2)
    print(f"Test Accuracy: {test_acc:.4f}")



Training with MirroredStrategy:

Batch Size: 64

Epoch 1/5
938/938 - 4s - 4ms/step - accuracy: 0.9502 - loss: 0.1636
Epoch 2/5
938/938 - 3s - 4ms/step - accuracy: 0.9852 - loss: 0.0471
Epoch 3/5
938/938 - 3s - 4ms/step - accuracy: 0.9895 - loss: 0.0331
Epoch 4/5
938/938 - 3s - 3ms/step - accuracy: 0.9927 - loss: 0.0239
Epoch 5/5
938/938 - 3s - 3ms/step - accuracy: 0.9940 - loss: 0.0196
Training Time: 16.90 seconds
313/313 - 0s - 1ms/step - accuracy: 0.9915 - loss: 0.0278
Test Accuracy: 0.9915

Batch Size: 128
Epoch 1/5
469/469 - 3s - 7ms/step - accuracy: 0.9437 - loss: 0.2011
Epoch 2/5
469/469 - 3s - 6ms/step - accuracy: 0.9827 - loss: 0.0553
Epoch 3/5
469/469 - 3s - 6ms/step - accuracy: 0.9883 - loss: 0.0386
Epoch 4/5
469/469 - 3s - 6ms/step - accuracy: 0.9909 - loss: 0.0294
Epoch 5/5
469/469 - 3s - 6ms/step - accuracy: 0.9932 - loss: 0.0224
Training Time: 14.87 seconds
313/313 - 0s - 1ms/step - accuracy: 0.9900 - loss: 0.0283
Test Accuracy: 0.9900

Batch Size: 256
Epoch 1/5
235/235 
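
To compare the two Part 1 runs, the training times reported in the logs can be turned into a relative overhead. The sketch below uses only the figures for batch sizes 64 and 128, because the batch size 256 logs are truncated. The helper name is an illustrative assumption. These are single runs, so small differences may be noise.

```python
# Training times in seconds, copied from the logs above
single_device_times = {64: 16.03, 128: 13.37}
mirrored_times = {64: 16.90, 128: 14.87}


def relative_overhead(baseline, candidate):
    """Fractional change of candidate relative to baseline (hypothetical helper)."""
    return (candidate - baseline) / baseline


overheads = {
    batch_size: relative_overhead(single_device_times[batch_size],
                                  mirrored_times[batch_size])
    for batch_size in single_device_times
}

for batch_size, overhead in overheads.items():
    print(f"Batch size {batch_size}: MirroredStrategy time changed by {overhead:+.1%}")
```

Because the strategy reported one replica on `CPU:0`, a slightly longer time under MirroredStrategy is plausible: the strategy adds some bookkeeping without adding any parallelism.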



# Part 2

In [6]:
# Import libraries
import os
import json
import tensorflow as tf
import time

# Remove TF_CONFIG if it was set; without it, the strategy runs as a single worker
os.environ.pop('TF_CONFIG', None)

# Define the strategy
strategy = tf.distribute.MultiWorkerMirroredStrategy()
print("Number of devices in strategy:", strategy.num_replicas_in_sync)

INFO:tensorflow:Using MirroredStrategy with devices ('/device:CPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
Number of devices in strategy: 1
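
The cell above clears `TF_CONFIG`, so the strategy falls back to a single worker, as the log confirms. For reference, here is a minimal sketch of what a two-worker `TF_CONFIG` could look like. The host addresses, ports, and the `make_tf_config` helper are assumptions for illustration and are not part of the original notebook.

```python
import json
import os


def make_tf_config(worker_hosts, task_index):
    """Build the TF_CONFIG JSON string for one worker (hypothetical helper)."""
    return json.dumps({
        "cluster": {"worker": list(worker_hosts)},
        "task": {"type": "worker", "index": task_index},
    })


# Assumed addresses: two worker processes on the same machine
WORKER_HOSTS = ["localhost:12345", "localhost:23456"]

# Each worker process sets its own index before creating the strategy
os.environ["TF_CONFIG"] = make_tf_config(WORKER_HOSTS, task_index=0)
print(os.environ["TF_CONFIG"])
```

Each worker would normally run as a separate process with its own `index`. Creating `MultiWorkerMirroredStrategy` with this configuration inside a single notebook process would wait for the other worker to come up, which is likely why the notebook removes `TF_CONFIG` instead.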


In [8]:
# load and preprocess the MNIST dataset
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

# reshape and normalize the images
x_train = x_train.reshape(-1, 28, 28, 1).astype('float32') / 255.0
x_test = x_test.reshape(-1, 28, 28, 1).astype('float32') / 255.0

# dataset and training hyperparameters
GLOBAL_BATCH_SIZE = 64
BUFFER_SIZE = 10000
EPOCHS = 5


train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE)
test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
test_dataset = test_dataset.batch(GLOBAL_BATCH_SIZE)

# distribute the datasets
train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

def create_model():
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)),
        tf.keras.layers.MaxPooling2D((2, 2)),
        tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
        tf.keras.layers.MaxPooling2D((2, 2)),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10, activation='softmax')
    ])
    return model



with strategy.scope():
    # create the model, loss, optimizer, and metrics inside the strategy scope
    model = create_model()
    # reduction=NONE keeps per-example losses so that compute_loss can
    # scale them by the global batch size
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
        reduction=tf.keras.losses.Reduction.NONE)
    optimizer = tf.keras.optimizers.Adam()

    train_loss = tf.keras.metrics.Mean(name='train_loss')
    train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

    test_loss = tf.keras.metrics.Mean(name='test_loss')
    test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')


def compute_loss(labels, predictions):
    # sum the per-example losses and divide by the global (not per-replica) batch size
    per_example_loss = loss_object(labels, predictions)
    return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)


@tf.function
def train_step(inputs):
    images, labels = inputs

    with tf.GradientTape() as tape:
        predictions = model(images, training=True)
        loss = compute_loss(labels, predictions)

    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    train_loss.update_state(loss)
    train_accuracy.update_state(labels, predictions)


@tf.function
def test_step(inputs):
    images, labels = inputs
    predictions = model(images, training=False)
    loss = compute_loss(labels, predictions)

    test_loss.update_state(loss)
    test_accuracy.update_state(labels, predictions)


# custom training loop: one pass of training and evaluation per epoch
for epoch in range(EPOCHS):
    start_time = time.time()

    train_loss.reset_state()
    train_accuracy.reset_state()
    test_loss.reset_state()
    test_accuracy.reset_state()

    for inputs in train_dist_dataset:
        strategy.run(train_step, args=(inputs,))

    for inputs in test_dist_dataset:
        strategy.run(test_step, args=(inputs,))

    end_time = time.time()

    template = ('Epoch {}, Loss: {:.4f}, Accuracy: {:.4f}, '
                'Test Loss: {:.4f}, Test Accuracy: {:.4f}, Time: {:.2f} sec')
    print(template.format(epoch + 1,
                          train_loss.result(),
                          train_accuracy.result(),
                          test_loss.result(),
                          test_accuracy.result(),
                          end_time - start_time))



Epoch 1, Loss: 0.1706, Accuracy: 0.9488, Test Loss: 0.0478, Test Accuracy: 0.9847, Time: 4.83 sec
Epoch 2, Loss: 0.0484, Accuracy: 0.9850, Test Loss: 0.0333, Test Accuracy: 0.9883, Time: 4.04 sec
Epoch 3, Loss: 0.0343, Accuracy: 0.9894, Test Loss: 0.0373, Test Accuracy: 0.9878, Time: 4.02 sec
Epoch 4, Loss: 0.0252, Accuracy: 0.9923, Test Loss: 0.0301, Test Accuracy: 0.9900, Time: 3.99 sec
Epoch 5, Loss: 0.0198, Accuracy: 0.9936, Test Loss: 0.0322, Test Accuracy: 0.9887, Time: 4.31 sec
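
`compute_loss` above divides the summed per-example losses by the global batch size, not by the number of examples a replica actually saw. The plain-Python sketch below shows why that scaling makes per-replica losses add up to the global mean, and what happens on a final partial batch. The loss values and the two-replica split are made up for illustration; this run used a single replica.

```python
GLOBAL_BATCH_SIZE = 64


def scaled_loss(per_example_losses, global_batch_size=GLOBAL_BATCH_SIZE):
    """Mimic the scaling in compute_loss: sum of losses / global batch size."""
    return sum(per_example_losses) / global_batch_size


# Two hypothetical replicas, each holding half of a 64-example global batch
replica_0_losses = [0.5] * 32
replica_1_losses = [1.5] * 32

summed_over_replicas = scaled_loss(replica_0_losses) + scaled_loss(replica_1_losses)
global_mean = sum(replica_0_losses + replica_1_losses) / GLOBAL_BATCH_SIZE
print(summed_over_replicas, global_mean)  # both are 1.0

# The last MNIST training batch has 60000 % 64 = 32 examples
last_batch_losses = [1.0] * (60_000 % GLOBAL_BATCH_SIZE)
last_batch_scaled = scaled_loss(last_batch_losses)
print(last_batch_scaled)  # 0.5, although the true mean of this batch is 1.0
```

Since `train_step` passes the scaled loss to `train_loss.update_state`, the reported loss weights that final partial batch at about half its true mean. With more than one replica, each replica's value would be only its share of the global mean, so the per-replica results would need to be summed before logging.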
