In [None]:
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# AI Platform (Unified) SDK: Custom image classification model for batch prediction

<table align="left">
  <td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/ai-platform-samples/blob/master/notebooks/templates/ai_platform_notebooks_template.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Run in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/GoogleCloudPlatform/ai-platform-samples/blob/master/notebooks/templates/ai_platform_notebooks_template.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
</table>

# Overview


This tutorial demonstrates how to use the AI Platform (Unified) Python SDK to train and deploy a custom model for batch prediction.

### Dataset

The dataset used for this tutorial is the [cifar10 dataset](https://www.tensorflow.org/datasets/catalog/cifar10) from [TensorFlow Datasets](https://www.tensorflow.org/datasets/catalog/overview). The version of the dataset you will use in this tutorial is stored in a public Google Storage bucket.

### Objective

In this notebook, you will learn how to create a custom model from a Python script in a docker container using the AI Platform (Unified) SDK, and then do a batch prediction on the deployed model. You can alternatively create custom models from the command line using `gcloud` or online using Google Cloud Console.

The steps performed include: 

- Create an AI Platform (Unified) custom job for training a model.
- Train the model.
- View the model evaluation.
- Deploy the model to a serving endpoint.
- Make batch predictions.
- Undeploy the model.

### Costs 

This tutorial uses billable components of Google Cloud Platform (GCP):

* Cloud AI Platform
* Cloud Storage

Learn about [Cloud AI Platform
pricing](https://cloud.google.com/ml-engine/docs/pricing) and [Cloud Storage
pricing](https://cloud.google.com/storage/pricing), and use the [Pricing
Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

## Installation

Install the latest (alpha) version of the AI Platform (Unified) SDK from a tar file stored in a Google Storage bucket.

In [None]:
!pip3 install https://storage.googleapis.com/google-cloud-aiplatform/libraries/python/0.1.0/google-cloud-aiplatform-0.1.0.tar.gz

You also need to install the `google-cloud-storage` package.

In [None]:
! pip3 install google-cloud-storage

### Restart the Kernel

Once you've installed the AI Platform (Unified) SDK, you need to restart the notebook kernel so it can find the packages.

In [None]:
# Automatically restart kernel after installs
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

## Before you begin

### GPU run-time

**Make sure you're running this notebook in a GPU runtime if you have that option. In Colab, select Runtime --> Change runtime type**


### Set up your GCP project

**The following steps are required, regardless of your notebook environment.**

1. [Select or create a GCP project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.

2. [Make sure that billing is enabled for your project.](https://cloud.google.com/billing/docs/how-to/modify-project)

3. [Enable the AI Platform APIs and Compute Engine APIs.](https://console.cloud.google.com/flows/enableapi?apiid=ml.googleapis.com,compute_component)

4. [Google Cloud SDK](https://cloud.google.com/sdk) is already installed in AI Platform Notebooks.

5. Enter your project ID in the cell below. Then run the cell to make sure the
Cloud SDK uses the right project for all the commands in this notebook.

**Note**: Jupyter runs lines prefixed with `!` as shell commands, and it interpolates Python variables prefixed with `$` into these commands.

#### Project ID

**If you don't know your project ID**, you might be able to get it using the `gcloud` command, by executing the second cell below.

In [None]:
PROJECT_ID = "[your-project-id]" #@param {type:"string"}

In [None]:
if PROJECT_ID == "" or PROJECT_ID is None or PROJECT_ID == "[your-project-id]":
    # Get your GCP project id from gcloud
    shell_output = ! gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID:", PROJECT_ID)

In [None]:
! gcloud config set project $PROJECT_ID

#### Region

You can also change the `REGION` variable, which is used for operations
throughout the rest of this notebook. Make sure to [choose a region where Cloud
AI Platform services are
available](https://cloud.google.com/ml-engine/docs/tensorflow/regions). You
cannot use a Multi-Regional Storage bucket for training with AI Platform.

In [None]:
REGION = 'us-central1' #@param {type: "string"}

#### Timestamp

If you are in a live tutorial session, you might be using a shared test account or project. To avoid name collisions between users on the resources created, you create a timestamp for each session and append it to the names of the resources created in this tutorial.

In [None]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
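As a rough illustration of how the timestamp keeps names unique, the sketch below appends a timestamp suffix to a resource prefix. The helper name `timestamped_name` is hypothetical and not part of the SDK; the fixed date is only there to make the result reproducible.

```python
from datetime import datetime


def timestamped_name(prefix, now=None):
    """Return prefix followed by a YYYYMMDDHHMMSS timestamp (hypothetical helper)."""
    now = now or datetime.now()
    return "{}_{}".format(prefix, now.strftime("%Y%m%d%H%M%S"))


# A fixed datetime is used here only so the result is reproducible.
example = timestamped_name("custom_job", datetime(2020, 10, 1, 9, 30, 5))
print(example)  # custom_job_20201001093005
```

Two sessions started in different seconds get different suffixes, so their resource names do not collide.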

### Authenticate your GCP account

**If you are using AI Platform Notebooks**, your environment is already
authenticated. Skip this step.

In [None]:
import os
import sys

# If you are running this notebook in Colab, run this cell and follow the
# instructions to authenticate your Google Cloud account. This provides access
# to your Cloud Storage bucket and lets you submit training jobs and prediction
# requests.

# If on AI Platform, then don't execute this code
if not os.path.exists('/opt/deeplearning/metadata/env_version'):
    if 'google.colab' in sys.modules:
        from google.colab import auth as google_auth
        google_auth.authenticate_user()

    # If you are running this tutorial in a notebook locally, replace the string
    # below with the path to your service account key and run this cell to
    # authenticate your Google Cloud account.
    else:
        %env GOOGLE_APPLICATION_CREDENTIALS your_path_to_credentials.json

### Create a Cloud Storage bucket

**The following steps are required, regardless of your notebook environment.**

When you submit a custom training job using the Cloud SDK, you upload a Python package
containing your training code to a Cloud Storage bucket. AI Platform runs
the code from this package. In this tutorial, AI Platform also saves the
trained model that results from your job in the same bucket. You can then
create an AI Platform model version based on this output in order to serve
online predictions.

Set the name of your Cloud Storage bucket below. It must be unique across all
Cloud Storage buckets. 

In [None]:
BUCKET_NAME = "[your-bucket-name]" #@param {type:"string"}

In [None]:
if BUCKET_NAME == "" or BUCKET_NAME is None or BUCKET_NAME == "[your-bucket-name]":
    BUCKET_NAME = PROJECT_ID + "-ucaip-custom-" + TIMESTAMP
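Before creating the bucket, it can help to check the name locally. The sketch below covers only a simplified subset of the Cloud Storage naming rules (3 to 63 characters; lowercase letters, digits, dashes, underscores and dots; starting and ending with a letter or digit). The helper `looks_like_valid_bucket_name` is hypothetical, and the service itself remains the final authority on whether a name is accepted.

```python
import re

# Simplified subset of the Cloud Storage naming rules: 3-63 characters,
# lowercase letters, digits, dashes, underscores and dots, starting and
# ending with a letter or digit.
_BUCKET_PATTERN = r"[a-z0-9][a-z0-9._-]{1,61}[a-z0-9]"


def looks_like_valid_bucket_name(name):
    """Hypothetical local check; it does not test global uniqueness."""
    return re.fullmatch(_BUCKET_PATTERN, name) is not None


print(looks_like_valid_bucket_name("my-project-ucaip-custom-20201001093005"))  # True
print(looks_like_valid_bucket_name("[your-bucket-name]"))  # False
```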

**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [None]:
! gsutil mb -l $REGION gs://$BUCKET_NAME

Finally, validate access to your Cloud Storage bucket by examining its contents:

In [None]:
! gsutil ls -al gs://$BUCKET_NAME

### Copy Python training package for CIFAR10

Next, copy the pre-made Python package for training a custom CIFAR-10 model. The package is explained in more detail later in the tutorial.

In [None]:
! gsutil cp -r gs://cloud-samples-data/ai-platform/cifar_custom/tf2_trainer_cifar.tar.gz gs://$BUCKET_NAME/tf2_trainer_cifar.tar.gz

### Install Docker (Colab or Local)

If you are using AI Platform Notebooks, Docker is already installed. Skip these steps.

By default, Docker is not installed on Colab. If you are running in Colab, run the following cell to install Docker.

In [None]:
if 'google.colab' in sys.modules:
    ! sudo apt update
    ! sudo apt install apt-transport-https ca-certificates curl software-properties-common
    ! curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
    ! sudo add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu bionic stable"
    ! sudo apt update
    ! sudo apt install docker-ce

Start the docker service.

In [None]:
if 'google.colab' in sys.modules:
    ! sudo service docker start

### Copy docker training image

For the last step, you copy a pre-built Docker image that is configured for AI Platform (Unified) custom training jobs into the Container Registry of your own project.

*You need to Enable “Container Registry API” for your project in Google Cloud Console.*

In [None]:
REPO_NAME = "ucaip-training-test"

! gcloud auth configure-docker
! docker pull gcr.io/ucaip-test/ucaip-training-test:latest
! docker tag \
  gcr.io/ucaip-test/ucaip-training-test:latest \
  gcr.io/$PROJECT_ID/$REPO_NAME:latest
! docker push gcr.io/$PROJECT_ID/$REPO_NAME:latest

### Import libraries and define constants

#### Import AI Platform (Unified) SDK

Import the AI Platform (Unified) SDK into your Python environment.

In [None]:
import os
import sys
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value, Struct

from google.cloud import aiplatform_v1alpha1 as aip

#### AI Platform (Unified) constants

Let's now set up some constants for AI Platform (Unified):

1. AI Platform (Unified) API service endpoint.
2. AI Platform (Unified) location root path for dataset and model resources.

In [None]:
# API Endpoint
API_ENDPOINT = "us-central1-aiplatform.googleapis.com"
API_PREDICT_ENDPOINT = "us-central1-prediction-aiplatform.googleapis.com"

# uCAIP location root path for your dataset and model resources
PARENT = "projects/" + PROJECT_ID + "/locations/" + REGION
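The location path and the resource identifiers used later are plain slash-separated strings, so they can be built and taken apart with ordinary string operations. The sketch below is illustrative only: the helper names are hypothetical, the project ID and job ID are placeholders, and the regional endpoint pattern is an assumption generalized from the `us-central1` value above.

```python
def location_path(project_id, region):
    """Build the 'projects/<project>/locations/<region>' parent path."""
    return "projects/{}/locations/{}".format(project_id, region)


def regional_endpoint(region):
    """Assumed pattern, generalized from the us-central1 endpoint above."""
    return "{}-aiplatform.googleapis.com".format(region)


def last_segment(resource_name):
    """Return the trailing ID of a slash-separated resource name."""
    return resource_name.split("/")[-1]


parent = location_path("my-project", "us-central1")
print(parent)  # projects/my-project/locations/us-central1
print(regional_endpoint("us-central1"))  # us-central1-aiplatform.googleapis.com

# Placeholder job name, shaped like the identifiers the job service returns.
job_name = parent + "/customJobs/1234567890"
print(last_segment(job_name))  # 1234567890
```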

# Tutorial

Now you are ready to start creating and training your own custom model for CIFAR10.

## Clients

The AI Platform (Unified) SDK works as a client/server model. On your side, the Python script, you will create a client that sends requests and receives responses from the server -- AI Platform.

You will use several clients in this tutorial, so you will set them all up upfront.

- Job Service for custom jobs.
- Model Service for managed models.
- Pipeline Service for training.
- Prediction Service for serving. *Note*, prediction has a different service endpoint.

In [None]:
# client options same for all services
client_options = {"api_endpoint": API_ENDPOINT}
predict_client_options = {"api_endpoint": API_PREDICT_ENDPOINT}


def create_job_client():
    client = aip.JobServiceClient(
        client_options=client_options
    )
    return client


def create_model_client():
    client = aip.ModelServiceClient(
        client_options=client_options
    )
    return client


def create_pipeline_client():
    client = aip.PipelineServiceClient(
        client_options=client_options
    )
    return client


def create_endpoint_client():
    client = aip.EndpointServiceClient(
        client_options=client_options
    )
    return client


def create_prediction_client():
    client = aip.PredictionServiceClient(
        client_options=predict_client_options
    )
    return client


clients = {}
clients['job'] = create_job_client()
clients['model'] = create_model_client()
clients['pipeline'] = create_pipeline_client()
clients['prediction'] = create_prediction_client()

for client in clients.items():
    print(client)

## Prepare your custom job specification

Now that your clients are ready, your first step is to create a Job Specification for your custom training job.

You are going to start with training what we can call an **empty job**. That is, you will create a job specification that provisions resources for training a job, and initiate the job using the client job service -- but the job itself will be empty. 

You do this so you can first focus on understanding the basic steps. Afterwards, you repeat the steps, this time adding the Python training package for training a CIFAR10 custom model.

### Define the container specification

Let's first start by defining a job name and then a container specification:

- `JOB_NAME`: A unique name for your custom training job. For convenience, the current timestamp is appended to make the name unique.
- `MODEL_DIR`: A location in your Google Storage bucket for storing the model artifacts.
- `image_uri`: The location of the container image in your project's Container Registry.
- `--model-dir`: A command line parameter to the container indicating the location to store the model.


In [None]:
JOB_NAME = "custom_job_" + TIMESTAMP
MODEL_DIR = 'gs://{}/{}'.format(BUCKET_NAME, JOB_NAME) 
CONTAINER_SPEC = {
    "image_uri": "gcr.io/{}/{}:latest".format(PROJECT_ID, REPO_NAME),
    "args": [
        "--model-dir=" + MODEL_DIR
    ],
}
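If you build container specifications for several jobs, a small helper keeps the image URI and the `--model-dir` argument consistent. This is a minimal sketch with placeholder values; `build_container_spec` is a hypothetical helper, not an SDK function.

```python
import json


def build_container_spec(project_id, repo_name, model_dir, tag="latest"):
    """Return a container spec dict shaped like CONTAINER_SPEC above."""
    return {
        "image_uri": "gcr.io/{}/{}:{}".format(project_id, repo_name, tag),
        "args": ["--model-dir=" + model_dir],
    }


# Placeholder values, for illustration only.
spec = build_container_spec(
    "my-project", "ucaip-training-test", "gs://my-bucket/custom_job_20201001093005"
)
print(json.dumps(spec, indent=2))
```

Printing the dictionary as JSON is a quick way to inspect the specification before it is sent to the job service.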

### Define the worker pool specification


Next, define the worker pool specification for your custom training job. This tells AI Platform what type and how many instances of machines to provision for the training.

For this tutorial, you will use a single instance (node). 

- `replica_count`: The number of instances to provision of this machine type.
- `machine_type`: The type of GCP instance to provision -- e.g., n1-standard-8.
- `accelerator_type`: The type, if any, of hardware accelerator. In this tutorial, we are using an NVIDIA Tesla K80.
- `accelerator_count`: The number of accelerators.
- `container_spec`: The docker container to install on the instance(s).

In [None]:
WORKER_POOL_SPEC = [
    {
        "replica_count": 1,
        "machine_spec": {
            "machine_type": "n1-standard-8",
            "accelerator_type": aip.AcceleratorType.NVIDIA_TESLA_K80,
            "accelerator_count": 1
        },
        "container_spec": CONTAINER_SPEC,
    }
]

If you were doing distributed training, you would add a second machine description and set the replica count accordingly. In the example below, the first machine description is the primary (coordinator), and the second describes the machines the training is distributed to.

```
WORKER_POOL_SPEC=[
     {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-8"
        },
        "container_spec":  CONTAINER_SPEC,
      },
      {
        "replica_count": 6,
        "machine_spec": {
          "machine_type": "n1-standard-8"
        },
        "container_spec": CONTAINER_SPEC
      }
]
```
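The single-node and distributed layouts differ only in whether a second pool is present. One way to express that, sketched under the assumption that every pool uses the same machine type and container, is a helper that always emits the primary pool and adds a worker pool on request. The names `build_worker_pool_specs` and `total_replicas` are hypothetical.

```python
def build_worker_pool_specs(container_spec, worker_count=0,
                            machine_type="n1-standard-8"):
    """Return one primary pool plus an optional pool of additional workers."""
    def pool(replicas):
        return {
            "replica_count": replicas,
            "machine_spec": {"machine_type": machine_type},
            "container_spec": container_spec,
        }

    specs = [pool(1)]  # primary (coordinator)
    if worker_count > 0:
        specs.append(pool(worker_count))  # machines the training is distributed to
    return specs


def total_replicas(specs):
    """Sum the replica counts across all pools."""
    return sum(spec["replica_count"] for spec in specs)


# Placeholder container spec, for illustration only.
container_spec = {"image_uri": "gcr.io/my-project/my-repo:latest", "args": []}
specs = build_worker_pool_specs(container_spec, worker_count=6)
print(len(specs), total_replicas(specs))  # 2 7
```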

### Assemble the job specification

Let's now assemble the description for the custom job specification.

In [None]:
CUSTOM_JOB = {
    "display_name": JOB_NAME,
    "job_spec": {
        "worker_pool_specs": WORKER_POOL_SPEC
    }
}

## Train the model

Let's now start the training of your custom training job on AI Platform. Use the helper function `create_custom_job`, which takes the parameter:

- `custom_job`: The specification for the custom job.

The helper function uses the job client service and calls the method `create_custom_job`, with the parameters:

- `parent`: The AI Platform (Unified) location path to dataset, model and endpoint resources.
- `custom_job`: The specification for the custom job.

The helper displays a handful of the fields returned in the `response` object. The two of most interest are:

- `response.name`: The AI Platform (Unified) fully qualified identifier assigned to this custom job. This identifier is saved for use in subsequent steps.
- `response.state`: The current state of the custom job. This value is returned as an integer. For your convenience, it is mapped into a human readable name (e.g., PENDING) using the dictionary `state_name`.

In [None]:
state_name = {
    1: "QUEUED",
    2: "PENDING",
    3: "RUNNING",
    4: "SUCCEEDED",
    5: "FAILED",
    6: "CANCELLING",
    7: "CANCELLED",
    8: "PAUSED"
}


def create_custom_job(custom_job):
    # Use the specification passed in, rather than the global CUSTOM_JOB
    response = clients['job'].create_custom_job(parent=PARENT, custom_job=custom_job)
    print("name:", response.name)
    print("display_name:", response.display_name)
    print("state:", state_name[response.state])
    print("create_time:", response.create_time)
    print("update_time:", response.update_time)
    return response.name


# Save the job name
JOB_NAME = create_custom_job(CUSTOM_JOB)
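Indexing `state_name` directly raises a `KeyError` for any state code that is missing from the dictionary. A lookup with a fallback is more forgiving; the sketch below shows one way to do it. The helper `describe_state` is hypothetical.

```python
# Same mapping as the state_name dictionary above.
state_name = {
    1: "QUEUED", 2: "PENDING", 3: "RUNNING", 4: "SUCCEEDED",
    5: "FAILED", 6: "CANCELLING", 7: "CANCELLED", 8: "PAUSED",
}


def describe_state(state):
    """Return a readable name, falling back to a label for unknown codes."""
    code = int(state)
    return state_name.get(code, "UNKNOWN({})".format(code))


print(describe_state(2))  # PENDING
print(describe_state(0))  # UNKNOWN(0)
```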

### List all custom jobs

Now that your custom job is running, let's get a list for all your custom jobs -- specific to your `PROJECT_ID`. This will probably be just one job, unless you've been running this tutorial multiple times or otherwise been using the AI Platform (Unified) job service.

Use the helper function `list_custom_jobs`, which uses the job client service and calls the method `list_custom_jobs`. The response object is a list, where each element in the list is a separate job.  

The `response` object for each custom job contains:

- `name`: The AI Platform (Unified) fully qualified identifier for your custom training job.
- `display_name`: The human readable name you assigned to your custom training job.
- `job_spec`: The job specification you provided for your custom training job.
- `state`: The current status of the custom job.
- `start_time`: When the custom training job started running.
- `end_time`: When the execution of the custom job ended.
- `update_time`: When the custom job was last updated.

In [None]:
def list_custom_jobs():
    response = clients['job'].list_custom_jobs(parent=PARENT)
    for job in response:
        print(job)
    

list_custom_jobs()

### Get information on a custom Job

Next, use this helper function `get_custom_job`, which takes the parameter:

- `name`: The AI Platform (Unified) fully qualified identifier for the custom job.

The helper function uses the job client service to get the job information for just this job by calling the method `get_custom_job`, with the parameter:

- `name`: The AI Platform (Unified) fully qualified identifier for the custom job.

If you recall, you got the AI Platform (Unified) fully qualified identifier for the custom job in the `response.name` field when you called the `create_custom_job` method, and saved the identifier in the variable `JOB_NAME`.

In [None]:
def get_custom_job(name):
    response = clients['job'].get_custom_job(name=name)
    print("name:", response.name)
    print("display_name:", response.display_name)
    print("state:", state_name[response.state])
    print("create_time:", response.create_time)
    print("update_time:", response.update_time)


get_custom_job(JOB_NAME)
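Rather than re-running the cell by hand, you could poll the job until it reaches a terminal state. The sketch below separates the polling logic from the service call so it can be tried without a real job: `wait_for_terminal_state` is a hypothetical helper, the integer codes are the ones from `state_name`, and the fake state sequence stands in for real lookups.

```python
import time

# Terminal states, using the integer codes from the state_name dictionary.
TERMINAL_STATES = {4: "SUCCEEDED", 5: "FAILED", 7: "CANCELLED"}


def wait_for_terminal_state(get_state, poll_seconds=30, max_polls=120,
                            sleep=time.sleep):
    """Call get_state() until it returns a terminal state or polls run out."""
    for _ in range(max_polls):
        state = get_state()
        if state in TERMINAL_STATES:
            return TERMINAL_STATES[state]
        sleep(poll_seconds)
    raise TimeoutError("job did not reach a terminal state in time")


# Stand-in for a real lookup: yields QUEUED, PENDING, RUNNING, SUCCEEDED.
fake_states = iter([1, 2, 3, 4])
result = wait_for_terminal_state(lambda: next(fake_states), poll_seconds=0)
print(result)  # SUCCEEDED
```

With the real client, `get_state` could be something like `lambda: clients['job'].get_custom_job(name=JOB_NAME).state`, reusing the call from the helper above.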

### View logs

In addition to the state information described above, you can also look at the logs associated with the custom job.

#### View logs on AI Platform Console

Let's first look at the logs in the AI Platform Console. The cell below displays a link. Paste the link into the address bar of another browser tab to see information about your job.

In [None]:
print('https://console.cloud.google.com/ai/platform/locations/{region}/training/{job_id}?project={project_id}'.format(
   region=REGION, job_id=JOB_NAME.split('/')[-1], 
   project_id=PROJECT_ID))

#### View logs on Stackdriver

Next, let's look at the logs in Stackdriver. The cell below displays a link. Paste the link into the address bar of another browser tab to see the logs for your job.

In [None]:
print('https://console.cloud.google.com/logs/viewer?resource=ml_job%2Fjob_id%2F{job_id}&project={project_id}'.format(
   job_id=JOB_NAME.split('/')[-1], project_id=PROJECT_ID))

### Cancel a custom job

Next, we will show you how to cancel a custom training job. We will go ahead and cancel your "empty" training job. Use this helper function `cancel_job`, with the parameter:

- `name`: The AI Platform (Unified) fully qualified identifier for your custom training job.

The helper function will use the job service client and call the method `cancel_custom_job`, with the parameter:

- `name`: The AI Platform (Unified) fully qualified identifier for your custom training job.

We put a try/except around the call since it will throw an exception if the job is already completed (succeeded) -- which most likely it is.

In [None]:
def cancel_job(name):
    try:
        response = clients['job'].cancel_custom_job(name=name)
        print(response)
    except Exception as e:
        print(e)
        
cancel_job(JOB_NAME)

### Delete a custom job

Next, we will show you how to delete a custom training job. We will go ahead and delete your "empty" training job. Use the helper function `delete_job`, with the parameter:

- `name`: The AI Platform (Unified) fully qualified identifier for your custom training job.

The helper function will use the job service client and call the method `delete_custom_job`, with the parameter:

- `name`: The AI Platform (Unified) fully qualified identifier for your custom training job.

Afterwards, we will verify that the job has been deleted by calling the method `get_custom_job` for the same job. We put a try/except around the call since it will throw an exception if the job is already deleted -- which most likely it is.

In [None]:
def delete_job(name):
    try:
        response = clients['job'].delete_custom_job(name=name)
        print("Delete", response)
    except Exception as e:
        print(e)

    try:
        response = clients['job'].get_custom_job(name=name)
    except Exception as e:
        print(e)


delete_job(JOB_NAME)

## Train a model - CIFAR10

Now that you have seen the basic steps for custom training, you will do a new custom job to train a model. For expediency, you will train a CIFAR10 model. You will need to update your worker pool specification by adding a description for `python_package_spec`. This section will tell the custom job the Python training package to install and which Python module to invoke, along with command line arguments for the Python module.

Let's dive deeper now into the Python package specification:

- `executor_image_uri`: The Docker image that is configured for your custom training job. This example uses the pre-built image `gcr.io/cloud-aiplatform/training/training-tf-cpu.2-1:latest`.

- `package_uris`: A list of the locations (URIs) of your Python training packages to install on the provisioned instance. The locations need to be in a Google Storage bucket. These can be either individual Python files or a zip (archive) of an entire package. In the latter case, the job service will unzip (unarchive) the contents into the Docker image.

- `python_module`: The Python module (script) to invoke for running the custom training job. In this example, you will be invoking `trainer.task` -- note that it is not necessary to append the `.py` suffix.

- `args`: The command line arguments to pass to the corresponding Python module. In this example, you will be passing the Google Storage location where to store the model artifacts -- `"--model-dir=" + MODEL_DIR`.

In [None]:
WORKER_POOL_SPEC = [
     {
        "replica_count": 1,
        "machine_spec": {
            "machine_type": "n1-standard-8"
        },
        "python_package_spec": {
            "executor_image_uri": 
                "gcr.io/cloud-aiplatform/training/training-tf-cpu.2-1:latest",
            "package_uris": 
                ["gs://" + BUCKET_NAME + "/tf2_trainer_cifar.tar.gz"],
            "python_module": "trainer.task",
            "args": [
                "--model-dir=" + MODEL_DIR 
            ],
        }
    }
]

### Assemble the job specification

Let's now assemble the description for the custom job specification.

In [None]:
CUSTOM_JOB = {
    "display_name": JOB_NAME,
    "job_spec": {
        "worker_pool_specs": WORKER_POOL_SPEC
    }
}

### Examine the training package

#### Package layout

Before we start the training, let's look at how a Python package is assembled for a custom training job. When unarchived, the package contains the following directory/file layout.

- PKG-INFO
- README.md
- setup.cfg
- setup.py
- trainer
  - \_\_init\_\_.py
  - task.py

The files `setup.cfg` and `setup.py` are the instructions for installing the package into the operating environment of the docker image.

The file `trainer/task.py` is the Python script for executing the custom job. *Note*, when it is referenced in the worker pool specification, the directory slash is replaced with a dot (`trainer.task`) and the file suffix (`.py`) is dropped.
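To make the layout concrete, the sketch below builds a minimal package with the same shape in a temporary directory, archives it, and lists the archive contents. It is an illustration only: the file contents are placeholders, the names `build_example_package` and `module_name_from_path` are hypothetical, and a real package would more commonly be produced with `python setup.py sdist`.

```python
import os
import tarfile
import tempfile


def module_name_from_path(path):
    """Convert 'trainer/task.py' into the dotted module name 'trainer.task'."""
    root, _ = os.path.splitext(path)
    return root.replace("/", ".")


def build_example_package(workdir):
    """Create a minimal trainer package and return the path to its tarball."""
    pkg_dir = os.path.join(workdir, "example_trainer")
    os.makedirs(os.path.join(pkg_dir, "trainer"))
    files = {
        "setup.py": "from setuptools import setup, find_packages\n"
                    "setup(name='trainer', version='0.1', packages=find_packages())\n",
        "trainer/__init__.py": "",
        "trainer/task.py": "print('training placeholder')\n",
    }
    for rel_path, content in files.items():
        with open(os.path.join(pkg_dir, rel_path), "w") as f:
            f.write(content)
    archive = os.path.join(workdir, "example_trainer.tar.gz")
    with tarfile.open(archive, "w:gz") as tar:
        tar.add(pkg_dir, arcname="example_trainer")
    return archive


with tempfile.TemporaryDirectory() as tmp:
    archive_path = build_example_package(tmp)
    with tarfile.open(archive_path) as tar:
        names = sorted(tar.getnames())

print(names)
print(module_name_from_path("trainer/task.py"))  # trainer.task
```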

#### Task.py Contents

Below are the contents of `task.py`. This tutorial does not go through the script in detail; it is included for you to browse. In summary, the script:

- Loads CIFAR10 dataset from TF Datasets (tfds).
- Builds a simple ConvNet model using TF.Keras model API.
- Compiles the model (`compile()`).
- Sets a training distribution strategy for multi-workers using tf.distribute.
- Trains the model (`fit()`)
- Saves the trained model (`save(args.model_dir)`) to the specified model directory.

```python
# Multi-worker training in Keras

from __future__ import absolute_import, division, print_function, unicode_literals
import tensorflow_datasets as tfds
import tensorflow as tf
from tensorflow.python.client import device_lib
import argparse
import os
import sys
tfds.disable_progress_bar()

parser = argparse.ArgumentParser()
parser.add_argument('--job-dir')
parser.add_argument('--model-dir', dest='model_dir',
                    default='/tmp/saved_model', type=str, help='Model dir.')
parser.add_argument('--lr', dest='lr',
                    default=0.001, type=float,
                    help='Learning rate.')
args = parser.parse_args()

print('Python Version = {}'.format(sys.version))
print('TensorFlow Version = {}'.format(tf.__version__))
print('TF_CONFIG = {}'.format(os.environ.get('TF_CONFIG', 'Not found')))
print(device_lib.list_local_devices())

# Preparing dataset
BUFFER_SIZE = 10000
BATCH_SIZE = 64

# Multi-worker configuration
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
print('num_replicas_in_sync = {}'.format(strategy.num_replicas_in_sync))


def make_datasets_unbatched():
  # Scaling CIFAR10 data from (0, 255] to (0., 1.]
  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255.0
    return image, label

  datasets, info = tfds.load(name='cifar10',
                            with_info=True,
                            as_supervised=True)
  return datasets['train'].map(scale).cache().shuffle(BUFFER_SIZE)

# Build the Keras model
def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(32, 32, 3)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(10, activation='softmax')
  ])
  model.compile(
      loss=tf.keras.losses.sparse_categorical_crossentropy,
      optimizer=tf.keras.optimizers.SGD(learning_rate=args.lr),
      metrics=['accuracy'])
  return model

# Train the model
NUM_WORKERS = strategy.num_replicas_in_sync
# Here the batch size scales up by number of workers since
# `tf.data.Dataset.batch` expects the global batch size. Previously we used 64,
# and now this becomes 128.
GLOBAL_BATCH_SIZE = 64 * NUM_WORKERS
train_datasets = make_datasets_unbatched().batch(GLOBAL_BATCH_SIZE)
with strategy.scope():
  # Creation of dataset, and model building/compiling need to be within
  # `strategy.scope()`.
  multi_worker_model = build_and_compile_cnn_model()

multi_worker_model.fit(x=train_datasets, epochs=3, steps_per_epoch=5)
multi_worker_model.save(args.model_dir)
```
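The link between the job specification and the script is the command line: each string in the `args` list arrives as an argument to `task.py`. The sketch below reuses the script's argument definitions in a function so the parsing can be tried locally, and also shows the global batch size arithmetic for one and two workers. The function name `parse_training_args` and the bucket path are placeholders.

```python
import argparse


def parse_training_args(argv):
    """Parse the same flags that task.py declares."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--job-dir')
    parser.add_argument('--model-dir', dest='model_dir',
                        default='/tmp/saved_model', type=str, help='Model dir.')
    parser.add_argument('--lr', dest='lr', default=0.001, type=float,
                        help='Learning rate.')
    return parser.parse_args(argv)


# The value mirrors the "--model-dir=" + MODEL_DIR entry in the job's args list.
args = parse_training_args(['--model-dir=gs://my-bucket/custom_job_example'])
print(args.model_dir)  # gs://my-bucket/custom_job_example
print(args.lr)         # 0.001

# The global batch size scales with the number of replicas in sync.
per_replica_batch = 64
for num_workers in (1, 2):
    print(num_workers, per_replica_batch * num_workers)
```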

### Train the model

Let's now start the training of your custom training job on AI Platform. Use the helper function defined earlier `create_custom_job`, which will return the AI Platform (Unified) fully qualified identifier assigned to the custom job.

In [None]:
# Save the job name
JOB_NAME = create_custom_job(CUSTOM_JOB)

### Get information on the custom job

Let's get the status on your custom training job for CIFAR10 using the helper function we defined earlier `get_custom_job`. The job most likely will still be either PENDING or RUNNING.


In [None]:
get_custom_job(JOB_NAME)

# Deployment

## Pre-Cooked

Training the above model may take 5 minutes or more. For expediency, we have a pre-cooked (already trained) version of this model you can use for the next steps, while you wait for your model to finish training.

Once your model is done training, you can repeat these steps for your trained model. You can calculate the actual time it took to train the model by subtracting `start_time` from `end_time`. For your model, you will need to know the location of the saved model, which the Python script saved in your Google Storage bucket at `MODEL_DIR + '/saved_model.pb'`.
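The elapsed training time is the end time minus the start time. A minimal sketch with placeholder timestamps follows; `training_duration` is a hypothetical helper, and depending on the SDK version the job's time fields may be protobuf timestamps that need converting to `datetime` first.

```python
from datetime import datetime, timezone


def training_duration(start_time, end_time):
    """Return the elapsed training time as a timedelta (end minus start)."""
    return end_time - start_time


# Placeholder timestamps; real values come from the job's start_time and end_time.
start = datetime(2020, 10, 1, 9, 30, 0, tzinfo=timezone.utc)
end = datetime(2020, 10, 1, 9, 35, 12, tzinfo=timezone.utc)
elapsed = training_duration(start, end)
print(elapsed)                  # 0:05:12
print(elapsed.total_seconds())  # 312.0
```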


You can choose between the pre-cooked model and your trained model with the Python variable `precook` in the cell below.

In [None]:
# Precooked flag
precook = False

if precook:
    model_path_to_deploy = "[not-implemented-yet]"
else:
    model_path_to_deploy = MODEL_DIR

print("model_to_deploy:", model_path_to_deploy)

## Load the saved model

Your model is stored in a TF SavedModel format in a Google Storage bucket. Let's go ahead and load it from the Google Storage bucket, and then you can do some things, like evaluate the model, and do a prediction.

To load it, use the TF.Keras `tf.keras.models.load_model()` method, passing it the Google Storage path where the model is saved -- specified by `MODEL_DIR`.

In [None]:
import tensorflow as tf

model = tf.keras.models.load_model(MODEL_DIR)

## Evaluate the model

Now let's find out how good the model is. 

### Load evaluation data

You will load the CIFAR10 test (holdout) data from `tf.keras.datasets`, using the method `load_data()`. This will return the dataset as a tuple of two elements. The first element is the training data and the second is the test data. Each element is also a tuple of two elements: the image data, and the corresponding labels.

You don't need the training data, which is why it is loaded as `(_, _)`.

Before you can run the data through evaluation, you need to preprocess it:

- `x_test`: Normalize (rescale) the pixel data by dividing each pixel by 255. This replaces each single-byte integer pixel with a 32-bit floating point number between 0 and 1.
- `y_test`: The labels are currently scalar (sparse). If you look back at the `compile()` step in the `trainer/task.py` script, you will find that it was compiled for sparse labels. So you don't need to do anything more.

In [None]:
from tensorflow.keras.datasets import cifar10
import numpy as np

(_, _), (x_test, y_test) = cifar10.load_data()
x_test = (x_test / 255.0).astype(np.float32)

print(x_test.shape, y_test.shape)

### Evaluate the model

Let's evaluate how well the ConvNet model in the custom job did. Expect an accuracy of roughly 12%, which is not so good: with 10 classes, random guessing already gives about 10%. That is what one should expect with just 3 epochs and 5 steps per epoch -- see the `fit()` call in `task.py`.

In [None]:
model.evaluate(x_test, y_test)

### Serving function for image data

To pass images to the prediction service, you encode the bytes into base 64 -- which makes the content safe from modification while transmitting binary data over the network. Since this deployed model expects input data as raw bytes, you need to ensure that the base 64 encoded data gets converted back to raw bytes before it is passed as input to the deployed model.

To resolve this, define a serving function (`serving_fn`) and attach it to the model as a preprocessing step. Add a `@tf.function` decorator so the serving function is part of the model's graph (instead of upstream on a CPU).

When you send a prediction or explanation request, the content of the request is base 64 decoded into a TensorFlow string (`tf.string`), which is passed to the serving function (`serving_fn`). The serving function preprocesses the `tf.string` (`preprocess_fn`) to match the input requirements of the model:
- `io.decode_jpeg` - Decompresses the JPG image, which is returned as a TensorFlow tensor with three channels (RGB).
- `image.convert_image_dtype` - Changes integer pixel values to float 32 and normalizes the values between the range 0 and 1.
- `image.resize` - Resizes the image to match the input shape for the model.

At this point, the data can be passed to the model (`m_call`).

In [None]:
def _preprocess(bytes_input):
    decoded = tf.io.decode_jpeg(bytes_input, channels=3)
    decoded = tf.image.convert_image_dtype(decoded, tf.float32)
    resized = tf.image.resize(decoded, size=(32, 32))
    return resized


@tf.function(input_signature=[tf.TensorSpec([None], tf.string)])
def preprocess_fn(bytes_inputs):
    decoded_images = tf.map_fn(_preprocess, bytes_inputs, dtype=tf.float32, back_prop=False)
    return {"numpy_inputs": decoded_images}  # User needs to make sure the key matches model's input


m_call = tf.function(model.call).get_concrete_function([tf.TensorSpec(shape=[None, 32, 32, 3], dtype=tf.float32, name="numpy_inputs")])


@tf.function(input_signature=[tf.TensorSpec([None], tf.string)])
def serving_fn(bytes_inputs):
    images = preprocess_fn(bytes_inputs)
    prob = m_call(**images)
    return prob


tf.saved_model.save(model, model_path_to_deploy, signatures={
    'serving_default': serving_fn,
})

## Get the serving function signature

You can get the signatures of your model's input and output layers by reloading the model into memory, and querying it for the signatures corresponding to each layer.

For this tutorial, you need the signature of the serving function. Why? When you send data for prediction as an HTTP request, the image data is base 64 encoded, but the TF.Keras model takes numpy input. The serving function does the conversion from base 64 to a numpy array.

When making a prediction request, you need to route the request to the serving function instead of the model, so you need to know the input layer name of the serving function -- which you will use later when you make a prediction request.

In [None]:
loaded = tf.saved_model.load(model_path_to_deploy)

input_name = list(loaded.signatures['serving_default'].structured_input_signature[1].keys())[0]
print('Serving function input:', input_name)

## Deploy the model for batch prediction

Let's now deploy the trained AI Platform (Unified) model you created with Custom Job for batch prediction. This differs from deploying a model for on-demand prediction.

For on-demand prediction, you:

1. Create an endpoint for deploying the model to.

2. Deploy the model to the endpoint.

3. Make on-demand (live) prediction requests to the endpoint.

For batch-prediction, you:

1. Create a batch prediction job.

2. The job service will provision resources for the batch prediction request.

3. The results of the batch prediction request are written to the Cloud Storage location you specify.

4. The job service will unprovision the resources for the batch prediction request.

### Upload the model

First, use the helper function `upload_model` to upload your model, stored in SavedModel format, to the model service, which will instantiate an AI Platform (Unified) model instance for your model. Once you've done that, you can use the model in the same way as any other AI Platform (Unified) model instance, such as deploying it to an endpoint for serving predictions.

The helper function takes the parameters:

- `display_name`: A human readable name for the model.
- `image_uri`: The container image for the model deployment.
- `model_uri`: The Google Storage path to the SavedModel artifact. For this tutorial, this is the Google Storage location where the `trainer/task.py` saved the model, which we specified in the variable `MODEL_DIR`.

The helper function uses the model client service and calls the method `upload_model`, which takes the parameters:

- `parent`: The AI Platform (Unified) location root path for dataset, model and endpoint resources. 
- `model`: The specification for the AI Platform (Unified) model instance.

Let's now dive deeper into the AI Platform (Unified) model specification `model`. This is a dictionary object that consists of the following fields:

- `display_name`: A human readable name for the model.
- `metadata_schema_uri`: Since the model was built without an AI Platform (Unified) managed dataset, leave this blank (`''`).
- `artifact_uri`: The Google Storage path where the model is stored in SavedModel format.
- `container_spec`: This is the specification for the docker container that will be installed on the endpoint, from which the model will serve predictions.

Uploading a model into an AI Platform (Unified) model resource returns a long running operation, since it may take a few moments. You call `response.result()`, which is a synchronous call and will return when the AI Platform (Unified) model resource is ready.

The helper function returns the AI Platform (Unified) fully qualified identifier for the corresponding AI Platform (Unified) model instance `upload_model_response.model`. You will save the identifier for subsequent steps in the variable `model_to_deploy_name`.


In [None]:
GPU = False
if GPU:
    IMAGE_URI = "gcr.io/cloud-aiplatform/prediction/tf-gpu.1-15:latest"
else:
    IMAGE_URI = "gcr.io/cloud-aiplatform/prediction/tf-cpu.1-15:latest"


def upload_model(display_name, image_uri, model_uri):
    model = {
        "display_name": display_name,
        "metadata_schema_uri": "",
        "artifact_uri": model_uri,
        "container_spec": {
            "image_uri": image_uri
        },
    }
    response = clients['model'].upload_model(parent=PARENT, model=model)
    print("Long running operation:", response.operation.name)
    upload_model_response = response.result(timeout=180)
    print("upload_model_response")
    print(" model:", upload_model_response.model)
    return upload_model_response.model


model_to_deploy_name = upload_model("cifar10-" + TIMESTAMP, IMAGE_URI, model_path_to_deploy)
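
The identifier returned by the upload follows the pattern `projects/{project}/locations/{location}/models/{model_id}`. As a minimal sketch, the identifier could be split into its parts as shown below. The helper name `parse_model_name` and the example identifier are made up for illustration and are not part of the SDK.

```python
def parse_model_name(resource_name):
    """Split 'projects/{project}/locations/{location}/models/{model_id}' into its parts."""
    parts = resource_name.split("/")
    # Even positions hold the fixed collection names, odd positions hold the values.
    if len(parts) != 6 or parts[0::2] != ["projects", "locations", "models"]:
        raise ValueError("Unexpected model resource name: " + resource_name)
    return {"project": parts[1], "location": parts[3], "model_id": parts[5]}


# Hypothetical identifier, for illustration only.
example_name = "projects/123456/locations/us-central1/models/987654321"
print(parse_model_name(example_name))
```

A check like this can catch the mistake of passing a display name where a fully qualified identifier is expected.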

## Model deployment for batch prediction

### Get test item(s)

Let's now do a batch prediction with your AI Platform (Unified) model. You will use a couple of arbitrary images from the test (holdout) data as test items. The goal here is simply to demonstrate how to make a prediction.

In [None]:
test_image_1 = x_test[0]
test_label_1 = y_test[0]
test_image_2 = x_test[1]
test_label_2 = y_test[1]

The serving function for your custom model takes compressed JPG images as input. For this purpose, you will save the uncompressed raw bytes of the CIFAR-10 test images to disk as compressed JPG images, where you:

- Denormalize the image data from the [0, 1] range back to [0, 255].
- Convert the 32-bit floating point values to 8-bit unsigned integers.

In [None]:
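import cv2

# cv2.imwrite expects BGR channel order, while the CIFAR-10 arrays are RGB, so convert before writing.
cv2.imwrite('tmp1.jpg', cv2.cvtColor((test_image_1 * 255).astype(np.uint8), cv2.COLOR_RGB2BGR))
cv2.imwrite('tmp2.jpg', cv2.cvtColor((test_image_2 * 255).astype(np.uint8), cv2.COLOR_RGB2BGR))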
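
One detail of the denormalization step is worth a quick check. After the round trip through 32-bit floats, a value may land slightly below the original integer, in which case a truncating `astype(np.uint8)` would be off by one. The sketch below compares a plain cast with rounding first. Using `np.rint` here is a suggestion for illustration, not what the tutorial's cell does.

```python
import numpy as np

# Every possible 8-bit pixel value.
pixels = np.arange(256, dtype=np.uint8)

# Normalize the same way the evaluation data was prepared.
normalized = (pixels / 255.0).astype(np.float32)

# Truncating cast (what a bare astype does) versus rounding to the nearest integer first.
truncated = (normalized * 255).astype(np.uint8)
rounded = np.rint(normalized * 255).astype(np.uint8)

print("values changed by truncation:", int(np.sum(truncated != pixels)))
print("values changed after rounding:", int(np.sum(rounded != pixels)))
```

For a JPG test image a one-level difference is visually irrelevant, so this matters mainly when exact pixel values need to be recovered.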

For the batch prediction, you will copy the test items over to your Cloud Storage bucket.

In [None]:
! gsutil cp tmp1.jpg gs://$BUCKET_NAME/tmp1.jpg
! gsutil cp tmp2.jpg gs://$BUCKET_NAME/tmp2.jpg
    
test_item_1 = "gs://" + BUCKET_NAME + "/" + "tmp1.jpg"
test_item_2 = "gs://" + BUCKET_NAME + "/" + "tmp2.jpg"

### Prepare the request content

You are going to send the CIFAR10 image as a compressed JPG image, instead of the raw uncompressed bytes:

- `tf.io.read_file`: Read the compressed JPG images back into memory as raw bytes.
- `base64.b64encode`: Encode the raw bytes into a base 64 encoded string.

In [None]:
import base64

# Use a descriptive name rather than `bytes`, which would shadow the Python builtin.
image_bytes = tf.io.read_file(test_item_1)
b64str = base64.b64encode(image_bytes.numpy()).decode('utf-8')

You then write the prediction request as a JSONL file to a Cloud Storage bucket. To pass the image data to the prediction service, in the previous step you encoded the bytes into base 64 -- which makes the content safe from modification when transmitting binary data over the network. You need to tell the serving binary that the content has been base 64 encoded, so that it decodes the content before passing it to the serving function.

Each instance in the prediction request is a dictionary entry of the form:

                        { input_name: { 'b64': content } }

- `input_name`: The name of the input to the serving function, which you retrieved earlier.
- `'b64'`: A key that indicates the content is base 64 encoded.
- `content`: The compressed JPG image bytes as a base 64 encoded string.

In [None]:
import json

with tf.io.gfile.GFile(gcs_input_uri, 'w') as f:
    data = { input_name: { 'b64': b64str } }
    f.write(json.dumps(data) + '\n')
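
A JSONL input file holds one instance per line, so a batch with several images is just several such lines. The sketch below builds that text in memory and decodes it again to confirm the bytes survive the base 64 round trip. The helper `build_jsonl`, the stand-in byte strings and the input name `bytes_inputs` are assumptions for illustration; in the tutorial the name comes from `input_name`.

```python
import base64
import json


def build_jsonl(images, input_name):
    """Return JSONL text with one base 64 encoded instance per line."""
    lines = []
    for raw in images:
        b64 = base64.b64encode(raw).decode("utf-8")
        lines.append(json.dumps({input_name: {"b64": b64}}))
    return "\n".join(lines) + "\n"


# Stand-in byte strings; in the tutorial these would be the JPG file contents.
fake_images = [b"\xff\xd8first-image", b"\xff\xd8second-image"]
jsonl_text = build_jsonl(fake_images, input_name="bytes_inputs")

# Decode each line again to confirm the bytes survive the round trip.
decoded = [
    base64.b64decode(json.loads(line)["bytes_inputs"]["b64"])
    for line in jsonl_text.splitlines()
]
print(decoded == fake_images)  # True
```

The resulting text could be written with the same `tf.io.gfile.GFile` call used in the cell above.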

### Make batch prediction request

Now that the batch input file is ready, let's make the batch request. Use the helper function `create_batch_prediction_job`, with the parameters:

- `display_name`: The human readable name for the prediction job.
- `model_name`: The AI Platform (Unified) fully qualified identifier for the model.
- `gcs_source_uri`: The Google Storage path to the JSONL/CSV input file -- which we created above.
- `gcs_destination_output_uri_prefix`: The Google Storage path that the service will write the predictions to.

The helper function uses the job client service and calls the method `create_batch_prediction_job`, with the parameters:

- `parent`: The AI Platform (Unified) location root path for dataset, model and pipeline resources.
- `batch_prediction_job`: The specification for the batch prediction job.

Let's now dive into the specification for the `batch_prediction_job`:

- `display_name`: The human readable name for the prediction batch job.
- `model`: The AI Platform (Unified) fully qualified identifier for the model.
- `model_parameters`: Requirements/constraints on the prediction service.
  - `confidenceThreshold`: The minimum confidence threshold on doing a prediction.
  - `maxPredictions`: The maximum size of the batch request.
- `input_config`: The input source and format type for the instances to predict.
- `output_config`: The output destination and format for the predictions.
- `dedicated_resources`: The compute resources to provision for the batch prediction job. 
  - `machine_spec`: The compute instance to provision. Use the variable `GPU=True` to use a GPU; otherwise only a CPU is allocated.
  - `starting_replica_count`: The number of compute instances to initially provision.
  - `max_replica_count`: The maximum number of compute instances to scale to. In this tutorial, only one instance is provisioned.

This call is an asynchronous operation. You will print a few select fields from the response object, including:

- `name`: The AI Platform (Unified) fully qualified identifier assigned to the batch prediction job.
- `display_name`: The human readable name for the prediction batch job.
- `model`: The AI Platform (Unified) fully qualified identifier for the model.
- `generate_explanation`: Whether explanations (True/False) were provided with the predictions (explainability).
- `state`: The state of the prediction job (pending, running, etc).

Since this call will take a few moments to execute, you will likely get `JobState.JOB_STATE_PENDING` for `state`.

The helper function will return and save the AI Platform (Unified) fully qualified identifier assigned to the batch prediction job as `prediction_name`.

In [None]:
BATCH_MODEL = "cifar10_batch-" + TIMESTAMP


def create_batch_prediction_job(display_name, model_name, gcs_source_uri, gcs_destination_output_uri_prefix):

    model_parameters = {
        "confidenceThreshold": 0.5,
        "maxPredictions": 10000,
    }

    if GPU:
        machine_spec = {
            "machine_type": "n1-standard-2",
            "accelerator_type": aip.AcceleratorType.NVIDIA_TESLA_K80,
            "accelerator_count": 1,
        }
    else:
        machine_spec = {
            "machine_type": "n1-standard-2",
            "accelerator_count": 0,
        }

    batch_prediction_job = {
        "display_name": display_name,
        # Format: 'projects/{project}/locations/{location}/models/{model_id}'
        "model": model_name,
        "model_parameters": json_format.ParseDict(model_parameters, Value()),
        "input_config": {
            "instances_format": "jsonl",
            "gcs_source": {"uris": [gcs_source_uri]},
        },
        "output_config": {
            "predictions_format": "jsonl",
            "gcs_destination": {"output_uri_prefix": gcs_destination_output_uri_prefix},
        },
        "dedicated_resources": {
            "machine_spec": machine_spec,
            "starting_replica_count": 1,
            "max_replica_count": 1,
        },
    }
    response = clients['job'].create_batch_prediction_job(
        parent=PARENT, batch_prediction_job=batch_prediction_job
    )
    print("response")
    print(" name:", response.name)
    print(" display_name:", response.display_name)
    print(" model:", response.model) 
    print(" generate_explanation:", response.generate_explanation)
    print(" state:", response.state)
    print(" create_time:", response.create_time)
    print(" start_time:", response.start_time)
    print(" end_time:", response.end_time)
    print(" update_time:", response.update_time)
    print(" labels:", response.labels)
    return response


response = create_batch_prediction_job(BATCH_MODEL, model_to_deploy_name, gcs_input_uri, "gs://" + BUCKET_NAME )

prediction_name = response.name

### List all batch prediction jobs

Use this helper function `list_batch_prediction_jobs`. This helper function uses the job client service and calls the method `list_batch_prediction_jobs`, with the parameter:

- `parent`: The AI Platform (Unified) location root path to the dataset, model and pipeline resources.

The method will return a list, where each element is a single batch prediction job. You will probably only have one, unless you've already been using the service or been experimenting with this tutorial.

We will print a couple of additional fields:

- `error`: An error description if an error occurred.
- `output_uri_prefix`: The Google Storage location you gave for outputting the predictions.

In [None]:
def list_batch_prediction_jobs():
    response = clients['job'].list_batch_prediction_jobs(parent=PARENT)
    for batch in response:
        print(" name:", batch.name)
        print(" display_name:", batch.display_name)
        print(" model:", batch.model) 
        print(" generate_explanation:", batch.generate_explanation)
        print(" state:", batch.state)
        print(" error:", batch.error)
        gcs_destination = batch.output_config.gcs_destination
        print(" gcs_destination")
        print("  output_uri_prefix:", gcs_destination.output_uri_prefix)


list_batch_prediction_jobs()
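
If the project already contains other batch prediction jobs, the listing can be narrowed to the ones from this tutorial by matching on the display name. The sketch below is a minimal illustration that uses stand-in records instead of calling the service. The helper `jobs_with_prefix` and the sample values are hypothetical.

```python
from types import SimpleNamespace


def jobs_with_prefix(jobs, prefix):
    """Return the jobs whose display_name starts with the given prefix."""
    return [job for job in jobs if job.display_name.startswith(prefix)]


# Stand-in records; real entries would come from the list_batch_prediction_jobs call.
fake_jobs = [
    SimpleNamespace(display_name="cifar10_batch-20210101", state="JOB_STATE_SUCCEEDED"),
    SimpleNamespace(display_name="other-job", state="JOB_STATE_RUNNING"),
]

for job in jobs_with_prefix(fake_jobs, "cifar10_batch-"):
    print(job.display_name, job.state)
```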

### Get information on a batch prediction job

Use this helper function `get_batch_prediction_job`, with the parameter:

- `job_name`: The AI Platform (Unified) fully qualified identifier for the batch prediction job.

The helper function uses the job client service and calls the method `get_batch_prediction_job`, with the parameter:

- `name`: The AI Platform (Unified) fully qualified identifier for the batch prediction job. In this tutorial, you will pass it the AI Platform (Unified) fully qualified identifier for your batch prediction job -- `prediction_name`.

The helper function will return the Google Storage path to where the predictions are stored -- `gcs_destination`.

In [None]:
def get_batch_prediction_job(job_name):
    response = clients['job'].get_batch_prediction_job(name=job_name)
    print("response")
    print(" name:", response.name)
    print(" display_name:", response.display_name)
    print(" model:", response.model) 
    print(" generate_explanation:", response.generate_explanation)
    print(" state:", response.state)
    print(" error:", response.error)
    gcs_destination = response.output_config.gcs_destination
    print(" gcs_destination")
    print("  output_uri_prefix:", gcs_destination.output_uri_prefix)
    return gcs_destination.output_uri_prefix


predictions = get_batch_prediction_job(prediction_name)
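
Because the job runs asynchronously, you typically query its state repeatedly until it finishes. The sketch below shows one possible polling loop. It takes a `get_state` callable so it can run here against a simulated sequence of states. The helper `wait_for_job`, its parameters and the set of terminal state names are assumptions for illustration, not part of the SDK.

```python
import time

# Assumed names of the states in which a job has stopped running.
TERMINAL_STATES = {"JOB_STATE_SUCCEEDED", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED"}


def wait_for_job(get_state, poll_seconds=60, max_polls=60, sleep=time.sleep):
    """Call get_state() until it returns a terminal state or the polls run out."""
    for _ in range(max_polls):
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        sleep(poll_seconds)
    raise TimeoutError("Job did not reach a terminal state in time")


# Simulated state sequence so the sketch runs without calling the service.
states = iter(["JOB_STATE_PENDING", "JOB_STATE_RUNNING", "JOB_STATE_SUCCEEDED"])
final_state = wait_for_job(lambda: next(states), poll_seconds=0)
print(final_state)  # JOB_STATE_SUCCEEDED
```

With the tutorial's client, `get_state` could wrap a call to `clients['job'].get_batch_prediction_job(name=prediction_name)` and return the name of the `state` field. How the enum is converted to a string is left as an assumption here.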

### Get the predictions

When the batch prediction is done processing, the job state will be `JOB_STATE_SUCCEEDED`.

Finally, view the predictions stored at the Google Storage path you set as output. The predictions will be in JSONL format, which you specified when you created the batch prediction job, under a subfolder whose name starts with `prediction`. Under that folder will be a file called `prediction.results-xxxxx-of-xxxxx`.

Let's display (cat) the contents. You will see a row for each prediction -- in this case, there is just one row. The row is the softmax probability distribution for the corresponding CIFAR10 classes.

In [None]:
! gsutil ls $predictions/prediction-*/prediction.results*

! gsutil cat $predictions/prediction-*/prediction.results*
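
To turn a row of softmax probabilities into a readable label, take the index of the largest value and look it up in the CIFAR-10 class list. The sketch below assumes each results row is a JSON object with a `prediction` key holding the probability list. That row layout, the sample row and the helper `top_class` are assumptions for illustration, so check them against the actual file contents.

```python
import json

# CIFAR-10 class names in label order (0 through 9).
CIFAR10_CLASSES = [
    "airplane", "automobile", "bird", "cat", "deer",
    "dog", "frog", "horse", "ship", "truck",
]


def top_class(jsonl_row):
    """Return (class name, probability) for the highest-scoring class in one row."""
    probabilities = json.loads(jsonl_row)["prediction"]
    best = max(range(len(probabilities)), key=probabilities.__getitem__)  # argmax
    return CIFAR10_CLASSES[best], probabilities[best]


# Hypothetical row; real rows come from the prediction results file.
sample_row = json.dumps({
    "instance": {"bytes_inputs": {"b64": "..."}},
    "prediction": [0.05, 0.05, 0.1, 0.4, 0.05, 0.2, 0.05, 0.04, 0.03, 0.03],
})
print(top_class(sample_row))  # ('cat', 0.4)
```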

# Cleaning up

To clean up all GCP resources used in this project, you can [delete the GCP
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial:

- Dataset
- Model
- Batch prediction job
- GCS Bucket

In [None]:
delete_dataset = True
delete_model = True
delete_batch = True
delete_bucket = True

# Delete the dataset using the AI Platform (Unified) fully qualified identifier for the dataset
try:
    if delete_dataset:
        clients['dataset'].delete_dataset(name=dataset['name'])
except Exception as e: 
    print(e)

# Delete the model using the AI Platform (Unified) fully qualified identifier for the model
try:
    if delete_model:
        clients['model'].delete_model(name=model_to_deploy_name)
except Exception as e: 
    print(e)

# Delete the batch prediction job using the AI Platform (Unified) fully qualified identifier for the batch job
try:
    if delete_batch:
        clients['job'].delete_batch_prediction_job(name=prediction_name)
except Exception as e: 
    print(e)
    
if delete_bucket and 'BUCKET_NAME' in globals():
    ! gsutil rb -f gs://$BUCKET_NAME
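
The repeated `try`/`except` pattern above keeps the cleanup going when one resource is missing. As a sketch, the same idea can be expressed with a small helper that runs each deletion step and collects the failures for review. The helper `run_cleanup` and the stand-in steps are hypothetical; in practice each step would wrap one of the delete calls from the cell above.

```python
def run_cleanup(steps):
    """Run each (label, callable) step and collect failures instead of stopping."""
    failures = {}
    for label, action in steps:
        try:
            action()
        except Exception as error:  # keep going so later resources still get deleted
            failures[label] = error
    return failures


def _missing_dataset():
    # Stand-in for a delete call on a resource that was never created.
    raise NameError("name 'dataset' is not defined")


failures = run_cleanup([
    ("dataset", _missing_dataset),
    ("model", lambda: None),  # stand-in for a delete call that succeeds
])
for label, error in failures.items():
    print(label, "->", error)
```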

# END OF TUTORIAL

Thank you for taking the time to go through the tutorial -- The team at Google Cloud AI Platform.