In [1]:
from typing import NamedTuple
import kfp
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import component
from kfp.v2.google.client import AIPlatformClient
from kfp.v2.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        component)

import pandas as pd

In [2]:
# This shell-command outputs default project
shell_output = !gcloud config list --format 'value(core.project)' 2>/dev/null
PROJECT_ID = shell_output[0]
REGION = "us-central1"
BUCKET_NAME = "gs://vertex-ai-tutorial-368805-bucket"

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
USER = "sam_oz"
PIPELINE_ROOT = "{}/pipeline_root3".format(BUCKET_NAME)
# If there are multiple users, it is better to use username in path:
# PIPELINE_ROOT = "{}/pipeline_root/{}".format(BUCKET_NAME, USER)

PIPELINE_ROOT

env: PATH=/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/jupyter/.local/bin


'gs://vertex-ai-tutorial-368805-bucket/pipeline_root3'

In [3]:
# Get and split data from a BigQuery table.
@component(packages_to_install=["google-cloud-bigquery", "db-dtypes", "pandas", "pyarrow", "scikit-learn"])
def get_data(
    dataset_train: Output[Dataset],
    dataset_test: Output[Dataset],
    project: str = "samet-sandbox2",
    dataset_id: str = "regular_dataset",
    table_name: str = "IOT_Data",
    test_size: float = 0.3,
    random_state: int = 42,
):
    '''Read a whole BigQuery table and write a train/test split as CSV artifacts.

    Args:
        dataset_train: output artifact that receives the training rows as CSV.
        dataset_test: output artifact that receives the held-out rows as CSV.
        project: GCP project that owns the source table.
        dataset_id: BigQuery dataset containing the table.
        table_name: name of the table to read (all rows are fetched).
        test_size: fraction of rows assigned to the test split.
        random_state: seed for the split, so reruns produce the same partition.
    '''
    # Lightweight KFP components execute only this function's source in their
    # own container, so every import must be local to the function.
    from google.cloud import bigquery
    from sklearn.model_selection import train_test_split

    client = bigquery.Client()
    table_ref = bigquery.DatasetReference(project, dataset_id).table(table_name)
    table = client.get_table(table_ref)

    # to_dataframe() needs db-dtypes (google-cloud-bigquery >= 3), hence the extra package above.
    data = client.list_rows(table).to_dataframe()

    train, test = train_test_split(data, test_size=test_size, random_state=random_state)

    # index=False: otherwise pandas writes the row index as an unnamed first
    # column, which downstream read_csv() turns into an extra "Unnamed: 0"
    # feature column.
    train.to_csv(dataset_train.path, index=False)
    test.to_csv(dataset_test.path, index=False)

In [4]:
@component(packages_to_install=["pandas", "scikit-learn", "xgboost"])
def train_xgb_model(
    dataset: Input[Dataset],
    model_artifact: Output[Model]):
    '''Fit an XGBoost regressor on the training CSV and save it as a model artifact.

    Args:
        dataset: input artifact holding the training split as CSV. It must
            contain the target column "t0"; every other column is used as a feature.
        model_artifact: output artifact. It receives the saved model file plus
            "train_score" (R^2 on the training data) and "framework" metadata.
    '''
    # Local imports: the component runs only this function's source.
    from xgboost import XGBRegressor
    import pandas as pd

    data = pd.read_csv(dataset.path)
    X_train = data.drop(columns=["t0"])
    y_train = data["t0"]

    model = XGBRegressor(
        n_estimators=150,
        reg_lambda=15,
        gamma=0,
        max_depth=3,
    )
    model.fit(X_train, y_train)

    # Coefficient of determination R^2 on the same rows the model was fitted
    # on, i.e. an in-sample score. The held-out score is computed in eval_model.
    score = model.score(X_train, y_train)

    # Metadata attached to the Output[Model] artifact.
    model_artifact.metadata["train_score"] = float(score)
    model_artifact.metadata["framework"] = "XGBoost"

    model.save_model(model_artifact.path)


In [5]:
@component(packages_to_install=["pandas", "scikit-learn", "xgboost", "numpy"])
def eval_model(
    test_set: Input[Dataset],
    xgb_model: Input[Model],
    metrics: Output[Metrics]
):
    '''Evaluate the trained XGBoost model on the held-out CSV and log metrics.

    Args:
        test_set: input artifact holding the test split as CSV, with target column "t0".
        xgb_model: input artifact containing the model saved by train_xgb_model.
        metrics: output metrics artifact. Receives "Mean Absolute Error",
            "R-Square" and "MAPE" (mean absolute percentage error, as a fraction).
    '''
    # Local imports: the component runs only this function's source.
    from xgboost import XGBRegressor
    from sklearn.metrics import mean_absolute_error
    import pandas as pd
    import numpy as np

    data = pd.read_csv(test_set.path)
    model = XGBRegressor()
    model.load_model(xgb_model.path)

    X_test = data.drop(columns=["t0"])
    y_test = data["t0"]

    score = float(model.score(X_test, y_test))
    y_pred = model.predict(X_test)
    mae = float(mean_absolute_error(y_test, y_pred))
    # MAPE is the *mean* of |(y - y_hat) / y|. The previous *sum* grew with the
    # test-set size, and the missing abs() let negative targets cancel errors.
    # Rows with y == 0 yield inf, as before.
    mape = float(np.mean(np.abs((y_test.to_numpy() - y_pred) / y_test.to_numpy())))

    metrics.log_metric("Mean Absolute Error", mae)
    metrics.log_metric("R-Square", score)
    metrics.log_metric("MAPE", mape)

    # NOTE(review): these keys are written to an *input* artifact's metadata.
    # Confirm KFP actually persists metadata changes on inputs; if not, record
    # them via `metrics` above instead.
    xgb_model.metadata["test_score"] = score
    xgb_model.metadata["mae_score"] = mae

In [6]:
@dsl.pipeline(
    # Default pipeline root; a different one can be supplied when submitting the run.
    pipeline_root=PIPELINE_ROOT,
    # Pipeline name, also used to determine the pipeline Context.
    name="pipeline-test-1",
)
def pipeline():
    # Wiring: BigQuery extract/split -> XGBoost training -> evaluation on the test split.
    data_task = get_data()
    splits = data_task.outputs
    model_task = train_xgb_model(dataset=splits["dataset_train"])
    eval_model(
        test_set=splits["dataset_test"],
        xgb_model=model_task.outputs["model_artifact"],
    )


# Compile the pipeline into a job spec that can be submitted to Vertex AI.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path='xgb_pipe.json',
)



In [7]:
# AIPlatformClient was already imported in the first cell.
# NOTE(review): this client is deprecated in later kfp releases in favour of
# google.cloud.aiplatform.PipelineJob; consider migrating.
api_client = AIPlatformClient(
    project_id=PROJECT_ID,
    region=REGION,
)

# pipeline_root is omitted because PIPELINE_ROOT is already set on the
# @dsl.pipeline decorator. Pass pipeline_root=PIPELINE_ROOT here if it is
# ever removed from the pipeline definition.
response = api_client.create_run_from_job_spec(job_spec_path="xgb_pipe.json")

