### Installation
Install the packages required for executing this notebook.

## Some of the source code is based on
https://towardsdatascience.com/how-to-set-up-custom-vertex-ai-pipelines-step-by-step-467487f81cad 

In [1]:
# Install the packages
! pip3 install --user --no-cache-dir --upgrade "kfp>2" "google-cloud-pipeline-components>2" \
                                        google-cloud-aiplatform

Collecting kfp>2
  Downloading kfp-2.9.0.tar.gz (595 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m595.6/595.6 kB[0m [31m23.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Collecting kfp-pipeline-spec==0.4.0 (from kfp>2)
  Downloading kfp_pipeline_spec-0.4.0-py3-none-any.whl.metadata (301 bytes)
Collecting kfp-server-api<2.4.0,>=2.1.0 (from kfp>2)
  Downloading kfp_server_api-2.3.0.tar.gz (84 kB)
  Preparing metadata (setup.py) ... [?25ldone


## Restart the kernel
Once you've installed the additional packages, you need to restart the notebook kernel so it can find the packages.

In [2]:
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

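Check the versions of the packages you installed. The KFP SDK version should be >=2.0, since this notebook uses the KFP v2 `kfp.dsl` API (for example `Input`, `Output`, and `dsl.importer`).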

In [1]:
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! pip3 freeze | grep aiplatform
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

KFP SDK version: 2.7.0
google-cloud-aiplatform==1.70.0
google_cloud_pipeline_components version: 2.17.0
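The same check can be done programmatically instead of by reading printed output. The stdlib sketch below uses the hypothetical helpers `release_tuple`, `version_at_least`, and `check_installed`; it compares only the numeric release part and ignores pre-release suffixes such as `rc1`, so the `packaging` library's `Version` class is the more robust choice when it is available.

```python
import re
from importlib import metadata


def release_tuple(version: str) -> tuple:
    """Return the numeric release part of a version, e.g. '2.7.0' -> (2, 7, 0)."""
    match = re.match(r"\d+(?:\.\d+)*", version)
    if match is None:
        raise ValueError(f"Unrecognized version string: {version!r}")
    # Anything after the numeric part (e.g. 'rc1', '.dev0') is ignored
    return tuple(int(part) for part in match.group(0).split("."))


def version_at_least(installed: str, minimum: str) -> bool:
    """True if `installed` >= `minimum`, comparing release numbers only."""
    a, b = release_tuple(installed), release_tuple(minimum)
    width = max(len(a), len(b))
    # Pad with zeros so that '2' compares equal to '2.0.0'
    return a + (0,) * (width - len(a)) >= b + (0,) * (width - len(b))


def check_installed(package: str, minimum: str) -> bool:
    """True if `package` is installed at a version >= `minimum`."""
    try:
        return version_at_least(metadata.version(package), minimum)
    except metadata.PackageNotFoundError:
        return False


if __name__ == "__main__":
    for pkg in ("kfp", "google-cloud-pipeline-components"):
        status = "OK" if check_installed(pkg, "2.0") else "missing or older than 2.0"
        print(pkg, status)
```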


In [3]:
import kfp
import typing
from typing import Dict
from typing import NamedTuple
from kfp import dsl
from kfp.dsl import (
    Artifact,
    ClassificationMetrics,
    Dataset,
    Input,
    InputPath,
    Metrics,
    Model,
    Output,
    OutputPath,
    component,
)
import google.cloud.aiplatform as aip
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp, ModelDeployOp
from google_cloud_pipeline_components.types import artifact_types

#### Project and Pipeline Configurations

In [4]:
# The Google Cloud project that this pipeline runs in
PROJECT_ID = "de2024-435020"
# The region that this pipeline runs in
REGION = "us-central1"
# A Cloud Storage URI that your pipelines service account can access.
# The artifacts of your pipeline runs are stored under this pipeline root.
PIPELINE_ROOT = "gs://temp_de2024_2117913"   # e.g., gs://temp_de2024
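`PIPELINE_ROOT` must be a `gs://` URI, and the compile-and-upload cell further down repeats the same bucket name (`temp_de2024_2117913`) as a separate string literal. A small stdlib helper, sketched here under the hypothetical name `split_gcs_uri`, could derive the bucket name from `PIPELINE_ROOT` so the two values cannot drift apart:

```python
from typing import Tuple


def split_gcs_uri(uri: str) -> Tuple[str, str]:
    """Split 'gs://bucket/some/prefix' into ('bucket', 'some/prefix')."""
    scheme = "gs://"
    if not uri.startswith(scheme):
        raise ValueError(f"Expected a gs:// URI, got {uri!r}")
    # Everything before the first '/' is the bucket; the rest is the object prefix
    bucket, _, path = uri[len(scheme):].partition("/")
    if not bucket:
        raise ValueError(f"No bucket name in {uri!r}")
    return bucket, path


bucket_name, prefix = split_gcs_uri("gs://temp_de2024_2117913")
print(bucket_name, repr(prefix))  # temp_de2024_2117913 ''
```

In the notebook this would read `bucket_name, _ = split_gcs_uri(PIPELINE_ROOT)`.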

#### Create Pipeline Components

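We can create a component either from a Python function (an inline, or "lightweight", Python component) or from a container image. This notebook uses inline Python functions. Lightweight components must be self-contained, which is why each component below imports its libraries inside the function body and lists them in `packages_to_install`.
Refer to https://www.kubeflow.org/docs/components/pipelines/v2/components/lightweight-python-components/ for more information.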

#### Pipeline Component : Train and Test Split

In [5]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn==1.3.2"],
    base_image="python:3.10.7-slim"
)
def train_test_split(dataset: Input[Dataset], dataset_train: Output[Dataset], dataset_test: Output[Dataset]):
    '''Split the input dataset into a 70% training set and a 30% test set.'''
    import pandas as pd
    import logging 
    import sys
    from sklearn.model_selection import train_test_split as tts
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO) 
    
    alldata = pd.read_csv(dataset.path, index_col=None)
    train, test = tts(alldata, test_size=0.3)
    train.to_csv(dataset_train.path + ".csv" , index=False, encoding='utf-8-sig')
    test.to_csv(dataset_test.path + ".csv" , index=False, encoding='utf-8-sig')
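`tts(alldata, test_size=0.3)` shuffles the rows and holds out 30% of them for testing. Because no `random_state` is passed, every pipeline run gets a different split (and therefore different accuracies); passing, for example, `random_state=42` would make runs comparable. The stdlib sketch below shows the same idea on a plain list with a fixed seed. `shuffled_split` is a hypothetical helper, not part of scikit-learn, and it rounds the test count up.

```python
import math
import random
from typing import List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def shuffled_split(rows: Sequence[T], test_size: float = 0.3,
                   seed: int = 42) -> Tuple[List[T], List[T]]:
    """Shuffle rows with a fixed seed and split them into (train, test)."""
    if not 0.0 < test_size < 1.0:
        raise ValueError("test_size must be strictly between 0 and 1")
    indices = list(range(len(rows)))
    # A dedicated Random instance with a fixed seed gives the same split every run
    random.Random(seed).shuffle(indices)
    n_test = math.ceil(test_size * len(rows))  # round the test share up
    test = [rows[i] for i in indices[:n_test]]
    train = [rows[i] for i in indices[n_test:]]
    return train, test


train, test = shuffled_split(list(range(10)))
print(len(train), len(test))  # 7 3
```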

#### Pipeline Component : Training Three Models

In [6]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn==1.3.2"],
    base_image="python:3.10.7-slim"
)
def train_models(features: Input[Dataset], model_lr: Output[Model], model_dt: Output[Model], model_rf: Output[Model]):
    '''Train LogisticRegression, DecisionTree, and RandomForest models'''
    import pandas as pd
    from sklearn.linear_model import LogisticRegression
    from sklearn.tree import DecisionTreeClassifier
    from sklearn.ensemble import RandomForestClassifier
    import pickle 
    
    data = pd.read_csv(features.path+".csv")
    X, y = data.drop('class', axis=1), data['class']
    
    # Train LogisticRegression
    lr_model = LogisticRegression()  # Local variable for Logistic Regression model
    lr_model.fit(X, y)
    model_lr.metadata["framework"] = "LR" # Add metadata to the model artifact
    with open(model_lr.path + ".pkl", 'wb') as file:  # Save to model_lr.path
        pickle.dump(lr_model, file)
        
    # Train Decision Tree
    dt_model = DecisionTreeClassifier()  # Local variable for Decision Tree model
    dt_model.fit(X, y)
    model_dt.metadata["framework"] = "DT"
    with open(model_dt.path + ".pkl", 'wb') as file:  # Save to model_dt.path
        pickle.dump(dt_model, file)
        
    # Train Random Forest
    rf_model = RandomForestClassifier()  # Local variable for Random Forest model
    rf_model.fit(X, y)
    model_rf.metadata["framework"] = "RF"
    with open(model_rf.path + ".pkl", 'wb') as file:  # Save to model_rf.path
        pickle.dump(rf_model, file)
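`train_models` writes each model to `<artifact>.path + ".pkl"` rather than to `<artifact>.path` itself, so every consumer (`model_evaluation`, `upload_best_model_to_gcs`) has to append the same suffix when reading. The stdlib sketch below wraps that convention in two hypothetical helpers, `save_pickle` and `load_pickle`; a plain dict stands in for a fitted scikit-learn estimator. Note that `pickle.load` can execute arbitrary code, so only load pickles you produced yourself.

```python
import os
import pickle
import tempfile
from typing import Any


def save_pickle(obj: Any, artifact_path: str) -> str:
    """Pickle `obj` to `artifact_path + '.pkl'` and return that file name."""
    target = artifact_path + ".pkl"
    parent = os.path.dirname(target)
    if parent:
        # Defensive: make sure the artifact's directory exists before writing
        os.makedirs(parent, exist_ok=True)
    with open(target, "wb") as f:
        pickle.dump(obj, f)
    return target


def load_pickle(artifact_path: str) -> Any:
    """Load the object stored at `artifact_path + '.pkl'` (trusted files only)."""
    with open(artifact_path + ".pkl", "rb") as f:
        return pickle.load(f)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "model_lr")  # plays the role of model_lr.path
        save_pickle({"framework": "LR", "coef": [0.1, -0.2]}, path)
        print(load_pickle(path))  # {'framework': 'LR', 'coef': [0.1, -0.2]}
```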


#### Pipeline Component : Model Evaluation

See https://www.kubeflow.org/docs/components/pipelines/v2/data-types/parameters/ for how a component can return several output parameters through a `NamedTuple` return annotation.
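A plain-Python sketch of the `NamedTuple` return pattern, using a hypothetical `accuracy_summary` function (the evaluation component below does not use it). Per the linked page, once such a function is decorated with `@dsl.component`, each field becomes its own output parameter (for example `task.outputs["accuracy"]`); without the decorator the code runs locally as ordinary Python.

```python
from typing import NamedTuple


def accuracy_summary(
    y_true: list, y_pred: list
) -> NamedTuple("Outputs", [("accuracy", float), ("n_correct", int)]):
    """Return accuracy and the number of correct predictions as named fields."""
    # The type is rebuilt inside the body so the function does not depend on
    # any module-level name (lightweight components only ship their own source)
    Outputs = NamedTuple("Outputs", [("accuracy", float), ("n_correct", int)])
    if not y_true or len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must be non-empty and the same length")
    n_correct = sum(t == p for t, p in zip(y_true, y_pred))
    return Outputs(n_correct / len(y_true), n_correct)


result = accuracy_summary([1, 0, 1, 1], [1, 0, 0, 1])
print(result.accuracy, result.n_correct)  # 0.75 3
```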

In [10]:
@dsl.component(
    packages_to_install = [
       "pandas", "scikit-learn==1.3.2", "numpy"
    ], base_image="python:3.10.7-slim"
)
def model_evaluation(
    test_set:  Input[Dataset],
    model_lr: Input[Model],
    model_dt: Input[Model],
    model_rf: Input[Model],
    best_model: Output[Model],
    metrics: Output[ClassificationMetrics],
    kpi: Output[Metrics]
):
    '''Evaluate the three models on the test set and copy the most accurate one to best_model.'''
    import pandas as pd
    import logging     
    from sklearn.metrics import roc_curve, confusion_matrix, accuracy_score
    import json
    import typing
    import pickle
    from numpy import nan_to_num
    
    def evaluate_model(model_path, X, y):
        with open(model_path + ".pkl", 'rb') as f:
            model = pickle.load(f)
        y_pred = model.predict(X)
        return accuracy_score(y, y_pred)

    data = pd.read_csv(test_set.path+".csv")
    X_test, y_test = data.drop('class', axis=1), data['class']
    
    # Evaluate all three models
    accuracy_lr = evaluate_model(model_lr.path, X_test, y_test)
    accuracy_dt = evaluate_model(model_dt.path, X_test, y_test)
    accuracy_rf = evaluate_model(model_rf.path, X_test, y_test)

    # Log metrics
    kpi.log_metric("Logistic Regression Accuracy", accuracy_lr)
    kpi.log_metric("Decision Tree Accuracy", accuracy_dt)
    kpi.log_metric("Random Forest Accuracy", accuracy_rf)

    # Select the best model based on accuracy
    best_accuracy = max(accuracy_lr, accuracy_dt, accuracy_rf)
    best_model_name = None
    if best_accuracy == accuracy_lr:
        best_model_name = "Logistic Regression"
        best_model.metadata["framework"] = "LR"
        with open(model_lr.path + ".pkl", 'rb') as f:
            with open(best_model.path + ".pkl", 'wb') as f_best:
                f_best.write(f.read())
    elif best_accuracy == accuracy_dt:
        best_model_name = "Decision Tree"
        best_model.metadata["framework"] = "DT"
        with open(model_dt.path + ".pkl", 'rb') as f:
            with open(best_model.path + ".pkl", 'wb') as f_best:
                f_best.write(f.read())
    else:
        best_model_name = "Random Forest"
        best_model.metadata["framework"] = "RF"
        with open(model_rf.path + ".pkl", 'rb') as f:
            with open(best_model.path + ".pkl", 'wb') as f_best:
                f_best.write(f.read())

    print(f"Best Model: {best_model_name} with accuracy {best_accuracy}")

    # Not implemented here: logging a confusion matrix and ROC curve for the best model
    # to the `metrics` (ClassificationMetrics) output, which is declared but never written.

    # Log final selected model's accuracy
    kpi.log_metric("Best Model Accuracy", best_accuracy)
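The three `if`/`elif` branches above differ only in which file they copy. A dict-driven alternative, sketched below with the hypothetical helper `copy_best_model`, picks the highest accuracy with `max()` (which, like the `if`/`elif` chain, prefers the earlier model on ties) and copies the pickle with `shutil.copyfile` instead of reading and writing the bytes by hand. Inside the component it would be called with `{"LR": accuracy_lr, "DT": accuracy_dt, "RF": accuracy_rf}` and the matching `.path` values, and the returned key assigned to `best_model.metadata["framework"]`.

```python
import os
import shutil
import tempfile
from typing import Dict, Tuple


def copy_best_model(accuracies: Dict[str, float],
                    model_paths: Dict[str, str],
                    best_path: str) -> Tuple[str, float]:
    """Copy the most accurate model's '<path>.pkl' to '<best_path>.pkl'.

    Returns the winning key (e.g. 'LR') and its accuracy.
    """
    # Dict order decides ties: max() keeps the first key with the highest value
    best_key = max(accuracies, key=accuracies.get)
    # Byte-for-byte copy that keeps the '.pkl' suffix convention of train_models
    shutil.copyfile(model_paths[best_key] + ".pkl", best_path + ".pkl")
    return best_key, accuracies[best_key]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        paths = {}
        for key in ("LR", "DT", "RF"):
            paths[key] = os.path.join(tmp, f"model_{key.lower()}")
            with open(paths[key] + ".pkl", "wb") as f:
                f.write(key.encode())  # stand-in for a pickled model
        best = copy_best_model({"LR": 0.81, "DT": 0.77, "RF": 0.85}, paths,
                               os.path.join(tmp, "best_model"))
        print(best)  # ('RF', 0.85)
```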

#### Pipeline Component : Upload the Best Model to Google Cloud Storage

In [7]:
@dsl.component(
    packages_to_install=["google-cloud-storage"],
    base_image="python:3.10.7-slim"
)
def upload_best_model_to_gcs(project_id: str, model_repo: str, best_model: Input[Model]):
    '''Upload model to Google Cloud Storage'''
    from google.cloud import storage   
    import logging 
    import sys
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)    
  
    # Initialize the Cloud Storage client
    client = storage.Client(project=project_id)
    bucket = client.bucket(model_repo)
    
    # Set the destination filename for the best model
    blob = bucket.blob('best_model.pkl')
    source_file_name = best_model.path + '.pkl'
   
    blob.upload_from_filename(source_file_name)    
    
    print(f"File {source_file_name} uploaded to {model_repo} as best_model.pkl.")

#### Pipeline Component : Trigger the CI/CD Pipeline

In [8]:
@dsl.component(
    packages_to_install=["google-cloud-build"],
    base_image="python:3.10.7-slim"
)
def run_build_trigger(project_id:str, trigger_id:str):
    import sys
    from google.cloud.devtools import cloudbuild_v1    
    import logging 
    logging.basicConfig(stream=sys.stdout, level=logging.INFO) 
    
    # Create a client
    client = cloudbuild_v1.CloudBuildClient()
    name = f"projects/{project_id}/locations/us-central1/triggers/{trigger_id}"
    # Initialize request argument(s)
    request = cloudbuild_v1.RunBuildTriggerRequest(        
        project_id=project_id,
        trigger_id=trigger_id,
        name=name
    )

    # Make the request
    operation = client.run_build_trigger(request=request)
    
    logging.info("Triggered the CI/CD pipeline with trigger ID: " + trigger_id)
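`run_build_trigger` builds the trigger's resource name with the region hard-coded to `us-central1`, so a trigger created in another region would likely not be found. One option, sketched below with the hypothetical helper `trigger_resource_name`, is to pass the location in as a parameter. Separately, `client.run_build_trigger(...)` returns a long-running operation that the component never waits on; calling `operation.result()` would block the step until the triggered build completes.

```python
def trigger_resource_name(project_id: str, trigger_id: str,
                          location: str = "us-central1") -> str:
    """Build 'projects/{project}/locations/{location}/triggers/{trigger}'."""
    for label, value in (("project_id", project_id), ("trigger_id", trigger_id),
                         ("location", location)):
        # Empty values or embedded slashes would produce a malformed resource name
        if not value or "/" in value:
            raise ValueError(f"Invalid {label}: {value!r}")
    return f"projects/{project_id}/locations/{location}/triggers/{trigger_id}"


print(trigger_resource_name("de2024-435020", "d80ce0cd-b147-4934-a696-e569db1ee084"))
```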

#### Define the Pipeline

In [11]:
# Define the workflow of the pipeline.
@kfp.dsl.pipeline(
    name="heartdisease-predictor-training-pipeline"
)
def pipeline(project_id: str, data_bucket: str, dataset_uri: str, model_repo: str, trigger_id: str):    
    
    dataset_op = kfp.dsl.importer(
        artifact_uri=dataset_uri,
        artifact_class=Dataset,
        reimport=False,
    )
     
    train_test_split_op = train_test_split(dataset=dataset_op.output)
    
    # Train all three models (Logistic Regression, Decision Tree, Random Forest)
    training_models_job_run_op = train_models(
        features=train_test_split_op.outputs["dataset_train"]
    )
    
    # Evaluate models and pick the best one
    model_evaluation_op = model_evaluation(
        test_set=train_test_split_op.outputs["dataset_test"],
        model_lr=training_models_job_run_op.outputs["model_lr"],
        model_dt=training_models_job_run_op.outputs["model_dt"],
        model_rf=training_models_job_run_op.outputs["model_rf"]
    )
    
    # Upload the best model to Google Cloud Storage
    upload_model_to_gc_op = upload_best_model_to_gcs(
        project_id=project_id,
        model_repo=model_repo,
        best_model=model_evaluation_op.outputs['best_model']
    )
    
    # Trigger the CI/CD pipeline to deploy the best model
    trigger_model_deployment_cicd = run_build_trigger(
        project_id=project_id,
        trigger_id=trigger_id
    ).after(upload_model_to_gc_op)
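KFP derives the execution order from data dependencies: a task runs after every task whose outputs it consumes. `run_build_trigger` consumes no outputs from `upload_best_model_to_gcs`, which is why the explicit `.after(upload_model_to_gc_op)` is needed. The resulting order can be checked locally with the standard library's `graphlib` (Python 3.9+). The task names below simply mirror the pipeline definition; this models the DAG only and says nothing about how KFP schedules tasks internally. (The `data_bucket` pipeline parameter, incidentally, is not used by any task.)

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it must wait for
dependencies = {
    "importer": set(),
    "train_test_split": {"importer"},                          # dataset_op.output
    "train_models": {"train_test_split"},                      # dataset_train
    "model_evaluation": {"train_test_split", "train_models"},  # dataset_test + models
    "upload_best_model_to_gcs": {"model_evaluation"},          # best_model
    "run_build_trigger": {"upload_best_model_to_gcs"},         # explicit .after()
}

order = list(TopologicalSorter(dependencies).static_order())
print(" -> ".join(order))
```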
      

#### Compile the pipeline into a YAML file and upload it to the bucket

In [12]:
# Required imports
from kfp import compiler
from google.cloud import storage

# Compile the pipeline and save the YAML file locally
pipeline_func = pipeline  # Assuming the pipeline function is already defined
yaml_filename = 'heartdisease_predictor_training_pipeline.yaml'

# Compile the pipeline into a YAML file
compiler.Compiler().compile(
    pipeline_func=pipeline_func,
    package_path=yaml_filename
)

# Set your GCS bucket details
bucket_name = 'temp_de2024_2117913'
destination_blob_name = 'heartdisease_predictor_training_pipeline.yaml'

# Upload the file to the GCS bucket
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)

# Upload the compiled pipeline YAML file
blob.upload_from_filename(yaml_filename)

print(f"File {yaml_filename} uploaded to {destination_blob_name} in bucket {bucket_name}.")


File heartdisease_predictor_training_pipeline.yaml uploaded to heartdisease_predictor_training_pipeline.yaml in bucket temp_de2024_2117913.


#### Submit the pipeline run
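`parameter_values` must supply every parameter of the `pipeline` function, and a quick local check can catch a missing or misspelled key before the job is created. Because the decorated `pipeline` is a KFP pipeline object rather than a plain function, the sketch below checks against `pipeline_stub`, a hypothetical undecorated copy of its signature that has to be kept in sync by hand.

```python
import inspect
from typing import Any, Callable, Dict, List, Tuple


def pipeline_stub(project_id: str, data_bucket: str, dataset_uri: str,
                  model_repo: str, trigger_id: str):
    """Hypothetical stand-in with the same parameters as `pipeline`."""


def check_parameters(func: Callable, values: Dict[str, Any]) -> Tuple[List[str], List[str]]:
    """Return (missing, unexpected) parameter names of `func` given `values`."""
    params = inspect.signature(func).parameters
    # Parameters without a default value must be supplied
    missing = [name for name, p in params.items()
               if p.default is inspect.Parameter.empty and name not in values]
    unexpected = sorted(key for key in values if key not in params)
    return missing, unexpected


values = {
    "project_id": "de2024-435020",
    "data_bucket": "data_de2024_2117913",
    "dataset_uri": "gs://data_de2024_2117913/training_set.csv",
    "model_repo": "models_de2024_2117913",
    "trigger_id": "d80ce0cd-b147-4934-a696-e569db1ee084",
}
print(check_parameters(pipeline_stub, values))  # ([], [])
```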

In [13]:
import google.cloud.aiplatform as aip

# Before initializing, make sure to set the GOOGLE_APPLICATION_CREDENTIALS
# environment variable to the path of your service account key file.
aip.init(
    project=PROJECT_ID,
    location=REGION,
)

# Prepare the pipeline job
job = aip.PipelineJob(
    display_name="heartdisease-predictor",
    enable_caching=False,  # False forces every step to re-execute; set True to reuse cached results
    template_path="heartdisease_predictor_training_pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
    location=REGION,
    parameter_values={
        'project_id': PROJECT_ID,
        'data_bucket': 'data_de2024_2117913',
        'dataset_uri': 'gs://data_de2024_2117913/training_set.csv',
        'model_repo': 'models_de2024_2117913',
        'trigger_id': 'd80ce0cd-b147-4934-a696-e569db1ee084'
    }
)

job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/354305596470/locations/us-central1/pipelineJobs/heartdisease-predictor-training-pipeline-20241024215044
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/354305596470/locations/us-central1/pipelineJobs/heartdisease-predictor-training-pipeline-20241024215044')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/heartdisease-predictor-training-pipeline-20241024215044?project=354305596470
PipelineJob projects/354305596470/locations/us-central1/pipelineJobs/heartdisease-predictor-training-pipeline-20241024215044 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/354305596470/locations/us-central1/pipelineJobs/heartdisease-predictor-training-pipeline-20241024215044 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/354305596470/locations/us-central1/pipelineJobs/heartdisease-predictor-traini