# Vertex Pipelines: Qwik Start

## Vertex Pipelines settings

There are a few additional libraries you'll need to install in order to use Vertex Pipelines:

- `Kubeflow Pipelines`: This is the SDK used to build the pipeline. Vertex Pipelines supports running pipelines built with either Kubeflow Pipelines or TFX.
- `Google Cloud Pipeline Components`: This library provides pre-built components that make it easier to interact with Vertex AI services from your pipeline steps.

### Step 1: Create Python notebook and install libraries

From the Launcher menu in your Notebook instance, create a notebook by selecting Python 3:

<img src="img/GCP_python.png">

You can access the Launcher menu by clicking on the + sign in the top left of your notebook instance.

To install both libraries needed for this lab, first set the user flag in a notebook cell:

In [1]:
USER_FLAG = "--user"

Then run the following from your notebook:

In [2]:
!pip3 install {USER_FLAG} google-cloud-aiplatform==1.0.0 --upgrade
!pip3 install {USER_FLAG} kfp google-cloud-pipeline-components==0.1.1 --upgrade

Collecting google-cloud-aiplatform==1.0.0
  Downloading google_cloud_aiplatform-1.0.0-py2.py3-none-any.whl (1.8 MB)
Collecting google-cloud-storage<2.0.0dev,>=1.32.0 (from google-cloud-aiplatform==1.0.0)
  Downloading google_cloud_storage-1.44.0-py2.py3-none-any.whl (106 kB)
Collecting google-cloud-bigquery<3.0.0dev,>=1.15.0 (from google-cloud-aiplatform==1.0.0)
  Downloading google_cloud_bigquery-2.34.4-py2.py3-none-any.whl (206 kB)
Collecting packaging>=14.3 (from google-cloud-aiplatform==1.0.0)
  Downloading packaging-21.3-py3-none-any.whl (40 kB)

After installing these packages you'll need to restart the kernel:

In [1]:
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart the kernel after installation
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

Finally, check that you have correctly installed the packages. The KFP SDK version should be >=1.6:

In [1]:
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

KFP SDK version: 1.8.22
google_cloud_pipeline_components version: 0.1.1
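
The version requirement above can also be checked programmatically. The following is a minimal sketch that uses only the standard library. `parse_version` and `meets_minimum` are hypothetical helpers written for illustration, not part of the KFP SDK, and they assume plain dotted numeric versions such as `1.8.22` (no pre-release suffixes).

```python
def parse_version(version: str) -> tuple:
    """Turn a dotted numeric version string such as '1.8.22' into (1, 8, 22)."""
    return tuple(int(part) for part in version.split("."))


def meets_minimum(installed: str, minimum: str) -> bool:
    """Return True if `installed` is at least `minimum`.

    Hypothetical helper for illustration; assumes purely numeric components.
    """
    a = parse_version(installed)
    b = parse_version(minimum)
    # Pad the shorter tuple with zeros so '1.6' compares like '1.6.0'.
    width = max(len(a), len(b))
    a += (0,) * (width - len(a))
    b += (0,) * (width - len(b))
    return a >= b


# The version printed in the output above, checked against the stated minimum.
print(meets_minimum("1.8.22", "1.6"))
```

In the notebook you could pass `kfp.__version__` as the first argument instead of a literal string.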


### Step 2: Set your project ID and bucket

Throughout this lab you'll reference your Cloud project ID and the bucket you created earlier. Create a variable for each of them:

In [2]:
PROJECT_ID = "ccbd-ecbdp-bds"
BUCKET_NAME = "gs://ccbd-bds-aa-kfp-test2"

### Step 3: Import libraries

Add the following to import the libraries you'll be using throughout this lab:

In [3]:
from typing import NamedTuple

import kfp
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, ClassificationMetrics, Metrics, component)
from kfp.v2.google.client import AIPlatformClient

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip

### Step 4: Define constants

The last thing you need to do before building the pipeline is define some constant variables. `PIPELINE_ROOT` is the Cloud Storage path where the artifacts created by your pipeline will be written. You're using `us-central1` as the region here:

In [4]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

REGION="us-central1"
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
print(PIPELINE_ROOT)

env: PATH=/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/jupyter/.local/bin
gs://ccbd-bds-aa-kfp-test2/pipeline_root/


After running the code above, you should see the root directory for your pipeline printed. This is the Cloud Storage location where the artifacts from your pipeline will be written. It will be in the format of `gs://<bucket_name>/pipeline_root/`.
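
If the bucket variable might be entered with or without the `gs://` prefix or a trailing slash, the path can be normalized before use. This is a small sketch under that assumption; `build_pipeline_root` is a hypothetical helper, not something the lab or the SDK provides.

```python
def build_pipeline_root(bucket: str, subdir: str = "pipeline_root") -> str:
    """Return a path of the form gs://<bucket_name>/<subdir>/.

    Hypothetical helper: accepts the bucket with or without the gs:// prefix
    and with or without a trailing slash.
    """
    bucket = bucket.strip().rstrip("/")
    if not bucket.startswith("gs://"):
        bucket = "gs://" + bucket
    return f"{bucket}/{subdir.strip('/')}/"


# Same bucket value as in the cell above.
print(build_pipeline_root("gs://ccbd-bds-aa-kfp-test2"))
```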

## Creating an end-to-end ML pipeline

It's time to build your first ML pipeline. In this pipeline, you'll use the UCI Machine Learning Dry Beans dataset, from: KOKLU, M. and OZKAN, I.A., (2020), "Multiclass Classification of Dry Beans Using Computer Vision and Machine Learning Techniques." In Computers and Electronics in Agriculture, 174, 105507. DOI.

<font color="CornflowerBlue">**Note**: This pipeline will take over 2 hours to complete. You do not need to wait for the pipeline to finish in order to complete the lab. Follow the steps until your pipeline job has started.</font>

This is a tabular dataset, and in your pipeline you'll use the dataset to train, evaluate, and deploy an AutoML model that classifies beans into one of 7 types based on their characteristics.

This pipeline will:

- Create a Dataset in Vertex AI
- Train a tabular classification model with AutoML
- Get evaluation metrics on this model
- Based on the evaluation metrics, decide whether to deploy the model using conditional logic in Vertex Pipelines
- Deploy the model to an endpoint using Vertex Prediction

Each of the steps outlined will be a component. Most of the pipeline steps will use pre-built components for Vertex AI services via the `google_cloud_pipeline_components` library you imported earlier in this lab.

In this section, we'll define one custom component first. Then, we'll define the rest of the pipeline steps using pre-built components. Pre-built components make it easier to access Vertex AI services, like model training and deployment.

Most of the run time for this pipeline is spent in the AutoML training step, which will take about an hour.

### Step 1: A custom component for model evaluation

The custom component you'll define will be used towards the end of the pipeline once model training has completed. This component will do a few things:

- Get the evaluation metrics from the trained AutoML classification model
- Parse the metrics and render them in the Vertex Pipelines UI
- Compare the metrics to a threshold to determine whether the model should be deployed

Before defining the component, take a moment to understand its input and output parameters. As input, this component takes some metadata on your Cloud project, the trained model (produced by a training component you'll define later), and a `thresholds_dict_str`. As output, it writes the model's evaluation metrics to two metrics artifacts.

The `thresholds_dict_str` is a JSON string that you'll define when you run your pipeline. For this classification model, it holds the minimum area under the ROC curve value the model must reach to be deployed. For example, if you pass in `'{"auRoc": 0.95}'`, the pipeline deploys the model only if this metric is at least 95%.

The evaluation component returns a string indicating whether or not to deploy the model.
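
The decision logic described above can be sketched in isolation, without any Vertex AI calls. This is a simplified illustration that follows the same rule as the component defined next; `deployment_decision` is a hypothetical name, and the metric values passed in are made up for the example.

```python
import json

# Metrics for which a larger value is better, as in the component below.
HIGHER_IS_BETTER = ("auRoc", "auPrc")


def deployment_decision(metrics: dict, thresholds_dict_str: str) -> str:
    """Return "true" if every thresholded metric meets its minimum, else "false"."""
    thresholds = json.loads(thresholds_dict_str)
    for name, minimum in thresholds.items():
        if name in HIGHER_IS_BETTER and metrics[name] < minimum:
            return "false"
    return "true"


# Made-up metric values, for illustration only.
print(deployment_decision({"auRoc": 0.97}, '{"auRoc": 0.95}'))
print(deployment_decision({"auRoc": 0.90}, '{"auRoc": 0.95}'))
```

The result is a string rather than a boolean because the pipeline later compares this output to the literal `"true"` in its condition.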

- Add the following in a notebook cell to create this custom component:

In [5]:
@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tables_eval_component.yaml", # Optional: you can use this to load the component later
    packages_to_install=["google-cloud-aiplatform"],
)
def classif_model_eval_metrics(
    project: str,
    location: str,  # "region",
    api_endpoint: str,  # "region-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Model],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

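    """Render evaluation metrics for an AutoML tabular classification model.

    Fetches the classification model evaluation generated by the AutoML tabular
    training process, parses it, and uses that information to render the model's
    ROC curve and confusion matrix. Also compares the evaluation results against
    the given metric thresholds to determine whether the model is accurate
    enough to deploy.
    """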
    import json
    import logging

    from google.cloud import aiplatform

    # Fetch the model's evaluation info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metric thresholds to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # do not deploy if below the threshold
                    logging.info(
                        "{} < {}; returning False".format(metrics_dict[k], v)
                    )
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # Log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # Log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # Also log the textual metric info
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aiplatform.init(project=project, location=location)
    # Extract the model resource name from the input Model artifact
    model_resource_path = model.uri.replace("aiplatform://v1/", "")
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize the client used to create and send requests.
    client = aiplatform.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)
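
To see what the parsing steps inside the component do, they can be run on their own. The sketch below repeats two of them outside the pipeline: stripping the `aiplatform://v1/` prefix from the model URI, and turning the `confidenceMetrics` entries into the three lists passed to `log_roc_curve`. The function names are hypothetical, and the sample values and resource path are made up for illustration; real evaluations contain many more entries.

```python
def model_resource_name(model_uri: str) -> str:
    """Strip the artifact URI prefix to get the model resource name."""
    return model_uri.replace("aiplatform://v1/", "")


def extract_roc_points(evaluation_metrics: dict):
    """Collect false positive rates, recalls, and thresholds for the ROC curve.

    Entries that lack a field fall back to 0.0, as in the component above.
    """
    fpr, tpr, thresholds = [], [], []
    for item in evaluation_metrics["confidenceMetrics"]:
        fpr.append(item.get("falsePositiveRate", 0.0))
        tpr.append(item.get("recall", 0.0))
        thresholds.append(item.get("confidenceThreshold", 0.0))
    return fpr, tpr, thresholds


# Made-up sample with the same field names the component reads.
sample_metrics = {
    "confidenceMetrics": [
        {"recall": 1.0, "falsePositiveRate": 1.0},
        {"confidenceThreshold": 0.5, "recall": 0.9, "falsePositiveRate": 0.1},
        {"confidenceThreshold": 1.0},
    ]
}

fpr, tpr, thresholds = extract_roc_points(sample_metrics)
print(fpr, tpr, thresholds)
# Hypothetical resource path, for illustration only.
print(model_resource_name("aiplatform://v1/projects/123/locations/us-central1/models/456"))
```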

### Step 2: Adding Google Cloud pre-built components

In this step you'll define the rest of your pipeline components and see how they all fit together.

1. First, define the display name for your pipeline run using a timestamp:

In [6]:
import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)

automl-beans1705385628


2. Then copy the following into a new notebook cell:

In [7]:
@kfp.dsl.pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://ccbd-ecbdp-bds.kfp_test2.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = REGION,
    api_endpoint: str = f"{REGION}-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        location=gcp_region,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classif_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        deploy_op = gcc_aip.ModelDeployOp(  # noqa: F841
            model=training_op.outputs["model"],
            project=project,
            machine_type="e2-standard-4",
        )

What's happening in this code:

- First, you define the input parameters this pipeline takes. You need to set these manually since they don't depend on the output of other steps in the pipeline.
- The rest of the pipeline uses a few pre-built components for interacting with Vertex AI services:
    - `TabularDatasetCreateOp` creates a tabular dataset in Vertex AI given a dataset source either in Cloud Storage or BigQuery. In this pipeline, you're passing the data via a BigQuery table URL.
    - `AutoMLTabularTrainingJobRunOp` kicks off an AutoML training job for a tabular dataset. You pass a few configuration parameters to this component, including the model type (in this case, classification), some data on the columns, how long you'd like to run training for, and a pointer to the dataset. Notice that to pass in the dataset to this component, you're providing the output of the previous component via `dataset_create_op.outputs["dataset"]`.
    - `ModelDeployOp` deploys a given model to an endpoint in Vertex AI. There are additional configuration options available, but here you're providing the endpoint machine type, project, and model you'd like to deploy. You're passing in the model by accessing the outputs of the training step in your pipeline.
- This pipeline also makes use of **conditional logic**, a feature of Vertex Pipelines that lets you define a condition, along with different branches based on the result of that condition. Remember that when you defined the pipeline you passed a `thresholds_dict_str` parameter. This holds the area under the ROC curve threshold you're using to determine whether to deploy your model to an endpoint. To implement this, make use of the `Condition` class from the KFP SDK. The condition passed in compares the output of the custom eval component you defined earlier in this lab to the string `"true"`. If this condition is true, the pipeline will continue to execute the `deploy_op` component. If the metric doesn't meet the predefined threshold, the pipeline will stop here and won't deploy a model.
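
The `column_transformations` argument in the pipeline above is a list of single-key dictionaries, one per column. Writing that list by hand is repetitive, so it could instead be generated from lists of column names. The following is a sketch of that idea; `build_column_transformations` is a hypothetical helper, not part of `google_cloud_pipeline_components`.

```python
def build_column_transformations(numeric_columns, categorical_columns):
    """Build the list-of-dicts structure used for column_transformations.

    Hypothetical helper: each column becomes {"<type>": {"column_name": <name>}}.
    """
    transformations = [
        {"numeric": {"column_name": name}} for name in numeric_columns
    ]
    transformations += [
        {"categorical": {"column_name": name}} for name in categorical_columns
    ]
    return transformations


# A shortened version of the column list used in the pipeline above.
print(build_column_transformations(["Area", "Perimeter"], ["Class"]))
```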

### Step 3: Compile and run the end-to-end ML pipeline

1. With your pipeline defined, you're ready to compile it. The following will generate a JSON file that you'll use to run the pipeline:

In [8]:
compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)
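
Before submitting the compiled file, it can be useful to confirm that it was written and is valid JSON. The sketch below loads the file with the standard library and lists its top-level keys. `top_level_keys` is a hypothetical helper, and the sketch makes no assumption about which keys the compiler emits.

```python
import json
import os


def top_level_keys(package_path: str) -> list:
    """Load a compiled pipeline JSON file and return its sorted top-level keys."""
    with open(package_path, "r", encoding="utf-8") as f:
        spec = json.load(f)
    return sorted(spec.keys())


# Only inspect the file if the compile step above has already produced it.
if os.path.exists("tab_classif_pipeline.json"):
    print(top_level_keys("tab_classif_pipeline.json"))
```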



2. Next, instantiate an API client:

In [9]:
api_client = AIPlatformClient(
    project_id=PROJECT_ID,
    region=REGION,
)



3. Finally, kick off a pipeline run:

In [10]:
response = api_client.create_run_from_job_spec(
    "tab_classif_pipeline.json", pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID,"display_name": DISPLAY_NAME}
)

INFO:googleapiclient.discovery:URL being requested: POST https://us-central1-aiplatform.googleapis.com/v1beta1/projects/ccbd-ecbdp-bds/locations/us-central1/pipelineJobs?pipelineJobId=automl-tab-beans-training-v2-20240116061418&alt=json
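
The log line above includes the ID assigned to the pipeline job as a query parameter. If you want to record that ID, for example to find the run later, it can be pulled out of the URL with the standard library. This is a small sketch; `pipeline_job_id` is a hypothetical helper, and it assumes the URL has the same shape as the one logged above.

```python
from urllib.parse import parse_qs, urlparse


def pipeline_job_id(request_url: str) -> str:
    """Return the value of the pipelineJobId query parameter in the URL."""
    query = parse_qs(urlparse(request_url).query)
    return query["pipelineJobId"][0]


# The URL from the log output above.
url = (
    "https://us-central1-aiplatform.googleapis.com/v1beta1/projects/"
    "ccbd-ecbdp-bds/locations/us-central1/pipelineJobs"
    "?pipelineJobId=automl-tab-beans-training-v2-20240116061418&alt=json"
)
print(pipeline_job_id(url))
```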


4. Click on the link shown after running the cell above to see your pipeline in the console. **This pipeline will take a little over an hour to run**. Most of the time is spent in the AutoML training step. The completed pipeline will look something like this:

5. If you toggle the "Expand artifacts" button at the top, you'll be able to see details for the different artifacts created from your pipeline. For example, if you click on the `dataset` artifact, you'll see details on the Vertex AI dataset that was created. You can click the link here to go to the page for that dataset:

6. Similarly, to see the resulting metric visualizations from your custom evaluation component, click on the artifact called **metricsc**. On the right side of your dashboard, you'll be able to see the confusion matrix for this model:

7. To see the model and endpoint created from this pipeline run, go to the models section and click on the model named `automl-beans`. There you should see this model deployed to an endpoint:

8. You can also access this page by clicking on the **endpoint** artifact in your pipeline graph.

9. In addition to looking at the pipeline graph in the console, you can also use Vertex Pipelines for **Lineage Tracking**.

10. Lineage tracking means tracking artifacts created throughout your pipeline. This can help you understand where artifacts were created and how they are being used throughout an ML workflow. For example, to see the lineage tracking for the dataset created in this pipeline, click on the dataset artifact and then **View Lineage**:

This shows all the places this artifact is being used:

<font color="CornflowerBlue">**Note**: Wait until the training job in your pipeline has started and then check your progress below. The training job will take longer than the time allotted for this lab but you will be awarded full points for submitting the training job successfully.</font>

### Step 4: Comparing metrics across pipeline runs (Optional)

- If you run this pipeline multiple times, you may want to compare metrics across runs. You can use the `aiplatform.get_pipeline_df()` method to access run metadata. Here, we'll get metadata for all runs of this pipeline and load it into a Pandas DataFrame:

In [None]:
pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df

You've now learned how to build, run, and get metadata for an end-to-end ML pipeline on Vertex Pipelines.