In [None]:
# Copyright 2024 Forusone(shins777@gmail.com)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Custom training with pipeline - Tabular data regression

This notebook is simplified version of the below notebook in the official Google github. You can find more divese codes and detailed information from the link.

* [Vertex AI Pipelines](https://colab.sandbox.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/google_cloud_pipeline_components_model_train_upload_deploy.ipynb)

* [github - pipelines examples](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/)

### Dataset

The dataset used for this tutorial is [Cloud Public Dataset Program](https://cloud.google.com/bigquery/public-data/) [London Bikes Rental](https://console.cloud.google.com/bigquery?p=bigquery-public-data&d=london_bicycles&page=dataset&_ga=2.122237643.-1779725180.1624895157) combined with [NOAA weather data ](https://console.cloud.google.com/bigquery?p=bigquery-public-data&d=noaa_gsod&page=dataset&_ga=2.179861860.-1779725180.1624895157) The dataset predicts the duration of the bike rental.


### Install Vertex AI SDK


In [2]:
%pip install --upgrade --quiet --user google-cloud-aiplatform \
                                      google-cloud-pipeline-components \
                                      google-cloud-storage \
                                      kfp

  Preparing metadata (setup.py) ... [?25l[?25hdone


In [1]:
# @title Check package version
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

KFP SDK version: 2.10.1
google_cloud_pipeline_components version: 2.18.0


In [3]:
# @title Authentication to access to GCP
# To use markdown for output data from LLM
from IPython.display import display, Markdown

# Use OAuth to access the GCP environment.
import sys
if "google.colab" in sys.modules:
    from google.colab import auth
    auth.authenticate_user()

In [4]:
# @title Define constants
PROJECT_ID = "ai-hangsik"  # @param {type:"string"}
LOCATION = "us-central1"  # @param {type:"string"}

In [None]:
# import sys
# from IPython.display import Markdown, display

# PROJECT_ID="ai-hangsik"
# LOCATION="us-central1"

# # For only colab user, no need this process for Colab Enterprise in Vertex AI.
# if "google.colab" in sys.modules:
#     from google.colab import auth
#     auth.authenticate_user(project_id=PROJECT_ID)

# # set project.
# !gcloud config set project {PROJECT_ID}

Updated property [core/project].


In [12]:
# @title Import libraries
from typing import NamedTuple
from typing import Any, Dict, List

import google.cloud.aiplatform as aiplatform

import kfp
from google.cloud import bigquery
from kfp import compiler, dsl
from kfp.dsl import ( Artifact,
                      ClassificationMetrics,
                      Input,
                      Metrics,
                      Output,
                      component)


## Data preparation

In [8]:
# @title Create a bucket.
BUCKET_URI = f"gs://mlops-{PROJECT_ID}-0103"
! gsutil mb -l {LOCATION} -p {PROJECT_ID} {BUCKET_URI}

Creating gs://mlops-ai-hangsik-0103/...


In [9]:
# @title Vertex AI Pipelines constants

import datetime

now = datetime.datetime.now()
now_format = now.strftime('%Y%m%d-%H%M%S')

PIPELINE_ROOT = f"{BUCKET_URI}/pipeline/costom/custom_tabular_regression_pipeline"
WORKING_DIR = f"{PIPELINE_ROOT}/{now_format}"
MODEL_DISPLAY_NAME = f"custom_tabular_regression_pipeline_{now_format}"

hp_dict: str = '{"num_hidden_layers": 3, "hidden_size": 32, "learning_rate": 0.01, "epochs": 1, "steps_per_epoch": -1}'
data_dir: str = (
    "gs://cloud-samples-data/vertex-ai/pipeline-deployment/datasets/bikes_weather/"
)

TRAINER_ARGS = ["--data-dir", data_dir, "--hptune-dict", hp_dict]

print(TRAINER_ARGS, WORKING_DIR, MODEL_DISPLAY_NAME)


['--data-dir', 'gs://cloud-samples-data/vertex-ai/pipeline-deployment/datasets/bikes_weather/', '--hptune-dict', '{"num_hidden_layers": 3, "hidden_size": 32, "learning_rate": 0.01, "epochs": 1, "steps_per_epoch": -1}'] gs://mlops-ai-hangsik-0103/pipeline/costom/custom_tabular_regression_pipeline/20250102-130234 custom_tabular_regression_pipeline_20250102-130234


In [13]:
# @title Initialize Vertex AI SDK for Python
aiplatform.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)

## Define custom model pipeline that uses components from `google_cloud_pipeline_components`

Next, you define the pipeline.

The `experimental.run_as_aiplatform_custom_job` method takes as arguments the previously defined component, and the list of `worker_pool_specs`— in this case one— with which the custom training job is configured.

Then, [`google_cloud_pipeline_components`](https://github.com/kubeflow/pipelines/tree/master/components/google-cloud) components are used to define the rest of the pipeline: upload the model, create an endpoint, and deploy the model to the endpoint.

*Note:* While not shown in this example, the model deploy will create an endpoint if one is not provided.

In [14]:

@kfp.dsl.pipeline(name="custom_tabular_regression_pipeline" + now_format)
def pipeline(
    project: str = PROJECT_ID,
    model_display_name: str = MODEL_DISPLAY_NAME,
    serving_container_image_uri: str = "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-9:latest",
):
    from google_cloud_pipeline_components.types import artifact_types
    from google_cloud_pipeline_components.v1.custom_job import \
        CustomTrainingJobOp
    from google_cloud_pipeline_components.v1.endpoint import (EndpointCreateOp,
                                                              ModelDeployOp)
    from google_cloud_pipeline_components.v1.model import ModelUploadOp
    from kfp.dsl import importer_node

    custom_job_task = CustomTrainingJobOp(
        project=project,
        display_name="custom_tabular_regression_pipeline",
        worker_pool_specs=[
            {
                "containerSpec": {
                    "args": TRAINER_ARGS,
                    "env": [{"name": "AIP_MODEL_DIR", "value": WORKING_DIR}],
                    "imageUri": "gcr.io/google-samples/bw-cc-train:latest",
                },
                "replicaCount": "1",
                "machineSpec": {
                    "machineType": "n1-standard-4",
                },
            }
        ],
    )

    import_unmanaged_model_task = importer_node.importer(
        artifact_uri=WORKING_DIR,
        artifact_class=artifact_types.UnmanagedContainerModel,
        metadata={
            "containerSpec": {
                "imageUri": "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-9:latest",
            },
        },
    ).after(custom_job_task)

    model_upload_op = ModelUploadOp(
        project=project,
        display_name=model_display_name,
        unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
    )
    model_upload_op.after(import_unmanaged_model_task)

    endpoint_create_op = EndpointCreateOp(
        project=project,
        display_name="custom_tabular_regression_pipeline",
    )

    ModelDeployOp(
        endpoint=endpoint_create_op.outputs["endpoint"],
        model=model_upload_op.outputs["model"],
        deployed_model_display_name=model_display_name,
        dedicated_resources_machine_type="n1-standard-4",
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=1,
    )

## Compile the pipeline

Next, compile the pipeline.

In [15]:
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="custom_tabular_regression_pipeline.json",
)

## Run the pipeline

Next, run the pipeline.

In [16]:
shell_output = ! gcloud projects describe  $PROJECT_ID
project_number = shell_output[-1].split(":")[1].strip().replace("'", "")

SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"

print(f"SERVICE_ACCOUNT: {SERVICE_ACCOUNT}")

SERVICE_ACCOUNT: 721521243942-compute@developer.gserviceaccount.com


In [17]:
DISPLAY_NAME = "custom_tabular_regression_pipeline_" + now_format

job = aiplatform.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="custom_tabular_regression_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    enable_caching=False,
)


job.run(service_account=SERVICE_ACCOUNT)

# ! rm tabular_regression_pipeline.json

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/721521243942/locations/us-central1/pipelineJobs/custom-tabular-regression-pipeline20250102-130234-20250102130439
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/721521243942/locations/us-central1/pipelineJobs/custom-tabular-regression-pipeline20250102-130234-20250102130439')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/custom-tabular-regression-pipeline20250102-130234-20250102130439?project=721521243942
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/721521243942/locations/us-central1/pipelineJobs/custom-tabular-regression-pipeline20250102-130234-20250102130439 current state:
PipelineState.PIPE

Click on the generated link to see your run in the Cloud Console.

<!-- It should look something like this as it is running:

<a href="https://storage.googleapis.com/amy-jo/images/mp/automl_tabular_classif.png" target="_blank"><img src="https://storage.googleapis.com/amy-jo/images/mp/automl_tabular_classif.png" width="40%"/></a> -->

In the UI, many of the pipeline DAG nodes will expand or collapse when you click on them. Here is a partially-expanded view of the DAG (click image to see larger version).

<a href="https://storage.googleapis.com/amy-jo/images/mp/train_endpoint_deploy.png" target="_blank"><img src="https://storage.googleapis.com/amy-jo/images/mp/train_endpoint_deploy.png" width="75%"/></a>

# Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial -- *Note:* this is auto-generated and not all resources may be applicable for this tutorial:
### Get resources from the pipline to clean up
Function to get details of a task

In [None]:
def get_task_detail(
    task_details: List[Dict[str, Any]], task_name: str
) -> List[Dict[str, Any]]:
    for task_detail in task_details:
        if task_detail.task_name == task_name:
            return task_detail

In [None]:
pipeline_task_details = (
    job.gca_resource.job_detail.task_details
)  # fetch pipeline task details


# fetch endpoint from pipeline and delete the endpoint
endpoint_task = get_task_detail(pipeline_task_details, "endpoint-create")
endpoint_resourceName = (
    endpoint_task.outputs["endpoint"].artifacts[0].metadata["resourceName"]
)
endpoint = aip.Endpoint(endpoint_resourceName)
# undeploy model from endpoint
endpoint.undeploy_all()
endpoint.delete()

# fetch model from pipeline and delete the model
model_task = get_task_detail(pipeline_task_details, "model-upload")
model_resourceName = model_task.outputs["model"].artifacts[0].metadata["resourceName"]
model = aip.Model(model_resourceName)
model.delete()

job.delete()

In [None]:
delete_bucket = False
if delete_bucket:
    ! gsutil rm -r $BUCKET_URI