# Multi-worker training with Keras


## Overview

This section demonstrates multi-worker distributed training of a Keras model using the `tf.distribute.Strategy` API, specifically `tf.distribute.experimental.MultiWorkerMirroredStrategy`. With this strategy, a Keras model designed to run on a single worker can run on multiple workers with minimal code changes.
We have modified the original sample available at https://www.tensorflow.org/guide/distributed_training.

## Setup

First, import the necessary modules.

```python
import json
import os
import sys
```

Before importing TensorFlow, make a few changes to the environment.
First, disable all GPUs. This prevents errors caused by all the workers trying to use the same GPU. In a real application, each worker would be on a different machine.

```python
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
```

Reset the `TF_CONFIG` environment variable. You will set it later to configure the cluster.

```python
os.environ.pop('TF_CONFIG', None)
```

Make sure that the current directory is on Python's path. This allows the notebook to import the files written by `%%writefile` later.

```python
if '.' not in sys.path:
  sys.path.insert(0, '.')
```

Import TensorFlow and check the version:

```python
import tensorflow as tf
tf.__version__
```

```
'2.3.0'
```

### MNIST Dataset and model definition

Create an `mnist.py` file with a simple CNN model and a dataset loaded with `tf.keras.datasets.mnist.load_data()`. The worker processes in this example will use this file:

```python
%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
```

```
Writing mnist.py
```


Train the model for a small number of epochs on a single worker and observe the results to verify that everything works. With each epoch, the loss should drop and the accuracy should increase.

```python
import mnist

batch_size = 64
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=10)
```

```
Epoch 1/3
Epoch 2/3
Epoch 3/3

<tensorflow.python.keras.callbacks.History at 0x7fc81310f278>
```

## Multi-worker configuration with two workers

Let us enter the world of multi-worker training. In TensorFlow, the `TF_CONFIG` environment variable is required for training on multiple machines, each of which possibly has a different role. `TF_CONFIG` is a JSON string used to specify the cluster configuration on each worker that is part of the cluster.
Example configuration:

```python
tf_config = {
    'cluster': {
        'worker': ['localhost:11111', 'localhost:22222']
    },
    'task': {'type': 'worker', 'index': 0}
}
```

Here is `tf_config` serialized as a JSON string:

```python
json.dumps(tf_config)
```

```
'{"cluster": {"worker": ["localhost:11111", "localhost:22222"]}, "task": {"type": "worker", "index": 0}}'
```

There are two components of `TF_CONFIG`: `cluster` and `task`.

* `cluster` is the same for all workers and provides information about the training cluster, which is a dict consisting of different types of jobs, such as `worker`. In multi-worker training with `MultiWorkerMirroredStrategy`, there is usually one `worker` that takes on a little more responsibility, like saving checkpoints and writing summary files for TensorBoard, in addition to what a regular `worker` does. Such a worker is referred to as the `chief` worker, and it is customary that the `worker` with `index` 0 is appointed as the chief `worker` (in fact, this is how `tf.distribute.Strategy` is implemented).

* `task` provides information about the current task and is different on each worker. It specifies the `type` and `index` of that worker.
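To make the chief convention concrete, here is a minimal sketch of a hypothetical helper, `is_chief`, that applies the rule described above to a `TF_CONFIG`-style dict. It is a simplified illustration, not TensorFlow's own code; TensorFlow's internal logic may handle additional task types.

```python
def is_chief(tf_config: dict) -> bool:
    """Simplified check: does this task act as the chief?"""
    cluster = tf_config.get("cluster", {})
    task = tf_config.get("task", {})
    task_type = task.get("type")
    task_index = task.get("index", 0)
    # A task whose type is explicitly "chief" is always the chief.
    if task_type == "chief":
        return True
    # Without a "chief" job, worker 0 takes on the chief's extra duties.
    return "chief" not in cluster and task_type == "worker" and task_index == 0


cluster_spec = {"worker": ["localhost:11111", "localhost:22222"]}
for index in range(len(cluster_spec["worker"])):
    task_config = {"cluster": cluster_spec, "task": {"type": "worker", "index": index}}
    print(index, is_chief(task_config))
# prints "0 True" then "1 False"
```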




In this example, we will set the task `type` to `"worker"` and the task `index` to `0`. This machine is the first worker; it will be appointed as the chief worker and do more work than the others.

**Note:** Other machines also need to have the `TF_CONFIG` environment variable set. It should have the same `cluster` dict, but a different task `type` or task `index`, depending on the roles of those machines.


For illustration purposes, this sample shows how to set a `TF_CONFIG` with 2 workers on `localhost`. In practice, users would create multiple workers on external IP addresses/ports and set `TF_CONFIG` on each worker appropriately.

In this example, you will use 2 workers. The first worker's `TF_CONFIG` is shown above. For the second worker, you would set `tf_config['task']['index']=1`.

Above, `tf_config` is just a local variable in Python. To actually use it to configure training, this dictionary needs to be serialized as JSON and placed in the `TF_CONFIG` environment variable.
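Because every worker shares the same `cluster` and differs only in its `task`, the per-worker configurations can be generated from a single list of addresses. The sketch below uses a hypothetical helper, `make_tf_config`, that is not part of TensorFlow; for index 0 it reproduces the serialized string shown earlier.

```python
import json


def make_tf_config(worker_addresses, index):
    """Build a TF_CONFIG dict for the worker at position `index` (hypothetical helper)."""
    if not 0 <= index < len(worker_addresses):
        raise ValueError(f"index {index} out of range for {len(worker_addresses)} workers")
    return {
        "cluster": {"worker": list(worker_addresses)},
        "task": {"type": "worker", "index": index},
    }


workers = ["localhost:11111", "localhost:22222"]
# Each line is the JSON string you would place in that worker's TF_CONFIG.
for i in range(len(workers)):
    print(json.dumps(make_tf_config(workers, i)))
```

On a real cluster, each machine would run this with its own `index` and assign the resulting string to `os.environ['TF_CONFIG']` before creating the strategy.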

### Environment variables and subprocesses in notebooks

Subprocesses inherit environment variables from their parent. So if you set an environment variable in this Jupyter notebook process:

```python
os.environ['GREETINGS'] = 'Hello TensorFlow!'
```

You can access the environment variable from a subprocess:

```python
%%bash
echo ${GREETINGS}
```

```
Hello TensorFlow!
```
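The same inheritance can be observed from plain Python with the standard `subprocess` module. In this sketch, the child is a one-line Python program that prints the variable; because no `env` argument is passed, it receives a copy of the parent's environment.

```python
import os
import subprocess
import sys

os.environ["GREETINGS"] = "Hello TensorFlow!"

# No `env=` argument, so the child inherits the parent's os.environ.
result = subprocess.run(
    [sys.executable, "-c", "import os; print(os.environ['GREETINGS'])"],
    capture_output=True,
    text=True,
    check=True,
)
print(result.stdout.strip())  # Hello TensorFlow!
```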


In the next section, you'll use this to pass the `TF_CONFIG` to the worker subprocesses. You would never really launch your jobs this way, but it's sufficient for the purposes of this tutorial: to demonstrate a minimal multi-worker example.
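As an alternative to mutating the notebook's own `os.environ` between launches, each child can be given its own environment through the `env` argument of `subprocess`. This is a sketch of that alternative, not what this tutorial does; the child program here is a hypothetical stand-in for `main.py` that only reports the task index it received.

```python
import json
import os
import subprocess
import sys

workers = ["localhost:11111", "localhost:22222"]

# Stand-in for main.py: the child just prints the task index it was given.
CHILD_CODE = (
    "import json, os; "
    "print(json.loads(os.environ['TF_CONFIG'])['task']['index'])"
)

outputs = []
for index in range(len(workers)):
    child_config = {"cluster": {"worker": workers},
                    "task": {"type": "worker", "index": index}}
    # Copy the parent environment and override TF_CONFIG for this child only.
    child_env = {**os.environ, "TF_CONFIG": json.dumps(child_config)}
    result = subprocess.run([sys.executable, "-c", CHILD_CODE], env=child_env,
                            capture_output=True, text=True, check=True)
    outputs.append(result.stdout.strip())

print(outputs)  # ['0', '1']
```

The children above run one after another, which is fine for this stand-in. Real workers must run concurrently (for example, with `subprocess.Popen`), because each one waits for the others before training starts.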

## Strategy to be chosen

In TensorFlow, there are two main forms of distributed training, as we saw earlier in the chapter:

* Synchronous training, where the steps of training are synced across the workers and replicas, and
* Asynchronous training, where the training steps are not strictly synced.

`MultiWorkerMirroredStrategy`, which is the recommended strategy for synchronous multi-worker training, will be demonstrated in this guide.
To train the model, use an instance of `tf.distribute.experimental.MultiWorkerMirroredStrategy`.

`MultiWorkerMirroredStrategy` creates copies of all variables in the model's layers on each device across all workers. It uses `CollectiveOps`, TensorFlow ops for collective communication, to aggregate gradients and keep the variables in sync.
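The idea of keeping mirrored variables in sync can be illustrated in plain Python. This is a conceptual sketch, not how `CollectiveOps` is implemented: each replica computes different gradients, an all-reduce gives every replica the same combined result, and applying the same SGD update therefore leaves all copies identical. Averaging is used here for simplicity; as we understand it, Keras scales each replica's loss so that summing gradients across replicas yields an equivalent result.

```python
def all_reduce_mean(per_replica_grads):
    """Element-wise mean of gradients across replicas (conceptual all-reduce)."""
    num_replicas = len(per_replica_grads)
    return [sum(values) / num_replicas for values in zip(*per_replica_grads)]


learning_rate = 0.5
# Two replicas start from identical (mirrored) copies of the same variables.
replica_weights = [[1.0, -1.0], [1.0, -1.0]]
# Each replica computes different gradients on its own shard of the batch.
replica_grads = [[0.25, 0.5], [0.75, 0.0]]

# Every replica receives the same reduced gradients...
reduced = all_reduce_mean(replica_grads)
# ...so applying the same SGD update keeps the copies identical.
replica_weights = [[w - learning_rate * g for w, g in zip(weights, reduced)]
                   for weights in replica_weights]
print(reduced)          # [0.5, 0.25]
print(replica_weights)  # [[0.75, -1.125], [0.75, -1.125]]
```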

Note: `TF_CONFIG` is parsed and TensorFlow's gRPC servers are started at the time `MultiWorkerMirroredStrategy()` is called, so the `TF_CONFIG` environment variable must be set before a `tf.distribute.Strategy` instance is created. If `TF_CONFIG` is not set when the strategy is created (as is the case in this notebook process at this point), the strategy effectively performs single-worker training.
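Because the configuration is read only once, at strategy creation, it can help to sanity-check `TF_CONFIG` before that call. The sketch below uses a hypothetical helper, `read_tf_config`, that parses the variable and checks that the worker's index is inside the worker list; the strategy call itself is shown only as a comment so the block runs without TensorFlow.

```python
import json
import os


def read_tf_config(environ=os.environ):
    """Parse and sanity-check TF_CONFIG (hypothetical helper); None if unset."""
    raw = environ.get("TF_CONFIG")
    if raw is None:
        return None  # the strategy would fall back to single-worker training
    config = json.loads(raw)
    workers = config["cluster"].get("worker", [])
    task = config["task"]
    if task["type"] == "worker" and not 0 <= task["index"] < len(workers):
        raise ValueError(f"task index {task['index']} is outside the worker list")
    return config


config = read_tf_config()
if config is None:
    print("TF_CONFIG is not set: training would run on a single worker.")
else:
    print(f"Worker {config['task']['index']} of {len(config['cluster']['worker'])}")
# Only after this check would main.py create the strategy, e.g.:
# strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
```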

## Train the model

With the integration of the `tf.distribute.Strategy` API into `tf.keras`, the only change you need to make to distribute the training to multiple workers is to enclose the model building and the `model.compile()` call inside `strategy.scope()`. The distribution strategy's scope dictates how and where the variables are created. In the case of `MultiWorkerMirroredStrategy`, the variables created are `MirroredVariable`s, and they are replicated on each of the workers.


Note: Currently there is a limitation in `MultiWorkerMirroredStrategy` where TensorFlow ops need to be created after the instance of strategy is created. If you see `RuntimeError: Collective ops must be configured at program startup`, try creating the instance of `MultiWorkerMirroredStrategy` at the beginning of the program and put the code that may create ops after the strategy is instantiated.

To run with `MultiWorkerMirroredStrategy` you'll need to run worker processes and pass a `TF_CONFIG` to them.

Like the `mnist.py` file written earlier, here is the `main.py` that each of the workers will run:

```python
%%writefile main.py

import os
import json

import tensorflow as tf
import mnist

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
```

```
Writing main.py
```


In the code snippet above, note that `global_batch_size`, which is passed to `Dataset.batch`, is set to `per_worker_batch_size * num_workers`. This ensures that each worker processes batches of `per_worker_batch_size` examples regardless of the number of workers.
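The batch-size arithmetic for this two-worker cluster can be checked directly. This sketch mirrors the computation in `main.py` with a hypothetical function, `global_batch_size`, and assumes each worker receives an equal share of every global batch.

```python
import json


def global_batch_size(per_worker_batch_size, tf_config_json):
    """Mirror main.py: per-worker batch size times the number of workers."""
    num_workers = len(json.loads(tf_config_json)["cluster"]["worker"])
    return per_worker_batch_size * num_workers


example_config = json.dumps({
    "cluster": {"worker": ["localhost:11111", "localhost:22222"]},
    "task": {"type": "worker", "index": 0},
})
num_workers = len(json.loads(example_config)["cluster"]["worker"])
batch = global_batch_size(64, example_config)
print(batch)                 # 128 examples per step across the cluster
print(batch // num_workers)  # 64 examples per step on each worker
print(batch * 70)            # 8960 examples per epoch with steps_per_epoch=70
```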

The current directory now contains both Python files:

```python
%%bash
ls *.py
```

```
main.py
mnist.py
```


JSON-serialize `tf_config` and add it to the environment variables:

```python
os.environ['TF_CONFIG'] = json.dumps(tf_config)
```

Now launch a worker process that will run `main.py` and use the `TF_CONFIG`:

```python
# first kill any previous runs
%killbgscripts
```

```
All background processes were killed.
```


```python
%%bash --bg
python main.py &> job_0.log
```

```
Starting job # 0 in a separate thread.
```


There are a few things to note about the above command:

1. It uses `%%bash`, which is a [notebook "magic"](https://ipython.readthedocs.io/en/stable/interactive/magics.html), to run bash commands.
2. It uses the `--bg` flag to run the `bash` process in the background, because this worker will not terminate on its own: it waits for all the other workers before it starts training.

The backgrounded worker process won't print output to this notebook, so `&>` redirects its output to a file where you can inspect what happened.
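Outside a notebook, the same pattern (start a process without waiting for it and send both output streams to one log file, as `&>` does) can be sketched with `subprocess.Popen`. The child program here is a hypothetical stand-in for `python main.py`, and the log is written to a temporary directory.

```python
import subprocess
import sys
import tempfile
from pathlib import Path

log_path = Path(tempfile.mkdtemp()) / "job_0.log"

# Stand-in for `python main.py`: writes one line to stdout and one to stderr.
child_code = "import sys; print('stdout line'); print('stderr line', file=sys.stderr)"

with open(log_path, "w") as log_file:
    # Like `&>`, send both stdout and stderr to the same log file.
    proc = subprocess.Popen([sys.executable, "-c", child_code],
                            stdout=log_file, stderr=subprocess.STDOUT)
    # Popen returns immediately, so the caller can keep working here.

proc.wait()  # a real worker would keep running until training finishes
print(log_path.read_text())
```

The order of the two lines in the log may vary, because the child buffers stdout differently from stderr when writing to a file.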

Wait a few seconds for the process to start up:

```python
import time
time.sleep(5)
```

Here is the worker's log file so far:

```python
%%bash
cat job_0.log
```

```
2020-11-07 03:07:49.970694: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
2020-11-07 03:07:51.640093: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcuda.so.1
2020-11-07 03:07:51.649302: E tensorflow/stream_executor/cuda/cuda_driver.cc:314] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2020-11-07 03:07:51.649357: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (d05dc70bbdd6): /proc/driver/nvidia/version does not exist
2020-11-07 03:07:51.664781: I tensorflow/core/platform/profile_utils/cpu_utils.cc:104] CPU Frequency: 2200000000 Hz
2020-11-07 03:07:51.665127: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x2e4aa00 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2020-11-07 03:07:51.665191: I tensorflow/com
```

The last line of the log file should say: `Started server with target: grpc://localhost:11111`. The first worker is now ready and is waiting for all the other workers to be ready to proceed.
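Instead of sleeping for a fixed number of seconds, you could poll the log file until that line appears. This is a minimal sketch with a hypothetical helper, `wait_for_log_line`; the demo runs against a temporary file so the block is self-contained.

```python
import tempfile
import time
from pathlib import Path


def wait_for_log_line(path, marker, timeout=60.0, poll_interval=0.5):
    """Poll `path` until it contains `marker`; return False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        log = Path(path)
        if log.exists() and marker in log.read_text(errors="replace"):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)


# Self-contained demo on a temporary file. In this notebook you might call:
# wait_for_log_line("job_0.log", "Started server with target: grpc://localhost:11111")
demo_log = Path(tempfile.mkdtemp()) / "demo.log"
demo_log.write_text("... Started server with target: grpc://localhost:11111\n")
print(wait_for_log_line(demo_log, "Started server", timeout=1.0))  # True
```

The file is checked at least once even when `timeout` is 0, so a marker that is already present is always found.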

Now update `tf_config` so that the second worker's process picks up index 1:

```python
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
```

Now launch the second worker. This will start the training since all the workers are active (so there's no need to background this process):

```python
%%bash
python main.py
```

```
Epoch 1/3
Epoch 2/3
Epoch 3/3

2020-11-07 03:08:21.234886: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
2020-11-07 03:08:22.920781: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcuda.so.1
2020-11-07 03:08:22.929993: E tensorflow/stream_executor/cuda/cuda_driver.cc:314] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2020-11-07 03:08:22.930047: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (d05dc70bbdd6): /proc/driver/nvidia/version does not exist
2020-11-07 03:08:22.936466: I tensorflow/core/platform/profile_utils/cpu_utils.cc:104] CPU Frequency: 2200000000 Hz
2020-11-07 03:08:22.936719: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x13b0a00 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2020-11-07 03:08:22.936751: I tensorflow/com
```

If you now recheck the logs written by the first worker, you'll see that it participated in training the model:

```python
%%bash
cat job_0.log
```

```
2020-11-07 03:07:49.970694: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
2020-11-07 03:07:51.640093: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcuda.so.1
2020-11-07 03:07:51.649302: E tensorflow/stream_executor/cuda/cuda_driver.cc:314] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2020-11-07 03:07:51.649357: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (d05dc70bbdd6): /proc/driver/nvidia/version does not exist
2020-11-07 03:07:51.664781: I tensorflow/core/platform/profile_utils/cpu_utils.cc:104] CPU Frequency: 2200000000 Hz
2020-11-07 03:07:51.665127: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x2e4aa00 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2020-11-07 03:07:51.665191: I tensorflow/com
```

Unsurprisingly, this ran _slower_ than the test run at the beginning of this tutorial. Running multiple workers on a single machine only adds overhead. The goal here was not to improve the training time, but only to give an example of multi-worker training.

```python
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
```

```
All background processes were killed.
```
