In [1]:
import os
import sys
import json
import time

# Log additional outputs from TF's C++ backend
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '0'

In [2]:
# Disable GPUs
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

# Add current directory to path
if '.' not in sys.path:
  sys.path.insert(0, '.')

In [3]:
import tensorflow as tf

# Ignore warnings
tf.get_logger().setLevel('ERROR')

In [4]:
%%writefile mnist.py

# import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  # Load the data
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # Normalize pixel values for x_train and cast to float32
  x_train = x_train / np.float32(255)
  # Cast y_train to int64
  y_train = y_train.astype(np.int64)
  # Define repeated and shuffled dataset
  train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset


def build_and_compile_cnn_model():
  # Define simple CNN model using Keras Sequential
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])

  # Compile model
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  
  return model

Writing mnist.py


In [8]:
!dir *.py

 Volume in drive C has no label.
 Volume Serial Number is 32C8-2425

 Directory of C:\git_repo\ML_experiments\ml_experiments\mlops_coursera

10/13/2024  02:44 PM                 0 distributed_training.py
10/13/2024  03:33 PM             1,174 mnist.py
               2 File(s)          1,174 bytes
               0 Dir(s)  386,796,793,856 bytes free


In [9]:
# Import your mnist model
import mnist

# Set batch size
batch_size = 64

# Load the dataset
single_worker_dataset = mnist.mnist_dataset(batch_size)

# Load compiled CNN model
single_worker_model = mnist.build_and_compile_cnn_model()

# As training progresses, the loss should drop and the accuracy should increase.
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
Epoch 1/3
Epoch 2/3
Epoch 3/3


<keras.callbacks.History at 0x2495c9eec50>

MULTI WORKER CONFIGURATION

Now let's enter the world of multi-worker training. In TensorFlow, the TF_CONFIG environment variable is required for training on multiple machines, each of which possibly has a different role. TF_CONFIG is a JSON string used to specify the cluster configuration on each worker that is part of the cluster.

There are two components of TF_CONFIG: cluster and task.

Let's dive into how they are used:

cluster:

It is the same for all workers and describes the training cluster: a dict that maps each job type, such as worker, to the list of addresses for that job.

In multi-worker training with MultiWorkerMirroredStrategy, one worker usually takes on a little more responsibility than the others, such as saving checkpoints and writing summary files for TensorBoard, in addition to what a regular worker does.

Such a worker is referred to as the chief worker, and it is customary that the worker with index 0 is appointed as the chief worker (in fact this is how tf.distribute.Strategy is implemented).

task:

Provides information about the current task and is different on each worker. It specifies the type and index of that worker.

Here is an example configuration:

In [12]:
tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

In [13]:
json.dumps(tf_config)

'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0}}'
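Since the cluster part is shared and only the task part changes, the per-worker configurations can be generated from a single address list. The sketch below is one way to do that; `make_tf_configs` and `is_chief` are hypothetical helper names, not TensorFlow APIs, and the chief check assumes a cluster that only defines worker jobs (no separate chief job type).

```python
import json

def make_tf_configs(worker_addresses):
    """Return one TF_CONFIG dict per worker for the given address list."""
    # The cluster section is identical for every worker.
    cluster = {'worker': list(worker_addresses)}
    # Only the task section (here, the index) differs between workers.
    return [
        {'cluster': cluster, 'task': {'type': 'worker', 'index': i}}
        for i in range(len(worker_addresses))
    ]

def is_chief(tf_config):
    """Apply the convention that the worker with index 0 acts as chief."""
    task = tf_config['task']
    return task['type'] == 'worker' and task['index'] == 0

configs = make_tf_configs(['localhost:12345', 'localhost:23456'])
for cfg in configs:
    role = 'chief' if is_chief(cfg) else 'regular worker'
    print(json.dumps(cfg), '->', role)
```

The first entry matches the tf_config dict defined above; the second is what the other worker would receive.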

In [15]:
# Synchronous training with MultiWorkerMirroredStrategy
strategy = tf.distribute.MultiWorkerMirroredStrategy()

MultiWorkerMirroredStrategy creates copies of all variables in the model's layers on each device across all workers. It uses CollectiveOps, a TensorFlow op for collective communication, to aggregate gradients and keep the variables in sync. 
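The idea of aggregating gradients to keep the copies in sync can be pictured with a small pure-Python simulation. This is only a conceptual sketch, not how TensorFlow implements collective ops: `all_reduce_mean` and `sgd_step` are hypothetical helpers, the gradients are made-up numbers, and averaging (rather than summing) is a simplifying assumption.

```python
def all_reduce_mean(per_worker_grads):
    """Average gradient vectors element-wise across all workers."""
    num_workers = len(per_worker_grads)
    return [sum(vals) / num_workers for vals in zip(*per_worker_grads)]

def sgd_step(weights, grads, learning_rate):
    """Apply one plain SGD update to a weight vector."""
    return [w - learning_rate * g for w, g in zip(weights, grads)]

# Two simulated workers start from identical copies of the weights.
replicas = [[1.0, 2.0], [1.0, 2.0]]

# Each worker computes a different gradient on its own share of the batch.
local_grads = [[0.5, -1.0], [1.5, 3.0]]

# Every worker receives the same aggregated gradient...
synced = all_reduce_mean(local_grads)

# ...so applying the same update keeps all copies identical.
replicas = [sgd_step(w, synced, learning_rate=0.5) for w in replicas]
print(replicas)
```

Because every replica applies the same aggregated gradient to the same starting weights, the copies stay identical after each step, which is the property synchronous training relies on.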

### Implement Distributed Training via Context Managers

To distribute training across multiple workers, all you need to do is enclose the model building and `model.compile()` call inside `strategy.scope()`.

The distribution strategy's scope dictates how and where the variables are created, and in the case of `MultiWorkerMirroredStrategy`, the variables created are `MirroredVariable`s, and they are replicated on each of the workers.


In [16]:
# Implementing distributed strategy via a context manager
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()

Since TF_CONFIG is not set yet, the above strategy is effectively single-worker training.
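A quick way to see which situation a process is in is to look at the environment before creating the strategy. The sketch below uses a hypothetical helper, `count_workers`, that treats a missing TF_CONFIG as a single worker; it only counts entries under the worker job and is an illustration, not part of TensorFlow.

```python
import json

def count_workers(environ):
    """Return the number of workers listed in TF_CONFIG, or 1 if it is unset."""
    raw = environ.get('TF_CONFIG')
    if not raw:
        # No cluster description: the process trains on its own.
        return 1
    cluster = json.loads(raw).get('cluster', {})
    return max(1, len(cluster.get('worker', [])))

# Without TF_CONFIG the process is on its own.
print(count_workers({}))

# With the two-worker example configuration, two workers are reported.
example = {
    'TF_CONFIG': json.dumps({
        'cluster': {'worker': ['localhost:12345', 'localhost:23456']},
        'task': {'type': 'worker', 'index': 0},
    })
}
print(count_workers(example))
```

In the notebook itself the helper would be called as `count_workers(os.environ)`.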

TRAIN THE MODEL: CREATE A TRAINING SCRIPT
To actually run with MultiWorkerMirroredStrategy you'll need to run worker processes and pass a TF_CONFIG to them.

Like the mnist.py file written earlier, here is the main.py that each of the workers will run:

In [17]:
%%writefile main.py

import os
import json

import tensorflow as tf
import mnist # Your module

# Define batch size
per_worker_batch_size = 64

# Read TF_CONFIG from the environment and parse the JSON string into a dict
tf_config = json.loads(os.environ['TF_CONFIG'])

# Infer number of workers from tf_config
num_workers = len(tf_config['cluster']['worker'])

# Define strategy
strategy = tf.distribute.MultiWorkerMirroredStrategy()

# Define global batch size
global_batch_size = per_worker_batch_size * num_workers

# Load dataset
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

# Create and compile model following the distributed strategy
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()

# Train the model
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)

Writing main.py


In the code snippet above, note that the `global_batch_size`, which gets passed to `Dataset.batch`, is set to `per_worker_batch_size * num_workers`. This ensures that each worker processes batches of `per_worker_batch_size` examples regardless of the number of workers.
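The arithmetic behind this can be checked with a few lines of plain Python. The helpers `global_batch_size` and `split_batch` are hypothetical and only model an even split of one global batch; how TensorFlow actually distributes the data across workers is not modeled here.

```python
def global_batch_size(per_worker_batch_size, num_workers):
    """Batch size to pass to Dataset.batch so each worker gets a full share."""
    return per_worker_batch_size * num_workers

def split_batch(batch, num_workers):
    """Split one global batch into equal contiguous per-worker shares."""
    share = len(batch) // num_workers
    return [batch[i * share:(i + 1) * share] for i in range(num_workers)]

per_worker = 64
for num_workers in (1, 2, 4):
    # Integers stand in for training examples.
    batch = list(range(global_batch_size(per_worker, num_workers)))
    shares = split_batch(batch, num_workers)
    print(num_workers, len(batch), [len(s) for s in shares])
```

Whatever the number of workers, each share holds `per_worker` examples, while the global batch grows in proportion to the cluster size.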

SET THE TF_CONFIG ENVIRONMENT VARIABLE


In [18]:

# Set TF_CONFIG env variable
os.environ['TF_CONFIG'] = json.dumps(tf_config)

In [19]:
# first kill any previous runs
%killbgscripts

All background processes were killed.


In [22]:
# List running processes to confirm that no worker from a previous run is still alive
!tasklist


Image Name                     PID Session Name        Session#    Mem Usage
System Idle Process              0 Services                   0          8 K
System                           4 Services                   0        156 K
Secure System                  108 Services                   0     46,948 K
Registry                       148 Services                   0     41,972 K
smss.exe                       600 Services                   0      1,112 K
csrss.exe                      932 Services                   0      5,300 K
wininit.exe                   1020 Services                   0      5,900 K
csrss.exe                      480 Console                    1      6,644 K
services.exe                  1028 Services                   0      9,972 K
LsaIso.exe                    1048 Services                   0      3,176 K
lsass.exe                     1064 Services                   0     28,332 K
svchost.exe                   1188 Services                   0     34,276 

LAUNCH A WORKER

In [27]:
!python main.py > job_0.log 2>&1
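Note that the command above runs in the foreground, so the cell does not return until the process exits, and with two workers in the cluster the first worker is expected to wait for the second one to join. One possible alternative, sketched below, is to start each worker as a background process from Python. `launch_worker` is a hypothetical helper, not part of TensorFlow or Jupyter; it relies only on the standard library.

```python
import json
import os
import subprocess
import sys

def launch_worker(script, tf_config, log_path):
    """Start `script` in the background with TF_CONFIG set, logging to log_path."""
    # Copy the current environment and add this worker's configuration.
    env = dict(os.environ, TF_CONFIG=json.dumps(tf_config))
    with open(log_path, 'w') as log_file:
        # stdout and stderr both go to the log file, like `> log 2>&1`.
        proc = subprocess.Popen(
            [sys.executable, script],
            env=env,
            stdout=log_file,
            stderr=subprocess.STDOUT,
        )
    # The child keeps its own handle to the log file after ours is closed.
    return proc
```

For the two-worker cluster above, this could be called as `launch_worker('main.py', tf_config, 'job_0.log')`, and then once more with the task index set to 1 and a second log file such as `job_1.log`. Calling `wait()` on the returned process objects blocks until training finishes.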

In [26]:
pwd

'C:\\git_repo\\ML_experiments\\ml_experiments\\mlops_coursera'