# Vertex AI Pipelines Handson(LightGBM)
- このハンズオンでは Vertex AI Pipelines で LightGBM モデルを実行するパイプラインを作成します。
- 主に Continuous Training を意識したパイプラインになっています。
- LightGBM は専用の pre-built container が存在しないため、Custom Container を利用して、モデルのサービングを行います。
- モデルトレーニングの評価指標をベースに分岐を行います。
- 評価指標が目標を達成している場合は Vertex AI へのモデルの登録とバッチ推論を行い、結果を BigQuery に格納します。

## パッケージのインストール

In [None]:
# 2025/02/12 時点では、Workbench で実行した場合にはこの辺がインストールされている。
# KFP SDK version: 2.5.0
# google-cloud-aiplatform==1.75.0
# kfp==2.5.0
# kfp-pipeline-spec==0.2.2
# kfp-server-api==2.0.5

! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! pip3 freeze | grep -e aiplatform -e kfp

In [None]:
# Workbench Instances などを利用している場合は、必要に応じて実施する。
# uninstall については、バグ回避のために入れている。


# !pip uninstall -y protobuf python3-protobuf
# !pip install --no-cache-dir --upgrade "kfp>2" \
#                                         google-cloud-aiplatform

In [None]:
# 2025/02/12 時点では google-cloud-pipeline-components==2.18.0 がインストールされる。

!pip3 install -U google-cloud-pipeline-components
!pip3 freeze | grep google-cloud-pipeline-components

## 環境変数の設定

In [None]:
shell_output = !gcloud config get project
PROJECT_ID = shell_output[0]
PROJECT_ID

In [None]:
REGION = "us-central1"


def bq_location_for(region: str) -> str:
    """Return the BigQuery dataset location to pair with a Vertex AI region.

    BigQuery only offers two multi-regions, ``US`` and ``EU``. Regions whose
    name starts with ``us-`` or ``europe-`` are mapped to the matching
    multi-region; every other region is returned unchanged so the dataset is
    created in that single region instead of an invalid location such as
    ``ASIA`` or ``EUROPE``.

    Args:
        region: A Google Cloud region name, e.g. ``"us-central1"``.

    Returns:
        The BigQuery location string to pass to ``bq mk --location``.
    """
    multi_regions = {"us": "US", "europe": "EU"}
    prefix = region.split("-")[0].lower()
    return multi_regions.get(prefix, region)


BQ_REGION = bq_location_for(REGION)

In [None]:
BUCKET_URI = f"gs://your-bucket-name-{PROJECT_ID}-unique"  # @param {type:"string"}

In [None]:
# Workbench 等を利用する時に特別な設定を行っていない場合は、Default の GCE のサービスアカウントが利用される。
SERVICE_ACCOUNT = ""  # @param {type:"string"}

In [None]:
# Resolve SERVICE_ACCOUNT when the user left it empty or at its placeholder.
import sys

# True when the notebook runs inside Google Colab (the module is preloaded there).
IS_COLAB = "google.colab" in sys.modules
if (
    SERVICE_ACCOUNT == ""
    or SERVICE_ACCOUNT is None
    or SERVICE_ACCOUNT == "[your-service-account]"
):
    # Get your service account from gcloud
    if not IS_COLAB:
        shell_output = !gcloud auth list 2>/dev/null
        # Takes the third output line and strips the "*" marker.
        # NOTE(review): assumes the active account is always on that line
        # (after the two header lines) - confirm against the gcloud output.
        SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()

    else:  # IS_COLAB:
        shell_output = ! gcloud projects describe  $PROJECT_ID
        # Takes the value after the first ":" on the last output line and drops quotes.
        # NOTE(review): presumably that line is `projectNumber: '...'` - verify.
        project_number = shell_output[-1].split(":")[1].strip().replace("'", "")
        # Builds the account name in the "<project number>-compute@..." form.
        SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"

    print("Service Account:", SERVICE_ACCOUNT)

In [None]:
PATH = %env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
date_string = !date '+%Y%m%d%H%M%S'
YYYYMMDDHHmmSS = date_string[0]

DATASET_ID = "lightgbm"  # BigQuery dataset that holds the raw table and the view
TABLE_ID = "lightgbm_train_data_raw"  # Raw training-data table loaded from the CSV
VIEW_NAME = "lightgbm_train_data"  # BigQuery view you create for input data


# Root location for all pipeline artifacts. The timestamp suffix gives each
# notebook session its own directory. The bucket must be created ahead of time.
PIPELINE_ROOT = f"{BUCKET_URI}/vai_pipelines_handson_pipeline_lightGBM_{YYYYMMDDHHmmSS}"
print(f"PIPELINE_ROOT: {PIPELINE_ROOT}")


# Image URI of the custom serving container that this notebook builds and
# pushes to Artifact Registry.
CONTAINER_IMAGE_URL = f"us-central1-docker.pkg.dev/{PROJECT_ID}/hellocustomprediction/hellocustomprediction:latest"
print(CONTAINER_IMAGE_URL)

## 環境構築

In [None]:
! gcloud services enable aiplatform.googleapis.com

In [None]:
! gsutil mb -l $REGION -p $PROJECT_ID $BUCKET_URI

In [None]:
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

### LightGBM 公式からデータをダウンロードする
通常は他システムからのファイル連携や API、ETL の処理でトレーニングデータを受け取る

In [None]:
!git clone https://github.com/microsoft/LightGBM.git


In [None]:
# Convert the tab-separated LightGBM sample file into a comma-separated copy.
input_filename = "LightGBM/examples/regression/regression.train"
output_filename = "LightGBM/examples/regression/regression_train.csv"

with open(input_filename, "r") as src:
    data = src.read().replace("\t", ",")

with open(output_filename, "w") as dst:
    dst.write(data)

In [None]:
!gsutil cp $output_filename $BUCKET_URI/regression_train.csv

### BQ に生データをロードする

In [None]:
# Create a BQ Dataset in the project.
!bq mk --location=$BQ_REGION --dataset $PROJECT_ID:$DATASET_ID

In [None]:
from google.cloud import bigquery

In [None]:
client = bigquery.Client(project=PROJECT_ID)

# Load-job configuration: CSV input, with the schema inferred by BigQuery.
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    autodetect=True,
)

# Start loading the CSV from Cloud Storage into the raw table.
# NOTE: this only submits the job; call `load_job.result()` to wait for it
# and to surface any load error.
load_job = client.load_table_from_uri(
    f"{BUCKET_URI}/regression_train.csv",
    f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}",
    job_config=job_config,
)




In [None]:
# load_job.result()

## Vertex AI Pipelines の利用準備

### ライブラリのインポート

In [None]:
import google.cloud.aiplatform as aiplatform
import kfp
from kfp import compiler, dsl
from kfp.dsl import Artifact, Dataset, Input, Metrics, Model, Output, component
from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.batch_predict_job import ModelBatchPredictOp
from google_cloud_pipeline_components.types import artifact_types
from kfp.dsl import importer_node
from typing import NamedTuple

### Vertex AI の初期化

In [None]:
# Pass the region explicitly so the SDK uses REGION rather than its built-in
# default location.
aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

## パイプラインコンポーネントの定義

### BigQuery の View を作成するコンポーネント

In [None]:
@component(
    packages_to_install=["google-cloud-bigquery==3.29.0"],
)
def create_view(
    project_id: str,
    dataset_id: str,
    table_id: str,
    view_name: str,
):
    """Create (or replace) a BigQuery view that selects every column of a table.

    Args:
        project_id: Project that owns the dataset and runs the query.
        dataset_id: Dataset containing the source table; the view is created here too.
        table_id: Source table the view reads from.
        view_name: Name of the view to create or replace.
    """
    from google.cloud import bigquery

    client = bigquery.Client(project=project_id)

    # TODO: `SELECT *` is a bad practice; it is used here only to keep the
    # hands-on simple.
    # The view name is fully qualified and back-quoted, in the same way as the
    # source table, so both sides of the statement are resolved identically.
    create_or_replace_view = f"""
        CREATE OR REPLACE VIEW
          `{project_id}.{dataset_id}.{view_name}` AS
        SELECT
            *
        FROM
          `{project_id}.{dataset_id}.{table_id}`
    """

    query_job = client.query(query=create_or_replace_view)
    # Wait for the DDL statement to finish before the component returns.
    query_job.result()

### LightGBM 用にトレーニングデータを csv 出力するコンポーネント

In [None]:
@component(
    packages_to_install=["google-cloud-bigquery[pandas]==3.29.0"],
)
def export_dataset(
    project_id: str,
    dataset_id: str,
    view_name: str,
    dataset: Output[Dataset],
):
    """Write every row of a BigQuery view to the output artifact as CSV.

    Args:
        project_id: Project that owns the view and runs the query.
        dataset_id: Dataset containing the view.
        view_name: View (or table) to export.
        dataset: Output artifact; the CSV is written to ``dataset.path``
            without the DataFrame index.
    """
    from google.cloud import bigquery

    source = f"{project_id}.{dataset_id}.{view_name}"
    sql = f"""
    SELECT
      *
    FROM
      `{source}`
    """

    bq_client = bigquery.Client(project=project_id)
    rows = bq_client.query(query=sql, job_config=bigquery.QueryJobConfig()).result()

    # Materialise the result as a DataFrame, then dump it to the artifact path.
    frame = rows.to_dataframe()
    frame.to_csv(dataset.path, index=False)

### LightGBM のトレーニングを行うコンポーネント

In [None]:
@component(
    packages_to_install=[
        "lightgbm==4.5.0",
        "pandas==2.2.3",
        "scikit-learn==1.6.1",
    ],
)
def lightgbm_training(
    dataset: Input[Dataset],
    model: Output[Model],
    metrics: Output[Metrics],
) -> NamedTuple("Outputs", [("auc", float), ("model_uri", str)]):
    """Train a binary LightGBM model and report its AUC on a hold-out split.

    Args:
        dataset: Input CSV artifact. The first line is skipped, column 0 is
            used as the label and all remaining columns as features.
        model: Output artifact; the booster is saved as ``model.lgb`` inside
            the directory ``model.path``.
        metrics: Output artifact that receives ``framework``,
            ``dataset_size`` and ``AUC``.

    Returns:
        A tuple ``(auc, model_uri)``: the AUC on the hold-out split and the
        URI of the model artifact.
    """
    import os

    import lightgbm as lgb
    import pandas as pd
    from sklearn.metrics import roc_auc_score
    from sklearn.model_selection import train_test_split

    # Load the training dataset. The first line is skipped and columns are
    # addressed by position (0, 1, 2, ...) rather than by header name.
    raw_data = pd.read_csv(dataset.path, header=None, skiprows=1)

    # TODO: here the single dataset is split into train/test; in other setups
    # the test dataset is delivered separately, so this is case by case.
    y = raw_data[0]
    X = raw_data.drop(0, axis=1)
    # NOTE: the split is not seeded, so the reported AUC can vary between runs.
    X_train, X_test, y_train, y_test = train_test_split(X, y)
    lgb_train = lgb.Dataset(X_train, y_train)
    lgb_eval = lgb.Dataset(X_test, y_test, reference=lgb_train)

    # train
    params = {
        "boosting_type": "gbdt",
        "objective": "binary",
        "metric": "auc",
        "num_leaves": 31,
        "learning_rate": 0.05,
        "feature_fraction": 0.9,
        "bagging_fraction": 0.8,
        "bagging_freq": 5,
        "verbose": 0,
    }
    gbm = lgb.train(
        params,
        lgb_train,
        num_boost_round=20,
        valid_sets=lgb_eval,
        callbacks=[lgb.early_stopping(stopping_rounds=5)],
    )

    # evaluation
    predictions = gbm.predict(X_test)
    # Cast to a builtin float so the value matches the declared output type.
    auc = float(roc_auc_score(y_test, predictions))

    metrics.log_metric("framework", "lightgbm")
    metrics.log_metric("dataset_size", len(raw_data))
    metrics.log_metric("AUC", auc)

    # Export the model to a file inside the artifact directory.
    os.makedirs(model.path, exist_ok=True)
    gbm.save_model(os.path.join(model.path, "model.lgb"))

    return (auc, model.uri)

### エラーメッセージを出力するコンポーネント

In [None]:
@component
def print_message():
    """Print the warning that the newly trained model should not be deployed."""
    warning = "[W99999] We should not deploy the new model"
    print(warning)

## モデルサービングのためのコンテナを作成する（=LightGBM を動かすコンテナの作成）

### Artifact Registry にリポジトリを作成

In [None]:
!gcloud artifacts repositories create hellocustomprediction \
 --repository-format=docker \
 --location=us-central1
!gcloud artifacts repositories list

### コンテナを作成

In [None]:
!docker build \
  --tag=us-central1-docker.pkg.dev/{PROJECT_ID}/hellocustomprediction/hellocustomprediction \
  -f app/Dockerfile \
  app

### Artifact Registry に登録（Push）

In [None]:
!gcloud auth configure-docker --quiet us-central1-docker.pkg.dev
!docker push us-central1-docker.pkg.dev/{PROJECT_ID}/hellocustomprediction/hellocustomprediction

### ※実施不要※ Vertex AI にモデルを手動でデプロイして確認をおこないたい場合に実施する。

In [None]:
!gcloud ai models upload \
  --region=us-central1 \
  --display-name=hellocustomprediction \
  --container-image-uri="us-central1-docker.pkg.dev/{PROJECT_ID}/hellocustomprediction/hellocustomprediction:latest" \
  --container-health-route="/" \
  --container-predict-route="/predict" \
  --container-ports="8080"

In [None]:
!gcloud ai models list\
  --region=us-central1

In [None]:
!gcloud ai endpoints create \
  --region=us-central1 \
  --display-name=endpoint_hellocustomprediction


In [None]:
!gcloud ai endpoints list \
  --region=us-central1 \
  --filter=display_name=endpoint_hellocustomprediction

In [None]:
# 前のコマンドの結果（gcloud ai models list, gcloud ai endpoints list）を確認して値を設定してください。
MODEL_ID=5812195485796007936
ENDPOINT_ID=5512869937808408576
!echo {MODEL_ID}
!echo {ENDPOINT_ID}

In [None]:
!gcloud ai endpoints deploy-model {ENDPOINT_ID} \
  --region=us-central1 \
  --model={MODEL_ID} \
  --display-name=hellocustomprediction \
  --machine-type=n1-standard-2 \
  --min-replica-count=1 \
  --max-replica-count=1 \
  --traffic-split=0=100 \
  --service-account={SERVICE_ACCOUNT}

In [None]:
!gcloud ai endpoints raw-predict {ENDPOINT_ID} \
--project={PROJECT_ID} --region=us-central1 \
--http-headers=Content-Type=application/json --request=@request.json

## パイプラインの定義（定義したコンポーネントを利用）

In [None]:
CONTAINER_IMAGE_URL

In [None]:
@dsl.pipeline(
    name="vai-pipelines-handson-lightgbm",
)
def pipeline():
    """Continuous-training pipeline for the LightGBM hands-on.

    Steps:
      1. Create the BigQuery input view.
      2. Export the view to a CSV dataset artifact (caching disabled).
      3. Train LightGBM and obtain the AUC and the model URI.
      4. If AUC > 0.7: import the model as an unmanaged container model,
         upload it to Vertex AI and run a BigQuery-to-BigQuery batch
         prediction. Otherwise: print a warning message.
    """
    create_input_view_task = create_view(
        project_id=PROJECT_ID,
        dataset_id=DATASET_ID,
        table_id=TABLE_ID,
        view_name=VIEW_NAME,
    )

    # Always re-export: the view contents may change between runs.
    export_dataset_task = (
        export_dataset(
            project_id=PROJECT_ID,
            dataset_id=DATASET_ID,
            view_name=VIEW_NAME,
        )
        .after(create_input_view_task)
        .set_caching_options(False)
    )

    training_task = lightgbm_training(
        dataset=export_dataset_task.outputs["dataset"],
    )
    model_uri = training_task.outputs["model_uri"]

    with dsl.If(training_task.outputs["auc"] > 0.7, name="Condition: AUC is OK"):
        # Wrap the trained model directory together with the custom serving
        # container spec so that it can be uploaded to Vertex AI.
        import_unmanaged_model_task = importer_node.importer(
            artifact_uri=model_uri,
            artifact_class=artifact_types.UnmanagedContainerModel,
            metadata={
                "artifactUri": model_uri,
                "containerSpec": {
                    "imageUri": CONTAINER_IMAGE_URL,
                    "healthRoute": "/",
                    "predictRoute": "/predict",
                    # The model location is handed to the container via an
                    # environment variable.
                    "env": [
                        {
                            "name": "SRC_MODEL_URI",
                            "value": model_uri,
                        }
                    ],
                },
            },
        )

        # `location` is passed explicitly so these ops use REGION rather than
        # their own default location.
        model_task = ModelUploadOp(
            project=PROJECT_ID,
            location=REGION,
            display_name="hellocustomprediction_model_upload_op",
            unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
        )

        batch_prediction_task = ModelBatchPredictOp(
            project=PROJECT_ID,
            location=REGION,
            model=model_task.outputs["model"],
            job_display_name="batch_pred_op",
            instances_format="bigquery",
            bigquery_source_input_uri=f"bq://{PROJECT_ID}.{DATASET_ID}.lightgbm_train_data_raw_batch_src",
            predictions_format="bigquery",
            bigquery_destination_output_uri=f"bq://{PROJECT_ID}.{DATASET_ID}.lightgbm_train_data_raw_batch_dst",
            machine_type="n1-standard-4",
            starting_replica_count=1,
            max_replica_count=1,
            service_account=SERVICE_ACCOUNT,
        )

    with dsl.Else():
        print_message()

        

## パイプラインのコンパイル（YAML 生成）

In [None]:
# Compile the `pipeline` function into a pipeline spec file, pipeline-lightgbm.yaml.
compiler.Compiler().compile(pipeline_func=pipeline, package_path="pipeline-lightgbm.yaml")

## パイプラインの実行

In [None]:
# Build a pipeline job from the pipeline-lightgbm.yaml template; artifacts are
# written under PIPELINE_ROOT.
job = aiplatform.PipelineJob(
    display_name="vai-pipelines-handson-lightgbm",
    template_path="pipeline-lightgbm.yaml",
    pipeline_root=PIPELINE_ROOT,
)

# Submit the job to Vertex AI Pipelines.
# NOTE(review): no service account is passed here, so the run presumably uses
# the project's default one - confirm whether SERVICE_ACCOUNT should be given.
job.run()

## その他の便利手順（カンペ）

- 推論のバッチ実行で使う、SRC/DST の Table を BigQuery 上につくる

```
ALTER TABLE 【PROJECT_ID】.lightgbm.lightgbm_train_data_raw_batch_src
DROP COLUMN int64_field_0;


DELETE FROM 【PROJECT_ID】.lightgbm.lightgbm_train_data_raw_batch_dst WHERE 1=1;


ALTER TABLE 【PROJECT_ID】.lightgbm.lightgbm_train_data_raw_batch_dst
DROP COLUMN prediction;


ALTER TABLE 【PROJECT_ID】.lightgbm.lightgbm_train_data_raw_batch_dst
ADD COLUMN prediction STRING;

ALTER TABLE 【PROJECT_ID】.lightgbm.lightgbm_train_data_raw_batch_dst
ADD COLUMN prediction_error STRING;


select * from 【PROJECT_ID】.lightgbm.lightgbm_train_data_raw_batch_dst
```