# 02 - ML Experimentation with Custom Model

The purpose of this notebook is to use [custom training](https://cloud.google.com/ai-platform-unified/docs/training/custom-training) to train a keras classifier to predict whether a given trip will result in a tip > 20%. The notebook covers the following tasks:
1. Preprocess the data locally using Apache Beam.
2. Train and test custom model locally using a Keras implementation.
3. Submit a Dataflow job to preprocess the data at scale.
4. Submit a custom training job to Vertex AI using a [pre-built container](https://cloud.google.com/ai-platform-unified/docs/training/pre-built-containers).
5. Upload the trained model to Vertex AI.
6. Track experiment parameters from [Vertex AI Metadata](https://cloud.google.com/vertex-ai/docs/ml-metadata/introduction).
7. Submit a [hyperparameter tuning job](https://cloud.google.com/vertex-ai/docs/training/hyperparameter-tuning-overview) to Vertex AI.

We use [Vertex TensorBoard](https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-overview) 
and [Vertex ML Metadata](https://cloud.google.com/vertex-ai/docs/ml-metadata/introduction) to  track, visualize, and compare ML experiments.

## Setup

### Import libraries

In [1]:
import os
import logging
from datetime import datetime
import numpy as np
from pathlib import Path
from collections.abc import Mapping

import tensorflow as tf
import tensorflow_transform as tft
import tensorflow.keras as keras

from google.cloud import aiplatform as vertex_ai
from google.cloud.aiplatform import hyperparameter_tuning as hp_tuning

from src.common import features, datasource_utils
from src.model_training import data, model, defaults, trainer, exporter
from src.preprocessing import etl

logging.getLogger().setLevel(logging.INFO)
tf.get_logger().setLevel('INFO')

print(f"TensorFlow: {tf.__version__}")
print(f"TensorFlow Transform: {tft.__version__}")

2023-05-19 00:58:56.641175: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-05-19 00:58:57.610193: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/cuda/lib64:/usr/local/nccl2/lib:/usr/local/cuda/extras/CUPTI/lib64
2023-05-19 00:58:57.610316: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/cuda/lib64:/usr/local/nccl2/lib:/usr/loca

TensorFlow: 2.11.0
TensorFlow Transform: 1.12.0


In [2]:
from platform import python_version

print(python_version())

3.7.12


In [3]:
import tensorflow as tf
import tensorflow_transform as tft
print(f"TensorFlow: {tf.__version__}")
print(f"TensorFlow Transform: {tft.__version__}")

TensorFlow: 2.11.0
TensorFlow Transform: 1.12.0


### Setup Google Cloud project

In [4]:
PROJECT = 'g360-docai' # Change to your project id.
REGION = 'us-west1' # Change to your region.
BUCKET = 'vertex-mlops-chicago-taxi-bucket' # Change to your bucket name.
BUCKET_URI = f"gs://{BUCKET}" 
SERVICE_ACCOUNT = "956259099845-compute@developer.gserviceaccount.com"

if PROJECT == "" or PROJECT is None or PROJECT == "[your-project-id]":
    # Get your GCP project id from gcloud
    shell_output = !gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT = shell_output[0]
    
if SERVICE_ACCOUNT == "" or SERVICE_ACCOUNT is None or SERVICE_ACCOUNT == "[your-service-account]":
    # Get your GCP project id from gcloud
    shell_output = !gcloud config list --format 'value(core.account)' 2>/dev/null
    SERVICE_ACCOUNT = shell_output[0]
    
if BUCKET == "" or BUCKET is None or BUCKET == "[your-bucket-name]":
    # Get your bucket name to GCP projet id
    BUCKET = PROJECT
    # Try to create the bucket if it doesn'exists
    ! gsutil mb -l $REGION gs://$BUCKET
    print("")
    
PARENT = f"projects/{PROJECT}/locations/{REGION}"
    
print("Project ID:", PROJECT)
print("Region:", REGION)
print("Bucket name:", BUCKET)
print("Bucket URI:", BUCKET_URI)
print("Service Account:", SERVICE_ACCOUNT)
print("Vertex API Parent URI:", PARENT)

Project ID: g360-docai
Region: us-west1
Bucket name: vertex-mlops-chicago-taxi-bucket
Bucket URI: gs://vertex-mlops-chicago-taxi-bucket
Service Account: 956259099845-compute@developer.gserviceaccount.com
Vertex API Parent URI: projects/g360-docai/locations/us-west1


### Set configurations

In [5]:
VERSION = f"v01"
DATASET_DISPLAY_NAME = f"chicago-taxi-tips"
MODEL_DISPLAY_NAME = f'{DATASET_DISPLAY_NAME}-classifier-{VERSION}'

WORKSPACE = f"gs://{BUCKET}/{DATASET_DISPLAY_NAME}"
EXPERIMENT_ARTIFACTS_DIR = os.path.join(WORKSPACE, "experiments")
RAW_SCHEMA_LOCATION = f"src/raw_schema/schema.pbtxt"

TENSORBOARD_DISPLAY_NAME = f"tb-{DATASET_DISPLAY_NAME}"
EXPERIMENT_NAME = f"{MODEL_DISPLAY_NAME}"

## Create Vertex TensorBoard instance 

In [6]:
tensorboard_resource = vertex_ai.Tensorboard.create(display_name=TENSORBOARD_DISPLAY_NAME)
tensorboard_resource_name = tensorboard_resource.gca_resource.name
print("TensorBoard resource name:", tensorboard_resource_name)

Creating Tensorboard
Create Tensorboard backing LRO: projects/956259099845/locations/us-central1/tensorboards/1722723614492459008/operations/6372026330887749632
Tensorboard created. Resource name: projects/956259099845/locations/us-central1/tensorboards/1722723614492459008
To use this Tensorboard in another session:
tb = aiplatform.Tensorboard('projects/956259099845/locations/us-central1/tensorboards/1722723614492459008')
TensorBoard resource name: projects/956259099845/locations/us-central1/tensorboards/1722723614492459008


## Initialize workspace

In [7]:
from google.cloud import storage

def bucket_check(bucket_name):
    """Creates a new bucket."""
    storage_client = storage.Client()
    
    # Check if the bucket already exists
    if not storage_client.lookup_bucket(bucket_name):
        bucket = storage_client.create_bucket(bucket_name)
        print("Bucket {} created".format(bucket.name))
    else:
        print("Bucket {} already exists".format(bucket_name))

        
bucket_check(BUCKET)

Bucket vertex-mlops-chicago-taxi-bucket already exists


In [8]:
REMOVE_EXPERIMENT_ARTIFACTS = False
if tf.io.gfile.exists(EXPERIMENT_ARTIFACTS_DIR) and REMOVE_EXPERIMENT_ARTIFACTS:
    print("Removing previous experiment artifacts...")
    tf.io.gfile.rmtree(Path(EXPERIMENT_ARTIFACTS_DIR))

if not tf.io.gfile.exists(EXPERIMENT_ARTIFACTS_DIR):
    print("Creating new experiment artifacts directory...")
    tf.io.gfile.makedirs(Path(EXPERIMENT_ARTIFACTS_DIR))

print("Workspace is ready.")
print("Experiment directory:", EXPERIMENT_ARTIFACTS_DIR)

Creating new experiment artifacts directory...
Workspace is ready.
Experiment directory: gs://vertex-mlops-chicago-taxi-bucket/chicago-taxi-tips/experiments


## Initialize Vertex AI experiment

In [9]:
vertex_ai.init(
    project=PROJECT,
    location=REGION,
    # staging_bucket=BUCKET,
    staging_bucket=BUCKET_URI,
    experiment=EXPERIMENT_NAME
)

run_id = f"run-local-{datetime.now().strftime('%Y%m%d%H%M%S')}"
vertex_ai.start_run(run_id)

EXPERIMENT_RUN_DIR = str(Path(os.path.join(EXPERIMENT_ARTIFACTS_DIR, EXPERIMENT_NAME, run_id)))
print("Experiment run directory:", EXPERIMENT_RUN_DIR)

Associating projects/956259099845/locations/us-west1/metadataStores/default/contexts/chicago-taxi-tips-classifier-v01-run-local-20230519005902 to Experiment: chicago-taxi-tips-classifier-v01
Experiment run directory: gs:/vertex-mlops-chicago-taxi-bucket/chicago-taxi-tips/experiments/chicago-taxi-tips-classifier-v01/run-local-20230519005902


## 1. Preprocess the data using Apache Beam

The Apache Beam pipeline of data preprocessing is implemented in the [preprocessing](src/preprocessing) directory.

In [10]:
EXPORTED_DATA_PREFIX = str(Path(os.path.join(EXPERIMENT_RUN_DIR, 'exported_data')))
TRANSFORMED_DATA_PREFIX = str(Path(os.path.join(EXPERIMENT_RUN_DIR, 'transformed_data')))
TRANSFORM_ARTIFACTS_DIR = str(Path(os.path.join(EXPERIMENT_RUN_DIR, 'transform_artifacts')))

### Get Source Query from Managed Dataset

In [11]:
ML_USE = 'UNASSIGNED'
LIMIT = 5120

raw_data_query = datasource_utils.get_training_source_query(
    project=PROJECT, 
    region=REGION, 
    dataset_display_name=DATASET_DISPLAY_NAME, 
    ml_use=ML_USE, 
    limit=LIMIT
)

print(raw_data_query)


    SELECT 
        IF(trip_month IS NULL, -1, trip_month) trip_month,
        IF(trip_day IS NULL, -1, trip_day) trip_day,
        IF(trip_day_of_week IS NULL, -1, trip_day_of_week) trip_day_of_week,
        IF(trip_hour IS NULL, -1, trip_hour) trip_hour,
        IF(trip_seconds IS NULL, -1, trip_seconds) trip_seconds,
        IF(trip_miles IS NULL, -1, trip_miles) trip_miles,
        IF(payment_type IS NULL, 'NA', payment_type) payment_type,
        IF(pickup_grid IS NULL, 'NA', pickup_grid) pickup_grid,
        IF(dropoff_grid IS NULL, 'NA', dropoff_grid) dropoff_grid,
        IF(euclidean IS NULL, -1, euclidean) euclidean,
        IF(loc_cross IS NULL, 'NA', loc_cross) loc_cross,
        tip_bin
    FROM playground_us.chicago_taxitrips_prep 
    WHERE ML_use = 'UNASSIGNED'
    LIMIT 5120


### Test Data Preprocessing Locally

In [12]:
# args = {
#     'runner': 'DirectRunner',
#     'raw_data_query': raw_data_query,
#     'write_raw_data': True,
#     'exported_data_prefix': EXPORTED_DATA_PREFIX,
#     'transformed_data_prefix': TRANSFORMED_DATA_PREFIX,
#     'transform_artifact_dir': TRANSFORM_ARTIFACTS_DIR,
#     'temporary_dir': os.path.join(WORKSPACE, 'tmp'),
#     'gcs_location': f'gs://{BUCKET}/bq_tmp',
#     'project': PROJECT
# }

# args = {
#     'runner': 'DirectRunner',
#     'raw_data_query': raw_data_query,
#     'write_raw_data': True,
#     'exported_data_prefix': EXPORTED_DATA_PREFIX,
#     'transformed_data_prefix': TRANSFORMED_DATA_PREFIX,
#     'transform_artifact_dir': TRANSFORM_ARTIFACTS_DIR,
#     'temporary_dir': os.path.join(WORKSPACE, 'tmp'),
#     'gcs_location': BUCKET_URI,
#     'project': PROJECT
# }

# Start of the code
args = {
    'runner': 'DirectRunner',
    'raw_data_query': raw_data_query,
    'write_raw_data': True,
    'exported_data_prefix': EXPORTED_DATA_PREFIX,
    'transformed_data_prefix': TRANSFORMED_DATA_PREFIX,
    'transform_artifact_dir': TRANSFORM_ARTIFACTS_DIR,
    'temporary_dir': os.path.join(WORKSPACE, 'tmp'),
    'gcs_location': f'gs://{BUCKET}/bq_tmp',
    'project': PROJECT
}


In [13]:
vertex_ai.log_params(args)

In [14]:
# import apache_beam as beam

# # Create a PCollection from the instance dicts
# pipeline = beam.Pipeline()
# instance_dicts_pcoll = pipeline | beam.Create(instance_dicts)


In [15]:
print("Data preprocessing started...")
etl.run_transform_pipeline(args)
print("Data preprocessing completed.")

INFO:absl:Feature trip_month has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature trip_day has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature trip_day_of_week has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature trip_hour has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature trip_seconds has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature trip_miles has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature payment_type has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature pickup_grid has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature dropoff_grid has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature euclidean has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature loc_cross has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature tip_bin has a shape dim {
  size: 1
}
.

Data preprocessing started...






  temp_location = pcoll.pipeline.options.view_as(
INFO:absl:Feature dropoff_grid has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature euclidean has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature loc_cross has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature payment_type has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature pickup_grid has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature tip_bin has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature trip_day has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature trip_day_of_week has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature trip_hour has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature trip_miles has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature trip_month has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Fe

Instructions for updating:
Use ref() instead.


2023-05-19 00:59:06.512932: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:981] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2023-05-19 00:59:06.525679: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:981] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2023-05-19 00:59:06.527377: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:981] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2023-05-19 00:59:06.529782: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorF



INFO:absl:Feature dropoff_grid has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature euclidean has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature loc_cross has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature payment_type has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature pickup_grid has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature tip_bin has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature trip_day has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature trip_day_of_week has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature trip_hour has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature trip_miles has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature trip_month has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature trip_seconds has a shape dim {
  size: 1
}
.



INFO:absl:Feature dropoff_grid has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature euclidean has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature loc_cross has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature payment_type has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature pickup_grid has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature tip_bin has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature trip_day has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature trip_day_of_week has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature trip_hour has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature trip_miles has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature trip_month has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature trip_seconds has a shape dim {
  size: 1
}
.

INFO:tensorflow:Assets written to: gs:/vertex-mlops-chicago-taxi-bucket/chicago-taxi-tips/tmp/tftransform_tmp/91334bb9040b4a21be5b443edcd0faec/assets


INFO:tensorflow:Assets written to: gs:/vertex-mlops-chicago-taxi-bucket/chicago-taxi-tips/tmp/tftransform_tmp/91334bb9040b4a21be5b443edcd0faec/assets


INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_text is not available.


INFO:tensorflow:tensorflow_text is not available.
INFO:apache_beam.io.gcp.gcsio:Finished listing 1 files in 0.045641422271728516 seconds.
INFO:apache_beam.io.filebasedsink:Starting finalize_write threads with num_shards: 1 (skipped: 0), batches: 1, num_threads: 1
INFO:apache_beam.io.filebasedsink:Renamed 1 shards in 0.10 seconds.
INFO:apache_beam.io.filebasedsink:Starting finalize_write threads with num_shards: 1 (skipped: 0), batches: 1, num_threads: 1
INFO:apache_beam.io.filebasedsink:Renamed 1 shards in 0.10 seconds.
INFO:apache_beam.io.filebasedsink:Starting finalize_write threads with num_shards: 1 (skipped: 0), batches: 1, num_threads: 1
INFO:apache_beam.io.filebasedsink:Renamed 1 shards in 0.10 seconds.
INFO:apache_beam.io.filebasedsink:Starting finalize_write threads with num_shards: 1 (skipped: 0), batches: 1, num_threads: 1
INFO:apache_beam.io.filebasedsink:Renamed 1 shards in 0.10 seconds.
INFO:apache_beam.io.filebasedsink:Starting finalize_write threads with num_shards: 1 (

INFO:tensorflow:Assets written to: gs:/vertex-mlops-chicago-taxi-bucket/chicago-taxi-tips/tmp/tftransform_tmp/864f0945a07a430792d171a6ff8895b6/assets


INFO:tensorflow:Assets written to: gs:/vertex-mlops-chicago-taxi-bucket/chicago-taxi-tips/tmp/tftransform_tmp/864f0945a07a430792d171a6ff8895b6/assets


INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_text is not available.


INFO:tensorflow:tensorflow_text is not available.


INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:struct2tensor is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_decision_forests is not available.


INFO:tensorflow:tensorflow_text is not available.


INFO:tensorflow:tensorflow_text is not available.
INFO:apache_beam.io.filebasedsink:Starting finalize_write threads with num_shards: 1 (skipped: 0), batches: 1, num_threads: 1
INFO:apache_beam.io.filebasedsink:Renamed 1 shards in 0.10 seconds.
INFO:apache_beam.io.filebasedsink:Starting finalize_write threads with num_shards: 1 (skipped: 0), batches: 1, num_threads: 1
INFO:apache_beam.io.filebasedsink:Renamed 1 shards in 0.10 seconds.


Data preprocessing completed.


In [16]:
!gsutil ls {EXPERIMENT_RUN_DIR}

CommandException: "ls" command does not support "file://" URLs. Did you mean to use a gs:// URL?


## 2. Train a custom model locally using a Keras

The `Keras` implementation of the custom model is in the [model_training](src/model_training) directory.

In [17]:
LOG_DIR = os.path.join(EXPERIMENT_RUN_DIR, 'logs')
EXPORT_DIR = os.path.join(EXPERIMENT_RUN_DIR, 'model')

### Read transformed data

In [18]:
tft_output = tft.TFTransformOutput(Path(TRANSFORM_ARTIFACTS_DIR))
transform_feature_spec = tft_output.transformed_feature_spec()
transform_feature_spec

{'dropoff_grid_xf': FixedLenFeature(shape=[], dtype=tf.int64, default_value=None),
 'euclidean_xf': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'loc_cross_xf': FixedLenFeature(shape=[], dtype=tf.int64, default_value=None),
 'payment_type_xf': FixedLenFeature(shape=[], dtype=tf.int64, default_value=None),
 'pickup_grid_xf': FixedLenFeature(shape=[], dtype=tf.int64, default_value=None),
 'tip_bin': FixedLenFeature(shape=[], dtype=tf.int64, default_value=None),
 'trip_day_of_week_xf': FixedLenFeature(shape=[], dtype=tf.int64, default_value=None),
 'trip_day_xf': FixedLenFeature(shape=[], dtype=tf.int64, default_value=None),
 'trip_hour_xf': FixedLenFeature(shape=[], dtype=tf.int64, default_value=None),
 'trip_miles_xf': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None),
 'trip_month_xf': FixedLenFeature(shape=[], dtype=tf.int64, default_value=None),
 'trip_seconds_xf': FixedLenFeature(shape=[], dtype=tf.float32, default_value=None)}

In [19]:
train_data_file_pattern = os.path.join(Path(TRANSFORMED_DATA_PREFIX),'train/data-*.gz')
eval_data_file_pattern = os.path.join(Path(TRANSFORMED_DATA_PREFIX),'eval/data-*.gz')

for input_features, target in data.get_dataset(
    train_data_file_pattern, transform_feature_spec, batch_size=3).take(1):
    for key in input_features:
        print(f"{key} {input_features[key].dtype}: {input_features[key].numpy().tolist()}")
    print(f"target: {target.numpy().tolist()}")

dropoff_grid_xf <dtype: 'int64'>: [0, 0, 0]
euclidean_xf <dtype: 'float32'>: [-0.8328947424888611, -0.8328947424888611, -0.8328947424888611]
loc_cross_xf <dtype: 'int64'>: [0, 0, 0]
payment_type_xf <dtype: 'int64'>: [0, 0, 0]
pickup_grid_xf <dtype: 'int64'>: [0, 0, 0]
trip_day_of_week_xf <dtype: 'int64'>: [0, 1, 4]
trip_day_xf <dtype: 'int64'>: [14, 7, 12]
trip_hour_xf <dtype: 'int64'>: [0, 14, 23]
trip_miles_xf <dtype: 'float32'>: [-0.5798476338386536, -0.40228089690208435, -0.4614697992801666]
trip_month_xf <dtype: 'int64'>: [3, 2, 3]
trip_seconds_xf <dtype: 'float32'>: [-0.341960072517395, -0.2984708249568939, -0.2836449444293976]
target: [0, 0, 0]


### Create hyperparameters

In [20]:
hyperparams = {
    "hidden_units": [64, 32]
}

hyperparams = defaults.update_hyperparams(hyperparams)
hyperparams

{'hidden_units': [64, 32],
 'learning_rate': 0.0001,
 'batch_size': 512,
 'num_epochs': 10}

In [21]:
from src.common import features, datasource_utils
from src.model_training import data, model, defaults, trainer, exporter
from src.preprocessing import etl

logging.getLogger().setLevel(logging.INFO)
tf.get_logger().setLevel('INFO')

In [22]:
# from tensorflow_transform import TFTransformOutput
# from absl import logging

# def bkm(tf_transform_output: TFTransformOutput
#                        ) -> tf.keras.Model:
#   """Creates a DNN Keras model for classifying taxi data.

#   Args:
#     tf_transform_output: [TFTransformOutput], the outputs from Transform

#   Returns:
#     A keras Model.
#   """
#   feature_spec = tf_transform_output.transformed_feature_spec().copy()
#   feature_spec.pop(_LABEL_KEY)

#   inputs = {}
#   for key, spec in feature_spec.items():
#     if isinstance(spec, tf.io.VarLenFeature):
#       inputs[key] = tf.keras.layers.Input(
#           shape=[None], name=key, dtype=spec.dtype, sparse=True)
#     elif isinstance(spec, tf.io.FixedLenFeature):
#       # TODO(b/208879020): Move into schema such that spec.shape is [1] and not
#       # [] for scalars.
#       inputs[key] = tf.keras.layers.Input(
#           shape=spec.shape or [1], name=key, dtype=spec.dtype)
#     else:
#       raise ValueError('Spec type is not supported: ', key, spec)

#   output = tf.keras.layers.Concatenate()(tf.nest.flatten(inputs))
#   output = tf.keras.layers.Dense(100, activation='relu')(output)
#   output = tf.keras.layers.Dense(70, activation='relu')(output)
#   output = tf.keras.layers.Dense(50, activation='relu')(output)
#   output = tf.keras.layers.Dense(20, activation='relu')(output)
#   output = tf.keras.layers.Dense(1)(output)
#   return tf.keras.Model(inputs=inputs, outputs=output)

In [68]:
# bkm(tft_output)
# features.FEATURE_NAMES
inputs = {}
d = features
for feature_name in features.FEATURE_NAMES:
    name = features.transformed_name(feature_name)
    if isinstance(feature_name, tf.io.VarLenFeature):
          inputs[name] = tf.keras.layers.Input(
              shape=[None], name=key, dtype=tf.float32, sparse=True)
    elif isinstance(feature_name, tf.io.FixedLenFeature):
      # TODO(b/208879020): Move into schema such that spec.shape is [1] and not
      # [] for scalars.
          inputs[name] = tf.keras.layers.Input(
              shape=[1], name=key, dtype=tf.float32)
    else:
          raise ValueError('Spec type is not supported: ', key, feature_name)

ValueError: ('Spec type is not supported: ', 'trip_seconds_xf', 'trip_month')

In [93]:
def create_model_inp():
    inputs = {}
    for feature_name in features.FEATURE_NAMES:
        name = features.transformed_name(feature_name)
        if feature_name in features.NUMERICAL_FEATURE_NAMES:
            inputs[name] = keras.layers.Input(name=key, shape=(1,), dtype=tf.float32, ragged=True)
        elif feature_name in features.categorical_feature_names():
            inputs[name] = keras.layers.Input(name=key, shape=(1,), dtype=tf.float32, ragged=True)
        else:
            pass
    return inputs
test_layer = create_model_inp()
test_layer

{'trip_month_xf': <KerasTensor: type_spec=RaggedTensorSpec(TensorShape([None, 1]), tf.float32, 0, tf.int64) (created by layer 'trip_seconds_xf')>,
 'trip_day_xf': <KerasTensor: type_spec=RaggedTensorSpec(TensorShape([None, 1]), tf.float32, 0, tf.int64) (created by layer 'trip_seconds_xf')>,
 'trip_day_of_week_xf': <KerasTensor: type_spec=RaggedTensorSpec(TensorShape([None, 1]), tf.float32, 0, tf.int64) (created by layer 'trip_seconds_xf')>,
 'trip_hour_xf': <KerasTensor: type_spec=RaggedTensorSpec(TensorShape([None, 1]), tf.float32, 0, tf.int64) (created by layer 'trip_seconds_xf')>,
 'trip_seconds_xf': <KerasTensor: shape=(None, 1) dtype=float32 (created by layer 'trip_seconds_xf')>,
 'trip_miles_xf': <KerasTensor: shape=(None, 1) dtype=float32 (created by layer 'trip_seconds_xf')>,
 'payment_type_xf': <KerasTensor: type_spec=RaggedTensorSpec(TensorShape([None, 1]), tf.float32, 0, tf.int64) (created by layer 'trip_seconds_xf')>,
 'pickup_grid_xf': <KerasTensor: type_spec=RaggedTensorS

In [200]:
from src.common import features

def create_model_inp():
    inputs = {}
    for feature_name in features.FEATURE_NAMES:
        name = features.transformed_name(feature_name)
        if feature_name in features.NUMERICAL_FEATURE_NAMES:
            inputs[name] = keras.layers.Input(name=name, shape=(1,), dtype=tf.float32, ragged=True)
        elif feature_name in features.categorical_feature_names():
            inputs[name] = keras.layers.Input(name=name, shape=(1,), dtype=tf.int64, ragged=True)
        else:
            pass
    return inputs

def _cbc(feature_vocab_sizes, hyperparams):
    input_layers = create_model_inp()
    layers = []
    for key in input_layers:
        feature_name = features.original_name(key)
        if feature_name in features.EMBEDDING_CATEGORICAL_FEATURES:
            vocab_size = feature_vocab_sizes[feature_name]
            embedding_size = features.EMBEDDING_CATEGORICAL_FEATURES[feature_name]
            embedding_output = keras.layers.Embedding(
                input_dim=vocab_size + 1,
                output_dim=embedding_size,
                name=f"{key}_embedding",
            )(input_layers[key])
            layers.append(embedding_output)
        elif feature_name in features.ONEHOT_CATEGORICAL_FEATURE_NAMES:
            vocab_size = feature_vocab_sizes[feature_name]
            onehot_layer = keras.layers.experimental.preprocessing.CategoryEncoding(
                num_tokens=vocab_size + 1,
                output_mode="binary",
                name=f"{key}_onehot",
            )(input_layers[key])
            layers.append(onehot_layer)
        elif feature_name in features.NUMERICAL_FEATURE_NAMES:
            numeric_layer = keras.layers.Reshape((-1,), name=f"{key}_numeric")(input_layers[key])  # Use Reshape with (-1,) shape
            layers.append(numeric_layer)
        else:
            pass

    reshaped_layers = []
    for layer in layers:
        if len(layer.shape) > 2:
            reshaped_layer = keras.layers.Reshape((-1,))(layer)
        else:
            reshaped_layer = layer
        reshaped_layers.append(reshaped_layer)

    joined = keras.layers.Concatenate(name="combines_inputs", axis=-1)(reshaped_layers)
    feedforward_output = keras.Sequential(
        [
            keras.layers.Dense(units, activation="relu")
            for units in hyperparams["hidden_units"]
        ],
        name="feedforward_network",
    )(joined)
    logits = keras.layers.Dense(units=1, name="logits")(feedforward_output)

    model = keras.Model(inputs=input_layers, outputs=[logits])
    return model

def create_binary(tft_output, hyperparams):
    feature_vocab_sizes = dict()
    for feature_name in features.categorical_feature_names():
        feature_vocab_sizes[feature_name] = tft_output.vocabulary_size_by_name(
            feature_name
        )
    return _cbc(feature_vocab_sizes, hyperparams)

In [None]:
classy = create_binary(tft_output, hyperparams)
classy.summary()

Model: "model_7"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 trip_month_xf (InputLayer)     [(None, 1)]          0           []                               
                                                                                                  
 trip_day_xf (InputLayer)       [(None, 1)]          0           []                               
                                                                                                  
 trip_hour_xf (InputLayer)      [(None, 1)]          0           []                               
                                                                                                  
 pickup_grid_xf (InputLayer)    [(None, 1)]          0           []                               
                                                                                            

### Create and test model inputs and outputs

In [24]:
classifier = model.create_binary_classifier(tft_output, hyperparams)
classifier.summary()

AttributeError: 'str' object has no attribute 'shape'

In [None]:
keras.utils.plot_model(
    classifier, 
    show_shapes=True, 
    show_dtype=True
)

In [None]:
classifier(input_features)

In [None]:
from itertools import chain
test = [(None, 2), (None, 4), (7,), (None, 3), (None, 1), (None, 1), (6,), (None, 3), (None, 3), (None, 1), (None, 10)]
test = tuple(chain.from_iterable(test[1]))
test

### Train the model locally.

In [None]:
logging.getLogger().setLevel(logging.INFO)

hyperparams["learning_rate"] = 0.001
hyperparams["num_epochs"] = 5
hyperparams["batch_size"] = 512

vertex_ai.log_params(hyperparams)

In [None]:
classifier = trainer.train(
    train_data_dir=train_data_file_pattern,
    eval_data_dir=eval_data_file_pattern,
    tft_output_dir=TRANSFORM_ARTIFACTS_DIR,
    hyperparams=hyperparams,
    log_dir=LOG_DIR,
)

In [None]:
val_loss, val_accuracy = trainer.evaluate(
    model=classifier,
    data_dir=eval_data_file_pattern,
    raw_schema_location=RAW_SCHEMA_LOCATION,
    tft_output_dir=TRANSFORM_ARTIFACTS_DIR,
    hyperparams=hyperparams,
)

In [None]:
vertex_ai.log_metrics(
    {"val_loss": val_loss, "val_accuracy": val_accuracy})

In [None]:
!tb-gcp-uploader --tensorboard_resource_name={tensorboard_resource_name} \
  --logdir={LOG_DIR} \
  --experiment_name={EXPERIMENT_NAME} --one_shot=True

### Export the trained model

In [None]:
saved_model_dir = os.path.join(EXPORT_DIR)

exporter.export_serving_model(
    classifier=classifier,
    serving_model_dir=saved_model_dir,
    raw_schema_location=RAW_SCHEMA_LOCATION,
    tft_output_dir=TRANSFORM_ARTIFACTS_DIR,
)

### Inspect model serving signatures

In [None]:
!saved_model_cli show --dir={saved_model_dir} --tag_set=serve --signature_def=serving_tf_example

In [None]:
!saved_model_cli show --dir={saved_model_dir} --tag_set=serve --signature_def=serving_default

### Test the exported SavedModel

In [None]:
serving_model = tf.saved_model.load(saved_model_dir)
print("Saved model is loaded.")

In [None]:
# Test the serving_tf_example with TF Examples

file_names = tf.data.TFRecordDataset.list_files(EXPORTED_DATA_PREFIX + '/data-*.tfrecord')
for batch in tf.data.TFRecordDataset(file_names).batch(3).take(1):
    predictions = serving_model.signatures['serving_tf_example'](batch)
    for key in predictions:
        print(f"{key}: {predictions[key]}")

In [None]:
# Test the serving_default with feature dictionary

import tensorflow_data_validation as tfdv
from tensorflow_transform.tf_metadata import schema_utils

raw_schema = tfdv.load_schema_text(RAW_SCHEMA_LOCATION)
raw_feature_spec = schema_utils.schema_as_feature_spec(raw_schema).feature_spec

In [None]:
instance = {
    "dropoff_grid": "POINT(-87.6 41.9)",
    "euclidean": 2064.2696,
    "loc_cross": "",
    "payment_type": "Credit Card",
    "pickup_grid": "POINT(-87.6 41.9)",
    "trip_miles": 1.37,
    "trip_day": 12,
    "trip_hour": 6,
    "trip_month": 2,
    "trip_day_of_week": 4,
    "trip_seconds": 555,
}

for feature_name in instance:
    dtype = raw_feature_spec[feature_name].dtype
    instance[feature_name] = tf.constant([[instance[feature_name]]], dtype)

In [None]:
predictions = serving_model.signatures['serving_default'](**instance)
for key in predictions:
    print(f"{key}: {predictions[key].numpy()}")

## Start a new Vertex AI experiment run

In [None]:
vertex_ai.init(
    project=PROJECT,
    staging_bucket=BUCKET,
    experiment=EXPERIMENT_NAME)

run_id = f"run-gcp-{datetime.now().strftime('%Y%m%d%H%M%S')}"
vertex_ai.start_run(run_id)

EXPERIMENT_RUN_DIR = os.path.join(EXPERIMENT_ARTIFACTS_DIR, EXPERIMENT_NAME, run_id)
print("Experiment run directory:", EXPERIMENT_RUN_DIR)

## 3. Submit a Data Processing Job to Dataflow

In [None]:
EXPORTED_DATA_PREFIX = os.path.join(EXPERIMENT_RUN_DIR, 'exported_data')
TRANSFORMED_DATA_PREFIX = os.path.join(EXPERIMENT_RUN_DIR, 'transformed_data')
TRANSFORM_ARTIFACTS_DIR = os.path.join(EXPERIMENT_RUN_DIR, 'transform_artifacts')

In [None]:
ML_USE = 'UNASSIGNED'
LIMIT = 1000000
raw_data_query = datasource_utils.get_training_source_query(
    project=PROJECT, 
    region=REGION, 
    dataset_display_name=DATASET_DISPLAY_NAME, 
    ml_use=ML_USE, 
    limit=LIMIT
)

etl_job_name = f"etl-{MODEL_DISPLAY_NAME}-{run_id}"

args = {
    'job_name': etl_job_name,
    'runner': 'DataflowRunner',
    'raw_data_query': raw_data_query,
    'exported_data_prefix': EXPORTED_DATA_PREFIX,
    'transformed_data_prefix': TRANSFORMED_DATA_PREFIX,
    'transform_artifact_dir': TRANSFORM_ARTIFACTS_DIR,
    'write_raw_data': False,
    'temporary_dir': os.path.join(WORKSPACE, 'tmp'),
    'gcs_location': os.path.join(WORKSPACE, 'bq_tmp'),
    'project': PROJECT,
    'region': REGION,
    'setup_file': './setup.py'
}

In [None]:
vertex_ai.log_params(args)

In [None]:
logging.getLogger().setLevel(logging.ERROR)

print("Data preprocessing started...")
etl.run_transform_pipeline(args)
print("Data preprocessing completed.")

In [None]:
!gsutil ls {EXPERIMENT_RUN_DIR}

## 4. Submit a Custom Training Job to Vertex AI

In [None]:
LOG_DIR = os.path.join(EXPERIMENT_RUN_DIR, 'logs')
EXPORT_DIR = os.path.join(EXPERIMENT_RUN_DIR, 'model')

### Test the training task locally

In [None]:
!python -m src.model_training.task \
    --model-dir={EXPORT_DIR} \
    --log-dir={LOG_DIR} \
    --train-data-dir={TRANSFORMED_DATA_PREFIX}/train/* \
    --eval-data-dir={TRANSFORMED_DATA_PREFIX}/eval/*  \
    --tft-output-dir={TRANSFORM_ARTIFACTS_DIR} \
    --num-epochs=3 \
    --hidden-units=32,32 \
    --experiment-name={EXPERIMENT_NAME} \
    --run-name={run_id} \
    --project={PROJECT} \
    --region={REGION} \
    --staging-bucket={BUCKET}

### Prepare training package

In [None]:
TRAINER_PACKAGE_DIR = os.path.join(WORKSPACE, 'trainer_packages')
TRAINER_PACKAGE_NAME = f'{MODEL_DISPLAY_NAME}_trainer'
print("Trainer package upload location:", TRAINER_PACKAGE_DIR)

In [None]:
!rm -r src/__pycache__/
!rm -r src/.ipynb_checkpoints/
!rm -r src/raw_schema/.ipynb_checkpoints/
!rm -f {TRAINER_PACKAGE_NAME}.tar {TRAINER_PACKAGE_NAME}.tar.gz

!mkdir {TRAINER_PACKAGE_NAME}

!cp setup.py {TRAINER_PACKAGE_NAME}/
!cp -r src {TRAINER_PACKAGE_NAME}/
!tar cvf {TRAINER_PACKAGE_NAME}.tar {TRAINER_PACKAGE_NAME}
!gzip {TRAINER_PACKAGE_NAME}.tar
!gsutil cp {TRAINER_PACKAGE_NAME}.tar.gz {TRAINER_PACKAGE_DIR}/
!rm -r {TRAINER_PACKAGE_NAME}
!rm -r {TRAINER_PACKAGE_NAME}.tar.gz

### Prepare the training job

In [None]:
TRAIN_RUNTIME = 'tf-cpu.2-5'
TRAIN_IMAGE = f"us-docker.pkg.dev/vertex-ai/training/{TRAIN_RUNTIME}:latest"
print("Training image:", TRAIN_IMAGE)

In [None]:
num_epochs = 10
learning_rate = 0.001
hidden_units = "64,64"

trainer_args = [
    f'--train-data-dir={TRANSFORMED_DATA_PREFIX + "/train/*"}',
    f'--eval-data-dir={TRANSFORMED_DATA_PREFIX + "/eval/*"}',
    f'--tft-output-dir={TRANSFORM_ARTIFACTS_DIR}',
    f'--num-epochs={num_epochs}',
    f'--learning-rate={learning_rate}',
    f'--project={PROJECT}',
    f'--region={REGION}',
    f'--staging-bucket={BUCKET}',
    f'--experiment-name={EXPERIMENT_NAME}'
]

In [None]:
package_uri = os.path.join(TRAINER_PACKAGE_DIR, f'{TRAINER_PACKAGE_NAME}.tar.gz')

worker_pool_specs = [
    {
        "replica_count": 1,
        "machine_spec": {
            "machine_type": 'n1-standard-4',
            "accelerator_count": 0
    },
        "python_package_spec": {
            "executor_image_uri": TRAIN_IMAGE,
            "package_uris": [package_uri],
            "python_module": "src.model_training.task",
            "args": trainer_args,
        }
    }
]

### Submit the training job

In [None]:
print("Submitting a custom training job...")

training_job_display_name = f"{TRAINER_PACKAGE_NAME}_{run_id}"

training_job = vertex_ai.CustomJob(
    display_name=training_job_display_name,
    worker_pool_specs=worker_pool_specs,
    base_output_dir=EXPERIMENT_RUN_DIR,
)

training_job.run(
    service_account=SERVICE_ACCOUNT,
    tensorboard=tensorboard_resource_name,
    sync=True
)

## 5. Upload exported model to Vertex AI Models

In [None]:
!gsutil ls {EXPORT_DIR}

### Generate the Explanation metadata

In [None]:
explanation_config = features.generate_explanation_config()
explanation_config

### Upload model

In [None]:
SERVING_RUNTIME='tf2-cpu.2-5'
SERVING_IMAGE = f"us-docker.pkg.dev/vertex-ai/prediction/{SERVING_RUNTIME}:latest"
print("Serving image:", SERVING_IMAGE)

In [None]:
explanation_metadata = vertex_ai.explain.ExplanationMetadata(
    inputs=explanation_config["inputs"],
    outputs=explanation_config["outputs"],
)
explanation_parameters = vertex_ai.explain.ExplanationParameters(
    explanation_config["params"]
)

vertex_model = vertex_ai.Model.upload(
    display_name=MODEL_DISPLAY_NAME,
    artifact_uri=EXPORT_DIR,
    serving_container_image_uri=SERVING_IMAGE,
    parameters_schema_uri=None,
    instance_schema_uri=None,
    explanation_metadata=explanation_metadata,
    explanation_parameters=explanation_parameters,
    labels={
        'dataset_name': DATASET_DISPLAY_NAME,
        'experiment': run_id
    }
)

In [None]:
vertex_model.gca_resource

## 6. Extract experiment run parameters

In [None]:
experiment_df = vertex_ai.get_experiment_df()
experiment_df = experiment_df[experiment_df.experiment_name == EXPERIMENT_NAME]
experiment_df.T

In [None]:
print("Vertex AI Experiments:")
print(
    f"https://console.cloud.google.com/vertex-ai/locations{REGION}/experiments/{EXPERIMENT_NAME}/metrics?project={PROJECT}"
)

## 7. Submit a Hyperparameter Tuning Job to Vertex AI

For more information about configuring a hyperparameter study, refer to [Vertex AI Hyperparameter job configuration](https://cloud.google.com/vertex-ai/docs/training/using-hyperparameter-tuning).

### Configure a hyperparameter job

In [None]:
metric_spec = {
    'ACCURACY': 'maximize'
}

parameter_spec = {
    'learning-rate': hp_tuning.DoubleParameterSpec(min=0.0001, max=0.01, scale='log'),
    'hidden-units': hp_tuning.CategoricalParameterSpec(values=["32,32", "64,64", "128,128"])
}

In [None]:
tuning_job_display_name = f"hpt_{TRAINER_PACKAGE_NAME}_{run_id}"

hp_tuning_job = vertex_ai.HyperparameterTuningJob(
    display_name=tuning_job_display_name,
    custom_job=training_job,
    metric_spec=metric_spec,
    parameter_spec=parameter_spec,
    max_trial_count=4,
    parallel_trial_count=2,
    search_algorithm=None # Bayesian optimization.
)

### Submit the hyperparameter tuning job

In [None]:
print("Submitting a hyperparameter tunning job...")

hp_tuning_job.run(
    service_account=SERVICE_ACCOUNT,
    tensorboard=tensorboard_resource_name,
    restart_job_on_worker_restart=False,
    sync=True,
)

### Retrieve trial results

In [None]:
hp_tuning_job.trials

In [None]:
best_trial = sorted(
    hp_tuning_job.trials, 
    key=lambda trial: trial.final_measurement.metrics[0].value, 
    reverse=True
)[0]

print("Best trial ID:", best_trial.id)
print("Validation Accuracy:", best_trial.final_measurement.metrics[0].value)
print("Hyperparameter Values:")
for parameter in best_trial.parameters:
    print(f" - {parameter.parameter_id}:{parameter.value}")