# Chapter 19 – Training and Deploying TensorFlow Models at Scale
[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/alirezatheh/handson-ml3-notes/blob/main/notebooks/19_training_and_deploying_at_scale.ipynb)
[![Open in Kaggle](https://kaggle.com/static/images/open-in-kaggle.svg)](https://kaggle.com/kernels/welcome?src=https://github.com/alirezatheh/handson-ml3-notes/blob/main/notebooks/19_training_and_deploying_at_scale.ipynb)

**Note**: An *A/B experiment* consists of testing two different versions of our product on different subsets of users in order to check which version works best and to gain other insights.

## Serving a TensorFlow Model
As our infrastructure grows, there comes a point where it is preferable to wrap our model in a small service whose sole role is to make predictions and have the rest of the infrastructure query it (e.g., via a REST or gRPC API).

This decouples our model from the rest of the infrastructure, making it possible to easily switch model versions or scale the service up as needed (independently from the rest of our infrastructure), perform A/B experiments, and ensure that all our software components rely on the same model versions. It also simplifies testing and development, and more.

### Using TensorFlow Serving
Let’s train an MNIST model with Keras and deploy it to TF Serving.

#### Exporting SavedModels
To version the model, we just need to create a subdirectory for each model version.
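
As a minimal sketch of this convention, the next version directory could be derived from the ones that already exist. The helper name `next_version_dir` and the four-digit zero padding are assumptions made for illustration; they are not part of TF Serving or Keras.

```python
from pathlib import Path
import tempfile


def next_version_dir(model_dir: Path, width: int = 4) -> Path:
    """Return the path for the next model version (hypothetical helper)."""
    existing = model_dir.iterdir() if model_dir.is_dir() else []
    versions = [
        int(path.name)
        for path in existing
        if path.is_dir() and path.name.isdecimal()
    ]
    next_version = max(versions, default=0) + 1
    return model_dir / f'{next_version:0{width}d}'


# Try it on a throwaway directory rather than the real model directory
with tempfile.TemporaryDirectory() as tmp:
    demo_dir = Path(tmp) / 'my_mnist_model'
    first = next_version_dir(demo_dir)
    first.mkdir(parents=True)
    second = next_version_dir(demo_dir)
    version_names = (first.name, second.name)

print(version_names)
```

Comparing the names as integers rather than as strings keeps the ordering correct even if the padding width changes between versions.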

**Note**: If running on Colab or Kaggle, we need to install the Google AI Platform client library, which will be used later in this notebook. We can ignore the warnings about version incompatibilities.

**Warning**: On Colab, we must restart the Runtime after the installation, and continue with the next cells.

In [1]:
import sys

if 'google.colab' in sys.modules or 'kaggle_secrets' in sys.modules:
    %pip install -q -U google-cloud-aiplatform

**Warning**: This chapter discusses how to run or train a model on one or more GPUs, so let’s make sure there’s at least one, or else issue a warning:

In [2]:
import tensorflow as tf

if not tf.config.list_physical_devices('GPU'):
    print('No GPU was detected. Neural nets can be very slow without a GPU.')
    if 'google.colab' in sys.modules:
        print(
            'Go to Runtime > Change runtime and select a GPU hardware '
            'accelerator.'
        )
    if 'kaggle_secrets' in sys.modules:
        print('Go to Settings > Accelerator and select GPU.')

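Let’s load the MNIST dataset, split it, and train a model on it. The pixel values are scaled by a `Rescaling` layer inside the model, so the exported model can accept raw `uint8` images: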

In [3]:
from pathlib import Path

import keras

# Load and split the MNIST dataset
mnist = keras.datasets.mnist.load_data()
(X_train_full, y_train_full), (X_test, y_test) = mnist
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]

# Build and train an MNIST model (also handles image preprocessing)
keras.utils.set_random_seed(42)
keras.backend.clear_session()
model = keras.Sequential(
    [
        keras.layers.Flatten(input_shape=[28, 28], dtype=tf.uint8),
        keras.layers.Rescaling(scale=1 / 255),
        keras.layers.Dense(100, activation='relu'),
        keras.layers.Dense(10, activation='softmax'),
    ]
)
model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=keras.optimizers.SGD(learning_rate=1e-2),
    metrics=['accuracy'],
)
model.fit(X_train, y_train, epochs=10, validation_data=(X_valid, y_valid))

model_name = 'my_mnist_model'
model_version = '0001'
model_path = Path(model_name) / model_version
model.save(model_path, save_format='tf')

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
INFO:tensorflow:Assets written to: my_mnist_model/0001/assets


**Warning**: Since a SavedModel saves the computation graph, it can only be used with models that are based exclusively on TensorFlow operations, excluding the `tf.py_function()` operation, which wraps arbitrary Python code.

Let’s take a look at the file tree:

In [4]:
sorted([str(path) for path in model_path.parent.glob('**/*')])

['my_mnist_model/0001',
 'my_mnist_model/0001/assets',
 'my_mnist_model/0001/keras_metadata.pb',
 'my_mnist_model/0001/saved_model.pb',
 'my_mnist_model/0001/variables',
 'my_mnist_model/0001/variables/variables.data-00000-of-00001',
 'my_mnist_model/0001/variables/variables.index']

TensorFlow comes with a small `saved_model_cli` command-line interface to inspect SavedModels. Let’s inspect the SavedModel:

In [5]:
!saved_model_cli show --dir '{model_path}'

The given SavedModel contains the following tag-sets:
'serve'


A SavedModel contains one or more *metagraphs*. A metagraph is a computation graph plus some function signature definitions, including their input and output names, types, and shapes. Each metagraph is identified by a set of tags. For example, we may want to have a metagraph containing the full computation graph, including the training operations: we would typically tag this one as `'train'`. And we might have another metagraph containing a pruned computation graph with only the prediction operations, including some GPU-specific operations: this one might be tagged as `'serve'`, `'gpu'`. This can be done using TensorFlow’s low-level [SavedModel API](https://homl.info/savedmodel). However, when we save a Keras model using its `save()` method, it saves a single metagraph tagged as `'serve'`. Let’s inspect this `'serve'` tag set:

In [6]:
!saved_model_cli show --dir '{model_path}' --tag_set serve

The given SavedModel MetaGraphDef contains SignatureDefs with the following keys:
SignatureDef key: "__saved_model_init_op"
SignatureDef key: "serving_default"


This metagraph contains two signature definitions:
- An initialization function called `'__saved_model_init_op'`, which we do not need to worry about.
- A default serving function called `'serving_default'`. When saving a Keras model, the default serving function is the model’s `call()` method. Let’s get more details about this serving function:

In [7]:
!saved_model_cli show \
    --dir '{model_path}' \
    --tag_set serve \
    --signature_def serving_default

The given SavedModel SignatureDef contains the following input(s):
  inputs['flatten_input'] tensor_info:
      dtype: DT_UINT8
      shape: (-1, 28, 28)
      name: serving_default_flatten_input:0
The given SavedModel SignatureDef contains the following output(s):
  outputs['dense_1'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, 10)
      name: StatefulPartitionedCall:0
Method name is: tensorflow/serving/predict


These correspond to the Keras model’s input and output layer names. For even more details, we can run the following command:
```ipython
!saved_model_cli show --dir '{model_path}' --all
```

#### Installing and Starting TensorFlow Serving
There are many ways to install TF Serving:
- Using the system’s package manager
- Using a Docker image
- Installing from source
- And more

**Note**: Docker allows us to easily download a set of applications packaged in a *Docker image* (including all their dependencies and usually some good default configuration) and then run them on our system using a *Docker engine*. When we run an image, the engine creates a *Docker container* that keeps the applications well isolated from our own system, but we can give it some limited access if we want. It is similar to a virtual machine, but much faster and lighter, as the container relies directly on the host’s kernel. This means that the image does not need to include or run its own kernel.

Since Colab runs on Ubuntu, we can use Ubuntu’s apt package manager like this:

In [8]:
if 'google.colab' in sys.modules or 'kaggle_secrets' in sys.modules:
    url = 'https://storage.googleapis.com/tensorflow-serving-apt'
    src = 'stable tensorflow-model-server tensorflow-model-server-universal'
    !echo 'deb {url} {src}' > /etc/apt/sources.list.d/tensorflow-serving.list
    !curl '{url}/tensorflow-serving.release.pub.gpg' | apt-key add -
    !apt update -q && apt-get install -y tensorflow-model-server
    %pip install -q -U tensorflow-serving-api

This code:
- Adds TensorFlow’s package repository to Ubuntu’s list of package sources.
- Downloads TensorFlow’s public GPG key and adds it to the package manager’s key list so it can verify TensorFlow’s package signatures.
- Uses apt to install the `tensorflow-model-server` package.
- Installs the `tensorflow-serving-api` library, which we will need to communicate with the server.

The following 2 cells will start the server. If our OS is Windows, we may need to run the `tensorflow_model_server` command in a terminal, and replace `${MODEL_DIR}` with the full path to the `my_mnist_model` directory.

In [9]:
import os

os.environ['MODEL_DIR'] = str(model_path.parent.absolute())

In [10]:
%%bash --bg
tensorflow_model_server \
    --port=8500 \
    --rest_api_port=8501 \
    --model_name=my_mnist_model \
    --model_base_path="${MODEL_DIR}" >my_server.log 2>&1

- In Jupyter or Colab, the `%%bash --bg` magic command executes the cell as a bash script, running it in the background.
- The `>my_server.log 2>&1` part redirects the standard output and standard error to the my_server.log file.
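
Since the server starts in the background, it can be useful to wait until the model is actually loaded before sending queries. The sketch below polls TF Serving’s REST model status endpoint (`GET /v1/models/<model_name>`) and looks for a version whose state is `AVAILABLE`. The helper names, the one-second polling interval, and the default timeout are assumptions chosen for illustration.

```python
import json
import time
import urllib.error
import urllib.request


def is_available(status: dict) -> bool:
    """Check whether any model version is reported as AVAILABLE."""
    versions = status.get('model_version_status', [])
    return any(v.get('state') == 'AVAILABLE' for v in versions)


def fetch_status(url: str) -> dict:
    """Fetch the status JSON, or an empty dict if the server is not ready."""
    try:
        with urllib.request.urlopen(url, timeout=2) as resp:
            return json.load(resp)
    except (urllib.error.URLError, OSError, ValueError):
        return {}


def wait_for_model(
    url: str, timeout: float = 60.0, fetch=fetch_status
) -> bool:
    """Poll until the model is available or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if is_available(fetch(url)):
            return True
        time.sleep(1.0)
    return False
```

With the server configured as above, we could call `wait_for_model('http://localhost:8501/v1/models/my_mnist_model')` and only proceed if it returns `True`. The `fetch` parameter is there so the polling logic can be tried without a running server.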

<div style="border: 1px solid;">

##### Running TF Serving in a Docker Container 
If we are running this notebook on our own machine, and we prefer to install TF Serving using Docker, first make sure [Docker](https://docs.docker.com/install/) is installed, then run the following commands in a terminal.
```bash
# Downloads the latest TF Serving image
docker pull tensorflow/serving

docker run -it --rm -v '/path/to/my_mnist_model:/models/my_mnist_model' \
    -p 8500:8500 -p 8501:8501 -e MODEL_NAME=my_mnist_model tensorflow/serving
```
Here is what all these command-line options mean:
- `-it`: Makes the container interactive (so we can press Ctrl-C to stop it) and displays the server’s output
- `--rm`: Deletes the container when we stop it: no need to clutter our machine with interrupted containers. However, it does not delete the image.
- `-v '/path/to/my_mnist_model:/models/my_mnist_model'`: Makes the host’s *my_mnist_model* directory available to the container at the path */models/my_mnist_model*. We must replace */path/to/my_mnist_model* with the absolute path of this directory. On Windows, remember to use \ instead of / in the host path, but not in the container path (since the container runs on Linux).
- `-p 8500:8500`: Makes the Docker engine forward the host’s TCP port 8500 to the container’s TCP port 8500. By default, TF Serving uses this port to serve the gRPC API.
- `-p 8501:8501`: Forwards the host’s TCP port 8501 to the container’s TCP port 8501. The Docker image is configured to use this port by default to serve the REST API.
- `-e MODEL_NAME=my_mnist_model`: Sets the container’s `MODEL_NAME` environment variable, so TF Serving knows which model to serve. By default, it will look for models in the */models* directory, and it will automatically serve the latest version it finds.
- `tensorflow/serving`: This is the name of the image to run.
</div>

#### Querying TF Serving through the REST API
Let’s start by creating the query. It must contain the name of the function signature we want to call, and of course the input data. Since the request must use the JSON format, we have to convert the input images from a NumPy array to a Python list:

In [11]:
import json

# Pretend we have 3 new digit images to classify
X_new = X_test[:3]
request_json = json.dumps(
    {
        'signature_name': 'serving_default',
        'instances': X_new.tolist(),
    }
)

In [12]:
request_json[:100] + '...' + request_json[-10:]

'{"signature_name": "serving_default", "instances": [[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0..., 0, 0]]]}'

Now let’s use TensorFlow Serving’s REST API to make predictions:

In [13]:
import requests

server_url = 'http://localhost:8501/v1/models/my_mnist_model:predict'
response = requests.post(server_url, data=request_json)
# Raise an exception in case of error
response.raise_for_status()
response = response.json()

In [14]:
import numpy as np

y_proba = np.array(response['predictions'])
y_proba.round(2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]])
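
Each row of the response holds one probability per class, so the predicted digit is the index of the largest value in the row. Here is a small sketch in plain Python, using hard-coded values that mirror the rounded output above (with NumPy, `y_proba.argmax(axis=1)` does the same thing). The function name `predicted_classes` is an assumption for illustration.

```python
# Probabilities shaped like the rounded output above (illustrative values)
predictions = [
    [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0],
    [0.0, 0.0, 0.99, 0.01, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
    [0.0, 0.97, 0.01, 0.0, 0.0, 0.0, 0.0, 0.01, 0.0, 0.0],
]


def predicted_classes(probas: list) -> list:
    """Return the index of the highest probability in each row."""
    return [max(range(len(row)), key=row.__getitem__) for row in probas]


labels = predicted_classes(predictions)
print(labels)  # [7, 2, 1]
```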

**Tip**: When transferring large amounts of data, or when latency is important, it is much better to use the gRPC API, if the client supports it, as it uses a compact binary format and an efficient communication protocol based on HTTP/2 framing.
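
To get a rough feel for the size difference, the sketch below compares the JSON encoding of three blank 28 × 28 images with the number of bytes needed for the raw `uint8` pixel values. This is only an approximation: it does not build an actual protocol buffer, which would add some overhead on top of the raw pixel bytes, and the placeholder images are all zeros.

```python
import json

# Three blank 28x28 "images" with one byte per pixel (placeholder data)
images = [[[0] * 28 for _ in range(28)] for _ in range(3)]

json_body = json.dumps(
    {'signature_name': 'serving_default', 'instances': images}
)
json_size = len(json_body.encode('utf-8'))
raw_size = 3 * 28 * 28  # bytes needed for the uint8 pixel values alone

print(f'JSON: {json_size} bytes, raw pixels: {raw_size} bytes')
```

Every pixel costs at least a digit plus a separator in JSON, and brighter pixels need up to three digits, so the gap grows with real images.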

#### Querying TF Serving through the gRPC API
The gRPC API expects a serialized `PredictRequest` protocol buffer as input, and it outputs a serialized `PredictResponse` protocol buffer. These protobufs are part of the `tensorflow-serving-api` library. First, let’s create the request:

In [15]:
from tensorflow_serving.apis.predict_pb2 import PredictRequest

request = PredictRequest()
request.model_spec.name = model_name
request.model_spec.signature_name = 'serving_default'
# == 'flatten_input'
input_name = model.input_names[0]
request.inputs[input_name].CopyFrom(tf.make_tensor_proto(X_new))

In [16]:
import grpc
from tensorflow_serving.apis import prediction_service_pb2_grpc

# No encryption, no authentication
channel = grpc.insecure_channel('localhost:8500')
predict_service = prediction_service_pb2_grpc.PredictionServiceStub(channel)
response = predict_service.Predict(request, timeout=10.0)

Now let’s convert the response to a NumPy array using `tf.make_ndarray()`:

In [17]:
# == 'dense_1'
output_name = model.output_names[0]
outputs_proto = response.outputs[output_name]
y_proba = tf.make_ndarray(outputs_proto)

In [18]:
y_proba.round(2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]],
      dtype=float32)

If our client does not include the TensorFlow library, we can convert the response to a NumPy array like this:

In [19]:
# Shows how to avoid using tf.make_ndarray()
output_name = model.output_names[0]
outputs_proto = response.outputs[output_name]
shape = [dim.size for dim in outputs_proto.tensor_shape.dim]
y_proba = np.array(outputs_proto.float_val).reshape(shape)
y_proba.round(2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]])

#### Deploying a New Model Version

In [20]:
# Build and train a new MNIST model version
keras.utils.set_random_seed(42)
model = keras.Sequential(
    [
        keras.layers.Flatten(input_shape=[28, 28], dtype=tf.uint8),
        keras.layers.Rescaling(scale=1 / 255),
        keras.layers.Dense(50, activation='relu'),
        keras.layers.Dense(50, activation='relu'),
        keras.layers.Dense(10, activation='softmax'),
    ]
)
model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=keras.optimizers.SGD(learning_rate=1e-2),
    metrics=['accuracy'],
)
history = model.fit(
    X_train, y_train, epochs=10, validation_data=(X_valid, y_valid)
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [21]:
model_version = '0002'
model_path = Path(model_name) / model_version
model.save(model_path, save_format='tf')

INFO:tensorflow:Assets written to: my_mnist_model/0002/assets


Let’s take a look at the file tree again:

In [22]:
sorted([str(path) for path in model_path.parent.glob('**/*')])

['my_mnist_model/0001',
 'my_mnist_model/0001/assets',
 'my_mnist_model/0001/keras_metadata.pb',
 'my_mnist_model/0001/saved_model.pb',
 'my_mnist_model/0001/variables',
 'my_mnist_model/0001/variables/variables.data-00000-of-00001',
 'my_mnist_model/0001/variables/variables.index',
 'my_mnist_model/0002',
 'my_mnist_model/0002/assets',
 'my_mnist_model/0002/keras_metadata.pb',
 'my_mnist_model/0002/saved_model.pb',
 'my_mnist_model/0002/variables',
 'my_mnist_model/0002/variables/variables.data-00000-of-00001',
 'my_mnist_model/0002/variables/variables.index']

At regular intervals (the delay is configurable), TF Serving checks the model directory for new model versions. If it finds one, it automatically handles the transition gracefully: by default, it answers pending requests (if any) with the previous model version, while handling new requests with the new version. As soon as every pending request has been answered, the previous model version is unloaded. We can see this at work in the TF Serving logs (in *my_server.log*):
```text
[...]
Reading SavedModel from: /models/my_mnist_model/0002
Reading meta graph with tags { serve }
[...]
Successfully loaded servable version {name: my_mnist_model version: 2}
Quiescing servable version {name: my_mnist_model version: 1}
Done quiescing servable version {name: my_mnist_model version: 1}
Unloading servable version {name: my_mnist_model version: 1}
```

**Warning**: We may need to wait a minute before the new model is loaded by TensorFlow Serving.

In [23]:
server_url = 'http://localhost:8501/v1/models/my_mnist_model:predict'

response = requests.post(server_url, data=request_json)
response.raise_for_status()
response = response.json()

In [24]:
response.keys()

dict_keys(['predictions'])

In [25]:
y_proba = np.array(response['predictions'])
y_proba.round(2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.99, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ]])

**Tip**: If the SavedModel contains some example instances in the *assets/extra* directory, we can configure TF Serving to run the new model on these instances before starting to use it to serve requests. This is called *model warmup*: it will ensure that everything is properly loaded, avoiding long response times for the first requests.

By default, the new version is loaded while the previous one is still in memory. This offers a smooth transition, but it may use too much RAM. We can configure TF Serving so that it handles all pending requests with the previous model version and unloads it before loading and using the new model version.

If we discover that version 2 does not work as well as we expected, then rolling back to version 1 is as simple as removing the *my_mnist_model/0002* directory.
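
A rollback of this kind can be rehearsed on a throwaway directory before touching the real one. The sketch below deletes the highest-numbered version directory and reports which version is now the latest. The helper name `roll_back` is an assumption for illustration, and the demo runs in a temporary directory.

```python
from pathlib import Path
import shutil
import tempfile


def roll_back(model_dir: Path) -> str:
    """Delete the highest version directory; return the new latest name."""
    versions = sorted(
        (p for p in model_dir.iterdir() if p.is_dir() and p.name.isdecimal()),
        key=lambda p: int(p.name),
    )
    if len(versions) < 2:
        raise ValueError('Need at least two versions to roll back')
    shutil.rmtree(versions[-1])
    return versions[-2].name


with tempfile.TemporaryDirectory() as tmp:
    demo_dir = Path(tmp) / 'my_mnist_model'
    for version in ('0001', '0002'):
        (demo_dir / version).mkdir(parents=True)
    latest_after_rollback = roll_back(demo_dir)
    remaining = sorted(p.name for p in demo_dir.iterdir())

print(latest_after_rollback, remaining)
```

Refusing to delete the only remaining version is a deliberate safety choice here: it avoids leaving the server with nothing to serve.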

**Tip**: We can activate automatic batching using the `--enable_batching` option upon startup. When TF Serving receives multiple requests within a short period of time (the delay is configurable), it will automatically batch them together before using the model. This offers a significant performance boost by leveraging the power of the GPU. Once the model returns the predictions, TF Serving dispatches each prediction to the right client. We can trade a bit of latency for a greater throughput by increasing the batching delay (see the `--batching_parameters_file` option).
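
As a sketch of what such a batching parameters file might contain, the snippet below writes one from Python. The file is a text-format protocol buffer. The values are purely illustrative assumptions and should be tuned for the actual workload, and the file name `batching_parameters.txt` is also an assumption.

```python
from pathlib import Path

# Illustrative values only; tune them for the actual workload
batching_config = """\
max_batch_size { value: 32 }
batch_timeout_micros { value: 5000 }
max_enqueued_batches { value: 100 }
num_batch_threads { value: 4 }
"""

config_path = Path('batching_parameters.txt')
config_path.write_text(batching_config)
```

The server could then be started with `--enable_batching --batching_parameters_file=batching_parameters.txt` added to the `tensorflow_model_server` command. Raising `batch_timeout_micros` is the knob that trades latency for throughput.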

If we expect to get many queries per second, we will want to deploy TF Serving on multiple servers and load-balance the queries. This will require deploying and managing many TF Serving containers across these servers. One way to handle that is to use a tool such as [Kubernetes](https://kubernetes.io), which is an open source system for simplifying container orchestration across many servers. If we do not want to purchase, maintain, and upgrade all the hardware infrastructure, we will want to use virtual machines on a cloud platform such as Amazon AWS, Microsoft Azure, Google Cloud Platform, IBM Cloud, Alibaba Cloud, Oracle Cloud, or some other platform as a service (PaaS) offering. 

All of this can be a full-time job, but some service providers can take care of all this for us. In this chapter we will use Vertex AI: it’s the only platform with TPUs today; it supports TensorFlow 2, Scikit-Learn, and XGBoost; and it offers a nice suite of AI services. There are other providers such as Amazon AWS SageMaker and Microsoft AI Platform.

### Creating a Prediction Service on Vertex AI
Vertex AI is a platform within Google Cloud Platform (GCP) that offers a wide range of AI-related tools and services:
- We can upload datasets, get humans to label them, store commonly used features in a feature store and use them for training or in production, and train models across many GPU or TPU servers with automatic hyperparameter tuning or model architecture search (AutoML).
- We can also manage our trained models, use them to make batch predictions on large amounts of data, schedule multiple jobs for our data workflows, serve our models via REST or gRPC at scale, and experiment with our data and models within a hosted Jupyter environment called the *Workbench*.
- There’s even a *Matching Engine* service that lets us compare vectors very efficiently (i.e., approximate nearest neighbors).

Before we start, there’s a little bit of setup to take care of:
1. Log in to our Google account, and then go to the [Google Cloud Platform console](https://console.cloud.google.com).

<center>
  <img 
    src="../images/19/gcp_console.png" 
    onerror="
      this.onerror = null;
      const repo = 'https://github.com/alirezatheh/handson-ml3-notes/blob/main';
      this.src = repo + this.src.split('..')[1];
    "
  >
</center>

2. If it’s our first time using GCP, we’ll have to read and accept the terms and conditions. New users are offered a free trial, including $300 worth of GCP credit that we can use over the course of 90 days.
3. If we have used GCP before and our free trial has expired, then the services we will use in this chapter will cost us some money. To check, open the ☰ navigation menu at the top left and click Billing, then make sure we have set up a payment method and that the billing account is active.
4. Every resource in GCP belongs to a *project*. This includes all the VMs we may use, the files we store, and the training jobs we run. 
   - When we create an account, GCP automatically creates a project for us, called “My First Project”. If we want, we can change its display name by going to the project settings: in the ☰ navigation menu, select “IAM and admin → Settings”, change the project’s display name, and click SAVE. 
   - Note that the project also has a unique ID and number. We can choose the project ID when we create a project, but we cannot change it later. The project number is automatically generated and cannot be changed.
   - If we want to create a new project, click the project name at the top of the page, then click NEW PROJECT and enter the project name. We can also click EDIT to set the project ID. Make sure billing is active for this new project so that service fees can be billed (to our free credits, if any).
5. Now we must activate the APIs we need. In the ☰ navigation menu, select “APIs and services”, and make sure the Cloud Storage API is enabled. If needed, click + ENABLE APIS AND SERVICES, find Cloud Storage, and enable it. Also enable the Vertex AI API.

**Warning**: We should always set an alarm to remind ourselves to turn services off when we know we will only need them for a few hours, or else we might leave them running for days or months, incurring potentially significant costs.

We could continue to do everything via the GCP console, but I recommend using Python instead: this way we can write scripts to automate just about anything we want with GCP.

<div style="border: 1px solid;">

#### Google Cloud CLI and Shell
Google Cloud’s command-line interface (CLI) includes the `gcloud` command, which lets us control almost everything in GCP, and `gsutil`, which lets us interact with Google Cloud Storage. This CLI is preinstalled in Colab: all we need to do is authenticate using `google.colab.auth.authenticate_user()`. For example, `!gcloud config list` will display the configuration.

GCP also offers a preconfigured shell environment called the Google Cloud Shell, which we can use directly in our web browser; it runs on a free Linux VM (Debian) with the Google Cloud SDK preinstalled and configured for us, so there’s no need to authenticate.

<center>
  <img 
    src="../images/19/gc_shell.png" 
    onerror="
      this.onerror = null;
      const repo = 'https://github.com/alirezatheh/handson-ml3-notes/blob/main';
      this.src = repo + this.src.split('..')[1];
    "
  >
</center>

We can also [install the CLI on our machine](https://homl.info/gcloud).
</div>

The first thing we need to do before we can use any GCP service is to authenticate. The simplest solution when using Colab is to execute the following code:
```python
from google.colab import auth

auth.authenticate_user()
```
The authentication process is based on [OAuth 2.0](https://oauth.net): a pop-up window will ask us to confirm that we want the Colab notebook to access our Google credentials.

<div style="border: 1px solid;">

#### Authentication and Authorization on GCP
In general, using OAuth 2.0 authentication is only recommended when an application must access the user’s personal data or resources from another application, on the user’s behalf.

When an application needs to access a service on GCP on its own behalf, then it should generally use a *service account*. To create a service account:
- Select “IAM and admin → Service accounts” in the GCP console’s ☰ navigation menu (or use the search box), then click + CREATE SERVICE ACCOUNT, fill in the first page of the form (service account name, ID, description), and click CREATE AND CONTINUE.
- Next, we must give this account some access rights. Select the “Vertex AI user” role: this will allow the service account to make predictions and use other Vertex AI services, but nothing else. Click CONTINUE. 
- We can now optionally grant some users access to the service account: this is useful when our GCP user account is part of an organization and we wish to authorize other users in the organization to:
  - Deploy applications that will be based on this service account;
  - Or to manage the service account itself. 
  
  Next, click DONE.

Once we have created a service account, our application must authenticate as that service account. There are several ways to do that:
- If our application is hosted on GCP, then the simplest and safest solution is to attach the service account to the GCP resource that hosts our website, such as a VM instance or a Google App Engine service. This can be done when creating the GCP resource, by selecting the service account in the “Identity and API access” section. Some resources, such as VM instances, also let us attach the service account after the VM instance is created: we must stop it and edit its settings. In any case, once a service account is attached to a VM instance, or any other GCP resource running our code, GCP’s client libraries will automatically authenticate as the chosen service account, with no extra step needed.
- If our application is hosted using Kubernetes, then we should use Google’s Workload Identity service to map the right service account to each Kubernetes service account.
- If our application is not hosted on GCP, then we can either use the Workload Identity Federation service (that’s the safest but hardest option), or just generate an access key for our service account, save it to a JSON file, and point the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to it so our client application can access it. We can manage access keys by clicking the service account we just created, and then opening the KEYS tab. 

For more details on setting up authentication and authorization so our application can access GCP services, check out the [documentation](https://homl.info/gcpauth).
</div>

If we are not running this notebook in Colab or Kaggle, we must follow the instructions above to create a service account and generate a key for it, then download the key to this notebook’s directory and name it `my_service_account_key.json` (or make sure the `GOOGLE_APPLICATION_CREDENTIALS` environment variable points to our key).

In [26]:
# CHANGE THIS TO OUR PROJECT ID
project_id = 'my_project'

if 'google.colab' in sys.modules:
    from google.colab import auth

    auth.authenticate_user()
elif 'kaggle_secrets' in sys.modules:
    from kaggle_secrets import UserSecretsClient

    UserSecretsClient().set_gcloud_credentials(project=project_id)
else:
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = (
        'my_service_account_key.json'
    )
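
When relying on a key file, a quick sanity check can catch a wrong path or a wrong kind of credentials file before any GCP call fails with a less obvious error. The sketch below only inspects a few fields that service account key files contain (`type`, `client_email`, `private_key`); it does not verify that the key is valid. The helper name and the fake key contents are assumptions for illustration.

```python
import json
from pathlib import Path
import tempfile


def looks_like_service_account_key(path: Path) -> bool:
    """Shallow check of a key file's structure (does not validate the key)."""
    try:
        key = json.loads(path.read_text())
    except (OSError, ValueError):
        return False
    return (
        isinstance(key, dict)
        and key.get('type') == 'service_account'
        and 'client_email' in key
        and 'private_key' in key
    )


# Demo with a fake key file in a temporary directory
with tempfile.TemporaryDirectory() as tmp:
    fake_key_path = Path(tmp) / 'my_service_account_key.json'
    fake_key_path.write_text(
        json.dumps(
            {
                'type': 'service_account',
                'client_email': 'demo@example.iam.gserviceaccount.com',
                'private_key': 'NOT A REAL KEY',
            }
        )
    )
    fake_ok = looks_like_service_account_key(fake_key_path)
    missing_ok = looks_like_service_account_key(Path(tmp) / 'missing.json')

print(fake_ok, missing_ok)  # True False
```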

Now let’s create a Google Cloud Storage bucket to store our SavedModels (a GCS *bucket* is a container for our data). We first create a `Client` object, which will serve as the interface with GCS, then we use it to create the bucket:

In [27]:
from google.cloud import storage

# CHANGE THIS TO A UNIQUE BUCKET NAME
bucket_name = 'my_bucket'
location = 'us-central1'

storage_client = storage.Client(project=project_id)
bucket = storage_client.create_bucket(bucket_name, location=location)
# To reuse a bucket instead
# bucket = storage_client.bucket(bucket_name)

- GCS uses a single worldwide namespace for buckets, so simple names like “machine-learning” will most likely not be available. Make sure the bucket name conforms to DNS naming conventions, as it may be used in DNS records. Moreover, bucket names are public, so do not put anything private in the name. It is common to use our domain name, our company name, or our project ID as a prefix to ensure uniqueness, or simply use a random number as part of the name.
- We can change the region if we want, see [Google Cloud’s list of regions](https://homl.info/regions) and [Vertex AI’s documentation on locations](https://homl.info/locations) for more details.
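
The naming advice above can be turned into a small pre-check. The sketch below covers only a simplified subset of the GCS naming rules (3 to 63 characters; lowercase letters, digits, dashes, underscores, and dots; starting and ending with a letter or digit; no `goog` prefix and no `google` substring) and appends a random suffix for uniqueness. Both helper names are assumptions for illustration, and passing this check does not guarantee that a name is available.

```python
import re
import secrets

# Simplified pattern: 3 to 63 characters, alphanumeric at both ends
_BUCKET_RE = re.compile(r'[a-z0-9][a-z0-9._-]{1,61}[a-z0-9]')


def is_plausible_bucket_name(name: str) -> bool:
    """Partial check: length, allowed characters, first/last character."""
    return (
        _BUCKET_RE.fullmatch(name) is not None
        and not name.startswith('goog')
        and 'google' not in name
    )


def unique_bucket_name(prefix: str) -> str:
    """Append a random hexadecimal suffix to reduce the risk of collisions."""
    return f'{prefix}-{secrets.token_hex(4)}'


candidate = unique_bucket_name('my-project')
print(candidate, is_plausible_bucket_name(candidate))
```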

Next, let’s upload the *my_mnist_model* directory to the new bucket. Files in GCS are called *blobs* (or *objects*), and under the hood they are all just placed in the bucket without any directory structure. Blob names can be arbitrary Unicode strings, and they can even contain forward slashes (/). The GCP console and other tools use these slashes to give the illusion that there are directories. So, when we upload the *my_mnist_model* directory, we only care about the files, not the directories:

In [28]:
def upload_directory(bucket: storage.Bucket, dirpath: str) -> None:
    dirpath = Path(dirpath)
    for filepath in dirpath.glob('**/*'):
        if filepath.is_file():
            blob = bucket.blob(filepath.relative_to(dirpath.parent).as_posix())
            blob.upload_from_filename(filepath)


upload_directory(bucket, 'my_mnist_model')

This function works fine now, but it would be very slow if there were many files to upload.

In [29]:
# A much faster multithreaded implementation of upload_directory() which
# also accepts a prefix for the target path, and prints stuff

from concurrent import futures


def upload_file(
    bucket: storage.Bucket, filepath: Path, blob_path: str
) -> None:
    blob = bucket.blob(blob_path)
    blob.upload_from_filename(filepath)


def upload_directory(
    bucket: storage.Bucket,
    dirpath: str,
    prefix: str = None,
    max_workers: int = 50,
) -> None:
    dirpath = Path(dirpath)
    prefix = prefix or dirpath.name
    with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_filepath = {
            executor.submit(
                upload_file,
                bucket,
                filepath,
                f'{prefix}/{filepath.relative_to(dirpath).as_posix()}',
            ): filepath
            for filepath in sorted(dirpath.glob('**/*'))
            if filepath.is_file()
        }
        for future in futures.as_completed(future_to_filepath):
            filepath = future_to_filepath[future]
            try:
                _ = future.result()
            except Exception as ex:
                # f!s is str(f)
                print(f'Error uploading {filepath!s:60}: {ex}')
            else:
                print(f'Uploaded {filepath!s:60}', end='\r')

    print(f'Uploaded {dirpath!s:60}')

Alternatively, if we installed Google Cloud CLI (it’s preinstalled on Colab), then we can use the following `gsutil` command:

In [30]:
# !gsutil -m cp -r my_mnist_model gs://{bucket_name}/

Next, let’s tell Vertex AI about our MNIST model. 
- First we initialize the library, just to specify some default values for the project ID and the location.
- Then we can create a new Vertex AI model: we specify a display name, the GCS path to our model (in this case the version 0001), and the URL of the Docker container we want Vertex AI to use to run this model. If we visit that URL and navigate up one level, we will find other containers we can use. This one supports TensorFlow 2.8 with a GPU:

In [31]:
from google.cloud import aiplatform

server_image = 'gcr.io/cloud-aiplatform/prediction/tf2-gpu.2-8:latest'

aiplatform.init(project=project_id, location=location)
mnist_model = aiplatform.Model.upload(
    display_name='mnist',
    artifact_uri=f'gs://{bucket_name}/my_mnist_model/0001',
    serving_container_image_uri=server_image,
)

Creating Model
Create Model backing LRO: projects/522977795627/locations/us-central1/models/4798114811986575360/operations/53403898236370944
Model created. Resource name: projects/522977795627/locations/us-central1/models/4798114811986575360
To use this Model in another session:
model = aiplatform.Model('projects/522977795627/locations/us-central1/models/4798114811986575360')


Now let’s deploy this model so we can query it via a gRPC or REST API to make predictions:
- For this we first need to create an *endpoint*. This is what client applications connect to when they want to access a service.
- Then we need to deploy our model to this endpoint:

**Warning**: This cell may take several minutes to run, as it waits for Vertex AI to provision the compute nodes:

In [32]:
endpoint = aiplatform.Endpoint.create(display_name='mnist-endpoint')

endpoint.deploy(
    mnist_model,
    min_replica_count=1,
    max_replica_count=5,
    machine_type='n1-standard-4',
    accelerator_type='NVIDIA_TESLA_K80',
    accelerator_count=1,
)

Creating Endpoint
Create Endpoint backing LRO: projects/522977795627/locations/us-central1/endpoints/5133373499481522176/operations/4135354010494304256
Endpoint created. Resource name: projects/522977795627/locations/us-central1/endpoints/5133373499481522176
To use this Endpoint in another session:
endpoint = aiplatform.Endpoint('projects/522977795627/locations/us-central1/endpoints/5133373499481522176')
Deploying Model projects/522977795627/locations/us-central1/models/4798114811986575360 to Endpoint : projects/522977795627/locations/us-central1/endpoints/5133373499481522176
Deploy Endpoint model backing LRO: projects/522977795627/locations/us-central1/endpoints/5133373499481522176/operations/388359120522051584
Endpoint model deployed. Resource name: projects/522977795627/locations/us-central1/endpoints/5133373499481522176


See https://homl.info/machinetypes for other machine types and https://homl.info/accelerators for other GPU types.

**Note**: GCP enforces various GPU quotas, both world‐wide and per region: we cannot create thousands of GPU nodes without prior authorization from Google. To check our quotas, open “IAM and admin → Quotas” in the GCP console. If some quotas are too low (e.g., if we need more GPUs in a particular region), we can ask for them to be increased; it often takes about 48 hours.

Vertex AI will initially spawn the minimum number of compute nodes (just one in this case), and whenever the number of queries per second (QPS) becomes too high, it will spawn more nodes (up to the maximum number we defined, five in this case) and will load-balance the queries between them. If the QPS rate goes down for a while, Vertex AI will stop the extra compute nodes automatically. The cost is therefore directly linked to the load, as well as the machine and accelerator types we selected and the amount of data we store on GCS. This pricing model is great for occasional users and for services with large usage spikes. It’s also ideal for startups: the price remains low until the startup actually starts up.

Now let’s query this prediction service:

In [33]:
response = endpoint.predict(instances=X_new.tolist())

In [34]:
import numpy as np

np.round(response.predictions, 2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]])

In [35]:
# Undeploy all models from the endpoint
endpoint.undeploy_all()
endpoint.delete()

Undeploying Endpoint model: projects/522977795627/locations/us-central1/endpoints/5133373499481522176
Undeploy Endpoint model backing LRO: projects/522977795627/locations/us-central1/endpoints/5133373499481522176/operations/3579722406467469312
Endpoint model undeployed. Resource name: projects/522977795627/locations/us-central1/endpoints/5133373499481522176
Deleting Endpoint : projects/522977795627/locations/us-central1/endpoints/5133373499481522176
Delete Endpoint  backing LRO: projects/522977795627/locations/us-central1/operations/4738836360561950720
Endpoint deleted. . Resource name: projects/522977795627/locations/us-central1/endpoints/5133373499481522176


### Running Batch Prediction Jobs on Vertex AI
If we have a large number of predictions to make, then instead of calling our prediction service repeatedly, we can ask Vertex AI to run a prediction job for us. This does not require an endpoint, only a model.

One way to do this is to create a file containing one instance per line, each formatted as a JSON value (this format is called *JSON Lines*), then pass this file to Vertex AI. So let’s create a JSON Lines file in a new directory, then upload this directory to GCS:

In [36]:
batch_path = Path('my_mnist_batch')
batch_path.mkdir(exist_ok=True)
with open(batch_path / 'my_mnist_batch.jsonl', 'w') as jsonl_file:
    for image in X_test[:100].tolist():
        jsonl_file.write(json.dumps(image))
        jsonl_file.write('\n')

upload_directory(bucket, batch_path)

Uploaded my_mnist_batch                                              
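
As a quick sanity check of the JSON Lines format itself, the following sketch writes a few instances to an in-memory buffer and reads them back. The toy `instances` list is made up; real MNIST instances would be nested lists of pixel values.

```python
import io
import json

instances = [[0.0, 0.5], [1.0, 0.25], [0.75, 0.0]]  # made-up toy instances

# Write: one JSON value per line
buffer = io.StringIO()
for instance in instances:
    buffer.write(json.dumps(instance))
    buffer.write('\n')

# Read: each non-empty line is parsed independently of the others
decoded = [json.loads(line) for line in buffer.getvalue().splitlines() if line]
print(decoded == instances)
```

Because every line is a complete JSON value, a large file can be split at any line boundary and processed in parallel.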


Now let’s launch the prediction job:

In [37]:
batch_prediction_job = mnist_model.batch_predict(
    job_display_name='my_batch_prediction_job',
    machine_type='n1-standard-4',
    starting_replica_count=1,
    max_replica_count=5,
    accelerator_type='NVIDIA_TESLA_K80',
    accelerator_count=1,
    gcs_source=[f'gs://{bucket_name}/{batch_path.name}/my_mnist_batch.jsonl'],
    gcs_destination_prefix=f'gs://{bucket_name}/my_mnist_predictions/',
    # Set to False if we don’t want to wait for completion
    sync=True,
)

Creating BatchPredictionJob
BatchPredictionJob created. Resource name: projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544
To use this BatchPredictionJob in another session:
bpj = aiplatform.BatchPredictionJob('projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544')
View Batch Prediction Job:
https://console.cloud.google.com/ai/platform/locations/us-central1/batch-predictions/4346926367237996544?project=522977795627
BatchPredictionJob projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 current state:
JobState.JOB_STATE_PENDING
BatchPredictionJob projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 current state:
JobState.JOB_STATE_RUNNING
BatchPredictionJob projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544 current state:
JobState.JOB_STATE_RUNNING
BatchPredictionJob projects/522977795627/locations/us-central1/batchPredictionJobs/

**Note**: For large batches, we can split the inputs into multiple JSON Lines files and list them all via the `gcs_source` argument.

In [38]:
# Shows the output directory
batch_prediction_job.output_info

gcs_output_directory: "gs://my_bucket/my_mnist_predictions/prediction-mnist-2022_04_12T21_30_08_071Z"

The predictions will be available in a set of files named something like *prediction.results-00001-of-00002*. These files use the JSON Lines format by default, and each value is a dictionary containing an instance and its corresponding prediction (i.e., 10 probabilities). The instances are listed in the same order as the inputs. The job also outputs *prediction.errors\** files, which can be useful for debugging if something goes wrong. We can iterate through all these output files using `batch_prediction_job.iter_outputs()`:

In [39]:
y_probas = []
for blob in batch_prediction_job.iter_outputs():
    print(blob.name)
    if 'prediction.results' in blob.name:
        for line in blob.download_as_text().splitlines():
            y_proba = json.loads(line)['prediction']
            y_probas.append(y_proba)

my_mnist_predictions/prediction-mnist-2022_04_12T21_30_08_071Z/prediction.errors_stats-00000-of-00001
my_mnist_predictions/prediction-mnist-2022_04_12T21_30_08_071Z/prediction.results-00000-of-00002
my_mnist_predictions/prediction-mnist-2022_04_12T21_30_08_071Z/prediction.results-00001-of-00002


In [40]:
y_pred = np.argmax(y_probas, axis=1)
accuracy = np.sum(y_pred == y_test[:100]) / 100

In [41]:
accuracy

0.98

In [42]:
mnist_model.delete()

Deleting Model : projects/522977795627/locations/us-central1/models/4798114811986575360
Delete Model  backing LRO: projects/522977795627/locations/us-central1/operations/598902403101622272
Model deleted. . Resource name: projects/522977795627/locations/us-central1/models/4798114811986575360


The JSON Lines format is the default, but when dealing with large instances such as images, it is too verbose. The `batch_predict()` method accepts an `instances_format` argument that lets us choose another format if we want. It defaults to `'jsonl'`, but we can change it to `'csv'`, `'tf-record'`, `'tf-record-gzip'`, `'bigquery'`, or `'file-list'`. If we set it to `'file-list'`, then the `gcs_source` argument should point to a text file containing one input filepath per line; for instance, pointing to PNG image files. Vertex AI will read these files as binary, encode them using Base64, and pass the resulting byte strings to the model. This means that we must add a preprocessing layer in our model to parse the Base64 strings, using `tf.io.decode_base64()`. If the files are images, we must then parse the result using a function like `tf.io.decode_image()` or `tf.io.decode_png()`.

Let’s delete all the directories we created on GCS (i.e., all the blobs with these prefixes):

In [43]:
for prefix in ['my_mnist_model/', 'my_mnist_batch/', 'my_mnist_predictions/']:
    blobs = bucket.list_blobs(prefix=prefix)
    for blob in blobs:
        blob.delete()

# Uncomment and run if the bucket is empty and we want to delete the
# bucket itself
# bucket.delete()
batch_prediction_job.delete()

Deleting BatchPredictionJob : projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544
Delete BatchPredictionJob  backing LRO: projects/522977795627/locations/us-central1/operations/6699028098374959104
BatchPredictionJob deleted. . Resource name: projects/522977795627/locations/us-central1/batchPredictionJobs/4346926367237996544


## Deploying a Model to a Mobile or Embedded Device
ML models can run closer to the source of data (this is called *edge computing*), for example on the user’s mobile device or on an embedded device.

Pros:
- It allows the device to be smart even when it’s not connected to the internet.
- It reduces latency by not having to send data to a remote server and reduces the load on the servers.
- It may improve privacy, since the user’s data can stay on the device.

Cons:
- The device’s computing resources are generally tiny compared to a beefy multi-GPU server.
- A large model may not fit in the device, it may use too much RAM and CPU, and it may take too long to download. As a result, the application may become unresponsive, and the device may heat up and quickly run out of battery.

The TFLite library provides several tools to help us deploy our models to the edge, with three main objectives:
- Reduce the model size, to shorten download time and reduce RAM usage.
- Reduce the number of computations needed for each prediction, to reduce latency, battery usage, and heating.
- Adapt the model to device-specific constraints.

**Note**: Also check TensorFlow’s [Graph Transform Tool](https://homl.info/tfgtt) for modifying and optimizing computational graphs.

TFLite’s model converter can take a SavedModel and compress it to a much lighter format based on [FlatBuffers](https://google.github.io/flatbuffers). This is an efficient cross-platform serialization library initially created by Google for gaming. It is designed so we can load FlatBuffers straight to RAM without any preprocessing: this reduces the loading time and memory footprint. Once the model is loaded into a mobile or embedded device, the TFLite interpreter will execute it to make predictions. Here is how we can convert a SavedModel to a FlatBuffer and save it to a *.tflite* file:

In [44]:
converter = tf.lite.TFLiteConverter.from_saved_model(str(model_path))
tflite_model = converter.convert()
with open('my_converted_savedmodel.tflite', 'wb') as f:
    f.write(tflite_model)

2022-04-10 09:03:52.237094: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:357] Ignored output_format.
2022-04-10 09:03:52.237108: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:360] Ignored drop_control_dependency.
2022-04-10 09:03:52.237830: I tensorflow/cc/saved_model/reader.cc:43] Reading SavedModel from: my_mnist_model/0001
2022-04-10 09:03:52.238869: I tensorflow/cc/saved_model/reader.cc:78] Reading meta graph with tags { serve }
2022-04-10 09:03:52.238881: I tensorflow/cc/saved_model/reader.cc:119] Reading SavedModel debug info (if present) from: my_mnist_model/0001
2022-04-10 09:03:52.242108: I tensorflow/cc/saved_model/loader.cc:228] Restoring SavedModel bundle.
2022-04-10 09:03:52.263868: I tensorflow/cc/saved_model/loader.cc:212] Running initialization op on SavedModel bundle at path: my_mnist_model/0001
2022-04-10 09:03:52.271298: I tensorflow/cc/saved_model/loader.cc:301] SavedModel load for tags { serve }; Status: success: OK. Too

**Tip**: We can also save a Keras model directly to a FlatBuffer using `tf.lite.TFLiteConverter.from_keras_model(model)`:

In [45]:
# Shows how to convert a Keras model
converter = tf.lite.TFLiteConverter.from_keras_model(model)

The converter also optimizes the model, both to shrink it and to reduce its latency. It prunes all the operations that are not needed to make predictions (such as training operations), and it optimizes computations whenever possible (e.g., $3\times a+4\times a+5\times a\to 12\times a$). It also tries to fuse operations whenever possible (e.g., batch normalization layers end up folded into the previous layer’s addition and multiplication operations).

Another way we can reduce the model size is by using smaller bit-widths: TFLite does this by quantizing the model weights down to fixed-point, 8-bit integers! This leads to a fourfold size reduction compared to using 32-bit floats. The simplest approach is called *post-training quantization*: it just quantizes the weights after training, using a fairly basic but efficient symmetrical quantization technique. It finds the maximum absolute weight value, $m$, then maps the floating-point range $-m$ to $+m$ to the fixed-point (integer) range –127 to +127. For example, if the weights range from –1.5 to +0.8, then the bytes –127, 0, and +127 will correspond to the floats –1.5, 0.0, and +1.5, respectively. Note that 0.0 always maps to 0 when using symmetrical quantization. Also note that the byte values +68 to +127 will not be used in this example, since they map to floats greater than +0.8.
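
The mapping described above can be sketched in a few lines of plain Python. This is only an illustration of symmetric quantization with round-to-nearest, under the assumption of a single scale for all weights; it is not TFLite’s actual implementation, and the toy weights and function names are made up.

```python
def quantize_symmetric(weights):
    """Map floats in [-m, +m] to integers in [-127, +127]."""
    m = max(abs(w) for w in weights)
    scale = m / 127  # float value represented by one integer step
    quantized = [round(w / scale) for w in weights]
    return quantized, scale


def dequantize(quantized, scale):
    """Recover approximate float values from the integers."""
    return [q * scale for q in quantized]


weights = [-1.5, -0.6, 0.0, 0.3, 0.6]  # made-up weights, so m = 1.5
quantized, scale = quantize_symmetric(weights)
restored = dequantize(quantized, scale)

for w, q, r in zip(weights, quantized, restored):
    print(f'{w:+.3f} -> {q:+4d} -> {r:+.3f}')
```

The round trip is lossy: each restored value can differ from the original by up to half a quantization step (`scale / 2`).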

To perform this post-training quantization:

In [46]:
converter.optimizations = [tf.lite.Optimize.DEFAULT]

In [47]:
tflite_model = converter.convert()
with open('my_converted_keras_model.tflite', 'wb') as f:
    f.write(tflite_model)

INFO:tensorflow:Assets written to: /var/folders/wy/h39t6kb11pnbb0pzhksd_fqh0000gq/T/tmp6ffbc1qs/assets


INFO:tensorflow:Assets written to: /var/folders/wy/h39t6kb11pnbb0pzhksd_fqh0000gq/T/tmp6ffbc1qs/assets
2022-04-10 09:26:30.319286: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:357] Ignored output_format.
2022-04-10 09:26:30.319301: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:360] Ignored drop_control_dependency.
2022-04-10 09:26:30.319417: I tensorflow/cc/saved_model/reader.cc:43] Reading SavedModel from: /var/folders/wy/h39t6kb11pnbb0pzhksd_fqh0000gq/T/tmp6ffbc1qs
2022-04-10 09:26:30.320420: I tensorflow/cc/saved_model/reader.cc:78] Reading meta graph with tags { serve }
2022-04-10 09:26:30.320431: I tensorflow/cc/saved_model/reader.cc:119] Reading SavedModel debug info (if present) from: /var/folders/wy/h39t6kb11pnbb0pzhksd_fqh0000gq/T/tmp6ffbc1qs
2022-04-10 09:26:30.323773: I tensorflow/cc/saved_model/loader.cc:228] Restoring SavedModel bundle.
2022-04-10 09:26:30.345416: I tensorflow/cc/saved_model/loader.cc:212] Running initialization

At runtime the quantized weights get converted back to floats before they are used. To avoid recomputing the float values all the time, which would severely slow down the model, TFLite caches them: unfortunately, this means that this technique does not reduce RAM usage, and it doesn’t speed up the model either.

The most effective way to reduce latency and power consumption is to also quantize the activations so that the computations can be done entirely with integers. Even when using the same bit-width, integer computations use fewer CPU cycles, consume less energy, and produce less heat. And if we also reduce the bit-width, we can get huge speedups. When this is done after training, it requires a calibration step: TFLite needs a representative sample of data to measure the range of the activations.
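
To see why quantizing the activations allows integer-only arithmetic, consider this rough sketch of a dot product. Both the weights and the activations are quantized symmetrically, the multiply-accumulate runs on integers, and a single rescale at the end recovers an approximate float result. The values and the `quantize()` helper are assumptions for illustration; real kernels are considerably more sophisticated.

```python
def quantize(values):
    """Symmetric 8-bit quantization: returns the integers and the scale."""
    m = max(abs(v) for v in values)
    scale = m / 127
    return [round(v / scale) for v in values], scale


weights = [0.5, -1.5, 0.25]      # made-up layer weights
activations = [3.0, 1.0, -4.0]   # made-up input activations

q_w, scale_w = quantize(weights)
q_a, scale_a = quantize(activations)

# Integer-only multiply-accumulate (the hot loop involves no floats)
int_accumulator = sum(w * a for w, a in zip(q_w, q_a))

# One float rescale at the very end recovers an approximate result
approx = int_accumulator * scale_w * scale_a
exact = sum(w * a for w, a in zip(weights, activations))
print(f'exact={exact:.4f} approx={approx:.4f}')
```

The small gap between `exact` and `approx` is the quantization noise discussed next.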

The main problem with quantization is that it loses a bit of accuracy: it is similar to adding noise to the weights and activations. If the accuracy drop is too severe, then we may need to use *quantization-aware training*. This means adding fake quantization operations to the model so it can learn to ignore the quantization noise during training; the final weights will then be more robust to quantization. Moreover, the calibration step can be taken care of automatically during training, which simplifies the whole process.

## Running a Model in a Web Page
This can be useful in many scenarios, such as:
- When our web application is often used in situations where the user’s connectivity is intermittent or slow (e.g., a website for hikers).
- When we need the model’s responses to be as fast as possible (e.g., for an online game).
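- When our web service makes predictions based on some private user data, and we want to protect the user’s privacy by making the predictions on the client side so that the data never has to leave the user’s machine.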

For all these scenarios, we can use the [TensorFlow.js (TFJS) JavaScript library](https://tensorflow.org/js). This library can load a TFLite model and make predictions directly in the user’s browser.

The following JavaScript module imports the TFJS library, downloads a pretrained MobileNet model, and uses this model to classify an image and log the predictions:
```js
import "https://cdn.jsdelivr.net/npm/@tensorflow/tfjs@latest";
import "https://cdn.jsdelivr.net/npm/@tensorflow-models/mobilenet@1.0.0";
const image = document.getElementById("image");

mobilenet.load().then(model => {
  model.classify(image).then(predictions => {
    for (const prediction of predictions) {
      const className = prediction.className;
      const proba = (prediction.probability * 100).toFixed(1);
      console.log(className + " : " + proba + "%");
    }
  });
});
```

Code examples for this section are hosted on Glitch.com, a website that lets us create web apps for free:
- https://homl.info/tfjscode: a simple TFJS web app that loads a pretrained model and classifies an image.
- https://homl.info/tfjswpa: the same web app set up as a *progressive web app* (PWA). Try opening this link on various platforms, including mobile devices. See https://homl.info/wpacode for this PWA’s source code.

**Note**: Check out many more demos of machine learning models running in our browser at https://tensorflow.org/js/demos.

TFJS also supports training a model directly in the web browser! If our computer has a GPU card, then TFJS can generally use it, even if it’s not an Nvidia card. A model can be trained centrally, and then fine-tuned locally, in the browser, based on that user’s data. Check out [federated learning](https://tensorflow.org/federated).

## Using GPUs to Speed Up Computations

### Getting Our Own GPU
We will need to consider the amount of RAM we will need for our tasks (e.g., typically at least 10 GB for image processing or NLP), the bandwidth (i.e., how fast we can send data into and out of the GPU), the number of cores, the cooling system, etc. Tim Dettmers wrote an excellent [blog post](https://homl.info/66) to help us choose. TensorFlow only supports [Nvidia cards with CUDA Compute Capability 3.5+](https://homl.info/cudagpus) (as well as Google’s TPUs), but it may extend its support to other manufacturers, so make sure to check [TensorFlow’s documentation](https://tensorflow.org/install) to see what devices are supported today.

If we go for an Nvidia GPU card, we will need to install the appropriate Nvidia drivers and several Nvidia libraries. These include the *Compute Unified Device Architecture* library (CUDA) Toolkit, which allows developers to use CUDA-enabled GPUs for all sorts of computations, and the *CUDA Deep Neural Network* library (cuDNN), a GPU-accelerated library of common DNN computations such as activation layers, normalization, forward and backward convolutions, and pooling. cuDNN is part of Nvidia’s Deep Learning SDK. TensorFlow uses CUDA and cuDNN to control the GPU cards and accelerate computations. We can use the `nvidia-smi` command to check that everything is properly installed:

In [48]:
!nvidia-smi

Let’s check that TensorFlow can see the GPU:

In [49]:
physical_gpus = tf.config.list_physical_devices('GPU')
physical_gpus

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


### Managing the GPU RAM
By default TensorFlow automatically grabs almost all the RAM in all available GPUs the first time we run a computation. It does this to limit GPU RAM fragmentation. If we need to run multiple programs for some reason (e.g., to train two different models in parallel on the same machine), we will need to split the GPU RAM between these processes more evenly.

If we have multiple GPU cards on our machine, a simple solution is to assign each of them to a single process. To do this, we can set the `CUDA_VISIBLE_DEVICES` environment variable so that each process only sees the appropriate GPU card(s). We should also set the `CUDA_DEVICE_ORDER` environment variable to `PCI_BUS_ID` to ensure that each ID always refers to the same GPU card. For example, if we have four GPU cards, we can start two programs, assigning two GPUs to each:
```bash
CUDA_DEVICE_ORDER=PCI_BUS_ID CUDA_VISIBLE_DEVICES=0,1 python3 program_1.py
# And in another terminal
CUDA_DEVICE_ORDER=PCI_BUS_ID CUDA_VISIBLE_DEVICES=3,2 python3 program_2.py
```

Program 1 will then only see GPU cards 0 and 1, named `'/gpu:0'` and `'/gpu:1'`, respectively, in TensorFlow, and program 2 will only see GPU cards 2 and 3, named `'/gpu:1'` and `'/gpu:0'`, respectively (note the order).
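
The renumbering can be sketched with a small helper that mimics how the position of each ID in `CUDA_VISIBLE_DEVICES` determines the device name seen by the process. The function `visible_gpu_names()` is hypothetical (it is not a TensorFlow or CUDA API), and it assumes the variable contains integer IDs rather than GPU UUIDs.

```python
def visible_gpu_names(cuda_visible_devices):
    """Map device names to physical GPU IDs (hypothetical helper)."""
    physical_ids = [
        int(gpu_id) for gpu_id in cuda_visible_devices.split(',') if gpu_id.strip()
    ]
    # The position in the list, not the physical ID, determines the name
    return {f'/gpu:{index}': gpu_id for index, gpu_id in enumerate(physical_ids)}


program_1 = visible_gpu_names('0,1')
program_2 = visible_gpu_names('3,2')
print(program_1)
print(program_2)
```

In a real program, the environment variables must be set before TensorFlow initializes the GPUs, which is why they are usually set on the command line as shown above.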

Another option is to tell TensorFlow to grab only a specific amount of GPU RAM. This must be done immediately after importing TensorFlow. We must create a *logical GPU device* (sometimes called a *virtual GPU device*) for each physical GPU device and set its memory limit:

In [50]:
# for gpu in physical_gpus:
#     tf.config.set_logical_device_configuration(
#         gpu, [tf.config.LogicalDeviceConfiguration(memory_limit=2048)]
#     )

To make TensorFlow grab memory as it needs it (only releasing it when the process shuts down):

In [51]:
# for gpu in physical_gpus:
#     tf.config.experimental.set_memory_growth(gpu, True)

Another way to do this is to set the `TF_FORCE_GPU_ALLOW_GROWTH` environment variable to `true`.

To split a physical GPU into two logical GPUs (this is useful if we only have one physical GPU but we want to test a multi-GPU algorithm):

In [52]:
# tf.config.set_logical_device_configuration(
#     physical_gpus[0],
#     [
#         tf.config.LogicalDeviceConfiguration(memory_limit=2048),
#         tf.config.LogicalDeviceConfiguration(memory_limit=2048),
#     ],
# )

In [53]:
logical_gpus = tf.config.list_logical_devices('GPU')
logical_gpus

[LogicalDevice(name='/device:GPU:0', device_type='GPU')]


### Placing Operations and Variables on Devices
We can place operations and variables manually on each device, if we want more control:
- We generally want to place the data preprocessing operations on the CPU, and place the neural network operations on the GPUs.
- GPUs usually have a fairly limited communication bandwidth, so it is important to avoid unnecessary data transfers into and out of the GPUs.
- Adding more CPU RAM to a machine is simple and fairly cheap, so there’s usually plenty of it, whereas the GPU RAM is baked into the GPU: it is an expensive and thus limited resource, so if a variable is not needed in the next few training steps, it should probably be placed on the CPU (e.g., datasets generally belong on the CPU).

By default, all variables and all operations will be placed on the first GPU (the one named `'/gpu:0'`), except for variables and operations that don’t have a GPU kernel: these are placed on the CPU (always named `'/cpu:0'`). A tensor or variable’s `device` attribute tells us which device it was placed on:

In [54]:
# To log every variable and operation placement (this must be run just
# after importing TensorFlow):

# Log level is INFO by default
# tf.get_logger().setLevel('DEBUG')
# tf.debugging.set_log_device_placement(True)

In [55]:
# float32 variable goes to the GPU
a = tf.Variable([1.0, 2.0, 3.0])
a.device

'/job:localhost/replica:0/task:0/device:GPU:0'

In [56]:
# int32 variable goes to the CPU
b = tf.Variable([1, 2, 3])
b.device

'/job:localhost/replica:0/task:0/device:CPU:0'

We can place variables and operations manually on the desired device using a `tf.device()` context:

In [57]:
with tf.device('/cpu:0'):
    c = tf.Variable([1.0, 2.0, 3.0])

c.device

'/job:localhost/replica:0/task:0/device:CPU:0'

If we specify a device that does not exist, or for which there is no kernel, TensorFlow will silently fall back to the default placement:

In [58]:
with tf.device('/gpu:1234'):
    d = tf.Variable([1.0, 2.0, 3.0])

d.device

"'/job:localhost/replica:0/task:0/device:GPU:0'"

If we want TensorFlow to throw an exception when we try to use a device that does not exist, instead of falling back to the default device:

In [59]:
tf.config.set_soft_device_placement(False)

try:
    with tf.device('/gpu:1000'):
        d = tf.Variable([1.0, 2.0, 3.0])
except tf.errors.InvalidArgumentError as ex:
    print(ex)

tf.config.set_soft_device_placement(True)

Could not satisfy device specification '/job:localhost/replica:0/task:0/device:GPU:1000'. enable_soft_placement=0. Supported device types [CPU]. All available devices [/job:localhost/replica:0/task:0/device:CPU:0].


### Parallel Execution Across Multiple Devices
One of the benefits of using TF functions is parallelism. When TensorFlow runs a TF function, it starts by analyzing its graph to find the list of operations that need to be evaluated, and it counts how many dependencies each of them has. TensorFlow then adds each operation with zero dependencies (i.e., each source operation) to the evaluation queue of this operation’s device. Once an operation has been evaluated, the dependency counter of each operation that depends on it is decremented. Once an operation’s dependency counter reaches zero, it is pushed to the evaluation queue of its device. And once all the outputs have been computed, they are returned.

Operations in the CPU’s evaluation queue are dispatched to a thread pool called the *inter-op thread pool*. If the CPU has multiple cores, then these operations will effectively be evaluated in parallel. Some operations have multithreaded CPU kernels: these kernels split their tasks into multiple suboperations, which are placed in another evaluation queue and dispatched to a second thread pool called the *intra-op thread pool* (shared by all multithreaded CPU kernels). In short, multiple operations and suboperations may be evaluated in parallel on different CPU cores.
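
The two-level structure can be imitated with two ordinary Python thread pools. This is only an analogy to show how the pools relate to each other: the pool sizes and function names are made up, and because of Python’s global interpreter lock this pure-Python code will not actually speed up the arithmetic.

```python
from concurrent.futures import ThreadPoolExecutor

inter_op_pool = ThreadPoolExecutor(max_workers=2)  # runs whole operations
intra_op_pool = ThreadPoolExecutor(max_workers=3)  # runs suboperations


def multithreaded_sum_of_squares(values, parts=3):
    """An 'operation' whose kernel splits its work into suboperations."""
    chunks = [values[i::parts] for i in range(parts)]
    partial_sums = intra_op_pool.map(
        lambda chunk: sum(v * v for v in chunk), chunks
    )
    return sum(partial_sums)


def single_threaded_max(values):
    """An 'operation' with a plain single-threaded kernel."""
    return max(values)


data = list(range(10))
# Both operations are dispatched to the inter-op pool and may run concurrently
future_a = inter_op_pool.submit(multithreaded_sum_of_squares, data)
future_b = inter_op_pool.submit(single_threaded_max, data)
result_a, result_b = future_a.result(), future_b.result()

inter_op_pool.shutdown()
intra_op_pool.shutdown()
print(result_a, result_b)
```

Keeping the two pools separate matters: if suboperations were submitted to the same pool as the operations waiting for them, a full pool could deadlock.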

Operations in a GPU’s evaluation queue are evaluated sequentially. However, most operations have multithreaded GPU kernels, typically implemented by libraries that TensorFlow depends on, such as CUDA and cuDNN. These implementations have their own thread pools, and they typically exploit as many GPU threads as they can (which is the reason why there is no need for an inter-op thread pool in GPUs: each operation already floods most GPU threads).

<center>
  <img 
    src="../images/19/parallel_tf_graph_execution.png" 
    onerror="
      this.onerror = null;
      const repo = 'https://github.com/alirezatheh/handson-ml3-notes/blob/main';
      this.src = repo + this.src.split('..')[1];
    "
  >
</center>

In the figure, operations A, B, and C are source ops, so they can immediately be evaluated. Operations A and B are placed on the CPU, so they are sent to the CPU’s evaluation queue, then they are dispatched to the inter-op thread pool and immediately evaluated in parallel. Operation A happens to have a multi‐threaded kernel; its computations are split into three parts, which are executed in parallel by the intra-op thread pool. Operation C goes to GPU #0’s evaluation queue, and in this example its GPU kernel happens to use cuDNN, which manages its own intra-op thread pool and runs the operation across many GPU threads in parallel. Suppose C finishes first. The dependency counters of D and E are decremented and they reach 0, so both operations are pushed to GPU #0’s evaluation queue, and they are executed sequentially. Note that C only gets evaluated once, even though both D and E depend on it. Suppose B finishes next. Then F’s dependency counter is decremented from 4 to 3, and since that’s not 0, it does not run yet. Once A, D, and E are finished, then F’s dependency counter reaches 0, and it is pushed to the CPU’s evaluation queue and evaluated. Finally, TensorFlow returns the requested outputs. 
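
The dependency-counter mechanism can be sketched as follows, using the graph described above (D and E depend on C, and F depends on A, B, D, and E). For simplicity this sketch uses a single evaluation queue and evaluates operations one at a time, whereas TensorFlow keeps one queue per device; the function name is an assumption for illustration.

```python
from collections import deque

# Each operation maps to the operations it depends on
dependencies = {
    'A': [], 'B': [], 'C': [],
    'D': ['C'], 'E': ['C'],
    'F': ['A', 'B', 'D', 'E'],
}


def evaluation_order(dependencies):
    """Return one valid evaluation order using dependency counters."""
    counters = {op: len(deps) for op, deps in dependencies.items()}
    dependents = {op: [] for op in dependencies}
    for op, deps in dependencies.items():
        for dep in deps:
            dependents[dep].append(op)

    # Source operations (zero dependencies) are queued first
    queue = deque(op for op, count in counters.items() if count == 0)
    order = []
    while queue:
        op = queue.popleft()
        order.append(op)  # "evaluate" the operation
        for dependent in dependents[op]:
            counters[dependent] -= 1
            if counters[dependent] == 0:
                queue.append(dependent)
    return order


order = evaluation_order(dependencies)
print(order)
```

Each operation is appended to `order` exactly once, which mirrors the fact that C is only evaluated once even though both D and E depend on it.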

When the TF function modifies a stateful resource, such as a variable, TensorFlow ensures that the order of execution matches the order in the code, even if there is no explicit dependency between the statements. For example, if our TF function contains `v.assign_add(1)` followed by `v.assign(v * 2)`, TensorFlow will ensure that these operations are executed in that order.

**Tip**: We can control the number of threads in the inter-op thread pool by calling `tf.config.threading.set_inter_op_parallelism_threads()`. To set the number of intra-op threads, use `tf.config.threading.set_intra_op_parallelism_threads()`. This is useful if we do not want TensorFlow to use all the CPU cores or if we want it to be single-threaded. The latter can help if we want to guarantee perfect reproducibility; check out [this video](https://homl.info/repro).

In [60]:
# tf.config.threading.set_inter_op_parallelism_threads(10)
# tf.config.threading.set_intra_op_parallelism_threads(10)

## Training Models Across Multiple Devices
There are two main approaches to training a single model across multiple devices: *model parallelism*, where the model is split across the devices, and *data parallelism*, where the model is replicated across every device, and each replica is trained on a different subset of the data.

### Model Parallelism
Unfortunately, model parallelism turns out to be pretty tricky, and its effectiveness really depends on the architecture of our neural network.

For fully connected networks, there is generally not much to be gained from this approach (cross-device communication is represented by the dashed arrows):

<center>
  <img 
    src="../images/19/split_fcn.png" 
    onerror="
      this.onerror = null;
      const repo = 'https://github.com/alirezatheh/handson-ml3-notes/blob/main';
      this.src = repo + this.src.split('..')[1];
    "
  >
</center>

Some neural network architectures, such as convolutional neural networks, contain layers that are only partially connected to the lower layers, so it is much easier to distribute chunks across devices in an efficient way:

<center>
  <img 
    src="../images/19/split_pcn.png" 
    onerror="
      this.onerror = null;
      const repo = 'https://github.com/alirezatheh/handson-ml3-notes/blob/main';
      this.src = repo + this.src.split('..')[1];
    "
  >
</center>

Deep recurrent neural networks can be split a bit more efficiently across multiple GPUs. If we split the network horizontally by placing each layer on a different device, and feed the network with an input sequence to process, then at the first time step only one device will be active (working on the sequence’s first value), at the second step two will be active (the second layer will be handling the output of the first layer for the first value, while the first layer will be handling the second value), and by the time the signal propagates to the output layer, all devices will be active simultaneously. There is still a lot of cross-device communication going on, but since each cell may be fairly complex, the benefit of running multiple cells in parallel may (in theory) outweigh the communication penalty. However, in practice a regular stack of `LSTM` layers running on a single GPU actually runs much faster.

<center>
  <img 
    src="../images/19/split_rnn.png" 
    onerror="
      this.onerror = null;
      const repo = 'https://github.com/alirezatheh/handson-ml3-notes/blob/main';
      this.src = repo + this.src.split('..')[1];
    "
  >
</center>
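
As a rough illustration of the pipelining effect described above, the following sketch lists which layers (and therefore which devices) are busy at each time step when every layer of a deep RNN sits on its own device. It assumes each cell takes exactly one time step and it ignores communication costs; `active_layers()` is a hypothetical helper, not part of any library.

```python
def active_layers(time_step, n_layers, sequence_length):
    """Return the indices of the layers that are busy at a 0-based time step.

    Layer k handles the sequence's value number (time_step - k), so it is
    busy only while that index falls inside the sequence.
    """
    return [
        layer
        for layer in range(n_layers)
        if 0 <= time_step - layer < sequence_length
    ]


n_layers, sequence_length = 3, 5
schedule = [
    active_layers(t, n_layers, sequence_length)
    for t in range(sequence_length + n_layers - 1)
]
```

With three layers, only the first device works at the first step, two devices work at the second step, and all three are busy from the third step until the input sequence runs out.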

### Data Parallelism
Another way is to replicate the neural network on every device and run each training step simultaneously on all replicas, using a different mini-batch for each. The gradients computed by each replica are then averaged, and the result is used to update the model parameters. This is called *data parallelism*, or sometimes *single program, multiple data* (SPMD).
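
To make the averaging step concrete, here is a minimal sketch of one data-parallel training step on a toy model $y = w x$ trained with the MSE loss. The model, the data, and the `gradient()` helper are assumptions chosen for illustration. It shows that when every replica gets a mini-batch of the same size, the mean of the per-replica gradients equals the gradient computed on the combined batch.

```python
def gradient(w, xs, ys):
    """Gradient of the mean squared error of y = w * x with respect to w."""
    return sum(2 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)


w = 0.5
X = [1.0, 2.0, 3.0, 4.0]
y = [2.0, 4.0, 6.0, 8.0]

# Each replica holds the same w but sees a different half of the batch
replica_grads = [gradient(w, X[:2], y[:2]), gradient(w, X[2:], y[2:])]
mean_grad = sum(replica_grads) / len(replica_grads)

# Reference: the gradient computed on the whole batch by a single device
full_batch_grad = gradient(w, X, y)

# Every replica applies the same update, so all copies of w stay identical
learning_rate = 0.01
w_updated = w - learning_rate * mean_grad
```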

#### Data parallelism using the mirrored strategy
The simplest approach is to completely mirror all the model parameters across all the GPUs and always apply the exact same parameter updates on every GPU. This way, all replicas always remain perfectly identical. This is called the *mirrored strategy*, and it’s quite efficient, especially when using a single machine.

The tricky part when using this approach is to efficiently compute the mean of all the gradients from all the GPUs and distribute the result across all the GPUs. This can be done using an *AllReduce* algorithm, a class of algorithms where multiple nodes collaborate to efficiently perform a *reduce operation* (such as computing the mean, sum, or max), while ensuring that all nodes obtain the same final result.
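
One well-known member of this family is the ring AllReduce. The sketch below simulates it in plain Python, under the assumption that each of the $n$ replicas holds a gradient made of exactly $n$ chunks (here, $n$ scalars). It is a toy simulation meant to show the data flow, not a description of how NCCL or TensorFlow implement it, and `ring_allreduce_mean()` is a hypothetical name.

```python
def ring_allreduce_mean(grads):
    n = len(grads)
    data = [list(g) for g in grads]  # data[i][c]: chunk c held by replica i

    # Phase 1 (scatter-reduce): at each step, replica i sends one chunk to
    # its right neighbor, which adds it to its own copy of that chunk
    for step in range(n - 1):
        sends = [(i, (i - step) % n) for i in range(n)]
        values = [data[i][c] for i, c in sends]  # snapshot before updating
        for (i, c), value in zip(sends, values):
            data[(i + 1) % n][c] += value

    # Now replica i holds the complete sum for chunk (i + 1) % n

    # Phase 2 (all-gather): the complete chunks travel around the ring
    for step in range(n - 1):
        sends = [(i, (i + 1 - step) % n) for i in range(n)]
        values = [data[i][c] for i, c in sends]
        for (i, c), value in zip(sends, values):
            data[(i + 1) % n][c] = value

    # Every replica divides the sums by n to get the mean
    return [[x / n for x in chunks] for chunks in data]


replica_grads = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]]
reduced = ring_allreduce_mean(replica_grads)
```

Each replica only ever talks to its neighbor and sends one chunk per step, which is why this pattern spreads the bandwidth usage evenly across the ring.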

#### Data parallelism with centralized parameters
Another approach is to store the model parameters outside of the GPU devices performing the computations (called *workers*), for example on the CPU. In a distributed setup, we may place all the parameters on one or more CPU-only servers called *parameter servers*, whose only role is to host and update the parameters.

Whereas the mirrored strategy imposes synchronous weight updates across all GPUs, the centralized approach allows either synchronous or asynchronous updates. Let’s take a look at the pros and cons of both options.

##### Synchronous updates
With *synchronous updates*, the aggregator waits until all gradients are available before it computes the average gradients and passes them to the optimizer, which will update the model parameters. Once a replica has finished computing its gradients, it must wait for the parameters to be updated before it can proceed to the next mini-batch. The downside is that the fast devices will have to wait for the slow ones at every step, making the whole process as slow as the slowest device. Moreover, the parameters will be copied to every device almost at the same time (immediately after the gradients are applied), which may saturate the parameter servers’ bandwidth.

**Tip**: To reduce the waiting time at each step, we could ignore the gradients from the slowest few replicas (typically ~10%). As soon as the parameters are updated, the fastest 90% of the replicas can start working again immediately, without having to wait for the slowest 10%. This setup is generally described as having 90% of the replicas plus 10% *spare replicas*.
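
The spare-replica idea from this tip can be sketched as follows. The function name, the `(finish_time, gradient)` representation, and the scalar gradients are all assumptions made for illustration; a real aggregator would work on tensors and would not know the finish times in advance.

```python
def aggregate_with_spares(results, n_spares):
    """Average the gradients of the fastest replicas only.

    results: list of (finish_time, gradient) pairs, one per replica.
    n_spares: number of slowest replicas whose gradients are ignored.
    """
    n_kept = len(results) - n_spares
    fastest = sorted(results, key=lambda result: result[0])[:n_kept]
    return sum(grad for _, grad in fastest) / n_kept


# Four replicas; the last one is a straggler, so its gradient is ignored
results = [(1.0, 2.0), (3.0, 4.0), (2.0, 6.0), (9.0, 100.0)]
mean_grad = aggregate_with_spares(results, n_spares=1)
```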

##### Asynchronous updates
With asynchronous updates, whenever a replica has finished computing the gradients, the gradients are immediately used to update the model parameters. There is no aggregation and no synchronization. Since there is no waiting for the other replicas, this approach runs more training steps per minute. Moreover, although the parameters still need to be copied to every device at every step, this happens at different times for each replica, so the risk of bandwidth saturation is reduced.

However, although it works reasonably well in practice, it is almost surprising that it works at all! By the time a replica has finished computing the gradients based on some parameter values, these parameters will have been updated several times by other replicas (on average $N - 1$ times, if there are $N$ replicas), and there is no guarantee that the computed gradients will still be pointing in the right direction. When gradients are severely out of date, they are called *stale* gradients: they can slow down convergence, introducing noise and wobble effects (the learning curve may contain temporary oscillations), or they can even make the training algorithm diverge.

<center>
  <img 
    src="../images/19/stale_gradients.png" 
    onerror="
      this.onerror = null;
      const repo = 'https://github.com/alirezatheh/handson-ml3-notes/blob/main';
      this.src = repo + this.src.split('..')[1];
    "
  >
</center>

There are a few ways we can reduce the effect of stale gradients:
- Reduce the learning rate.
- Drop stale gradients or scale them down.
- Adjust the mini-batch size.
- Start the first few epochs using just one replica (this is called the *warmup phase*). Stale gradients tend to be more damaging at the beginning of training, when gradients are typically large and the parameters have not settled into a valley of the cost function yet, so different replicas may push the parameters in quite different directions.
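
As an illustration of the second option in the list above, here is one possible way to drop or scale down stale gradients on a parameter server. The staleness measure (the number of updates applied since the replica read the parameters), the `1 / (1 + staleness)` scaling rule, and the `max_staleness` threshold are assumptions picked for this sketch, not a rule prescribed by the text.

```python
def apply_gradient(
    param, grad, staleness, learning_rate=0.1, max_staleness=4
):
    """Apply one asynchronous update to a scalar parameter."""
    if staleness > max_staleness:
        # Too old: drop the gradient and leave the parameter unchanged
        return param
    # Otherwise scale the gradient down according to its staleness
    scale = 1.0 / (1 + staleness)
    return param - learning_rate * scale * grad


fresh = apply_gradient(1.0, 0.5, staleness=0)
slightly_stale = apply_gradient(1.0, 0.5, staleness=1)
dropped = apply_gradient(1.0, 0.5, staleness=10)
```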

A 2016 paper, [“Revisiting Distributed Synchronous SGD”](https://homl.info/68), by Jianmin Chen et al. from the Google Brain team benchmarked various approaches and found that using synchronous updates with a few spare replicas was more efficient than using asynchronous updates, not only converging faster but also producing a better model.

#### Bandwidth saturation
Unfortunately, there often comes a point where adding an extra GPU will not improve performance at all because the time spent moving the data into and out of GPU RAM (and across the network in a distributed setup) will outweigh the speedup obtained by splitting the computation load.

Saturation is more severe for large dense models, since they have a lot of parameters and gradients to transfer. It is less severe for small models (but the parallelization gain is limited) and for large sparse models, where the gradients are typically mostly zeros and so can be communicated efficiently.

A 2018 paper [“PipeDream: Fast and Efficient Pipeline Parallel DNN Training”](https://homl.info/pipedream) by Aaron Harlap et al., a team of researchers from Carnegie Mellon University, Stanford University, and Microsoft Research, proposed a system called *PipeDream* that managed to reduce network communications by over 90%, making it possible to train large models across many machines. They achieved this using a new technique called *pipeline parallelism*, which combines model parallelism and data parallelism: the model is chopped into consecutive parts, called *stages*, each of which is trained on a different machine. This results in an asynchronous pipeline in which all machines work in parallel with very little idle time. During training, each stage alternates one round of forward propagation and one round of backpropagation: it pulls a mini-batch from its input queue, processes it, and sends the outputs to the next stage’s input queue, then it pulls one mini-batch of gradients from its gradient queue, backpropagates these gradients and updates its own model parameters, and pushes the backpropagated gradients to the previous stage’s gradient queue. Each stage can also use regular data parallelism (e.g., using the mirrored strategy), independently from the other stages.

<center>
  <img 
    src="../images/19/pipedream.png" 
    onerror="
      this.onerror = null;
      const repo = 'https://github.com/alirezatheh/handson-ml3-notes/blob/main';
      this.src = repo + this.src.split('..')[1];
    "
  >
</center>

As it’s presented here, PipeDream would not work so well. To understand why, consider mini-batch #5: when it went through stage 1 during the forward pass, the gradients from mini-batch #4 had not yet been backpropagated through that stage, but by the time #5’s gradients flow back to stage 1, #4’s gradients will have been used to update the model parameters, so #5’s gradients will be a bit stale. The paper’s authors proposed methods to mitigate this issue, though: e.g. each stage saves weights during forward propagation and restores them during backpropagation, to ensure that the same weights are used for both the forward pass and the backward pass. This is called *weight stashing*.
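
Weight stashing can be illustrated with a toy pipeline stage. The sketch below assumes a single scalar weight and a linear stage $y = w x$; the `Stage` class and its method names are hypothetical and are not taken from the PipeDream code. The point is only that the backward pass for a mini-batch uses the weight that was saved during that mini-batch's forward pass, even if the weight has been updated in between.

```python
class Stage:
    def __init__(self, weight, learning_rate=0.1):
        self.weight = weight
        self.learning_rate = learning_rate
        # Mini-batch ID -> (weight, input) used during the forward pass
        self.stash = {}

    def forward(self, batch_id, x):
        self.stash[batch_id] = (self.weight, x)
        return self.weight * x

    def backward(self, batch_id, grad_output):
        # Restore the weight that the forward pass actually used
        weight_used, x = self.stash.pop(batch_id)
        grad_input = weight_used * grad_output  # sent to the previous stage
        grad_weight = x * grad_output
        self.weight -= self.learning_rate * grad_weight
        return grad_input


stage = Stage(weight=2.0)
stage.forward(batch_id=4, x=1.0)
stage.forward(batch_id=5, x=3.0)  # the forward pass of #5 uses weight 2.0
stage.backward(batch_id=4, grad_output=1.0)  # updates the weight to 1.9
grad_input_5 = stage.backward(batch_id=5, grad_output=1.0)
```

Without the stash, the backward pass of mini-batch #5 would use the weight already updated by #4 (1.9) instead of the weight its forward pass saw (2.0).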

In a 2022 paper, [“Pathways: Asynchronous Distributed Dataflow for ML”](https://homl.info/pathways), Google researchers Paul Barham et al. presented a system called *Pathways* that uses automated model parallelism, asynchronous gang scheduling, and other techniques to reach close to 100% hardware utilization across thousands of TPUs! *Scheduling* means organizing when and where each task must run, and *gang scheduling* means running related tasks at the same time in parallel and close to each other to reduce the time tasks have to wait for the others’ outputs. This system was used to train a massive language model across over 6,000 TPUs, with close to 100% hardware utilization: that’s a mind-blowing engineering feat.

To reduce the saturation problem, we’ll probably want to use a few powerful GPUs rather than plenty of weak GPUs, and if we need to train a model across multiple servers, we should group our GPUs on a few, very well interconnected servers. We can also try dropping the float precision from 32 bits (`tf.float32`) to 16 bits (`tf.bfloat16`). If we are using centralized parameters, we can shard (split) the parameters across multiple parameter servers: adding more parameter servers will reduce the network load on each server and limit the risk of bandwidth saturation.
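
A quick back-of-the-envelope calculation shows why lower precision and sharding help. The sketch below assumes a hypothetical model with 100 million parameters, an even split of the parameters across parameter servers, and it only counts one copy of the parameters sent to one replica; `megabytes_per_server()` is a made-up helper.

```python
def megabytes_per_server(n_params, bytes_per_param, n_parameter_servers=1):
    """Data each parameter server sends for one full copy of the parameters."""
    total_bytes = n_params * bytes_per_param
    return total_bytes / n_parameter_servers / 1e6


n_params = 100_000_000  # hypothetical model size

float32_mb = megabytes_per_server(n_params, bytes_per_param=4)
bfloat16_mb = megabytes_per_server(n_params, bytes_per_param=2)
sharded_mb = megabytes_per_server(
    n_params, bytes_per_param=2, n_parameter_servers=4
)
```

Under these assumptions, halving the precision halves the traffic, and spreading the parameters over four servers divides each server's share by four again.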

### Training at Scale Using the Distribution Strategies API
To train a Keras model across all available GPUs (on a single machine) using data parallelism with the mirrored strategy, just create a `MirroredStrategy` object, call its `scope()` method to get a distribution context, and wrap the creation and compilation of our model inside that context. Then call the model’s `fit()` method normally:

In [61]:
# Creates a CNN model for MNIST using Keras


def create_model() -> keras.Model:
    return keras.Sequential(
        [
            keras.layers.Reshape(
                [28, 28, 1], input_shape=[28, 28], dtype=tf.uint8
            ),
            keras.layers.Rescaling(scale=1 / 255),
            keras.layers.Conv2D(
                filters=64, kernel_size=7, activation='relu', padding='same'
            ),
            keras.layers.MaxPooling2D(pool_size=2),
            keras.layers.Conv2D(
                filters=128, kernel_size=3, activation='relu', padding='same'
            ),
            keras.layers.Conv2D(
                filters=128, kernel_size=3, activation='relu', padding='same'
            ),
            keras.layers.MaxPooling2D(pool_size=2),
            keras.layers.Flatten(),
            keras.layers.Dense(units=64, activation='relu'),
            keras.layers.Dropout(0.5),
            keras.layers.Dense(units=10, activation='softmax'),
        ]
    )

In [62]:
keras.utils.set_random_seed(42)

strategy = tf.distribute.MirroredStrategy()

with strategy.scope():
    # Create a Keras model normally
    model = create_model()
    # Compile the model normally
    model.compile(
        loss='sparse_categorical_crossentropy',
        optimizer=keras.optimizers.SGD(learning_rate=1e-2),
        metrics=['accuracy'],
    )

# Preferably divisible by the number of replicas
batch_size = 100
model.fit(
    X_train,
    y_train,
    epochs=10,
    validation_data=(X_valid, y_valid),
    batch_size=batch_size,
)

In [63]:
type(model.weights[0])

tensorflow.python.distribute.values.MirroredVariable

Note that the `fit()` method will automatically split each training batch across all the replicas. Likewise, calling the `predict()` method will automatically split the batch across all replicas, making predictions in parallel:

In [64]:
# The batch is split across all replicas
model.predict(X_new).round(2)

array([[0., 0., 0., 0., 0., 0., 0., 1., 0., 0.],
       [0., 0., 1., 0., 0., 0., 0., 0., 0., 0.],
       [0., 1., 0., 0., 0., 0., 0., 0., 0., 0.]], dtype=float32)


If we call the model’s `save()` method, it will be saved as a regular model, 
*not* as a mirrored model with multiple replicas:

In [65]:
# Shows that saving a model does not preserve its distribution strategy
model.save('my_mirrored_model', save_format='tf')
model = keras.models.load_model('my_mirrored_model')
type(model.weights[0])

INFO:tensorflow:Assets written to: my_mirrored_model/assets


tensorflow.python.ops.resource_variable_ops.ResourceVariable

In [66]:
with strategy.scope():
    model = keras.models.load_model('my_mirrored_model')

In [67]:
type(model.weights[0])

tensorflow.python.distribute.values.MirroredVariable


If we want to specify the list of GPUs to use:

In [68]:
strategy = tf.distribute.MirroredStrategy(devices=['/gpu:0', '/gpu:1'])

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')


By default, the `MirroredStrategy` class uses the *NVIDIA Collective Communications Library* (NCCL) for the AllReduce mean operation, but we can change it by setting the `cross_device_ops` argument to an instance of the `tf.distribute.HierarchicalCopyAllReduce` class, or an instance of the `tf.distribute.ReductionToOneDevice` class. The default NCCL option is based on the `tf.distribute.NcclAllReduce` class, which is usually faster, but this depends on the number and types of GPUs, so we may want to give the alternatives a try. For more details on AllReduce algorithms, read [Yuichiro Ueno’s post](https://homl.info/uenopost) on the technologies behind deep learning and [Sylvain Jeaugey’s post](https://homl.info/ncclalgo) on massively scaling deep learning training with NCCL.

In [69]:
strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce()
)

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


If we want to use data parallelism with centralized parameters:

In [70]:
strategy = tf.distribute.experimental.CentralStorageStrategy()

INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:CPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:CPU:0'


We can optionally set the `compute_devices` argument to specify the list of devices we want to use as workers (by default it will use all available GPUs) and we can optionally set the `parameter_device` argument to specify the device we want to store the parameters on. By default it will use the CPU, or the GPU if there is just one.

### Training a Model on a TensorFlow Cluster
A *TensorFlow cluster* is a group of TensorFlow processes running in parallel, usually on different machines, and talking to each other to complete some work, e.g., training or executing a neural network model. Each TF process in the cluster is called a *task*, or a *TF server*. It has an IP address, a port, and a type (also called its *role* or its *job*, although the set of tasks that share the same type is itself often called a *job*). The type can be `'worker'`, `'chief'`, `'ps'` (parameter server), or `'evaluator'`:
- Each *worker* performs computations, usually on a machine with one or more GPUs.
- The *chief* performs computations as well (it is a worker), but it also handles extra work such as writing TensorBoard logs or saving checkpoints. There is a single chief in a cluster. If no chief is specified explicitly, then by convention the first worker is the chief.
- A *parameter server* only keeps track of variable values, and it is usually on a CPU-only machine. This type of task is only used with the `ParameterServerStrategy`.
- An *evaluator* obviously takes care of evaluation. This type is not used often, and when it’s used, there’s usually just one evaluator.
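
The chief convention described in the list above can be expressed as a small helper. This is a sketch: `is_chief()` is a hypothetical function, not a TensorFlow API, and it takes the cluster specification as a plain dictionary with one key per job.

```python
def is_chief(cluster_spec, task_type, task_id):
    """Return True if the given task plays the chief role in the cluster."""
    if 'chief' in cluster_spec:
        # A chief is defined explicitly
        return task_type == 'chief'
    # No explicit chief: by convention, the first worker is the chief
    return task_type == 'worker' and task_id == 0


implicit_chief_spec = {
    'worker': ['machine-a.example.com:2222', 'machine-b.example.com:2222'],
    'ps': ['machine-a.example.com:2221'],
}
explicit_chief_spec = {
    'chief': ['machine-a.example.com:2222'],
    'worker': ['machine-b.example.com:2222'],
}
```

For example, `is_chief(implicit_chief_spec, 'worker', 0)` is `True`, whereas `is_chief(explicit_chief_spec, 'worker', 0)` is `False` because that cluster declares its chief explicitly.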

To start a TensorFlow cluster, we must first define it. This means specifying all the tasks (IP address, TCP port, and type). For example, the following *cluster specification* defines a cluster with 3 tasks (2 workers and 1 parameter server). It’s a dictionary with one key per job, and the values are lists of task addresses (*IP*:*port*):

In [71]:
cluster_spec = {
    'worker': [
        # /job:worker/task:0
        'machine-a.example.com:2222',
        # /job:worker/task:1
        'machine-b.example.com:2222',
    ],
    # /job:ps/task:0
    'ps': ['machine-a.example.com:2221'],
}

In general there will be a single task per machine, but as this example shows, we can configure multiple tasks on the same machine if we want. In this case, if they share the same GPUs, make sure the RAM is split appropriately.

**Warning**: Every task in the cluster may communicate with every other task in the cluster, so make sure to configure our firewall to authorize all communications between these machines on these ports (it’s usually simpler if we use the same port on every machine).

When a task is started, it needs to be told which one it is: its type and index (the task index is also called the task id). A common way to specify everything at once (both the cluster spec and the current task’s type and id) is to set the `TF_CONFIG` environment variable before starting the program. It must be a JSON-encoded dictionary containing a cluster specification (under the `'cluster'` key), and the type and index of the task to start (under the `'task'` key). The following `TF_CONFIG` environment variable defines the same cluster as above, with 2 workers and 1 parameter server, and specifies that the task to start is worker \#0:

In [72]:
os.environ['TF_CONFIG'] = json.dumps(
    {'cluster': cluster_spec, 'task': {'type': 'worker', 'index': 0}}
)

**Tip**: In general we want to define the `TF_CONFIG` environment variable outside of Python, so the code does not need to include the current task’s type and index (this makes it possible to use the same code across all workers).
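
Since `TF_CONFIG` must be valid JSON, it is safer to generate it than to write it by hand. The sketch below builds one `TF_CONFIG` value per task for the cluster defined earlier; the `make_tf_config()` helper and the `tf_configs` dictionary are hypothetical names, and how the values get exported (shell script, job scheduler, and so on) is left open.

```python
import json

cluster_spec = {
    'worker': ['machine-a.example.com:2222', 'machine-b.example.com:2222'],
    'ps': ['machine-a.example.com:2221'],
}


def make_tf_config(cluster_spec, task_type, task_index):
    """Return the JSON string to store in TF_CONFIG for one task."""
    return json.dumps(
        {
            'cluster': cluster_spec,
            'task': {'type': task_type, 'index': task_index},
        }
    )


# One value per task: a launcher could export each one before starting
# the corresponding process
tf_configs = {
    (task_type, index): make_tf_config(cluster_spec, task_type, index)
    for task_type, addresses in cluster_spec.items()
    for index in range(len(addresses))
}

# JSON requires double-quoted strings, so a Python-style dict literal
# with single quotes is rejected by the JSON parser
try:
    json.loads("{'task': {'type': 'worker', 'index': 0}}")
    single_quotes_accepted = True
except json.JSONDecodeError:
    single_quotes_accepted = False
```

The last check is a reminder that a `TF_CONFIG` value written by hand must use double quotes inside the JSON document.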

Some platforms (e.g., Google Vertex AI) automatically set this environment variable for us. TensorFlow’s `TFConfigClusterResolver` class reads the cluster configuration from this environment variable:

In [73]:
resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
resolver.cluster_spec()

ClusterSpec({'ps': ['machine-a.example.com:2221'], 'worker': ['machine-a.example.com:2222', 'machine-b.example.com:2222']})

In [74]:
resolver.task_type

'worker'

In [75]:
resolver.task_id

0

Now let’s run a simpler cluster with just two worker tasks, both running on the local machine. We will use the `MultiWorkerMirroredStrategy` to train a model across these two tasks.

The first step is to write the training code. As this code will be used to run both workers, each in its own process, we write this code to a separate Python file, `my_mnist_multiworker_task.py`. The code is relatively straightforward, but there are a couple of important things to note:
- We create the `MultiWorkerMirroredStrategy` before doing anything else with TensorFlow.
- Only one of the workers will take care of logging to TensorBoard. As mentioned earlier, this worker is called the *chief*. When it is not defined explicitly, then by convention it is worker #0.

In [76]:
%%writefile my_mnist_multiworker_task.py

from pathlib import Path
import tempfile
import tensorflow as tf
from tensorflow import keras

# At the start!
strategy = tf.distribute.MultiWorkerMirroredStrategy()
resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
print(f'Starting task {resolver.task_type} #{resolver.task_id}')


if resolver.task_id == 0:
    # The chief uses the right locations
    model_dir = 'my_mnist_multiworker_model'
    tensorboard_log_dir = 'my_mnist_multiworker_logs'
    checkpoint_dir = 'my_mnist_multiworker_checkpoints'
else:
    # Other workers use temporary dirs
    tmp_dir = Path(tempfile.mkdtemp())
    model_dir = tmp_dir / 'model'
    tensorboard_log_dir = tmp_dir / 'logs'
    checkpoint_dir = tmp_dir / 'ckpt'

callbacks = [
    tf.keras.callbacks.TensorBoard(tensorboard_log_dir),
    tf.keras.callbacks.ModelCheckpoint(checkpoint_dir),
]

# Load and split the MNIST dataset
mnist = keras.datasets.mnist.load_data()
(X_train_full, y_train_full), (X_test, y_test) = mnist
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]

with strategy.scope():
    model = keras.Sequential(
        [
            keras.layers.Reshape(
                [28, 28, 1], input_shape=[28, 28], dtype=tf.uint8
            ),
            keras.layers.Rescaling(scale=1 / 255),
            keras.layers.Conv2D(
                filters=64,
                kernel_size=7,
                activation='relu',
                padding='same',
                input_shape=[28, 28, 1],
            ),
            keras.layers.MaxPooling2D(pool_size=2),
            keras.layers.Conv2D(
                filters=128, kernel_size=3, activation='relu', padding='same'
            ),
            keras.layers.Conv2D(
                filters=128, kernel_size=3, activation='relu', padding='same'
            ),
            keras.layers.MaxPooling2D(pool_size=2),
            keras.layers.Flatten(),
            keras.layers.Dense(units=64, activation='relu'),
            keras.layers.Dropout(0.5),
            keras.layers.Dense(units=10, activation='softmax'),
        ]
    )
    model.compile(
        loss='sparse_categorical_crossentropy',
        optimizer=keras.optimizers.SGD(learning_rate=1e-2),
        metrics=['accuracy'],
    )

# Pass the callbacks so that TensorBoard logs and checkpoints get written
model.fit(
    X_train,
    y_train,
    validation_data=(X_valid, y_valid),
    epochs=10,
    callbacks=callbacks,
)

model.save(model_dir, save_format='tf')

if resolver.task_id != 0:
    # And we can delete this directory at the end!
    tf.io.gfile.rmtree(tmp_dir)

Writing my_mnist_multiworker_task.py


**Warning**: When using the `MultiWorkerMirroredStrategy`, it’s important to ensure that all workers do the same thing, including saving model checkpoints or writing TensorBoard logs, even though we will only keep what the chief writes. This is because these operations may need to run the AllReduce operations, so all workers must be in sync.

Since we’re running both workers on the same machine, to avoid out-of-memory (OOM) errors, we could use the `CUDA_VISIBLE_DEVICES` environment variable to assign a different GPU to each worker. Alternatively, we can simply disable GPU support by setting `CUDA_VISIBLE_DEVICES` to an empty string.

We are now ready to start both workers, each in its own process. Notice that we change the task index:

In [77]:
%%bash --bg

export CUDA_VISIBLE_DEVICES=""
# TF_CONFIG must be valid JSON, which requires double-quoted strings
export TF_CONFIG='{
    "cluster": {"worker": ["127.0.0.1:9901", "127.0.0.1:9902"]},
    "task": {"type": "worker", "index": 0}
}'
python my_mnist_multiworker_task.py > my_worker_0.log 2>&1

In [78]:
%%bash --bg

export CUDA_VISIBLE_DEVICES=""
# TF_CONFIG must be valid JSON, which requires double-quoted strings
export TF_CONFIG='{
    "cluster": {"worker": ["127.0.0.1:9901", "127.0.0.1:9902"]},
    "task": {"type": "worker", "index": 1}
}'
python my_mnist_multiworker_task.py > my_worker_1.log 2>&1

**Note**: If we get warnings about `AutoShardPolicy`, we can safely ignore them. See [TF issue #42146](https://github.com/tensorflow/tensorflow/issues/42146) for more details.

That’s it! Our TensorFlow cluster is now running, but we can’t see it in this notebook because it’s running in separate processes (we can still follow the progress in `my_worker_*.log`).

Since the chief (worker #0) is writing to TensorBoard, we use TensorBoard to view the training progress. Run the following cell, then click on the settings button (i.e., the gear icon) in the TensorBoard interface and check the “Reload data” box to make TensorBoard automatically refresh every 30s. Once the first epoch of training is finished (which may take a few minutes), and once TensorBoard refreshes, the SCALARS tab will appear. Click on this tab to view the progress of the model’s training and validation accuracy.

In [79]:
%load_ext tensorboard
%tensorboard --logdir=./my_mnist_multiworker_logs --port=6006

There are two AllReduce implementations for this distribution strategy: a ring AllReduce algorithm based on gRPC for the network communications, and NCCL’s implementation. The best algorithm to use depends on the number of workers, the number and types of GPUs, and the network. By default, TensorFlow will apply some heuristics to select the right algorithm for us, but we can force NCCL (or RING) like this:

In [80]:
# strategy = tf.distribute.MultiWorkerMirroredStrategy(
#     communication_options=tf.distribute.experimental.CommunicationOptions(
#         implementation=tf.distribute.experimental.CollectiveCommunication.NCCL
#     )
# )

If we prefer to implement asynchronous data parallelism with parameter servers, change the strategy to `ParameterServerStrategy`, add one or more parameter servers, and configure `TF_CONFIG` appropriately for each task.

If we have access to [TPUs on Google Cloud](https://cloud.google.com/tpu), e.g. if we use Colab and we set the accelerator type to TPU, then we can create a `TPUStrategy` like this:

In [81]:
# # To train on a TPU in Google Colab:
# if 'google.colab' in sys.modules and 'COLAB_TPU_ADDR' in os.environ:
#     tpu_address = 'grpc://' + os.environ['COLAB_TPU_ADDR']
# else:
#     tpu_address = ''
# resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
#     tpu_address
# )
# tf.config.experimental_connect_to_cluster(resolver)
# tf.tpu.experimental.initialize_tpu_system(resolver)
# strategy = tf.distribute.experimental.TPUStrategy(resolver)

**Tip**: We may be eligible to use TPUs for free for research purposes; see https://tensorflow.org/tfrc for more details.

### Running Large Training Jobs on Vertex AI
Let’s copy the training script, but instead of saving the model to a local directory, the chief must save things to GCS, using the paths provided by Vertex AI:

In [82]:
%%writefile my_vertex_ai_training_task.py

import os
from pathlib import Path
import tempfile
import tensorflow as tf
from tensorflow import keras

# At the start!
strategy = tf.distribute.MultiWorkerMirroredStrategy()
resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()

if resolver.task_type == 'chief':
    # Paths provided by Vertex AI
    model_dir = os.getenv('AIP_MODEL_DIR')
    tensorboard_log_dir = os.getenv('AIP_TENSORBOARD_LOG_DIR')
    checkpoint_dir = os.getenv('AIP_CHECKPOINT_DIR')
else:
    # Other workers use temporary dirs
    tmp_dir = Path(tempfile.mkdtemp())
    model_dir = tmp_dir / 'model'
    tensorboard_log_dir = tmp_dir / 'logs'
    checkpoint_dir = tmp_dir / 'ckpt'

callbacks = [
    keras.callbacks.TensorBoard(tensorboard_log_dir),
    keras.callbacks.ModelCheckpoint(checkpoint_dir),
]

# Load and prepare the MNIST dataset
mnist = keras.datasets.mnist.load_data()
(X_train_full, y_train_full), (X_test, y_test) = mnist
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]

# Build and compile the Keras model using the distribution strategy
with strategy.scope():
    model = keras.Sequential(
        [
            keras.layers.Reshape(
                [28, 28, 1], input_shape=[28, 28], dtype=tf.uint8
            ),
            keras.layers.Lambda(lambda X: X / 255),
            keras.layers.Conv2D(
                filters=64,
                kernel_size=7,
                activation='relu',
                padding='same',
                input_shape=[28, 28, 1],
            ),
            keras.layers.MaxPooling2D(pool_size=2),
            keras.layers.Conv2D(
                filters=128, kernel_size=3, activation='relu', padding='same'
            ),
            keras.layers.Conv2D(
                filters=128, kernel_size=3, activation='relu', padding='same'
            ),
            keras.layers.MaxPooling2D(pool_size=2),
            keras.layers.Flatten(),
            keras.layers.Dense(units=64, activation='relu'),
            keras.layers.Dropout(0.5),
            keras.layers.Dense(units=10, activation='softmax'),
        ]
    )
    model.compile(
        loss='sparse_categorical_crossentropy',
        optimizer=keras.optimizers.SGD(learning_rate=1e-2),
        metrics=['accuracy'],
    )

model.fit(
    X_train,
    y_train,
    validation_data=(X_valid, y_valid),
    epochs=10,
    callbacks=callbacks,
)
model.save(model_dir, save_format='tf')

Writing my_vertex_ai_training_task.py


**Tip**: If we place the training data on GCS, we can create a `tf.data.TextLineDataset` or `tf.data.TFRecordDataset` to access it: just use the GCS paths as the filenames (e.g., *gs://my_bucket/data/001.csv*). These datasets rely on the `tf.io.gfile` package to access files: it supports both local files and GCS files.

Now we can create a custom training job on Vertex AI:

In [83]:
custom_training_job = aiplatform.CustomTrainingJob(
    display_name='my_custom_training_job',
    script_path='my_vertex_ai_training_task.py',
    # The Docker image to use for training
    container_uri='gcr.io/cloud-aiplatform/training/tf-gpu.2-4:latest',
    # The one to use for predictions (after training)
    model_serving_container_image_uri=server_image,
    # Not needed, this is just an example
    requirements=['gcsfs==2022.3.0'],
    # To store the training script, trained model, tb logs, model
    # checkpoints
    staging_bucket=f'gs://{bucket_name}/staging',
)

And now let’s run it on two workers, each with two GPUs:

In [84]:
mnist_model2 = custom_training_job.run(
    machine_type='n1-standard-4',
    replica_count=2,
    accelerator_type='NVIDIA_TESLA_K80',
    accelerator_count=2,
)

Training script copied to:
gs://my_bucket/aiplatform-2022-04-14-10:08:24.124-aiplatform_custom_trainer_script-0.1.tar.gz.
Training Output directory:
gs://my_bucket/aiplatform-custom-training-2022-04-14-10:08:25.226 
View Training:
https://console.cloud.google.com/ai/platform/locations/us-central1/training/5407999068506947584?project=522977795627
CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/5407999068506947584 current state:
PipelineState.PIPELINE_STATE_PENDING
CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/5407999068506947584 current state:
PipelineState.PIPELINE_STATE_RUNNING
View backing custom job:
https://console.cloud.google.com/ai/platform/locations/us-central1/training/6685701948726837248?project=522977795627
CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/5407999068506947584 current state:
PipelineState.PIPELINE_STATE_RUNNING
CustomTrainingJob projects/522977795627/locations/us-c

Vertex AI will provision the compute nodes we requested (within our quotas), and it will run our training script across them. Once the job is complete, the `run()` method will return a trained model: we can deploy it to an endpoint, or use it to make batch predictions. If anything goes wrong during training, we can view the logs in the GCP console: in the ☰ navigation menu, select Vertex AI → Training, click on our training job, and click VIEW LOGS. Alternatively, we can click the CUSTOM JOBS tab and copy the job’s ID (e.g., 1234), then select Logging from the ☰ navigation menu and query `resource.labels.job_id=1234`.

**Tip**: To visualize the training progress, just start TensorBoard and point its `--logdir` to the GCS path of the logs. It will use *application default credentials*, which we can set up using `gcloud auth application-default login`. Vertex AI also offers hosted TensorBoard servers if we prefer.

To try out a few hyperparameter values, one option is to run multiple jobs. We can pass the hyperparameter values to our script as command-line arguments by setting the `args` parameter when calling the `run()` method, or we can pass them as environment variables using the `environment_variables` parameter.
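
On the script side, reading such values could look like the following minimal sketch. The helper name `parse_hyperparameters` and the `HP_*` environment variable names are assumptions made for illustration; they are not defined by Vertex AI.

```python
import argparse
import os


def parse_hyperparameters(argv=None, environ=None):
    """Read hyperparameters from CLI flags, falling back to env variables."""
    environ = os.environ if environ is None else environ
    parser = argparse.ArgumentParser()
    # Hypothetical HP_* variable names: the hardcoded defaults are only used
    # when neither the flag nor the environment variable is set
    parser.add_argument(
        '--n_hidden', type=int, default=int(environ.get('HP_N_HIDDEN', 2))
    )
    parser.add_argument(
        '--learning_rate',
        type=float,
        default=float(environ.get('HP_LEARNING_RATE', 1e-2)),
    )
    return parser.parse_args(argv)


# One value comes from a flag, the other from an environment variable
args = parse_hyperparameters(
    argv=['--n_hidden=3'], environ={'HP_LEARNING_RATE': '0.05'}
)
print(args.n_hidden, args.learning_rate)
```

When launching the job, these values would correspond to something like `args=['--n_hidden=3']` or `environment_variables={'HP_LEARNING_RATE': '0.05'}` in the call to `run()`.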

Let’s clean up:

In [85]:
mnist_model2.delete()
custom_training_job.delete()
# The prefix is relative to the bucket, so it must not include gs://...
blobs = bucket.list_blobs(prefix='staging/')
for blob in blobs:
    blob.delete()

### Hyperparameter Tuning on Vertex AI
Vertex AI’s hyperparameter tuning service is based on a Bayesian optimization algorithm, capable of quickly finding optimal combinations of hyperparameters. To use it, we first need to create a training script that accepts hyperparameter values as command-line arguments.

The hyperparameter tuning service will call our script multiple times, each time with different hyperparameter values: each run is called a *trial*, and the set of trials is called a *study*. Our training script must then use the given hyperparameter values to build and compile a model. We can use a mirrored distribution strategy if we want, in case each trial runs on a multi-GPU machine. Then the script can load the dataset and train the model.

Lastly, the script must report the model’s performance back to Vertex AI’s hyperparameter tuning service, so it can decide which hyperparameters to try next. For this, we must use the `hypertune` library, which is automatically installed on Vertex AI training VMs:

In [86]:
%%writefile my_vertex_ai_trial.py

import os
import argparse

import tensorflow as tf
from tensorflow import keras
import hypertune


parser = argparse.ArgumentParser()
parser.add_argument('--n_hidden', type=int, default=2)
parser.add_argument('--n_neurons', type=int, default=256)
parser.add_argument('--learning_rate', type=float, default=1e-2)
parser.add_argument('--optimizer', default='adam')
args = parser.parse_args()


def build_model(args: argparse.Namespace) -> keras.Model:
    with tf.distribute.MirroredStrategy().scope():
        model = keras.Sequential()
        model.add(keras.layers.Flatten(input_shape=[28, 28], dtype=tf.uint8))
        for _ in range(args.n_hidden):
            model.add(keras.layers.Dense(args.n_neurons, activation='relu'))
        model.add(keras.layers.Dense(10, activation='softmax'))
        opt = keras.optimizers.get(args.optimizer)
        opt.learning_rate = args.learning_rate
        model.compile(
            loss='sparse_categorical_crossentropy',
            optimizer=opt,
            metrics=['accuracy'],
        )
        return model


# Load and split the dataset
mnist = keras.datasets.mnist.load_data()
(X_train_full, y_train_full), (X_test, y_test) = mnist
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]

# Use the AIP_* environment variables and create the callbacks
model_dir = os.getenv('AIP_MODEL_DIR')
tensorboard_log_dir = os.getenv('AIP_TENSORBOARD_LOG_DIR')
checkpoint_dir = os.getenv('AIP_CHECKPOINT_DIR')
trial_id = os.getenv('CLOUD_ML_TRIAL_ID')
tensorboard_cb = keras.callbacks.TensorBoard(tensorboard_log_dir)
early_stopping_cb = keras.callbacks.EarlyStopping(patience=5)
callbacks = [tensorboard_cb, early_stopping_cb]

model = build_model(args)
history = model.fit(
    X_train,
    y_train,
    validation_data=(X_valid, y_valid),
    epochs=10,
    callbacks=callbacks,
)
model.save(model_dir, save_format='tf')

# Use a distinct name to avoid shadowing the hypertune module
reporter = hypertune.HyperTune()
reporter.report_hyperparameter_tuning_metric(
    # Name of the reported metric
    hyperparameter_metric_tag='accuracy',
    # Max accuracy value
    metric_value=max(history.history['val_accuracy']),
    global_step=model.optimizer.iterations.numpy(),
)

Writing my_vertex_ai_trial.py


We then must define a custom job, which Vertex AI will use as a template for each trial:

In [87]:
trial_job = aiplatform.CustomJob.from_local_script(
    display_name='my_search_trial_job',
    # Path to our training script
    script_path='my_vertex_ai_trial.py',
    container_uri='gcr.io/cloud-aiplatform/training/tf-gpu.2-4:latest',
    staging_bucket=f'gs://{bucket_name}/staging',
    accelerator_type='NVIDIA_TESLA_K80',
    # In this example, each trial will have 2 GPUs
    accelerator_count=2,
)

Training script copied to:
gs://homl3-mybucket5/staging/aiplatform-2022-04-18-18:14:02.860-aiplatform_custom_trainer_script-0.1.tar.gz.


Finally, we’re ready to create and run the hyperparameter tuning job:

In [88]:
from google.cloud.aiplatform import hyperparameter_tuning as hpt

hp_job = aiplatform.HyperparameterTuningJob(
    display_name='my_hp_search_job',
    custom_job=trial_job,
    metric_spec={'accuracy': 'maximize'},
    # Names must match the command-line arguments of the training script
    parameter_spec={
        'learning_rate': hpt.DoubleParameterSpec(
            min=1e-3, max=10, scale='log'
        ),
        'n_neurons': hpt.IntegerParameterSpec(min=1, max=300, scale='linear'),
        'n_hidden': hpt.IntegerParameterSpec(min=1, max=10, scale='linear'),
        'optimizer': hpt.CategoricalParameterSpec(['sgd', 'adam']),
    },
    max_trial_count=100,
    parallel_trial_count=20,
)
hp_job.run()

Creating HyperparameterTuningJob
HyperparameterTuningJob created. Resource name: projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568
To use this HyperparameterTuningJob in another session:
hpt_job = aiplatform.HyperparameterTuningJob.get('projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568')
View HyperparameterTuningJob:
https://console.cloud.google.com/ai/platform/locations/us-central1/training/5825136187899117568?project=522977795627
HyperparameterTuningJob projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568 current state:
JobState.JOB_STATE_RUNNING
HyperparameterTuningJob projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568 current state:
JobState.JOB_STATE_RUNNING
HyperparameterTuningJob projects/522977795627/locations/us-central1/hyperparameterTuningJobs/5825136187899117568 current state:
JobState.JOB_STATE_RUNNING
HyperparameterTuningJ
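
To get a feel for what `scale='log'` means for `learning_rate`, here is a small standalone sketch. It draws values uniformly in log space over the same range. It uses plain random sampling purely for illustration, not Vertex AI's Bayesian algorithm, and `sample_log_uniform` is a hypothetical helper.

```python
import math
import random


def sample_log_uniform(low, high, rng):
    """Draw a value whose logarithm is uniform between log(low) and log(high)."""
    return math.exp(rng.uniform(math.log(low), math.log(high)))


rng = random.Random(42)
samples = [sample_log_uniform(1e-3, 10, rng) for _ in range(10_000)]

# With a log scale, each decade (1e-3 to 1e-2, 1e-2 to 1e-1, ...) gets roughly
# the same share of samples; with a linear scale, about 90% would exceed 1
share_below_0_1 = sum(s < 0.1 for s in samples) / len(samples)
print(f'{share_below_0_1:.2f}')
```

Since the range spans four decades, about half of the samples should fall below 0.1, which is why a log scale suits hyperparameters such as the learning rate.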

Once the job is completed, we can fetch the trial results using `hp_job.trials`. Each trial result is represented as a protobuf object, containing the hyperparameter values and the resulting metrics. Let’s find the best trial:

In [89]:
from google.cloud.aiplatform.compat.types import study


def get_final_metric(trial: study.Trial, metric_id: str) -> float:
    for metric in trial.final_measurement.metrics:
        if metric.metric_id == metric_id:
            return metric.value


trials = hp_job.trials
trial_accuracies = [get_final_metric(trial, 'accuracy') for trial in trials]
best_trial = trials[np.argmax(trial_accuracies)]

In [90]:
max(trial_accuracies)

0.977400004863739

In [91]:
best_trial.id

'98'

In [92]:
best_trial.parameters

[parameter_id: "learning_rate"
value {
  number_value: 0.001
}
, parameter_id: "n_hidden"
value {
  number_value: 8.0
}
, parameter_id: "n_neurons"
value {
  number_value: 216.0
}
, parameter_id: "optimizer"
value {
  string_value: "adam"
}
]
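
The protobuf representation above is a bit verbose, so a small helper can turn it into a plain dictionary. This is a sketch that assumes each entry exposes `parameter_id` and `value` attributes, as the printed output suggests. The name `trial_parameters_to_dict` is hypothetical, and the stand-in objects below only mimic that structure so the example runs without a Vertex AI job.

```python
from types import SimpleNamespace


def trial_parameters_to_dict(trial):
    """Map each parameter ID of a trial-like object to its value."""
    return {param.parameter_id: param.value for param in trial.parameters}


# Stand-in for a trial, mirroring the values printed above
fake_trial = SimpleNamespace(
    parameters=[
        SimpleNamespace(parameter_id='learning_rate', value=0.001),
        SimpleNamespace(parameter_id='n_hidden', value=8.0),
        SimpleNamespace(parameter_id='n_neurons', value=216.0),
        SimpleNamespace(parameter_id='optimizer', value='adam'),
    ]
)
best_params = trial_parameters_to_dict(fake_trial)
print(best_params)
```

Note that integer hyperparameters are reported as floats (e.g., `8.0`), so we may need to cast them with `int()` before rebuilding a model.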

<div style="border: 1px solid;">

#### Hyperparameter Tuning Using Keras Tuner on Vertex AI
Instead of using Vertex AI’s hyperparameter tuning service, we can use [Keras Tuner](https://keras.io/keras_tuner/) and run it on Vertex AI VMs. Keras Tuner provides a simple way to scale hyperparameter search by distributing it across multiple machines: it only requires setting three environment variables on each machine, then running our regular Keras Tuner code on each machine. We can use the exact same script on all machines. One of the machines acts as the chief (i.e., the oracle), and the others act as workers. Each worker asks the chief which hyperparameter values to try, then the worker trains the model using these hyperparameter values, and finally it reports the model’s performance back to the chief, which can then decide which hyperparameter values the worker should try next.

The three environment variables we need to set on each machine are:
- `KERASTUNER_TUNER_ID`: Equal to `'chief'` on the chief machine, or a unique identifier on each worker machine, such as `'worker0'`, `'worker1'`, etc.
- `KERASTUNER_ORACLE_IP`: The IP address or hostname of the chief machine. The chief itself should generally use `'0.0.0.0'` to listen on every IP address on the machine.
- `KERASTUNER_ORACLE_PORT`: The TCP port that the chief will be listening on.

We can use distributed Keras Tuner on any set of machines. If we want to run it on Vertex AI machines, then we can spawn a regular training job, and just modify the training script to set the three environment variables properly before using Keras Tuner.
</div>

#### Extra Material – Implement Distributed Keras Tuner on Vertex AI
The script below starts by parsing the `TF_CONFIG` environment variable, which will be automatically set by Vertex AI, just like earlier. It finds the address of the task of type `'chief'`, and it extracts the IP address or hostname, and the TCP port. It then defines the tuner ID as the task type followed by the task index, for example `'worker0'`. If the tuner ID is `'chief0'`, it changes it to `'chief'`, and it sets the IP to `'0.0.0.0'`: this will make it listen on all IPv4 addresses on its machine. Then it defines the environment variables for Keras Tuner. Next, the script creates a tuner, then it runs the search, and finally it saves the best model to the location given by Vertex AI:

In [93]:
%%writefile my_keras_tuner_search.py

import json
import os
from pathlib import Path

import keras_tuner as kt
import tensorflow as tf
from tensorflow import keras

tf_config = json.loads(os.environ['TF_CONFIG'])

chief_ip, chief_port = tf_config['cluster']['chief'][0].rsplit(':', 1)
tuner_id = f"{tf_config['task']['type']}{tf_config['task']['index']}"
if tuner_id == 'chief0':
    tuner_id = 'chief'
    chief_ip = '0.0.0.0'
    # Since the chief doesn’t work much, we can optimize compute
    # resources by running a worker on the same machine. To do this, we
    # can just make the chief start another process, after tweaking the
    # TF_CONFIG environment variable to set the task type to 'worker'
    # and the task index to a unique value. Uncomment the next few lines
    # to give this a try:
    # import subprocess
    # import sys

    # # The worker on the chief’s machine
    # tf_config['task']['type'] = 'workerX'
    # os.environ['TF_CONFIG'] = json.dumps(tf_config)
    # subprocess.Popen(
    #     [sys.executable] + sys.argv,
    #     stdout=sys.stdout,
    #     stderr=sys.stderr,
    # )

os.environ['KERASTUNER_TUNER_ID'] = tuner_id
os.environ['KERASTUNER_ORACLE_IP'] = chief_ip
os.environ['KERASTUNER_ORACLE_PORT'] = chief_port

# Replace with our bucket’s name
gcs_path = '/gcs/my_bucket/my_hp_search'


def build_model(hp: kt.HyperParameters) -> keras.Model:
    n_hidden = hp.Int('n_hidden', min_value=0, max_value=8, default=2)
    n_neurons = hp.Int('n_neurons', min_value=16, max_value=256)
    learning_rate = hp.Float(
        'learning_rate', min_value=1e-4, max_value=1e-2, sampling='log'
    )
    optimizer = hp.Choice('optimizer', values=['sgd', 'adam'])
    if optimizer == 'sgd':
        optimizer = keras.optimizers.SGD(learning_rate=learning_rate)
    else:
        optimizer = keras.optimizers.Adam(learning_rate=learning_rate)

    model = keras.Sequential()
    model.add(keras.layers.Flatten(input_shape=[28, 28], dtype=tf.uint8))
    for _ in range(n_hidden):
        model.add(keras.layers.Dense(n_neurons, activation='relu'))
    model.add(keras.layers.Dense(10, activation='softmax'))
    model.compile(
        loss='sparse_categorical_crossentropy',
        optimizer=optimizer,
        metrics=['accuracy'],
    )
    return model


hyperband_tuner = kt.Hyperband(
    build_model,
    objective='val_accuracy',
    seed=42,
    max_epochs=10,
    factor=3,
    hyperband_iterations=2,
    distribution_strategy=tf.distribute.MirroredStrategy(),
    directory=gcs_path,
    project_name='mnist',
)

# Load and split the MNIST dataset
mnist = keras.datasets.mnist.load_data()
(X_train_full, y_train_full), (X_test, y_test) = mnist
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]

tensorboard_log_dir = os.environ['AIP_TENSORBOARD_LOG_DIR'] + '/' + tuner_id
tensorboard_cb = keras.callbacks.TensorBoard(tensorboard_log_dir)
early_stopping_cb = keras.callbacks.EarlyStopping(patience=5)
hyperband_tuner.search(
    X_train,
    y_train,
    epochs=10,
    validation_data=(X_valid, y_valid),
    callbacks=[tensorboard_cb, early_stopping_cb],
)

if tuner_id == 'chief':
    best_hp = hyperband_tuner.get_best_hyperparameters()[0]
    best_model = hyperband_tuner.hypermodel.build(best_hp)
    best_model.save(os.getenv('AIP_MODEL_DIR'), save_format='tf')

Writing my_keras_tuner_search.py


Note that Vertex AI automatically mounts the `/gcs` directory to GCS, using the open source [GCS Fuse adapter](https://cloud.google.com/storage/docs/gcs-fuse). This gives us a shared directory across the workers and the chief, which is required by Keras Tuner. Also note that we set the distribution strategy to a `MirroredStrategy`. This will allow each worker to use all the GPUs on its machine, if there’s more than one.
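
Since `/gcs` mirrors GCS, a `gs://` URI maps to a local path simply by swapping the prefix. Here is a minimal sketch of that mapping; `gcs_uri_to_fuse_path` is a hypothetical helper, not part of any Google library.

```python
def gcs_uri_to_fuse_path(uri):
    """Convert a gs://bucket/path URI to the matching /gcs/bucket/path."""
    prefix = 'gs://'
    if not uri.startswith(prefix):
        raise ValueError(f'Not a GCS URI: {uri!r}')
    return '/gcs/' + uri[len(prefix):]


print(gcs_uri_to_fuse_path('gs://my_bucket/my_hp_search'))
# prints: /gcs/my_bucket/my_hp_search
```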

Replace `/gcs/my_bucket/` with <code>/gcs/<i>{bucket_name}</i>/</code>:

In [94]:
with open('my_keras_tuner_search.py') as f:
    script = f.read()

with open('my_keras_tuner_search.py', 'w') as f:
    f.write(script.replace('/gcs/my_bucket/', f'/gcs/{bucket_name}/'))

Now all we need to do is to start a custom training job based on this script. Don’t forget to add `keras-tuner` to the list of `requirements`:

In [95]:
hp_search_job = aiplatform.CustomTrainingJob(
    display_name='my_hp_search_job',
    script_path='my_keras_tuner_search.py',
    container_uri='gcr.io/cloud-aiplatform/training/tf-gpu.2-4:latest',
    model_serving_container_image_uri=server_image,
    requirements=['keras-tuner~=1.1.2'],
    staging_bucket=f'gs://{bucket_name}/staging',
)

In [96]:
mnist_model3 = hp_search_job.run(
    machine_type='n1-standard-4',
    replica_count=3,
    accelerator_type='NVIDIA_TESLA_K80',
    accelerator_count=2,
)

Training script copied to:
gs://my_bucket/staging/aiplatform-2022-04-15-13:34:32.591-aiplatform_custom_trainer_script-0.1.tar.gz.
Training Output directory:
gs://my_bucket/staging/aiplatform-custom-training-2022-04-15-13:34:34.453 
View Training:
https://console.cloud.google.com/ai/platform/locations/us-central1/training/8601543785521872896?project=522977795627
View backing custom job:
https://console.cloud.google.com/ai/platform/locations/us-central1/training/5022607048831926272?project=522977795627
CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state:
PipelineState.PIPELINE_STATE_RUNNING
CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state:
PipelineState.PIPELINE_STATE_RUNNING
CustomTrainingJob projects/522977795627/locations/us-central1/trainingPipelines/8601543785521872896 current state:
PipelineState.PIPELINE_STATE_RUNNING
CustomTrainingJob projects/52297779562

And we have a model!

Let’s clean up:

In [97]:
mnist_model3.delete()
hp_search_job.delete()
# The prefix is relative to the bucket, so it must not include gs://...
blobs = bucket.list_blobs(prefix='staging/')
for blob in blobs:
    blob.delete()

### Extra Material – Using AutoML to Train a Model
Let’s start by exporting the MNIST dataset to PNG images and preparing an `import.csv` file that points to each image and indicates its split (training, validation, or test) and its label:

In [98]:
import matplotlib.pyplot as plt

mnist_path = Path('datasets/mnist')
mnist_path.mkdir(parents=True, exist_ok=True)
idx = 0
with open(mnist_path / 'import.csv', 'w') as import_csv:
    for split, X, y in zip(
        ('training', 'validation', 'test'),
        (X_train, X_valid, X_test),
        (y_train, y_valid, y_test),
    ):
        for image, label in zip(X, y):
            print(f'\r{idx + 1}/70000', end='')
            filename = f'{idx:05d}.png'
            plt.imsave(mnist_path / filename, np.tile(image, 3))
            line = f'{split},gs://{bucket_name}/mnist/{filename},{label}\n'
            import_csv.write(line)
            idx += 1

70000/70000
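
Before uploading, it can be worth checking that the import file has the expected number of rows per split. The sketch below assumes the three-column layout written above (split, GCS URI, label). `count_rows_per_split` is a hypothetical helper, and the sample uses a placeholder bucket name.

```python
import csv
import io
from collections import Counter


def count_rows_per_split(csv_file):
    """Count the rows of an import file for each split (first column)."""
    counts = Counter()
    for split, uri, label in csv.reader(csv_file):
        counts[split] += 1
    return counts


# Tiny stand-in for import.csv, with a placeholder bucket name
sample = io.StringIO(
    'training,gs://my_bucket/mnist/00000.png,7\n'
    'training,gs://my_bucket/mnist/00001.png,3\n'
    'validation,gs://my_bucket/mnist/00002.png,5\n'
    'test,gs://my_bucket/mnist/00003.png,7\n'
)
split_counts = count_rows_per_split(sample)
print(split_counts)
```

On the real file, we could pass `open(mnist_path / 'import.csv', newline='')` instead. Assuming the same split as in the training scripts above, we would expect 55,000 training, 5,000 validation, and 10,000 test rows.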

Let’s upload this dataset to GCS:

In [99]:
upload_directory(bucket, mnist_path)

Uploaded datasets/mnist                                              


Now let’s create a managed image dataset on Vertex AI:

In [100]:
mnist_dataset = aiplatform.ImageDataset.create(
    display_name='mnist-dataset',
    gcs_source=[f'gs://{bucket_name}/mnist/import.csv'],
    project=project_id,
    # The schema URI is an attribute of the aiplatform module, not an
    # importable submodule
    import_schema_uri=(
        aiplatform.schema.dataset.ioformat.image.single_label_classification
    ),
    sync=True,
)

Creating ImageDataset
Create ImageDataset backing LRO: projects/522977795627/locations/us-central1/datasets/7532459492777132032/operations/3812233931370004480
ImageDataset created. Resource name: projects/522977795627/locations/us-central1/datasets/7532459492777132032
To use this ImageDataset in another session:
ds = aiplatform.ImageDataset('projects/522977795627/locations/us-central1/datasets/7532459492777132032')
Importing ImageDataset data: projects/522977795627/locations/us-central1/datasets/7532459492777132032
Import ImageDataset data backing LRO: projects/522977795627/locations/us-central1/datasets/7532459492777132032/operations/3010593197698056192
ImageDataset data imported. Resource name: projects/522977795627/locations/us-central1/datasets/7532459492777132032


Create an AutoML training job on this dataset:

In [101]:
# TODO

## Exercises
1. What does a SavedModel contain? How do we inspect its content?
> A SavedModel contains a TensorFlow model, including its architecture (a computation graph) and its weights. It is stored as a directory containing a *saved_model.pb* file, which defines the computation graph (represented as a serialized protocol buffer), and a *variables* subdirectory containing the variable values. For models containing a large number of weights, these variable values may be split across multiple files. A SavedModel also includes an *assets* subdirectory that may contain additional data, such as vocabulary files, class names, or some example instances for this model. To be more accurate, a SavedModel can contain one or more *metagraphs*. A metagraph is a computation graph plus some function signature definitions (including their input and output names, types, and shapes). Each metagraph is identified by a set of tags. To inspect a SavedModel, we can use the command-line tool `saved_model_cli` or just load it using `tf.saved_model.load()` and inspect it in Python.
2. When should we use TF Serving? What are its main features? What are some tools we can use to deploy it?
> TF Serving allows us to deploy multiple TensorFlow models (or multiple versions of the same model) and make them accessible to all our applications easily via a REST API or a gRPC API. Using our models directly in our applications would make it harder to deploy a new version of a model across all applications. Implementing our own microservice to wrap a TF model would require extra work, and it would be hard to match TF Serving’s features. TF Serving has many features: it can monitor a directory and autodeploy the models that are placed there, and we won’t have to change or even restart any of our applications to benefit from the new model versions; it’s fast, well tested, and scales very well; and it supports A/B testing of experimental models and deploying a new model version to just a subset of our users (in this case the model is called a *canary*). TF Serving is also capable of grouping individual requests into batches to run them jointly on the GPU. To deploy TF Serving, we can install it from source, but it is much simpler to install it using a Docker image. To deploy a cluster of TF Serving Docker images, we can use an orchestration tool such as Kubernetes, or use a fully hosted solution such as Google Vertex AI.
3. How do we deploy a model across multiple TF Serving instances?
> All we need to do is configure these TF Serving instances to monitor the same *models* directory, and then export our new model as a SavedModel into a subdirectory.
4. When should we use the gRPC API rather than the REST API to query a model served by TF Serving?
> The gRPC API is more efficient than the REST API. However, its client libraries are not as widely available, and if we activate compression when using the REST API, we can get almost the same performance. So, the gRPC API is most useful when we need the highest possible performance and the clients are not limited to the REST API.
5. What are the different ways TFLite reduces a model’s size to make it run on a mobile or embedded device?
> - It provides a converter which can optimize a SavedModel: it shrinks the model and reduces its latency. To do this, it prunes all the operations that are not needed to make predictions (such as training operations), and it optimizes and fuses operations whenever possible.
> - The converter can also perform post-training quantization: this technique dramatically reduces the model’s size, so it’s much faster to download and store.
> - It saves the optimized model using the FlatBuffer format, which can be loaded to RAM directly, without parsing. This reduces the loading time and memory footprint.
6. What is quantization-aware training, and why would we need it?
> Quantization-aware training consists in adding fake quantization operations to the model during training. This allows the model to learn to ignore the quantization noise; the final weights will be more robust to quantization.
7. What are model parallelism and data parallelism? Why is the latter generally recommended?
> Model parallelism means chopping our model into multiple parts and running them in parallel across multiple devices, hopefully speeding up the model during training or inference. Data parallelism means creating multiple exact replicas of our model and deploying them across multiple devices. At each iteration during training, each replica is given a different batch of data, and it computes the gradients of the loss with regard to the model parameters. In synchronous data parallelism, the gradients from all replicas are then aggregated and the optimizer performs a gradient descent step. The parameters may be centralized (e.g., on parameter servers) or replicated across all replicas and kept in sync using AllReduce. In asynchronous data parallelism, the parameters are centralized and the replicas run independently from each other, each updating the central parameters directly at the end of each training iteration, without having to wait for the other replicas. To speed up training, data parallelism turns out to work better than model parallelism, in general. This is mostly because it requires less communication across devices. Moreover, it is much easier to implement, and it works the same way for any model, whereas model parallelism requires analyzing the model to determine the best way to chop it into pieces. That said, research in this domain is making quick progress (e.g., PipeDream or Pathways), so a mix of model parallelism and data parallelism is probably the way forward.
8. When training a model across multiple servers, what distribution strategies can we use? How do we choose which one to use?
> - The `MultiWorkerMirroredStrategy` performs mirrored data parallelism. The model is replicated across all available servers and devices, and each replica gets a different batch of data at each training iteration and computes its own gradients. The mean of the gradients is computed and shared across all replicas using a distributed AllReduce implementation (NCCL by default), and all replicas perform the same gradient descent step. This strategy is the simplest to use since all servers and devices are treated in exactly the same way, and it performs fairly well. In general, we should use this strategy. Its main limitation is that it requires the model to fit in RAM on every replica.
> - The `ParameterServerStrategy` performs asynchronous data parallelism. The model is replicated across all devices on all workers, and the parameters are sharded across all parameter servers. Each worker has its own training loop, running asynchronously with the other workers; at each training iteration, each worker gets its own batch of data and fetches the latest version of the model parameters from the parameter servers, then it computes the gradients of the loss with regard to these parameters, and it sends them to the parameter servers. Lastly, the parameter servers perform a gradient descent step using these gradients. This strategy is generally slower than the previous strategy, and a bit harder to deploy, since it requires managing parameter servers. However, it can be useful in some situations, especially when we can take advantage of the asynchronous updates, e.g. to reduce I/O bottlenecks. This depends on many factors, including hardware, network topology, number of servers, model size, and more, so our mileage may vary.
9. Train a model (any model we like) and deploy it to TF Serving or Google Vertex AI. Write the client code to query it using the REST API or the gRPC API. Update the model and deploy the new version. Our client code will now query the new version. Roll back to the first version.
> Please follow the steps in the [Using TensorFlow Serving](#using-tensorflow-serving) section above.
10. Train any model across multiple GPUs on the same machine using the `MirroredStrategy` (if we do not have access to GPUs, we can use Google Colab with a GPU Runtime and create two virtual GPUs). Train the model again using the `CentralStorageStrategy` and compare the training time.
> Please follow the steps in the [Training at Scale Using the Distribution Strategies API](#training-at-scale-using-the-distribution-strategies-api) section above.
11. Fine-tune a model of our choice on Vertex AI, using either Keras Tuner or Vertex AI’s hyperparameter tuning service.
> Please follow the instructions in the [Hyperparameter Tuning on Vertex AI](#hyperparameter-tuning-on-vertex-ai) section above.
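
The synchronous data parallelism described in the answer to exercise 7 can be illustrated with a toy example in plain Python. This is only a sketch under simplifying assumptions (a one-parameter linear model, two equal-sized shards, made-up data). It shows that averaging the per-replica gradients gives the same result as computing the gradient on the whole batch.

```python
def mse_gradient(w, batch):
    """Gradient of the mean squared error of y = w * x with regard to w."""
    return sum(2 * (w * x - y) * x for x, y in batch) / len(batch)


# Toy dataset and parameter (arbitrary values chosen for illustration)
data = [(1.0, 2.0), (2.0, 4.5), (3.0, 5.5), (4.0, 8.5)]
w = 0.5

# Each "replica" gets its own shard of the batch and computes its gradient
shards = [data[:2], data[2:]]
replica_grads = [mse_gradient(w, shard) for shard in shards]

# The AllReduce step boils down to averaging the gradients across replicas
mean_grad = sum(replica_grads) / len(replica_grads)

# With equal-sized shards, this matches the gradient on the whole batch
full_grad = mse_gradient(w, data)
print(abs(mean_grad - full_grad) < 1e-12)
```

Every replica then applies the same averaged gradient, which is what keeps the model copies identical from one step to the next.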