##### Copyright 2021 The TensorFlow Authors.

In [None]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Migration examples: Multi-worker training with CPU/GPU

<table class="tfo-notebook-buttons" align="left">
  <td>
    <a target="_blank" href="https://www.tensorflow.org/guide/migrate/multi_worker_cpu_gpu_training">
    <img src="https://www.tensorflow.org/images/tf_logo_32px.png" />
    View on TensorFlow.org</a>
  </td>
  <td>
    <a target="_blank" href="https://colab.research.google.com/github/tensorflow/docs/blob/master/site/en/guide/migrate/multi_worker_cpu_gpu_training.ipynb">
    <img src="https://www.tensorflow.org/images/colab_logo_32px.png" />
    Run in Google Colab</a>
  </td>
  <td>
    <a target="_blank" href="https://github.com/tensorflow/docs/blob/master/site/en/guide/migrate/multi_worker_cpu_gpu_training.ipynb">
    <img src="https://www.tensorflow.org/images/GitHub-Mark-32px.png" />
    View source on GitHub</a>
  </td>
  <td>
    <a href="https://storage.googleapis.com/tensorflow_docs/docs/site/en/guide/migrate/multi_worker_cpu_gpu_training.ipynb"><img src="https://www.tensorflow.org/images/download_logo_32px.png" />Download notebook</a>
  </td>
</table>

In TF1, multi-worker distributed training has traditionally been done with the `tf.estimator.train_and_evaluate` API and a `tf.estimator.Estimator`. In TF2, the recommended approach is to write models with metrics, losses, and optimizers in Keras, and to distribute them across multiple workers with one of the `Strategy`s in `tf.distribute`. For multi-worker training with CPU/GPU, the strategies of choice are `tf.distribute.experimental.ParameterServerStrategy` and `tf.distribute.MultiWorkerMirroredStrategy`. This guide demonstrates how to migrate TF1 distributed training code to TF2.

For more information about other `Strategy`s, including when to use which, refer to the [Distributed training with TensorFlow guide](https://www.tensorflow.org/guide/distributed_training).

## Setup

Start with the necessary install and TensorFlow imports:

In [None]:
!pip install portpicker
import tensorflow as tf
import tensorflow.compat.v1 as tf1

Then prepare some simple data for demonstration:

In [None]:
features = [[1., 1.5], [2., 2.5], [3., 3.5]]
labels = [[0.3], [0.5], [0.7]]
eval_features = [[4., 4.5], [5., 5.5], [6., 6.5]]
eval_labels = [[0.8], [0.9], [1.]]

In TF1, the `TF_CONFIG` environment variable is required to specify the cluster and the tasks' addresses.

In [None]:
import json
import os

tf_config = {
    'cluster': {
        'chief': ['localhost:11111'],
        'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],
        'ps': ['localhost:12121', 'localhost:13131'],
    },
    'task': {'type': 'chief', 'index': 0}
}

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Unfortunately, TF1 multi-worker training with `Estimator` requires multiple clients, which is especially tricky to set up in a notebook. To keep this notebook runnable, the next cell deletes `TF_CONFIG` so that training falls back to local training. Note that regular TF1 multi-worker training keeps `TF_CONFIG` set; the `del` below is specific to this notebook.

In [None]:
del os.environ['TF_CONFIG']
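
In a real multi-client TF1 run, each process would keep its own `TF_CONFIG`: the `cluster` entry is identical everywhere, and only the `task` entry differs. As a minimal sketch (the helper name `per_task_tf_configs` is hypothetical and not part of TensorFlow), the per-task values for the example cluster above could be generated like this:

```python
import json

# Same example cluster as above; the addresses are placeholders.
cluster = {
    'chief': ['localhost:11111'],
    'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],
    'ps': ['localhost:12121', 'localhost:13131'],
}

def per_task_tf_configs(cluster):
  """Hypothetical helper: maps (task type, index) to a `TF_CONFIG` JSON string."""
  configs = {}
  for task_type, addresses in cluster.items():
    for index in range(len(addresses)):
      configs[(task_type, index)] = json.dumps(
          {'cluster': cluster, 'task': {'type': task_type, 'index': index}})
  return configs

configs = per_task_tf_configs(cluster)

# Each process would export its own entry before training starts,
# for example the second worker:
print(configs[('worker', 1)])
```

This sketch only builds the strings. Launching one process per task and exporting the matching value is left to whatever job scheduler or launcher is in use.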

### TF1: Using `tf.estimator.train_and_evaluate`

This code snippet demonstrates the canonical workflow of multi-worker training in TF1: you will use a `tf.estimator.Estimator`, a `tf.estimator.TrainSpec`, a `tf.estimator.EvalSpec`, and the `tf.estimator.train_and_evaluate` API to distribute the training.

In [None]:
def _input_fn():
  return tf1.data.Dataset.from_tensor_slices((features, labels)).batch(1)

def _eval_input_fn():
  return tf1.data.Dataset.from_tensor_slices(
      (eval_features, eval_labels)).batch(1)

def _model_fn(features, labels, mode):
  logits = tf1.layers.Dense(1)(features)
  loss = tf1.losses.mean_squared_error(labels=labels, predictions=logits)
  optimizer = tf1.train.AdagradOptimizer(0.05)
  train_op = optimizer.minimize(loss, global_step=tf1.train.get_global_step())
  return tf1.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

estimator = tf1.estimator.Estimator(model_fn=_model_fn)
train_spec = tf1.estimator.TrainSpec(input_fn=_input_fn)
eval_spec = tf1.estimator.EvalSpec(input_fn=_eval_input_fn)
tf1.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

### TF2: Keras training API with `tf.distribute.Strategy`

In TF2, computation is distributed via `tf.distribute.Strategy`s. In this example, you will see two `Strategy`s: `tf.distribute.experimental.ParameterServerStrategy` and `tf.distribute.MultiWorkerMirroredStrategy`, as these two strategies have been designed for CPU/GPU training with multiple workers.

`ParameterServerStrategy` employs a coordinator, which makes it a better fit for the notebook environment. To make the example runnable here, you will use some utilities to create a so-called "in-process cluster", where threads simulate the parameter servers (PS) and workers.

For more information about parameter server training, refer to the [Parameter server training guide](https://www.tensorflow.org/tutorials/distribute/parameter_server_training).

Here, you will first define the `TF_CONFIG` environment variable, and then use a `tf.distribute.cluster_resolver.TFConfigClusterResolver` to provide the cluster information to the strategy.


In [None]:
# Find ports that are available for chief (the coordinator), workers, and PS.
import portpicker
chief_port = portpicker.pick_unused_port()
worker_ports = [portpicker.pick_unused_port() for _ in range(3)]
ps_ports = [portpicker.pick_unused_port() for _ in range(2)]

# Dump the cluster information to `TF_CONFIG`.
tf_config = {
    'cluster': {
        'chief': ["localhost:%s" % chief_port],
        'worker': ["localhost:%s" % port for port in worker_ports],
        'ps':  ["localhost:%s" % port for port in ps_ports],
    },
    'task': {'type': 'chief', 'index': 0}
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)

# Use a cluster resolver to bridge the info to the strategy created below.
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()

Then, you will create the servers for the workers and PS one by one, before creating a `tf.distribute.experimental.ParameterServerStrategy`.

In [None]:
# Workers need some inter-op threads to work properly.
worker_config = tf.compat.v1.ConfigProto()
worker_config.inter_op_parallelism_threads = 4

for i in range(3):
  tf.distribute.Server(
      cluster_resolver.cluster_spec(),
      job_name="worker",
      task_index=i,
      config=worker_config)

for i in range(2):
  tf.distribute.Server(
      cluster_resolver.cluster_spec(),
      job_name="ps",
      task_index=i)

Note that in real distributed training, instead of starting all the `tf.distribute.Server`s on the coordinator, you will use multiple machines, and the ones that are designated as worker and PS will each run a `Server`. 
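
As a rough sketch of that real-world setup, each machine could run a small launcher that reads its own `TF_CONFIG` and, if it is a worker or PS task, starts a server and blocks. The function names `is_server_task` and `run_task` are hypothetical. The TensorFlow import is kept inside `run_task` so that the role check can be tried without a cluster, and nothing here is executed against real machines.

```python
def is_server_task(task_type):
  """Hypothetical helper: worker and PS tasks run a server; the chief coordinates."""
  return task_type in ('worker', 'ps')

def run_task():
  """Sketch of a per-machine entry point; assumes `TF_CONFIG` is set for this task."""
  import tensorflow as tf

  resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
  if is_server_task(resolver.task_type):
    server = tf.distribute.Server(
        resolver.cluster_spec(),
        job_name=resolver.task_type,
        task_index=resolver.task_id,
        protocol=resolver.rpc_layer or 'grpc',
        start=True)
    # Blocks while the server waits for work from the coordinator.
    server.join()
    return None
  # On the coordinator (chief), create the strategy for the training program.
  return tf.distribute.experimental.ParameterServerStrategy(resolver)

# Role check for the three job names used in this guide.
print([is_server_task(t) for t in ('chief', 'worker', 'ps')])
```

In this layout only the chief runs the training program; the other tasks stay inside `server.join()`.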

With everything ready, you will now create the `ParameterServerStrategy` object.

In [None]:
strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)

Once you have a strategy object, create the model, optimizer, and other variables, and call `Model.compile`, all within `strategy.scope`, in order to distribute the training. Refer to the [`Strategy.scope` guide](https://www.tensorflow.org/api_docs/python/tf/distribute/Strategy#scope) for more information about this API, and to the [Distributed input guide](https://www.tensorflow.org/tutorials/distribute/input) for how the dataset function below is set up. If you prefer to define the details yourself, including the actual forward and backward passes, see the [parameter server training guide](https://www.tensorflow.org/tutorials/distribute/parameter_server_training) for more information.


In [None]:
def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size=64)
  dataset = tf.data.Dataset.from_tensor_slices(
      (features, labels)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines, input_context.input_pipeline_id)
  return dataset.batch(batch_size).prefetch(2)

eval_dataset = tf.data.Dataset.from_tensor_slices(
      (eval_features, eval_labels)).batch(1)

x = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])
  optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.05)
  model.compile(optimizer, "mse")

model.fit(x, epochs=5, steps_per_epoch=10)
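
To make the `dataset_fn` above more concrete, the plain-Python sketch below imitates the arithmetic behind `get_per_replica_batch_size` and `Dataset.shard`, without TensorFlow. The helper names and the example values (two replicas, two input pipelines) are illustrative assumptions, not values taken from this cluster.

```python
def per_replica_batch_size(global_batch_size, num_replicas_in_sync):
  """Imitates `InputContext.get_per_replica_batch_size` (illustrative only)."""
  if global_batch_size % num_replicas_in_sync != 0:
    raise ValueError(
        'The global batch size must be divisible by the number of replicas.')
  return global_batch_size // num_replicas_in_sync

def shard(elements, num_shards, index):
  """Imitates `Dataset.shard`: keeps elements whose position % num_shards == index."""
  return [e for i, e in enumerate(elements) if i % num_shards == index]

batch = per_replica_batch_size(global_batch_size=64, num_replicas_in_sync=2)
shards = [shard(list(range(6)), num_shards=2, index=i) for i in range(2)]

print(batch)   # 32
print(shards)  # [[0, 2, 4], [1, 3, 5]]
```

Each input pipeline therefore reads a disjoint slice of the elements it is given, and each replica batches its slice with the per-replica size.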

As an alternative to `ParameterServerStrategy`, you can use a `MultiWorkerMirroredStrategy` object:

In [None]:
del os.environ['TF_CONFIG']  # To clean up the `TF_CONFIG` used for PS strategy.
strategy = tf.distribute.MultiWorkerMirroredStrategy()

You can replace the strategy used above with this `MultiWorkerMirroredStrategy` object to train with this strategy instead.

Unfortunately, as with `Estimator`, `MultiWorkerMirroredStrategy` is a multi-client strategy, so there is no easy way to run distributed training in this notebook; replacing the strategy in the code above ends up running training locally. To try `MultiWorkerMirroredStrategy` in a distributed setup in Colab, refer to the [Multi-worker training with Keras](https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras) guide.