In [58]:
import os
import google.cloud.aiplatform as aip
import kfp
import pandas as pd

from datetime import datetime
from google.cloud import storage
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import pipeline, component, Output, Metrics, Dataset, Input, Model
from typing import NamedTuple
from src import util, data_preparation, preprocessing, modelling

# Load the project configuration once; later cells read keys such as
# "BUCKET" and "DATASET_DISPLAY_NAME" from it and pass it to the pipeline.
config = util.load_config()

In [59]:
# Bump VERSION for every new iteration so the versioned display names below
# do not replace the ones from a previous version.
VERSION = 'v01'

_bucket = config["BUCKET"]
_dataset_name = config["DATASET_DISPLAY_NAME"]

# Storage locations, both rooted in the configured bucket.
PIPELINE_ROOT = f'{_bucket}/pipelines/{_dataset_name}'
MODEL_DIRECTORY = f'{_bucket}/models/{_dataset_name}'

# Versioned display names for the pipeline and the model.
PIPELINE_DISPLAY_NAME = f'{_dataset_name}-train-pipeline-{VERSION}'
MODEL_DISPLAY_NAME = f'{_dataset_name}-classifier-{VERSION}'

In [92]:
@dsl.component(
    base_image="bank-marketing-pipeline-image:latest",
    packages_to_install=[
        'google-cloud-bigquery',
        'google-cloud-storage',
        'db-dtypes',
        'joblib',
        'numpy',
        'pandas',
        'scikit-learn',
        'imbalanced-learn'
    ]
)
def data_preparation(
    config: dict,
    train: Output[Dataset],
    valid: Output[Dataset],
    test: Output[Dataset]):
    """Read the raw dataset, validate it, split it and persist the three splits.

    Args:
        config: Project configuration. The keys read here are "BUCKET",
            "dataset_path" and "dataset_delimiter"; the whole dict is also
            forwarded to the `src.data_preparation` helpers.
        train: Output artifact that receives the training split as CSV.
        valid: Output artifact that receives the validation split as CSV.
        test: Output artifact that receives the test split as CSV.

    Each split is written as features and label concatenated column-wise,
    without the DataFrame index. Nothing is returned: results are handed to
    downstream components through the output artifacts only.
    """
    # A lightweight component is executed from its own serialised source, so
    # notebook-level imports are not visible here and must be repeated locally.
    # The alias also avoids the clash between this function's name and the
    # `src.data_preparation` module.
    # NOTE(review): assumes the base image ships the project's `src` package
    # and can read the bucket URL with pandas - TODO confirm.
    import pandas as pd
    from src import data_preparation as data_preparation_lib

    # 1. Read raw dataset (the `config` argument is used as passed in)
    raw_dataset = pd.read_csv(
        f'{config["BUCKET"]}{config["dataset_path"]}',
        delimiter=config['dataset_delimiter'])

    # 2. Data defense for non API data
    data_preparation_lib.check_data(raw_dataset, config)

    # 3. Splitting train, valid, and test set
    X_train, X_valid, X_test, \
        y_train, y_valid, y_test = data_preparation_lib.split_data(raw_dataset, config)

    # 4. Concat features and label per split and write them to the artifacts.
    #    The artifact handles must not be rebound to DataFrames: the data only
    #    reaches the next component through the file at `.path`.
    splits = (
        (train, X_train, y_train),
        (valid, X_valid, y_valid),
        (test, X_test, y_test),
    )
    for artifact, features, labels in splits:
        pd.concat([features, labels], axis=1).to_csv(artifact.path, index=False)

In [101]:
@dsl.component(
    # Same image as the data_preparation component: the body imports the
    # project's `src` package, which the default image cannot provide.
    # NOTE(review): confirm this image contains `src`.
    base_image="bank-marketing-pipeline-image:latest",
    packages_to_install=[
        'google-cloud-bigquery',
        'google-cloud-storage',
        'db-dtypes',
        'joblib',
        'numpy',
        'pandas',
        'scikit-learn',
        'imbalanced-learn'
    ]
)
def preprocessing(
    config: dict,
    train: Input[Dataset],
    valid: Input[Dataset],
    test: Input[Dataset],
    X_train: Output[Dataset], y_train: Output[Dataset],
    X_valid: Output[Dataset], y_valid: Output[Dataset],
    X_test: Output[Dataset], y_test: Output[Dataset]):
    """Preprocess and feature-engineer each split, then write X and y separately.

    Args:
        config: Project configuration; only the "label" key is read here.
        train, valid, test: Input artifacts, each read as a CSV file from
            its `.path`.
            NOTE(review): the producing component must write CSV - confirm.
        X_train, X_valid, X_test: Output artifacts receiving the feature
            columns (everything except the label) as CSV without index.
        y_train, y_valid, y_test: Output artifacts receiving the label
            column(s) as CSV without index.

    Nothing is returned: results are handed on through the output artifacts.
    """
    # Imports are local because a lightweight component runs from its own
    # serialised source; the alias avoids the clash between this function's
    # name and the `src.preprocessing` module.
    import pandas as pd
    from src import preprocessing as preprocessing_lib

    label = config['label']

    # 1. Load dataset from the input artifacts
    frames = {
        'train': pd.read_csv(train.path),
        'valid': pd.read_csv(valid.path),
        'test': pd.read_csv(test.path),
    }

    # 2. Preprocessing dataset (all splits before any feature engineering,
    #    keeping the original call order)
    frames = {
        split: preprocessing_lib.preprocessing(frame, is_api=False)
        for split, frame in frames.items()
    }

    # 3. Feature engineering
    frames = {
        split: preprocessing_lib.feature_engineering(frame)
        for split, frame in frames.items()
    }

    # 4. Split X and y and persist them. The artifact handles are written to,
    #    never rebound, because downstream components only see the files.
    targets = {
        'train': (X_train, y_train),
        'valid': (X_valid, y_valid),
        'test': (X_test, y_test),
    }
    for split, (features_out, label_out) in targets.items():
        frame = frames[split]
        frame.drop(columns=label).to_csv(features_out.path, index=False)
        frame[label].to_csv(label_out.path, index=False)

In [112]:
@dsl.component(
    # Same image as the data_preparation component: the body imports the
    # project's `src` package, which the default image cannot provide.
    # NOTE(review): confirm this image contains `src`.
    base_image="bank-marketing-pipeline-image:latest",
    packages_to_install=[
        'google-cloud-bigquery',
        'google-cloud-storage',
        'db-dtypes',
        'joblib',
        'numpy',
        'pandas',
        'scikit-learn',
        'imbalanced-learn'
    ]
)
def modelling(
    config: dict,
    X_train: Input[Dataset], y_train: Input[Dataset],
    X_valid: Input[Dataset], y_valid: Input[Dataset],
    X_test: Input[Dataset], y_test: Input[Dataset],
    model: Output[Model]):
    """Under-sample the training set, train the model and persist it.

    Args:
        config: Project configuration. Not read by this component; kept so
            the pipeline can pass the same argument to every step.
        X_train, X_valid, X_test: Input artifacts holding the feature
            columns, read as CSV from their `.path`.
        y_train, y_valid, y_test: Input artifacts holding the label, read as
            CSV from their `.path`; a single-column file becomes a Series.
            NOTE(review): the producing component must write CSV - confirm.
        model: Output artifact; the trained object is serialised to its
            `.path` with joblib.

    Nothing is returned: the model is handed on through the output artifact.
    """
    # Imports are local because a lightweight component runs from its own
    # serialised source; the alias avoids the clash between this function's
    # name and the `src.modelling` module.
    import joblib
    import pandas as pd
    from src import modelling as modelling_lib

    def _read_features(artifact):
        # Load a feature table from an input artifact.
        return pd.read_csv(artifact.path)

    def _read_labels(artifact):
        # Load labels; collapse a one-column table into a Series.
        return pd.read_csv(artifact.path).squeeze("columns")

    # 1. Load dataset. The parameters are artifact handles, not DataFrames,
    #    so the data has to be read before it is passed to the helpers.
    X_train_df, y_train_sr = _read_features(X_train), _read_labels(y_train)
    X_valid_df, y_valid_sr = _read_features(X_valid), _read_labels(y_valid)
    X_test_df, y_test_sr = _read_features(X_test), _read_labels(y_test)

    # 2. Random under sampling train set
    X_rus, y_rus = modelling_lib.rus_fit_resample(X_train_df, y_train_sr)

    # 3. Train model
    trained_model = modelling_lib.train_model(
        X_rus, y_rus, X_valid_df, y_valid_sr, X_test_df, y_test_sr)

    # 4. Persist the model to the output artifact.
    # NOTE(review): assumes train_model returns a picklable estimator - confirm.
    joblib.dump(trained_model, model.path)

In [117]:
@dsl.pipeline(
    name=PIPELINE_DISPLAY_NAME,
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(config: dict):
    # Stage 1: produce the train / valid / test splits.
    prepare_task = data_preparation(config=config)
    splits = prepare_task.outputs

    # Stage 2: turn each split into separate feature and label artifacts.
    preprocess_task = preprocessing(
        config=config,
        train=splits["train"],
        valid=splits["valid"],
        test=splits["test"],
    )
    processed = preprocess_task.outputs

    # Stage 3: train the model on the processed artifacts.
    # NOTE(review): the keywords are lowercase (`x_train`) while the component
    # parameters are spelled `X_train`; kept unchanged because the recorded run
    # of this notebook compiled and submitted with them - confirm against the
    # installed KFP version before renaming.
    modelling(
        config=config,
        x_train=processed["X_train"],
        y_train=processed["y_train"],
        x_valid=processed["X_valid"],
        y_valid=processed["y_valid"],
        x_test=processed["X_test"],
        y_test=processed["y_test"],
    )

In [118]:
# Compile the pipeline function into 'pipeline.json', the template file the
# PipelineJob cell below submits.
compiler.Compiler().compile(pipeline_func=pipeline, package_path='pipeline.json')

In [120]:
# Submit the compiled template ('pipeline.json') as a pipeline run, passing the
# loaded configuration as the pipeline's single `config` parameter.
job = aip.PipelineJob(
    display_name="bank-marketing-temp-pipeline",
    template_path='pipeline.json',
    parameter_values={"config": config},
    # `PROJECT` is not defined anywhere in this notebook, so the cell only
    # worked with leftover kernel state; read the project id from the
    # configuration instead.
    project=config["PROJECT"],
    location='asia-southeast2',
    enable_caching=False,
)
job.submit()

Creating PipelineJob
PipelineJob created. Resource name: projects/186470930776/locations/asia-southeast2/pipelineJobs/bank-marketing-train-pipeline-v01-20231008161729
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/186470930776/locations/asia-southeast2/pipelineJobs/bank-marketing-train-pipeline-v01-20231008161729')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/asia-southeast2/pipelines/runs/bank-marketing-train-pipeline-v01-20231008161729?project=186470930776


In [None]:
@dsl.component(
    packages_to_install=[
        'google-cloud-aiplatform'
    ]
)
def deploy(config: dict, model):
    """Initialise the Vertex AI SDK for the configured project and region.

    Args:
        config: Project configuration; the keys "PROJECT" and "REGION" are read.
        model: Not used yet by the body.
            NOTE(review): unannotated; presumably meant to receive the trained
            model artifact - confirm and add the annotation when the
            deployment logic is written.
    """
    # A lightweight component runs from its own serialised source, so the SDK
    # must be imported here. The `config` argument is used as passed in: this
    # component declares no base image that could supply `src.util` to reload it.
    import google.cloud.aiplatform as aip

    # Initialization
    aip.init(project=config["PROJECT"], location=config["REGION"])

In [None]:
pyyaml
joblib
pandas
scikit-learn
imbalanced-learn
typing

In [133]:
# Check which PyYAML version is installed in the kernel.
import yaml
yaml.__version__

'5.4.1'

In [134]:
!pip install pyyaml

[0m

In [142]:
# Try to submit a run from the local "my-pipeline.yaml" template.
# NOTE(review): the recorded run of this cell failed with KeyError: 'root'.
# The pipeline compiled in this notebook is written to 'pipeline.json';
# confirm that "my-pipeline.yaml" is a compiled pipeline spec before re-running.
job = aip.PipelineJob(
    display_name="bank-marketing-pipeline-test",
    template_path=os.path.abspath("my-pipeline.yaml"),
    parameter_values={"config": config},
    # `PROJECT` is not defined anywhere in this notebook; read the project id
    # from the configuration instead of relying on leftover kernel state.
    project=config["PROJECT"],
    location='asia-southeast2'
)
job.submit()

KeyError: 'root'

In [143]:
# Show the absolute path that the relative template name resolves to.
os.path.abspath("my-pipeline.yaml")

'/home/jupyter/carlo/bank-marketing/my-pipeline.yaml'

In [None]:
# Alternative submission of "my-pipeline.yaml" with an explicit pipeline root
# and caching enabled. This cell has no recorded execution.
# NOTE(review): PROJECT_REGION and PROJECT_ID are not defined anywhere in this
# notebook, and the parameter names ("project", "display_name") differ from the
# single `config` parameter of the pipeline defined above - confirm which
# template this is meant for before running.
ml_pipeline_job = aip.PipelineJob(
    display_name="bank-marketing-pipeline-test",
    template_path="my-pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
    location=PROJECT_REGION,
    parameter_values={"project": PROJECT_ID, "display_name": PIPELINE_DISPLAY_NAME},
    enable_caching=True
)

ml_pipeline_job.submit()  

In [144]:
# Display the pipeline root that the pipeline definition is configured with.
PIPELINE_ROOT

'gs://adi-mlops-bucket/pipelines/bank-marketing'