In [1]:
import os
from kfp import dsl
from typing import NamedTuple
from kfp.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        Markdown)

from kfp import compiler
from google.cloud import storage
from google.cloud import bigquery
from google.cloud.aiplatform import pipeline_jobs

In [2]:
# Path to a service-account key file. Replace the placeholder with your key path.
# The env var is only set when the file actually exists, so leaving the
# placeholder in place does not break authentication: the Google client libraries
# then fall back to whatever credentials are already configured
# (an existing GOOGLE_APPLICATION_CREDENTIALS or Application Default Credentials).
SERVICE_ACCOUNT_KEY_PATH = r"your Service account path"
if os.path.isfile(SERVICE_ACCOUNT_KEY_PATH):
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = SERVICE_ACCOUNT_KEY_PATH

In [3]:
PROJECT_ID = "dla-ml-specialization"
# Root GCS prefix for pipeline run artifacts (used as the pipeline's pipeline_root).
_PIPELINE_BUCKET = "dla-ml-specialization-dataset-2"
PIPELINE_ROOT = f"gs://{_PIPELINE_BUCKET}/pipelines/"

In [4]:
@dsl.component(base_image='python:3.10',
    packages_to_install = [
        "pandas",
        "google-cloud-bigquery",
        "db-dtypes",
        "NumPy==1.24.4",
        "SciPy==1.12.0"
    ],
)
def Load_from_BQ(
    config: dict,
    Train_data_BQ: Output[Dataset]
):
    """Read the raw training table from BigQuery and save it as a CSV artifact.

    Args:
        config: Pipeline configuration. ``config['project_id']`` is the project
            the BigQuery client and query job run in. The optional key
            ``config['train_table']`` overrides the fully-qualified source table
            (``project.dataset.table``); it defaults to the original hard-coded table.
        Train_data_BQ: Output dataset artifact. The CSV is written to
            ``f"{Train_data_BQ.path}.csv"``, which is the path ``Preprocessing`` reads.
    """
    # Imports live inside the body because KFP ships only this function's
    # source to the component container.
    from google.cloud import bigquery

    project_id = config['project_id']
    train_table = config.get('train_table', 'dla-ml-specialization.demo_dataset_2.train')

    client = bigquery.Client(project=project_id)
    # Table identifiers cannot be query parameters in BigQuery, so the name is
    # interpolated. It comes from the operator-controlled config, not user input.
    train_sql = f"""
        SELECT *
        FROM `{train_table}`
    """
    # to_dataframe() needs pandas and db-dtypes installed in the container
    # (see packages_to_install) even though pandas is not imported here.
    train_data = client.query(train_sql, project=project_id).to_dataframe()

    train_data.to_csv(f"{Train_data_BQ.path}.csv", index=False)

In [5]:
@dsl.component(base_image='python:3.10',
    packages_to_install = [
        "pandas",
        "db-dtypes",
        "NumPy==1.24.4",
        "SciPy==1.12.0",
        "scikit-learn==1.4.1.post1"
    ],
)
def Preprocessing(
    config: dict,
    Train_data_BQ: Input[Dataset],
    x_trains: Output[Dataset],
    x_tests: Output[Dataset],
    y_trains: Output[Dataset],
    y_tests: Output[Dataset]
):
    """Clean, encode, split and oversample the raw data; write the four splits as CSV.

    Steps:
      1. Drop the ``User_ID`` and ``Product_ID`` columns.
      2. Fill missing ``Product_Category_2`` / ``Product_Category_3`` with the column
         mode and cast ``Product_Category_1..3`` to ``object``.
      3. One-hot encode ``Gender``, ``City_Category``, ``Stay_In_Current_City_Years``
         and ``Age`` (float dummies).
      4. 80/20 train/test split with ``random_state=42``.
      5. Randomly oversample rows with ``Gender_F == 1`` in the TRAIN split only.
      6. Save features and the ``Purchase`` target of each split.

    Args:
        config: Pipeline configuration. Optional key ``oversample_n`` (default
            75000) sets how many extra ``Gender_F`` rows are appended to train.
        Train_data_BQ: Input CSV produced by ``Load_from_BQ`` (at ``path + ".csv"``).
        x_trains, x_tests, y_trains, y_tests: Output artifacts; each CSV is written
            to ``f"{artifact.path}.csv"``.
    """
    import pandas as pd
    from sklearn.model_selection import train_test_split

    random_state = 42
    oversample_n = int(config.get('oversample_n', 75000))

    df = pd.read_csv(f"{Train_data_BQ.path}.csv")
    df = df.drop(columns=['User_ID', 'Product_ID'])
    for col in ['Product_Category_2', 'Product_Category_3']:
        df[col] = df[col].fillna(df[col].mode()[0])
    for col in ['Product_Category_1', 'Product_Category_2', 'Product_Category_3']:
        df[col] = df[col].astype('object')

    encoded = pd.get_dummies(
        df,
        columns=['Gender', 'City_Category', 'Stay_In_Current_City_Years', 'Age'],
        dtype=float,
    )
    train, test = train_test_split(encoded, test_size=0.2, random_state=random_state)

    # Random oversampling of Gender_F rows, applied after the split so no
    # duplicated row can end up in the test set.
    female_rows = train[train['Gender_F'] == 1]
    if oversample_n > 0 and not female_rows.empty:
        extra_rows = female_rows.sample(
            n=oversample_n,
            # Sample without replacement (the original behaviour) when enough
            # rows exist; otherwise sample with replacement instead of raising
            # "Cannot take a larger sample than population".
            replace=len(female_rows) < oversample_n,
            random_state=random_state,  # reproducible, like the split above
        )
        train = pd.concat([train, extra_rows], ignore_index=True)

    x_test = test.drop(columns=['Purchase'])
    y_test = test['Purchase']

    x_train = train.drop(columns=['Purchase'])
    y_train = train['Purchase']

    x_train.to_csv(f"{x_trains.path}.csv", index=False)
    x_test.to_csv(f"{x_tests.path}.csv", index=False)
    y_train.to_csv(f"{y_trains.path}.csv", index=False)
    y_test.to_csv(f"{y_tests.path}.csv", index=False)


In [6]:
@dsl.component(base_image='python:3.10',
    packages_to_install = [
        "pandas",
        "db-dtypes",
        "NumPy==1.24.4",
        "SciPy==1.12.0",
        "scikit-learn==1.4.1.post1"
    ],
)
def Training(
    config: dict,
    x_trains: Input[Dataset],
    y_trains: Input[Dataset],
    models: Output[Model]
):
    """Tune and train a RandomForestRegressor, then pickle it to ``models.path + ".pkl"``.

    Hyper-parameters are chosen with ``RandomizedSearchCV`` (2-fold CV, default
    ``n_iter``) over n_estimators, max_depth, min_samples_split and
    min_samples_leaf. Both the search and the forest are seeded with 42 so a
    re-run produces the same model.

    Args:
        config: Pipeline configuration (not read by this step).
        x_trains: Training features CSV written by ``Preprocessing``.
        y_trains: Training target CSV written by ``Preprocessing``.
        models: Output model artifact; the pickle is written to ``f"{models.path}.pkl"``.
    """
    import pickle
    import numpy as np
    import pandas as pd
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.model_selection import RandomizedSearchCV

    random_state = 42

    x_train = pd.read_csv(f"{x_trains.path}.csv")
    # The target was saved from a Series and reads back as a one-column
    # DataFrame; squeeze it to 1-D so sklearn does not warn about a column vector.
    y_train = pd.read_csv(f"{y_trains.path}.csv").squeeze("columns")

    param_grid = {
        # Number of trees: [10, 32, 55, 77, 100]
        'n_estimators': [int(x) for x in np.linspace(start=10, stop=100, num=5)],
        # Maximum depth of each tree
        'max_depth': [2, 10],
        # Minimum samples required to split a node
        'min_samples_split': [2, 5],
        # Minimum samples required at each leaf
        'min_samples_leaf': [1, 3],
    }

    search = RandomizedSearchCV(
        estimator=RandomForestRegressor(random_state=random_state),
        param_distributions=param_grid,
        cv=2,
        verbose=2,
        n_jobs=4,
        random_state=random_state,
    )
    search.fit(x_train, y_train)

    # With refit=True (the default) the search already retrains the best
    # configuration on the full x_train, so best_estimator_ is the final model.
    # It matches the previous manual refit with random_state=42, minus the
    # duplicate training run.
    best_model = search.best_estimator_

    with open(f"{models.path}.pkl", 'wb') as fh:
        pickle.dump(best_model, fh)

In [7]:
@dsl.component(base_image='python:3.10',
    packages_to_install = [
        "pandas",
        "db-dtypes",
        "NumPy==1.24.4",
        "SciPy==1.12.0",
        "scikit-learn==1.4.1.post1"
    ],
)
def Evaluation(
    config: dict,
    x_tests: Input[Dataset],
    y_tests: Input[Dataset],
    models: Input[Model],
    smetrics: Output[Metrics]
):
    """Score the trained model on the held-out split and log regression metrics.

    Logged metrics: ``mean_absolute_error``, ``r2_score`` and
    ``root_mean_squared_error``.

    Args:
        config: Pipeline configuration (not read by this step).
        x_tests: Test features CSV written by ``Preprocessing``.
        y_tests: Test target CSV written by ``Preprocessing``.
        models: Model artifact written by ``Training`` (pickle at ``path + ".pkl"``).
        smetrics: Output metrics artifact.
    """
    import pickle
    import pandas as pd
    from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

    # The pickle is produced by this pipeline's own Training step. Never
    # unpickle files from untrusted sources: loading can execute arbitrary code.
    with open(f"{models.path}.pkl", 'rb') as fh:
        model = pickle.load(fh)

    x_test = pd.read_csv(f"{x_tests.path}.csv")
    # Saved from a Series; squeeze the one-column frame back to 1-D.
    y_test = pd.read_csv(f"{y_tests.path}.csv").squeeze("columns")

    y_pred = model.predict(x_test)

    smetrics.log_metric("mean_absolute_error", float(mean_absolute_error(y_test, y_pred)))
    smetrics.log_metric("r2_score", float(r2_score(y_test, y_pred)))
    smetrics.log_metric("root_mean_squared_error", float(mean_squared_error(y_test, y_pred)) ** 0.5)

In [8]:
# Component: register the trained model as a new version in the Vertex AI
# Model Registry and deploy it to an endpoint.
@component(base_image='python:3.10',
    packages_to_install=[
        "google-cloud-aiplatform==1.51.0",
        "google-cloud-storage==2.14.0"
        ])
def Deploy(
    models: Input[Model],
    config: dict):
    """Copy the trained pickle to a fixed GCS folder, register it and deploy it.

    The pickle written by ``Training`` at ``f"{models.uri}.pkl"`` is copied to
    ``gs://dla-ml-specialization-dataset-2/model/model.pkl``. That folder is then
    uploaded via ``aiplatform.Model.upload`` as a new version of the existing
    registry model ``parent_model``, and deployed on one n1-standard-2 replica.

    Args:
        models: Model artifact produced by ``Training``.
        config: Pipeline configuration; ``project_id`` and ``region`` are read.

    NOTE(review): ``Model.deploy`` is called without an ``endpoint``, so every run
    creates a NEW endpoint with its own replica. Earlier endpoints keep running
    until they are removed manually. TODO: reuse an existing endpoint (e.g. a
    ``config['endpoint_id']``), move traffic to the new version and undeploy the
    previous one.

    NOTE(review): the model is pickled with scikit-learn 1.4.1 (Training's
    packages) but served by the ``sklearn-cpu.1-3`` image. Confirm the pickle
    loads under that version, or switch to a matching serving image.

    NOTE(review): the fixed destination ``model/model.pkl`` is overwritten by
    every run, so concurrent runs can race on it.
    """
    from google.cloud import aiplatform
    from google.cloud import storage

    model_bucket_name = "dla-ml-specialization-dataset-2"
    model_dir = "model"
    display_name = "model_dataset_2"
    # A registry model with this ID must already exist; the upload becomes a new version of it.
    parent_model = "1621700486132400128"
    serving_image = "asia-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-3:latest"

    aiplatform.init(project=config['project_id'], location=config['region'])

    source_uri = f"{models.uri}.pkl"
    print(f"Deploying model artifact {source_uri}")

    # Parse bucket/object from the artifact URI itself, instead of stripping a
    # hard-coded bucket prefix, so a different pipeline root still works.
    gcs_client = storage.Client()
    source_blob = storage.Blob.from_string(source_uri, client=gcs_client)
    destination_bucket = gcs_client.bucket(model_bucket_name)
    source_blob.bucket.copy_blob(source_blob, destination_bucket, f"{model_dir}/model.pkl")

    registered_model = aiplatform.Model.upload(
        display_name=display_name,
        model_id=display_name,
        parent_model=parent_model,
        artifact_uri=f"gs://{model_bucket_name}/{model_dir}/",
        serving_container_image_uri=serving_image,
    )
    registered_model.deploy(
        machine_type="n1-standard-2",
        deployed_model_display_name=display_name,
        min_replica_count=1,
        max_replica_count=1,
    )

In [9]:
@dsl.pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT + "dataset-2-model",
    # A name for the pipeline. Used to determine the pipeline Context.
    name="dataset-2-model",
)
def pipeline(config: dict):
    """Load -> preprocess -> train -> evaluate -> deploy.

    Args:
        config: Configuration dict forwarded unchanged to every component.
    """
    dataset_op = Load_from_BQ(config=config)
    preproc_op = Preprocessing(config=config,
                               Train_data_BQ=dataset_op.outputs['Train_data_BQ'])
    training_op = Training(config=config,
                           x_trains=preproc_op.outputs['x_trains'],
                           y_trains=preproc_op.outputs['y_trains'])
    eval_op = Evaluation(
        config=config,
        x_tests=preproc_op.outputs['x_tests'],
        y_tests=preproc_op.outputs['y_tests'],
        models=training_op.outputs['models']
    )

    deploy_op = Deploy(config=config,
                       models=training_op.outputs['models'])
    # Deploy consumes only the model artifact, so without an explicit
    # dependency it runs in parallel with Evaluation. It could then ship a
    # model whose evaluation failed. Order it behind Evaluation.
    deploy_op.after(eval_op)

    # TODO: gate deployment on the evaluation metrics, e.g. have Evaluation
    # return a "deploy" flag and wrap Deploy in a dsl.Condition on it.

In [10]:
# Compile the pipeline definition into the JSON spec that the submission cell
# passes to PipelineJob as template_path.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=pipeline,
    package_path="ml-specialization-dataset2.json",
)

In [12]:
import os
import yaml
import joblib
config_dir = r"config.yaml"

def load_config(config_path=None) -> dict:
    """Load the pipeline configuration from a YAML file.

    Args:
        config_path: Path to the YAML file. When omitted, the module-level
            ``config_dir`` is used, read at call time as before.

    Returns:
        The parsed top-level mapping, or ``{}`` when the file is empty.

    Raises:
        RuntimeError: If the file does not exist (the ``FileNotFoundError`` is
            chained as ``__cause__``), or if the document's top level is not a mapping.
    """
    path = config_dir if config_path is None else config_path
    try:
        with open(path, "r") as file:
            config = yaml.safe_load(file)
    except FileNotFoundError as fe:
        raise RuntimeError(f"Parameters file not found in path: {path}") from fe

    # safe_load returns None for an empty document; honour the -> dict contract.
    if config is None:
        return {}
    if not isinstance(config, dict):
        raise RuntimeError(
            f"Expected a mapping at the top level of {path}, got {type(config).__name__}"
        )
    return config

def pickle_load(file_path: str):
    """Deserialize and return the object stored at ``file_path``.

    Despite the name, this uses ``joblib.load``, which is pickle-based. Only
    call it on trusted files: loading a pickle can execute arbitrary code.
    """
    restored = joblib.load(filename=file_path)
    return restored

def pickle_dump(data, file_path: str) -> None:
    """Serialize ``data`` to ``file_path`` with ``joblib.dump`` (pickle-based)."""
    joblib.dump(value=data, filename=file_path)
    return None

# NOTE(review): this loads the same file as `config = load_config()` in the next
# cell, and `params` is not used anywhere in the visible notebook; consider removing it.
params = load_config()

In [13]:
# Load the configuration file (config.yaml); this dict is passed to the pipeline
# as its single `config` parameter when the job is submitted.
config: dict = load_config()

In [None]:
# Submit the compiled pipeline spec to Vertex AI Pipelines. `config` becomes the
# pipeline's `config` parameter; caching is disabled so every step re-runs.
job = pipeline_jobs.PipelineJob(
    project=PROJECT_ID,
    location="asia-southeast2",
    display_name="ml-specialization-dataset2-pipeline",
    template_path="ml-specialization-dataset2.json",
    parameter_values={"config": config},
    enable_caching=False,
)
job.submit()