### Objective

In this notebook, you will learn how to create a custom-trained model from a Python script in a Docker container using the Vertex AI SDK for Python, and then get a prediction from the deployed model by sending data.

This tutorial uses the following Google Cloud ML services and resources:

- BigQuery
- Cloud Storage
- Vertex AI managed Datasets
- Vertex AI Training
- Vertex AI Endpoints

The steps performed include:

- Create a Vertex AI custom `TrainingPipeline` for training a model.
- Train a TensorFlow model.
- Deploy the `Model` resource to a serving `Endpoint` resource.
- Make a prediction.

### Dataset

The dataset used for this tutorial is the penguins dataset from [BigQuery public datasets](https://cloud.google.com/bigquery/public-data). For this tutorial, you use only the fields `culmen_length_mm`, `culmen_depth_mm`, `flipper_length_mm`, and `body_mass_g` from the dataset to predict the penguin species (`species`).

## Installation

Install the latest versions of the Cloud Storage, BigQuery, and Vertex AI SDKs for Python.

In [None]:
# Install the packages
! pip3 install --upgrade google-cloud-aiplatform \
                        google-cloud-storage \
                        'google-cloud-bigquery[pandas]'

# Automatically restart the kernel so the newly installed packages are picked up
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

## Before you begin

#### Set your project ID

In [1]:
PROJECT=!(gcloud config get-value project)
PROJECT_ID=PROJECT[0]

# Set the project id
! gcloud config set project {PROJECT_ID}

Updated property [core/project].


#### Region

Set the `REGION` variable as per the lab instructions.

In [2]:
REGION = "us-east1"  # TODO

In [3]:
BUCKET_NAME = "qwiklabs-gcp-02-f695bba24cbb-cepf" # update it from the lab instructions
BUCKET_URI = f"gs://{BUCKET_NAME}"

### Initialize Vertex AI SDK for Python

Initialize the Vertex AI SDK for Python for your project and corresponding bucket.

In [4]:
from google.cloud import aiplatform

# Initialize the Vertex AI SDK
aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

### Initialize BigQuery Client

Initialize the BigQuery Python client for your project.

In [5]:
from google.cloud import bigquery

# Set up BigQuery client
bq_client = bigquery.Client(project=PROJECT_ID)

## Task 4. Create a Vertex AI Tabular Dataset from the BigQuery dataset

### Preprocess data and split data
- Convert categorical features to numeric
- Split the data into training and test sets in an 80-20 ratio

In [6]:
import numpy as np
import pandas as pd

LABEL_COLUMN = "species"

# Define the BigQuery source dataset
BQ_SOURCE = "bigquery-public-data.ml_datasets.penguins"

# Define NA values
NA_VALUES = ["NA", "."]

# Download a table
table = bq_client.get_table(BQ_SOURCE)
df = bq_client.list_rows(table).to_dataframe()

# Drop unusable rows
df = df.replace(to_replace=NA_VALUES, value=np.nan).dropna()

# Convert categorical columns to numeric
df["island"], _ = pd.factorize(df["island"])
df["species"], _ = pd.factorize(df["species"])
df["sex"], _ = pd.factorize(df["sex"])

# Split into a training and holdout dataset
df_train = df.sample(frac=0.8, random_state=100)
df_holdout = df[~df.index.isin(df_train.index)]
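
To make the two preprocessing steps concrete, the following sketch applies the same `pd.factorize` and `sample(frac=0.8)` pattern to a small made-up DataFrame. The toy column values and the ten-row size are assumptions for illustration only; the notebook itself runs these steps on the penguins table.

```python
import pandas as pd

# Toy stand-in for the penguins table (values are made up for illustration)
toy = pd.DataFrame(
    {
        "species": ["Adelie", "Gentoo", "Adelie", "Chinstrap", "Gentoo",
                    "Adelie", "Chinstrap", "Gentoo", "Adelie", "Gentoo"],
        "body_mass_g": [3700.0, 5000.0, 3800.0, 3600.0, 5200.0,
                        3900.0, 3500.0, 5100.0, 3650.0, 4950.0],
    }
)

# factorize assigns integer codes in order of first appearance
codes, uniques = pd.factorize(toy["species"])
toy["species"] = codes

# 80/20 split: sample 80% of the rows, keep the remainder as the holdout set
toy_train = toy.sample(frac=0.8, random_state=100)
toy_holdout = toy[~toy.index.isin(toy_train.index)]

print(list(uniques))
print(len(toy_train), len(toy_holdout))
```

Because `pd.factorize` numbers categories in order of first appearance, the integer codes depend on row order. Keeping the `uniques` array (instead of discarding it with `_`) makes it possible to map predicted class indices back to species names later.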



### Create a Vertex AI Tabular Dataset

Create a Vertex AI tabular dataset resource from BigQuery training data.


In [8]:
# Create BigQuery dataset
BQ_DATASET="cepf_penguins_dataset"
bq_dataset_id = f"{PROJECT_ID}.{BQ_DATASET}"
bq_dataset = bigquery.Dataset(bq_dataset_id)
dataset = bq_client.create_dataset(bq_dataset, exists_ok=True)
print("Created dataset {}.{}".format(bq_client.project, dataset.dataset_id))

Created dataset qwiklabs-gcp-02-f695bba24cbb.cepf_penguins_dataset


In [None]:
# Create a Vertex AI tabular dataset from BigQuery training data
#df_source=df_train
#staging_path=table name provided in lab instructions
#display_name=as provided in the lab instructions

[ TODO - Insert your code ]

In [9]:
# Create and populate the BigQuery table FIRST
TABLE_NAME = "cepf_penguins_table"
table_id = f"{PROJECT_ID}.{BQ_DATASET}.{TABLE_NAME}"

In [10]:
for col in df_train.columns:
    print(f"Column '{col}': {df_train[col].dtype}")

Column 'species': int64
Column 'island': int64
Column 'culmen_length_mm': float64
Column 'culmen_depth_mm': float64
Column 'flipper_length_mm': float64
Column 'body_mass_g': float64
Column 'sex': int64
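
The printed dtypes map directly onto BigQuery column types. As a sketch, that mapping can be written as a small lookup table. The helper name `bq_type_for` and the dictionary are hypothetical, and only the two dtypes that appear in the output above are covered.

```python
# Hypothetical lookup from pandas dtype names to BigQuery standard SQL types.
# Only the dtypes printed above are covered; extend it for other columns.
PANDAS_TO_BQ = {
    "int64": "INT64",
    "float64": "FLOAT64",
}


def bq_type_for(dtype_name: str) -> str:
    """Return the BigQuery type name for a pandas dtype name."""
    try:
        return PANDAS_TO_BQ[dtype_name]
    except KeyError:
        raise ValueError(f"No BigQuery type mapped for dtype {dtype_name!r}")


# Column names and dtypes copied from the output above
columns = {
    "species": "int64",
    "island": "int64",
    "culmen_length_mm": "float64",
    "culmen_depth_mm": "float64",
    "flipper_length_mm": "float64",
    "body_mass_g": "float64",
    "sex": "int64",
}

schema_spec = [(name, bq_type_for(dtype)) for name, dtype in columns.items()]
for name, bq_type in schema_spec:
    print(f"{name}: {bq_type}")
```

Each `(name, type)` pair could then be passed to `bigquery.SchemaField(name, type)` when building the load job configuration.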


In [11]:
# **Load df_train into BigQuery**
job_config = bigquery.LoadJobConfig(
    # Explicit schema matching the dtypes of df_train printed above
    schema=[
        bigquery.SchemaField("species", "int64"),
        bigquery.SchemaField("island", "int64"),
        bigquery.SchemaField("culmen_length_mm", "float64"),
        bigquery.SchemaField("culmen_depth_mm", "float64"),
        bigquery.SchemaField("flipper_length_mm", "float64"),
        bigquery.SchemaField("body_mass_g", "float64"),
        bigquery.SchemaField("sex", "int64"),
    ],
    write_disposition="WRITE_TRUNCATE",  # Overwrite the table if it exists
)

In [12]:
df_source=df_train
job = bq_client.load_table_from_dataframe(
    df_source, table_id, job_config=job_config
)  # Make an API request.
job.result()  # Wait for the job to complete.

LoadJob<project=qwiklabs-gcp-02-f695bba24cbb, location=US, id=3b722711-0341-4f26-912f-50b499966055>

In [13]:
# Create the Vertex AI TabularDataset from the BigQuery table loaded above
staging_path = f"bq://{table_id}"
display_name = "cepf_penguins"

dataset = aiplatform.TabularDataset.create(
    display_name=display_name,
    bq_source=staging_path,
)

Creating TabularDataset
Create TabularDataset backing LRO: projects/101459138402/locations/us-east1/datasets/5743408938850713600/operations/7912068768484818944
TabularDataset created. Resource name: projects/101459138402/locations/us-east1/datasets/5743408938850713600
To use this TabularDataset in another session:
ds = aiplatform.TabularDataset('projects/101459138402/locations/us-east1/datasets/5743408938850713600')


## Task 5. Train a model

In [14]:
# Define the command args for the training script

EPOCHS = 20
BATCH_SIZE = 10
#LABEL_COLUMN = "species"  is defined above

CMDARGS = [
    "--label_column=" + LABEL_COLUMN,
    "--epochs=" + str(EPOCHS),
    "--batch_size=" + str(BATCH_SIZE),
]
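
Vertex AI passes each entry of `CMDARGS` to the training script as a command-line argument. The sketch below rebuilds the same list and parses it with an `argparse` parser shaped like the one in the training script later in this notebook, so the flags can be checked locally before a job is submitted. Passing the list to `parse_args` explicitly is done here only for local testing.

```python
import argparse

LABEL_COLUMN = "species"
EPOCHS = 20
BATCH_SIZE = 10

cmdargs = [
    "--label_column=" + LABEL_COLUMN,
    "--epochs=" + str(EPOCHS),
    "--batch_size=" + str(BATCH_SIZE),
]

# Same argument definitions as the training script
parser = argparse.ArgumentParser()
parser.add_argument("--label_column", required=True, type=str)
parser.add_argument("--epochs", default=10, type=int)
parser.add_argument("--batch_size", default=10, type=int)

# Parse the list explicitly instead of reading sys.argv
args = parser.parse_args(cmdargs)
print(args.label_column, args.epochs, args.batch_size)
```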

#### Training script

Complete the contents of the training script, `task.py`. In the **[ TODO - Insert your code ]** section, write code that trains the model with the given number of epochs and batch size, and then saves the trained model artifact to the Cloud Storage directory `aiplatform-custom-training` in the created Cloud Storage bucket by using `os.environ['AIP_MODEL_DIR']`.

In [15]:
%%writefile task.py

import argparse
import numpy as np
import os

import pandas as pd
import tensorflow as tf

from google.cloud import bigquery
from google.cloud import storage

# Read environment variables
training_data_uri = os.getenv("AIP_TRAINING_DATA_URI")
validation_data_uri = os.getenv("AIP_VALIDATION_DATA_URI")
test_data_uri = os.getenv("AIP_TEST_DATA_URI")

# Added by T Kim
print("=========================================")
print(f"training_data_uri={training_data_uri}")
print(f"validation_data_uri={validation_data_uri}")
print(f"test_data_uri={test_data_uri}")

aip_model_dir = os.getenv("AIP_MODEL_DIR")
print(f"aip_model_dir={aip_model_dir}")
print("=========================================")
# Added by T Kim

# Read args
parser = argparse.ArgumentParser()
parser.add_argument('--label_column', required=True, type=str)
parser.add_argument('--epochs', default=10, type=int)
parser.add_argument('--batch_size', default=10, type=int)
args = parser.parse_args()

# Set up training variables
LABEL_COLUMN = args.label_column

# See https://cloud.google.com/vertex-ai/docs/workbench/managed/executor#explicit-project-selection for issues regarding permissions.
PROJECT_NUMBER = os.environ["CLOUD_ML_PROJECT_ID"]
bq_client = bigquery.Client(project=PROJECT_NUMBER)


# Download a table
def download_table(bq_table_uri: str):
    # Remove bq:// prefix if present
    prefix = "bq://"
    if bq_table_uri.startswith(prefix):
        bq_table_uri = bq_table_uri[len(prefix) :]
        
    # Download the BigQuery table as a dataframe
    # This requires the "BigQuery Read Session User" role on the custom training service account.
    table = bq_client.get_table(bq_table_uri)
    return bq_client.list_rows(table).to_dataframe()

# Download dataset splits
df_train = download_table(training_data_uri)
df_validation = download_table(validation_data_uri)
df_test = download_table(test_data_uri)

def convert_dataframe_to_dataset(
    df_train: pd.DataFrame,
    df_validation: pd.DataFrame,
):
    df_train_x, df_train_y = df_train, df_train.pop(LABEL_COLUMN)
    df_validation_x, df_validation_y = df_validation, df_validation.pop(LABEL_COLUMN)

    y_train = tf.convert_to_tensor(np.asarray(df_train_y).astype("float32"))
    y_validation = tf.convert_to_tensor(np.asarray(df_validation_y).astype("float32"))

    # Convert features to float32 tensors
    x_train = tf.convert_to_tensor(np.asarray(df_train_x).astype("float32"))
    x_validation = tf.convert_to_tensor(np.asarray(df_validation_x).astype("float32"))

    # Convert to one-hot representation
    num_species = len(df_train_y.unique())
    y_train = tf.keras.utils.to_categorical(y_train, num_classes=num_species)
    y_validation = tf.keras.utils.to_categorical(y_validation, num_classes=num_species)

    dataset_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
    dataset_validation = tf.data.Dataset.from_tensor_slices((x_validation, y_validation))
    return (dataset_train, dataset_validation)

# Create datasets
dataset_train, dataset_validation = convert_dataframe_to_dataset(df_train, df_validation)

# Shuffle train set
dataset_train = dataset_train.shuffle(len(df_train))

def create_model(num_features):
    # Create model
    Dense = tf.keras.layers.Dense
    model = tf.keras.Sequential(
        [
            Dense(
                100,
                activation=tf.nn.relu,
                kernel_initializer="uniform",
                input_dim=num_features,
            ),
            Dense(75, activation=tf.nn.relu),
            Dense(50, activation=tf.nn.relu),            
            Dense(25, activation=tf.nn.relu),
            Dense(3, activation=tf.nn.softmax),
        ]
    )
    
    # Compile Keras model
    optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
    model.compile(
        loss="categorical_crossentropy", metrics=["accuracy"], optimizer=optimizer
    )
    
    return model

# Create the model
# element_spec[0] describes one feature vector; its first dimension is the feature count
model = create_model(num_features=dataset_train.element_spec[0].shape[0])

# Set up datasets
dataset_train = dataset_train.batch(args.batch_size)
dataset_validation = dataset_validation.batch(args.batch_size)

# Train the model
model.fit(dataset_train, epochs=args.epochs, validation_data=dataset_validation)

# Save the trained model to the Cloud Storage location that Vertex AI provides
tf.saved_model.save(model, aip_model_dir)

Writing task.py
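
Vertex AI supplies the data split locations as `bq://project.dataset.table` URIs, while the BigQuery client expects the bare table ID. The prefix handling in `download_table` can be tried in isolation with the sketch below. The function name `strip_bq_prefix` and the example table ID are hypothetical.

```python
def strip_bq_prefix(bq_table_uri: str) -> str:
    """Remove a leading 'bq://' from a BigQuery table URI, if present."""
    prefix = "bq://"
    if bq_table_uri.startswith(prefix):
        return bq_table_uri[len(prefix):]
    return bq_table_uri


# Hypothetical table ID, with and without the prefix
print(strip_bq_prefix("bq://my-project.my_dataset.training"))
print(strip_bq_prefix("my-project.my_dataset.training"))
```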


### Execute the script in Vertex AI Training

Define your custom `TrainingPipeline` on Vertex AI.

Use the `CustomTrainingJob` class to define the `TrainingPipeline`. The class takes the following parameters:

- `display_name`: The user-defined name of this training pipeline.
- `script_path`: The local path to the training script.
- `container_uri`: The URI of the training container image.
- `requirements`: The list of Python package dependencies of the script.
- `model_serving_container_image_uri`: The URI of a container that can serve predictions for your model, either a pre-built container or a custom container.

Use the `run` function to start training.

The `run` function creates a training pipeline that trains and creates a `Model` object. After the training pipeline completes, the `run` function returns the `Model` object.

In [16]:
JOB_NAME = "cepf_custom_training_job"
MODEL_DISPLAY_NAME = "cepf_penguins_model"

In [None]:
# Use the `CustomTrainingJob` class to define the `TrainingPipeline`.
# container_uri="us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-8:latest"
# requirements=["google-cloud-bigquery[pandas]", "protobuf<3.20.0"]
# model_serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-8:latest",

[ TODO - Insert your code ]

In [25]:
job = aiplatform.CustomTrainingJob(
    display_name=JOB_NAME,
    script_path="task.py",
    container_uri="us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-8:latest",
    requirements=["google-cloud-bigquery[pandas]", "protobuf<3.20.0"],
    model_serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-8:latest",
)


The error message `AttributeError: 'str' object has no attribute 'name'` means that a string (for example, `"cepf_penguins"`) was passed to the `dataset` argument of `job.run`, which expects a dataset object. Pass the `TabularDataset` object created earlier (the `dataset` variable) rather than its display name.

In [26]:
# Run the training job
training_fraction_split = 0.8
validation_fraction_split = 0.1
test_fraction_split = 0.1

model = job.run(
    dataset=dataset,
    model_display_name=MODEL_DISPLAY_NAME,
    bigquery_destination=f"bq://{PROJECT_ID}",
    args=CMDARGS,
    replica_count=1,  # Number of worker replicas
    machine_type="c2-standard-8",  # or "n1-standard-4"
    training_fraction_split=training_fraction_split,
    validation_fraction_split=validation_fraction_split,
    test_fraction_split=test_fraction_split,
)

Training script copied to:
gs://qwiklabs-gcp-02-f695bba24cbb-cepf/aiplatform-2024-12-12-13:42:44.848-aiplatform_custom_trainer_script-0.1.tar.gz.
Training Output directory:
gs://qwiklabs-gcp-02-f695bba24cbb-cepf/aiplatform-custom-training-2024-12-12-13:42:44.941 
View Training:
https://console.cloud.google.com/ai/platform/locations/us-east1/training/6731043746672017408?project=101459138402
CustomTrainingJob projects/101459138402/locations/us-east1/trainingPipelines/6731043746672017408 current state:
PipelineState.PIPELINE_STATE_RUNNING
CustomTrainingJob projects/101459138402/locations/us-east1/trainingPipelines/6731043746672017408 current state:
PipelineState.PIPELINE_STATE_RUNNING
CustomTrainingJob projects/101459138402/locations/us-east1/trainingPipelines/6731043746672017408 current state:
PipelineState.PIPELINE_STATE_RUNNING
CustomTrainingJob projects/101459138402/locations/us-east1/trainingPipelines/6731043746672017408 current state:
PipelineState.PIPELINE_STATE_RUNNING
CustomTrain
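
The three split fractions passed to `job.run` should add up to 1. A quick local check, together with a rough estimate of how many rows land in each split, might look like the sketch below. The row count of 100 is a made-up figure, and the rounding shown is only an approximation; Vertex AI performs the actual split.

```python
import math

training_fraction_split = 0.8
validation_fraction_split = 0.1
test_fraction_split = 0.1

fractions = {
    "training": training_fraction_split,
    "validation": validation_fraction_split,
    "test": test_fraction_split,
}

# Floating-point sums are rarely exact, so compare with a tolerance
total = sum(fractions.values())
if not math.isclose(total, 1.0):
    raise ValueError(f"Split fractions sum to {total}, expected 1.0")

n_rows = 100  # hypothetical table size, for illustration only
approx_rows = {name: round(frac * n_rows) for name, frac in fractions.items()}
print(approx_rows)
```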

## Task 6. Deploy the model

Before you use your model to make predictions, you must deploy it to an endpoint.

1. Create an endpoint resource named `cepf_penguins_model_endpoint`.
2. Deploy the model resource to the endpoint resource with the name `penguins_deployed`.

For more details, see the deployment documentation: https://cloud.google.com/vertex-ai/docs/general/deployment#python

In [29]:
ENDPOINT_NAME = "cepf_penguins_model_endpoint"

# Create an Endpoint resource
endpoint = aiplatform.Endpoint.create(display_name=ENDPOINT_NAME)

Creating Endpoint
Create Endpoint backing LRO: projects/101459138402/locations/us-east1/endpoints/1190203744881475584/operations/3196799958627909632
Endpoint created. Resource name: projects/101459138402/locations/us-east1/endpoints/1190203744881475584
To use this Endpoint in another session:
endpoint = aiplatform.Endpoint('projects/101459138402/locations/us-east1/endpoints/1190203744881475584')


In [33]:
DEPLOYED_NAME = "penguins_deployed"

# Deploy the model to the endpoint
model.deploy(
    endpoint=endpoint,
    deployed_model_display_name=DEPLOYED_NAME,
    machine_type="c2-standard-8",
    min_replica_count=1,
    max_replica_count=1,
)

Deploying model to Endpoint : projects/101459138402/locations/us-east1/endpoints/1190203744881475584
Deploy Endpoint model backing LRO: projects/101459138402/locations/us-east1/endpoints/1190203744881475584/operations/7736428383017369600
Endpoint model deployed. Resource name: projects/101459138402/locations/us-east1/endpoints/1190203744881475584


<google.cloud.aiplatform.models.Endpoint object at 0x7f93244e1ed0> 
resource name: projects/101459138402/locations/us-east1/endpoints/1190203744881475584

In [None]:
# Reference only: the general form of the deploy call. The names on the
# right-hand side (traffic_percentage, machine_type, and so on) are
# placeholders that are not defined in this notebook, so the call is
# left commented out. The model was already deployed in the previous cell.
#
# model.deploy(
#     endpoint=endpoint,
#     deployed_model_display_name=deployed_model_display_name,
#     traffic_percentage=traffic_percentage,
#     traffic_split=traffic_split,
#     machine_type=machine_type,
#     min_replica_count=min_replica_count,
#     max_replica_count=max_replica_count,
#     accelerator_type=accelerator_type,
#     accelerator_count=accelerator_count,
#     explanation_metadata=explanation_metadata,
#     explanation_parameters=explanation_parameters,
#     metadata=metadata,
#     sync=sync,
# )

In [28]:
# Alternative: load an existing Model resource by its full resource name,
# then deploy it. Pass the variable itself, not the string literal
# "path_to_your_model"; the literal fails with
# "NotFound: 404 The Model does not exist."
path_to_your_model = "projects/101459138402/locations/us-east1/models/1462407639546724352"

model = aiplatform.Model(path_to_your_model)
model.deploy(
    endpoint=endpoint,
    deployed_model_display_name=DEPLOYED_NAME,
    # Add any other deployment settings you need here
)

## Task 7. Process the test data and make an online prediction request

Send an online prediction request to your deployed model.

Prepare the test data by converting it to a Python list.

In [34]:
df_holdout_y = df_holdout.pop(LABEL_COLUMN)
df_holdout_x = df_holdout

# Convert to list representation
holdout_x = np.array(df_holdout_x).tolist()
holdout_y = np.array(df_holdout_y).astype("float32").tolist()

### Send the prediction request

Now that you have test data, you can use it to send a prediction request. Use the `Endpoint` object's `predict` function, which takes the following parameters:

- `instances`: A list of penguin measurement instances. According to your custom model, each instance should be an array of numbers. You prepared this list in the previous step.
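
For reference, the equivalent REST request body is a JSON object with an `instances` key. The sketch below builds such a body by hand for two rows. The feature values are made up, and the six-number layout assumes the column order of the training table once the label column is removed.

```python
import json

# Assumed feature order: island, culmen_length_mm, culmen_depth_mm,
# flipper_length_mm, body_mass_g, sex (values are made up)
instances = [
    [0, 39.1, 18.7, 181.0, 3750.0, 0],
    [1, 46.5, 14.5, 213.0, 4400.0, 1],
]

request_body = json.dumps({"instances": instances})
print(request_body)

# Round-trip to confirm the payload is valid JSON
decoded = json.loads(request_body)
```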

The `predict` function returns a list, where each element corresponds to an instance in the request. In the output for each prediction, you see the following:

- Confidence level for the prediction (`predictions`), between 0 and 1, for each of the three species classes.

You can then run a quick evaluation on the prediction results:
1. `np.argmax`: Convert each list of confidence levels to a label
2. Print predictions
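
As an illustration of the first step, `np.argmax` with `axis=1` picks the index of the largest confidence value in each row. The confidence values below are made up.

```python
import numpy as np

# One row per instance, one column per class (made-up confidences)
toy_predictions = [
    [0.7, 0.2, 0.1],
    [0.1, 0.3, 0.6],
    [0.2, 0.5, 0.3],
]

# Index of the highest confidence in each row becomes the predicted label
toy_labels = np.argmax(toy_predictions, axis=1)
print(toy_labels.tolist())  # [0, 2, 1]
```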

In [35]:
predictions = endpoint.predict(instances=holdout_x)
y_predicted = np.argmax(predictions.predictions, axis=1)

y_predicted

array([2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2])
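
Every prediction above is class 2, which is worth checking against the true labels. A minimal sketch of that comparison follows, using made-up label arrays in place of `y_predicted` and `holdout_y`; in the notebook you would pass the real values instead.

```python
import numpy as np

# Made-up stand-ins for y_predicted and holdout_y
y_pred_toy = np.array([2, 2, 2, 2, 2, 2])
y_true_toy = np.array([0, 2, 1, 2, 0, 2])

# Fraction of predictions that match the true label
accuracy = float(np.mean(y_pred_toy == y_true_toy))

# How often each class was predicted
classes, counts = np.unique(y_pred_toy, return_counts=True)
predicted_counts = dict(zip(classes.tolist(), counts.tolist()))

print(f"accuracy={accuracy:.2f}")
print(predicted_counts)
```

If the accuracy on the real data is close to the share of the most common class, the model may have collapsed to predicting a single class, which would be worth investigating before relying on the endpoint.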

In [36]:
def save_prediction_output(bucket_name, blob_name, predicted_output):
    from google.cloud import storage
    
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(blob_name)

    with blob.open("w") as f:
        f.write(predicted_output)

In [37]:
save_prediction_output(BUCKET_NAME, "prediction.txt", str(y_predicted))
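
`str(y_predicted)` stores NumPy's printed form of the array, which is awkward to parse back. One alternative, sketched here with a local temporary file standing in for the Cloud Storage blob, is to serialize the labels as JSON. The toy array, the file name `prediction.json`, and the local file handling are assumptions for illustration.

```python
import json
import os
import tempfile

import numpy as np

# Made-up stand-in for y_predicted
y_predicted_toy = np.array([2, 0, 1, 2])

# Convert to a plain list first; NumPy arrays are not JSON serializable
payload = json.dumps(y_predicted_toy.tolist())

# Write and read back through a temporary local file
with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "prediction.json")
    with open(path, "w") as f:
        f.write(payload)
    with open(path) as f:
        restored = json.loads(f.read())

print(restored)
```

The same `payload` string could be passed to `save_prediction_output` in place of `str(y_predicted)`, provided nothing downstream depends on the original text format.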