# Parameter Server Training

**Learning Objectives**

1. Instantiate a ParameterServerStrategy
2. Train with `Model.fit`
3. Train with a custom training loop
4. Define and run an evaluation loop





## Introduction 

[Parameter server training](https://www.usenix.org/system/files/conference/osdi14/osdi14-paper-li_mu.pdf)
is a common data-parallel method to scale up model training on multiple
machines. A parameter server training cluster consists of workers and parameter
servers. Variables are created on parameter servers and they are read and updated by workers in each step. By default, workers read and update these variables independently without synchronizing with each other. This is why sometimes parameter server-style training is called asynchronous training.

In TF2, parameter server training is powered by the
`tf.distribute.experimental.ParameterServerStrategy` class, which distributes
the training steps to a cluster that scales up to thousands of workers
(accompanied by parameter servers). There are two main supported training APIs:
Keras Training API, also known as `Model.fit`, and Custom Training Loop (CTL).
`Model.fit` is recommended when users prefer a high-level abstraction
and handling of training, while CTL is recommended when users prefer to define the details of their training
loop.

Regardless of the API of choice, distributed training in TF2 involves a
"cluster" with several "jobs", and each of the jobs may have one or more
"tasks". When using parameter server training, it is recommended to have one
coordinator job (which has the job name `chief`), multiple worker jobs (job name
`worker`), and multiple parameter server jobs (job name `ps`).

While the coordinator creates resources, dispatches training tasks, writes
checkpoints, and deals with task failures, workers and parameter servers run `tf.distribute.Server` instances that listen for requests from the coordinator.
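
The cluster, job, and task vocabulary can be illustrated with a plain-Python sketch. This is not TensorFlow code: the `list_tasks` helper and the `localhost` ports are hypothetical, and the `/job:<name>/task:<index>` strings only mirror the way TensorFlow device names identify a task.

```python
# Hypothetical cluster: one coordinator ("chief"), two workers, one parameter server.
cluster = {
    "chief": ["localhost:2220"],
    "worker": ["localhost:2221", "localhost:2222"],
    "ps": ["localhost:2223"],
}


def list_tasks(cluster):
    """Returns (task name, address) pairs: one entry per task of every job."""
    return [
        ("/job:%s/task:%d" % (job, index), address)
        for job, addresses in cluster.items()
        for index, address in enumerate(addresses)
    ]


for name, address in list_tasks(cluster):
    print(name, "->", address)
```

Each job name maps to a list of addresses, and a task is identified by its job name plus its index in that list.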

### Parameter server training with `Model.fit` API

Parameter server training with `Model.fit` API requires the coordinator to use a `tf.distribute.experimental.ParameterServerStrategy` object, and a `tf.keras.utils.experimental.DatasetCreator` as the input. Similar to `Model.fit` usage with no strategy, or with other strategies, the workflow
involves creating and compiling the model, preparing the callbacks, followed by a `Model.fit` call.

### Parameter server training with custom training loop (CTL) API

With CTLs, the `tf.distribute.experimental.coordinator.ClusterCoordinator`
class is the key component used for the coordinator. The `ClusterCoordinator`
class needs to work in conjunction with a `tf.distribute.Strategy` object. This
`tf.distribute.Strategy` object is needed to provide the information of the cluster and is used to define a training step as we have seen in [custom training with `MirroredStrategy`](https://www.tensorflow.org/tutorials/distribute/custom_training#training_loop). The `ClusterCoordinator` object then dispatches the execution of these training
steps to remote workers. For parameter server training, the `ClusterCoordinator`
needs to work with a `tf.distribute.experimental.ParameterServerStrategy`.

The most important API provided by the `ClusterCoordinator` object is `schedule`. The `schedule` API enqueues a `tf.function` and returns a future-like `RemoteValue` immediately. The queued functions will be dispatched to remote workers in background threads and their `RemoteValue`s will be filled asynchronously. Since `schedule` doesn’t require worker assignment, the `tf.function` passed in can be executed on any available worker. If the worker it is executed on becomes unavailable before its completion, the function will be retried on another available worker. Because of this fact and the fact that function execution is not atomic, a function may be executed more than once.
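
The retry behavior described above can be sketched with a toy coordinator written in plain Python. Everything here is an assumption made for illustration (`ToyCoordinator`, the worker callables, and the dict used as a stand-in for `RemoteValue`); it is not how `ClusterCoordinator` is implemented. It only shows why a scheduled function may run more than once.

```python
from collections import deque


class ToyCoordinator:
    """Toy stand-in for a coordinator; NOT the TensorFlow implementation."""

    def __init__(self, workers):
        self._workers = deque(workers)  # round-robin pool of worker callables
        self._queue = deque()

    def schedule(self, fn):
        """Enqueues `fn` and immediately returns a future-like dict."""
        remote_value = {"done": False, "value": None}
        self._queue.append((fn, remote_value))
        return remote_value

    def join(self):
        """Runs queued functions, retrying on another worker after a failure."""
        while self._queue:
            fn, remote_value = self._queue.popleft()
            worker = self._workers[0]
            self._workers.rotate(-1)
            try:
                remote_value["value"] = worker(fn)
                remote_value["done"] = True
            except ConnectionError:
                # The worker was lost before completion: re-enqueue the function.
                self._queue.append((fn, remote_value))


side_effects = []


def step():
    # A side effect (such as a variable update) happens on every execution.
    side_effects.append("update applied")
    return len(side_effects)


def healthy_worker(fn):
    return fn()


def make_flaky_worker():
    state = {"failed": False}

    def flaky_worker(fn):
        result = fn()  # the function body runs...
        if not state["failed"]:
            state["failed"] = True
            # ...but the worker disappears before reporting the result.
            raise ConnectionError("worker became unavailable")
        return result

    return flaky_worker


coordinator = ToyCoordinator([make_flaky_worker(), healthy_worker])
result = coordinator.schedule(step)
coordinator.join()
print(result["done"], len(side_effects))  # True 2
```

The function completes once from the coordinator's point of view, but its side effect was applied twice. For that reason, step functions should tolerate repeated execution.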

In addition to dispatching remote functions, the `ClusterCoordinator` also helps
to create datasets on all the workers and rebuild these datasets when a worker recovers from failure.

Each learning objective corresponds to a _#TODO_ in this student lab notebook. Try to complete this notebook first and then review the [solution notebook](https://github.com/GoogleCloudPlatform/training-data-analyst/blob/master/courses/machine_learning/deepdive2/production_ml/solutions/parameter_server_training.ipynb).

## Setup

The tutorial will branch into CTL or `Model.fit` paths, and you can choose the
one that fits your needs. Sections other than "Training with X" are applicable to
both paths.

In [2]:
!pip install -q portpicker
!pip install --upgrade tensorflow==2.6



**NOTE: Please ignore any incompatibility warnings and errors and re-run the above cell before proceeding.**


In [3]:
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers.experimental.preprocessing as kpl

This notebook uses TF2.x.
Please check your tensorflow version using the cell below.

In [None]:
# Show the currently installed version of TensorFlow
print("TensorFlow version: ",tf.version.VERSION)

TensorFlow version:  2.6.0


## Cluster Setup

As mentioned above, a parameter server training cluster requires a coordinator task that runs your training program, one or several workers and parameter server tasks that run TensorFlow servers, i.e. `tf.distribute.Server`, and possibly an additional evaluation task that runs side-car evaluation (see the side-car evaluation section below). The
requirements to set them up are:

*   The coordinator task needs to know the addresses and ports of all other TensorFlow servers except the evaluator.
*   The workers and parameter servers need to know which port they need to listen to. For the sake of simplicity, we usually pass in the complete cluster information when we create TensorFlow servers on these tasks.
*   The evaluator task doesn’t have to know the setup of the training cluster. If it does, it should not attempt to connect to the training cluster.
*   Workers and parameter servers should have task types as “worker” and “ps” respectively. The coordinator should use “chief” as the task type for legacy reasons.

In this tutorial, we will create an in-process cluster so that the whole parameter server training can be run in Colab. We will introduce how to set up real clusters in a later section.

### In-process cluster

In this tutorial, we will start a bunch of TensorFlow servers in advance and
connect to them later. Note that this is only for the purpose of this tutorial's
demonstration, and in real training the servers will be started on worker and ps
machines.

In [4]:
def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec, job_name="worker", task_index=i, config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec, job_name="ps", task_index=i, protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

The in-process cluster setup is frequently used in our unit testing. Here is
[one example](https://github.com/tensorflow/tensorflow/blob/7621d31921c2ed979f212da066631ddfda37adf5/tensorflow/python/distribute/coordinator/cluster_coordinator_test.py#L437).

## Instantiate a `ParameterServerStrategy`

Before we dive into the training code, let's instantiate a `ParameterServerStrategy` object. Note that this is needed regardless of whether you are proceeding with a custom training loop or `Model.fit`. The `variable_partitioner` argument is explained in the [next section](#variable-sharding).

In [5]:
variable_partitioner = (
    tf.distribute.experimental.partitioners.FixedShardsPartitioner(
        num_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)

INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:18923', 'localhost:24530'], 'worker': ['localhost:19174', 'localhost:22565', 'localhost:23430']})


INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:18923', 'localhost:24530'], 'worker': ['localhost:19174', 'localhost:22565', 'localhost:23430']})


INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:CPU:0'], variable_device = '/job:chief/replica:0/task:0/device:CPU:0'


INFO:tensorflow:Number of GPUs on workers: 0


In order to use GPUs for training, allocate GPUs visible to each worker.
`ParameterServerStrategy` will use all the available GPUs on each worker,
with the restriction that all workers should have the same number of GPUs
available. 

### Variable sharding

Variable sharding refers to splitting a variable into multiple smaller
variables. We call these smaller variables *shard*s. Variable sharding may be
useful to distribute the network load when accessing these shards. It is also
useful to distribute computation and storage of a normal variable across
multiple parameter servers.

To enable variable sharding, you can pass in a `variable_partitioner` when
constructing a `ParameterServerStrategy` object. The `variable_partitioner` is
invoked every time a variable is created, and it is expected to return
the number of shards along each dimension of the variable. Some out-of-the-box
`variable_partitioner`s are provided, such as
`tf.distribute.experimental.partitioners.FixedShardsPartitioner`.
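
To make the "number of shards along each dimension" contract concrete, here is a plain-Python sketch of a fixed-shards partitioner. Both helpers are hypothetical re-implementations written for illustration, not TensorFlow code. The sketch assumes that only the first axis is split and that, when the axis does not divide evenly, the earlier shards receive the extra rows.

```python
def fixed_shards_partition(shape, num_shards):
    """Returns the number of shards along each axis, splitting only axis 0."""
    partitions = [1] * len(shape)
    # A variable cannot have more shards than rows along axis 0.
    partitions[0] = min(num_shards, shape[0])
    return partitions


def shard_shapes(shape, partitions):
    """Splits axis 0 as evenly as possible (assumption: earlier shards get the remainder)."""
    num_shards = partitions[0]
    base, extra = divmod(shape[0], num_shards)
    return [
        (base + (1 if i < extra else 0),) + tuple(shape[1:])
        for i in range(num_shards)
    ]


embedding_shape = (8, 20)  # for example, 8 vocabulary entries with 20-dim embeddings
partitions = fixed_shards_partition(embedding_shape, num_shards=2)
print(partitions)                                 # [2, 1]
print(shard_shapes(embedding_shape, partitions))  # [(4, 20), (4, 20)]
print(shard_shapes((9, 20), partitions))          # [(5, 20), (4, 20)]
```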

When a `variable_partitioner` is passed in and if you create a variable directly
under `strategy.scope()`, it will become a container type with a `variables`
property which provides access to the list of shards. In most cases, this
container will be automatically converted to a Tensor by concatenating all the
shards. As a result, it can be used as a normal variable. On the other hand,
some TensorFlow methods such as `tf.nn.embedding_lookup` provide efficient
implementation for this container type and in these methods automatic
concatenation will be avoided.

Please see the API docstring of `ParameterServerStrategy` for more details.

## Training with `Model.fit`
<a id="training_with_modelfit"></a>

Keras provides an easy-to-use training API via `Model.fit` that handles the
training loop under the hood, with the flexibility of an overridable `train_step`
and callbacks that provide functionality such as checkpoint saving or
summary saving for TensorBoard. With `Model.fit`, the same training code can be
used for other strategies with a simple swap of the strategy object.

### Input data

`Model.fit` with parameter server training requires that the input data be
provided in a callable that takes a single argument of type
`tf.distribute.InputContext` and returns a `tf.data.Dataset`. Then, create a
`tf.keras.utils.experimental.DatasetCreator` object that takes this callable
and, optionally, a `tf.distribute.InputOptions` object via the `input_options`
argument. Note that it is recommended to shuffle and repeat the data with
parameter server training, and to specify `steps_per_epoch` in the `fit` call so the library knows the
epoch boundaries.

Please see
[Distributed Input](https://www.tensorflow.org/tutorials/distribute/input#usage_2)
guide for more information about the `InputContext` argument.

In [6]:
def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))
  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)
  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

The code in `dataset_fn` will be invoked on the input device, which is usually
the CPU, on each of the worker machines.
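
The two `InputContext`-driven steps in `dataset_fn` (deriving the per-replica batch size and sharding the data across input pipelines) boil down to simple arithmetic. The plain-Python sketch below uses hypothetical stand-in functions rather than TensorFlow calls, and the replica and pipeline counts are made-up values.

```python
def get_per_replica_batch_size(global_batch_size, num_replicas_in_sync):
    """Splits a global batch evenly across replicas, or raises if it cannot."""
    if global_batch_size % num_replicas_in_sync != 0:
        raise ValueError(
            "global_batch_size %d is not divisible by num_replicas_in_sync %d"
            % (global_batch_size, num_replicas_in_sync))
    return global_batch_size // num_replicas_in_sync


def shard(elements, num_shards, index):
    """Keeps every `num_shards`-th element, starting at position `index`."""
    return elements[index::num_shards]


print(get_per_replica_batch_size(64, 4))  # 16
print(shard(list(range(10)), 3, 1))       # [1, 4, 7]
```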

### Model construction and compiling

Now, you will create a `tf.keras.Model` with the APIs of choice (a trivial
`tf.keras.models.Sequential` model is being used as a demonstration here),
followed by a `Model.compile` call to incorporate components such as optimizer,
metrics, or parameters such as `steps_per_execution`:

In [7]:
# TODO: Your code goes here

### Callbacks and training

<a id="callbacks-and-training"> </a>

Before you call `model.fit` for the actual training, let's prepare the needed
callbacks for common tasks such as:

*   `ModelCheckpoint` - to save the model weights.

*   `BackupAndRestore` - to make sure the training progress is automatically
    backed up, and recovered if the cluster experiences unavailability (such as
    abort or preemption).

*   `TensorBoard` - to save the progress reports into summary files which get
    visualized in the TensorBoard tool.

Note that, for performance reasons, custom callbacks cannot override
batch-level callback methods when used with `ParameterServerStrategy`. Please
modify your custom callbacks to make epoch-level calls, and adjust
`steps_per_epoch` to a suitable value. In addition, `steps_per_epoch` is a
required argument for `Model.fit` when used with `ParameterServerStrategy`.

In [8]:
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')
callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    #tf.keras.callbacks.experimental.BackupAndRestore(backup_dir=backup_dir),
]
model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)

Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).


20/20 - 3s - loss: 0.4391


INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets


Epoch 2/5


20/20 - 0s - loss: 0.4103


INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets


Epoch 3/5
20/20 - 0s - loss: 0.3056




INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets




Epoch 4/5
20/20 - 0s - loss: 0.2992


INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets


Epoch 5/5
20/20 - 0s - loss: 0.2698


INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets


<tensorflow.python.keras.callbacks.History at 0x7fefe405c048>

### Direct usage with `ClusterCoordinator` (optional)

Even if you choose the `Model.fit` training path, you can optionally instantiate a
`ClusterCoordinator` object to schedule other functions you would like to be
executed on the workers. See the
[Training with Custom Training Loop](#training_with_custom_training_loop)
section below for more details and examples.

## Training with Custom Training Loop

<a id="training_with_custom_training_loop"> </a>

Custom training loop with `tf.distribute.Strategy` 
provides great flexibility to define training loops. With the `ParameterServerStrategy` defined above, you will use a
`ClusterCoordinator` to dispatch the execution of training steps to remote
workers.


Then, you will create a model, define a dataset and a step function as we have
seen in the training loop with other `tf.distribute.Strategy`s. You can find
more details in this
[tutorial](https://www.tensorflow.org/tutorials/distribute/custom_training).

To ensure efficient dataset prefetching, use the recommended
distributed dataset creation APIs mentioned in the
[Dispatch Training steps to remote workers](https://www.tensorflow.org/tutorials/distribute/parameter_server_training#dispatch_training_steps_to_remote_workers)
section below. Also, make sure to call `strategy.run` inside `worker_fn`
to take full advantage of GPUs allocated on workers. The rest of the steps
are the same for training with or without GPUs.

Let’s create these components in the following steps:

### Set up the data
First, write a function that creates a dataset that includes preprocessing logic implemented by Keras preprocessing layers. You will create these layers outside the `dataset_fn` but apply the transformation inside the `dataset_fn`, since you will wrap the `dataset_fn` into a `tf.function`, which doesn't allow variables to be created inside it.

In [9]:
feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong",
    "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = kpl.StringLookup(vocabulary=feature_vocab)

  label_lookup_layer = kpl.StringLookup(vocabulary=label_vocab,
                                        num_oov_indices=0,
                                        mask_token=None)

  raw_feature_input = keras.layers.Input(
      shape=(3,), dtype=tf.string, name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = keras.Model(
      {"features": raw_feature_input}, feature_id_input)

  raw_label_input = keras.layers.Input(
      shape=(1,), dtype=tf.string, name="label")
  label_id_input = label_lookup_layer(raw_label_input)
  label_preprocess_stage = keras.Model({"label": raw_label_input}, label_id_input)

Generate toy examples in a dataset:

In [10]:
def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

Then we create the training dataset wrapped in a `dataset_fn`:

In [11]:
def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = # TODO: Your code goes here

### Build the model
Second, we create the model and other objects. Make sure to create all variables
under `strategy.scope`.

In [12]:
# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with KPLs.
  # TODO: Your code goes here

  emb_layer = keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=20)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = keras.Model({"features": model_input}, dense_output)

  optimizer = keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = keras.metrics.Accuracy()

Let's confirm that the use of `FixedShardsPartitioner` split all variables into two shards and that each shard was assigned to a different parameter server:

In [13]:
assert len(emb_layer.weights) == 2
#assert emb_layer.weights[0].shape == (4, 20)
assert emb_layer.weights[1].shape == (4, 20)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

### Define the training step
Third, create the training step wrapped into a `tf.function`:

In [14]:
@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

In the above step function, calling `strategy.run` and `strategy.reduce` in the
`step_fn` can support multiple GPUs per worker. If the workers have GPUs
allocated, `strategy.run` will distribute the datasets on multiple replicas.
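
The reason `step_fn` combines a per-replica average with a `SUM` reduction can be checked with plain arithmetic. The sketch below is not TensorFlow code: `compute_average_loss` is a hypothetical re-implementation that takes the global batch size explicitly, and the per-example losses are made-up numbers for two replicas.

```python
def compute_average_loss(per_example_loss, global_batch_size):
    """Sums one replica's per-example losses and divides by the GLOBAL batch size."""
    return sum(per_example_loss) / global_batch_size


# Hypothetical per-example losses on two replicas (two examples each).
replica_losses = [[0.2, 0.4], [0.6, 0.8]]
global_batch_size = sum(len(losses) for losses in replica_losses)

# Each replica scales its loss by the global batch size...
scaled = [compute_average_loss(losses, global_batch_size)
          for losses in replica_losses]
# ...so a SUM reduction across replicas yields the mean over the global batch.
summed = sum(scaled)

flat = [loss for losses in replica_losses for loss in losses]
global_mean = sum(flat) / len(flat)
print(abs(summed - global_mean) < 1e-9)  # True
```

Dividing by the per-replica batch size instead would make the summed loss grow with the number of replicas.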


### Dispatch training steps to remote workers
<a id="dispatch_training_steps_to_remote_workers"> </a>

After all the computations are defined by `ParameterServerStrategy`, we will use
the `ClusterCoordinator` class to create resources and distribute the training
steps to remote workers.

Let’s first create a `ClusterCoordinator` object and pass in the strategy
object:

In [15]:
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Then we create a per-worker dataset and an iterator. In the `per_worker_dataset_fn` below, wrapping the `dataset_fn` into
`strategy.distribute_datasets_from_function` is recommended to allow efficient
prefetching to GPUs seamlessly.

In [16]:
@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

 # TODO: Your code goes here



The final step is to distribute the computation to remote workers using `schedule`. The `schedule` method enqueues a `tf.function` and returns a future-like `RemoteValue` immediately. The queued functions will be dispatched to remote workers in background threads and the `RemoteValue` will be filled asynchronously. The `join` method can be used to wait until all scheduled functions are executed.

In [17]:
num_epochs = 4
steps_per_epoch = 5
for i in range(num_epochs):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))

INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).


INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).


Finished epoch 0, accuracy is 0.818750.
Finished epoch 1, accuracy is 1.000000.
Finished epoch 2, accuracy is 1.000000.


Finished epoch 3, accuracy is 1.000000.


Here is how you can fetch the result of a `RemoteValue`:

In [18]:
# TODO: Your code goes here

Final loss is 0.004459


Alternatively, you can launch all steps and do something while waiting for
completion:

```Python
import time

# `total_steps` is the total number of training steps you want to schedule.
for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.
```

For the complete training and serving workflow for this particular example,
please check out this
[test](https://github.com/tensorflow/tensorflow/blob/master/tensorflow/python/keras/distribute/parameter_server_evaluation_test.py).


### More about dataset creation

The dataset in the above code is created using the `create_per_worker_dataset`
API. It creates one dataset per worker and returns a container object. You can
call `iter` on it to create a per-worker iterator. The per-worker
iterator contains one iterator per worker, and the slice that corresponds to a
worker is substituted into the input argument of the function passed to the
`schedule` method before the function is executed on that worker.

Currently the `schedule` method assumes workers are equivalent and thus assumes
the datasets on different workers are the same except they may be shuffled
differently if they contain a
[dataset.shuffle](https://www.tensorflow.org/api_docs/python/tf/data/Dataset#shuffle)
operation. Because of this, we also recommend the datasets to be repeated
indefinitely and schedule a finite number of steps instead of relying on the
`OutOfRangeError` from a dataset.

Another important note is that `tf.data` datasets don’t support implicit
serialization and deserialization across task boundaries. So it is important to
create the whole dataset inside the function passed to
`create_per_worker_dataset`.

## Evaluation

There is more than one way to define and run an evaluation loop in distributed training. Each has its own pros and cons, as described below. The inline evaluation method is recommended if you don't have a preference.

### Inline evaluation

In this method, the coordinator alternates between training and evaluation, which is why we call it inline evaluation. There are several benefits of inline evaluation. For example, it can support large evaluation models and evaluation datasets that a single task cannot hold. In addition, the evaluation results can be used to make decisions for training the next epoch.

There are two ways to implement inline evaluation:

- **Direct evaluation** - For small models and evaluation datasets the coordinator can run evaluation directly on the distributed model with the evaluation dataset on the coordinator:

In [19]:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())



Evaluation accuracy: 1.000000


- **Distributed evaluation** - For large models or datasets that are infeasible to run directly on the coordinator, the coordinator task can distribute evaluation tasks to the workers via the `schedule`/`join` methods:

In [20]:
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())



Evaluation accuracy: 1.000000


Note: currently the `schedule`/`join` methods don’t support visitation guarantee or exactly-once semantics. In other words, there is no guarantee that all evaluation examples in a dataset will be evaluated exactly once; some may not be visited and some may be evaluated multiple times. Visitation guarantee on evaluation dataset is being worked on.
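
The lack of a visitation guarantee can be illustrated without TensorFlow. In the sketch below, each of two hypothetical workers builds its own repeated iterator over the same 16-example dataset (unshuffled, to keep the result deterministic), and the two scheduled evaluation steps are assumed to land on different workers.

```python
from collections import Counter
import itertools

examples = list(range(16))
batch_size = 8
batches = [examples[i:i + batch_size]
           for i in range(0, len(examples), batch_size)]

# Each worker builds its own repeated iterator over the same dataset.
worker_iterators = [itertools.cycle(batches) for _ in range(2)]

visits = Counter()
# Two eval steps; assume the coordinator happens to send one to each worker.
for worker_id in (0, 1):
    visits.update(next(worker_iterators[worker_id]))

never_visited = [e for e in examples if visits[e] == 0]
print(visits[0])      # 2: the first batch was evaluated on both workers
print(never_visited)  # [8, 9, 10, 11, 12, 13, 14, 15]
```

Two steps of batch size 8 cover 16 evaluations in total, yet half of the examples are counted twice and the other half are never seen.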

### Side-car evaluation

Another method is side-car evaluation, in which a dedicated evaluator task repeatedly reads checkpoints and runs evaluation on the latest checkpoint. It allows your training program to finish early if you don't need to change your training loop based on evaluation results. However, it requires an additional evaluator task and periodic checkpointing to trigger evaluation. The following is a possible side-car evaluation loop:

```Python
checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break
```

## Clusters in the real world
<a id="real_clusters"></a>

Note: this section is not necessary for running the tutorial code on this page.

In a real production environment, you will run all tasks in different processes
on different machines. The simplest way to configure cluster information on each
task is to set "TF_CONFIG" environment variables and use a
`tf.distribute.cluster_resolver.TFConfigClusterResolver` to parse "TF_CONFIG".
For a general description about "TF_CONFIG" environment variables, please see
the [distributed training guide](https://www.tensorflow.org/guide/distributed_training#setting_up_tf_config_environment_variable).

If you start your training tasks using Kubernetes or other configuration templates, it is very likely that these templates have already set “TF_CONFIG” for you.

### Set “TF_CONFIG” environment variable

Suppose you have 3 workers and 2 parameter servers. The “TF_CONFIG” of worker 1
can be:

```Python
import json
import os

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})
```

The “TF_CONFIG” of the evaluator can be:

```Python
import json
import os

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})
```

The “cluster” part in the above “TF_CONFIG” string for the evaluator is
optional.
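
Because every training task shares the same “cluster” section and differs only in its “task” section, the configuration for all tasks can be generated from one cluster description. The helper below is a hypothetical convenience written for illustration, not a TensorFlow API, and the `2222` ports are made-up values.

```python
import json


def make_tf_configs(cluster):
    """Returns {(task_type, index): TF_CONFIG JSON string} for every task."""
    configs = {}
    for task_type, addresses in cluster.items():
        for index in range(len(addresses)):
            configs[(task_type, index)] = json.dumps(
                {"cluster": cluster,
                 "task": {"type": task_type, "index": index}})
    return configs


# Hypothetical hosts and ports for 3 workers, 2 parameter servers, 1 coordinator.
cluster = {
    "worker": ["host1:2222", "host2:2222", "host3:2222"],
    "ps": ["host4:2222", "host5:2222"],
    "chief": ["host6:2222"],
}
configs = make_tf_configs(cluster)
print(len(configs))                                # 6
print(json.loads(configs[("worker", 1)])["task"])  # {'type': 'worker', 'index': 1}
```

Each task would then set its own string as the `TF_CONFIG` environment variable before starting.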

### If you use the same binary for all tasks

If you prefer to run all these tasks using a single binary, you will need to let
your program branch into different roles at the very beginning:

```Python
import tensorflow as tf

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
# Each `pass` below is a placeholder for the role-specific code.
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
  pass
elif cluster_resolver.task_type == "evaluator":
  # Run sidecar evaluation.
  pass
else:
  # Run the coordinator.
  pass

The following code starts a TensorFlow server and waits:

```Python
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()
```

## Handling Task Failure

### Worker failure

Both `ClusterCoordinator` and `Model.fit` provide built-in fault tolerance for
worker failure. Upon worker recovery, the previously provided dataset function
(passed either to `create_per_worker_dataset` for CTL, or as a `DatasetCreator`
for `Model.fit`) will be invoked on the workers to re-create the datasets.

### Parameter server or coordinator failure

By contrast, when the coordinator sees a parameter server error, it raises an `UnavailableError` or `AbortedError` immediately. You can restart the coordinator in this case. The coordinator itself can also become unavailable. Therefore, certain tooling is recommended so that training progress is not lost:

*   For `Model.fit`, you should use a `BackupAndRestore` callback, which handles
    the progress saving and restoration automatically. See
    [Callbacks and training](#callbacks-and-training) section above for an
    example.

*   For CTLs, you should checkpoint the model variables periodically and load
    model variables from a checkpoint, if any, before training starts. The
    training progress can be inferred approximately from `optimizer.iterations`
    if an optimizer is checkpointed:

```Python
checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()
```
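
To make the epoch arithmetic in the snippet above concrete, here is a small standard-library sketch. The helper name `resume_point` and the numbers 250 and 100 are made up for illustration. It splits a restored step count into completed epochs and leftover steps; because the loop above restarts at the beginning of `starting_epoch` and runs a full `steps_per_epoch` again, any leftover steps are not subtracted, which is one reason the recovered progress is only approximate.

```python
def resume_point(global_steps, steps_per_epoch):
  """Splits a restored step count into (completed epochs, leftover steps)."""
  if steps_per_epoch <= 0:
    raise ValueError("steps_per_epoch must be positive")
  return divmod(global_steps, steps_per_epoch)


# Hypothetical values: 250 optimizer steps restored, 100 steps per epoch.
starting_epoch, leftover_steps = resume_point(
    global_steps=250, steps_per_epoch=100)
# Training would resume at epoch index 2; the 50 leftover steps are ignored
# by the loop above, which always runs a full epoch.
```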

### Fetching a `RemoteValue`

Fetching a `RemoteValue` is guaranteed to succeed if a function is executed
successfully. This is because currently the return value is immediately copied
to the coordinator after a function is executed. If there is any worker failure
during the copy, the function will be retried on another available worker.
Therefore, if you want to optimize for performance, you can schedule functions
without a return value.

## Error Reporting

Once the coordinator sees an error such as `UnavailableError` from parameter
servers or other application errors such as an `InvalidArgumentError` from
`tf.debugging.check_numerics`, it will cancel all pending and queued functions
before raising the error. Fetching their corresponding `RemoteValue`s will raise
a `CancelledError`.

After an error is raised, the coordinator will not raise the same error or any
error from cancelled functions.

## Performance Improvement

There are several possible reasons for performance issues when you train
with `ParameterServerStrategy` and `ClusterCoordinator`.

One common reason is that parameter servers have unbalanced load and some
heavily-loaded parameter servers have reached capacity. There can also be
multiple root causes. Some simple methods to mitigate this issue are to:

1.  Shard your large model variables by specifying a `variable_partitioner`
    when constructing a `ParameterServerStrategy`.
2.  Avoid creating a hotspot variable that is required by all parameter servers
    in a single step, if possible. For example, use a constant learning rate
    or subclass `tf.keras.optimizers.schedules.LearningRateSchedule` in
    optimizers, since by default the learning rate becomes a variable placed
    on a particular parameter server and requested by all other parameter
    servers in each step.
3.  Shuffle your large vocabularies before passing them to Keras preprocessing
    layers.
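
The vocabulary shuffling in the last item can be sketched with the standard library alone. The helper name `shuffle_vocabulary` and the fixed seed are assumptions made for this illustration: a fixed seed keeps the order reproducible, which matters because a different order assigns different indices to the same tokens.

```python
import random


def shuffle_vocabulary(vocabulary, seed=0):
  """Returns a shuffled copy of `vocabulary` (hypothetical helper).

  The same seed always yields the same order, so every task that builds the
  lookup layer from this list sees identical token indices.
  """
  shuffled = list(vocabulary)  # Copy so the caller's list is left untouched.
  random.Random(seed).shuffle(shuffled)
  return shuffled


# Toy vocabulary for illustration only; real vocabularies are much larger.
vocabulary = ["apple", "banana", "cherry", "date", "elderberry"]
shuffled_vocabulary = shuffle_vocabulary(vocabulary, seed=42)
```

The resulting list could then be passed as the `vocabulary` argument of a Keras preprocessing layer such as `tf.keras.layers.StringLookup`.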

Another possible reason for performance issues is the coordinator. Our first
implementation of `schedule`/`join` is Python-based and thus may have threading
overhead. Also, the latency between the coordinator and the workers can be
large. If this is the case:

*   For `Model.fit`, you can set the `steps_per_execution` argument of
    `Model.compile` to a value larger than 1.

*   For CTLs, you can pack multiple steps into a single `tf.function`:

```Python
steps_per_invocation = 10
@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))
```

As we continue to optimize the library, we hope most users don’t have to
manually pack steps in the future.
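
One consequence of packing steps is that each scheduled call of `step_fn` now runs `steps_per_invocation` training steps, so a loop that previously scheduled one call per step needs fewer iterations per epoch. The following sketch shows that arithmetic; the helper name `invocations_per_epoch` and the example numbers are hypothetical.

```python
def invocations_per_epoch(steps_per_epoch, steps_per_invocation):
  """Number of packed calls needed to cover at least `steps_per_epoch` steps."""
  if steps_per_epoch <= 0 or steps_per_invocation <= 0:
    raise ValueError("Both arguments must be positive")
  # Ceiling division: a partial final group still needs one full invocation.
  return (steps_per_epoch + steps_per_invocation - 1) // steps_per_invocation


# With 100 steps per epoch and 10 steps per call, schedule 10 calls per epoch.
calls = invocations_per_epoch(steps_per_epoch=100, steps_per_invocation=10)
```

When `steps_per_epoch` is not a multiple of `steps_per_invocation`, rounding up means the epoch runs slightly more steps than requested, for example 110 steps for a 105-step epoch with 10 steps per call.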

In addition, a small trick for performance improvement is to schedule functions
without a return value as explained in the handling task failure section above.

## Known Limitations

Most of the known limitations are covered in the sections above. This section
provides a summary.

### `ParameterServerStrategy` general

* `os.environ["GRPC_FAIL_FAST"] = "use_caller"` is needed on every task, including the coordinator, to make fault tolerance work properly.
* Synchronous parameter server training is not supported.
* It is usually necessary to pack multiple steps into a single function to achieve optimal performance.
* Loading a saved_model that contains sharded variables via `tf.saved_model.load` is not supported. Note that loading such a saved_model using TensorFlow Serving is expected to work.
* Loading a checkpoint that contains sharded optimizer slot variables into a different number of shards is not supported.
* Recovering from parameter server failure without restarting the coordinator task is not supported.

### `Model.fit` specifics

*   `steps_per_epoch` argument is required in `Model.fit`. You can select a
    value that provides appropriate intervals in an epoch.
*   `ParameterServerStrategy` does not support custom callbacks that have
    batch-level calls, for performance reasons. You should convert those
    calls into epoch-level calls with a suitably picked `steps_per_epoch`, so
    that they are called every `steps_per_epoch` number of steps. Built-in
    callbacks are not affected: their batch-level calls have been modified to
    be performant. Supporting batch-level calls for `ParameterServerStrategy`
    is being planned.
*   For the same reason, unlike other strategies, the progress bar and metrics
    are logged only at epoch boundaries.
*   Input for `Model.fit` only takes the type `DatasetCreator`.
*   `run_eagerly` is not supported.
*   Evaluation in `Model.fit` is not yet supported. This is one of the
    priorities.
*   `Model.evaluate` and `Model.predict` are not yet supported.

### Custom Training Loop specifics

*   `ClusterCoordinator.schedule` doesn't support visitation guarantees for a dataset.
*   When `ClusterCoordinator.create_per_worker_dataset` is used, the whole dataset must be created inside the function passed to it.
*   `tf.data.Options` is ignored in datasets created by `ClusterCoordinator.create_per_worker_dataset`.