#### import libraries


In [1]:
import yaml
import pandas as pd
from pathlib import Path
import kfp.v2.dsl as dsl
from typing import NamedTuple
import matplotlib.pyplot as plt
from kfp.v2 import compiler, dsl
from google.cloud import bigquery
from google.cloud import aiplatform, aiplatform_v1
from kfp.v2.dsl import (
    Artifact,
    Dataset,
    Input,
    InputPath,
    Metrics,
    Model,
    Output,
    OutputPath,
    component,
    pipeline,
)

#### load config file
- set global variables


In [2]:
with open('config.yaml', 'r') as config_file:
    config = yaml.safe_load(config_file)

# All notebook-wide settings live under the top-level `utils` key of config.yaml.
utils_config = config['utils']
REGION = utils_config['region']
PROJECT_ID = utils_config['project_id']
BUCKET_NAME = utils_config['bucket_name']
SERVICE_ACCOUNT = utils_config['service_account']
BASE_IMAGE = utils_config['base_image']
PIPELINE_ROOT = utils_config['pipeline_root']
DATASET = utils_config['dataset']
CONTAINER_IMAGE = utils_config['container_image']

#### load_data component
- load data from BigQuery and write it to a shuffled CSV dataset artifact

In [3]:
%%writefile load_data.py

from kfp.v2 import dsl
from kfp.v2.dsl import (Dataset, Output)
import yaml

with open('config.yaml', 'r') as config_file:
    config = yaml.safe_load(config_file)

BASE_IMAGE = config['utils']['base_image']

@dsl.component(base_image=BASE_IMAGE, install_kfp_package=False,)
def load_data(
    bq_table: str,
    output_data_path: Output[Dataset]
):
    """Export a BigQuery table to a shuffled CSV dataset artifact.

    Args:
        bq_table: table id parsed by ``bigquery.TableReference.from_string``
            (``project.dataset.table`` form).
        output_data_path: dataset artifact; rows are written, without the
            pandas index, to ``output_data_path.path + ".csv"``.
    """
    import os
    import pandas as pd  # noqa: F401 -- RowIterator.to_dataframe needs pandas installed
    from google.cloud import bigquery

    # The BigQuery client runs under the project id exposed in the job environment.
    bq_client = bigquery.Client(project=os.environ["CLOUD_ML_PROJECT_ID"])
    row_iter = bq_client.list_rows(bigquery.TableReference.from_string(bq_table))

    # A full-fraction sample with a fixed seed is a deterministic row shuffle.
    shuffled = row_iter.to_dataframe(create_bqstorage_client=True).sample(frac=1, random_state=2)
    shuffled.to_csv(output_data_path.path + ".csv", index=False)
    

Writing load_data.py


#### preprocessing component
- preprocess dataset for ML operations

In [4]:
%%writefile preprocessing.py

from kfp.v2 import dsl
from kfp.v2.dsl import (Dataset, Input, Output)
import yaml

with open('config.yaml', 'r') as file:
    config = yaml.safe_load(file)

BASE_IMAGE = config['utils']['base_image']

@dsl.component(base_image=BASE_IMAGE, install_kfp_package=False,)
def preprocessing(
    dataset: Input[Dataset],
    train_dataset: Output[Dataset],
    test_dataset: Output[Dataset],
    test_size: float = 0.25,
    random_state: int = 0,
):
    """Drop incomplete rows and split the data into train/test CSV artifacts.

    Args:
        dataset: input artifact; read from ``dataset.path + ".csv"``.
        train_dataset: output artifact; written to ``train_dataset.path + ".csv"``.
        test_dataset: output artifact; written to ``test_dataset.path + ".csv"``.
        test_size: fraction of rows held out for evaluation (0.25 matches
            scikit-learn's default when no size is given).
        random_state: seed for the split, so reruns (and cached vs. fresh
            pipeline steps) produce the same train/test partition.

    Raises:
        ValueError: if no rows remain after dropping rows with missing values.
    """
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(dataset.path + ".csv").dropna()
    if df.empty:
        # train_test_split would fail with a less obvious message on an empty frame.
        raise ValueError("No rows left after dropna(); cannot build train/test split.")

    train_data, test_data = train_test_split(
        df, test_size=test_size, random_state=random_state
    )

    train_data.to_csv(train_dataset.path + ".csv", index=False)
    test_data.to_csv(test_dataset.path + ".csv", index=False)
    

Writing preprocessing.py


#### decision_tree_train component
- train model using decision_tree

In [5]:
%%writefile decision_tree_train.py

from kfp.v2 import dsl
from kfp.v2.dsl import (Dataset, Input, Model, Output)
import yaml

with open('config.yaml', 'r') as config_file:
    config = yaml.safe_load(config_file)

BASE_IMAGE = config['utils']['base_image']

@dsl.component(base_image=BASE_IMAGE, install_kfp_package=False,)
def decision_tree_train(
    train_dataset: Input[Dataset],
    model: Output[Model]
):
    """Fit a preprocessing + decision-tree sklearn Pipeline and pickle it.

    The last CSV column is the label; all other columns are features.
    int64/float64 columns are standardised, object/category columns are
    one-hot encoded (unknown categories ignored at predict time). The fitted
    Pipeline is pickled to ``model.path + ".pkl"``.
    """
    import pickle
    import pandas as pd
    from sklearn.pipeline import Pipeline
    from sklearn.compose import ColumnTransformer
    from sklearn.tree import DecisionTreeClassifier
    from sklearn.preprocessing import StandardScaler, OneHotEncoder

    train_df = pd.read_csv(train_dataset.path + ".csv")
    features, labels = train_df.iloc[:, :-1], train_df.iloc[:, -1]

    numeric_cols = features.select_dtypes(include=['int64', 'float64']).columns
    categorical_cols = features.select_dtypes(include=['object', 'category']).columns

    column_prep = ColumnTransformer(transformers=[
        ('num', Pipeline(steps=[('scaler', StandardScaler())]), numeric_cols),
        ('cat', Pipeline(steps=[('onehot', OneHotEncoder(handle_unknown='ignore'))]), categorical_cols),
    ])

    fitted = Pipeline(steps=[
        ('preprocessor', column_prep),
        ('classifier', DecisionTreeClassifier(criterion='entropy', random_state=0)),
    ])
    fitted.fit(features, labels)

    model.metadata["framework"] = "DT"
    with open(f"{model.path}.pkl", 'wb') as out_file:
        pickle.dump(fitted, out_file)
        

Writing decision_tree_train.py


#### random_forest_train component
- train model using random_forest

In [6]:
%%writefile random_forest_train.py

from kfp.v2 import dsl
from kfp.v2.dsl import (Dataset, Input, Model, Output)
import yaml

with open('config.yaml', 'r') as config_file:
    config = yaml.safe_load(config_file)

BASE_IMAGE = config['utils']['base_image']

@dsl.component(base_image=BASE_IMAGE, install_kfp_package=False,)
def random_forest_train(
    train_dataset: Input[Dataset],
    model: Output[Model]
):
    """Fit a preprocessing + random-forest sklearn Pipeline and pickle it.

    The last CSV column is the label; all other columns are features.
    int64/float64 columns are standardised, object/category columns are
    one-hot encoded (unknown categories ignored at predict time). The forest
    uses 10 entropy-criterion trees with a fixed seed. The fitted Pipeline is
    pickled to ``model.path + ".pkl"``.
    """
    import pickle
    import pandas as pd
    from sklearn.pipeline import Pipeline
    from sklearn.compose import ColumnTransformer
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.preprocessing import StandardScaler, OneHotEncoder

    train_df = pd.read_csv(train_dataset.path + ".csv")
    features, labels = train_df.iloc[:, :-1], train_df.iloc[:, -1]

    numeric_cols = features.select_dtypes(include=['int64', 'float64']).columns
    categorical_cols = features.select_dtypes(include=['object', 'category']).columns

    column_prep = ColumnTransformer(transformers=[
        ('num', Pipeline(steps=[('scaler', StandardScaler())]), numeric_cols),
        ('cat', Pipeline(steps=[('onehot', OneHotEncoder(handle_unknown='ignore'))]), categorical_cols),
    ])

    forest = RandomForestClassifier(n_estimators=10, criterion='entropy', random_state=0)
    fitted = Pipeline(steps=[('preprocessor', column_prep), ('classifier', forest)])
    fitted.fit(features, labels)

    model.metadata["framework"] = "RF"
    with open(f"{model.path}.pkl", 'wb') as out_file:
        pickle.dump(fitted, out_file)
        

Writing random_forest_train.py


#### evaluate_model component
- evaluate both trained models on the held-out test split
- select the more accurate one (ties go to random_forest)

In [7]:
%%writefile evaluate_model.py

from kfp.v2 import dsl
from typing import NamedTuple
from kfp.v2.dsl import (Dataset, Input, Metrics, Model, Output)
import yaml

with open('config.yaml', 'r') as file:
    config = yaml.safe_load(file)

BASE_IMAGE = config['utils']['base_image']

@dsl.component(base_image=BASE_IMAGE, install_kfp_package=False,)
def evaluate_model(
    test_dataset: Input[Dataset],
    dt_model: Input[Model],
    rf_model: Input[Model],
    metrics: Output[Metrics]
) -> NamedTuple("output", [("optimal_model", str)]):
    """Score both trained models on the test split and pick the more accurate one.

    Args:
        test_dataset: artifact whose ``.path + ".csv"`` holds the test rows;
            the last column is the label.
        dt_model: decision-tree artifact pickled at ``.path + ".pkl"``.
        rf_model: random-forest artifact pickled at ``.path + ".pkl"``.
        metrics: receives ``dt_accuracy`` and ``rf_accuracy`` (percent, 2 dp).

    Returns:
        ``("decision_tree",)`` if the tree is strictly more accurate,
        otherwise ``("random_forest",)`` (ties go to the forest).
    """
    import pickle
    import pandas as pd
    # Import explicitly: `import sklearn` alone does not guarantee that the
    # `sklearn.metrics` submodule attribute is available.
    from sklearn.metrics import accuracy_score

    def _accuracy_pct(model_artifact, X, y):
        """Unpickle ``<artifact path>.pkl`` and return its accuracy on (X, y) in percent."""
        with open(model_artifact.path + ".pkl", "rb") as fh:
            # NOTE: pickle.load can execute arbitrary code; only load artifacts
            # produced by this pipeline's own training steps.
            fitted = pickle.load(fh)
        return accuracy_score(y, fitted.predict(X)) * 100

    df = pd.read_csv(test_dataset.path + ".csv")
    X_test = df.iloc[:, :-1]
    y_test = df.iloc[:, -1]

    dt_accuracy = _accuracy_pct(dt_model, X_test, y_test)
    metrics.log_metric("dt_accuracy", round(dt_accuracy, 2))

    rf_accuracy = _accuracy_pct(rf_model, X_test, y_test)
    metrics.log_metric("rf_accuracy", round(rf_accuracy, 2))

    optimal_model = "decision_tree" if dt_accuracy > rf_accuracy else "random_forest"
    return (optimal_model,)


Writing evaluate_model.py


#### deploy_model component
- deploy the model with the best evaluation score

In [8]:
%%writefile deploy_model.py

from kfp.v2 import dsl
# Artifact/Input/Model/Output are needed by the signature annotations below,
# which Python evaluates when this module is imported.
from kfp.v2.dsl import (Artifact, Input, Model, Output)
from typing import NamedTuple
import yaml

with open('config.yaml', 'r') as file:
    config = yaml.safe_load(file)

BASE_IMAGE = config['utils']['base_image']

@dsl.component(base_image=BASE_IMAGE, install_kfp_package=False,)
def deploy_model(
    model: Input[Model],
    project: str,
    region: str,
    container_image : str,
    vertex_endpoint: Output[Artifact],
    vertex_model: Output[Model]
):
    """Upload the selected model to the Vertex AI Model Registry and serve it.

    Creates a new endpoint, uploads the directory that holds the model file
    with ``container_image`` as the serving container, and deploys it to the
    endpoint with 100% of traffic.

    Args:
        model: trained-model artifact. Its parent directory is uploaded; it is
            assumed to contain ``<last uri segment>.pkl`` (the trainers in this
            notebook write ``model.path + ".pkl"``).
        project: GCP project id.
        region: Vertex AI region.
        container_image: serving container image URI.
        vertex_endpoint: output; ``uri`` is set to the endpoint resource name.
        vertex_model: output; ``uri`` is set to the uploaded model resource name.
    """
    from google.cloud import aiplatform
    aiplatform.init(project=project, location=region)

    DISPLAY_NAME = "conversion_model"
    MODEL_NAME = "conversion_model_v1"
    ENDPOINT_NAME = "conversion_endpoint"

    # Create vertex endpoint
    endpoint = aiplatform.Endpoint.create(
        display_name=ENDPOINT_NAME,
        project=project,
        location=region
    )

    # Vertex wants the *directory* holding the model file. Strip only the last
    # path segment: a blanket str.replace("model", "") also rewrites any other
    # "model" substring in the URI (e.g. a "conversion-model-pipeline-..." job
    # id), pointing the upload at a path that does not exist.
    artifact_dir = model.uri.rsplit("/", 1)[0] + "/"

    # Upload model to vertex model registry
    model_upload = aiplatform.Model.upload(
        display_name=DISPLAY_NAME,
        artifact_uri=artifact_dir,
        serving_container_image_uri=container_image,
        serving_container_health_route=f"/v1/models/{MODEL_NAME}",
        serving_container_predict_route=f"/v1/models/{MODEL_NAME}:predict",
        serving_container_environment_variables={"MODEL_NAME": MODEL_NAME},
    )

    model_upload.deploy(
        machine_type="n1-standard-4",
        endpoint=endpoint,
        traffic_split={"0": 100},
        deployed_model_display_name=DISPLAY_NAME,
    )

    # Model.deploy returns the Endpoint, so take each resource name from the
    # object it actually belongs to.
    vertex_endpoint.uri = endpoint.resource_name
    vertex_model.uri = model_upload.resource_name
    

Writing deploy_model.py


#### assemble the components into a pipeline

In [14]:
@dsl.pipeline(name="ml-pipeline", pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_table: str = DATASET,
    project: str = PROJECT_ID,
    region: str = REGION,
    container_image: str = CONTAINER_IMAGE
):
    """Load -> split -> train DT and RF -> evaluate -> deploy the more accurate model.

    Parameter defaults come from the globals loaded from config.yaml.
    """
    # Component definitions live in the %%writefile modules above; they are
    # imported when the pipeline function runs at compile time.
    from load_data import load_data
    from preprocessing import preprocessing
    from decision_tree_train import decision_tree_train
    from random_forest_train import random_forest_train
    from evaluate_model import evaluate_model
    from deploy_model import deploy_model

    load_data_op = load_data(bq_table)
    preprocessing_op = preprocessing(load_data_op.output)

    # Both trainers consume the same training split.
    train_split = preprocessing_op.outputs["train_dataset"]
    train_decision_tree_op = decision_tree_train(train_split)
    train_random_forest_op = random_forest_train(train_split)

    evaluate_model_op = evaluate_model(
        preprocessing_op.outputs["test_dataset"],
        train_decision_tree_op.output,
        train_random_forest_op.output,
    )
    optimal_model = evaluate_model_op.outputs["optimal_model"]

    # evaluate_model returns exactly one of these two labels, so one branch runs.
    # Each Condition gets its own name so the two groups cannot collide.
    with dsl.Condition(optimal_model == "random_forest", name="deploy-random-forest") as condition:
        condition.display_name = "deploy_random_forest"
        deploy_model(
            model=train_random_forest_op.outputs['model'],
            project=project,
            region=region,
            # deploy_model's parameter is `container_image`, not `serving_container_image_uri`.
            container_image=container_image,
        )

    with dsl.Condition(optimal_model == "decision_tree", name="deploy-decision-tree") as condition:
        condition.display_name = "deploy_decision_tree"
        deploy_model(
            model=train_decision_tree_op.outputs['model'],
            project=project,
            region=region,
            container_image=container_image,
        )


In [15]:
# Compile the pipeline function into the JSON job spec that the PipelineJob below loads.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=pipeline,
    package_path="pipeline.json",
)


In [16]:
from datetime import datetime

# Second-resolution suffix (YYYYMMDDhhmmss) that keeps each submitted job id unique.
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"


In [17]:
# Describe the Vertex AI pipeline run built from the compiled spec.
pipeline_job_kwargs = dict(
    display_name="conversion-model-pipeline",
    template_path="pipeline.json",
    job_id=f"conversion-model-pipeline-{TIMESTAMP}",
    enable_caching=True,
    project=PROJECT_ID,
    location=REGION,
)
api_client = aiplatform.PipelineJob(**pipeline_job_kwargs)

In [18]:
# Submit the job to Vertex AI under the configured service account
# (`submit`, unlike `run`, does not block until the pipeline finishes).
api_client.submit(
    service_account=SERVICE_ACCOUNT,
)