## TensorFlow Model Serving

### Using TF Serving

In [None]:
# Train a baseline MNIST classifier and export it as a versioned SavedModel.
from pathlib import Path
import tensorflow as tf

mnist = tf.keras.datasets.mnist.load_data()
(X_train_full, y_train_full), (X_test, y_test) = mnist
# Hold out the first 5,000 training images for validation.
X_valid, y_valid = X_train_full[:5000], y_train_full[:5000]
X_train, y_train = X_train_full[5000:], y_train_full[5000:]

tf.random.set_seed(42)
tf.keras.backend.clear_session()
model = tf.keras.Sequential()
# The model takes raw uint8 pixels and divides by 255 itself, so clients can
# send unprocessed images.
model.add(tf.keras.layers.Flatten(input_shape=[28, 28], dtype=tf.uint8))
model.add(tf.keras.layers.Rescaling(scale=1 / 255))
model.add(tf.keras.layers.Dense(100, activation="relu"))
model.add(tf.keras.layers.Dense(10, activation="softmax"))
model.compile(
    loss="sparse_categorical_crossentropy",
    optimizer=tf.keras.optimizers.SGD(learning_rate=1e-2),
    metrics=["accuracy"],
)
model.fit(X_train, y_train, epochs=10, validation_data=(X_valid, y_valid))

# Export layout: <model_name>/<model_version>/
model_name = "my_mnist_model"
model_version = "0001"
model_path = Path(model_name, model_version)
model.save(model_path, save_format="tf")

In [None]:
# Inspect the SavedModel.
# The output lists its tag-sets; a tag-set identifies a metagraph (a computation graph plus its function signatures, e.g. input/output types and sizes).
!saved_model_cli show --dir '{model_path}'

In [None]:
# Look inside the 'serve' tag-set reported by the previous command.
# The output should list two signature definitions: '__saved_model_init_op' and 'serving_default'.
!saved_model_cli show --dir '{model_path}' --tag_set serve

In [None]:
# Show the details of the default serving function, 'serving_default'
# (the signature the REST and gRPC requests in this notebook call by name).
!saved_model_cli show --dir '{model_path}' --tag_set serve \
                      --signature_def serving_default

In [None]:
# Install TensorFlow Serving.
url = "https://storage.googleapis.com/tensorflow-serving-apt"
src = "stable tensorflow-model-server tensorflow-model-server-universal"
# Register the TF Serving APT repository and add its signing key.
!echo 'deb {url} {src}' > /etc/apt/sources.list.d/tensorflow-serving.list
!curl '{url}/tensorflow-serving.release.pub.gpg' | apt-key add -
# Install the server binary.
!apt update -q && apt-get install -y tensorflow-model-server
# Python client package that provides the `tensorflow_serving.apis` modules used for the gRPC request below.
%pip install -q -U tensorflow-serving-api==2.11.1

In [None]:
# Publish the model's base directory (the parent of the version folder) as
# MODEL_DIR so the bash cell that starts the server can read it.
import os

os.environ.update(MODEL_DIR=str(model_path.parent.absolute()))

In [None]:
# implement server
%%bash --bg
tensorflow_model_server \
    --port=8500 \
    --rest_api_port=8501 \
    --model_name=my_mnist_model \
    --model_base_path="${MODEL_DIR}" >my_server.log 2>&1

In [None]:
# Query TF Serving through the REST API.
# Build the JSON request body: the signature to call plus the input instances.
import json

X_new = X_test[:3]
request_json = json.dumps(
    dict(signature_name="serving_default", instances=X_new.tolist())
)

In [None]:
# Display the request body: JSON is plain text, so every pixel value is serialized as text.
request_json

In [None]:
# Send the request body to TF Serving with an HTTP POST and decode the reply.
import requests

server_url = "http://localhost:8501/v1/models/my_mnist_model:predict"
http_response = requests.post(server_url, data=request_json)
# Fail loudly on any HTTP error status before trying to parse the body.
http_response.raise_for_status()
response = http_response.json()

In [None]:
# Turn the "predictions" field of the REST response into an array and show it
# rounded to two decimals.
import numpy as np
y_proba = np.asarray(response["predictions"])
np.round(y_proba, 2)

In [None]:
# Query TF Serving through the gRPC API.
# Build a PredictRequest protocol buffer and fill in its fields.
from tensorflow_serving.apis.predict_pb2 import PredictRequest

request = PredictRequest()
# `model_spec` names the served model and the signature to invoke.
request.model_spec.name = model_name
request.model_spec.signature_name = 'serving_default'
# The input is keyed by the model's first input name and carries the images
# converted to a tensor protocol buffer.
input_name = model.input_names[0]
request.inputs[input_name].CopyFrom(tf.make_tensor_proto(X_new))

In [None]:
import grpc
from tensorflow_serving.apis import prediction_service_pb2_grpc

# Open an unencrypted channel to the server's gRPC port.
channel = grpc.insecure_channel('localhost:8500')

# Create the prediction-service client stub on that channel.
# The generated class is `PredictionServiceStub`.
predict_service = prediction_service_pb2_grpc.PredictionServiceStub(channel)

# Send the request, giving up after 10 seconds.
response = predict_service.Predict(request, timeout=10.0)

In [None]:
# Convert the output protocol buffer in the gRPC response to a NumPy array.
# The output is looked up under the model's first output name.
output_name = model.output_names[0]
outputs_proto = response.outputs[output_name]
y_proba = tf.make_ndarray(outputs_proto)

In [None]:
# Train a second model version: two hidden layers of 50 units instead of one
# hidden layer of 100.
np.random.seed(42)
tf.random.set_seed(42)
model = tf.keras.Sequential()
model.add(tf.keras.layers.Flatten(input_shape=[28, 28], dtype=tf.uint8))
model.add(tf.keras.layers.Rescaling(scale=1 / 255))
model.add(tf.keras.layers.Dense(50, activation="relu"))
model.add(tf.keras.layers.Dense(50, activation="relu"))
model.add(tf.keras.layers.Dense(10, activation="softmax"))
model.compile(
    loss="sparse_categorical_crossentropy",
    optimizer=tf.keras.optimizers.SGD(learning_rate=1e-2),
    metrics=["accuracy"],
)
history = model.fit(
    X_train,
    y_train,
    epochs=10,
    validation_data=(X_valid, y_valid),
)

In [None]:
# Export the retrained model next to the first one, under version "0002".
model_version = "0002"
model_path = Path(model_name, model_version)
model.save(model_path, save_format="tf")

### Vertex AI

In [None]:
# Authenticate the current user from within Google Colab.
# NOTE(review): presumably this is what grants the Google Cloud clients below their credentials — confirm.
from google.colab import auth

auth.authenticate_user()

In [None]:
# Create a GCS bucket to hold the SavedModel.
from google.cloud import storage

# NOTE(review): these look like placeholder values — replace with a real project ID and bucket name before running.
project_id = 'my_project'
bucket_name = 'my_bucket'
location = 'us-central1'

storage_client = storage.Client(project=project_id)
bucket = storage_client.create_bucket(bucket_name, location=location)

In [None]:
# Helper that uploads a whole directory tree to a bucket.
def upload_directory(bucket, dirpath):
    """Upload every file under *dirpath* to *bucket*, recursively.

    Blob names are the file paths relative to the parent of *dirpath*, so the
    directory's own name is kept as the top-level prefix in the bucket.

    Args:
        bucket: Object providing ``blob(name)`` whose result provides
            ``upload_from_filename(path)`` (presumably a
            ``google.cloud.storage.Bucket``).
        dirpath: Directory to upload, as a ``str`` or ``Path``.
    """
    dirpath = Path(dirpath)
    for filepath in dirpath.glob("**/*"):
        if filepath.is_file():
            blob = bucket.blob(filepath.relative_to(dirpath.parent).as_posix())
            blob.upload_from_filename(filepath)


# This call belongs at cell level: inside the function body it would recurse
# without end and nothing would ever be uploaded.
upload_directory(bucket, "my_mnist_model")

In [None]:
# Alternative upload using gsutil; the -m flag runs the recursive copy in parallel (multithreaded).
!gsutil -m cp -r my_mnist_model gs://{bucket_name}/

In [None]:
# Register the model with Vertex AI.
from google.cloud import aiplatform

# Serving container image (a TF 2.8 GPU prediction image, going by its name).
server_image = 'gcr.io/cloud-aiplatform/prediction/tf2-gpu.2-8:latest'

aiplatform.init(project=project_id, location=location)
# artifact_uri points at version 0001 of the SavedModel inside the bucket.
mnist_model = aiplatform.Model.upload(
    display_name='mnist',
    artifact_uri=f'gs://{bucket_name}/my_mnist_model/0001',
    serving_container_image_uri=server_image,
)

In [None]:
# Create an endpoint and deploy the registered model to it.
endpoint = aiplatform.Endpoint.create(display_name='mnist-endpoint')

# Replica count may vary between 1 and 5; each replica is an n1-standard-4
# machine with one NVIDIA Tesla K80 attached.
endpoint.deploy(
    mnist_model,
    min_replica_count=1,
    max_replica_count=5,
    machine_type='n1-standard-4',
    accelerator_type='NVIDIA_TESLA_K80',
    accelerator_count=1
)

In [None]:
# Request predictions from the deployed endpoint for the same X_new images, sent as nested lists.
response = endpoint.predict(instances=X_new.tolist())

In [None]:
# Show the endpoint's predictions rounded to two decimals.
import numpy as np
np.round(response.predictions, 2)

In [None]:
# Remove the endpoint: undeploy everything from it first, then delete it.
endpoint.undeploy_all()
endpoint.delete()

### Batch prediction on Vertex AI

In [None]:
# Prepare the batch and upload it to GCS.
# Write a JSON Lines file: one image (as a nested list) per line.
batch_path = Path('my_mnist_batch')
batch_path.mkdir(exist_ok=True)
with open(batch_path / 'my_mnist_batch.jsonl', 'w') as jsonl_file:
    for image in X_test[:100].tolist():
        jsonl_file.write(json.dumps(image))
        jsonl_file.write('\n')

upload_directory(bucket, batch_path)

In [None]:
# Launch a batch prediction job: inputs come from the uploaded JSONL file and
# results are written under the my_mnist_predictions/ prefix of the same bucket.
batch_prediction_job = mnist_model.batch_predict(
    job_display_name="my_batch_prediction_job",
    machine_type="n1-standard-4",
    starting_replica_count=1,
    max_replica_count=5,
    accelerator_type="NVIDIA_TESLA_K80",
    accelerator_count=1,
    gcs_source=[f"gs://{bucket_name}/{batch_path.name}/my_mnist_batch.jsonl"],
    gcs_destination_prefix=f"gs://{bucket_name}/my_mnist_predictions/",
    sync=True  # NOTE(review): presumably blocks until the job has finished — confirm
)

In [None]:
# Gather the predicted probabilities from the job's output files.
y_probas = []
for blob in batch_prediction_job.iter_outputs():
    # Only blobs whose name contains 'prediction.results' are read.
    if 'prediction.results' not in blob.name:
        continue
    # Each line is a JSON object; its 'prediction' entry is one output row.
    for line in blob.download_as_text().splitlines():
        y_proba = json.loads(line)['prediction']
        y_probas.append(y_proba)

In [None]:
# Accuracy: share of the 100 batch predictions whose most probable class
# matches the test label.
y_pred = np.asarray(y_probas).argmax(axis=1)
accuracy = np.count_nonzero(y_pred == y_test[:100]) / 100

In [None]:
# Clean up: delete the GCS objects under each prefix (model files, batch
# inputs, prediction outputs), then the bucket, then the batch prediction job.
# NOTE(review): the Vertex AI model object (`mnist_model`) is not deleted here — confirm whether that is intended.
for prefix in ['my_mnist_model/', 'my_mnist_batch/', 'my_mnist_predictions/']:
    blobs = bucket.list_blobs(prefix=prefix)
    for blob in blobs:
        blob.delete()

# NOTE(review): presumably the bucket has to be empty before it can be deleted, hence the loop above — confirm.
bucket.delete()
batch_prediction_job.delete()