# Vertex AI Pipeline for Heart Disease Detection

This code is based on the lab notebooks by Dr. Indika Kumara and on the Kaggle [notebook](https://www.kaggle.com/code/tanmay111999/heart-failure-prediction-cv-score-90-5-models/) by Tanmay Deshpande.

The dataset, published by fedesoriano, is available on Kaggle [here](https://www.kaggle.com/datasets/fedesoriano/heart-failure-prediction).

## Install Packages

In [None]:
# Install the packages: KFP v2 SDK, Google Cloud Pipeline Components v2 and the
# Vertex AI SDK. --user installs into the user site-packages; the kernel restart
# cell below makes the new versions importable.
! pip3 install --user --no-cache-dir --upgrade "kfp>2" "google-cloud-pipeline-components>2" \
                                        google-cloud-aiplatform

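Restart the kernel so the newly installed packages are loaded (skipped when the `IS_TESTING` environment variable is set):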

In [None]:
import os

# Restart the kernel so the packages installed above are picked up.
# Setting the IS_TESTING environment variable skips the restart.
if not os.getenv("IS_TESTING"):
    import IPython

    kernel = IPython.Application.instance().kernel
    kernel.do_shutdown(True)

In [None]:
import kfp
import typing
from typing import Dict
from typing import NamedTuple
from kfp import dsl
from kfp.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        OutputPath, 
                        InputPath)
import google.cloud.aiplatform as aip
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.types import artifact_types

## Project and Pipeline Configuration

In [None]:
# Google Cloud project in which the pipeline runs.
PROJECT_ID = "core-synthesis-435410-v9"

# Google Cloud region in which the pipeline runs.
REGION = "us-central1"

# Cloud Storage URI (gs://<bucket>, e.g. gs://temp_de2024) that the pipelines
# service account can access; artifacts of every pipeline run are stored under it.
PIPELINE_ROOT = "gs://temp_de2024_mh"

### Pipeline Component: Load, prepare and split data 

In [None]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn"],
    base_image="python:3.10.7-slim"
)
def train_test_split(dataset: Input[Dataset], dataset_train: Output[Dataset], dataset_test: Output[Dataset]):
    '''Clean, label-encode and split the heart-disease dataset.

    Drops the ``RestingBP`` and ``RestingECG`` columns and every row with a
    missing value, label-encodes the remaining categorical columns (logging
    each mapping), and writes a 70/30 train/test split (``random_state=73``).

    Args:
        dataset: Input artifact pointing at the raw CSV file.
        dataset_train: Output artifact; the training split is written to
            ``dataset_train.path + ".csv"``.
        dataset_test: Output artifact; the test split is written to
            ``dataset_test.path + ".csv"``.
    '''
    import logging
    import sys

    import pandas as pd
    # Aliased so the sklearn helper does not shadow this component's own name.
    from sklearn.model_selection import train_test_split as sk_train_test_split
    from sklearn.preprocessing import LabelEncoder

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    df = pd.read_csv(dataset.path, index_col=None)

    # Drop the features flagged as highly correlated, then rows with missing
    # values. This must happen BEFORE encoding: once LabelEncoder has run, a
    # missing categorical value is no longer NaN (depending on the scikit-learn
    # version it is either encoded as its own class or rejected), so a later
    # dropna() could not remove that row.
    df = df.drop(['RestingBP', 'RestingECG'], axis=1)
    df = df.dropna()

    # Encode the remaining categorical features and log each mapping.
    # RestingECG is not listed because it has already been dropped.
    for column in ['Sex', 'ChestPainType', 'ExerciseAngina', 'ST_Slope']:
        encoder = LabelEncoder()
        df[column] = encoder.fit_transform(df[column])
        logging.info(f"Transformation for {column}: {dict(zip(encoder.classes_, encoder.transform(encoder.classes_)))}")

    train, test = sk_train_test_split(df, test_size=0.3, random_state=73)
    # Downstream components read these files back from `<artifact path>.csv`.
    train.to_csv(dataset_train.path + ".csv", index=False, encoding='utf-8-sig')
    test.to_csv(dataset_test.path + ".csv", index=False, encoding='utf-8-sig')

### Pipeline Component: Train Logistic Regression (LR)

In [None]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn==1.3.2"],
    base_image="python:3.10.7-slim"
)
def train_lr(features: Input[Dataset], model: Output[Model]):
    '''Train a LogisticRegression model on the training split.

    Args:
        features: Training-split artifact; the CSV is read from
            ``features.path + ".csv"`` and must contain a ``HeartDisease``
            label column. All other columns are used as features.
        model: Output artifact; the fitted estimator is pickled to
            ``model.path + ".pkl"`` and ``metadata["framework"]`` is set.
    '''
    import pickle

    import pandas as pd
    from sklearn.linear_model import LogisticRegression

    # Load the training split and separate features from the label.
    data = pd.read_csv(features.path + ".csv")
    x_train = data.drop('HeartDisease', axis=1)
    y_train = data['HeartDisease']

    # The features are not scaled upstream, so the lbfgs solver may hit the
    # default max_iter=100 before converging (ConvergenceWarning, under-fit
    # model); allow more iterations.
    model_lr = LogisticRegression(random_state=73, C=10, penalty='l2', max_iter=1000)
    model_lr.fit(x_train, y_train)

    model.metadata["framework"] = "Logistic Regression"

    # Downstream components load the model from `<artifact path>.pkl`.
    with open(f"{model.path}.pkl", 'wb') as file:
        pickle.dump(model_lr, file)

### Pipeline Component: Train Decision Tree (DT)

In [None]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn==1.3.2"],
    base_image="python:3.10.7-slim"
)
def train_dt(features: Input[Dataset], model: Output[Model]):
    '''Fit a DecisionTreeClassifier on the training split and pickle it.

    Args:
        features: Training-split artifact; the CSV is read from
            ``features.path + ".csv"`` and must contain a ``HeartDisease``
            label column. All other columns are used as features.
        model: Output artifact; the fitted classifier is pickled to
            ``model.path + ".pkl"`` and ``metadata["framework"]`` is set.
    '''
    import pickle

    import pandas as pd
    from sklearn.tree import DecisionTreeClassifier

    train_df = pd.read_csv(features.path + ".csv")
    inputs = train_df.drop('HeartDisease', axis=1)
    labels = train_df['HeartDisease']

    # Decision Tree classifier, depth capped at 4, fixed seed for reproducibility.
    classifier = DecisionTreeClassifier(
        random_state=73,
        max_depth=4,
        min_samples_leaf=1,
    )
    classifier.fit(inputs, labels)

    model.metadata["framework"] = "Decision Tree"

    # Downstream components load the model from `<artifact path>.pkl`.
    pickle_path = f"{model.path}.pkl"
    with open(pickle_path, 'wb') as out:
        pickle.dump(classifier, out)

### Pipeline Component: LR Model Evaluation

In [None]:
@dsl.component(
    packages_to_install = [
       "pandas", "scikit-learn==1.3.2", "numpy"
    ], base_image="python:3.10.7-slim"
)
def lr_model_evaluation(
    test_set: Input[Dataset],
    model_lr: Input[Model],
    thresholds_dict_str: str,
    metrics: Output[ClassificationMetrics],
    kpi: Output[Metrics]
) -> NamedTuple('outputs', approval=bool):
    '''Evaluate the Logistic Regression model on the test split.

    Logs an ROC curve and a confusion matrix to ``metrics`` and the accuracy
    to ``kpi``, then approves the model if its accuracy reaches the threshold.

    Args:
        test_set: Test-split artifact; CSV read from ``test_set.path + ".csv"``
            with a ``HeartDisease`` label column.
        model_lr: Trained model artifact; pickle read from ``model_lr.path + ".pkl"``.
        thresholds_dict_str: JSON object string, e.g. ``'{"roc": 0.8}'``.
            Despite its name, the ``"roc"`` value is the minimum ACCURACY
            required for approval; it is not compared against ROC AUC.
        metrics: Output receiving the ROC curve and confusion matrix.
        kpi: Output receiving the scalar ``accuracy`` metric.

    Returns:
        outputs(approval): True when accuracy >= thresholds["roc"].
    '''
    import json
    import pickle

    import pandas as pd
    from numpy import nan_to_num
    from sklearn.metrics import accuracy_score, confusion_matrix, roc_curve

    data = pd.read_csv(test_set.path + ".csv")
    with open(model_lr.path + ".pkl", 'rb') as file:
        model = pickle.load(file)

    # Features and labels of the held-out test split.
    x_test = data.drop(columns=["HeartDisease"])
    y_true = data['HeartDisease']
    y_pred = model.predict(x_test)
    y_scores = model.predict_proba(x_test)[:, 1]  # probability of class 1

    # ROC curve. The labels are integers, so the positive class is 1. In
    # scikit-learn >= 1.3 the first threshold is +inf; nan_to_num maps it to a
    # large finite float, presumably so the metric can be serialized.
    fpr, tpr, thresholds = roc_curve(y_true=y_true, y_score=y_scores, pos_label=1)
    metrics.log_roc_curve(fpr, tpr, nan_to_num(thresholds))

    metrics.log_confusion_matrix(
        ['Negative', 'Positive'],
        confusion_matrix(y_true, y_pred).tolist(),
    )

    # predict() already returns class labels, so no rounding is needed.
    accuracy = float(accuracy_score(y_true, y_pred))
    thresholds_dict = json.loads(thresholds_dict_str)
    # NOTE(review): KFP may not persist metadata written to an Input artifact;
    # the model-comparison step reads accuracy from `kpi`, not from here.
    model_lr.metadata["accuracy"] = accuracy
    kpi.log_metric("accuracy", accuracy)

    approval = accuracy >= thresholds_dict['roc']
    return (approval,)

### Pipeline Component: DT Model Evaluation

In [None]:
@dsl.component(
    packages_to_install = [
       "pandas", "scikit-learn==1.3.2", "numpy"
    ], base_image="python:3.10.7-slim"
)
def dt_model_evaluation(
    test_set: Input[Dataset],
    model_dt: Input[Model],
    thresholds_dict_str: str,
    metrics: Output[ClassificationMetrics],
    kpi: Output[Metrics]
) -> NamedTuple('outputs', approval=bool):
    '''Evaluate the Decision Tree model on the test split.

    Logs an ROC curve and a confusion matrix to ``metrics`` and the accuracy
    to ``kpi``, then approves the model if its accuracy reaches the threshold.

    Args:
        test_set: Test-split artifact; CSV read from ``test_set.path + ".csv"``
            with a ``HeartDisease`` label column.
        model_dt: Trained model artifact; pickle read from ``model_dt.path + ".pkl"``.
        thresholds_dict_str: JSON object string, e.g. ``'{"roc": 0.8}'``.
            Despite its name, the ``"roc"`` value is the minimum ACCURACY
            required for approval; it is not compared against ROC AUC.
        metrics: Output receiving the ROC curve and confusion matrix.
        kpi: Output receiving the scalar ``accuracy`` metric.

    Returns:
        outputs(approval): True when accuracy >= thresholds["roc"].
    '''
    import json
    import pickle

    import pandas as pd
    from numpy import nan_to_num
    from sklearn.metrics import accuracy_score, confusion_matrix, roc_curve

    data = pd.read_csv(test_set.path + ".csv")
    with open(model_dt.path + ".pkl", 'rb') as file:
        model = pickle.load(file)

    # Features and labels of the held-out test split.
    x_test = data.drop(columns=["HeartDisease"])
    y_true = data['HeartDisease']
    y_pred = model.predict(x_test)
    y_scores = model.predict_proba(x_test)[:, 1]  # probability of class 1

    # ROC curve. The labels are integers, so the positive class is 1. In
    # scikit-learn >= 1.3 the first threshold is +inf; nan_to_num maps it to a
    # large finite float, presumably so the metric can be serialized.
    fpr, tpr, thresholds = roc_curve(y_true=y_true, y_score=y_scores, pos_label=1)
    metrics.log_roc_curve(fpr, tpr, nan_to_num(thresholds))

    metrics.log_confusion_matrix(
        ['Negative', 'Positive'],
        confusion_matrix(y_true, y_pred).tolist(),
    )

    # predict() already returns class labels, so no rounding is needed.
    accuracy = float(accuracy_score(y_true, y_pred))
    thresholds_dict = json.loads(thresholds_dict_str)
    # NOTE(review): KFP may not persist metadata written to an Input artifact;
    # the model-comparison step reads accuracy from `kpi`, not from here.
    model_dt.metadata["accuracy"] = accuracy
    kpi.log_metric("accuracy", accuracy)

    approval = accuracy >= thresholds_dict['roc']
    return (approval,)

### Pipeline Component: Select Best Model

In [None]:
@dsl.component(
    base_image="python:3.10.7-slim"
)
def compare_model(
    dt_metrics: Input[Metrics],
    lr_metrics: Input[Metrics],
) -> str:
    '''Pick the model with the higher test accuracy.

    Reads ``metadata["accuracy"]`` from each metrics input. Returns "DT" only
    when the Decision Tree is strictly more accurate; on a tie "LR" is chosen.
    '''
    import logging

    logging.basicConfig(level=logging.INFO)
    log = logging.info

    dt_accuracy = dt_metrics.metadata["accuracy"]
    lr_accuracy = lr_metrics.metadata["accuracy"]
    log(f"Decision Tree Accuracy: {dt_accuracy}")
    log(f"Logistic Regression Accuracy: {lr_accuracy}")

    winner = "DT" if dt_accuracy > lr_accuracy else "LR"
    if winner == "DT":
        log("Decision Tree model selected.")
    else:
        log("Logistic Regression model selected.")
    return winner

### Pipeline Component: Upload model to GCS

In [None]:
@dsl.component(
    packages_to_install=["google-cloud-storage"],
    base_image="python:3.10.7-slim"
)
def upload_model_to_gcs(project_id: str, model_repo: str, model: Input[Model],
                        blob_name: str = 'heart_disease_model.pkl'):
    '''Upload the pickled model file to a Cloud Storage bucket.

    Args:
        project_id: Google Cloud project used by the Storage client.
        model_repo: Bucket name (passed to ``client.bucket``; no ``gs://`` prefix).
        model: Model artifact; the file uploaded is ``model.path + ".pkl"``.
        blob_name: Object name inside the bucket. Defaults to
            ``'heart_disease_model.pkl'``, the name previously hard-coded.
    '''
    import logging
    import sys

    from google.cloud import storage

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    client = storage.Client(project=project_id)
    blob = client.bucket(model_repo).blob(blob_name)

    # The training components write the pickle to `<artifact path>.pkl`.
    source_file_name = model.path + '.pkl'
    blob.upload_from_filename(source_file_name)

    # Report through the logger configured above instead of a bare print().
    logging.info(f"File {source_file_name} uploaded to gs://{model_repo}/{blob_name}.")

### Pipeline Definition

In [None]:
# Define the workflow of the pipeline:
# import data -> split -> train LR and DT (independent of each other) ->
# evaluate both -> pick the more accurate model -> upload it to GCS only if it
# also passed its accuracy threshold.
@kfp.dsl.pipeline(
    name="heart-disease-predictor-training-pipeline"
)
def pipeline(project_id: str, data_bucket: str, dataset_uri: str, model_repo: str, thresholds_dict_str: str, model_repo_uri: str):
    """Heart-disease training pipeline.

    Args:
        project_id: Google Cloud project passed to the upload step.
        data_bucket: Not used by any step in this definition (presumably kept
            so existing runs' parameter_values remain valid).
        dataset_uri: URI of the raw CSV, imported as a Dataset artifact.
        model_repo: Bucket name the selected model is uploaded to.
        thresholds_dict_str: JSON string such as '{"roc":0.8}'; the "roc" value
            is the minimum test accuracy a model needs to be uploaded.
        model_repo_uri: Not used by any step in this definition.
    """

    # Import the raw CSV at dataset_uri as a Dataset artifact
    dataset_op = kfp.dsl.importer(
        artifact_uri=dataset_uri,
        artifact_class=Dataset,
        reimport=False,
    )

    # Split the dataset into training and testing sets
    train_test_split_op = train_test_split(dataset=dataset_op.output)

    # Train the Logistic Regression model on the training split
    training_lr_job_run_op = train_lr(features=train_test_split_op.outputs["dataset_train"])

    # Train the Decision Tree model on the same training split
    training_dt_job_run_op = train_dt(features=train_test_split_op.outputs["dataset_train"])

    # Evaluate the Logistic Regression model on the test split
    model_lr_evaluation_op = lr_model_evaluation(
        test_set=train_test_split_op.outputs["dataset_test"],
        model_lr=training_lr_job_run_op.outputs["model"],
        thresholds_dict_str=thresholds_dict_str,  # sets "approval" (accuracy >= "roc"), which gates the upload below
    )

    # Evaluate the Decision Tree model on the test split
    model_dt_evaluation_op = dt_model_evaluation(
        test_set=train_test_split_op.outputs["dataset_test"],
        model_dt=training_dt_job_run_op.outputs["model"],
        thresholds_dict_str=thresholds_dict_str,  # sets "approval" (accuracy >= "roc"), which gates the upload below
    )

    # Compare the models' accuracy and select the best one ("DT" or "LR").
    # The .after() is explicit; the kpi inputs already imply this ordering.
    compare_model_op = compare_model(
        dt_metrics=model_dt_evaluation_op.outputs["kpi"],
        lr_metrics=model_lr_evaluation_op.outputs["kpi"],
    ).after(model_dt_evaluation_op, model_lr_evaluation_op)

    # compare_model returns exactly one of "DT"/"LR", so at most one of the two
    # upload branches below runs.
    # Condition to choose the Decision Tree model
    with dsl.If(compare_model_op.output == "DT", name="Choose DT Model"):
        # Task outputs are compared explicitly (== True) to build the KFP condition.
        with dsl.If(
            model_dt_evaluation_op.outputs["approval"] == True,
            name="Did DT Model Pass Threshold",
        ):
            # Upload the Decision Tree model to GCS
            upload_model_to_gcs(
                project_id=project_id,
                model_repo=model_repo,
                model=training_dt_job_run_op.outputs['model']
            )

    # Condition to choose the Logistic Regression model
    with dsl.If(compare_model_op.output == "LR", name="Choose LR Model"):
        with dsl.If(
            model_lr_evaluation_op.outputs["approval"] == True,
            name="Did LR Model Pass Threshold",
        ):
            # Upload the Logistic Regression model to GCS
            upload_model_to_gcs(
                project_id=project_id,
                model_repo=model_repo,
                model=training_lr_job_run_op.outputs['model']
            )


In [None]:
from kfp import compiler

# Compile the pipeline function into a pipeline spec YAML file; the run cell
# below submits this file as its template_path.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=pipeline,
    package_path='heart_disease_predictor_training_pipeline.yaml',
)

### Run pipeline

In [None]:
import google.cloud.aiplatform as aip

# GOOGLE_APPLICATION_CREDENTIALS must point at your service-account key file
# before the SDK is initialized.
aip.init(project=PROJECT_ID, location=REGION)

# Runtime values for the pipeline's parameters.
run_parameters = {
    'project_id': PROJECT_ID,
    'data_bucket': 'data_de2024_mh',
    'dataset_uri': 'gs://data_de2024_mh/heart.csv',
    'model_repo': 'models_de2024_mh',
    # "roc" is the minimum test accuracy a model needs to be uploaded.
    'thresholds_dict_str': '{"roc":0.8}',
    'model_repo_uri': 'gs://models_de2024_mh',
}

job = aip.PipelineJob(
    display_name="heart-disease-predictor",
    template_path="heart_disease_predictor_training_pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
    parameter_values=run_parameters,
    enable_caching=True,
    location=REGION,
)

# Submit the pipeline run.
job.run()