# Heart Disease Prediction Training Pipeline
### Installation
Install the packages required for executing this notebook.

In [1]:
# Install (or upgrade) the KFP SDK (>2), Google Cloud Pipeline Components (>2)
# and the google-cloud-aiplatform SDK into the user site-packages, bypassing
# the pip cache. The kernel must be restarted afterwards to pick them up.
! pip3 install --user --no-cache-dir --upgrade "kfp>2" "google-cloud-pipeline-components>2" \
                                        google-cloud-aiplatform


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.2[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


## Restart the kernel
Once you've installed the additional packages, you need to restart the notebook kernel so it can find the packages.

In [2]:
import os

# Restart the kernel so the freshly installed packages become importable.
# The restart is skipped when the IS_TESTING environment variable is set
# to a non-empty value.
if not os.environ.get("IS_TESTING"):
    import IPython

    # do_shutdown(True) asks the running kernel to shut down and restart.
    IPython.Application.instance().kernel.do_shutdown(True)

## Check the versions of the packages you installed
The KFP SDK version should be greater than 2.

In [3]:
# Print the installed KFP SDK version.
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
# Show the pinned google-cloud-aiplatform version from the pip freeze listing.
! pip3 freeze | grep aiplatform
# Print the installed google_cloud_pipeline_components version.
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

KFP SDK version: 2.14.6
google-cloud-aiplatform==1.122.0
google_cloud_pipeline_components version: 2.21.0


In [1]:
import kfp
import typing
from typing import Dict
from typing import NamedTuple
from kfp import dsl
from kfp.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        OutputPath, 
                        InputPath)
import google.cloud.aiplatform as aip
from google_cloud_pipeline_components.types import artifact_types

#### Project and Pipeline Configurations

In [2]:
# Google Cloud project used to initialise the Vertex AI SDK and passed to the
# pipeline as its `project_id` parameter.
# TODO: Replace with your project ID
PROJECT_ID = "pelagic-pager-472017-v5"
# The region that this pipeline runs in (passed as `location` to the pipeline job)
REGION = "us-central1"
# GCS location used both as the Vertex AI staging bucket and as the pipeline root.
# TODO: Replace with your temp bucket name
PIPELINE_ROOT = "gs://temp2_de2025_group6"

#### Pipeline Component 1: Data Ingestion

In [3]:
@dsl.component(
    packages_to_install=["pandas", "google-cloud-storage"],
    base_image="python:3.10.7-slim",
)
def download_data(project_id: str, bucket: str, file_name: str, dataset: Output[Dataset]):
    '''Fetch the heart disease CSV from a GCS bucket into the output dataset.

    Args:
        project_id: Google Cloud project used to create the storage client.
        bucket: Name of the GCS bucket that holds the file.
        file_name: Object name of the CSV inside the bucket.
        dataset: Output dataset artifact; the file is written to
            ``dataset.path + ".csv"``.
    '''
    import logging
    import sys

    import pandas as pd
    from google.cloud import storage

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # Downstream components read the data from "<artifact path>.csv".
    destination = dataset.path + ".csv"

    # Resolve bucket -> blob, then copy the object to the local artifact path.
    storage_client = storage.Client(project=project_id)
    source_blob = storage_client.bucket(bucket).blob(file_name)
    source_blob.download_to_filename(destination)
    logging.info('Downloaded heart disease data!')

#### Pipeline Component 2: Train MLP Model

In [4]:
@dsl.component(
    packages_to_install=['pandas', 'keras', 'tensorflow', 'h5py', 'scikit-learn'],
    base_image="python:3.10.7-slim"
)
def train_mlp_model(features: Input[Dataset], model_output: Output[Model]) -> NamedTuple('outputs', metrics=dict):
    '''Train an MLP model for heart disease prediction.

    Args:
        features: Input dataset artifact; the CSV is read from
            ``features.path + ".csv"`` and must contain a ``target`` column.
        model_output: Output model artifact; the trained network is saved to
            ``model_output.path + ".keras"`` and its metadata records the
            file type and algorithm name.

    Returns:
        A named tuple whose ``metrics`` field is a dict holding the test-set
        ``accuracy`` and ``loss``.
    '''
    import logging
    import sys

    import keras
    import pandas as pd
    from keras.layers import Dense, Input
    from keras.models import Sequential
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # Seed Python, NumPy and the Keras backend so weight initialisation and
    # batch shuffling are repeatable, in line with the fixed split seed below.
    keras.utils.set_random_seed(42)

    # Load data
    df = pd.read_csv(features.path + ".csv")
    logging.info(f"Dataset shape: {df.shape}")
    logging.info(f"Columns: {df.columns.tolist()}")

    # Prepare features and target
    X = df.drop('target', axis=1).values
    y = df['target'].values

    # Split data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Scale features (statistics are fitted on the training split only)
    # NOTE(review): the fitted scaler is not saved with the model, so whoever
    # serves the model must apply an equivalent scaling - confirm downstream.
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    # Build MLP model; the input width is taken from the data instead of being
    # hard-coded, so a CSV with a different number of feature columns works.
    n_features = X_train.shape[1]
    model = Sequential()
    model.add(Input(shape=(n_features,)))
    model.add(Dense(24, activation='relu'))
    model.add(Dense(16, activation='relu'))
    model.add(Dense(8, activation='relu'))
    model.add(Dense(1, activation='sigmoid'))

    # Compile model
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

    # Train model (20% of the training split is held out for validation)
    model.fit(X_train, y_train, epochs=150, batch_size=10, verbose=0, validation_split=0.2)

    # Evaluate model on test set
    test_loss, test_accuracy = model.evaluate(X_test, y_test, verbose=0)

    logging.info(f"Test Accuracy: {test_accuracy}")
    logging.info(f"Test Loss: {test_loss}")

    metrics_dict = {
        "accuracy": float(test_accuracy),
        "loss": float(test_loss),
    }

    # Save model metadata (read by the upload component to locate the file)
    model_output.metadata["file_type"] = ".keras"
    model_output.metadata["algo"] = "MLP"

    # Save the model
    model_file = model_output.path + ".keras"
    model.save(model_file)
    logging.info(f"Model saved to {model_file}")

    outputs = NamedTuple('outputs', metrics=dict)
    return outputs(metrics_dict)

#### Pipeline Component 3: Train XGBoost Model

In [5]:
@dsl.component(
    packages_to_install=['pandas', 'xgboost', 'scikit-learn'],
    base_image="python:3.10.7-slim"
)
def train_xgboost_model(features: Input[Dataset], model_output: Output[Model]) -> NamedTuple('outputs', metrics=dict):
    '''Train an XGBoost model for heart disease prediction.

    Args:
        features: Input dataset artifact; the CSV is read from
            ``features.path + ".csv"`` and must contain a ``target`` column.
        model_output: Output model artifact; the fitted classifier is pickled
            to ``model_output.path + ".pkl"`` and its metadata records the
            file type and algorithm name.

    Returns:
        A named tuple whose ``metrics`` field is a dict holding the test-set
        ``accuracy`` and ``loss`` (log-loss of the predicted probabilities).
    '''
    import logging
    import pickle
    import sys

    import pandas as pd
    import xgboost as xgb
    from sklearn.metrics import log_loss
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # Load data
    df = pd.read_csv(features.path + ".csv")
    logging.info(f"Dataset shape: {df.shape}")

    # Prepare features and target
    X = df.drop('target', axis=1).values
    y = df['target'].values

    # Split data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Scale features (statistics are fitted on the training split only)
    # NOTE(review): the fitted scaler is not saved with the model, so whoever
    # serves the model must apply an equivalent scaling - confirm downstream.
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    # Train XGBoost model
    model_xgb = xgb.XGBClassifier(
        n_estimators=100,
        max_depth=5,
        learning_rate=0.1,
        random_state=42
    )
    model_xgb.fit(X_train, y_train)

    # Evaluate model
    test_accuracy = model_xgb.score(X_test, y_test)
    # Report the real test log-loss instead of a placeholder value. Passing
    # `labels` keeps log_loss well-defined even if the test split happens to
    # contain a single class.
    test_loss = log_loss(y_test, model_xgb.predict_proba(X_test), labels=model_xgb.classes_)

    logging.info(f"XGBoost Test Accuracy: {test_accuracy}")
    logging.info(f"XGBoost Test Loss: {test_loss}")

    metrics_dict = {
        "accuracy": float(test_accuracy),
        "loss": float(test_loss),
    }

    # Save model metadata (read by the upload component to locate the file)
    model_output.metadata["file_type"] = ".pkl"
    model_output.metadata["algo"] = "xgboost"

    # Save the model
    model_file = model_output.path + ".pkl"
    with open(model_file, 'wb') as f:
        pickle.dump(model_xgb, f)
    logging.info(f"XGBoost model saved to {model_file}")

    outputs = NamedTuple('outputs', metrics=dict)
    return outputs(metrics_dict)

#### Pipeline Component 4: Model Comparison

In [6]:
@dsl.component(
    base_image="python:3.10.7-slim"
)
def compare_models(mlp_metrics: dict, xgb_metrics: dict) -> str:
    '''Pick the better of the MLP and XGBoost models by test accuracy.

    Args:
        mlp_metrics: Metrics dict of the MLP model; ``accuracy`` is read and
            treated as 0 when the key is missing.
        xgb_metrics: Metrics dict of the XGBoost model; ``accuracy`` is read
            and treated as 0 when the key is missing.

    Returns:
        ``"MLP"`` when the MLP accuracy is strictly higher, otherwise
        ``"XGB"`` (so ties go to XGBoost).
    '''
    import logging
    import sys

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    logging.info(f"MLP metrics: {mlp_metrics}")
    logging.info(f"XGBoost metrics: {xgb_metrics}")

    mlp_wins = mlp_metrics.get("accuracy", 0) > xgb_metrics.get("accuracy", 0)

    if not mlp_wins:
        logging.info("XGBoost model selected")
        return "XGB"

    logging.info("MLP model selected")
    return "MLP"  # label was renamed from "NN" to "MLP"

#### Pipeline Component 5: Upload Model to GCS

In [7]:
@dsl.component(
    packages_to_install=["google-cloud-storage"],
    base_image="python:3.10.7-slim"
)
def upload_model_to_gcs(project_id: str, model_repo: str, model: Input[Model]):
    '''Upload the selected model to Google Cloud Storage.

    The object is named ``model`` plus the artifact's ``file_type`` metadata
    (``model.keras`` for the MLP, ``model.pkl`` for XGBoost), so the stored
    name always matches the serialisation format of the file.

    Args:
        project_id: Google Cloud project used to create the storage client.
        model_repo: Name of the destination GCS bucket.
        model: Input model artifact; the local file is read from
            ``model.path`` plus the ``file_type`` metadata value, and the
            ``algo`` metadata value is logged.
    '''
    from google.cloud import storage
    import logging
    import sys

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # The training components record the extension they saved the model with.
    file_type = str(model.metadata["file_type"])

    # Keep the extension in the object name: a pickled model stored as
    # 'model.keras' cannot be loaded by a Keras loader.
    # NOTE(review): a consumer that only fetches 'model.keras' must also
    # handle 'model.pkl' when XGBoost is selected - confirm downstream.
    blob_name = "model" + file_type

    # Upload the model to GCS
    client = storage.Client(project=project_id)
    bucket = client.bucket(model_repo)
    blob = bucket.blob(blob_name)
    blob.upload_from_filename(model.path + file_type)

    logging.info(f"Model uploaded to GCS bucket: gs://{model_repo}/{blob_name}")
    logging.info(f"Algorithm: {model.metadata['algo']}")

#### Define the Pipeline

In [8]:
@dsl.pipeline(name="heart-disease-training-pipeline")
def pipeline(project_id: str, data_bucket: str, trainset_filename: str, model_repo: str):
    '''
    Heart Disease Prediction Training Pipeline

    Stages:
    1. Download the training data from GCS
    2. Train two candidate models: an MLP and XGBoost
    3. Compare the candidates and pick the best one
    4. Upload the winning model to GCS for deployment

    Args:
        project_id: Google Cloud project passed to the download and upload steps.
        data_bucket: GCS bucket that holds the training CSV.
        trainset_filename: Object name of the training CSV in ``data_bucket``.
        model_repo: GCS bucket that receives the selected model.
    '''

    # Step 1: fetch the training data
    ingest = download_data(
        project_id=project_id,
        bucket=data_bucket,
        file_name=trainset_filename,
    )
    training_data = ingest.outputs["dataset"]

    # Step 2: train the MLP candidate
    mlp_training = train_mlp_model(features=training_data)

    # Step 3: train the XGBoost candidate
    xgb_training = train_xgboost_model(features=training_data)

    # Step 4: compare the candidates once both training steps have finished
    comparison = compare_models(
        mlp_metrics=mlp_training.outputs["metrics"],
        xgb_metrics=xgb_training.outputs["metrics"],
    )
    comparison.after(mlp_training, xgb_training)

    # Step 5: upload whichever model won (one conditional branch per outcome)
    with dsl.If(comparison.output == "MLP"):
        upload_model_to_gcs(
            project_id=project_id,
            model_repo=model_repo,
            model=mlp_training.outputs['model_output'],
        )

    with dsl.If(comparison.output == "XGB"):
        upload_model_to_gcs(
            project_id=project_id,
            model_repo=model_repo,
            model=xgb_training.outputs['model_output'],
        )

#### Compile the Pipeline into a YAML file

In [9]:
from kfp import compiler

# Compile the pipeline function into a YAML template file; the submission
# cell refers to this file through its template_path.
compiler.Compiler().compile(
    package_path='heart_disease_training_pipeline.yaml',
    pipeline_func=pipeline,
)
print("Pipeline compiled successfully!")

Pipeline compiled successfully!


#### Submit the Pipeline Run (Optional - for testing)

In [10]:
import google.cloud.aiplatform as aip

# Point the Vertex AI SDK at the project and its staging bucket
aip.init(staging_bucket=PIPELINE_ROOT, project=PROJECT_ID)

# Describe the run: compiled template, artifact root, region and the
# arguments for the pipeline's four parameters. Caching is disabled.
job = aip.PipelineJob(
    template_path="heart_disease_training_pipeline.yaml",
    display_name="heart-disease-training-pipeline",
    pipeline_root=PIPELINE_ROOT,
    location=REGION,
    enable_caching=False,
    parameter_values={
        'project_id': PROJECT_ID,
        # TODO: Replace with your data bucket
        'data_bucket': 'data2_de2025_group6',
        'trainset_filename': 'Heart_disease_cleveland_new.csv',
        # TODO: Replace with your model bucket
        'model_repo': 'models2_de2025_group6',
    },
)

# Submit the job
job.run()