### Installation
Install the packages required for executing this notebook.

In [None]:
# Install the packages this notebook needs: the KFP v2 SDK, Google Cloud Pipeline
# Components and the Vertex AI SDK. They are installed with --user, so the kernel
# has to be restarted (next cell) before the new versions can be imported.
! pip3 install --user --no-cache-dir --upgrade "kfp>2" "google-cloud-pipeline-components>2" \
                                        google-cloud-aiplatform

## Restart the kernel
Once you've installed the additional packages, you need to restart the notebook kernel so it can find the packages.

In [None]:
import os

# Restart the kernel so the freshly installed packages become importable.
# The restart is skipped whenever IS_TESTING is set to a non-empty value.
skip_restart = bool(os.getenv("IS_TESTING"))
if not skip_restart:
    import IPython

    IPython.Application.instance().kernel.do_shutdown(True)

## Check the versions of the packages you installed. The KFP SDK version should be >2.

In [1]:
# Print the installed versions of the KFP SDK, the Vertex AI SDK
# (google-cloud-aiplatform) and Google Cloud Pipeline Components.
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! pip3 freeze | grep aiplatform
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

KFP SDK version: 2.14.4
google-cloud-aiplatform==1.118.0
google_cloud_pipeline_components version: 2.21.0


In [2]:
import kfp
import typing
from typing import Dict
from typing import NamedTuple
from kfp import dsl
from kfp.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        OutputPath, 
                        InputPath)
import google.cloud.aiplatform as aip
from google_cloud_pipeline_components.types import artifact_types

#### Project and Pipeline Configurations

In [None]:
import os

# Each setting can be overridden with an environment variable of the same name,
# so the notebook can target another project/bucket without editing code.
# The literals below are the defaults used when the variable is not set.

# The Google Cloud project that this pipeline runs in.
PROJECT_ID = os.environ.get("PROJECT_ID", "de2025-472319")
# The region that this pipeline runs in.
REGION = os.environ.get("REGION", "us-central1")
# A Cloud Storage URI that your pipelines service account can access. The
# artifacts of your pipeline runs are stored within the pipeline root.
PIPELINE_ROOT = os.environ.get("PIPELINE_ROOT", "gs://temp_de2025_2062061")

#### Pipeline Component : Data Ingestion

In [4]:
@dsl.component(
    packages_to_install=["pandas","google-cloud-storage"],
    base_image="python:3.10.7-slim"
)
def download_data(project_id: str, bucket: str, file_name: str, dataset: Output[Dataset]):
    '''Download a file from Cloud Storage into the ``dataset`` output artifact.

    Args:
        project_id: Google Cloud project used to create the storage client.
        bucket: Name of the Cloud Storage bucket that holds the source file.
        file_name: Object name of the file inside ``bucket``.
        dataset: Output artifact. The file is written to ``dataset.path + ".csv"``,
            the path convention the downstream split component reads from.
    '''
    from google.cloud import storage
    import logging
    import sys

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # Download the file from the Cloud Storage bucket. A separate local name is
    # used so the ``bucket`` parameter (a str) is not rebound to a Bucket object.
    client = storage.Client(project=project_id)
    source_bucket = client.bucket(bucket)
    blob = source_bucket.blob(file_name)
    local_path = dataset.path + ".csv"
    blob.download_to_filename(local_path)
    logging.info('Downloaded Data!')
    logging.info("gs://%s/%s -> %s", bucket, file_name, local_path)

## Splitting data into train and test


In [None]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn==1.3.2"],
    base_image="python:3.10.7-slim"
)
def train_test_split(dataset: Input[Dataset], dataset_train: Output[Dataset], dataset_test: Output[Dataset]):
    '''Split the input CSV into a train (80%) and a test (20%) CSV artifact.

    Args:
        dataset: Input artifact; the data is read from ``dataset.path + ".csv"``.
        dataset_train: Output artifact; 80% of the rows are written to
            ``dataset_train.path + ".csv"``.
        dataset_test: Output artifact; the remaining 20% are written to
            ``dataset_test.path + ".csv"``.
    '''
    import pandas as pd
    import logging
    import sys
    # Aliased on purpose: an unaliased import would shadow this component's own
    # name (train_test_split) inside the function body.
    from sklearn.model_selection import train_test_split as sk_train_test_split

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # Split into train and test; the fixed random_state makes the split reproducible.
    data = pd.read_csv(dataset.path + ".csv", index_col=None)
    train, test = sk_train_test_split(data, test_size=0.2, random_state=42)
    train.to_csv(dataset_train.path + ".csv", index=False, encoding='utf-8-sig')
    test.to_csv(dataset_test.path + ".csv", index=False, encoding='utf-8-sig')
    logging.info("Split %d rows into %d train / %d test rows", len(data), len(train), len(test))

#### Pipeline Component : Training-XGBoost


In [None]:
@dsl.component(
    packages_to_install=['pandas', 'xgboost', 'scikit-learn==1.3.2'],
    base_image="python:3.10.7-slim"
)
def train_xgboost (train_set: Input[Dataset], test_set: Input[Dataset], out_model: Output[Model]) -> NamedTuple('outputs', metrics=dict):
    '''Train an XGBoost regressor for ``house_value`` and evaluate it on the test split.

    Args:
        train_set: Training split, read from ``train_set.path + ".csv"``.
        test_set: Test split, read from ``test_set.path + ".csv"``.
        out_model: Output artifact. The fitted model is pickled to
            ``out_model.path + ".pkl"`` and its ``file_type``/``algorithm``
            metadata is set for the upload component.

    Returns:
        A named tuple with one field, ``metrics``: a dict holding the test-set
        ``mse``, ``mae``, ``r2`` and ``rmse`` as plain Python floats.
    '''
    import pandas as pd
    import numpy as np
    import pickle
    from xgboost import XGBRegressor
    from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
    import logging
    import sys

    # Without this the logging.info calls below are dropped, because the root
    # logger defaults to WARNING. It matches the other components.
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # Load the train and test sets into dataframes
    df_train = pd.read_csv(train_set.path + ".csv")
    df_test = pd.read_csv(test_set.path + ".csv")

    logging.info(df_train.columns)
    logging.info(df_test.columns)

    # Split into input (X) and target (y) variables
    X_train, y_train = df_train.drop('house_value', axis=1), df_train['house_value']
    X_test, y_test = df_test.drop('house_value', axis=1), df_test['house_value']

    # Train the XGBoost model on all features
    xgb_model = XGBRegressor(n_estimators=100, random_state=42)
    xgb_model.fit(X_train, y_train)

    # Evaluate the model on the held-out test split (predict once, reuse below)
    y_pred = xgb_model.predict(X_test)

    # Cast to built-in float so the dict stays JSON-serializable when KFP hands it
    # to downstream tasks (e.g. NumPy float32 scalars are not).
    mse = mean_squared_error(y_test, y_pred)
    metrics_dict = {
        "mse": float(mse),
        "mae": float(mean_absolute_error(y_test, y_pred)),
        "r2": float(r2_score(y_test, y_pred)),
        "rmse": float(np.sqrt(mse)),
    }
    logging.info(metrics_dict)

    # Metadata read by the upload component to name the stored object
    out_model.metadata["file_type"] = ".pkl"
    out_model.metadata["algorithm"] = "xgboost"

    # Save the model to a pickle file
    model_file = out_model.path + ".pkl"
    with open(model_file, 'wb') as f:
        pickle.dump(xgb_model, f)

    # Return the metrics dictionary as an output
    outputs = NamedTuple('outputs', metrics=dict)
    return outputs(metrics_dict)

#### Pipeline Component : Training LinearRegression

In [None]:
@dsl.component(
    packages_to_install=['pandas', 'scikit-learn==1.3.2'],
    base_image="python:3.10.7-slim"
)
def train_lr (train_set: Input[Dataset], test_set: Input[Dataset], out_model: Output[Model]) -> NamedTuple('outputs', metrics=dict):
    '''Train a LinearRegression for ``house_value`` and evaluate it on the test split.

    Args:
        train_set: Training split, read from ``train_set.path + ".csv"``.
        test_set: Test split, read from ``test_set.path + ".csv"``.
        out_model: Output artifact. The fitted model is pickled to
            ``out_model.path + ".pkl"`` and its ``file_type``/``algorithm``
            metadata is set for the upload component.

    Returns:
        A named tuple with one field, ``metrics``: a dict holding the test-set
        ``mse``, ``mae``, ``r2`` and ``rmse`` as plain Python floats.
    '''
    import pandas as pd
    import numpy as np
    from sklearn.linear_model import LinearRegression
    from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
    import logging
    import sys
    import pickle

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # Load the train and test sets into dataframes
    df_train = pd.read_csv(train_set.path + ".csv")
    df_test = pd.read_csv(test_set.path + ".csv")

    logging.info(df_train.columns)
    logging.info(df_test.columns)

    # Split into input (X) and target (y) variables
    X_train, y_train = df_train.drop('house_value', axis=1), df_train['house_value']
    X_test, y_test = df_test.drop('house_value', axis=1), df_test['house_value']

    # Fit the model
    lr_model = LinearRegression()
    lr_model.fit(X_train, y_train)

    # Predict on the held-out test split
    y_pred = lr_model.predict(X_test)

    # Cast to built-in float so the dict stays JSON-serializable when KFP hands it
    # to downstream tasks; MSE is computed once and reused for RMSE.
    mse = mean_squared_error(y_test, y_pred)
    metrics_dict = {
        "mse": float(mse),
        "mae": float(mean_absolute_error(y_test, y_pred)),
        "r2": float(r2_score(y_test, y_pred)),
        "rmse": float(np.sqrt(mse)),
    }
    logging.info(metrics_dict)

    # Metadata read by the upload component to name the stored object
    out_model.metadata["file_type"] = ".pkl"
    out_model.metadata["algorithm"] = "lr"

    # Save the model to a pickle file
    model_file = out_model.path + ".pkl"
    with open(model_file, 'wb') as f:
        pickle.dump(lr_model, f)

    # Return the metrics dictionary as an output
    outputs = NamedTuple('outputs', metrics=dict)
    return outputs(metrics_dict)

#### Pipeline Component : Algorithm Selection 

In [None]:
@dsl.component(
    base_image="python:3.10.7-slim"
)
def compare_model(xgboost_metrics: dict, lr_metrics: dict) -> str:
    '''Select the better of the two trained models from their test metrics.

    The model with the higher ``r2`` wins. If both ``r2`` values are equal,
    XGBoost wins only with a strictly lower ``rmse``; every remaining case
    selects linear regression.

    Returns:
        "XGBoost" or "LR".
    '''
    import logging
    import sys

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    for candidate_metrics in (xgboost_metrics, lr_metrics):
        logging.info(candidate_metrics)

    xgb_r2, xgb_rmse = xgboost_metrics.get('r2'), xgboost_metrics.get('rmse')
    linear_r2, linear_rmse = lr_metrics.get('r2'), lr_metrics.get('rmse')

    if xgb_r2 > linear_r2:
        return "XGBoost"
    # R2 tie: break it on RMSE (lower is better); a full tie goes to LR.
    if xgb_r2 == linear_r2 and xgb_rmse < linear_rmse:
        return "XGBoost"
    return "LR"

### Upload the Model to a Google Cloud Storage Bucket

In [None]:
@dsl.component(
    packages_to_install=["google-cloud-storage"],
    base_image="python:3.10.7-slim"
)
def upload_model_to_gcs(project_id: str, model_repo: str, model: Input[Model]):
    '''Upload a trained model artifact to a Cloud Storage bucket.

    The local file ``model.path + file_type`` is stored as the object
    ``<algorithm>_model<file_type>``, where ``algorithm`` and ``file_type`` come
    from the artifact's metadata.

    Args:
        project_id: Google Cloud project used to create the storage client.
        model_repo: Name of the destination bucket.
        model: Model artifact whose metadata provides ``algorithm`` and ``file_type``.
    '''
    from google.cloud import storage
    import logging
    import sys

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # Read the naming metadata once instead of repeating the lookups.
    algorithm = str(model.metadata["algorithm"])
    file_type = str(model.metadata["file_type"])
    blob_name = algorithm + '_model' + file_type

    # Upload the model to GCS
    client = storage.Client(project=project_id)
    bucket = client.bucket(model_repo)
    blob = bucket.blob(blob_name)
    blob.upload_from_filename(model.path + file_type)

    # Use the logger configured above (the original used print) for consistency.
    logging.info("Saved the model to GCP bucket : " + model_repo)
    logging.info("Object name: %s", blob_name)

#### Pipeline Component : Trigger Deployment of the Serving App

In [None]:
@dsl.component(
    packages_to_install=["google-cloud-build"],
    base_image="python:3.10.7-slim"
)
def run_build_trigger(project_id:str, trigger_id:str, location: str = "us-central1"):
    '''Start a run of a Cloud Build trigger (here: the CI/CD build of the serving app).

    Args:
        project_id: Google Cloud project that owns the trigger.
        trigger_id: ID of the Cloud Build trigger to run.
        location: Region of the trigger. Defaults to ``us-central1``, the value
            that used to be hard-coded, so existing callers are unaffected.
    '''
    import sys
    from google.cloud.devtools import cloudbuild_v1
    import logging
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # Create a client
    client = cloudbuild_v1.CloudBuildClient()
    # Fully-qualified resource name of the trigger in the given region
    name = f"projects/{project_id}/locations/{location}/triggers/{trigger_id}"
    request = cloudbuild_v1.RunBuildTriggerRequest(
        project_id=project_id,
        trigger_id=trigger_id,
        name=name,
    )

    # The returned long-running operation is intentionally not awaited: this task
    # does not wait for the triggered build to finish.
    client.run_build_trigger(request=request)

    logging.info("Trigger the CI-CD Pipeline: " + trigger_id)

#### Define the Pipeline

In [None]:
# Define the workflow of the pipeline:
# download -> split -> train XGBoost and LR in parallel -> compare ->
# upload the winning model and trigger its CI/CD deployment build.
@kfp.dsl.pipeline(
    name="house_price_prediction_pipeline",)
def pipeline(project_id: str, data_bucket: str, file_name: str, model_repo: str, trigger_id_lr: str, trigger_id_xgb: str):
    # Step 1: fetch the raw CSV from the data bucket.
    download_task = download_data(
        project_id=project_id,
        bucket=data_bucket,
        file_name=file_name,
    )

    # Step 2: split into train (80%) and test (20%) sets.
    split_task = train_test_split(dataset=download_task.outputs["dataset"]).after(download_task)
    train_data = split_task.outputs["dataset_train"]
    test_data = split_task.outputs["dataset_test"]

    # Step 3: train both candidate models on the same split.
    xgb_task = train_xgboost(train_set=train_data, test_set=test_data).after(split_task)
    lr_task = train_lr(train_set=train_data, test_set=test_data).after(split_task)

    # Step 4: pick the better model from the two metric dicts.
    compare_task = compare_model(
        xgboost_metrics=xgb_task.outputs["metrics"],
        lr_metrics=lr_task.outputs["metrics"],
    ).after(xgb_task, lr_task)
    best_model = compare_task.output

    # Step 5a: XGBoost won -> upload it and trigger the XGBoost serving build.
    with dsl.If(best_model == "XGBoost"):
        xgb_upload_task = upload_model_to_gcs(
            project_id=project_id,
            model_repo=model_repo,
            model=xgb_task.outputs["out_model"],
        ).after(compare_task)
        xgb_deploy_task = run_build_trigger(
            project_id=project_id,
            trigger_id=trigger_id_xgb,
        ).after(xgb_upload_task)

    # Step 5b: LR won -> upload it and trigger the LR serving build.
    with dsl.If(best_model == "LR"):
        lr_upload_task = upload_model_to_gcs(
            project_id=project_id,
            model_repo=model_repo,
            model=lr_task.outputs["out_model"],
        ).after(compare_task)
        lr_deploy_task = run_build_trigger(
            project_id=project_id,
            trigger_id=trigger_id_lr,
        ).after(lr_upload_task)


#### Compile the pipeline into a YAML file

In [None]:
from kfp import compiler

# Compile the pipeline function into a pipeline spec file that Vertex AI Pipelines can run.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=pipeline,
    package_path='house-price-prediction-pipeline.yaml',
)

#### Submit the pipeline run

In [None]:
import google.cloud.aiplatform as aip

# Before initializing, set the GOOGLE_APPLICATION_CREDENTIALS environment
# variable to the path of your service-account key file.
aip.init(project=PROJECT_ID, location=REGION)

# Runtime arguments for the compiled pipeline. Replace the bucket names and the
# Cloud Build trigger IDs with the ones from your own project.
pipeline_parameters = {
    'project_id': PROJECT_ID,
    'data_bucket': 'data_de2025_2062061',        # bucket holding the input CSV
    'file_name': 'California_Houses_Processed.csv',
    'model_repo': 'models_de2025_2062061',       # bucket that receives the chosen model
    'trigger_id_lr': '09a20369-9c53-4f5c-8a91-fe9b485abe88',
    'trigger_id_xgb': '7146f791-b966-4303-828e-1dff2e5622d2',
}

# Prepare the pipeline job from the compiled spec and submit it.
job = aip.PipelineJob(
    display_name="house-price-prediction-pipeline",
    template_path="house-price-prediction-pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
    location=REGION,
    parameter_values=pipeline_parameters,
    enable_caching=False,
)

job.run()