<a href="https://colab.research.google.com/github/adhadse/colab_repo/blob/master/homl/Ch%2019%20Training%20and%20Deploying%20TensorFlow%20Models%20at%20Scale.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Chapter 19: Training and Deploying TensorFlow  Models at Scale
This work is partialy combined text and code from the book [Hands-On Machine Learning with Scikit-Learn, Keras, and TensorFlow, 2nd Edition](https://www.oreilly.com/library/view/hands-on-machine-learning/9781492032632/) is only supposed to be used as reference and is recommended to follow along with a copy of the Book purchased.

In [1]:
import tensorflow as tf
from tensorflow import keras
import numpy as np
import os

#Serving a TensorFlow Model


## Using TensorFlow Serving
TF Serving is a very efficient, model server written in C++ which can serve multiple model versions and even autometically and gracefully switch to newer version when found.

### Exporting SavedModels
To use TF Serving we first need to export our models to *SavedModel format* using `tf.saved_model.save()`.

In [2]:
"""
  First let's create the model
"""
(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.mnist.load_data()
X_train_full = X_train_full[..., np.newaxis].astype(np.float32) / 255.
X_test = X_test[..., np.newaxis].astype(np.float32) / 255.

X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]
X_new = X_test[:3]

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


In [None]:
model = keras.models.Sequential([
    keras.layers.Flatten(input_shape=[28, 28, 1]),
    keras.layers.Dense(300, activation="relu"),
    keras.layers.Dense(10, activation="softmax")
])

In [None]:
model.compile(loss="sparse_categorical_crossentropy",
              optimizer="sgd",
              metrics=["accuracy"])
model.fit(X_train, y_train,
          epochs=10, 
          validation_data=(X_valid, y_valid))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7f01242c4750>

In [None]:
np.round(model.predict(X_new), 2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.98, 0.02, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.98, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ]],
      dtype=float32)

In [None]:
model_version = "0001"
model_name = "the_mnist_model"
model_path = os.path.join(model_name, model_version)
tf.saved_model.save(model, model_path)


FOR DEVS: If you are overwriting _tracking_metadata in your class, this property has been used to save metadata in the SavedModel. The metadta field will be deprecated soon, so please move the metadata to a different file.
INFO:tensorflow:Assets written to: the_mnist_model/0001/assets


The SavedModel directory has:
- `saved_model.pb` representing the computation graph.
- *variables* subdirectory contaning the variables values.
- *assets* subdirectory containing additional data, such as vocabulary  files,. etc.

To load this model instead of using `tf.keras.models.load_model` use `tf.saved_model.load()` and set the `training` to be `False`.

In [None]:
!saved_model_cli show --dir {model_path}

The given SavedModel contains the following tag-sets:
serve


A SavedModel contains one or more *metagraphs* each identified by a set of tages.

<mark>A metagraph is a computation graph plus  some function signature definations (including their input and output names, types, and shapes).</mark>

In [None]:
!saved_model_cli show --dir {model_path} --tag_set serve

The given SavedModel MetaGraphDef contains SignatureDefs with the following keys:
SignatureDef key: "__saved_model_init_op"
SignatureDef key: "serving_default"


In [None]:
!saved_model_cli show --dir {model_path} --tag_set serve\
                      --signature_def serving_default

The given SavedModel SignatureDef contains the following input(s):
  inputs['flatten_2_input'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, 28, 28, 1)
      name: serving_default_flatten_2_input:0
The given SavedModel SignatureDef contains the following output(s):
  outputs['dense_5'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, 10)
      name: StatefulPartitionedCall:0
Method name is: tensorflow/serving/predict


In [None]:
!saved_model_cli show --dir {model_path} --all


MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:

signature_def['__saved_model_init_op']:
  The given SavedModel SignatureDef contains the following input(s):
  The given SavedModel SignatureDef contains the following output(s):
    outputs['__saved_model_init_op'] tensor_info:
        dtype: DT_INVALID
        shape: unknown_rank
        name: NoOp
  Method name is: 

signature_def['serving_default']:
  The given SavedModel SignatureDef contains the following input(s):
    inputs['flatten_2_input'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 28, 28, 1)
        name: serving_default_flatten_2_input:0
  The given SavedModel SignatureDef contains the following output(s):
    outputs['dense_5'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 10)
        name: StatefulPartitionedCall:0
  Method name is: tensorflow/serving/predict
W0619 11:36:01.354573 140139511453568 deprecation.py:506] From /usr/local/lib/python2.7/dist-packages/tensorflow_co

Let's write the new instances to a `npy` file so we can pass them easily to our model:

In [None]:
np.save("my_mnist_tests.npy", X_new)

input_name = model.input_names[0]
input_name

'flatten_2_input'

In [None]:
!saved_model_cli run --dir {model_path} --tag_set serve \
                     --signature_def serving_default \
                     --inputs {input_name}="my_mnist_tests.npy"

2021-06-19 11:40:40.907974: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcuda.so.1
2021-06-19 11:40:40.918655: E tensorflow/stream_executor/cuda/cuda_driver.cc:351] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-19 11:40:40.918711: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (004b804ae975): /proc/driver/nvidia/version does not exist
2021-06-19 11:40:40.927808: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2299995000 Hz
2021-06-19 11:40:40.928141: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x562fffb72a00 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2021-06-19 11:40:40.928180: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): Host, Default Version
W0619 11:40:40.934154 139733596084096 deprecation.py:323]

In [None]:
np.round([[1.1348874e-04, 3.4772970e-07, 6.5677706e-04, 2.6318526e-03, 2.2479003e-06,
           4.3850094e-05, 5.6311748e-08, 9.9602258e-01, 2.4540455e-05, 5.0423446e-04],
          [4.2043632e-04, 6.4254651e-05, 9.8750740e-01, 8.0940463e-03, 3.2317416e-08,
           5.3414190e-04, 2.5793216e-03, 1.1070003e-08, 8.0020179e-04, 1.6184826e-08],
          [1.9224688e-05, 9.7866690e-01, 7.8524482e-03, 1.9809359e-03, 5.7935639e-04,
           1.1971863e-03, 1.0196721e-03, 6.2796283e-03, 2.1492112e-03, 2.5542409e-04]], 2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.98, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]])

### Installing TensorFlow Serving

First set up the repository.

Install [Docker](https://docs.docker.com/install/) if you don't have it already. Then run:

```bash
docker pull tensorflow/serving

export ML_PATH=$HOME/ml # or wherever this project is
docker run -it --rm -p 8500:8500 -p 8501:8501 \
   -v "$ML_PATH/my_mnist_model:/models/my_mnist_model" \
   -e MODEL_NAME=my_mnist_model \
   tensorflow/serving
```
Once you are finished using it, press Ctrl-C to shut down the server.

Alternatively, if `tensorflow_model_server` is installed (e.g., if you are running this notebook in Colab), then the following 3 cells will start the server:

In [None]:
os.environ["MODEL_DIR"] = os.path.split(os.path.abspath(model_path))[0]
os.environ["MODEL_DIR"]

'/content/the_mnist_model'

In [None]:
%%bash --bg
nohup tensorflow_model_server \
      --rest_api_port=8501 \
      --model_name="the_mnist_model" \
      --model_base_path="${MODEL_DIR}" > server.log 2>&1

Starting job # 0 in a separate thread.


In [None]:
!sudo apt-get remove tensorflow-model-server

Reading package lists... Done
Building dependency tree       
Reading state information... Done
E: Unable to locate package tensorflow-model-server


In [None]:
!tail server.log

nohup: failed to run command 'tensorflow_model_server': No such file or directory


In [None]:
!echo "deb [arch=amd64] http://storage.googleapis.com/tensorflow-serving-apt stable tensorflow-model-server tensorflow-model-server-universal" | sudo tee /etc/apt/sources.list.d/tensorflow-serving.list && \
!curl https://storage.googleapis.com/tensorflow-serving-apt/tensorflow-serving.release.pub.gpg | sudo apt-key add -

deb [arch=amd64] http://storage.googleapis.com/tensorflow-serving-apt stable tensorflow-model-server tensorflow-model-server-universal
/bin/bash: !curl: command not found
gpg: no valid OpenPGP data found.


In [None]:
!apt-get update
!apt-get install ca-certificates

0% [Working]            Get:1 http://storage.googleapis.com/tensorflow-serving-apt stable InRelease [3,012 B]
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (91.180% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (91.180% [1 InRelease gpgv 3,012 B] [Connecting to archive.ubuntu.com] [Waiting for h                                                                               Hit:2 https://download.docker.com/linux/ubuntu bionic InRelease
0% [1 InRelease gpgv 3,012 B] [Connecting to archive.ubuntu.com] [Waiting for h                                                                               Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [1 InRelease gpgv 3,012 B] [Connecting to archive.ubuntu.com] [3 InRelease 1                                                                               Hit:4 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:5 https://developer.down

In [None]:
!sudo apt-get install tensorflow-model-server

0% [Working]            Get:1 http://storage.googleapis.com/tensorflow-serving-apt stable InRelease [3,012 B]
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (91.180% [1 InRelease gpgv 3,012 B] [Connecting to archive.ubuntu.com] [Connecting to                                                                               Hit:2 http://security.ubuntu.com/ubuntu bionic-security InRelease
0% [1 InRelease gpgv 3,012 B] [Connecting to archive.ubuntu.com] [Connected to                                                                                Hit:3 https://download.docker.com/linux/ubuntu bionic InRelease
0% [1 InRelease gpgv 3,012 B] [Connecting to archive.ubuntu.com] [Waiting for h                                                                               Hit:4 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [1 InRelease gpgv 3,012 B] [Connecting to archive.ubuntu.com (91.189.88.142)                                     

### Querying TF Serving through the REST API
First we'll create the query.

In [None]:
import json

input_data_json = json.dumps({
    "signature_name": "serving_default",
    "instances": X_new.tolist(),
})

In [None]:
input_data_json

And then we can send the input data to TF Serving by sending an HTTP POST request.

In [None]:
import requests

SERVER_URL = "http://localhost:8501/v1/models/the_mnist_model:predict"
response = requests.post(SERVER_URL, data=input_data_json)
response.raise_for_status() # raise an exception in case of error
response = response.json()

In [None]:
y_proba = np.array(response["predictions"])
y_proba.round(2)

### Serving TF Serving through the gRPC API
The gRPC API expects a serialized `PredictRequest` protocol buffer  as input and it outputs a serialized `PredictResponse` protocol buffer. 

In [None]:
!pip install tensorflow-serving-api

Collecting tensorflow-serving-api
  Downloading https://files.pythonhosted.org/packages/27/a6/a534deae4086c0fef9a77537a4779bb5a9dd8841f8d347c509c96b342b2e/tensorflow_serving_api-2.5.1-py2.py3-none-any.whl
Installing collected packages: tensorflow-serving-api
Successfully installed tensorflow-serving-api-2.5.1


In [None]:
from tensorflow_serving.apis.predict_pb2 import PredictRequest

request = PredictRequest()
request.model_spec.name = model_name
request.model_spec.signature_name = "serving_default"
input_name = model.input_names[0]
request.inputs[input_name].CopyFrom(tf.make_tensor_proto(X_new))

Now we are ready to request the server and get the server. For this we need to install `grpcio` library

In [None]:
!pip install grpcio



In [None]:
import grpc
from tensorflow_serving.apis import prediction_service_pb2_grpc

channel = grpc.insecure_channel("localhost:8500")
predict_service = prediction_service_pb2_grpc.PredictionServiceStub(channel)
response = predict_service.Predict(request, timeout=10.0)

In [None]:
# convert the PredictReponse protocol buffer to a tensor
output_name = model.output_names[0]
output_proto = reponse.outputs[output_name]
y_proba = tf.make_ndarray(output_proto)

y_proba.numpy.round(2)

### Deploying a new model version

In [None]:
model_version = "0002"
model_name = "the_mnist_model"
model_path = os.path.join(model_name, model_version)
tf.saved_model.save(model, model_path)


FOR DEVS: If you are overwriting _tracking_metadata in your class, this property has been used to save metadata in the SavedModel. The metadta field will be deprecated soon, so please move the metadata to a different file.
INFO:tensorflow:Assets written to: the_mnist_model/0002/assets


# Deploy the model to Google Cloud AI Platform

Follow the instructions in the book to deploy the model to Google Cloud AI Platform, download the service account's private key and save it to the `my_service_account_private_key.json` in the project directory. Also, update the `project_id`:

In [None]:
import googleapiclient.discovery

project_id = "onyx-smoke-242003" # change this to your project ID
model_id = "the_mnist_model"
model_path = 'prjoject/{}/models/{}'
ml_resource = googleapiclient.discovery.build("ml", "v1").projects()

In [None]:
def predict(X):
  input_data_json = {"signature_name": "serving_default",
                     "instances": X.tolist()}
  request = ml_resource.predict(name=model_path, body=input_data_json)
  response = request.execute()
  if "error" in response:
    raise RuntimeError(response["error"])
  return np.array([pred[output_name] for pred in response["predictions"]])

y_probas = predict(X_new)
np.round(y_probas, 2)

# Using GPUs to Speed Up Computations


## Getting Your Own GPU
TensorFlow currently only supports Nvidia cards with CUDA (Compute Unified Device Architecture) library  compute capabilities. 

In [None]:
!nvidia-smi

Sat Jun 19 15:38:15 2021       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 465.27       Driver Version: 460.32.03    CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla T4            Off  | 00000000:00:04.0 Off |                    0 |
| N/A   60C    P8    11W /  70W |      0MiB / 15109MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

In [None]:
tf.config.list_physical_devices("GPU")

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]

In [None]:
tf.test.gpu_device_name()

'/device:GPU:0'

### Splitting the GPU into virtual devices 
Split the GPU to 4 virtual GPUs with 3777 GiB of RAM each. 

In [3]:
"""
Do so before using TensorFlow
"""
physical_gpus = tf.config.list_physical_devices("GPU")

tf.config.experimental.set_virtual_device_configuration(
    physical_gpus[0],
    [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=3777),
     tf.config.experimental.VirtualDeviceConfiguration(memory_limit=3777),
     tf.config.experimental.VirtualDeviceConfiguration(memory_limit=3777),
     tf.config.experimental.VirtualDeviceConfiguration(memory_limit=3777),]
)

In [4]:
tf.config.list_logical_devices("GPU")

[LogicalDevice(name='/device:GPU:0', device_type='GPU'),
 LogicalDevice(name='/device:GPU:1', device_type='GPU'),
 LogicalDevice(name='/device:GPU:2', device_type='GPU'),
 LogicalDevice(name='/device:GPU:3', device_type='GPU')]

## Managing the GPU RAM


### Assigning a GPU to single dedicated process
If we have multiple GPU cards on our machine, a simple solution is to assign each of them to a single process.

We can set `CUDA_VISIBLE_DEVICES` environment variable so that each process only sees the appropriate GPU card(s). Also set the `CUDA_DEVICE_ORDER` environment variable  to `PCI_BUS_ID` to ensure that each ID always refers to the same card.

In [None]:
!CUDA_DEVICE_ORDER=PCI_BUS_ID CUDA_VISIBLE_DEVICES=0,1 python 3 program_1.py

In [None]:
!CUDA_DEVICE_ORDER=PCI_BUS_ID CUDA_VISIBLE_DEVICES=2,3 python 3 program_2.py

### Tell TensorFlow to grab only specific amount of GPU RAM

In [None]:
"""
Do immendiately after importing TF
"""
for gpu in tf.config.experimental.list_physical_devices("GPU"):
  tf.config.experimental.set_virtual_device_configuration(
      gpu,
      [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=2048)]
  )

### Tell TF to grab memory only when it needs it


In [3]:
for gpu in tf.config.experimental.list_physical_devices("GPU"):
  tf.config.experimental.set_memory_growth(gpu, True)

### Placing Operations and Variables on Devices
`tf.keras` and `tf.data` generally put heavy computation on the GPU, and the data preprocessing on the CPU and are able to good at doing that.

But we can also force to place some operations and variables on each device, if we want more control.

A tensor or variable's `device` attribute  tell where it is placed.

In [4]:
a = tf.Variable(42.0)
a.device

'/job:localhost/replica:0/task:0/device:GPU:0'

In [5]:
b = tf.Variable(42)
b.device

'/job:localhost/replica:0/task:0/device:CPU:0'

Use `device()` context to place an operation on a different device than the default one.

In [7]:
with tf.device("/cpu:0"):
  c = tf.Variable(42.0)

c.device

'/job:localhost/replica:0/task:0/device:CPU:0'

# Training Models Across Multiple Devices
There are two main approaches to this:
- *Model Parallelism*: the model is plit across the devices. Mostly of the time not doable.
- *Data Parallelism*: the model is replicated across every device and each replica is trained on a subset of the data.

## Data Parallelism
The NN is replicated on every device and each training step is ran simulatneously on all replicas using different mini-batach for each. The gradients from the replicas are averaged and used for updating the model parameters.

### Data Parallelism using the mirrored strategy
- Mirrors all the model parameters across all the GPUs and always  apply the exact same parameter updates on every GPU. 
- Imposes synchronous weight updates across all GPUs. That means if there is a slow GPU all other "fast" GPUs have to wait till that finishes to get the average computed and for next training step to begin.

### Data Parallelism with centralized parameters.
- The model parameters are stored outside of the GPU devices performing the computation (the *workers*) and plaecs on one or more  CPU-only servers called *parameter servers*.
- This approach allows either synchronous or asynchronours updates.

### Synchronous updates
The aggregator waits until all gradients are available before it computes the average gradients and passes them to the optimizer.

The downside being the aggregator will have to wait for the slower replicas until it finish its training step. This problem can be eliminited by simpy ignorng the gradients from the slowest few replicas.

### Asynchronous updates
Whenever a replica has finished computing the gradients, it immediately uses them to update the model parameters. There is aggregation and no synchronization.



### Bandwidth saturation
Whether we use synchronous or asynchronous updates, with data parallelism with centralized parameters or with mirrored strategy there always comes a pont where adding an extra GPU will not improve performance at all b/c the time spent moving the data into and out of GPU RAM will outweigh the speedup obtained by splitting the computation load.

To reduce the saturation problem, we should 
- try to use fewer GPUs rather than plenty of weak GPUs. 
- Try droppingg the float precision from 32 Bits (`tf.float32`) to 16 bits (`tf.bfloat16`).
- And if we are using centralized parameters, we should split the parameters acrross multiple parameters servers.

## Trainign at Scale Using Distributin Strategies API

**The Mirrored strategy**


In [6]:
distribution = tf.distribute.MirroredStrategy()

with distribution.scope():
  mirrored_model = keras.models.Sequential([
    keras.layers.Flatten(input_shape=[28, 28, 1]),
    keras.layers.Dense(300, activation="relu"),
    keras.layers.Dense(10, activation="softmax")
  ])
  mirrored_model.compile(loss="sparse_categorical_crossentropy",
                optimizer="sgd",
                metrics=["accuracy"])
  
batch_size = 100 # must be divisible by the number of replicas
history = mirrored_model.fit(X_train, y_train,
                             epochs=10)

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 1/10
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:GPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1'

To load a model and run it on all available devices, we must call `keras.models.load_model()` withing a distribution context like this:

In [None]:
with distribution.scope():
  mirrored_model = keras.models.load_model("the_mnist_model.h5")

To use a subset of available GPUs:

In [9]:
distribution = tf.distribute.MirroredStrategy(["/gpu:0", "/gpu:1"])

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')


**Data parallelism with centralized parameters**

In [10]:
distribution = tf.distribute.experimental.CentralStorageStrategy()

INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3'], variable_device = '/device:CPU:0'


### Training a Model on a TensorFlow Cluster
A TensorFlow Cluster is a group of TensorFlow process running in parallel, usually on different machines, and talking to each other to complete some work.
 
<mark>Each TF process in the cluster is called a *task* or a *TF server*.</mark>

A TF server or a task has:
- IP address,
- a port
- and a type (or *role* or *job*) which can either be:
  - `worker` performs computation, usually on a machine with one or more GPUs.
  - `chief` performs computations as well, but it handles extra work such as writing TensorBoard logs or saving checkpoints. There is only one chief in a cluster and if no chief is specified, the first worker is the chief.
  - `ps` or *parameter server* which keeps tack of variable values, and it is ususally on a CPU-only machine.
  - `evaluator` takes care of evaluation.

To create a TensorFlow cluster, we must define cluster specification which is dictionary with one key per job and the values are lists of task addreses. (IP:*port*).

In [1]:
cluster_spec = {
    "worker": [
        "machine-a-example.com:2222",    # /job:worker/task:0
        "machine-b-example.com:2222"     # /job:worker/task:1
    ],
    "ps": ["machine-a.example.com:2221"] # /job:ps/task:0
}

To start a task we need to set the `TF_CONFIG` environment variable for TF Server before starting TensorFlow. It must be JSON encoded dictionary containing:
- cluster specification under the `cluster` key.
- the type and the index of the current task under the `task` key.

In [None]:
import os
import json

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": cluster_spec,
    "task": {"type": "worker", "index": 0}
})

Then run the following training code on every worker:

In [None]:
distribution = tf.distribute.experimental.MultiWorkerMirroredStrategy()

with distribution.scope():
  mirrored_model = keras.models.Sequential([
    keras.layers.Flatten(input_shape=[28, 28, 1]),
    keras.layers.Dense(300, activation="relu"),
    keras.layers.Dense(10, activation="softmax")
  ])
  mirrored_model.compile(loss="sparse_categorical_crossentropy",
              optimizer="sgd",
              metrics=["accuracy"])

bath_size = 100 # must be divisible by the number of replicas
history = mirrored_model.fit(X_train, y_train, epochs=10)