### Installation
Install the packages required for executing this notebook.

In [2]:
# Install the packages
! pip3 install --user --no-cache-dir --upgrade "kfp>2" "google-cloud-pipeline-components>2" \
                                        google-cloud-aiplatform

Collecting kfp>2
  Downloading kfp-2.9.0.tar.gz (595 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m595.6/595.6 kB[0m [31m22.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Collecting kfp-pipeline-spec==0.4.0 (from kfp>2)
  Downloading kfp_pipeline_spec-0.4.0-py3-none-any.whl.metadata (301 bytes)
Collecting kfp-server-api<2.4.0,>=2.1.0 (from kfp>2)
  Downloading kfp_server_api-2.3.0.tar.gz (84 kB)
  Preparing metadata (setup.py) ... [?25ldone


## Restart the kernel
Once you've installed the additional packages, you need to restart the notebook kernel so it can find the packages.

In [3]:
import os

# Restart the kernel so that the freshly installed packages become importable.
# The restart is skipped whenever IS_TESTING is set to a non-empty value.
if not os.getenv("IS_TESTING"):
    from IPython import Application

    Application.instance().kernel.do_shutdown(True)

## Check the versions of the packages you installed. The KFP SDK version should be 2.x or later.

In [1]:
# Report versions from inside the kernel itself; a `! python3` subprocess may run a
# different interpreter than the one this notebook uses.
import kfp
import google.cloud.aiplatform as aiplatform
import google_cloud_pipeline_components

print(f"KFP SDK version: {kfp.__version__}")
print(f"google-cloud-aiplatform version: {aiplatform.__version__}")
print(f"google_cloud_pipeline_components version: {google_cloud_pipeline_components.__version__}")

# The pipeline below relies on KFP v2 APIs (e.g. dsl.If); fail fast otherwise.
assert int(kfp.__version__.split(".")[0]) >= 2, "KFP SDK 2.x or later is required"

KFP SDK version: 2.7.0
google-cloud-aiplatform==1.69.0
google_cloud_pipeline_components version: 2.17.0


In [2]:
import kfp
import typing
from typing import Dict
from typing import NamedTuple
from kfp import dsl
from kfp.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        OutputPath, 
                        InputPath)
import google.cloud.aiplatform as aip
from google_cloud_pipeline_components.types import artifact_types

#### Project and Pipeline Configurations

In [24]:
# The Google Cloud project that this pipeline runs in.
PROJECT_ID = "your-project-id"  # TODO: replace with your project ID
# The region that this pipeline runs in.
REGION = "us-central1"
# A Cloud Storage URI that your pipelines service account can access. The artifacts
# of your pipeline runs are stored under this pipeline root. Bucket names cannot
# contain spaces.
PIPELINE_ROOT = "gs://your-temp-bucket"  # TODO: replace with your bucket

#### Pipeline Component : Data Ingestion

In [5]:
@dsl.component(
    packages_to_install=["google-cloud-storage"],
    base_image="python:3.10.7-slim"
)
def download_data(project_id: str, bucket: str, file_name: str, dataset: Output[Dataset]):
    '''Download ``file_name`` from the Cloud Storage bucket ``bucket`` into ``dataset``.

    The object is written to ``dataset.path + ".csv"``, the location the training
    and prediction components read from.

    Args:
        project_id: Google Cloud project used by the storage client.
        bucket: Name of the bucket holding the CSV file.
        file_name: Object name of the CSV file inside ``bucket``.
        dataset: Output artifact that receives the downloaded file.
    '''
    # Lightweight components ship only this function's source, so imports live here.
    import logging
    import sys

    from google.cloud import storage

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # Download the file from the Cloud Storage bucket. The Bucket object gets its
    # own name instead of shadowing the ``bucket`` (name) parameter.
    client = storage.Client(project=project_id)
    gcs_bucket = client.bucket(bucket)
    blob = gcs_bucket.blob(file_name)
    local_path = dataset.path + ".csv"
    blob.download_to_filename(local_path)
    logging.info('Downloaded Data!')
    logging.info("gs://%s/%s -> %s", bucket, file_name, local_path)

#### Pipeline Component : Training-MLP 

In [6]:
import os
import pandas as pd
import numpy as np
import joblib
import logging
import sys
from typing import NamedTuple
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import Adam

# Define the DSL component decorator
@dsl.component(
    packages_to_install=['pandas', 'keras', 'tensorflow', 'h5py', 'joblib', 'scikit-learn==1.3.2', 'numpy<2'],
    base_image="python:3.10.7-slim"
)
def train_mlp(features: Input[Dataset], out_model: Output[Model]) -> NamedTuple('outputs', metrics=dict):
    '''Train a Multi-Layer Perceptron (MLP) with cross-validation and save the model.

    KFP lightweight components run only this function's source inside the
    container, so every library used here is imported inside the body. The
    notebook's module-level imports are not available there.

    Artifacts written:
      * ``out_model.path + ".h5"``: the Keras model retrained on the full training
        split (read by predict_mlp and upload_model_to_gcs).
      * ``out_model.path/scaler.pkl``: the StandardScaler fitted on that split
        (read by predict_mlp to preprocess features identically).

    Returns:
        NamedTuple with a ``metrics`` dict holding the held-out test ``accuracy``
        and ``loss``, plus the mean cross-validation accuracy ``cv_accuracy``.
    '''
    import logging
    import os
    import sys
    from typing import NamedTuple

    import joblib
    import numpy as np
    import pandas as pd
    from sklearn.model_selection import StratifiedKFold, train_test_split
    from sklearn.preprocessing import StandardScaler
    from tensorflow.keras import Input as KerasInput
    from tensorflow.keras.callbacks import EarlyStopping
    from tensorflow.keras.layers import BatchNormalization, Dense, Dropout
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.optimizers import Adam
    from tensorflow.keras.regularizers import l2

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    def _build_model(n_features):
        """Return a freshly initialised, compiled MLP for binary classification."""
        mlp = Sequential([
            KerasInput(shape=(n_features,)),
            Dense(128, activation='relu', kernel_regularizer=l2(0.01)),
            BatchNormalization(),
            Dropout(0.4),
            Dense(64, activation='relu', kernel_regularizer=l2(0.01)),
            BatchNormalization(),
            Dropout(0.4),
            Dense(32, activation='relu', kernel_regularizer=l2(0.01)),
            Dense(1, activation='sigmoid'),  # Output layer for binary classification
        ])
        mlp.compile(loss='binary_crossentropy', optimizer=Adam(learning_rate=0.001), metrics=['accuracy'])
        return mlp

    # Load the dataset
    df = pd.read_csv(features.path + ".csv")
    logging.info("Columns: %s", df.columns)

    # Assumes the label is the last column; predict_mlp uses the same positional layout.
    X = df.iloc[:, :-1]
    y = df.iloc[:, -1]

    # Hold out a stratified test set for the final evaluation.
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

    kf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    fold_accuracies = []
    best_epochs = []

    for train_index, val_index in kf.split(X_train, y_train):
        X_fold_train, X_fold_val = X_train.iloc[train_index], X_train.iloc[val_index]
        y_fold_train, y_fold_val = y_train.iloc[train_index], y_train.iloc[val_index]

        # Fit the scaler on the fold's training part only, to avoid leaking validation data.
        fold_scaler = StandardScaler()
        X_fold_train_scaled = fold_scaler.fit_transform(X_fold_train)
        X_fold_val_scaled = fold_scaler.transform(X_fold_val)

        fold_model = _build_model(X_fold_train_scaled.shape[1])
        early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
        history = fold_model.fit(X_fold_train_scaled, y_fold_train,
                                 epochs=200,
                                 batch_size=16,
                                 validation_data=(X_fold_val_scaled, y_fold_val),
                                 callbacks=[early_stopping],
                                 verbose=0)

        # Remember when validation loss bottomed out; used to size the final retrain.
        best_epochs.append(int(np.argmin(history.history['val_loss'])) + 1)
        _, val_acc = fold_model.evaluate(X_fold_val_scaled, y_fold_val, verbose=0)
        fold_accuracies.append(val_acc)

    cv_accuracy = float(np.mean(fold_accuracies))
    final_epochs = max(1, int(round(np.mean(best_epochs))))
    logging.info("CV accuracy: %.4f; retraining for %d epochs", cv_accuracy, final_epochs)

    # Final scaler fitted on the whole training split; persisted for prediction.
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # Retrain a *fresh* model on all training data. Reusing the last fold's model
    # would continue from weights already fitted on only part of the data.
    model = _build_model(X_train_scaled.shape[1])
    model.fit(X_train_scaled, y_train, epochs=final_epochs, batch_size=16, verbose=0)

    test_loss, test_acc = model.evaluate(X_test_scaled, y_test, verbose=0)

    # Persist model and scaler where the downstream components look for them.
    model.save(out_model.path + ".h5")
    os.makedirs(out_model.path, exist_ok=True)
    joblib.dump(scaler, os.path.join(out_model.path, "scaler.pkl"))
    logging.info("Saved model to %s.h5 and scaler to %s", out_model.path, out_model.path)

    # Optional extra copy when MODEL_REPO is provided (best effort).
    model_repo = os.environ.get('MODEL_REPO')
    if model_repo:
        try:
            model.save(os.path.join(model_repo, "heart_disease_model.h5"))
            logging.info("Saved the model to the location: " + model_repo)
        except Exception as e:
            logging.error("Error saving the model: %s", e)

    metrics_dict = {
        "accuracy": float(test_acc),
        "loss": float(test_loss),
        "cv_accuracy": cv_accuracy,
    }
    logging.info("Metrics: %s", metrics_dict)

    # Metadata consumed by upload_model_to_gcs to build the file name.
    out_model.metadata["file_type"] = ".h5"
    out_model.metadata["algo"] = "mlp"

    return NamedTuple('outputs', metrics=dict)(metrics=metrics_dict)


#### Pipeline Component : Training LogisticRegression

In [7]:
@dsl.component(
    packages_to_install=['pandas', 'scikit-learn==1.3.2', 'numpy<2'],
    base_image="python:3.10.7-slim"
)
def train_lr(features: Input[Dataset], out_model: Output[Model]) -> NamedTuple('outputs', metrics=dict):
    '''Train a (scaled) Logistic Regression model and pickle it to the output artifact.

    The held-out split mirrors train_mlp's parameters (20 %, seed 42, stratified),
    so compare_model compares accuracies measured on comparable test sets.
    ``numpy<2`` is pinned because scikit-learn 1.3.2 binaries predate NumPy 2.

    Returns:
        NamedTuple with a ``metrics`` dict holding the test-set ``accuracy``.
    '''
    import logging
    import pickle
    import sys
    from typing import NamedTuple

    import pandas as pd
    from sklearn.linear_model import LogisticRegression
    from sklearn.model_selection import train_test_split
    from sklearn.pipeline import make_pipeline
    from sklearn.preprocessing import StandardScaler

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # Load the dataset
    df = pd.read_csv(features.path + ".csv")
    logging.info("Columns: %s", df.columns)

    # Split features and target ('target' column is the label)
    X = df.drop('target', axis=1)
    y = df['target']

    # Same split parameters as train_mlp so the two accuracies are comparable.
    x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

    # Standardizing first helps lbfgs converge and matches the MLP's preprocessing.
    # The pickled pipeline still exposes .predict(), as predict_lr expects.
    model_lr = make_pipeline(StandardScaler(), LogisticRegression(max_iter=150))
    model_lr.fit(x_train, y_train)

    metrics_dict = {
        "accuracy": float(model_lr.score(x_test, y_test))
    }
    logging.info("Metrics: %s", metrics_dict)

    # Save the model
    model_file = out_model.path + ".pkl"
    with open(model_file, 'wb') as f:
        pickle.dump(model_lr, f)

    # Metadata consumed by upload_model_to_gcs to build the file name.
    out_model.metadata["file_type"] = ".pkl"
    out_model.metadata["algo"] = "lr"

    return NamedTuple('outputs', metrics=dict)(metrics_dict)


#### Pipeline Component : Prediction-MLP

In [8]:
@dsl.component(
    packages_to_install=['pandas', 'keras', 'tensorflow', 'h5py', 'joblib', 'scikit-learn==1.3.2', 'numpy<2'],
    base_image="python:3.10.7-slim"
)
def predict_mlp(model: Input[Model], features: Input[Dataset], results: Output[Dataset]):
    '''Score ``features`` with the trained MLP and write the predictions to ``results``.

    Reads the Keras model from ``model.path + ".h5"`` and the fitted StandardScaler
    from ``model.path/scaler.pkl``. Unpickling the scaler requires scikit-learn
    in this container, so it is installed here.

    Output: the input rows plus an integer ``predicted_class`` column (0/1, from a
    0.5 threshold on the sigmoid output), written to ``results.path + ".csv"``.
    '''
    import logging
    import os
    import sys

    import joblib
    import pandas as pd
    from keras.models import load_model

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # Load the features dataset
    df = pd.read_csv(features.path + ".csv")
    logging.info("Columns: %s", df.columns)
    dfcp = df.copy()

    # Assumes the label is the last column, matching how train_mlp selects features.
    xNew = dfcp.iloc[:, :-1].values
    logging.info("New features shape: %s", xNew.shape)

    model_mlp = load_model(model.path + '.h5')
    scaler = joblib.load(os.path.join(model.path, "scaler.pkl"))

    xNew_scaled = scaler.transform(xNew)

    # predict() returns shape (n, 1); flatten so each row gets a scalar class,
    # not a one-element list.
    probabilities = model_mlp.predict(xNew_scaled).ravel()
    y_classes = (probabilities > 0.5).astype(int)
    logging.info("Predicted classes: %s", y_classes)

    dfcp['predicted_class'] = y_classes
    dfcp.to_csv(results.path + ".csv", index=False, encoding='utf-8-sig')


#### Pipeline Component : Prediction-LR

In [9]:
@dsl.component(
    packages_to_install=['pandas', 'scikit-learn==1.3.2', 'numpy<2'],
    base_image="python:3.10.7-slim"
)
def predict_lr(model: Input[Model], features: Input[Dataset], results: Output[Dataset]):
    '''Score ``features`` with the pickled LR model and write the predictions to ``results``.

    Reads the model from ``model.path + ".pkl"``. Output is the input rows plus a
    ``predicted_class`` column, written to ``results.path + ".csv"``.
    '''
    import logging
    import pickle
    import sys

    import pandas as pd

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # Load the features dataset
    df = pd.read_csv(features.path + ".csv")
    logging.info("Columns: %s", df.columns)
    df_copy = df.copy()

    # NOTE(security): pickle.load can execute arbitrary code. Only load artifacts
    # produced by train_lr within this pipeline.
    model_file = model.path + ".pkl"
    with open(model_file, 'rb') as f:
        model_lr = pickle.load(f)

    # Same feature columns as in training: everything except 'target'.
    xNew = df_copy.drop('target', axis=1)

    y_classes = model_lr.predict(xNew)
    logging.info("Predicted classes: %s", y_classes)

    df_copy['predicted_class'] = y_classes.tolist()
    df_copy.to_csv(results.path + ".csv", index=False, encoding='utf-8-sig')


#### Pipeline Component : Algorithm Selection 

In [19]:
@dsl.component(
    base_image="python:3.10.7-slim"
)
def compare_model(mlp_metrics: dict, lr_metrics: dict) -> str:
    '''Select the algorithm with the higher test accuracy.

    Returns "MLP" when the MLP accuracy is strictly greater than the LR accuracy,
    otherwise "LR" (ties go to LR).

    Raises:
        ValueError: if either metrics dict lacks an ``accuracy`` value.
    '''
    import logging
    import sys

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    logging.info(mlp_metrics)
    logging.info(lr_metrics)

    def _accuracy(metrics, label):
        # Fail with a clear message instead of an opaque "None > float" TypeError.
        value = metrics.get("accuracy")
        if value is None:
            raise ValueError(f"{label} metrics are missing 'accuracy': {metrics}")
        return float(value)

    if _accuracy(mlp_metrics, "MLP") > _accuracy(lr_metrics, "LR"):
        return "MLP"
    return "LR"

### Upload the Selected Model to a Cloud Storage Bucket

In [20]:
@dsl.component(
    packages_to_install=["google-cloud-storage"],
    base_image="python:3.10.7-slim"
)
def upload_model_to_gcs(project_id: str, model_repo: str, model: Input[Model]):
    '''Upload the selected model artifact to the Cloud Storage bucket ``model_repo``.

    The model file ``model.path + metadata["file_type"]`` is stored as
    ``<algo>_model<file_type>``. If a ``scaler.pkl`` exists under ``model.path``
    (predict_mlp loads the MLP's scaler from there), it is uploaded too as
    ``<algo>_scaler.pkl``, so the exported model can preprocess inputs outside
    the pipeline.

    Args:
        project_id: Google Cloud project used by the storage client.
        model_repo: Name of the destination bucket.
        model: Model artifact whose metadata has ``algo`` and ``file_type`` keys.
    '''
    import logging
    import os
    import sys

    from google.cloud import storage

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    algo = str(model.metadata["algo"])
    file_type = str(model.metadata["file_type"])

    client = storage.Client(project=project_id)
    bucket = client.bucket(model_repo)

    model_blob_name = algo + '_model' + file_type
    bucket.blob(model_blob_name).upload_from_filename(model.path + file_type)
    logging.info("Uploaded %s", model_blob_name)

    scaler_path = os.path.join(model.path, "scaler.pkl")
    if os.path.isfile(scaler_path):
        scaler_blob_name = algo + '_scaler.pkl'
        bucket.blob(scaler_blob_name).upload_from_filename(scaler_path)
        logging.info("Uploaded %s", scaler_blob_name)

    logging.info("Saved the model to GCP bucket : " + model_repo)

#### Define the Pipeline

In [21]:
# Define the workflow of the pipeline.
@kfp.dsl.pipeline(
    name="heart_disease_prediction_pipeline",)
def pipeline(project_id: str, data_bucket: str, trainset_filename: str, model_repo: str, testset_filename: str):
    '''Train MLP and LR models, pick the more accurate one, score the test set with it, and upload it.'''
    # Training data feeds both candidate models, which train in parallel.
    di_op = download_data(
        project_id=project_id,
        bucket=data_bucket,
        file_name=trainset_filename
    )

    training_mlp_job_run_op = train_mlp(
        features=di_op.outputs["dataset"]
    )

    training_lr_job_run_op = train_lr(
        features=di_op.outputs["dataset"]
    )

    # The test set does not depend on training, so it downloads concurrently.
    pre_di_op = download_data(
        project_id=project_id,
        bucket=data_bucket,
        file_name=testset_filename
    )

    # Consuming both metrics outputs already orders this after both training tasks.
    comp_model_op = compare_model(
        mlp_metrics=training_mlp_job_run_op.outputs["metrics"],
        lr_metrics=training_lr_job_run_op.outputs["metrics"]
    )

    # Branch on the selected algorithm: predict with it, then upload it.
    with dsl.If(comp_model_op.output == "MLP"):
        predict_mlp_job_run_op = predict_mlp(
            model=training_mlp_job_run_op.outputs["out_model"],
            features=pre_di_op.outputs["dataset"]
        )
        upload_model_to_gcs(
            project_id=project_id,
            model_repo=model_repo,
            model=training_mlp_job_run_op.outputs['out_model']
        ).after(predict_mlp_job_run_op)

    with dsl.If(comp_model_op.output == "LR"):
        predict_lr_job_run_op = predict_lr(
            model=training_lr_job_run_op.outputs["out_model"],
            features=pre_di_op.outputs["dataset"]
        )
        upload_model_to_gcs(
            project_id=project_id,
            model_repo=model_repo,
            model=training_lr_job_run_op.outputs['out_model']
        ).after(predict_lr_job_run_op)

#### Compile the pipeline into a YAML file

In [22]:
from kfp import compiler

# Serialize the pipeline definition into the package submitted in the next cell.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=pipeline,
    package_path='heart_disease_predictor_pipeline.yaml',
)

#### Submit the pipeline run

In [23]:
import google.cloud.aiplatform as aip

# Before initializing, make sure the GOOGLE_APPLICATION_CREDENTIALS environment
# variable points to your service account key file.
aip.init(project=PROJECT_ID, location=REGION)

# Runtime parameters for the compiled pipeline. Replace the bucket names with
# your own, and upload the training and test CSV files to the data bucket first.
pipeline_parameters = {
    'project_id': PROJECT_ID,
    'data_bucket': 'heart_pred_data_bucket',
    'trainset_filename': 'training_set.csv',
    'testset_filename': 'test_set.csv',
    'model_repo': 'heart_pred_model_bucket',
}

job = aip.PipelineJob(
    display_name="heart_disease_prediction_pipeline",
    template_path="heart_disease_predictor_pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
    parameter_values=pipeline_parameters,
    enable_caching=False,
    location=REGION,
)

# Blocks until the run finishes, streaming state updates.
job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/966204339179/locations/us-central1/pipelineJobs/diabetes-prdictor-training-pipeline-v2-20241007202913
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/966204339179/locations/us-central1/pipelineJobs/diabetes-prdictor-training-pipeline-v2-20241007202913')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/diabetes-prdictor-training-pipeline-v2-20241007202913?project=966204339179
PipelineJob projects/966204339179/locations/us-central1/pipelineJobs/diabetes-prdictor-training-pipeline-v2-20241007202913 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/966204339179/locations/us-central1/pipelineJobs/diabetes-prdictor-training-pipeline-v2-20241007202913 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/966204339179/locations/us-central1/pipelineJobs/diabetes-prdictor-training-pipeline-v2-