<a href="https://colab.research.google.com/github/deltorobarba/machinelearning/blob/main/training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

*Create a custom-trained model from a Python script in a Docker container using the Vertex AI SDK for Python, and then get a prediction from the deployed model by sending data.*

The dataset used for this tutorial is the penguins dataset from [BigQuery public datasets](https://cloud.google.com/bigquery/public-data). In this tutorial, you use only the fields `culmen_length_mm`, `culmen_depth_mm`, `flipper_length_mm`, and `body_mass_g` from the dataset to predict the penguin species (`species`).

In [None]:
# Install the packages
! pip3 install --upgrade google-cloud-aiplatform \
                        google-cloud-storage \
                        'google-cloud-bigquery[pandas]'

# Automatically restart the kernel so that the newly installed packages are picked up
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

In [None]:
PROJECT=!(gcloud config get-value project)
PROJECT_ID="qwiklabs-gcp-00-34bdec36e87f"

# Set the project id
! gcloud config set project {PROJECT_ID}

Updated property [core/project].


In [None]:
REGION = "us-west1"

In [None]:
BUCKET_NAME = "qwiklabs-gcp-00-34bdec36e87f-cymbal" # update it from the lab instructions
BUCKET_URI = f"gs://{BUCKET_NAME}"

In [None]:
from google.cloud import aiplatform

# Initialize the Vertex AI SDK
aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

In [None]:
from google.cloud import bigquery

# Set up BigQuery client
bq_client = bigquery.Client(project=PROJECT_ID)

In [None]:
# Create a Vertex AI Tabular Dataset from the BigQuery dataset
# Preprocess data and split data: Convert categorical features to numeric
# Split the data into training and holdout sets in an 80-20 ratio

import numpy as np
import pandas as pd

LABEL_COLUMN = "species"

# Define the BigQuery source dataset
BQ_SOURCE = "bigquery-public-data.ml_datasets.penguins"

# Define NA values
NA_VALUES = ["NA", "."]

# Download a table
table = bq_client.get_table(BQ_SOURCE)
df = bq_client.list_rows(table).to_dataframe()

# Drop unusable rows
df = df.replace(to_replace=NA_VALUES, value=np.nan).dropna()

# Convert categorical columns to numeric
df["island"], _ = pd.factorize(df["island"])
df["species"], _ = pd.factorize(df["species"])
df["sex"], _ = pd.factorize(df["sex"])

# Split into a training and holdout dataset
df_train = df.sample(frac=0.8, random_state=100)
df_holdout = df[~df.index.isin(df_train.index)]
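
To make the two preprocessing steps in the cell above more concrete, here is a minimal pure-Python sketch of what they do. It is an illustration only: `factorize` and `split_rows` are hypothetical helpers (not pandas functions), and the sample values are made up. `factorize` mirrors the idea behind `pd.factorize` (integer codes assigned in order of first appearance), and `split_rows` mirrors sampling 80% of the rows for training and keeping the remainder as a holdout set.

```python
import random


def factorize(values):
    """Map each distinct value to an integer code, in order of first appearance."""
    codes, uniques = [], {}
    for value in values:
        if value not in uniques:
            uniques[value] = len(uniques)
        codes.append(uniques[value])
    return codes, list(uniques)


def split_rows(rows, frac=0.8, seed=100):
    """Sample `frac` of the row indices for training; the rest form the holdout set."""
    rng = random.Random(seed)
    indices = list(range(len(rows)))
    train_idx = set(rng.sample(indices, round(len(rows) * frac)))
    train = [rows[i] for i in indices if i in train_idx]
    holdout = [rows[i] for i in indices if i not in train_idx]
    return train, holdout


# Made-up sample values, for illustration only
species = ["Adelie", "Gentoo", "Adelie", "Chinstrap", "Gentoo"]
codes, uniques = factorize(species)
print(codes)    # [0, 1, 0, 2, 1]
print(uniques)  # ['Adelie', 'Gentoo', 'Chinstrap']

rows = list(range(10))
train, holdout = split_rows(rows)
print(len(train), len(holdout))  # 8 2
```

Because the codes depend on the order in which values first appear, the list of unique values (discarded as `_` in the cell above) is what you would need to map a predicted class index back to a species name.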



In [None]:
# Create BigQuery dataset
BQ_DATASET="cymbal_penguins_dataset"
bq_dataset_id = f"{PROJECT_ID}.{BQ_DATASET}"
bq_dataset = bigquery.Dataset(bq_dataset_id)
bq_client.create_dataset(bq_dataset, exists_ok=True)

# Create a Vertex AI tabular dataset from BigQuery training data
#df_source=df_train
#staging_path=table name provided in lab instructions
#display_name=as provided in the lab instructions

#[ TODO - Insert your code ]

Dataset(DatasetReference('qwiklabs-gcp-00-34bdec36e87f', 'cymbal_penguins_dataset'))

In [None]:
df_train

Unnamed: 0,species,island,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g,sex
183,2,1,50.2,14.3,218.0,5700.0,1
180,2,1,48.2,14.3,210.0,4600.0,0
340,0,2,40.9,16.8,191.0,3700.0,0
47,1,0,46.2,17.5,187.0,3650.0,0
295,0,2,37.2,19.4,184.0,3900.0,1
...,...,...,...,...,...,...,...
96,0,0,41.1,19.0,182.0,3425.0,1
175,2,1,45.3,13.7,210.0,4300.0,0
337,0,2,41.8,19.4,198.0,4450.0,1
95,0,0,36.5,18.0,182.0,3150.0,0


In [None]:
# Create a Vertex AI tabular dataset
dataset = aiplatform.TabularDataset.create_from_dataframe(
    df_source=df_train,
    staging_path=f"bq://{PROJECT_ID}.{BQ_DATASET}.cymbal_penguins_table",
    display_name="cymbal_penguins",
)

Your DataFrame has 266 rows and AutoML requires 1000 rows to train on tabular data. You can still train a custom model once your dataset has been uploaded to Vertex, but you will not be able to use AutoML for training.
Creating TabularDataset
Create TabularDataset backing LRO: projects/518128629599/locations/us-west1/datasets/2324358785025441792/operations/1641925452769525760
TabularDataset created. Resource name: projects/518128629599/locations/us-west1/datasets/2324358785025441792
To use this TabularDataset in another session:
ds = aiplatform.TabularDataset('projects/518128629599/locations/us-west1/datasets/2324358785025441792')


In [None]:
from google.cloud import aiplatform

In [None]:
aiplatform.init(project="qwiklabs-gcp-00-34bdec36e87f", location="us-west1")

In [None]:
# bq_source = "bq://qwiklabs-gcp-00-34bdec36e87f.cymbal_penguins.cymbal_penguins_table"

In [None]:
# Train Model. Define the command args for the training script

EPOCHS = 20
BATCH_SIZE = 10

CMDARGS = [
    "--label_column=" + LABEL_COLUMN,
    "--epochs=" + str(EPOCHS),
    "--batch_size=" + str(BATCH_SIZE),
]
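
As a quick local check of how these arguments are consumed, the following sketch feeds `CMDARGS` to the same `argparse` definition that `task.py` uses. Passing the list to `parse_args` explicitly is an assumption made for illustration; in the training job the arguments arrive on the script's command line instead.

```python
import argparse

# Same values as in the notebook cell above
LABEL_COLUMN = "species"
EPOCHS = 20
BATCH_SIZE = 10

CMDARGS = [
    "--label_column=" + LABEL_COLUMN,
    "--epochs=" + str(EPOCHS),
    "--batch_size=" + str(BATCH_SIZE),
]

# Same parser definition as in task.py
parser = argparse.ArgumentParser()
parser.add_argument("--label_column", required=True, type=str)
parser.add_argument("--epochs", default=10, type=int)
parser.add_argument("--batch_size", default=10, type=int)

# Passing the list explicitly stands in for the command line of the training job
args = parser.parse_args(CMDARGS)
print(args.label_column, args.epochs, args.batch_size)  # species 20 10
```

Note that `--epochs` and `--batch_size` are converted to `int` by the parser, so the string concatenation used to build `CMDARGS` does not affect their type inside the script.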

Training script: Complete the contents of the training script, `task.py`. In the **[ TODO - Insert your code ]** section, write code that trains the model with the given number of epochs and batch size, and then saves the trained model artifact to the `aiplatform-custom-training` directory in your Cloud Storage bucket, using the path in `os.environ['AIP_MODEL_DIR']`.

In [None]:
%%writefile task.py

import argparse
import numpy as np
import os

import pandas as pd
import tensorflow as tf

from google.cloud import bigquery
from google.cloud import storage

# Read environment variables
training_data_uri = os.getenv("AIP_TRAINING_DATA_URI")
validation_data_uri = os.getenv("AIP_VALIDATION_DATA_URI")
test_data_uri = os.getenv("AIP_TEST_DATA_URI")

# Read args
parser = argparse.ArgumentParser()
parser.add_argument('--label_column', required=True, type=str)
parser.add_argument('--epochs', default=10, type=int)
parser.add_argument('--batch_size', default=10, type=int)
args = parser.parse_args()

# Set up training variables
LABEL_COLUMN = args.label_column

# See https://cloud.google.com/vertex-ai/docs/workbench/managed/executor#explicit-project-selection for issues regarding permissions.
PROJECT_NUMBER = os.environ["CLOUD_ML_PROJECT_ID"]
bq_client = bigquery.Client(project=PROJECT_NUMBER)


# Download a table
def download_table(bq_table_uri: str):
    # Remove bq:// prefix if present
    prefix = "bq://"
    if bq_table_uri.startswith(prefix):
        bq_table_uri = bq_table_uri[len(prefix) :]

    # Download the BigQuery table as a dataframe
    # This requires the "BigQuery Read Session User" role on the custom training service account.
    table = bq_client.get_table(bq_table_uri)
    return bq_client.list_rows(table).to_dataframe()

# Download dataset splits
df_train = download_table(training_data_uri)
df_validation = download_table(validation_data_uri)
df_test = download_table(test_data_uri)

def convert_dataframe_to_dataset(
    df_train: pd.DataFrame,
    df_validation: pd.DataFrame,
):
    df_train_x, df_train_y = df_train, df_train.pop(LABEL_COLUMN)
    df_validation_x, df_validation_y = df_validation, df_validation.pop(LABEL_COLUMN)

    y_train = tf.convert_to_tensor(np.asarray(df_train_y).astype("float32"))
    y_validation = tf.convert_to_tensor(np.asarray(df_validation_y).astype("float32"))

    # Convert the features to float32 tensors
    x_train = tf.convert_to_tensor(np.asarray(df_train_x).astype("float32"))
    x_test = tf.convert_to_tensor(np.asarray(df_validation_x).astype("float32"))

    # Convert to one-hot representation
    num_species = len(df_train_y.unique())
    y_train = tf.keras.utils.to_categorical(y_train, num_classes=num_species)
    y_validation = tf.keras.utils.to_categorical(y_validation, num_classes=num_species)

    dataset_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
    dataset_validation = tf.data.Dataset.from_tensor_slices((x_test, y_validation))
    return (dataset_train, dataset_validation)

# Create datasets
dataset_train, dataset_validation = convert_dataframe_to_dataset(df_train, df_validation)

# Shuffle train set
dataset_train = dataset_train.shuffle(len(df_train))

def create_model(num_features):
    # Create model
    Dense = tf.keras.layers.Dense
    model = tf.keras.Sequential(
        [
            Dense(
                100,
                activation=tf.nn.relu,
                kernel_initializer="uniform",
                input_dim=num_features,
            ),
            Dense(75, activation=tf.nn.relu),
            Dense(50, activation=tf.nn.relu),
            Dense(25, activation=tf.nn.relu),
            Dense(3, activation=tf.nn.softmax),
        ]
    )

    # Compile Keras model
    optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
    model.compile(
        loss="categorical_crossentropy", metrics=["accuracy"], optimizer=optimizer
    )

    return model

# Create the model
# Read the feature count from the public element_spec instead of a private attribute
model = create_model(num_features=dataset_train.element_spec[0].shape[0])

# Set up datasets
dataset_train = dataset_train.batch(args.batch_size)
dataset_validation = dataset_validation.batch(args.batch_size)

# Train the model
model.fit(dataset_train, epochs=args.epochs, validation_data=dataset_validation)

tf.saved_model.save(model, os.getenv("AIP_MODEL_DIR"))

Overwriting task.py
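
The script reads its inputs and output location from `AIP_*` environment variables and strips the `bq://` scheme before looking up a table. The sketch below reproduces only that string handling so it can be tried locally. The environment values are hypothetical placeholders, and `strip_bq_prefix` is an illustrative helper name that does not appear in `task.py`.

```python
import os

# Hypothetical placeholder values, set here only so that the sketch runs locally
os.environ["AIP_TRAINING_DATA_URI"] = "bq://example-project.example_dataset.training"
os.environ["AIP_MODEL_DIR"] = "gs://example-bucket/model"


def strip_bq_prefix(uri: str) -> str:
    """Return the table ID without the leading bq:// scheme, if present."""
    prefix = "bq://"
    return uri[len(prefix):] if uri.startswith(prefix) else uri


table_id = strip_bq_prefix(os.environ["AIP_TRAINING_DATA_URI"])
print(table_id)  # example-project.example_dataset.training

# A URI without the prefix is returned unchanged
print(strip_bq_prefix("example-project.example_dataset.training"))

# os.getenv returns None for an unset variable, so a missing value would
# only surface later, at the point where it is used
print(os.getenv("AIP_DOES_NOT_EXIST"))  # None
```

Since `os.getenv` silently returns `None`, running `task.py` outside a training job would fail inside `download_table` or at the final save step rather than at the top of the script.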


Execute the script in Vertex AI Training.

Define your custom `TrainingPipeline` on Vertex AI.

Use the `CustomTrainingJob` class to define the `TrainingPipeline`. The class takes the following parameters:

- `display_name`: The user-defined name of this training pipeline.
- `script_path`: The local path to the training script.
- `container_uri`: The URI of the training container image.
- `requirements`: The list of Python package dependencies of the script.
- `model_serving_container_image_uri`: The URI of a container that can serve predictions for your model — either a pre-built container or a custom container.

Use the `run` function to start training.

The `run` function creates a training pipeline that trains and creates a `Model` object. After the training pipeline completes, the `run` function returns the `Model` object.

In [None]:
JOB_NAME = "cymbal_custom_training_job"
MODEL_DISPLAY_NAME = "cymbal_penguins_model"


# Use the `CustomTrainingJob` class to define the `TrainingPipeline`.
# container_uri="us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-8:latest"
# requirements=["google-cloud-bigquery[pandas]", "protobuf<3.20.0"]
# model_serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-8:latest",

# Define the training pipeline
job = aiplatform.CustomTrainingJob(
    display_name=JOB_NAME,
    script_path="task.py",
    container_uri="us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-8:latest",
    requirements=["google-cloud-bigquery[pandas]", "protobuf<3.20.0"],
    model_serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-8:latest",
)

# Use the `run` function to start training

model = job.run(
    dataset=dataset,
    model_display_name=MODEL_DISPLAY_NAME,
    bigquery_destination=f"bq://{PROJECT_ID}",
    args=CMDARGS,
)

# Run the training job

Training script copied to:
gs://qwiklabs-gcp-00-34bdec36e87f-cymbal/aiplatform-2024-11-01-15:50:23.716-aiplatform_custom_trainer_script-0.1.tar.gz.
Training Output directory:
gs://qwiklabs-gcp-00-34bdec36e87f-cymbal/aiplatform-custom-training-2024-11-01-15:50:23.798 
No dataset split provided. The service will use a default split.
View Training:
https://console.cloud.google.com/ai/platform/locations/us-west1/training/5393797776322592768?project=518128629599
CustomTrainingJob projects/518128629599/locations/us-west1/trainingPipelines/5393797776322592768 current state:
PipelineState.PIPELINE_STATE_PENDING
CustomTrainingJob projects/518128629599/locations/us-west1/trainingPipelines/5393797776322592768 current state:
PipelineState.PIPELINE_STATE_PENDING
CustomTrainingJob projects/518128629599/locations/us-west1/trainingPipelines/5393797776322592768 current state:
PipelineState.PIPELINE_STATE_PENDING
CustomTrainingJob projects/518128629599/locations/us-west1/trainingPipelines/53937977763225

In [None]:
# Deploy the model
# Create an Endpoint resource for deploying the Model resource to.
# Deploy the Model resource to the Endpoint resource.

DEPLOYED_NAME = "penguins_deployed"

# Deploy the model at model endpoint
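# [ TODO - Insert your code ]
# One possible completion: deploy with default settings and keep the returned
# Endpoint object, which the prediction cell below uses as `endpoint`.
endpoint = model.deploy(deployed_model_display_name=DEPLOYED_NAME)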

In [None]:
# Process the test data and make an online prediction request
# (Send an online prediction request to your deployed model)

# Prepare the test data by converting it to a Python list
df_holdout_y = df_holdout.pop(LABEL_COLUMN)
df_holdout_x = df_holdout

# Convert to list representation
holdout_x = np.array(df_holdout_x).tolist()
holdout_y = np.array(df_holdout_y).astype("float32").tolist()

Send the prediction request. Now that you have test data, you can use it to send a prediction request. Use the `Endpoint` object's `predict` function, which takes the following parameters:

- `instances`: A list of penguin measurement instances. According to your custom model, each instance should be an array of numbers. You prepared this list in the previous step.

The `predict` function returns a list, where each element corresponds to an instance in the request. In the output for each prediction, you see the following:

- Confidence level for the prediction (`predictions`), between 0 and 1, for each of the three classes (one per penguin species).

You can then run a quick evaluation on the prediction results:
1. `np.argmax`: Convert each list of confidence levels to a label
2. Print predictions
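
The evaluation steps above can be sketched without a deployed endpoint. The example below is a pure-Python stand-in for `np.argmax(..., axis=1)`; the confidence scores and true labels are made up for illustration, and `argmax` is a local helper rather than the NumPy function. The accuracy line shows one way the predicted labels could be compared against held-out labels such as `holdout_y`.

```python
def argmax(scores):
    """Return the index of the largest score."""
    return max(range(len(scores)), key=lambda i: scores[i])


# Made-up confidence scores: three instances, three classes each
predictions = [
    [0.7, 0.2, 0.1],
    [0.1, 0.3, 0.6],
    [0.2, 0.5, 0.3],
]
true_labels = [0, 2, 0]  # made-up ground truth

# Convert each list of confidence levels to a class label
predicted_labels = [argmax(row) for row in predictions]
print(predicted_labels)  # [0, 2, 1]

# Fraction of instances where the predicted label matches the true label
accuracy = sum(p == t for p, t in zip(predicted_labels, true_labels)) / len(true_labels)
print(round(accuracy, 2))  # 0.67
```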

In [None]:
predictions = endpoint.predict(instances=holdout_x)
y_predicted = np.argmax(predictions.predictions, axis=1)

y_predicted

In [None]:
def save_prediction_output(bucket_name, blob_name, predicted_output):
    from google.cloud import storage

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(blob_name)

    with blob.open("w") as f:
        f.write(predicted_output)

In [None]:
save_prediction_output(f"{BUCKET_NAME}", "prediction.txt", str(y_predicted))