# Lab 4: Machine Learning with Vertex AI

Author: 
* Fabian Hirschmann <<fhirschmann@google.com>>

Welcome back 👋😍. During this lab, you will train a machine learning model on the data set you already know. We will deploy it to Vertex AI and finally construct a machine learning pipeline to perform the training process automatically.

We will do this at three maturity levels:

1. Deploy a locally trained model to Vertex AI using a prebuilt container
2. Train and deploy a model on Vertex AI using custom containers
3. Use a Vertex AI pipeline to train and deploy the model

In this Jupyter Notebook, you can press `Shift + Return` to execute the current code chunk and jump to the next one.

## Step 0: Install requirements

In [1]:
!pip install --upgrade --quiet \
    google-cloud-aiplatform==1.72.0 \
    google-cloud-bigquery \
    google-cloud-logging \
    google-cloud-pipeline-components==2.18.0 \
    fastavro \
    avro \
    pandas

Once the command above has finished, <font color=red>please restart your kernel from the menu (Kernel -> Restart Kernel) and continue with step 1.</font>

## Step 1: Import Dependencies and Set Environment Variables

Before we begin, let's import the necessary Python libraries and set a few environment variables for our project.

In [2]:
import logging
import os
import random
import string

random.seed(1337)

import pandas as pd
from google.cloud import aiplatform, bigquery
from sklearn.metrics import roc_curve, auc as auc_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, roc_auc_score

import joblib

import google.cloud.logging
google.cloud.logging.Client().setup_logging(log_level=logging.WARNING)

project = !gcloud config get-value project
PROJECT_ID = project[0]

REGION = "us-central1"
BQ_DATASET = "ml_datasets"
BQ_TABLE = "ulb_fraud_detection_dataproc"
BQ_SOURCE = f"{PROJECT_ID}.{BQ_DATASET}.{BQ_TABLE}"
PIPELINE_ROOT = f"gs://{PROJECT_ID}-bucket/pipelines"
TRAIN_IMAGE_URI = f"{REGION}-docker.pkg.dev/{PROJECT_ID}/bootkon/bootkon-train:latest"
PREDICT_IMAGE_URI = f"{REGION}-docker.pkg.dev/{PROJECT_ID}/bootkon/bootkon-predict:latest"

## Step 2: Create dataset for ML

We initialize the Vertex AI SDK and a BigQuery client to interact with Google Cloud services.

In [3]:
aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=f"{PROJECT_ID}-bucket")
bq = bigquery.Client(project=PROJECT_ID, location="us")

The BigQuery table we'll be working with is as follows:

In [4]:
BQ_SOURCE

'astute-ace-336608.ml_datasets.ulb_fraud_detection_dataproc'

We execute a query to fetch the dataset from BigQuery and store it in a Pandas DataFrame. We don't need the `Feedback` column for machine learning -- so we will delete it.

In [5]:
data = bq.query(f"SELECT * FROM `{BQ_SOURCE}`").to_dataframe()
data.drop("Feedback", axis=1, inplace=True)

Let's have a look at the data set in more detail.

In [6]:
data

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
0,67573.0,1.247292,-1.214023,1.717765,-0.129889,-2.118590,0.358109,-1.763690,0.452637,0.986597,...,0.164138,0.776612,0.004733,0.428537,0.297247,-0.029560,0.090474,0.022572,2.0,0
1,127999.0,-1.534523,1.122946,-3.437084,-0.797825,1.015405,-1.250023,0.585146,0.872816,-0.892797,...,0.516332,1.529644,0.246227,0.313690,-0.999467,0.660153,0.258338,-0.068222,2.0,0
2,145149.0,2.156547,0.100973,-2.700733,-0.048208,1.143939,-0.896822,0.817691,-0.407689,-0.310319,...,0.267644,0.882000,-0.195982,0.389867,0.657228,0.993540,-0.157356,-0.106062,2.0,0
3,153187.0,-1.070276,0.750548,-0.432043,0.795662,1.894683,-0.913714,1.370461,-0.728018,-0.391775,...,0.084740,0.626613,-0.215756,0.581448,0.018814,-0.465960,-0.551073,0.009276,2.0,0
4,152869.0,2.136909,0.088646,-2.490914,0.098321,0.789008,-1.399582,0.854902,-0.492912,-0.254999,...,0.278034,0.934892,-0.211839,-0.234266,0.609699,1.020898,-0.154427,-0.112532,2.0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
284802,63644.0,0.971875,-0.050017,0.745618,0.992303,-0.024952,1.071646,-0.465425,0.532930,0.318431,...,-0.054716,-0.102631,0.281037,-0.703291,-0.102724,-0.512912,0.098703,0.023487,20.0,0
284803,82872.0,1.562782,-1.229601,-1.158630,-2.496174,0.974909,3.237633,-1.478926,0.745097,-1.985665,...,-0.263261,-0.527888,-0.006884,0.982099,0.546208,-0.191959,0.031388,0.015444,20.0,0
284804,143865.0,2.277261,-1.618792,-2.363003,-2.596243,1.166023,3.433015,-1.622363,0.798377,-1.403863,...,-0.166966,-0.075929,0.263730,0.687151,-0.130091,-0.140835,0.027717,-0.056861,20.0,0
284805,36081.0,1.306817,-0.541898,0.166026,-0.632248,-0.698127,-0.418271,-0.424042,-0.036891,-1.062217,...,0.250819,0.764577,-0.132648,0.281172,0.660827,-0.066862,0.003071,-0.005665,20.0,0


We separate the target variable (`Class`), which we want to predict, from the features (all other columns). The `Class` column indicates whether a transaction is fraudulent (1) or legitimate (0).

In [7]:
target = data["Class"].astype(int)
data.drop("Class", axis=1, inplace=True)

Fraud detection datasets are typically highly imbalanced, meaning the majority of transactions are legitimate. We check the distribution of our classes.

In [8]:
target.value_counts()

Class
0    284315
1       492
Name: count, dtype: int64

We split our dataset into two parts:

- Training set (80%): Used to train the machine learning model.
- Testing set (20%): Used to evaluate the performance of the trained model.

In [9]:
X_train, X_test, y_train, y_test = train_test_split(data, target, train_size = 0.80)
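
With only 492 fraud cases, a purely random split can leave the test set with noticeably more or fewer positives than expected. The sketch below is an illustration in plain Python and is not what the cell above does: it splits row indices class by class so that both parts keep roughly the same fraud rate. The function name `stratified_split_indices` and the toy labels are hypothetical; scikit-learn's `train_test_split` offers the same idea through its `stratify` argument.

```python
import random
from collections import defaultdict


def stratified_split_indices(labels, train_size=0.8, seed=1337):
    """Split row indices so each class keeps roughly the same train/test share."""
    rng = random.Random(seed)  # local generator, leaves the global state untouched
    by_class = defaultdict(list)
    for index, label in enumerate(labels):
        by_class[label].append(index)

    train, test = [], []
    for indices in by_class.values():
        rng.shuffle(indices)
        cut = int(round(len(indices) * train_size))
        train.extend(indices[:cut])
        test.extend(indices[cut:])
    return sorted(train), sorted(test)


# Toy labels: 990 legitimate (0) and 10 fraudulent (1) transactions.
toy_labels = [0] * 990 + [1] * 10
train_idx, test_idx = stratified_split_indices(toy_labels)
fraud_in_test = sum(toy_labels[i] for i in test_idx)
print(len(train_idx), len(test_idx), fraud_in_test)
```

Each class is cut at 80% on its own, so the 10 toy fraud cases are divided 8 to 2 regardless of the shuffle.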

Let's also save the training set to Cloud Storage, where the training jobs in the later steps will read it.

In [10]:
X_train.to_csv(f"gs://{PROJECT_ID}-bucket/data/vertex/X_train.csv", index=False)
y_train.to_frame().to_csv(f"gs://{PROJECT_ID}-bucket/data/vertex/y_train.csv", index=False)

## Step 3.1: Train a Random Forest classifier

We use a `RandomForestClassifier`, which is an ensemble learning method that creates multiple decision trees and aggregates their predictions. This helps improve accuracy and robustness.
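
To make the aggregation step concrete, here is a toy sketch in plain Python. It assumes, as scikit-learn's random forest does, that the forest averages the class probabilities of its trees and then picks the most probable class. The three hard-coded tree outputs and the helper `forest_predict_proba` are made up for illustration.

```python
def forest_predict_proba(tree_probabilities):
    """Average per-tree [P(legitimate), P(fraud)] estimates for one sample."""
    n_trees = len(tree_probabilities)
    n_classes = len(tree_probabilities[0])
    return [
        sum(tree[c] for tree in tree_probabilities) / n_trees
        for c in range(n_classes)
    ]


# Hypothetical outputs of three trees for a single transaction.
tree_outputs = [
    [0.9, 0.1],
    [0.4, 0.6],
    [0.8, 0.2],
]
mean_proba = forest_predict_proba(tree_outputs)
# The predicted class is the index with the highest averaged probability.
predicted_class = max(range(len(mean_proba)), key=mean_proba.__getitem__)
print(mean_proba, predicted_class)
```

Although one tree votes for fraud, the averaged fraud probability stays at about 0.3, so the forest predicts class 0.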

In [11]:
model = RandomForestClassifier(n_estimators=50, random_state=42, n_jobs=8, verbose=1)
model.fit(X_train, y_train)

# Predictions
y_pred = model.predict(X_test)
y_pred_prob = model.predict_proba(X_test)[:, 1]

[Parallel(n_jobs=8)]: Using backend ThreadingBackend with 8 concurrent workers.
[Parallel(n_jobs=8)]: Done  34 tasks      | elapsed:   43.3s
[Parallel(n_jobs=8)]: Done  50 out of  50 | elapsed:   55.8s finished
[Parallel(n_jobs=8)]: Using backend ThreadingBackend with 8 concurrent workers.
[Parallel(n_jobs=8)]: Done  34 tasks      | elapsed:    0.1s
[Parallel(n_jobs=8)]: Done  50 out of  50 | elapsed:    0.1s finished
[Parallel(n_jobs=8)]: Using backend ThreadingBackend with 8 concurrent workers.
[Parallel(n_jobs=8)]: Done  34 tasks      | elapsed:    0.1s
[Parallel(n_jobs=8)]: Done  50 out of  50 | elapsed:    0.1s finished


We calculate the accuracy of the model, which measures the proportion of correctly classified instances.

For a highly imbalanced data set, the accuracy is often meaningless, because a simple classifier that always says ***not fraud*** will have an accuracy close to 1 already.
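
A quick back-of-the-envelope check with the class counts shown earlier illustrates this. The counts refer to the full data set rather than the test split, so the resulting figure is only indicative.

```python
# Class counts from the `target.value_counts()` output above (full data set).
n_legitimate = 284315
n_fraud = 492

# A classifier that always answers "not fraud" is right on every legitimate row.
baseline_accuracy = n_legitimate / (n_legitimate + n_fraud)
fraud_rate = n_fraud / (n_legitimate + n_fraud)

print(f"baseline accuracy: {baseline_accuracy:.4f}")
print(f"fraud rate:        {fraud_rate:.4%}")
```

A model that catches no fraud at all would therefore still score roughly 99.8% accuracy.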

In [12]:
accuracy_score(y_test, y_pred)

0.9995611109160493

We compute the ROC AUC (Receiver Operating Characteristic - Area Under the Curve) score. This metric evaluates the model's ability to distinguish between classes. A score closer to 1 indicates better performance.
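
One way to read the score: it is the probability that a randomly chosen fraudulent transaction receives a higher predicted score than a randomly chosen legitimate one, with ties counted as half. The sketch below computes that quantity directly in plain Python on made-up labels and scores. `pairwise_auc` is a hypothetical helper and far slower than `roc_auc_score`, which remains the right tool in practice.

```python
def pairwise_auc(y_true, y_score):
    """ROC AUC as the share of (positive, negative) pairs ranked correctly."""
    positives = [s for y, s in zip(y_true, y_score) if y == 1]
    negatives = [s for y, s in zip(y_true, y_score) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative example")

    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half a correct ranking
    return wins / (len(positives) * len(negatives))


# Made-up labels and scores for six transactions.
labels = [0, 0, 1, 0, 1, 0]
scores = [0.1, 0.4, 0.35, 0.2, 0.8, 0.35]
auc_value = pairwise_auc(labels, scores)
print(auc_value)
```

Because the metric depends only on the ranking of scores, it is not inflated by the large number of legitimate transactions in the way accuracy is.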

In [13]:
roc_auc_score(y_test, y_pred_prob)

0.9219180813470536

We save the trained model to a local file so we can deploy it later.

In [14]:
joblib.dump(model, "model.joblib")

['model.joblib']

We copy the serialized model to Cloud Storage so that Vertex AI can load it from there in the next step.

In [15]:
!gsutil cp model.joblib gs://{PROJECT_ID}-bucket/model/

Copying file://model.joblib [Content-Type=application/octet-stream]...
/ [1 files][  1.2 MiB/  1.2 MiB]                                                
Operation completed over 1 objects/1.2 MiB.                                      


## Step 3.2: Serve locally trained model on Vertex AI

The Vertex AI Model Registry is a centralized repository in Google Cloud's Vertex AI platform where machine learning (ML) models are stored, managed, and versioned. It allows data scientists and ML engineers to track different model versions, store metadata, and deploy models seamlessly to Vertex AI endpoints for inference.

Key features of the Model Registry include:

* Model Versioning: Track multiple versions of a model.
* Metadata Management: Store details such as model parameters, training data, and performance metrics.
* Deployment & Serving: Deploy registered models to Vertex AI Endpoints, Batch Predictions, or export them for external use.
* Model Governance: Manage access control, approval workflows, and lineage tracking.
* Integration with Pipelines: Automate model registration via Vertex AI Pipelines.

We can register the model we just trained in this notebook as follows:

In [16]:
vertex_model_upload = aiplatform.Model.upload(
    display_name="bootkon-upload-model",
    serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-5:latest",
    artifact_uri=f"gs://{PROJECT_ID}-bucket/model/",
    is_default_version=True,
    version_aliases=["v1"],
)

Creating Model
Create Model backing LRO: projects/888342260584/locations/us-central1/models/3048036447706677248/operations/5520722546774769664
Model created. Resource name: projects/888342260584/locations/us-central1/models/3048036447706677248@1
To use this Model in another session:
model = aiplatform.Model('projects/888342260584/locations/us-central1/models/3048036447706677248@1')


Once the model has been uploaded, navigate to the [`Model Registry` in Vertex AI](https://console.cloud.google.com/vertex-ai/models). Click on `bootkon-upload-model`. Can you find your newly created model artifact? Open the `VERSION DETAILS` tab and try to find your model artifact on Cloud Storage.

Let's deploy the model to an endpoint for online prediction.

In [17]:
endpoint_upload = aiplatform.Endpoint.create(display_name="bootkon-endpoint-upload")

Creating Endpoint
Create Endpoint backing LRO: projects/888342260584/locations/us-central1/endpoints/1390056475904180224/operations/6436079171037822976
Endpoint created. Resource name: projects/888342260584/locations/us-central1/endpoints/1390056475904180224
To use this Endpoint in another session:
endpoint = aiplatform.Endpoint('projects/888342260584/locations/us-central1/endpoints/1390056475904180224')


The next code chunk will take around 10 minutes. We don't want to wait for that, so we set `sync=False` and look at the result later.

In [18]:
vertex_model_upload.deploy(
    deployed_model_display_name="bootkon-model-upload",
    endpoint=endpoint_upload,
    machine_type="n2-standard-2",
    sync=False
)

Deploying model to Endpoint : projects/888342260584/locations/us-central1/endpoints/1390056475904180224


<google.cloud.aiplatform.models.Endpoint object at 0x7f11784d58a0> 
resource name: projects/888342260584/locations/us-central1/endpoints/1390056475904180224

Deploy Endpoint model backing LRO: projects/888342260584/locations/us-central1/endpoints/1390056475904180224/operations/7148773812069203968
Deploying model to Endpoint : projects/888342260584/locations/us-central1/endpoints/3718136008278016000
Deploy Endpoint model backing LRO: projects/888342260584/locations/us-central1/endpoints/3718136008278016000/operations/1055403516236922880
Endpoint model deployed. Resource name: projects/888342260584/locations/us-central1/endpoints/1390056475904180224


The next chunk lists the currently deployed models. While the model is still deploying, it won't show up.

In [19]:
endpoint_upload.list_models()

[]
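
Because the deployment runs in the background, one option is to poll until the endpoint reports a deployed model. The helper below is a generic sketch: `wait_until_nonempty` is a hypothetical name, and the fake `list_models` function merely stands in for `endpoint_upload.list_models` so that the example runs without Google Cloud access.

```python
import time


def wait_until_nonempty(fetch, timeout_s=900.0, interval_s=30.0, sleep=time.sleep):
    """Call `fetch` repeatedly until it returns a non-empty result or time runs out."""
    deadline = time.monotonic() + timeout_s
    while True:
        result = fetch()
        if result:
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError("nothing was deployed before the timeout")
        sleep(interval_s)


# Stand-in for `endpoint_upload.list_models`: empty twice, then one model.
_responses = iter([[], [], ["bootkon-model-upload"]])


def fake_list_models():
    return next(_responses)


deployed = wait_until_nonempty(fake_list_models, interval_s=0.0)
print(deployed)
```

In the notebook, the same helper could be called as `wait_until_nonempty(endpoint_upload.list_models)`; the 15-minute timeout and 30-second interval are arbitrary defaults.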

## Step 4: Train and serve model using custom containers

In this section, we will train a `RandomForestClassifier` using **custom containers** on Vertex AI and deploy it for real-time predictions. Instead of using pre-built containers, we will package our training and prediction logic into Docker containers, allowing for **full control over dependencies, runtime environments, and scalability**. 

The process consists of two main steps:
1. **Model Training:** We will preprocess the dataset, train a model and save it as a serialized `joblib` file. The trained model will be uploaded to Cloud Storage for deployment.
2. **Model Serving:** Using a separate container, the stored model will be loaded from Cloud Storage, and an API will be exposed via Flask (or **FastAPI** in production) to handle inference requests.

By leveraging Vertex AI’s custom training and prediction services, we can achieve a **scalable, managed ML workflow** while keeping complete flexibility over the training and deployment pipeline.

We will create the following files:

- `train/Dockerfile`: Dockerfile for the training container
- `train/train.py`: Training script
- `predict/Dockerfile`: Dockerfile for the prediction container
- `predict/predict.py`: Prediction script

First, we configure Docker to authenticate against Artifact Registry.

In [20]:
!gcloud auth configure-docker $REGION-docker.pkg.dev --quiet

{
  "credHelpers": {
    "gcr.io": "gcloud",
    "us.gcr.io": "gcloud",
    "eu.gcr.io": "gcloud",
    "asia.gcr.io": "gcloud",
    "staging-k8s.gcr.io": "gcloud",
    "marketplace.gcr.io": "gcloud",
    "us-central1-docker.pkg.dev": "gcloud"
  }
}
Adding credentials for: us-central1-docker.pkg.dev
gcloud credential helpers already registered correctly.


In [21]:
!mkdir -p train predict

In [22]:
%%writefile train/Dockerfile
FROM python:3.10-slim

WORKDIR /app
COPY train.py /app/train.py

RUN pip install --no-cache-dir --quiet pandas scikit-learn==1.5.2 google-cloud-storage fsspec gcsfs

ENTRYPOINT ["python", "/app/train.py"]

Overwriting train/Dockerfile


In [23]:
%%writefile predict/Dockerfile
FROM python:3.10-slim

WORKDIR /app
COPY predict.py /app/predict.py

RUN pip install --no-cache-dir --quiet pandas scikit-learn==1.5.2 google-cloud-storage google-cloud-aiplatform fsspec gcsfs flask
EXPOSE 8080
ENTRYPOINT ["python", "/app/predict.py"]

Overwriting predict/Dockerfile


The `train.py` script trains a `RandomForestClassifier` using scikit-learn, saves it as a `joblib` file, and uploads it to Cloud Storage. It reads the training data (`X_train` and `y_train`) from CSV files provided as command-line arguments and retrieves the target storage directory from the `AIP_MODEL_DIR` environment variable. The trained model is stored in GCS for later deployment on Vertex AI.


In [24]:
%%writefile train/train.py
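import os
import sys

import joblib
import pandas as pd

from sklearn.ensemble import RandomForestClassifier
from google.cloud import storage

# AIP_MODEL_DIR holds the gs:// directory where the model should be saved.
AIP_MODEL_DIR = os.environ["AIP_MODEL_DIR"].rstrip("/")

# The paths to the training features and labels are passed as command-line arguments.
X_train = pd.read_csv(sys.argv[1])
# Flatten the single-column label frame to a 1-D array, which is what fit() expects.
y_train = pd.read_csv(sys.argv[2]).values.ravel()

model = RandomForestClassifier(n_estimators=50, random_state=42, n_jobs=8, verbose=1)
model.fit(X_train, y_train)

# Serialize the model locally, then upload it to the bucket named in AIP_MODEL_DIR.
joblib.dump(model, "model.joblib")
storage_client = storage.Client()
bucket = storage_client.bucket(AIP_MODEL_DIR.split("/")[2])
blob = bucket.blob("/".join(AIP_MODEL_DIR.split("/")[3:]) + "/model.joblib")
blob.upload_from_filename("model.joblib")
print(f"Wrote model to {AIP_MODEL_DIR}/model.joblib")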

Overwriting train/train.py
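
The bucket and object names in `train.py` are derived by splitting the `gs://` URI on slashes. A slightly more defensive variant of that step is sketched below. `split_gcs_uri` and `model_blob_name` are hypothetical helpers, and the example URI is made up.

```python
def split_gcs_uri(uri):
    """Split 'gs://bucket/some/prefix' into ('bucket', 'some/prefix')."""
    scheme = "gs://"
    if not uri.startswith(scheme):
        raise ValueError(f"not a Cloud Storage URI: {uri!r}")
    bucket, _, prefix = uri[len(scheme):].partition("/")
    if not bucket:
        raise ValueError(f"missing bucket name in {uri!r}")
    return bucket, prefix.strip("/")


def model_blob_name(model_dir, filename="model.joblib"):
    """Build the object name for the model file inside `model_dir`."""
    bucket, prefix = split_gcs_uri(model_dir)
    blob_name = f"{prefix}/{filename}" if prefix else filename
    return bucket, blob_name


# Made-up URI; the trailing slash does not produce a double slash.
print(model_blob_name("gs://example-bucket/training-output/model/"))
```

Validating the scheme up front turns a malformed `AIP_MODEL_DIR` into a clear error instead of an upload to an unexpected location.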


The `predict.py` script is a Flask-based prediction server designed for deployment on Vertex AI using custom containers. It retrieves the model artifacts from Cloud Storage using `prediction_utils.download_model_artifacts()`, loads the model with `joblib`, and exposes two API endpoints:

- **`/predict`** for inference  
- **`/health`** for monitoring the service status  

The script reads environment variables such as `AIP_STORAGE_URI` for downloading the model and `AIP_PREDICT_ROUTE` for defining the prediction route dynamically. 

⚠ **In production,** it is recommended to use **FastAPI** instead of Flask due to its superior performance, asynchronous capabilities, and built-in request validation.


In [25]:
%%writefile predict/predict.py
import os
import joblib

import flask
import numpy as np
from google.cloud.aiplatform.utils import prediction_utils

AIP_STORAGE_URI = os.environ["AIP_STORAGE_URI"]
print(f"Downloading model from {AIP_STORAGE_URI}/model.joblib")
prediction_utils.download_model_artifacts(AIP_STORAGE_URI)
model = joblib.load("model.joblib")

app = flask.Flask(__name__)

@app.route(os.environ.get("AIP_PREDICT_ROUTE", "/predict"), methods=["POST"])
def predict():
    data = flask.request.get_json()
    inputs = np.array(data["instances"])
    predictions = model.predict(inputs).tolist()
    return flask.jsonify({"predictions": predictions})

@app.route(os.environ.get("AIP_HEALTH_ROUTE", "/health"), methods=["GET"])
def health_check():
    print("Received health check")
    return flask.jsonify({"status": "healthy"}), 200

    
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=int(os.environ.get("AIP_HTTP_PORT", 8080)))

Overwriting predict/predict.py
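
The request and response bodies follow the `instances` / `predictions` convention used in `predict.py`. The sketch below reproduces the handler logic without Flask or a trained model so that the JSON shapes are easy to see. `ThresholdModel` and `handle_predict` are hypothetical stand-ins, not part of the lab code.

```python
import json


class ThresholdModel:
    """Toy stand-in for the random forest: flags rows whose first value exceeds 1000."""

    def predict(self, rows):
        return [1 if row[0] > 1000 else 0 for row in rows]


def handle_predict(request_body, model):
    """Mirror the /predict handler: parse JSON, predict, wrap the result."""
    payload = json.loads(request_body)
    predictions = model.predict(payload["instances"])
    return json.dumps({"predictions": predictions})


# Two made-up instances with two features each.
request_body = json.dumps({"instances": [[12.5, 0.3], [2500.0, -1.2]]})
response_body = handle_predict(request_body, ThresholdModel())
print(response_body)
```

Each entry in `instances` is one row of feature values, and `predictions` contains one result per row in the same order.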


We will store the container images in a Docker repository in Artifact Registry named `bootkon`. Let's create it first.

In [26]:
!gcloud artifacts repositories create bootkon --repository-format=docker --location={REGION}

Create request issued for: [bootkon]
Waiting for operation [projects/astute-ace-336608/locations/us-central1/operati
ons/a240e7fd-936b-403d-9082-8d4f21e4b8cd] to complete...done.                  
Created repository [bootkon].


We can use Cloud Build to build the image and automatically push it to the container repository we just created.

In [27]:
!cd train && gcloud builds submit --region={REGION} --tag={TRAIN_IMAGE_URI} --timeout=1h --quiet

Creating temporary archive of 2 file(s) totalling 881 bytes before compression.
Uploading tarball of [.] to [gs://astute-ace-336608_cloudbuild/source/1739520193.925865-5561e710d4574e238008430771be4824.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/astute-ace-336608/locations/us-central1/builds/4a4cfab3-abc7-475a-94d1-6ee2b331639d].
Logs are available at [ https://console.cloud.google.com/cloud-build/builds;region=us-central1/4a4cfab3-abc7-475a-94d1-6ee2b331639d?project=888342260584 ].
Waiting for build to complete. Polling interval: 1 second(s).
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "4a4cfab3-abc7-475a-94d1-6ee2b331639d"

FETCHSOURCE
Fetching storage object: gs://astute-ace-336608_cloudbuild/source/1739520193.925865-5561e710d4574e238008430771be4824.tgz#1739520194311893
Copying gs://astute-ace-336608_cloudbuild/source/1739520193.925865-5561e710d4574e238008430771be4824.tgz#1739520194311893...
/ [1 files][  710.0 B/  7

Let's do the same thing for the prediction image.

In [28]:
!cd predict && gcloud builds submit --region={REGION} --tag={PREDICT_IMAGE_URI} --timeout=1h --quiet

Creating temporary archive of 3 file(s) totalling 1.2 KiB before compression.
Uploading tarball of [.] to [gs://astute-ace-336608_cloudbuild/source/1739520277.405046-dfbbb67ff0df42cd8d6960c99c151e4b.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/astute-ace-336608/locations/us-central1/builds/b250065d-2ef6-405c-9e2d-9fcaf541c0ba].
Logs are available at [ https://console.cloud.google.com/cloud-build/builds;region=us-central1/b250065d-2ef6-405c-9e2d-9fcaf541c0ba?project=888342260584 ].
Waiting for build to complete. Polling interval: 1 second(s).
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "b250065d-2ef6-405c-9e2d-9fcaf541c0ba"

FETCHSOURCE
Fetching storage object: gs://astute-ace-336608_cloudbuild/source/1739520277.405046-dfbbb67ff0df42cd8d6960c99c151e4b.tgz#1739520277719673
Copying gs://astute-ace-336608_cloudbuild/source/1739520277.405046-dfbbb67ff0df42cd8d6960c99c151e4b.tgz#1739520277719673...
/ [1 files][  961.0 B/  961

Now that the containers are ready, we can run the training image as a `CustomContainerTrainingJob`, passing the locations of the training data as arguments. This will take around 5 to 10 minutes.

In [29]:
job = aiplatform.CustomContainerTrainingJob(
    display_name = "bootkon-custom",
    container_uri = TRAIN_IMAGE_URI,
    model_serving_container_image_uri = PREDICT_IMAGE_URI
)

In [30]:
vertex_model_custom = job.run(
    args=[
        f"gs://{PROJECT_ID}-bucket/data/vertex/X_train.csv",
        f"gs://{PROJECT_ID}-bucket/data/vertex/y_train.csv",
    ]
)

Training Output directory:
gs://astute-ace-336608-bucket/aiplatform-custom-training-2025-02-14-08:06:16.843 
View Training:
https://console.cloud.google.com/ai/platform/locations/us-central1/training/2976178930425266176?project=888342260584
CustomContainerTrainingJob projects/888342260584/locations/us-central1/trainingPipelines/2976178930425266176 current state:
PipelineState.PIPELINE_STATE_RUNNING
View backing custom job:
https://console.cloud.google.com/ai/platform/locations/us-central1/training/3548195456729219072?project=888342260584
CustomContainerTrainingJob projects/888342260584/locations/us-central1/trainingPipelines/2976178930425266176 current state:
PipelineState.PIPELINE_STATE_RUNNING
CustomContainerTrainingJob projects/888342260584/locations/us-central1/trainingPipelines/2976178930425266176 current state:
PipelineState.PIPELINE_STATE_RUNNING
CustomContainerTrainingJob projects/888342260584/locations/us-central1/trainingPipelines/2976178930425266176 current state:
PipelineSt

In [31]:
endpoint_custom = aiplatform.Endpoint.create(display_name="bootkon-endpoint-custom")

Creating Endpoint
Create Endpoint backing LRO: projects/888342260584/locations/us-central1/endpoints/3718136008278016000/operations/5526633521285693440
Endpoint created. Resource name: projects/888342260584/locations/us-central1/endpoints/3718136008278016000
To use this Endpoint in another session:
endpoint = aiplatform.Endpoint('projects/888342260584/locations/us-central1/endpoints/3718136008278016000')


We also deploy this model and don't wait for it to finish (`sync=False`) -- instead we come back later.

In [32]:
vertex_model_custom.deploy(
    deployed_model_display_name="bootkon-model-custom",
    endpoint=endpoint_custom,
    machine_type="n2-standard-2",
    sync=False
)

<google.cloud.aiplatform.models.Endpoint object at 0x7f1178499a50> 
resource name: projects/888342260584/locations/us-central1/endpoints/3718136008278016000

## Step 5: Train and deploy models using Vertex Pipelines

In this section, we will train a `RandomForestClassifier` using Vertex AI Pipelines and Kubeflow Pipelines (KFP) and deploy it for real-time predictions. Unlike the custom container approach, we will define a pipeline-based workflow for automated training, model storage, and deployment. This approach allows us to achieve repeatability, scalability, and automation for end-to-end ML workflows on Google Cloud.

By leveraging Vertex AI Pipelines, we can create a fully managed, automated ML pipeline that integrates seamlessly with GCP services.


In [33]:
import kfp
from kfp import dsl, compiler

from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp, ModelDeployOp
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from kfp.dsl import importer_node

Next, we create a Kubeflow pipeline that automates the training, model upload, and deployment process in Vertex AI.

**Pipeline Steps**

1. Define a Unique Model Directory
   * The pipeline assigns a unique Cloud Storage path for storing the trained model using `PIPELINE_JOB_ID_PLACEHOLDER`, ensuring each run has an isolated model directory.

2. Run a Custom Training Job
   * Uses `CustomTrainingJobOp` to launch a training job on Vertex AI.
   * The training script is executed inside a custom container (`TRAIN_IMAGE_URI`).
   * The trained model is stored in the dynamically created directory (`AIP_MODEL_DIR`).

3. Import the Trained Model as an Artifact
   * The `importer_node.importer` step converts the saved model directory into an `UnmanagedContainerModel`, allowing it to be used by Vertex AI.

4. Upload the Model to Vertex AI
   * The `ModelUploadOp` registers the trained model in Vertex AI, making it available for deployment.

5. Create an Endpoint for Deployment
   * `EndpointCreateOp` initializes a new prediction endpoint in Vertex AI.

6. Deploy the Model to the Endpoint
   * `ModelDeployOp` deploys the registered model to the created endpoint with a dedicated `n1-standard-4` machine.

**Key Features**
- **Dynamically generated model path** ensures each pipeline run has an isolated model storage.
- **Custom container training** allows full control over the training process.
- **Automated model registration and deployment** simplifies the end-to-end MLOps workflow.
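
The dependencies between these steps form a small directed acyclic graph. As a rough illustration of how an orchestrator can derive an execution order from such a graph, the sketch below uses Python's standard `graphlib` module. The step names are shorthand for the components listed above, not identifiers used by KFP, and the real scheduling is done by Vertex AI Pipelines.

```python
from graphlib import TopologicalSorter

# Each key maps to the set of steps it has to wait for.
dependencies = {
    "train": set(),
    "import_model": {"train"},
    "upload_model": {"import_model"},
    "create_endpoint": set(),
    "deploy_model": {"upload_model", "create_endpoint"},
}

# static_order() yields every step after all of its prerequisites.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Endpoint creation has no prerequisites, so it can run in parallel with training, while deployment has to wait for both the uploaded model and the endpoint.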


In [34]:
@kfp.dsl.pipeline(name="bootkon-pipeline")
def pipeline(
    X_train: str,
    y_train: str,
    project: str = PROJECT_ID
):
    model_dir = f"{PIPELINE_ROOT}/model-{kfp.dsl.PIPELINE_JOB_ID_PLACEHOLDER}"
    custom_job_task = CustomTrainingJobOp(
        project=project,
        display_name="bootkon-model-pipeline",
        worker_pool_specs=[
            {
                "containerSpec": {
                    "args": [X_train, y_train],
                    "env": [{"name": "AIP_MODEL_DIR", "value": model_dir}],
                    "imageUri": TRAIN_IMAGE_URI,
                },
                "replicaCount": "1",
                "machineSpec": {
                    "machineType": "n1-standard-4",
                },
            }
        ],
    )

    import_unmanaged_model_task = importer_node.importer(
        artifact_uri=model_dir,
        artifact_class=artifact_types.UnmanagedContainerModel,
        metadata={
            "containerSpec": {
                "imageUri": PREDICT_IMAGE_URI,
            },
        },
    ).after(custom_job_task)

    model_upload_op = ModelUploadOp(
        project=project,
        display_name="bootkon-pipeline-model",
        unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
    )
    model_upload_op.after(import_unmanaged_model_task)

    endpoint_create_op = EndpointCreateOp(
        project=project,
        display_name="bootkon-endpoint-pipeline",
    )

    ModelDeployOp(
        endpoint=endpoint_create_op.outputs["endpoint"],
        model=model_upload_op.outputs["model"],
        deployed_model_display_name="bootkon-pipeline-model",
        dedicated_resources_machine_type="n1-standard-4"
    )

The following command compiles the `bootkon-pipeline` into a JSON file that can be submitted to Vertex AI.

In [35]:
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="bootkon_pipeline.json",
)

And we submit it. Feel free to investigate the pipeline using the link that is printed out.

In [36]:
job = aiplatform.PipelineJob(
    display_name="bootkon-pipeline",
    template_path="bootkon_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    enable_caching=False,
    project=PROJECT_ID,
    parameter_values={
        "project": PROJECT_ID,
        "X_train": f"gs://{PROJECT_ID}-bucket/data/vertex/X_train.csv",
        "y_train": f"gs://{PROJECT_ID}-bucket/data/vertex/y_train.csv"
    },
)

job.run(sync=False)

Creating PipelineJob
PipelineJob created. Resource name: projects/888342260584/locations/us-central1/pipelineJobs/bootkon-pipeline-20250214081430
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/888342260584/locations/us-central1/pipelineJobs/bootkon-pipeline-20250214081430')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/bootkon-pipeline-20250214081430?project=888342260584
PipelineJob projects/888342260584/locations/us-central1/pipelineJobs/bootkon-pipeline-20250214081430 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/888342260584/locations/us-central1/pipelineJobs/bootkon-pipeline-20250214081430 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/888342260584/locations/us-central1/pipelineJobs/bootkon-pipeline-20250214081430 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/888342260584/locations/us-central1/pipelineJ

## Step 6: Make predictions

We should now have several endpoints deployed. Let's check the endpoint from Step 3.2 (the ***upload*** model):

In [37]:
endpoint_upload.list_models()

[]

Let's make a prediction:

In [38]:
response = endpoint_upload.predict(instances=X_test.head(4000).values.tolist())

Most of them are ***not fraud***.

In [39]:
response.predictions[:10]

[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]

But there are also a few fraud cases.

In [40]:
sum(response.predictions)

4.0
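
Sending 4,000 rows in a single call works here, but online prediction requests are subject to a request size limit, so larger inputs may need to be split into several calls. A minimal batching sketch follows. `batched` is a hypothetical helper, the rows are made up, and the batch size of 1,000 is an arbitrary choice rather than a recommendation.

```python
def batched(rows, batch_size):
    """Yield consecutive slices of `rows` with at most `batch_size` items each."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


# Stand-in for `X_test.values.tolist()`: 2,500 rows with two features each.
rows = [[float(i), float(i) * 2] for i in range(2500)]
batch_sizes = [len(batch) for batch in batched(rows, 1000)]
print(batch_sizes)
```

Against a real endpoint, each batch would be passed to `endpoint_upload.predict(instances=batch)` and the resulting `predictions` lists concatenated in order.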

We can do the same thing with the other endpoint we created through custom containers:

In [43]:
response = endpoint_custom.predict(instances=X_test.head(4000).values.tolist())

In [44]:
sum(response.predictions)

4.0

## Investigate results in the Cloud Console

<font color="red"><b>Great job deploying all these models. Now, please go back to the lab in Cloud Shell and continue from there!</b></font>

<img src="../docs/img/lab4/cloud_shell_4.png" width=300/>