### Installation
Install the packages required for executing this notebook.

## Some of the source codes are based on
https://towardsdatascience.com/how-to-set-up-custom-vertex-ai-pipelines-step-by-step-467487f81cad 

In [1]:
# Install the packages
! pip3 install --user --no-cache-dir --upgrade "kfp>2" "google-cloud-pipeline-components>2" \
                                        google-cloud-aiplatform

Collecting kfp>2
  Downloading kfp-2.9.0.tar.gz (595 kB)
     ---------------------------------------- 0.0/595.6 kB ? eta -:--:--
      --------------------------------------- 10.2/595.6 kB ? eta -:--:--
      --------------------------------------- 10.2/595.6 kB ? eta -:--:--
     -- ---------------------------------- 41.0/595.6 kB 245.8 kB/s eta 0:00:03
     ----- ------------------------------- 92.2/595.6 kB 521.8 kB/s eta 0:00:01
     ----------------- -------------------- 276.5/595.6 kB 1.3 MB/s eta 0:00:01
     ----------------------------------- -- 563.2/595.6 kB 2.2 MB/s eta 0:00:01
     -------------------------------------- 595.6/595.6 kB 2.2 MB/s eta 0:00:00
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting kfp-pipeline-spec==0.4.0 (from kfp>2)
  Downloading kfp_pipeline_spec-0.4.0-py3-none-any.whl.metadata (301 bytes)
Collecting kfp-server-api<2.4.0,>=2.1.0 (from kfp>2)
  Downloading kfp_server_api-2.3.0.tar.gz 

## Restart the kernel
Once you've installed the additional packages, you need to restart the notebook kernel so it can find the packages.

In [3]:
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

Check the versions of the packages you installed. The KFP SDK version should be >=1.6.

In [1]:
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! pip3 freeze | grep aiplatform
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

Traceback (most recent call last):
  File "<string>", line 1, in <module>
ModuleNotFoundError: No module named 'kfp'
'grep' is not recognized as an internal or external command,
operable program or batch file.
Traceback (most recent call last):
  File "<string>", line 1, in <module>
ModuleNotFoundError: No module named 'google_cloud_pipeline_components'


In [2]:
import kfp
import typing
from typing import Dict
from typing import NamedTuple
from kfp import dsl
from kfp.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        component, 
                        OutputPath, 
                        InputPath)
import google.cloud.aiplatform as aip
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.endpoint import (EndpointCreateOp,ModelDeployOp)
from google_cloud_pipeline_components.types import artifact_types

#### Project and Pipeline Configurations

In [3]:
#The Google Cloud project that this pipeline runs in.
PROJECT_ID = "spatial-path-435110-f1"
# The region that this pipeline runs in
REGION = "us-central1"
# Specify a Cloud Storage URI that your pipelines service account can access. The artifacts of your pipeline runs are stored within the pipeline root.
PIPELINE_ROOT = "gs://temp_as1de2024_3"

#### Create Pipeline Components


#### Pipeline Component : Remove outliers


In [4]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn==1.3.2"],
    base_image="python:3.10.7-slim"
)

def remove_outliers(dataset: Input[Dataset], df_cleaned: Output[Dataset]):
    """Remove outliers using the IQR method."""
    import pandas as pd

    df = pd.read_csv(dataset.path, index_col=None)
    Q1 = df.quantile(0.25)
    Q3 = df.quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    df_clean = df[~((df < lower_bound) | (df > upper_bound)).any(axis=1)]
    df_clean.to_csv(df_cleaned.path + ".csv", index=False, encoding='utf-8-sig')


#### Pipeline Component : Train test split


In [5]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn==1.3.2"],
    base_image="python:3.10.7-slim"
)
def train_test_split(clean_dataset: Input[Dataset], dataset_train: Output[Dataset], dataset_test: Output[Dataset]):
    '''train_test_split'''
    import pandas as pd
    import logging 
    import sys
    from sklearn.model_selection import train_test_split as tts
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO) 
    
    alldata = pd.read_csv(clean_dataset.path+".csv", index_col=None)
    train, test = tts(alldata, test_size=0.2)
    train.to_csv(dataset_train.path + ".csv" , index=False, encoding='utf-8-sig')
    test.to_csv(dataset_test.path + ".csv" , index=False, encoding='utf-8-sig')


#### Pipeline Component : Training RandomForestModel


In [6]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn==1.3.2"],
    base_image="python:3.10.7-slim"
)
def train_forest(features: Input[Dataset], model: Output[Model]):
    '''train a RandomForestModel with default parameters'''
    import pandas as pd
    from sklearn.ensemble import RandomForestRegressor
    import pickle 
    
    
    data = pd.read_csv(features.path+".csv")
    rf_model = RandomForestRegressor(n_estimators=100, random_state=42)
    rf_model.fit(data.drop('Median_House_Value',axis=1), data['Median_House_Value'])
   # model.metadata["framework"] = "LR"
    file_name = model.path + f".pkl"
    with open(file_name, 'wb') as file:  
        pickle.dump(rf_model, file)   


#### Pipeline Component : Model Evaluation

In [7]:
@dsl.component(
    packages_to_install = [
       "pandas", "scikit-learn==1.3.2", "numpy"
    ], base_image="python:3.10.7-slim"
)
def rf_model_evaluation(
    test_set:  Input[Dataset],
    rf_model: Input[Model],
) -> NamedTuple('outputs', approval=bool):
  
    import pandas as pd
    import logging     
    import json
    import typing
    import pickle
    from sklearn.metrics import r2_score
    
    def threshold_check(val1):
        cond = False
        if val1 >= 0.3 :
            cond = True
        return cond

    data = pd.read_csv(test_set.path+".csv")
    
     #Loading the saved model with joblib
    m_filename = rf_model.path + ".pkl"
    model = pickle.load(open(m_filename, 'rb'))
    
    x_test = data.drop(columns=["Median_House_Value"])
    y_target = data['Median_House_Value']
    y_pred = model.predict(x_test)    
    
    
    outputs = NamedTuple('outputs', approval=bool)
    r2 = r2_score(y_target, y_pred)
    pipeline_check = threshold_check(float(r2))
    return outputs(pipeline_check)


### Upload Model and Metrics to Google Bucket 

In [8]:
@dsl.component(
    packages_to_install=["google-cloud-storage"],
    base_image="python:3.10.7-slim"
)
def upload_model_to_gcs(project_id: str, model_repo: str, model: Input[Model]):
    '''upload model to gsc'''
    from google.cloud import storage   
    import logging 
    import sys
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)    
  
    # upload the model to GCS
    client = storage.Client(project=project_id)
    bucket = client.bucket(model_repo)
    blob = bucket.blob('model.pkl')
    source_file_name= model.path + '.pkl'
   
    blob.upload_from_filename(source_file_name)    
    
    print(f"File {source_file_name} uploaded to {model_repo}.")

### Trigger the CI/CD for model deployment


In [9]:
@dsl.component(
    packages_to_install=["google-cloud-build"],
    base_image="python:3.10.7-slim"
)
def run_build_trigger(project_id:str, trigger_id:str):
    import sys
    from google.cloud.devtools import cloudbuild_v1    
    import logging 
    logging.basicConfig(stream=sys.stdout, level=logging.INFO) 
    
    # Create a client
    client = cloudbuild_v1.CloudBuildClient()
    name = f"projects/{project_id}/locations/us-central1/triggers/{trigger_id}"
    # Initialize request argument(s)
    request = cloudbuild_v1.RunBuildTriggerRequest(        
        project_id=project_id,
        trigger_id=trigger_id,
        name=name
    )

    # Make the request
    operation = client.run_build_trigger(request=request)
    
    logging.info("Trigger the CI-CD Pipeline: " + trigger_id)

### Deploy the model at Vertext AI 


#### Define the Pipeline

In [10]:
# Define the workflow of the pipeline.
@kfp.dsl.pipeline(
    name="california-houses-predictor-training-pipeline")
def pipeline(project_id: str, data_bucket: str, dataset_uri: str, model_repo: str, model_repo_uri:str, trigger_id:str):    
    
    dataset_op = kfp.dsl.importer(
        artifact_uri=dataset_uri,
        artifact_class=Dataset,
        reimport=False,
    )
    
    cleaned_dataset_op = remove_outliers(dataset=dataset_op.output)

    train_test_split_op = train_test_split(clean_dataset=cleaned_dataset_op.outputs["df_cleaned"])
       
    training_rf_job_run_op = train_forest(features=train_test_split_op.outputs["dataset_train"])
    
    model_evaluation_op = rf_model_evaluation(
        test_set=train_test_split_op.outputs["dataset_test"],
        rf_model=training_rf_job_run_op.outputs["model"],
    )
    
    with dsl.If(
        model_evaluation_op.outputs["approval"]== True,
        name="approve-model",
    ):
        upload_model_to_gc_op = upload_model_to_gcs(
            project_id=project_id,
            model_repo=model_repo,
            model=training_rf_job_run_op.outputs['model']
        )    
        
        trigger_model_deployment_cicd = run_build_trigger(
            project_id=project_id,
            trigger_id=trigger_id
        ).after(upload_model_to_gc_op) 

#### Compile the pipeline into a JSON file

In [11]:
from kfp import compiler
compiler.Compiler().compile(pipeline_func=pipeline,
        package_path='california_house_predictor_training_pipeline.yaml')

#### Submit the pipeline run

In [12]:
import google.cloud.aiplatform as aip

# Before initializing, make sure to set the GOOGLE_APPLICATION_CREDENTIALS
# environment variable to the path of your service account.
aip.init(
    project=PROJECT_ID,
    location=REGION,
)

# Prepare the pipeline job
job = aip.PipelineJob(
    display_name="california-house-predictor",
    enable_caching=True,
    template_path="california_house_predictor_training_pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
    location=REGION,
    parameter_values={
        'project_id': PROJECT_ID, # makesure to use your project id 
        'data_bucket': 'data_as1de2024_3',  # makesure to use your data bucket name 
        'dataset_uri':'gs://data_as1de2024_3/California_Houses.csv',
        'model_repo':'models_as1de2024_3', # makesure to use your model bucket name 
        'model_repo_uri':'gs://models_as1de2024_3', # makesure to use your model bucket name
        'trigger_id':'333a4eb2-7dc1-4490-9de7-05ec8fd2e864'

        #333a4eb2-7dc1-4490-9de7-05ec8fd2e864
    }
)

job.run()

DefaultCredentialsError: Your default credentials were not found. To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc for more information.