##### Copyright 2018 The TensorFlow Authors.


In [1]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Distributed training with TensorFlow

## Overview

`tf.distribute.Strategy` is a TensorFlow API to distribute training 
across multiple GPUs, multiple machines or TPUs. Using this API, you can distribute your existing models and training code with minimal code changes.

`tf.distribute.Strategy` has been designed with these key goals in mind:

* Easy to use and support multiple user segments, including researchers, ML engineers, etc.
* Provide good performance out of the box.
* Easy switching between strategies.

`tf.distribute.Strategy` can be used with a high-level API like [Keras](https://www.tensorflow.org/guide/keras), and can also be used to distribute custom training loops (and, in general, any computation using TensorFlow).

In TensorFlow 2.x, you can execute your programs eagerly, or in a graph using [`tf.function`](function.ipynb). `tf.distribute.Strategy` intends to support both these modes of execution, but works best with `tf.function`. Eager mode is only recommended for debugging purpose and not supported for `TPUStrategy`. Although we discuss training most of the time in this guide, this API can also be used for distributing evaluation and prediction on different platforms.

You can use `tf.distribute.Strategy` with very few changes to your code, because we have changed the underlying components of TensorFlow to become strategy-aware. This includes variables, layers, models, optimizers, metrics, summaries, and checkpoints.

In this guide, we explain various types of strategies and how you can use them in different situations.

Note: For a deeper understanding of the concepts, please watch [this deep-dive presentation](https://youtu.be/jKV53r9-H14). This is especially recommended if you plan to write your own training loop.


In [2]:
# Import TensorFlow
import tensorflow as tf

## Types of strategies
`tf.distribute.Strategy` intends to cover a number of use cases along different axes. Some of these combinations are currently supported and others will be added in the future. Some of these axes are:

* *Synchronous vs asynchronous training:* These are two common ways of distributing training with data parallelism. In sync training, all workers train over different slices of input data in sync, and aggregating gradients at each step. In async training, all workers are independently training over the input data and updating variables asynchronously. Typically sync training is supported via all-reduce and async through parameter server architecture.
* *Hardware platform:* You may want to scale your training onto multiple GPUs on one machine, or multiple machines in a network (with 0 or more GPUs each), or on Cloud TPUs.

In order to support these use cases, there are six strategies available. In the next section we explain which of these are supported in which scenarios in TF 2.2 at this time. Here is a quick overview:

| Training API          	| MirroredStrategy  	| TPUStrategy         	| MultiWorkerMirroredStrategy     	| CentralStorageStrategy          	| ParameterServerStrategy  	|
|:-----------------------	|:-------------------	|:---------------------	|:---------------------------------	|:---------------------------------	|:--------------------------	|
| **Keras API**             	| Supported         	|  Supported 	| Experimental support 	| Experimental support 	| Supported planned post 2.3 	|
| **Custom training loop**  	| Supported 	| Supported   	| Experimental support            	| Experimental support           	| Supported planned post 2.3         	|
| **Estimator API**         	| Limited Support         	| Not supported           	| Limited Support                       	| Limited Support                       	| Limited Support                	|

Note: [Experimental support](https://www.tensorflow.org/guide/versions#what_is_not_covered) means the APIs are not covered by any compatibilities guarantees.

Note: Estimator support is limited. Basic training and evaluation are experimental, and advanced features—such as scaffold—are not implemented. We recommend using Keras or custom training loops if a use case is not covered.

### MirroredStrategy
`tf.distribute.MirroredStrategy` supports synchronous distributed training on multiple GPUs on one machine. It creates one replica per GPU device. Each variable in the model is mirrored across all the replicas. Together, these variables form a single conceptual variable called `MirroredVariable`. These variables are kept in sync with each other by applying identical updates.

Efficient all-reduce algorithms are used to communicate the variable updates across the devices.
All-reduce aggregates tensors across all the devices by adding them up, and makes them available on each device.
It’s a fused algorithm that is very efficient and can reduce the overhead of synchronization significantly. There are many all-reduce algorithms and implementations available, depending on the type of communication available between devices. By default, it uses NVIDIA NCCL as the all-reduce implementation. You can choose from a few other options we provide, or write your own.

Here is the simplest way of creating `MirroredStrategy`:


In [3]:
mirrored_strategy = tf.distribute.MirroredStrategy()

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)


This will create a `MirroredStrategy` instance which will use all the GPUs that are visible to TensorFlow, and use NCCL as the cross device communication.

If you wish to use only some of the GPUs on your machine, you can do so like this:

In [4]:
mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')


If you wish to override the cross device communication, you can do so using the `cross_device_ops` argument by supplying an instance of `tf.distribute.CrossDeviceOps`. Currently,  `tf.distribute.HierarchicalCopyAllReduce` and `tf.distribute.ReductionToOneDevice` are two options other than `tf.distribute.NcclAllReduce` which is the default.

In [5]:
mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)


### MultiWorkerMirroredStrategy

`tf.distribute.experimental.MultiWorkerMirroredStrategy` is very similar to `MirroredStrategy`. It implements synchronous distributed training across multiple workers, each with potentially multiple GPUs. Similar to `MirroredStrategy`, it creates copies of all variables in the model on each device across all workers.

It uses [CollectiveOps](https://github.com/tensorflow/tensorflow/blob/master/tensorflow/python/ops/collective_ops.py) as the multi-worker all-reduce communication method used to keep variables in sync. A collective op is a single op in the TensorFlow graph which can automatically choose an all-reduce algorithm in the TensorFlow runtime according to hardware, network topology and tensor sizes.

It also implements additional performance optimizations. For example, it includes a static optimization that converts multiple all-reductions on small tensors into fewer all-reductions on larger tensors. In addition, we are designing it to have a plugin architecture - so that in the future, you will be able to plugin algorithms that are better tuned for your hardware. Note that collective ops also implement other collective operations such as broadcast and all-gather.

Here is the simplest way of creating `MultiWorkerMirroredStrategy`:

In [6]:
multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO


`MultiWorkerMirroredStrategy` currently allows you to choose between two different implementations of collective ops.  `CollectiveCommunication.RING` implements ring-based collectives using gRPC as the communication layer.  `CollectiveCommunication.NCCL` uses [Nvidia's NCCL](https://developer.nvidia.com/nccl) to implement collectives.  `CollectiveCommunication.AUTO` defers the choice to the runtime.  The best choice of collective implementation depends upon the number and kind of GPUs, and the network interconnect in the cluster. You can specify them in the following way:


In [7]:
multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
    tf.distribute.experimental.CollectiveCommunication.NCCL)

INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.NCCL


One of the key differences to get multi worker training going, as compared to multi-GPU training, is the multi-worker setup. The `TF_CONFIG` environment variable is the standard way in TensorFlow to specify the cluster configuration to each worker that is part of the cluster. Learn more about [setting up TF_CONFIG](#TF_CONFIG).

Note: This strategy is [`experimental`](https://www.tensorflow.org/guide/versions#what_is_not_covered) as we are currently improving it and making it work for more scenarios. As part of this, please expect the APIs to change in the future.

### CentralStorageStrategy
`tf.distribute.experimental.CentralStorageStrategy` does synchronous training as well. Variables are not mirrored, instead they are placed on the CPU and operations are replicated across all local GPUs. If there is only one GPU, all variables and operations will be placed on that GPU.

Create an instance of `CentralStorageStrategy` by:


In [8]:
central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()

INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'


This will create a `CentralStorageStrategy` instance which will use all visible GPUs and CPU. Update to variables on replicas will be aggregated before being applied to variables.

Note: This strategy is [`experimental`](https://www.tensorflow.org/guide/versions#what_is_not_covered) as we are currently improving it and making it work for more scenarios. As part of this, please expect the APIs to change in the future.

### ParameterServerStrategy
`tf.distribute.experimental.ParameterServerStrategy` supports parameter servers training on multiple machines. In this setup, some machines are designated as workers and some as parameter servers. Each variable of the model is placed on one parameter server. Computation is replicated across all GPUs of all the workers.

In terms of code, it looks similar to other strategies:
```
ps_strategy = tf.distribute.experimental.ParameterServerStrategy()
```

For multi worker training, `TF_CONFIG` needs to specify the configuration of parameter servers and workers in your cluster, which you can read more about in [TF_CONFIG below](#TF_CONFIG).

Note: This strategy only works with the Estimator API.

### Other strategies

In addition to the above strategies, there are two other strategies which might be useful for prototyping and debugging when using `tf.distribute` APIs.

#### Default Strategy

Default strategy is a distribution strategy which is present when no explicit distribution strategy is in scope. It implements the `tf.distribute.Strategy` interface but is a pass-through and provides no actual distribution. For instance, `strategy.run(fn)` will simply call `fn`. Code written using this strategy should behave exactly as code written without any strategy. You can think of it as a "no-op" strategy.

Default strategy is a singleton - and one cannot create more instances of it. It can be obtained using `tf.distribute.get_strategy()` outside any explicit strategy's scope (the same API that can be used to get the current strategy inside an explicit strategy's scope). 

In [8]:
default_strategy = tf.distribute.get_strategy()

This strategy serves two main purposes:

* It allows writing distribution aware library code unconditionally. For example, in optimizer, we can do `tf.distribute.get_strategy()` and use that strategy for reducing gradients - it will always return a strategy object on which we can call the reduce API.


In [9]:
# In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None)  # reduce some values

1.0

* Similar to library code, it can be used to write end users' programs to work with and without distribution strategy, without requiring conditional logic. A sample code snippet illustrating this:

In [11]:
if tf.config.list_physical_devices('gpu'):
  strategy = tf.distribute.MirroredStrategy()
else:  # use default strategy
  strategy = tf.distribute.get_strategy() 

with strategy.scope():
  # do something interesting
  print(tf.Variable(1.))

<tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>


#### OneDeviceStrategy
`tf.distribute.OneDeviceStrategy` is a strategy to place all variables and computation on a single specified device. 

```
strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
```

This strategy is distinct from the default strategy in a number of ways. In default strategy, the variable placement logic remains unchanged when compared to running TensorFlow without any distribution strategy. But when using `OneDeviceStrategy`, all variables created in its scope are explicitly placed on the specified device. Moreover, any functions called via `OneDeviceStrategy.run` will also be placed on the specified device. 

Input distributed through this strategy will be prefetched to the specified device. In default strategy, there is no input distribution.

Similar to the default strategy, this strategy could also be used to test your code before switching to other strategies which actually distribute to multiple devices/machines. This will exercise the distribution strategy machinery somewhat more than default strategy, but not to the full extent as using `MirroredStrategy` or `TPUStrategy` etc. If you want code that behaves as if no strategy, then use default strategy.

So far we've talked about what are the different strategies available and how you can instantiate them. In the next few sections, we will talk about the different ways in which you can use them to distribute your training. We will show short code snippets in this guide and link off to full tutorials which you can run end to end.

## Using `tf.distribute.Strategy` with `tf.keras.Model.fit`
We've integrated `tf.distribute.Strategy` into `tf.keras` which is TensorFlow's implementation of the
[Keras API specification](https://keras.io). `tf.keras`  is a high-level API to build and train models. By integrating into `tf.keras` backend, we've made it seamless for you to distribute your training written in the Keras training framework using `model.fit`.

Here's what you need to change in your code:

1. Create an instance of the appropriate `tf.distribute.Strategy`.
2. Move the creation of Keras model, optimizer and metrics inside `strategy.scope`.

We support all types of Keras models - sequential, functional and subclassed.

Here is a snippet of code to do this for a very simple Keras model with one dense layer:

In [13]:
mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])

model.compile(loss='mse', optimizer='sgd')

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)


In this example we used `MirroredStrategy` so we can run this on a machine with multiple GPUs. `strategy.scope()` indicates to Keras which strategy to use to distribute the training. Creating models/optimizers/metrics inside this scope allows us to create distributed variables instead of regular variables. Once this is set up, you can fit your model like you would normally. `MirroredStrategy` takes care of replicating the model's training on the available GPUs, aggregating gradients, and more.

In [14]:
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)

Epoch 1/2
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2/2
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).


0.1281416267156601

Here we used a `tf.data.Dataset` to provide the training and eval input. You can also use numpy arrays:

In [15]:
import numpy as np
inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)

Epoch 1/2
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2/2


<tensorflow.python.keras.callbacks.History at 0x7fa07027b2e8>