# MLOps Stage 3: Automation: Creating a Kubeflow Pipeline

## Overview

In this notebook, we create a Vertex AI Pipeline for training and deploying an XGBoost model, and use Vertex AI Experiments to log training parameters and metrics.

## Objective

Here, we use prebuilt components in Vertex AI Pipelines to train and deploy a custom XGBoost model, and use Vertex AI Experiments to log the corresponding training parameters and metrics from within the training package.

This notebook uses the following Google Cloud ML services:
- Google Cloud Pipeline Components
- Vertex AI Training
- Vertex AI Pipelines
- Vertex AI Experiments

The steps performed include:
- Construct an XGBoost training package.
- Add experiment tracking.
- Construct a pipeline to train and deploy an XGBoost model.
- Execute the pipeline.

## Dataset

The dataset used in this example is the Synthetic Financial Fraud dataset from Kaggle, generated with the PaySim simulator. PaySim simulates mobile money transactions based on a sample of real transactions extracted from one month of financial logs from a mobile money service implemented in an African country. The original logs were provided by a multinational company that provides the mobile financial service, which currently runs in more than 14 countries around the world.

## Installation

Install the following packages for executing this notebook.

In [1]:
import os

# The Vertex AI Workbench Notebook product has specific requirements
IS_WORKBENCH_NOTEBOOK = os.getenv("DL_ANACONDA_HOME") and not os.getenv("VIRTUAL_ENV")
IS_USER_MANAGED_WORKBENCH_NOTEBOOK = os.path.exists(
    "/opt/deeplearning/metadata/env_version"
)

# Vertex AI Notebook requires dependencies to be installed with '--user'
USER_FLAG = ""
if IS_WORKBENCH_NOTEBOOK:
    USER_FLAG = "--user"

! pip3 install {USER_FLAG} --upgrade --quiet google-cloud-aiplatform \
                                             google-cloud-pipeline-components \
                                             kfp 

## Restart the Kernel

Once you've installed the additional packages, you need to restart the notebook kernel so it can find the packages.

In [2]:
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)
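
After the kernel restarts, it can be useful to confirm that the packages are visible to the new kernel. The following is a minimal sketch using only the standard library; the helper name `installed_versions` is hypothetical, and the package list simply mirrors the `pip3 install` command above.

```python
from importlib import metadata
from typing import Dict, Iterable, Optional


def installed_versions(packages: Iterable[str]) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if absent."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


# Distribution names taken from the install cell above
REQUIRED = ["google-cloud-aiplatform", "google-cloud-pipeline-components", "kfp"]

for name, version in installed_versions(REQUIRED).items():
    print(f"{name}: {version or 'NOT INSTALLED'}")
```

A `NOT INSTALLED` entry suggests the install cell should be rerun before continuing.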

## Set up Project Information

In [1]:
PROJECT_ID = "bq-experiments-350102"

In [2]:
REGION = "us-central1"

In [3]:
from datetime import datetime
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

In [4]:
BUCKET_NAME = "bq-experiments-fraud" 
BUCKET_URI = f"gs://{BUCKET_NAME}"

In [5]:
! gsutil ls -al $BUCKET_URI

 493534783  2022-08-25T16:24:56Z  gs://bq-experiments-fraud/synthetic-fraud.csv#1661444696515532  metageneration=1
      2133  2022-11-11T15:17:22Z  gs://bq-experiments-fraud/trainer_fraud.tar.gz#1668179842539274  metageneration=1
                                 gs://bq-experiments-fraud/mqmcvfd2/
                                 gs://bq-experiments-fraud/pipelines/
                                 gs://bq-experiments-fraud/q0pjoruv/
                                 gs://bq-experiments-fraud/vy5rkufq/
TOTAL: 2 objects, 493536916 bytes (470.67 MiB)


## Import Libraries

In [6]:
import json
import os

import google.cloud.aiplatform as aip
import tensorflow as tf
from kfp import dsl
from kfp.v2 import compiler

2022-11-15 16:35:56.180224: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-11-15 16:36:01.374886: E tensorflow/stream_executor/cuda/cuda_blas.cc:2981] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2022-11-15 16:36:09.320917: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/cuda/lib64:/usr/local/cuda/lib:/usr/local/lib/x86_64-linux-gnu:/usr/local/nvidia/lib:/usr/local/nvidia/lib64:/usr/local/nvidia/lib:/usr/local/nvidia/lib64
2022-11-15 16:36:09.321334: W tensorflow/strea

## Initialize Vertex AI SDK

In [7]:
aip.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)

## Set Pre-built Containers

In [17]:
TRAIN_VERSION = "xgboost-cpu.1-1"
DEPLOY_VERSION = "xgboost-cpu.1-1"

TRAIN_IMAGE = "{}-docker.pkg.dev/vertex-ai/training/{}:latest".format(
    REGION.split("-")[0], TRAIN_VERSION
)

DEPLOY_IMAGE = "{}-docker.pkg.dev/vertex-ai/serving/{}:latest".format(
    REGION.split("-")[0], DEPLOY_VERSION
)

print(TRAIN_IMAGE)
print(DEPLOY_IMAGE)

us-docker.pkg.dev/vertex-ai/training/xgboost-cpu.1-1:latest
us-docker.pkg.dev/vertex-ai/serving/xgboost-cpu.1-1:latest


## Set Machine Type

In [9]:
if os.getenv("IS_TESTING_TRAIN_MACHINE"):
    MACHINE_TYPE = os.getenv("IS_TESTING_TRAIN_MACHINE")
else:
    MACHINE_TYPE = "n1-standard"

VCPU = "4"
TRAIN_COMPUTE = MACHINE_TYPE + "-" + VCPU
print("Train machine type", TRAIN_COMPUTE)

Train machine type n1-standard-4


## XGBoost Model Training

### Package Assembly

In [47]:
# Make folder for Python training script
! rm -rf custom
! mkdir custom

# Add package information
! touch custom/README.md

setup_cfg = "[egg_info]\n\ntag_build =\n\ntag_date = 0"
! echo "$setup_cfg" > custom/setup.cfg

setup_py = "import setuptools\n\nsetuptools.setup(\n\n    install_requires=[\n\n        'cloudml-hypertune',\n\n    ],\n\n    packages=setuptools.find_packages())"
! echo "$setup_py" > custom/setup.py

pkg_info = "Metadata-Version: 1.0\n\nName: Financial Fraud Classification\n\nVersion: 0.0.0\n\nSummary: Demonstration training script\n\nHome-page: www.google.com\n\nAuthor: Google\n\nAuthor-email: bryanfreeman@google.com\n\nLicense: Public\n\nDescription: Demo\n\nPlatform: Vertex"
! echo "$pkg_info" > custom/PKG-INFO

# Make the training subfolder
! mkdir custom/trainer
! touch custom/trainer/__init__.py

### Create Task Script

In [48]:
%%writefile custom/trainer/task.py
import datetime
import os
import sys
import pandas as pd
import xgboost as xgb
import argparse
import logging
import numpy as np
import json

from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score

import subprocess
subprocess.check_call(['pip3', 'install', "--upgrade",
                       "google-cloud-aiplatform", 
                       "google-cloud-bigquery"], 
                      stderr=sys.stdout
                     )

from google.cloud import bigquery
import google.cloud.aiplatform as aiplatform

# SET UP TRAINING SCRIPT ARGUMENTS
parser = argparse.ArgumentParser()
parser.add_argument('--model-dir', dest='model_dir',
                    default=os.getenv('AIP_MODEL_DIR'), type=str, help='Model dir.')
parser.add_argument("--project-id", dest="project_id",
                    type=str, help="Project id for bigquery client.")
parser.add_argument("--bq-table", dest="bq_table",
                    type=str, help="Table location of the training data.")

# Args for experiment
parser.add_argument('--experiment', dest='experiment',
                    required=True, type=str,
                    help='Name of experiment')
parser.add_argument('--run', dest='run',
                    required=True, type=str,
                    help='Name of run within the experiment')

args = parser.parse_args()

logging.getLogger().setLevel(logging.INFO)

# Function to retrieve data from BigQuery
def get_data():
    logging.info("Downloading training data from BigQuery: {}, {}".format(args.project_id, args.bq_table))
    logging.info("Creating BigQuery client")
    bqclient = bigquery.Client(project=args.project_id)
    
    logging.info("Loading table data")
    table = bigquery.TableReference.from_string(args.bq_table)
    rows = bqclient.list_rows(table)
    dataframe = rows.to_dataframe()
    
    logging.info("Preparing data for training")
    dataframe["isFraud"] = dataframe["isFraud"].astype(int)
    dataframe.drop(['nameOrig','nameDest','isFlaggedFraud'],axis=1,inplace=True)
    X = pd.concat([dataframe.drop('type', axis=1), pd.get_dummies(dataframe['type'])], axis=1)
    y = X[['isFraud']]
    X = X.drop(['isFraud'],axis=1)
    
    logging.info("Splitting data for training")
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.3,random_state=42, shuffle=True)
    
    logging.info("Finishing get_data")
    return X_train, X_test, y_train, y_test

# Function to train the model
def train_model(X_train, y_train):
    logging.info("Start training ...")
    model = xgb.XGBClassifier(
            scale_pos_weight=734,
            max_depth=7,
            learning_rate=0.03289820323933852
    )
    model.fit(X_train, y_train)
    
    logging.info("Training completed")
    return model

# Function to evaluate the model
def evaluate_model(model, X_test, y_test):
    logging.info("Preparing test data ...")
    data_test = xgb.DMatrix(X_test)
    
    logging.info("Getting test predictions ...")
    y_pred = model.predict(X_test)
    
    logging.info("Evaluating predictions ...")
    f1 = f1_score(y_test, y_pred, average='weighted')
    logging.info(f"Evaluation completed with weighted f1 score: {f1}")
    
    logging.info("Finishing ...")
    return f1


# Create a run within the experiment
aiplatform.init(experiment=args.experiment)
aiplatform.start_run(args.run)

with aiplatform.start_execution(
    schema_title="system.ContainerExecution", display_name="xgboost_training"
) as execution:
    logging.info("Starting execution ...")
    X_train, X_test, y_train, y_test = get_data()
    model = train_model(X_train, y_train)

    # GCSFuse conversion
    gs_prefix = 'gs://'
    gcsfuse_prefix = '/gcs/'
    if args.model_dir.startswith(gs_prefix):
        args.model_dir = args.model_dir.replace(gs_prefix, gcsfuse_prefix)
        # Create the model directory itself, not only its parent
        os.makedirs(args.model_dir, exist_ok=True)

    # Export the classifier to a file
    gcs_model_path = os.path.join(args.model_dir, 'model.bst')
    logging.info("Saving model artifacts to {}".format(gcs_model_path))
    model.save_model(gcs_model_path)

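    # Evaluate on the held-out split and log the score to the experiment run
    metric_dict = {"f1_score": float(evaluate_model(model, X_test, y_test))}
    aiplatform.log_metrics(metric_dict)

    gcs_metrics_path = os.path.join(args.model_dir, 'metrics.json')
    logging.info("Saving metrics to {}".format(gcs_metrics_path))
    with open(gcs_metrics_path, "w") as f:
        f.write(json.dumps(metric_dict))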

aiplatform.end_run()

Writing custom/trainer/task.py
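
The GCSFuse conversion step in the script relies on Vertex AI custom training exposing Cloud Storage buckets under the `/gcs/` mount point. The following is a minimal sketch that isolates that rewrite so it can be checked locally; the helper name `to_gcsfuse_path` and the bucket name are hypothetical.

```python
import os

GS_PREFIX = "gs://"
GCSFUSE_PREFIX = "/gcs/"


def to_gcsfuse_path(uri: str) -> str:
    """Rewrite a gs:// URI as its /gcs/ mount path; leave other paths unchanged."""
    if uri.startswith(GS_PREFIX):
        return GCSFUSE_PREFIX + uri[len(GS_PREFIX):]
    return uri


# Example bucket and folder are placeholders, not resources from this notebook
model_dir = to_gcsfuse_path("gs://example-bucket/abc12345")
print(os.path.join(model_dir, "model.bst"))
```

Replacing only the leading prefix, rather than every occurrence of `gs://`, keeps the rest of the path untouched, and a local path passes through unchanged, so the same helper works when the script is run outside Vertex AI.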


### Store Training Script in Cloud Storage

In [49]:
! rm -f custom.tar custom.tar.gz
! tar cvf custom.tar custom
! gzip custom.tar
! gsutil cp custom.tar.gz $BUCKET_URI/trainer_fraud.tar.gz

custom/
custom/setup.py
custom/setup.cfg
custom/README.md
custom/PKG-INFO
custom/trainer/
custom/trainer/task.py
custom/trainer/__init__.py
Copying file://custom.tar.gz [Content-Type=application/x-tar]...
/ [1 files][  2.2 KiB/  2.2 KiB]                                                
Operation completed over 1 objects/2.2 KiB.                                      


### Generate UUID

In [50]:
import random
import string


# Generate a random identifier of a specified length (default=8)
def generate_uuid(length: int = 8) -> str:
    return "".join(random.choices(string.ascii_lowercase + string.digits, k=length))


UUID = generate_uuid()

## Construct Custom Training Pipeline

Construct a pipeline for training a custom model using pre-built Google Cloud Pipeline Components for Vertex AI Training, as follows:

1. Define the pipeline arguments:
- **python_package:** The Cloud Storage location of the custom training Python package.
- **python_module:** The entry module in the package to execute.
- **display_name:** The human-readable name for generated resources.
- **bucket:** The Cloud Storage location to store model artifacts.
- **project:** The project ID.
- **region:** The region.

2. Use the prebuilt component CustomPythonPackageTrainingJobRunOp to train a custom model and upload it as a Vertex AI Model resource, where:
- The display name for the model is passed into the pipeline.
- The dataset is specified within the training package.
- The Python package is passed into the pipeline.
- The command-line arguments for the Python package are hardcoded in the call to the component.
- The command-line arguments for the name of the experiment and run are hardcoded in the call to the component.
- The training and serving containers are specified in the pipeline definition.
- The component returns the model resource as outputs["model"].

Note: Since each component is executed as a graph node in its own execution context, you pass the parameter project to each component op, in contrast to calling aip.init(project=project) once, as you would in a Python script that calls the SDK methods directly within a single execution context.
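
Because the command-line arguments are hardcoded as a list of strings, a misspelled flag only surfaces once the training job is already running. The following is a minimal sketch of checking the list locally first. It assumes a parser that mirrors the flags declared in `trainer/task.py`; the helper names `build_training_args` and `make_parser` and all example values are hypothetical.

```python
import argparse
from typing import List


def build_training_args(project_id: str, bq_table: str, model_dir: str,
                        experiment: str, run: str) -> List[str]:
    """Assemble the argument list using the flag spelling the task script declares."""
    return [
        f"--project-id={project_id}",
        f"--bq-table={bq_table}",
        f"--model-dir={model_dir}",
        "--experiment", experiment,
        "--run", run,
    ]


def make_parser() -> argparse.ArgumentParser:
    """Local mirror of the task script's flags, used only for this check."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--model-dir", dest="model_dir", type=str)
    parser.add_argument("--project-id", dest="project_id", type=str)
    parser.add_argument("--bq-table", dest="bq_table", type=str)
    parser.add_argument("--experiment", dest="experiment", required=True, type=str)
    parser.add_argument("--run", dest="run", required=True, type=str)
    return parser


# Placeholder values, not resources from this notebook
args_list = build_training_args(
    "example-project", "example-project.dataset.table",
    "gs://example-bucket/abc12345", "xgboostfraudabc12345", "fraud-run-1",
)

# parse_known_args returns unrecognized tokens instead of exiting
parsed, unknown = make_parser().parse_known_args(args_list)
print(parsed.experiment, parsed.model_dir, unknown)
```

An empty `unknown` list indicates every token was recognized. Note that argparse matches option spelling exactly, so `--model-dir` and `--model_dir` are different options even though both map naturally to the attribute `model_dir`.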

In [51]:
PIPELINE_ROOT = "{}/pipeline_root/custom_xgboost_training".format(BUCKET_URI)
EXPERIMENT_NAME = f"xgboostfraud{UUID}"
BQ_TABLE = "bq-experiments-350102.synthetic_financial_fraud.fraud_data"
MODEL_DIR = "{}/{}".format(BUCKET_URI, UUID)

@dsl.pipeline(
    name="fraud-xgboost",
    description="Train and deploy a custom XGBoost model for fraud detection",
)
def pipeline(
    display_name: str,
    python_package: str,
    python_module: str,
    bucket: str = PIPELINE_ROOT,
    project: str = PROJECT_ID,
    region: str = REGION,
):
    from google_cloud_pipeline_components import aiplatform as gcc_aip

    _ = gcc_aip.CustomPythonPackageTrainingJobRunOp(
        project=project,
        display_name=display_name,
        # Training
        python_package_gcs_uri=python_package,
        python_module_name=python_module,
        container_uri=TRAIN_IMAGE,
        staging_bucket=PIPELINE_ROOT,
        args=[
            "--project-id=" + PROJECT_ID,
            "--bq-table=" + BQ_TABLE,
            "--model-dir=" + MODEL_DIR,
            "--experiment",
            EXPERIMENT_NAME,
            "--run",
            "fraud-run-1",
        ],
        replica_count=1,
        machine_type=TRAIN_COMPUTE,
        # Serving - As part of this operation, the model is registered to Vertex AI
        model_serving_container_image_uri=DEPLOY_IMAGE,
        model_display_name=display_name,
    )
    

## Compile and Execute The Pipeline

In [52]:
compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="custom_xgboost_training.json"
)

pipeline = aip.PipelineJob(
    display_name="custom_xgboost",
    template_path="custom_xgboost_training.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={
        "display_name": "fraud_" + UUID,
        "python_package": f"{BUCKET_URI}/trainer_fraud.tar.gz",
        "python_module": "trainer.task",
        "project": PROJECT_ID,
        "region": REGION,
    },
    enable_caching=False,
)

pipeline.run()

! rm -f custom_xgboost_training.json



Creating PipelineJob
PipelineJob created. Resource name: projects/402374189238/locations/us-central1/pipelineJobs/fraud-xgboost-20221115173902
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/402374189238/locations/us-central1/pipelineJobs/fraud-xgboost-20221115173902')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/fraud-xgboost-20221115173902?project=402374189238
PipelineJob projects/402374189238/locations/us-central1/pipelineJobs/fraud-xgboost-20221115173902 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/402374189238/locations/us-central1/pipelineJobs/fraud-xgboost-20221115173902 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/402374189238/locations/us-central1/pipelineJobs/fraud-xgboost-20221115173902 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/402374189238/locations/us-central1/pipelineJobs/fraud-xgboost-

RuntimeError: Job failed with:
code: 9
message: "The DAG failed because some tasks failed. The failed tasks are: [custompythonpackagetrainingjob-run].; Job (project_id = bq-experiments-350102, job_id = 5172341291081531392) is failed due to the above error.; Failed to handle the job: {project_number = 402374189238, job_id = 5172341291081531392}"


## View Model Training Results

In [None]:
PROJECT_NUMBER = pipeline.gca_resource.name.split("/")[1]
print(PROJECT_NUMBER)


def print_pipeline_output(job, output_task_name):
    job_id = job.name
    print(job_id)
    # Candidate output files, checked in order for each task
    filenames = ("executor_output.json", "gcp_resources", "evaluation_metrics")
    for task in job.gca_resource.job_detail.task_details:
        task_dir = (
            f"{PIPELINE_ROOT}/{PROJECT_NUMBER}/{job_id}/"
            f"{output_task_name}_{task.task_id}"
        )
        for filename in filenames:
            path = f"{task_dir}/{filename}"
            if tf.io.gfile.exists(path):
                ! gsutil cat $path
                return path

    return None

print("custompythonpackagetrainingjob-run")
artifacts = print_pipeline_output(pipeline, "custompythonpackagetrainingjob-run")
print("\n\n")
output = !gsutil cat $artifacts
output = json.loads(output[0])
model_id = output["artifacts"]["model"]["artifacts"][0]["metadata"]["resourceName"]
print("\n\n")
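
When the training task fails, as in the run shown above, no executor output is written and the chained dictionary lookup raises an exception. The following is a minimal sketch of a more defensive extraction. The helper name `extract_model_resource_name` is hypothetical, and the sample payload is an illustrative placeholder shaped like the lookup used above, not real pipeline output.

```python
import json
from typing import Optional

# Hypothetical, trimmed payload; the resource name is a placeholder
SAMPLE_EXECUTOR_OUTPUT = """
{
  "artifacts": {
    "model": {
      "artifacts": [
        {"metadata": {"resourceName": "projects/123/locations/us-central1/models/456"}}
      ]
    }
  }
}
"""


def extract_model_resource_name(executor_output: str) -> Optional[str]:
    """Return the model resource name from executor output JSON, or None if absent."""
    try:
        payload = json.loads(executor_output)
        return payload["artifacts"]["model"]["artifacts"][0]["metadata"]["resourceName"]
    except (json.JSONDecodeError, KeyError, IndexError, TypeError):
        return None


print(extract_model_resource_name(SAMPLE_EXECUTOR_OUTPUT))
print(extract_model_resource_name("{}"))
```

Returning `None` instead of raising lets the calling cell report that no model was registered and stop cleanly.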