# 07 - Prediction Serving

The purpose of the notebook is to show how to use the deployed model for online and batch prediction.
The notebook covers the following tasks:
1. Test the endpoints for online prediction.
2. Use the uploaded custom model for batch prediction.
3. Run the batch prediction pipeline using `Vertex Pipelines`.

## Setup

### Import libraries

In [1]:
import os
from datetime import datetime
import tensorflow as tf

from google.cloud import aiplatform as vertex_ai

2022-03-31 14:14:12.429868: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0


### Setup Google Cloud project

In [2]:
PROJECT = 'grandelli-demo-295810' # Change to your project id.
REGION = 'us-central1' # Change to your region.
BUCKET = 'grandelli-demo-295810-partner-training-2022' # Change to your bucket name.

if PROJECT == "" or PROJECT is None or PROJECT == "[your-project-id]":
    # Get your GCP project id from gcloud
    shell_output = !gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT = shell_output[0]
    
if BUCKET == "" or BUCKET is None or BUCKET == "[your-bucket-name]":
    # Get your bucket name to GCP project id
    BUCKET = PROJECT
    # Try to create the bucket if it doesn't exists
    ! gsutil mb -l $REGION gs://$BUCKET
    print("")
    
print("Project ID:", PROJECT)
print("Region:", REGION)
print("Bucket name:", BUCKET)

Project ID: grandelli-demo-295810
Region: us-central1
Bucket name: grandelli-demo-295810-partner-training-2022


### Set configurations

In [3]:
# Display names used to look up the Vertex AI model and endpoint later on.
VERSION = "v01"
DATASET_DISPLAY_NAME = "chicago-taxi-tips"
MODEL_DISPLAY_NAME = f"{DATASET_DISPLAY_NAME}-classifier-{VERSION}"
ENDPOINT_DISPLAY_NAME = f"{DATASET_DISPLAY_NAME}-classifier"

# BigQuery table that the serving (prediction) data is read from.
SERVE_BQ_DATASET_NAME = "partner_training"  # Change to your serving BigQuery dataset name.
SERVE_BQ_TABLE_NAME = "chicago_taxitrips_prep"  # Change to your serving BigQuery table name.

## 1. Making Online Predictions


In [4]:
# Point the Vertex AI SDK at the project, region and bucket configured above.
vertex_ai.init(
    project=PROJECT,
    location=REGION,
    staging_bucket=BUCKET
)

# Several endpoints may share the same display name. The results are ordered
# by update_time (ascending unless "desc" is given, per the Vertex AI list
# API), so the last element is the most recently updated endpoint.
matching_endpoints = vertex_ai.Endpoint.list(
    filter=f'display_name={ENDPOINT_DISPLAY_NAME}',
    order_by="update_time")

# Fail with an actionable message instead of a bare IndexError from `[-1]`
# when no endpoint with this display name exists yet.
if not matching_endpoints:
    raise RuntimeError(
        f"No Vertex AI endpoint with display name '{ENDPOINT_DISPLAY_NAME}' "
        f"was found in project '{PROJECT}' (region '{REGION}'). "
        "Deploy the model to an endpoint before running this notebook."
    )

endpoint_name = matching_endpoints[-1].gca_resource.name

# Re-create the endpoint handle from its full resource name.
endpoint = vertex_ai.Endpoint(endpoint_name)

In [5]:
# A single request instance for the online-prediction test.
# Every feature value is wrapped in a one-element list.
test_instances = [
    dict(
        dropoff_grid=["POINT(-87.6 41.9)"],
        euclidean=[2064.2696],
        loc_cross=[""],
        payment_type=["Credit Card"],
        pickup_grid=["POINT(-87.6 41.9)"],
        trip_miles=[1.37],
        trip_day=[12],
        trip_hour=[16],
        trip_month=[2],
        trip_day_of_week=[4],
        trip_seconds=[555],
    )
]

In [6]:
# Send the test instances to the deployed endpoint and keep only the
# prediction payload of the response.
predictions = endpoint.predict(instances=test_instances).predictions

# Print one prediction per submitted instance.
for prediction in predictions:
    print(prediction)

{'scores': [0.341020465, 0.658979535], 'classes': ['tip<20%', 'tip>=20%']}


## 2. Batch Prediction

In [7]:
# Cloud Storage layout for batch prediction:
#   <WORKSPACE>/serving_data/input_data          - exported JSONL instances
#   <WORKSPACE>/serving_data/output_predictions  - batch prediction results
WORKSPACE = f"gs://{BUCKET}/{DATASET_DISPLAY_NAME}/"
SERVING_DATA_DIR = os.path.join(WORKSPACE, 'serving_data')
SERVING_INPUT_DATA_DIR = os.path.join(WORKSPACE, 'serving_data', 'input_data')
SERVING_OUTPUT_DATA_DIR = os.path.join(WORKSPACE, 'serving_data', 'output_predictions')

In [8]:
# Start from a clean serving-data directory: anything left under
# SERVING_DATA_DIR by a previous run is deleted before it is re-created.
if tf.io.gfile.exists(SERVING_DATA_DIR):
    print("Removing previous serving data...")
    tf.io.gfile.rmtree(SERVING_DATA_DIR)
    
print("Creating serving data directory...")
tf.io.gfile.mkdir(SERVING_DATA_DIR)
print("Serving data directory is ready.")

Removing previous serving data...
Creating serving data directory...
Serving data directory is ready.


### Extract serving data to Cloud Storage as JSONL

In [9]:
from src.common import datasource_utils
from src.preprocessing import etl

INFO:apache_beam.typehints.native_type_compatibility:Using Any for unsupported type: typing.Sequence[~T]


In [10]:
# Maximum number of rows to extract for batch prediction.
LIMIT = 10000

# Build the BigQuery SQL that selects the serving features from the
# configured dataset/table, capped at LIMIT rows.
sql_query = datasource_utils.get_serving_source_query(
    bq_dataset_name=SERVE_BQ_DATASET_NAME, 
    bq_table_name=SERVE_BQ_TABLE_NAME,
    limit=LIMIT
)

# Show the generated query so it can be inspected before it is run.
print(sql_query)


    SELECT 
        IF(trip_month IS NULL, -1, trip_month) trip_month,
        IF(trip_day IS NULL, -1, trip_day) trip_day,
        IF(trip_day_of_week IS NULL, -1, trip_day_of_week) trip_day_of_week,
        IF(trip_hour IS NULL, -1, trip_hour) trip_hour,
        IF(trip_seconds IS NULL, -1, trip_seconds) trip_seconds,
        IF(trip_miles IS NULL, -1, trip_miles) trip_miles,
        IF(payment_type IS NULL, 'NA', payment_type) payment_type,
        IF(pickup_grid IS NULL, 'NA', pickup_grid) pickup_grid,
        IF(dropoff_grid IS NULL, 'NA', dropoff_grid) dropoff_grid,
        IF(euclidean IS NULL, -1, euclidean) euclidean,
        IF(loc_cross IS NULL, 'NA', loc_cross) loc_cross
    FROM partner_training.chicago_taxitrips_prep 
    LIMIT 10000


In [11]:
# Timestamped job name for this extraction run.
job_name = f"extract-{DATASET_DISPLAY_NAME}-serving-{datetime.now().strftime('%Y%m%d%H%M%S')}"

# Options passed to the extraction pipeline (etl.run_extract_pipeline).
args = {
    'job_name': job_name,
    # With 'runner' left out, Beam falls back to its default DirectRunner
    # (see the log output of the extraction cell). Uncomment to run on Dataflow.
    #'runner': 'DataflowRunner',
    'sql_query': sql_query,
    # Prefix of the exported files; with "data-" the shard ends up named
    # "data--00000-of-00001.jsonl" (see the gsutil listing further down).
    'exported_data_prefix': os.path.join(SERVING_INPUT_DATA_DIR, "data-"),
    'temporary_dir': os.path.join(WORKSPACE, 'tmp'),
    'gcs_location': os.path.join(WORKSPACE, 'bq_tmp'),
    'project': PROJECT,
    'region': REGION,
    'setup_file': './setup.py'
}

In [12]:
# Only show TensorFlow log messages at ERROR level from here on.
tf.get_logger().setLevel('ERROR')

# Run the extraction pipeline with the options assembled in `args`.
print("Data extraction started...")
etl.run_extract_pipeline(args)
print("Data extraction completed.")

Data extraction started...
INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.


  temp_location = pcoll.pipeline.options.view_as(


INFO:root:Default Python SDK image for environment is apache/beam_python3.7_sdk:2.34.0
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 100
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7f3ad743ce10> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running ((((ref_AppliedPTransform_Read-Data-Read-Impulse_10)+(ref_AppliedPTransform_Read-Data-Read-Map-lambda-at-iobase-py-898-_11))+(Read Data/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction))+(Read Data/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction))+(ref_PCollection_PCollection_6_split/Write)
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket de

E0331 14:15:31.453326082       1 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
E0331 14:15:33.326186044       1 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies


INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:apache_beam.io.gcp.bigquery_tools:Started BigQuery job: <JobReference
 location: 'US'
 projectId: 'grandelli-demo-295810'>
 bq show -j --format=prettyjson --project_id=grandelli-demo-295810 None


E0331 14:15:35.733631377       1 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
E0331 14:15:37.681826754       1 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies


INFO:apache_beam.io.gcp.bigquery_tools:Using location 'US' from table <TableReference
 datasetId: 'partner_training'
 projectId: 'grandelli-demo-295810'
 tableId: 'chicago_taxitrips_prep'> referenced by query 
    SELECT 
        IF(trip_month IS NULL, -1, trip_month) trip_month,
        IF(trip_day IS NULL, -1, trip_day) trip_day,
        IF(trip_day_of_week IS NULL, -1, trip_day_of_week) trip_day_of_week,
        IF(trip_hour IS NULL, -1, trip_hour) trip_hour,
        IF(trip_seconds IS NULL, -1, trip_seconds) trip_seconds,
        IF(trip_miles IS NULL, -1, trip_miles) trip_miles,
        IF(payment_type IS NULL, 'NA', payment_type) payment_type,
        IF(pickup_grid IS NULL, 'NA', pickup_grid) pickup_grid,
        IF(dropoff_grid IS NULL, 'NA', dropoff_grid) dropoff_grid,
        IF(euclidean IS NULL, -1, euclidean) euclidean,
        IF(loc_cross IS NULL, 'NA', loc_cross) loc_cross
    FROM partner_training.chicago_taxitrips_prep 
    LIMIT 10000
INFO:apache_beam.io.gcp.bigquery

E0331 14:15:53.228782743       1 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies


INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running ((Write Data/Write/WriteImpl/GroupByKey/Read)+(ref_AppliedPTransform_Write-Data-Write-WriteImpl-Extract_37))+(ref_PCollection_PCollection_23/Write)
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running ((ref_PCollection_PCollection_17/Read)+(ref_AppliedPTransform_Write-Data-Write-WriteImpl-PreFinalize_38))+(ref_PCollection_PCollection_24/Write)
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 0 files in 0.051361799240112305 seconds.
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (ref_PCollection_PCollection_17/Read)+(ref_AppliedPTransform_Write-Data-Write-WriteImpl-FinalizeWrite_39)
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 1 files in 0.05796337127685547 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INF

In [13]:
# List the exported JSONL file(s) to confirm that the extraction wrote its output.
!gsutil ls {SERVING_INPUT_DATA_DIR}

E0331 14:15:55.916792057       1 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies


gs://grandelli-demo-295810-partner-training-2022/chicago-taxi-tips/serving_data/input_data/data--00000-of-00001.jsonl


### Submit the batch prediction job

In [16]:
# Several uploaded models may share the same display name. The results are
# ordered by update_time (ascending unless "desc" is given, per the Vertex AI
# list API), so the last element is the most recently updated model.
matching_models = vertex_ai.Model.list(
    filter=f'display_name={MODEL_DISPLAY_NAME}',
    order_by="update_time")

# Fail with an actionable message instead of a bare IndexError from `[-1]`
# when no model with this display name has been uploaded yet.
if not matching_models:
    raise RuntimeError(
        f"No Vertex AI model with display name '{MODEL_DISPLAY_NAME}' was found. "
        "Upload the model to Vertex AI before submitting a batch prediction job."
    )

# Full resource name of the model, passed to the batch prediction job.
model_name = matching_models[-1].gca_resource.name

E0331 14:16:42.979925125       1 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
E0331 14:16:44.990810410       1 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies


In [18]:
# Compute resources for the batch prediction job: replica machine type and
# replica count range. The accelerator settings are left disabled.
job_resources =  {
    "machine_type": 'n1-standard-2',
    #'accelerator_count': 1,
    #'accelerator_type': 'NVIDIA_TESLA_T4'
    "starting_replica_count": 1,
    "max_replica_count": 10,
}

# Timestamped display name for this job run.
job_display_name = f"{MODEL_DISPLAY_NAME}-prediction-job-{datetime.now().strftime('%Y%m%d%H%M%S')}"

# Submit the job: read every *.jsonl file under the serving input directory
# and write JSONL predictions under the output directory. With sync=True the
# job state is logged while the job runs (see the cell output).
# NOTE(review): sync=True presumably blocks until the job finishes; confirm against the SDK docs.
vertex_ai.BatchPredictionJob.create(
    job_display_name=job_display_name,
    model_name=model_name,
    gcs_source=SERVING_INPUT_DATA_DIR + '/*.jsonl',
    gcs_destination_prefix=SERVING_OUTPUT_DATA_DIR,
    instances_format='jsonl',
    predictions_format='jsonl',
    sync=True,
    **job_resources,
)

E0331 14:17:55.324134545       1 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies


INFO:google.cloud.aiplatform.jobs:Creating BatchPredictionJob
INFO:google.cloud.aiplatform.jobs:BatchPredictionJob created. Resource name: projects/155283586619/locations/us-central1/batchPredictionJobs/1479595755067932672
INFO:google.cloud.aiplatform.jobs:To use this BatchPredictionJob in another session:
INFO:google.cloud.aiplatform.jobs:bpj = aiplatform.BatchPredictionJob('projects/155283586619/locations/us-central1/batchPredictionJobs/1479595755067932672')
INFO:google.cloud.aiplatform.jobs:View Batch Prediction Job:
https://console.cloud.google.com/ai/platform/locations/us-central1/batch-predictions/1479595755067932672?project=155283586619
INFO:google.cloud.aiplatform.jobs:BatchPredictionJob projects/155283586619/locations/us-central1/batchPredictionJobs/1479595755067932672 current state:
JobState.JOB_STATE_PENDING
INFO:google.cloud.aiplatform.jobs:BatchPredictionJob projects/155283586619/locations/us-central1/batchPredictionJobs/1479595755067932672 current state:
JobState.JOB_STAT

<google.cloud.aiplatform.jobs.BatchPredictionJob object at 0x7f3ad7451610> 
resource name: projects/155283586619/locations/us-central1/batchPredictionJobs/1479595755067932672

## 3. Run the batch prediction pipeline using Vertex Pipelines

In [20]:
# WORKSPACE is assigned the same value as in the batch prediction section.
WORKSPACE = f"gs://{BUCKET}/{DATASET_DISPLAY_NAME}/"
# Cloud Storage location passed to the pipeline config as ARTIFACT_STORE_URI.
ARTIFACT_STORE = os.path.join(WORKSPACE, 'tfx_artifacts')
# Name of the prediction pipeline, derived from the model display name.
PIPELINE_NAME = f'{MODEL_DISPLAY_NAME}-predict-pipeline'

### Set the pipeline configurations for the Vertex AI run

In [28]:
# Export the pipeline settings as environment variables. The values printed
# from src.tfx_pipelines.config in the next cell mirror these settings.
os.environ.update({
    "PROJECT": PROJECT,
    "REGION": REGION,
    "GCS_LOCATION": f"gs://{BUCKET}/{DATASET_DISPLAY_NAME}",
    "MODEL_DISPLAY_NAME": MODEL_DISPLAY_NAME,
    "PIPELINE_NAME": PIPELINE_NAME,
    "ARTIFACT_STORE_URI": ARTIFACT_STORE,
    "BATCH_PREDICTION_BQ_DATASET_NAME": SERVE_BQ_DATASET_NAME,
    "BATCH_PREDICTION_BQ_TABLE_NAME": SERVE_BQ_TABLE_NAME,
    # Environment values must be strings, hence the quoted number.
    "SERVE_LIMIT": "1000",
    "BEAM_RUNNER": "DirectRunner",
    "TFX_IMAGE_URI": f"gcr.io/{PROJECT}/{DATASET_DISPLAY_NAME}:{VERSION}",
})

In [30]:
import importlib
from src.tfx_pipelines import config

# Reload the module, presumably so that it re-reads the environment
# variables set in the previous cell even if it was already imported.
importlib.reload(config)

# Print every upper-case attribute of the config module.
for key, value in vars(config).items():
    if key.isupper():
        print(f'{key}: {value}')

PROJECT: grandelli-demo-295810
REGION: us-central1
GCS_LOCATION: gs://grandelli-demo-295810-partner-training-2022/chicago-taxi-tips
ARTIFACT_STORE_URI: gs://grandelli-demo-295810-partner-training-2022/chicago-taxi-tips/tfx_artifacts
MODEL_REGISTRY_URI: gs://grandelli-demo-295810-partner-training-2022/chicago-taxi-tips/model_registry
DATASET_DISPLAY_NAME: chicago-taxi-tips
MODEL_DISPLAY_NAME: chicago-taxi-tips-classifier-v01
PIPELINE_NAME: chicago-taxi-tips-classifier-v01-predict-pipeline
ML_USE_COLUMN: ml_use
EXCLUDE_COLUMNS: trip_start_timestamp
TRAIN_LIMIT: 0
TEST_LIMIT: 0
SERVE_LIMIT: 1000
NUM_TRAIN_SPLITS: 4
NUM_EVAL_SPLITS: 1
ACCURACY_THRESHOLD: 0.8
USE_KFP_SA: False
TFX_IMAGE_URI: gcr.io/grandelli-demo-295810/chicago-taxi-tips:v01
BEAM_RUNNER: DirectRunner
BEAM_DIRECT_PIPELINE_ARGS: ['--project=grandelli-demo-295810', '--temp_location=gs://grandelli-demo-295810-partner-training-2022/chicago-taxi-tips/temp']
BEAM_DATAFLOW_PIPELINE_ARGS: ['--project=grandelli-demo-295810', '--temp_

### (Optional) Build the ML container image

This is the `TFX` runtime environment for the training pipeline steps.

In [31]:
# Show the image URI that will be built. No Python variable named
# TFX_IMAGE_URI is assigned in the visible cells, so this presumably resolves
# to the environment variable set through os.environ above.
!echo $TFX_IMAGE_URI

E0331 15:43:33.120220901       1 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies


gcr.io/grandelli-demo-295810/chicago-taxi-tips:v01


In [32]:
# Build the container image from the current directory with Cloud Build and
# tag it as TFX_IMAGE_URI (15 minute timeout, e2-highcpu-8 build machine).
!gcloud builds submit --tag $TFX_IMAGE_URI . --timeout=15m --machine-type=e2-highcpu-8

E0331 15:43:34.423646334       1 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies


Creating temporary tarball archive of 59 file(s) totalling 2.6 MiB before compression.
Some files were not included in the source upload.

Check the gcloud log [/home/jupyter/.config/gcloud/logs/2022.03.31/15.43.37.124845.log] to see which files and the contents of the
default gcloudignore file used (see `$ gcloud topic gcloudignore` to learn
more).

Uploading tarball of [.] to [gs://grandelli-demo-295810_cloudbuild/source/1648741417.350531-82bcd07ab4574fffa6dcccfd26885958.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/grandelli-demo-295810/locations/global/builds/5c369433-6f9f-49a1-af60-33d9cfd1eb22].
Logs are available at [https://console.cloud.google.com/cloud-build/builds/5c369433-6f9f-49a1-af60-33d9cfd1eb22?project=155283586619].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "5c369433-6f9f-49a1-af60-33d9cfd1eb22"

FETCHSOURCE
Fetching storage object: gs://grandelli-demo-295810_cloudbuild/source/1648741417.350531-82bcd0

### Compile pipeline

In [33]:
from src.tfx_pipelines import runner

# The pipeline definition file is a JSON file named after the pipeline.
pipeline_definition_file = f'{config.PIPELINE_NAME}.json'
# Compile the prediction pipeline for the given definition file path.
# NOTE(review): presumably this writes the JSON job spec to that path; confirm in src.tfx_pipelines.runner.
pipeline_definition = runner.compile_prediction_pipeline(pipeline_definition_file)

INFO:root:Pipeline components: ['bigquery_data_gen', 'vertex_batch_prediction', 'datastore_prediction_writer']
INFO:root:Beam pipeline args: ['--project=grandelli-demo-295810', '--temp_location=gs://grandelli-demo-295810-partner-training-2022/chicago-taxi-tips/temp']


### Submit run to Vertex Pipelines

In [26]:
# NOTE(review): AIPlatformClient appears to be deprecated in newer kfp releases in
# favour of google.cloud.aiplatform.PipelineJob; confirm against the installed kfp version.
from kfp.v2.google.client import AIPlatformClient

# Client for submitting pipeline runs in the configured project and region.
pipeline_client = AIPlatformClient(
    project_id=PROJECT, region=REGION)
                 
# Submit a run from the compiled job spec; the returned job description is
# displayed as the cell output.
pipeline_client.create_run_from_job_spec(
    job_spec_path=pipeline_definition_file
)

E0331 14:56:03.220886862       1 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies


{'name': 'projects/155283586619/locations/us-central1/pipelineJobs/chicago-taxi-tips-classifier-v01-predict-pipeline-20220331145605',
 'displayName': 'chicago-taxi-tips-classifier-v01-predict-pipeline-20220331145605',
 'createTime': '2022-03-31T14:56:05.453498Z',
 'updateTime': '2022-03-31T14:56:05.453498Z',
 'pipelineSpec': {'deploymentConfig': {'@type': 'type.googleapis.com/ml_pipelines.PipelineDeploymentConfig',
   'executors': {'bigquery_data_gen_executor': {'container': {'image': 'gcr.io/grandelli-demo-295810/chicago-taxi-tips:v01',
      'command': ['python',
       '-m',
       'tfx.orchestration.kubeflow.v2.container.kubeflow_v2_run_executor'],
      'args': ['--executor_class_path',
       'src.tfx_pipelines.components.bigquery_data_gen_Executor',
       '--json_serialized_invocation_args',
       '{{$}}',
       '--project=grandelli-demo-295810',
       '--temp_location=gs://grandelli-demo-295810-partner-training-2022/chicago-taxi-tips/temp']}},
    'datastore_prediction_writ