## Training CNN on Multiple Machines

In [1]:
# Importing Libraries
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, InputLayer, Reshape, Rescaling
from tensorflow.keras.layers import Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.utils.vis_utils import plot_model
#from keras.regularizers import l2
import matplotlib.pyplot as plt
import numpy as np
import json
import os

# Force using CPU by making GPU invisible.
# Leave this commented out when training on GPUs.
# os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

# TF_CONFIG environment variable declaration.
# It is a JSON document with two parts:
#   - 'cluster': the addresses of every process taking part in training
#     (one chief and three workers here, all listening on port 8888);
#   - 'task': the role of THIS process inside that cluster.
# The 'task' entry is hard-coded, so it has to be edited by hand on each
# machine before the cell is run: keep the 'chief' line on the chief host
# and switch to the commented 'worker' line on the worker hosts.
# NOTE(review): the worker 'index' presumably has to match the position of the
# machine's address in the 'worker' list (0, 1 or 2) - confirm against the
# TensorFlow multi-worker guide.
os.environ["TF_CONFIG"] = json.dumps({
    'cluster': {
        "chief": ["192.168.10.1:8888"],
        'worker': ["192.168.10.2:8888", "192.168.10.3:8888", "192.168.10.4:8888"]
    },
    'task': {'type': 'chief', 'index': 0}
    #'task': {'type': 'worker', 'index': 0}
})


# NCCL option: request NCCL as the collective communication implementation.
# NOTE(review): NCCL is GPU-oriented; if the CPU-only switch above is ever
# enabled this choice probably has to change - TODO confirm.
options = tf.distribute.experimental.CommunicationOptions(
    implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)

# Declaring Strategy before inserting Data and creating Model.
# TF_CONFIG is assigned above, so it is already present in the environment
# when the strategy object is constructed.
strategy = tf.distribute.MultiWorkerMirroredStrategy(communication_options=options)

INFO:tensorflow:Enabled multi-worker collective ops with available devices: ['/job:chief/replica:0/task:0/device:CPU:0', '/job:chief/replica:0/task:0/device:GPU:0', '/job:worker/replica:0/task:1/device:CPU:0', '/job:worker/replica:0/task:1/device:GPU:0', '/job:worker/replica:0/task:0/device:CPU:0', '/job:worker/replica:0/task:0/device:GPU:0', '/job:worker/replica:0/task:2/device:CPU:0', '/job:worker/replica:0/task:2/device:GPU:0']
INFO:tensorflow:Check health not enabled.
INFO:tensorflow:MultiWorkerMirroredStrategy with cluster_spec = {'chief': ['192.168.10.1:8888'], 'worker': ['192.168.10.2:8888', '192.168.10.3:8888', '192.168.10.4:8888']}, task_type = 'chief', task_id = 0, num_workers = 4, local_devices = ('/job:chief/task:0/device:GPU:0',), communication = CommunicationImplementation.NCCL


In [2]:
# Importing & Preprocessing CIFAR-10 Dataset from Directory
# Image dimensions used when reshaping the flat pixel rows of each batch file,
# and the directory that holds the pickled CIFAR-10 batch files.
img_height, img_width = 32, 32
data_path = './CIFAR-10/'

def unpickle(file):
    """Load and return the object stored in a pickle file.

    Args:
        file: Path of the pickle file to read (opened in binary mode).

    Returns:
        The unpickled object. Loading uses ``encoding='bytes'``.
    """
    # NOTE: unpickling can execute arbitrary code - only load trusted files.
    import pickle
    with open(file, 'rb') as handle:
        return pickle.load(handle, encoding='bytes')


def load_dataset():
    """Read the CIFAR-10 batch files found under ``data_path``.

    The five ``data_batch_<n>`` files (10000 images each) are stacked into the
    training set and ``test_batch`` is used as the test/validation set. Pixel
    rows are reshaped to (N, 3, img_height, img_width) and transposed so the
    channel axis comes last, then both image arrays are passed through
    ``normalize_images``.

    Returns:
        ``((x_train, y_train), (x_test, y_test))`` where ``y_train`` is a
        ``uint8`` array and ``y_test`` is ``np.asarray`` of the stored label
        list.
    """
    # Loading Training Dataset
    x_train = np.empty((50000, 32, 32, 3), dtype='uint8')
    y_train = np.empty((50000, ), dtype='uint8')
    for batch_no in range(1, 6):
        batch_path = os.path.join(data_path, 'data_batch_' + str(batch_no))
        batch = unpickle(batch_path)

        # Destination rows of this batch inside the pre-allocated arrays
        rows = slice((batch_no - 1) * 10000, batch_no * 10000)
        pixels = batch[b'data']
        # Reshape flat rows to channel-first images, then move channels last
        x_train[rows, :, :, :] = pixels.reshape(
            len(pixels), 3, img_height, img_width).transpose(0, 2, 3, 1)
        y_train[rows] = batch[b'labels']

    # Loading Test/Validation Dataset
    test_batch = unpickle(os.path.join(data_path, 'test_batch'))
    test_pixels = test_batch[b'data']
    x_test = test_pixels.reshape(
        len(test_pixels), 3, img_height, img_width).transpose(0, 2, 3, 1)
    y_test = np.asarray(test_batch[b'labels'])

    # Normalizing Images
    x_train, x_test = normalize_images(x_train, x_test)

    # Display Dataset Size
    print('Training Images : {}'.format(len(x_train)))
    print('Test Images : {}'.format(len(x_test)))
    return (x_train, y_train), (x_test, y_test)


def normalize_images(train, test):
    """Convert two image arrays to float32 and scale them from 0-255 to 0-1.

    Args:
        train: Array of training images with values in the 0-255 range.
        test: Array of test images with values in the 0-255 range.

    Returns:
        A ``(train_norm, test_norm)`` tuple of new float32 arrays; the inputs
        are not modified.
    """
    # astype() makes a float32 copy, the division rescales it to 0-1
    scaled = [images.astype('float32') / 255.0 for images in (train, test)]
    return scaled[0], scaled[1]


# Function : Create and Compile Model
def create_and_compile_model():
    """Build and compile the CNN used for 10-class classification.

    Architecture: three convolutional stages (32, 64 and 128 filters), each
    made of two 3x3 'same'-padded ReLU convolutions followed by batch
    normalization, a 2x2 max-pooling and a dropout (0.2, 0.3, 0.4), then a
    256-unit dense layer with batch normalization and 0.5 dropout, and a
    10-way softmax output. The first layer declares a (32, 32, 3) input.

    Returns:
        The Sequential model compiled with Adam, sparse categorical
        cross-entropy (on probabilities, ``from_logits=False``) and sparse
        categorical accuracy.
    """
    model = Sequential()

    # (number of filters, dropout rate applied after pooling) for each stage
    conv_stages = ((32, 0.2), (64, 0.3), (128, 0.4))
    for stage_no, (filters, drop_rate) in enumerate(conv_stages):
        # Only the very first layer of the network declares the input shape
        first_layer_args = {'input_shape': (32, 32, 3)} if stage_no == 0 else {}
        model.add(Conv2D(filters, (3, 3), activation='relu', padding='same',
                         **first_layer_args))
        model.add(BatchNormalization())
        model.add(Conv2D(filters, (3, 3), activation='relu', padding='same'))
        model.add(BatchNormalization())
        model.add(MaxPooling2D((2, 2)))
        model.add(Dropout(drop_rate))

    # Classifier head
    model.add(Flatten())
    model.add(Dense(256, activation='relu'))
    model.add(BatchNormalization())
    model.add(Dropout(0.5))
    model.add(Dense(10, activation='softmax'))

    # Compiling the model
    model.compile(
        optimizer=tf.keras.optimizers.Adam(),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
    )
    return model

In [None]:
# Batch size handled by one replica; the global batch is that size multiplied
# by the number of replicas kept in sync by the strategy.
per_replica_batch_size = 1024
global_batch_size = per_replica_batch_size * strategy.num_replicas_in_sync

# load CIFAR-10 Dataset
(x_train, y_train), (x_test, y_test) = load_dataset()

# Data Augmentation Generator
# NOTE: datagen.fit(x_train) is deliberately not called. It only computes the
# statistics needed by featurewise_center, featurewise_std_normalization or
# zca_whitening; none of them is enabled here, so the call would just copy the
# whole training set without changing the generator.
datagen = ImageDataGenerator(
            width_shift_range=0.2,
            height_shift_range=0.2,
            horizontal_flip=True)

# Data iterator generating training batches (this is the one passed to fit)
iter_train = datagen.flow(x_train, y_train, batch_size=global_batch_size)
# Calculating Steps per Epoch: number of full global batches, never below 1 so
# that a global batch larger than the dataset still yields a valid epoch.
steps = max(1, len(x_train) // global_batch_size)

with strategy.scope():
    # Creating and Compiling Model inside the Strategy Scope
    model = create_and_compile_model()

# Training the model without callbacks
history = model.fit(iter_train,
                    validation_data=(x_test, y_test), steps_per_epoch=steps, epochs=100)

In [None]:
# Evaluating the model on test dataset
# score[0] is reported as the loss and score[1] as the accuracy metric
score = model.evaluate(x_test, y_test)
test_loss, test_accuracy = score[0], score[1]
print("test loss : {:.4f}".format(test_loss))
print("test accuracy : {:.4f}".format(test_accuracy))