
# Distributed training in TensorFlow

## Introduction

The `tf.distribute` library is designed to make training with multiple accelerators and/or multiple workers easier. Its central abstraction, the *strategy*, hides many aspects of distributed training, so that model definition and training code can remain agnostic about the hardware that will be used to train the model.

In this notebook, we will cover the basic concepts of what a strategy is and how the `tf.distribute` library can be used.

## Basic concepts in distributed training

There are several levels at which computation for deep learning can be parallelised:
* Using multiple machines (workers), each an entire computer with its own processing capability (potentially including hardware accelerators), memory and communication capabilities;
* Using multiple hardware accelerators (e.g. GPUs) on a single computer;
* Using the highly parallel computation of tensor operations within a single processor.

Of these, the last is accomplished automatically when using even a single GPU or other dedicated accelerator. However, to speed tasks up further, it can be desirable to parallelise further by using multiple accelerators or even multiple workers. Furthermore, if you wish to use a TPU, even a single TPU node consists of 8 cores, so distributed computation is needed to make use of all of them. This is where `tf.distribute` comes in.

Whether you are using multiple workers, multiple accelerators or a combination of the two, the next step is to decide how you wish to parallelise your computation. In this context, we are typically talking about parallelising the training loop that searches for optimal weights for a neural network.

Recall that the key steps in a training loop are, given a batch of training data:
1. The forward pass: compute the model outputs and thus loss on the batch;
2. The backward pass: compute gradients with respect to model parameters;
3. Update weights: perform an update of the model weights using the computed gradients and the chosen optimisation algorithm.

Also, note that for most network architectures and loss functions, there is little or no interdependency between the examples in a batch during the forward and backward passes (batch normalisation is a notable exception, as it computes statistics across the examples in a batch). The contributions of individual examples only really come together when a reduction (such as taking the mean) is applied to the per-example losses and their corresponding gradients.

Hence, a tidy way to divide labour between multiple devices is to split each batch further into sub-batches, sending one to each device to perform the forward and backward passes in isolation. When the individual computations are done, the devices communicate with each other to compute aggregate gradients and update the model weights accordingly. Note that with this approach each device must hold its own copy of the model, called a *replica*, which therefore needs to fit in that device's memory; we also need to ensure that every device updates its copy of the weights identically.
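Why this split works can be checked with a small worked example. The sketch below is plain Python rather than TensorFlow, and assumes a hypothetical one-parameter linear model `y_hat = w * x` trained with mean squared error; the helper names are illustrative only. Each simulated device sums the per-example gradients of its own sub-batch and divides by the *global* batch size, and summing those contributions recovers exactly the gradient of the full-batch mean loss.

```python
def per_example_grad(w, x, y):
    # Derivative of the squared error (w * x - y) ** 2 with respect to w
    return 2.0 * (w * x - y) * x


def full_batch_grad(w, xs, ys):
    # Gradient of the mean loss over the whole global batch (single device)
    return sum(per_example_grad(w, x, y) for x, y in zip(xs, ys)) / len(xs)


def data_parallel_grad(w, xs, ys, num_devices):
    global_batch_size = len(xs)
    if global_batch_size % num_devices != 0:
        raise ValueError("sketch assumes the batch splits evenly across devices")
    shard_size = global_batch_size // num_devices
    contributions = []
    for d in range(num_devices):
        start, end = d * shard_size, (d + 1) * shard_size
        # Each simulated device only sees its own sub-batch...
        local_sum = sum(
            per_example_grad(w, x, y) for x, y in zip(xs[start:end], ys[start:end])
        )
        # ...but divides by the GLOBAL batch size, not its own sub-batch size
        contributions.append(local_sum / global_batch_size)
    # "All-reduce": summing the contributions gives the full-batch gradient
    return sum(contributions)


xs = [0.0, 1.0, 2.0, 3.0]
ys = [3.0, 5.0, 7.0, 9.0]  # generated from y = 2x + 3
w = 1.0
print(full_batch_grad(w, xs, ys))                    # -16.0
print(data_parallel_grad(w, xs, ys, num_devices=2))  # -16.0
```

The same total is obtained for any number of devices that divides the batch evenly, which is the property synchronised data-parallel training relies on.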

This approach is called *synchronous* (or synchronised) training and is the focus of this notebook. It is conceptually the tidiest and most reliable form of parallelism, in the sense that the evolution of the model parameters during training should (up to floating-point differences) match how they would have evolved under single-device training with the same global batch size. The downside is that, since every weight update happens in lock-step, the speed-up is limited by the slowest device in the cluster, and also by the communication bandwidth available for sharing gradients between replicas (which can become a significant bottleneck for multi-worker parallelism). An alternative is *asynchronous* training, where individual devices do not wait for each other; this is not covered here.

In [1]:
import tensorflow as tf
from tqdm.notebook import tqdm

print("TensorFlow version:", tf.__version__)

TensorFlow version: 2.5.0


## Strategies

A strategy is the object that takes care of distribution for you. Strategies handle several aspects of distributed training:

* Control the placement of data across multiple devices, ensuring that data which should be identical everywhere (e.g. model weights) is mirrored across devices, while data which should be split up (e.g. training batches) is sharded across them.
* Perform the same operations in parallel across all devices.
* Gather and / or reduce the per-device results.

The strategies we will cover in this tutorial include:
* The default strategy, returned by `tf.distribute.get_strategy`, which performs no distribution at all but can be used to run code that requires a strategy.
* The `MirroredStrategy` that mirrors variables (including model weights) across all devices in scope to perform synchronised training.
* The `TPUStrategy`, which is used for training on TPUs and is conceptually similar to the `MirroredStrategy`.

We will not specifically discuss these other strategies:
* The `OneDeviceStrategy` that is similar to the default strategy in that it does not perform any parallelism but it does ensure computation takes place on the specified device.
* The `MultiWorkerMirroredStrategy` that is conceptually similar to the `MirroredStrategy` but allows for devices to be attached to multiple workers (rather than a single worker).
* The `CentralStorageStrategy` that like the `MirroredStrategy` performs synchronised training, but with model parameters held centrally on a single device (either the CPU if there are multiple GPUs, or the GPU if there is only a single GPU).
* The `ParameterServerStrategy` which can be used for asynchronous training.

### Choosing a strategy

Simply instantiate a strategy object using the desired `Strategy` subclass. Here we create a `MirroredStrategy`:

In [2]:
strategy = tf.distribute.MirroredStrategy(devices=["/CPU:0", "/GPU:0"])

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0', '/job:localhost/replica:0/task:0/device:GPU:0')


Note that since Colab does not provide multiple GPUs (and we don't want to use TPU acceleration unnecessarily), I have forced the strategy to use both the CPU and the single available GPU for mirrored training. This is clearly not an efficient setup for actual training, but it is fine for demonstrating how to use this strategy.
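Hard-coding device names like this is convenient for a demo, but one way to keep training code hardware-agnostic is to choose the strategy from the devices that are actually visible. The helper below is a hypothetical sketch (`choose_strategy` is not part of TensorFlow); it assumes all GPUs are attached to the local machine and deliberately ignores TPUs and multi-worker setups.

```python
import tensorflow as tf


def choose_strategy():
    """Illustrative helper: pick a strategy from the GPUs visible to this process."""
    gpus = tf.config.list_logical_devices("GPU")
    if len(gpus) > 1:
        # Synchronised data parallelism over every visible GPU
        return tf.distribute.MirroredStrategy()
    if len(gpus) == 1:
        # No parallelism, but keep computation pinned to the single GPU
        return tf.distribute.OneDeviceStrategy("/gpu:0")
    # CPU-only machine: fall back to the default (no-op) strategy
    return tf.distribute.get_strategy()


# A separate name, so the notebook's `strategy` is not overwritten
strategy_for_this_machine = choose_strategy()
print(type(strategy_for_this_machine).__name__,
      strategy_for_this_machine.num_replicas_in_sync)
```

The rest of the training code only ever talks to the returned object, so it does not need to change when the hardware does.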

Let us begin by checking the number of model replicas in the strategy:

In [3]:
print("Number of replicas in strategy:", strategy.num_replicas_in_sync) # should be 2


Number of replicas in strategy: 2


### Basic usage

#### Variable placement

Strategies have an associated scope, accessed using the `scope` method. Variables created within that scope (including variables created within functions called within that scope) are automatically replicated across all devices.

In [4]:
class MyClass:
    def __init__(self):
        self.weight = tf.Variable(3.14, dtype=tf.float32)

# Non-replicated instantiation of MyClass
nonrep = MyClass()
print("nonrep.weight:", nonrep.weight)
print()

with strategy.scope():
    # Replicated instantiation of MyClass
    rep = MyClass()
print("rep.weight:", rep.weight)

nonrep.weight: <tf.Variable 'Variable:0' shape=() dtype=float32, numpy=3.14>

rep.weight: MirroredVariable:{
  0: <tf.Variable 'Variable:0' shape=() dtype=float32, numpy=3.14>,
  1: <tf.Variable 'Variable/replica_1:0' shape=() dtype=float32, numpy=3.14>
}


As you can see, when an object is created outside of strategy scope (`nonrep`), variables are just created on the default device. However, when the same class is instantiated within strategy scope (`rep`), variables held by that object are of type `MirroredVariable` and are replicated across both devices.

#### Running code

We can use `strategy.run` to run code in "replica scope" (TensorFlow's documentation calls this the *replica context*). Semantically, code run in replica scope behaves almost as if it were running on a single device, independently of the others. I say "almost" because certain operations, such as variable assignment, may first require cross-device communication to keep variable updates in sync.

In [5]:
@tf.function
def add(a, b):
    return a + b

res = strategy.run(add, args=(tf.constant([[1, 2]]), tf.constant([[3, 4]])))
print(res)

PerReplica:{
  0: tf.Tensor([[4 6]], shape=(1, 2), dtype=int32),
  1: tf.Tensor([[4 6]], shape=(1, 2), dtype=int32)
}
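Cross-device communication can also be requested explicitly from inside replica scope. The sketch below assumes a fresh `MirroredStrategy` with default devices (named `demo_strategy` here so as not to clash with the notebook's `strategy`) and uses the replica context's `all_reduce`: each replica contributes its replica ID plus one, and every replica receives the same total.

```python
import tensorflow as tf

# All visible GPUs, or the CPU if there are none
demo_strategy = tf.distribute.MirroredStrategy()


@tf.function
def sum_replica_ids():
    ctx = tf.distribute.get_replica_context()
    # Each replica contributes (its replica ID + 1)...
    local_value = tf.cast(ctx.replica_id_in_sync_group, tf.float32) + 1.0
    # ...and all_reduce synchronises the replicas, returning the total to each
    return ctx.all_reduce(tf.distribute.ReduceOp.SUM, local_value)


totals = demo_strategy.run(sum_replica_ids)
print(demo_strategy.experimental_local_results(totals))
```

With `n` replicas each replica should receive `n * (n + 1) / 2`; running the same function with the notebook's two-replica `strategy` should therefore give 3.0 on both replicas.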


There are also methods to collect the per-replica results into a tuple (`experimental_local_results`), concatenate them into a single tensor (`gather`) or perform a reduction on them (`reduce`):

In [6]:
print("strategy.experimental_local_results:")
print(strategy.experimental_local_results(res))
print()

print("strategy.gather:")
print(strategy.gather(res, axis=0))
print()

print("strategy.reduce:")
print(strategy.reduce("SUM", res, axis=0))

strategy.experimental_local_results:
(<tf.Tensor: shape=(1, 2), dtype=int32, numpy=array([[4, 6]], dtype=int32)>, <tf.Tensor: shape=(1, 2), dtype=int32, numpy=array([[4, 6]], dtype=int32)>)

strategy.gather:
INFO:tensorflow:Gather to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
tf.Tensor(
[[4 6]
 [4 6]], shape=(2, 2), dtype=int32)

strategy.reduce:
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
tf.Tensor([ 8 12], shape=(2,), dtype=int32)
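The shapes in this output follow from how `axis` is interpreted: `gather` concatenates along `axis`, and `reduce` with `axis=0` reduces across replicas *and* along that axis, which is why the SUM result has shape `(2,)` rather than `(1, 2)`. The plain-Python sketch below mimics these semantics on nested lists; it is an illustrative model (hypothetical `sim_*` helpers covering only 2-D values with `axis=0` or `axis=None`), not TensorFlow's implementation.

```python
def sim_gather(per_replica):
    """Concatenate per-replica 2-D values (lists of rows) along axis 0."""
    rows = []
    for value in per_replica:
        rows.extend(value)
    return rows


def _divide(x, n):
    # Element-wise division for (possibly nested) lists
    return [_divide(v, n) for v in x] if isinstance(x, list) else x / n


def sim_reduce(op, per_replica, axis=None):
    if axis is None:
        # Reduce across replicas only: element-wise, each replica's shape is kept
        total = [[sum(vals) for vals in zip(*rows)] for rows in zip(*per_replica)]
        count = len(per_replica)
    elif axis == 0:
        # Reduce across replicas AND along the batch axis
        rows = sim_gather(per_replica)
        total = [sum(col) for col in zip(*rows)]
        count = len(rows)  # MEAN is taken over the whole global batch
    else:
        raise ValueError("only axis=None and axis=0 are modelled here")
    if op == "SUM":
        return total
    if op == "MEAN":
        return _divide(total, count)
    raise ValueError("op must be 'SUM' or 'MEAN'")


per_replica_res = [[[4, 6]], [[4, 6]]]  # the two replicas' [[4, 6]] results
print(sim_gather(per_replica_res))                     # [[4, 6], [4, 6]]
print(sim_reduce("SUM", per_replica_res, axis=0))      # [8, 12]
print(sim_reduce("SUM", per_replica_res, axis=None))   # [[8, 12]]
print(sim_reduce("MEAN", per_replica_res, axis=0))     # [4.0, 6.0]
```

Passing `axis=None` is the appropriate choice for values with no batch dimension, such as a scalar loss.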


#### Data sharding

On the other hand, we want data from input datasets to be sharded across devices. This is achieved by passing a TensorFlow `Dataset` object to the strategy's `experimental_distribute_dataset` method.

In the following code, we first create a "dataset" consisting of the numbers from 0 to 15, batched into chunks of 4, and distribute it across the two devices. Sharding means that each original (global) batch of size 4 is subdivided into per-replica batches of size 2, one per device, as you can see from the output below.

In [7]:
ds = tf.data.Dataset.from_tensor_slices(tf.range(16)).batch(4)
dds = strategy.experimental_distribute_dataset(ds)

@tf.function
def return_batch(batch):
    """Returns the batch unchanged, tagged with the ID of the replica that received it"""
    return {"replica": tf.distribute.get_replica_context().replica_id_in_sync_group, "batch": batch, }

def dataset_distribution_experiment(dataset):
    for idx, batch in enumerate(dataset):
        print("Global batch #:", idx)
        results = strategy.experimental_local_results(strategy.run(return_batch, args=(batch,)))
        print("Results:", [{k: v.numpy() for k, v in d.items()} for d in results])
        print("")

print("Original dataset (not sharded):")
print("===============================")
dataset_distribution_experiment(ds)
print()
print()
print("Distributed dataset:")
print("====================")
dataset_distribution_experiment(dds)

Original dataset (not sharded):
Global batch #: 0
Results: [{'replica': 0, 'batch': array([0, 1, 2, 3], dtype=int32)}, {'replica': 1, 'batch': array([0, 1, 2, 3], dtype=int32)}]

Global batch #: 1
Results: [{'replica': 0, 'batch': array([4, 5, 6, 7], dtype=int32)}, {'replica': 1, 'batch': array([4, 5, 6, 7], dtype=int32)}]

Global batch #: 2
Results: [{'replica': 0, 'batch': array([ 8,  9, 10, 11], dtype=int32)}, {'replica': 1, 'batch': array([ 8,  9, 10, 11], dtype=int32)}]

Global batch #: 3
Results: [{'replica': 0, 'batch': array([12, 13, 14, 15], dtype=int32)}, {'replica': 1, 'batch': array([12, 13, 14, 15], dtype=int32)}]



Distributed dataset:
Global batch #: 0
Results: [{'replica': 0, 'batch': array([0, 1], dtype=int32)}, {'replica': 1, 'batch': array([2, 3], dtype=int32)}]

Global batch #: 1
Results: [{'replica': 0, 'batch': array([4, 5], dtype=int32)}, {'replica': 1, 'batch': array([6, 7], dtype=int32)}]

Global batch #: 2
Results: [{'replica': 0, 'batch': array([8, 9], dtype

As the output shows, if we do not distribute the dataset, the same (global, size-4) batches are fed to both replicas, so we gain no advantage from distributed training. By distributing the dataset, each replica instead receives half of each global batch.
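For this small in-memory dataset, the observed split can be reproduced by cutting each global batch into contiguous, equal-sized pieces, one per replica. The sketch below is a plain-Python model of that observed behaviour only; `shard_global_batch` is a hypothetical helper, and it assumes the global batch divides evenly across replicas (TensorFlow's handling of uneven final batches and of file-based sharding across workers is not modelled).

```python
def shard_global_batch(global_batch, num_replicas):
    """Split one global batch into contiguous, equal per-replica batches."""
    if len(global_batch) % num_replicas != 0:
        raise ValueError("this sketch assumes the global batch divides evenly")
    per_replica = len(global_batch) // num_replicas
    return [
        global_batch[r * per_replica:(r + 1) * per_replica]
        for r in range(num_replicas)
    ]


data = list(range(16))
global_batches = [data[i:i + 4] for i in range(0, len(data), 4)]
for idx, gb in enumerate(global_batches):
    # Replica r receives element r of the returned list
    print(idx, shard_global_batch(gb, num_replicas=2))
# 0 [[0, 1], [2, 3]]
# 1 [[4, 5], [6, 7]]
# 2 [[8, 9], [10, 11]]
# 3 [[12, 13], [14, 15]]
```

This is also why the global batch size is normally chosen as a multiple of `strategy.num_replicas_in_sync`.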

#### Coordinated variable updates

The last ingredient for distributed training is to update replicated variables in a coordinated manner. That is, after backpropagation on their respective sub-batches, each replica holds a different gradient tensor (with respect to the same model weights). To perform synchronised training, we need to ensure that these gradients are combined into a single value, which the optimisation algorithm then uses to update the weight variables identically in all replicas.

For variables created in strategy scope, this is taken care of automatically when using any of the variable `assign` methods. The only requirement is to specify, when the variable is created, how updates should be aggregated across replicas before being assigned. In the following example, we will create a mirrored "running_sum" variable that needs to be updated with values from the distributed dataset we created earlier. We want to add the results from each replica first before we use `assign_add`, so we set `aggregation=tf.VariableAggregation.SUM` when creating the variable:

In [8]:
class RunningSum:
    def __init__(self, aggregation):
        self.result = tf.Variable(0, dtype=tf.int32, aggregation=aggregation)

    @tf.function
    def update(self, value):
        self.result.assign_add(tf.reduce_sum(value))

def assignment_experiment(aggregation, strategy, dataset):
    with strategy.scope():
        rs = RunningSum(aggregation)
    for val in dataset:
        strategy.run(rs.update, args=(val,))
    return rs.result

print(
    "Per replica variables diverge without aggregation (as each device "
    "only updates according to the values it sees):"
)
print(assignment_experiment(tf.VariableAggregation.NONE, strategy, dds))
print()
print(
    "But setting aggregation to SUM keeps the variables in sync and yields "
    "the correct result for the sum of the dataset (i.e. 120):"
)
print(assignment_experiment(tf.VariableAggregation.SUM, strategy, dds))

Per replica variables diverge without aggregation (as each device only updates according to the values it sees):
MirroredVariable:{
  0: <tf.Variable 'Variable:0' shape=() dtype=int32, numpy=52>,
  1: <tf.Variable 'Variable/replica_1:0' shape=() dtype=int32, numpy=68>
}

But setting aggregation to SUM keeps the variables in sync and yields the correct result for the sum of the dataset (i.e. 120):
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0', '/job:localhost/replica:0/task:0/device:CPU:0').
MirroredVariable:{
  0: <tf.Variable 'Variable:0' shape=() dtype=int32, numpy=120>,
  1: <tf.Variable 'Variable/replica_1:0' shape=() dtype=int32, numpy=120>
}
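The numbers above can be reproduced by simulating the mirrored copies by hand. In the plain-Python sketch below (a model of the observed behaviour, not TensorFlow code; `simulate_running_sum` is hypothetical), each replica computes the sum of its own sub-batch at every step, and the aggregation setting decides what gets added to each copy. MEAN is included for comparison only and ignores the variable's integer dtype.

```python
def simulate_running_sum(per_replica_batches, aggregation):
    """per_replica_batches[step][r] is the sub-batch replica r sees at that step."""
    num_replicas = len(per_replica_batches[0])
    copies = [0] * num_replicas  # one copy of the mirrored variable per replica
    for step_batches in per_replica_batches:
        updates = [sum(b) for b in step_batches]  # each replica's assign_add value
        if aggregation == "NONE":
            # Each copy only sees its own update, so the copies drift apart
            copies = [c + u for c, u in zip(copies, updates)]
        elif aggregation == "SUM":
            total = sum(updates)
            copies = [c + total for c in copies]
        elif aggregation == "MEAN":
            mean = sum(updates) / num_replicas
            copies = [c + mean for c in copies]
        else:
            raise ValueError("aggregation must be 'NONE', 'SUM' or 'MEAN'")
    return copies


# The sub-batches each replica received from the distributed dataset `dds`
steps = [[[0, 1], [2, 3]], [[4, 5], [6, 7]], [[8, 9], [10, 11]], [[12, 13], [14, 15]]]
print(simulate_running_sum(steps, "NONE"))  # [52, 68]
print(simulate_running_sum(steps, "SUM"))   # [120, 120]
print(simulate_running_sum(steps, "MEAN"))  # [60.0, 60.0]
```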


However, this is *not* how coordination of variable updates is done by TensorFlow's optimisers, presumably because this approach relies on users having set the `aggregation` property of variables correctly. Instead, optimisers use the pattern and functions described [here](https://www.tensorflow.org/api_docs/python/tf/distribute/StrategyExtended) to perform coordinated gradient updates.

Finally, in practice we are unlikely ever to need to worry about these details for deep learning applications, because TensorFlow's `Optimizer` base class (and therefore correctly implemented subclasses) takes care of this for us within the `apply_gradients` method, summing gradients across replicas by default. This behaviour can be overridden with the `experimental_aggregate_gradients` argument of `apply_gradients`, which is useful if you want to aggregate the gradients yourself and process them after aggregation, as discussed further in the documentation for [`Optimizer`](https://www.tensorflow.org/api_docs/python/tf/keras/optimizers/Optimizer#use_with_tfdistributestrategy_2).

See the [custom training loop](#custom_training_loops) section below for an example of how to use an `Optimizer` within a strategy.

## Using strategies with Keras

Using Keras (and its standard training loop) with a strategy is simple, as the library is strategy-aware. Apart from creating the model within the `strategy.scope()` context, the key differences lie in preparing the dataset:

* Remember to distribute the dataset (if you instead pass an ordinary, undistributed `tf.data.Dataset` to `model.fit`, Keras distributes it for you).
* When passing an already-distributed dataset, it needs to be repeated as many times as there are epochs, and the number of steps per epoch needs to be passed to `model.fit`. This is presumably because distributed datasets do not have a `__len__` method and, furthermore, their iterator is not re-created at the end of each epoch.
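Because `steps_per_epoch` has to be supplied by hand, it must match the number of batches that each pass over the data actually produces. The plain-Python sketch below (`batches_per_pass` is a hypothetical helper) counts them the way `Dataset.batch` does, keeping a final partial batch unless `drop_remainder=True`. The notebook's `len(train) // GLOBAL_BATCH_SIZE` is exact here only because 20 is divisible by 4.

```python
import math


def batches_per_pass(num_examples, global_batch_size, drop_remainder=False):
    """Batches produced by Dataset.batch for one pass over num_examples elements."""
    if drop_remainder:
        return num_examples // global_batch_size
    # By default Dataset.batch keeps a final, smaller batch
    return math.ceil(num_examples / global_batch_size)


GLOBAL = 2 * 2  # per-replica batch size 2, two replicas, as in the notebook

print(batches_per_pass(20, GLOBAL))  # 5, equal to 20 // 4: epochs line up
print(batches_per_pass(21, GLOBAL))  # 6, but 21 // 4 == 5: "epochs" would drift
print(batches_per_pass(21, GLOBAL, drop_remainder=True))  # 5
```

With 21 examples, `batch(...).repeat(...)` would yield 6 batches per pass, so a `steps_per_epoch` of 5 would make Keras "epochs" drift out of step with actual passes over the data.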


In [9]:
# Create training data
X = tf.range(0, 20, dtype=tf.float32)[:, tf.newaxis]
y = 2. * X + 3
train = tf.data.Dataset.from_tensor_slices((X, y))

# Distribute dataset
EPOCHS = 100
PER_REPLICA_BATCH_SIZE = 2
GLOBAL_BATCH_SIZE = PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync
STEPS_PER_EPOCH = len(train) // GLOBAL_BATCH_SIZE
dist_train = strategy.experimental_distribute_dataset(
    train.batch(GLOBAL_BATCH_SIZE).repeat(EPOCHS)
)

# Create model in cross-replica scope and fit, otherwise as usual
def create_and_compile_model():
    model = tf.keras.models.Sequential(
        [tf.keras.layers.Dense(1, input_shape=(1,))]
    )
    loss = tf.keras.losses.MeanSquaredError()
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.1)
    metrics = [tf.keras.metrics.mean_absolute_error]
    model.compile(
        loss=loss, optimizer=optimizer, metrics=metrics
    )
    return model

with strategy.scope():
    model = create_and_compile_model()
    history = model.fit(
        dist_train,
        steps_per_epoch=STEPS_PER_EPOCH,
        epochs=EPOCHS,
        verbose=0,
    )

print(
    "Training complete, final loss {:.6f}.".format(history.history["loss"][-1])
)


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0', '/job:localhost/replica:0/task:0/device:GPU:0').
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0', '/job:localhost/replica:0/task:0/device:GPU:0').
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:C

### Saving and checkpointing

When a model is created within the `strategy.scope()` context, its variables are (as expected) distributed:

In [10]:
print(model.layers[0].kernel)

MirroredVariable:{
  0: <tf.Variable 'dense/kernel:0' shape=(1, 1) dtype=float32, numpy=array([[1.9768484]], dtype=float32)>,
  1: <tf.Variable 'dense/kernel/replica_1:0' shape=(1, 1) dtype=float32, numpy=array([[1.9768559]], dtype=float32)>
}


However, when you call `model.get_weights`, just one set of weights is returned (as for non-distributed models):

In [11]:
print(model.get_weights())

[array([[1.9768484]], dtype=float32), array([3.2932746], dtype=float32)]


This means that saving and checkpointing models works the same way as without `tf.distribute`. In fact, you can even save a model that was trained using one strategy and load its weights into a model that was created using a different strategy, or without any strategy:

In [12]:
# Save original model
model.save_weights("ckpt")

# Create a new (unreplicated) model
nonrep_model = create_and_compile_model()
nonrep_model.load_weights("ckpt")

# Check model performance
perf = nonrep_model.evaluate(train.batch(GLOBAL_BATCH_SIZE))
print("Loaded model loss: {:.6f}".format(perf[0]))

Loaded model loss: 0.023200


One noteworthy point about the outputs above is that, during distributed training, the values of "mirrored" variables can diverge slightly between supposedly identical replicas (compare the two values for the kernel of the dense layer above). This is not actually surprising, particularly when the replicas are on different device types (a CPU and a GPU in this case): even when the same operations are performed, differences in floating-point arithmetic can cause the precise values to diverge.

Looking at the output of `model.get_weights()` above, it seems that the weights of replica 0 are the "authoritative" weights that get saved. It would be interesting (at some point) to investigate how far weights can diverge between replicas in practice (for real problems), and how much of an issue this could pose for the reliability of distributed training.
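A simple starting point for such an investigation is to measure the largest element-wise gap between each replica's copy of a variable and replica 0's copy. The helper below is a hypothetical sketch built on `strategy.experimental_local_results`; it assumes the variable was created in the strategy's scope. So that it runs on its own, it is demonstrated on a freshly created variable (where the gap is zero); in the notebook you could instead pass `strategy` and, for example, `model.layers[0].kernel`.

```python
import tensorflow as tf


def max_replica_divergence(strategy, variable):
    """Largest absolute element-wise difference between any replica's copy and replica 0's."""
    copies = strategy.experimental_local_results(variable)
    reference = tf.convert_to_tensor(copies[0])
    worst = 0.0
    for copy in copies[1:]:
        # Eager execution copies tensors between devices as needed
        diff = tf.reduce_max(tf.abs(tf.convert_to_tensor(copy) - reference))
        worst = max(worst, float(diff))
    return worst


check_strategy = tf.distribute.MirroredStrategy()
with check_strategy.scope():
    v = tf.Variable([1.0, 2.0])
print(max_replica_divergence(check_strategy, v))  # 0.0 right after creation
```

Logging this value every few hundred steps during a real training run would show whether the gap stays at the level of floating-point noise or grows over time.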

### Custom training loops
<a name="custom_training_loops"/>

In a custom training loop, it is important to set up the loss function correctly. Rather than averaging the loss within each replica, you should divide the summed per-example loss by the global (across-all-replicas) batch size. This way, when the optimizer's `apply_gradients` method sums gradients across all replicas (or when you sum the per-replica losses yourself using `strategy.reduce`, e.g. to record the loss), the result corresponds to the true average loss over the whole global batch.
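The difference between the two scalings is easy to see with a few made-up numbers. The plain-Python sketch below assumes two replicas with two examples each; `compute_average_loss` here is a local stand-in that mimics `tf.nn.compute_average_loss` without sample weights (sum of per-example losses divided by the global batch size).

```python
def compute_average_loss(per_example_loss, global_batch_size):
    """Local stand-in for tf.nn.compute_average_loss (no sample weights)."""
    return sum(per_example_loss) / global_batch_size


per_replica_losses = [[1.0, 3.0], [5.0, 7.0]]  # 2 replicas, 2 examples each
global_batch_size = sum(len(losses) for losses in per_replica_losses)  # 4

# Correct: scale by the global batch size, then sum across replicas
correct = sum(
    compute_average_loss(losses, global_batch_size) for losses in per_replica_losses
)
# Incorrect: average within each replica, then sum across replicas
wrong = sum(sum(losses) / len(losses) for losses in per_replica_losses)

print(correct, wrong)  # 4.0 8.0
```

The correct version equals the mean of all four per-example losses, whereas averaging within each replica and then summing inflates the loss (and hence the gradients) by a factor equal to the number of replicas.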

This is the pattern suggested in the [TensorFlow guide](https://www.tensorflow.org/guide/distributed_training#using_tfdistributestrategy_with_custom_training_loops). 

In [13]:
pe_loss = tf.keras.losses.MeanSquaredError(reduction=tf.keras.losses.Reduction.NONE)

def compute_loss(labels, predictions):
    per_example_loss = pe_loss(labels,predictions)
    return tf.nn.compute_average_loss(
        per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE
    )

@tf.function
def train_step(inputs, model):
    features, labels = inputs
    with tf.GradientTape() as tape:
        predictions = model(features, training=True)
        loss = compute_loss(labels, predictions)
    gradients = tape.gradient(loss, model.trainable_variables)
    model.optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    return loss

def distributed_train_step(dist_inputs, model, strategy):
    per_replica_losses = strategy.run(train_step, args=(dist_inputs, model))
    return strategy.reduce("SUM", per_replica_losses, axis=None)

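def distributed_train(dist_inputs, model, strategy, steps_per_epoch, epochs):
    # dist_inputs was repeated once per epoch, so a single iterator can be
    # consumed across all epochs instead of being re-created every epoch
    iterator = iter(dist_inputs)
    for epoch in tqdm(range(epochs)):
        for step in range(steps_per_epoch):
            batch = next(iterator)
            distributed_train_step(batch, model, strategy)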

with strategy.scope():
    cust_train_model = create_and_compile_model()
distributed_train(dist_train, cust_train_model, strategy, STEPS_PER_EPOCH, EPOCHS)
perf = cust_train_model.evaluate(dist_train, steps=STEPS_PER_EPOCH)
print("Model trained, loss: {:.6f}".format(perf[0]))


INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0', '/job:localhost/replica:0/task:0/device:GPU:0').
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0', '/job:localhost/replica:0/task:0/device:GPU:0').
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0', '/job:localhost/replica:0/task:0/device:GPU:0').
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0', '/job:localhost/replica:0/task:0/device:GPU:0').

Model trained, loss: 0.000255


## Using TPUs for training

**Note: you will need to switch runtime type to TPU before running the code in this section**

First initialize your TPU using the following code:

In [1]:
import tensorflow as tf
resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
tf.config.experimental_connect_to_cluster(resolver)
# This is the TPU initialization code that has to be at the beginning.
tf.tpu.experimental.initialize_tpu_system(resolver)
print("TPU devices available:", tf.config.list_logical_devices("TPU"))

INFO:tensorflow:Initializing the TPU system: grpc://10.86.236.66:8470
INFO:tensorflow:Clearing out eager caches
INFO:tensorflow:Finished initializing TPU system.

TPU devices available: [LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:7', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:6', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:5', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:4', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:3', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:0', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:1', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:2', device_type='TPU')]


It is now possible to use manual device placement to place computations on a named TPU device. However, typically you will want to use `TPUStrategy` to distribute training across all available TPU devices and manage variable / model placement for you.

Obtain the strategy as follows:

In [2]:
strategy = tf.distribute.TPUStrategy(resolver)

INFO:tensorflow:Found TPU system:
INFO:tensorflow:*** Num TPU Cores: 8
INFO:tensorflow:*** Num TPU Workers: 1
INFO:tensorflow:*** Num TPU Cores Per Worker: 8
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)


Now, code designed to work with any strategy can be run on all available TPUs. See the [TensorFlow guide](https://www.tensorflow.org/guide/tpu) for a standard example (classification with MNIST).

Note that, except where a `Dataset` has been generated from in-graph data (e.g. using `from_tensor_slices`), the underlying data files will need to be in a Google Cloud Storage (GCS) bucket. For `tfds` datasets, this is achieved by setting the `try_gcs` keyword argument to `True` when calling `tfds.load`. The guide has several other tips for maximising performance on TPUs, the key theme being to eliminate bottlenecks caused by data loading and CPU-based preprocessing.