# Deploy and Distribute TensorFlow

In this notebook you will learn how to deploy TensorFlow models to TensorFlow Serving (TFS), using the REST API or the gRPC API, and how to train a model across multiple devices.

<table align="left">
  <td>
    <a target="_blank" href="https://colab.research.google.com/github/ageron/tf2_course/blob/master/04_deploy_and_distribute_tf2.ipynb"><img src="https://www.tensorflow.org/images/colab_logo_32px.png" />Run in Google Colab</a>
  </td>
</table>

## Imports

In [1]:
%matplotlib inline

In [2]:
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import sklearn
import sys
import tensorflow as tf
from tensorflow import keras
import time

2023-05-13 16:41:55.554702: I tensorflow/core/util/util.cc:169] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.


In [3]:
print("python", sys.version)
for module in mpl, np, pd, sklearn, tf, keras:
    print(module.__name__, module.__version__)

python 3.7.12 | packaged by conda-forge | (default, Oct 26 2021, 06:08:21) 
[GCC 9.4.0]
matplotlib 3.5.3
numpy 1.21.6
pandas 1.3.5
sklearn 1.0.2
tensorflow 2.9.0
keras.api._v2.keras 2.9.0


In [4]:
assert sys.version_info >= (3, 5) # Python ≥3.5 required
assert tf.__version__ >= "2.0"    # TensorFlow ≥2.0 required

In [5]:
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Currently, memory growth needs to be the same across GPUs
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
            # logical_gpus = tf.config.experimental.list_logical_devices('GPU')
            # print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        # Memory growth must be set before GPUs have been initialized
        print(e)

![Exercise](https://c1.staticflickr.com/9/8101/8553474140_c50cf08708_b.jpg)

## Exercise 1 – Deploying a Model to TensorFlow Serving

## Save/Load a `SavedModel`

In [6]:
(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.fashion_mnist.load_data()
X_train_full = X_train_full / 255.
X_test = X_test / 255.
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]

In [7]:
model = keras.models.Sequential([
    keras.layers.Flatten(input_shape=[28, 28]),
    keras.layers.Dense(100, activation="relu"),
    keras.layers.Dense(10, activation="softmax")
])
model.compile(loss="sparse_categorical_crossentropy",
              optimizer=keras.optimizers.SGD(lr=1e-3),
              metrics=["accuracy"])
model.fit(X_train, y_train, epochs=10, validation_data=(X_valid, y_valid))

2023-05-13 16:42:11.941035: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F AVX512_VNNI FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-05-13 16:42:16.325690: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1532] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 20661 MB memory:  -> device: 0, name: NVIDIA L4, pci bus id: 0000:00:03.0, compute capability: 8.9
2023-05-13 16:42:16.329883: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1532] Created device /job:localhost/replica:0/task:0/device:GPU:1 with 20661 MB memory:  -> device: 1, name: NVIDIA L4, pci bus id: 0000:00:04.0, compute capability: 8.9
2023-05-13 16:42:16.333248: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1532] Created device /job:localhost/replica:0/task:0/device:G

Epoch 1/10
  81/1719 [>.............................] - ETA: 3s - loss: 2.3213 - accuracy: 0.0995

2023-05-13 16:42:17.959522: I tensorflow/stream_executor/cuda/cuda_blas.cc:1786] TensorFloat-32 will be used for the matrix multiplication. This will only be logged once.


Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x7ff56808f0d0>

In [8]:
MODEL_NAME = "my_fashion_mnist"
!rm -rf {MODEL_NAME}

In [9]:
import time

model_version = int(time.time())
model_path = os.path.join(MODEL_NAME, str(model_version))
os.makedirs(model_path)

In [10]:
tf.saved_model.save(model, model_path)

INFO:tensorflow:Assets written to: my_fashion_mnist/1683996175/assets


In [11]:
for root, dirs, files in os.walk(MODEL_NAME):
    indent = '    ' * root.count(os.sep)
    print('{}{}/'.format(indent, os.path.basename(root)))
    for filename in files:
        print('{}{}'.format(indent + '    ', filename))

my_fashion_mnist/
    1683996175/
        saved_model.pb
        assets/
        variables/
            variables.data-00000-of-00001
            variables.index


In [12]:
!saved_model_cli show --dir {model_path}

The given SavedModel contains the following tag-sets:
'serve'


In [13]:
!saved_model_cli show --dir {model_path} --tag_set serve

The given SavedModel MetaGraphDef contains SignatureDefs with the following keys:
SignatureDef key: "__saved_model_init_op"
SignatureDef key: "serving_default"


In [14]:
!saved_model_cli show --dir {model_path} --tag_set serve \
                      --signature_def serving_default

The given SavedModel SignatureDef contains the following input(s):
  inputs['flatten_input'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, 28, 28)
      name: serving_default_flatten_input:0
The given SavedModel SignatureDef contains the following output(s):
  outputs['dense_1'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, 10)
      name: StatefulPartitionedCall:0
Method name is: tensorflow/serving/predict


In [15]:
!saved_model_cli show --dir {model_path} --all


MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:

signature_def['__saved_model_init_op']:
  The given SavedModel SignatureDef contains the following input(s):
  The given SavedModel SignatureDef contains the following output(s):
    outputs['__saved_model_init_op'] tensor_info:
        dtype: DT_INVALID
        shape: unknown_rank
        name: NoOp
  Method name is: 

signature_def['serving_default']:
  The given SavedModel SignatureDef contains the following input(s):
    inputs['flatten_input'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 28, 28)
        name: serving_default_flatten_input:0
  The given SavedModel SignatureDef contains the following output(s):
    outputs['dense_1'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 10)
        name: StatefulPartitionedCall:0
  Method name is: tensorflow/serving/predict

Concrete Functions:
  Function Name: '__call__'
    Option #1
      Callable with:
        Argument #1
          flatten_

**Warning**: if you are using an old version of TensorFlow, the method name for the `serving_default` signature may be empty, due to [TensorFlow issue #25235](https://github.com/tensorflow/tensorflow/issues/25235). In this case, you need to use `keras.experimental.export()` instead of `tf.saved_model.save()` (or upgrade to the latest TensorFlow version).

Let's write a few test instances to a `npy` file so we can pass them easily to our model:

In [16]:
X_new = X_test[:3]
np.save("my_fashion_mnist_tests.npy", X_new, allow_pickle=False)

In [17]:
input_name = model.input_names[0]
input_name

'flatten_input'

And now let's use `saved_model_cli` to make predictions for the instances we just saved:

In [18]:
!saved_model_cli run --dir {model_path} --tag_set serve \
                     --signature_def serving_default    \
                     --inputs {input_name}=my_fashion_mnist_tests.npy

Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.loader.load or tf.compat.v1.saved_model.load. There will be a new function for importing SavedModels in Tensorflow 2.0.
INFO:tensorflow:Restoring parameters from my_fashion_mnist/1683996175/variables/variables
Result for output key dense_1:
[[5.09453857e-05 8.94931509e-05 3.26894078e-04 3.21647327e-04
  3.58094490e-04 2.20947385e-01 2.31091341e-04 2.37283036e-01
  1.34908715e-02 5.26900530e-01]
 [3.61943530e-04 2.66755487e-05 8.83024633e-01 2.78247811e-04
  1.85168218e-02 7.41482609e-06 9.75365043e-02 4.13040738e-07
  2.44038631e-04 3.38524069e-06]
 [1.20029268e-04 9.98825610e-01 1.38644362e-04 4.02376609e-04
  4.89923754e-04 2.88232059e-07 1.72519121e-05 2.37191512e-06
  1.34158211e-06 2.13890212e-06]]


## TensorFlow Serving

Install [Docker](https://docs.docker.com/install/) if you don't have it already. Then run:

```bash
docker pull tensorflow/serving

docker run -it --rm -p 8501:8501 \
   -v "`pwd`/my_fashion_mnist:/models/my_fashion_mnist" \
   -e MODEL_NAME=my_fashion_mnist \
   tensorflow/serving
```

Once you are finished using it, press Ctrl-C to shut down the server.

In [19]:
import json

input_data_json = json.dumps({
    "signature_name": "serving_default",
    "instances": X_new.tolist(),
})
print(input_data_json[:200] + "..." + input_data_json[-200:])

{"signature_name": "serving_default", "instances": [[[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], [0.0,... 0.0, 0.3843137254901961, 0.6235294117647059, 0.2784313725490196, 0.0, 0.0, 0.26666666666666666, 0.6901960784313725, 0.6431372549019608, 0.22745098039215686, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]]]}


Now let's use TensorFlow Serving's REST API to make predictions:

In [21]:
import requests

SERVER_URL = 'http://localhost:8501/v1/models/my_fashion_mnist:predict'
            
response = requests.post(SERVER_URL, data=input_data_json)
response.raise_for_status()
response = response.json()

ConnectionError: HTTPConnectionPool(host='localhost', port=8501): Max retries exceeded with url: /v1/models/my_fashion_mnist:predict (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff671006d90>: Failed to establish a new connection: [Errno 111] Connection refused'))

In [None]:
response.keys()

In [None]:
y_proba = np.array(response["predictions"])
y_proba.round(2)

### Using Serialized Examples

In [22]:
serialized = []
for image in X_new:
    image_data = tf.train.FloatList(value=image.ravel())
    features = tf.train.Features(
        feature={
            "image": tf.train.Feature(float_list=image_data),
        }
    )
    example = tf.train.Example(features=features)
    serialized.append(example.SerializeToString())

In [23]:
[data[:100]+b'...' for data in serialized]

[b'\n\xd3\x18\n\xd0\x18\n\x05image\x12\xc6\x18\x12\xc3\x18\n\xc0\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00...',
 b'\n\xd3\x18\n\xd0\x18\n\x05image\x12\xc6\x18\x12\xc3\x18\n\xc0\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xd1\xd0P=\x87\x86\x86>\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xc9\xc8H>\x99\x98\x18>\x00\x00\x00\x00\x00\x00...',
 b'\n\xd3\x18\n\xd0\x18\n\x05image\x12\xc6\x18\x12\xc3\x18\n\xc0\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x81\x80\x80;\x00\x00\x00\x00\x87\x86\x86>\xb2\xb1

In [24]:
def parse_images(serialized):
    expected_features = {
        "image": tf.io.FixedLenFeature([28 * 28], dtype=tf.float32)
    }
    examples = tf.io.parse_example(serialized, expected_features)
    return tf.reshape(examples["image"], (-1, 28, 28))

In [25]:
parse_images(serialized)

<tf.Tensor: shape=(3, 28, 28), dtype=float32, numpy=
array([[[0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        ...,
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.]],

       [[0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        ...,
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.]],

       [[0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        ...,
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.]]], dtype=float32)>

In [26]:
serialized_inputs = keras.layers.Input(shape=[], dtype=tf.string)
images = keras.layers.Lambda(lambda serialized: parse_images(serialized))(serialized_inputs)
y_proba = model(images)
ser_model = keras.models.Model(inputs=[serialized_inputs], outputs=[y_proba])

In [27]:
SER_MODEL_NAME = "my_ser_fashion_mnist"

ser_model_version = int(time.time())
ser_model_path = os.path.join(SER_MODEL_NAME, str(ser_model_version))
os.makedirs(ser_model_path)
tf.saved_model.save(ser_model, ser_model_path)

INFO:tensorflow:Assets written to: my_ser_fashion_mnist/1683996429/assets


In [28]:
!saved_model_cli show --dir {ser_model_path} --all


MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:

signature_def['__saved_model_init_op']:
  The given SavedModel SignatureDef contains the following input(s):
  The given SavedModel SignatureDef contains the following output(s):
    outputs['__saved_model_init_op'] tensor_info:
        dtype: DT_INVALID
        shape: unknown_rank
        name: NoOp
  Method name is: 

signature_def['serving_default']:
  The given SavedModel SignatureDef contains the following input(s):
    inputs['input_1'] tensor_info:
        dtype: DT_STRING
        shape: (-1)
        name: serving_default_input_1:0
  The given SavedModel SignatureDef contains the following output(s):
    outputs['sequential'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 10)
        name: StatefulPartitionedCall:0
  Method name is: tensorflow/serving/predict

Concrete Functions:
  Function Name: '__call__'
    Option #1
      Callable with:
        Argument #1
          input_1: TensorSpec(shap

```bash
docker run -it --rm -p 8500:8500 -p 8501:8501 \
   -v "`pwd`/my_ser_fashion_mnist:/models/my_ser_fashion_mnist" \
   -e MODEL_NAME=my_ser_fashion_mnist \
   tensorflow/serving
```

In [29]:
import base64
import json

ser_input_data_json = json.dumps({
    "signature_name": "serving_default",
    "instances": [{"b64": base64.b64encode(data).decode("utf-8")}
                  for data in serialized],
})
print(ser_input_data_json[:200] + "..." + ser_input_data_json[-200:])

{"signature_name": "serving_default", "instances": [{"b64": "CtMYCtAYCgVpbWFnZRLGGBLDGArAGAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA...7j4AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAxcTEPqCfHz+Pjo4+AAAAAAAAAACJiIg+sbAwP6WkJD/p6Gg+AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="}]}


In [30]:
import requests

SER_SERVER_URL = 'http://localhost:8501/v1/models/my_ser_fashion_mnist:predict'
            
response = requests.post(SER_SERVER_URL, data=ser_input_data_json)
response.raise_for_status()
response = response.json()

ConnectionError: HTTPConnectionPool(host='localhost', port=8501): Max retries exceeded with url: /v1/models/my_ser_fashion_mnist:predict (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff670e6b950>: Failed to establish a new connection: [Errno 111] Connection refused'))

In [None]:
response.keys()

In [None]:
y_proba = np.array(response["predictions"])
y_proba.round(2)

In [None]:
!python3 -m pip install --no-deps tensorflow-serving-api

In [None]:
import grpc
from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2_grpc

channel = grpc.insecure_channel('localhost:8500')
predict_service = prediction_service_pb2_grpc.PredictionServiceStub(channel)

request = predict_pb2.PredictRequest()
request.model_spec.name = SER_MODEL_NAME
request.model_spec.signature_name = "serving_default"
input_name = ser_model.input_names[0]
request.inputs[input_name].CopyFrom(tf.compat.v1.make_tensor_proto(serialized))

result = predict_service.Predict(request, 10.0)

In [None]:
result

In [None]:
output_name = ser_model.output_names[0]
output_name

In [None]:
shape = [dim.size for dim in result.outputs[output_name].tensor_shape.dim]
shape

In [None]:
y_proba = np.array(result.outputs[output_name].float_val).reshape(shape)
y_proba.round(2)

![Exercise](https://c1.staticflickr.com/9/8101/8553474140_c50cf08708_b.jpg)

## Exercise 2 – Distributed Training

In [None]:
keras.backend.clear_session()

In [None]:
distribution = tf.distribute.MirroredStrategy()

with distribution.scope():
    model = keras.models.Sequential([
        keras.layers.Flatten(input_shape=[28, 28]),
        keras.layers.Dense(100, activation="relu"),
        keras.layers.Dense(10, activation="softmax")
    ])
    model.compile(loss="sparse_categorical_crossentropy",
                  optimizer=keras.optimizers.SGD(lr=1e-3),
                  metrics=["accuracy"])

In [None]:
model.fit(X_train, y_train, epochs=10, validation_data=(X_valid, y_valid), batch_size=25)