# Set parameters and initialize the aiplatform client library

In [1]:
# Set parameters: the GCP project and region used by the Vertex AI calls below.
project_id, location = 'ise543-419203', 'us-central1'

In [2]:
from google.cloud import aiplatform

# Bind the Vertex AI SDK to the project and region defined above; the
# PipelineJob created further down does not pass them explicitly.
aiplatform.init(
    location=location,
    project=project_id,
)

In [3]:
# !pip install kfp

In [4]:
# !pip install gcsfs

In [5]:
# !pip install fsspec

# Define components

# Import data

In [6]:
from kfp.dsl import pipeline, component
from kfp.dsl import InputPath, OutputPath, Dataset
from kfp.dsl import Artifact
from kfp.dsl import Output, Input

@component(base_image='python:3.11',
           packages_to_install=["pandas", "fsspec", "gcsfs"])
def import_data(
    df_output: Output[Dataset],
    df_path: str = 'gs://ise543_final_pj/Final Project Dataset.csv',
):
    """Load the labelled training CSV from GCS and republish it as a Dataset.

    Args:
        df_output: Dataset artifact the CSV is written to (without a pandas
            index column).
        df_path: gs:// URI of the source CSV. Exposed as a parameter so a run
            can point at a different file; the default is the location that
            was previously hard-coded.
    """
    import pandas as pd

    # pandas resolves gs:// URIs through fsspec/gcsfs (installed above).
    df = pd.read_csv(df_path)

    # Save to the path provided by the Kubeflow Pipelines runtime.
    df.to_csv(df_output.path, index=False)


## Import test data

In [7]:
@component(base_image='python:3.11',
           packages_to_install=["pandas", "fsspec", "gcsfs"])
def import_test_data(
    df_test_output: Output[Dataset],
    df_test_path: str = 'gs://ise543_final_pj/Final Project Evaluation Dataset - Student(1).csv',
):
    """Load the evaluation (test) CSV from GCS and republish it as a Dataset.

    Args:
        df_test_output: Dataset artifact the CSV is written to (without a
            pandas index column).
        df_test_path: gs:// URI of the test CSV. Exposed as a parameter so a
            run can point at a different file; the default is the location
            that was previously hard-coded.
    """
    import pandas as pd

    # pandas resolves gs:// URIs through fsspec/gcsfs (installed above).
    df_test = pd.read_csv(df_test_path)

    df_test.to_csv(df_test_output.path, index=False)

# Separate ID

In [8]:
@component(base_image='python:3.11',
           packages_to_install=["pandas"])
def separate_id(
    df_input: Input[Dataset],
    ID_output: Output[Dataset],
    processed_data_output: Output[Dataset]
):
    """Split the 'patientID' column away from the rest of the dataset.

    The one-column 'patientID' table goes to ``ID_output``. All remaining
    columns go to ``processed_data_output``. Neither CSV carries a pandas
    index.
    """
    import pandas as pd

    frame = pd.read_csv(df_input.path)

    id_column = 'patientID'
    id_table = frame[[id_column]]
    remainder = frame.drop(columns=[id_column])

    id_table.to_csv(ID_output.path, index=False)
    remainder.to_csv(processed_data_output.path, index=False)



## Separate ID in test data

In [9]:
@component(base_image='python:3.11',
           packages_to_install=["pandas"])
def separate_id_test(
    df_test_input: Input[Dataset],
    ID_test_output: Output[Dataset],
    processed_test_data_output: Output[Dataset]
):
    """Split the 'patientID' column away from the test dataset.

    The IDs are kept (in ``ID_test_output``) so predictions can be joined
    back to patients later. The remaining columns go to
    ``processed_test_data_output``. Neither CSV carries a pandas index.
    """
    import pandas as pd

    frame = pd.read_csv(df_test_input.path)

    id_column = 'patientID'
    id_table = frame[[id_column]]
    remainder = frame.drop(columns=[id_column])

    id_table.to_csv(ID_test_output.path, index=False)
    remainder.to_csv(processed_test_data_output.path, index=False)

In [10]:
# @component(base_image='python:3.11',
#            packages_to_install=["pandas"])
# def drop_target_test(
#     processed_test_data_input: Input[Dataset],
#     X_test_output: Output[Dataset]
#     # y_test_output: Output[Dataset]

# ):
#     import pandas as pd

#     # Load the processed data
#     data = pd.read_csv(processed_test_data_input.path)

#     # Separate features and target using 'TenYearCHD' as the target column
#     X_test = data.drop(columns=['TenYearCHD'])
#     # y_test = data['TenYearCHD']

#     X_test.to_csv(X_test_output.path, index=False)
#     # y_test.to_csv(y_test_output.path, index=False)


# Split data into training and validation sets

In [11]:
@component(base_image='python:3.11',
           packages_to_install=["pandas", "scikit-learn"])
def split_data(
    processed_data_input: Input[Dataset],
    X_train_output: Output[Dataset],
    X_val_output: Output[Dataset],
    y_train_output: Output[Dataset],
    y_val_output: Output[Dataset],
    split_details: Output[Artifact],  # Output details of the split
    split_ratio: float = 0.2,
    random_seed: int = 42,
):
    """Hold out a validation set from the labelled data.

    'TenYearCHD' is the target; every other column is a feature.
    ``split_ratio`` is the fraction of rows sent to validation, and
    ``random_seed`` fixes the shuffle. A one-line size summary is written to
    ``split_details``.
    """
    import pandas as pd
    from sklearn.model_selection import train_test_split

    features = pd.read_csv(processed_data_input.path)
    target = features.pop('TenYearCHD')

    # train_test_split returns [X_train, X_val, y_train, y_val] in this order.
    parts = train_test_split(features, target, test_size=split_ratio, random_state=random_seed)
    X_train, X_val, _, _ = parts

    destinations = (X_train_output, X_val_output, y_train_output, y_val_output)
    for part, destination in zip(parts, destinations):
        part.to_csv(destination.path, index=False)

    with open(split_details.path, 'w') as f:
        f.write(f"Train size: {len(X_train)}, Validation size: {len(X_val)}")



## Feature selection

In [12]:
from kfp.dsl import component, InputPath, Input, Output, Dataset

@component(base_image='python:3.11',
           packages_to_install=["pandas"])
def feature_selection(
    X_train_input: Input[Dataset],
    X_train_output: Output[Dataset],
    X_val_input: Input[Dataset],
    X_val_output: Output[Dataset],
    X_test_input: Input[Dataset],
    X_test_output: Output[Dataset]
):
    """Restrict the train, validation and test features to the model's input columns.

    The same column list, in the same order, is applied to all three
    splits, so every downstream step sees identical feature layouts.
    """
    import pandas as pd

    # Single source of truth for the feature set; previously repeated per split.
    selected_features = ['age', 'sysBP', 'prevalentHyp', 'glucose', 'BPMeds']

    for source, sink in ((X_train_input, X_train_output),
                         (X_val_input, X_val_output),
                         (X_test_input, X_test_output)):
        frame = pd.read_csv(source.path)
        frame[selected_features].to_csv(sink.path, index=False)


## Data preprocessing

### Impute 'BPMeds', 'glucose' in X_train

In [13]:
from kfp.dsl import component, Input, Output, Dataset, Artifact

@component(base_image='python:3.11',
           packages_to_install=["pandas"])
def impute_missing_value_train(
    X_train_input: Input[Dataset],
    X_train_output: Output[Dataset],
    mode_bpm_output: Output[Artifact],
    median_glucose_output: Output[Artifact]
):
    """Fit and apply the training-set imputation for 'BPMeds' and 'glucose'.

    Gaps in 'BPMeds' are filled with that column's mode, and gaps in
    'glucose' with its median. Both statistics come from the training data
    only. Each fill value is also written as plain text to its own artifact,
    so the validation and test imputation steps can reuse it.
    """
    import pandas as pd

    train = pd.read_csv(X_train_input.path)

    bpm_fill = train['BPMeds'].mode()[0]
    glucose_fill = train['glucose'].median()

    for artifact, value in ((mode_bpm_output, bpm_fill),
                            (median_glucose_output, glucose_fill)):
        with open(artifact.path, 'w') as handle:
            handle.write(str(value))

    for column, value in (('BPMeds', bpm_fill), ('glucose', glucose_fill)):
        train[column] = train[column].fillna(value)

    train.to_csv(X_train_output.path, index=False)




### Impute 'BPMeds', 'glucose' in X_val

In [14]:
from kfp.dsl import component, Input, Output, Dataset, Artifact

@component(base_image='python:3.11',
           packages_to_install=["pandas"])
def impute_missing_value_val(
    X_val_input: Input[Dataset],
    X_val_output: Output[Dataset],
    mode_bpm_info: Input[Artifact],
    median_glucose_info: Input[Artifact]
):
    """Fill 'BPMeds'/'glucose' gaps in the validation set using training statistics.

    The fill values are read, as floats, from the text artifacts produced by
    the training-set imputation step. Nothing is computed from validation
    data.
    """
    import pandas as pd

    def read_number(artifact):
        with open(artifact.path, 'r') as handle:
            return float(handle.read())

    frame = pd.read_csv(X_val_input.path)

    fills = {
        'BPMeds': read_number(mode_bpm_info),
        'glucose': read_number(median_glucose_info),
    }
    for column, value in fills.items():
        frame[column] = frame[column].fillna(value)

    frame.to_csv(X_val_output.path, index=False)


### Impute 'BPMeds', 'glucose' in X_test

In [15]:
from kfp.dsl import component, Input, Output, Dataset, Artifact

@component(base_image='python:3.11',
           packages_to_install=["pandas"])
def impute_missing_value_test(
    X_test_input: Input[Dataset],
    X_test_output: Output[Dataset],
    mode_bpm_info: Input[Artifact],
    median_glucose_info: Input[Artifact]
):
    """Fill 'BPMeds'/'glucose' gaps in the test set using training statistics.

    The fill values are read, as floats, from the text artifacts produced by
    the training-set imputation step. Nothing is computed from test data.
    """
    import pandas as pd

    def read_number(artifact):
        with open(artifact.path, 'r') as handle:
            return float(handle.read())

    frame = pd.read_csv(X_test_input.path)

    fills = {
        'BPMeds': read_number(mode_bpm_info),
        'glucose': read_number(median_glucose_info),
    }
    for column, value in fills.items():
        frame[column] = frame[column].fillna(value)

    frame.to_csv(X_test_output.path, index=False)

### Data quality steps (dealing with outliers and skewness) in X_train

In [16]:
@component(base_image='python:3.11',
           packages_to_install=["pandas", "numpy"])
def data_quality_train(
    X_train_input: Input[Dataset],
    X_train_output: Output[Dataset],
    IQR_train_output: Output[Artifact]
):
    """Log-transform continuous features and cap outliers at training IQR fences.

    'age', 'sysBP' and 'glucose' are mapped through log1p. Per-feature fences
    Q1 - 1.5*IQR and Q3 + 1.5*IQR are then computed on the transformed
    training data. The fences are saved to ``IQR_train_output``, with feature
    names as the CSV index, and used to clip the training values.
    """
    import pandas as pd
    import numpy as np

    features = ['age', 'sysBP', 'glucose']
    train = pd.read_csv(X_train_input.path)
    train[features] = np.log1p(train[features])

    lower_q = train[features].quantile(0.25)
    upper_q = train[features].quantile(0.75)
    spread = upper_q - lower_q

    # Index by feature name so the val/test steps can look fences up with .at[feature, ...].
    fences = pd.DataFrame(
        {'lower_bound': lower_q - 1.5 * spread, 'upper_bound': upper_q + 1.5 * spread},
        index=features,
    )
    fences.to_csv(IQR_train_output.path)

    for name in features:
        train[name] = train[name].clip(
            lower=fences.loc[name, 'lower_bound'],
            upper=fences.loc[name, 'upper_bound'],
        )

    train.to_csv(X_train_output.path, index=False)




### Data quality steps (dealing with outliers and skewness) in X_val

In [17]:
@component(base_image='python:3.11',
           packages_to_install=["pandas", "numpy"])
def data_quality_val(
    X_val_input: Input[Dataset],
    IQR_train_input: Input[Artifact],
    X_val_output: Output[Dataset]
):
    """Apply the training log1p transform and IQR fences to the validation set.

    The fences are read from the training step's CSV (feature names in the
    first column, used as the index), so no statistics come from validation
    data.
    """
    import pandas as pd
    import numpy as np

    frame = pd.read_csv(X_val_input.path)
    fences = pd.read_csv(IQR_train_input.path, index_col=0)

    # Check the index after loading
    print("Indices in IQR_stats:", fences.index)

    features = ['age', 'sysBP', 'glucose']
    frame[features] = np.log1p(frame[features])

    for name in features:
        low, high = fences.at[name, 'lower_bound'], fences.at[name, 'upper_bound']
        frame[name] = frame[name].clip(lower=low, upper=high)

    frame.to_csv(X_val_output.path, index=False)


### Data quality steps (dealing with outliers and skewness) in X_test

In [18]:
@component(base_image='python:3.11',
           packages_to_install=["pandas", "numpy"])
def data_quality_test(
    X_test_input: Input[Dataset],
    IQR_train_input: Input[Artifact],
    X_test_output: Output[Dataset]
):
    """Apply the training log1p transform and IQR fences to the test set.

    The fences are read from the training step's CSV (feature names in the
    first column, used as the index), so no statistics come from test data.
    """
    import pandas as pd
    import numpy as np

    frame = pd.read_csv(X_test_input.path)
    fences = pd.read_csv(IQR_train_input.path, index_col=0)

    # Check the index after loading
    print("Indices in IQR_stats:", fences.index)

    features = ['age', 'sysBP', 'glucose']
    frame[features] = np.log1p(frame[features])

    for name in features:
        low, high = fences.at[name, 'lower_bound'], fences.at[name, 'upper_bound']
        frame[name] = frame[name].clip(lower=low, upper=high)

    frame.to_csv(X_test_output.path, index=False)

## Feature engineering

### Standardization X_train

In [19]:
@component(base_image='python:3.11',
           packages_to_install=["pandas", "scikit-learn", "joblib"])  # joblib persists the fitted scaler
def standardize_train(
    X_train_input: Input[Dataset],
    X_train_output: Output[Dataset],
    scaler_output_info: Output[Artifact]
):
    """Fit a StandardScaler on the continuous training features and apply it.

    Only 'age', 'sysBP' and 'glucose' are scaled; other columns pass through
    unchanged. The fitted scaler is pickled with joblib to
    ``scaler_output_info`` so the validation and test steps apply the same
    transform.
    """
    import pandas as pd
    from sklearn.preprocessing import StandardScaler
    import joblib

    features = ['age', 'sysBP', 'glucose']
    train = pd.read_csv(X_train_input.path)

    scaler = StandardScaler()
    scaled = pd.DataFrame(scaler.fit_transform(train[features]), columns=features)
    train[features] = scaled

    train.to_csv(X_train_output.path, index=False)
    joblib.dump(scaler, scaler_output_info.path)


### Standardization X_val

In [20]:
@component(base_image='python:3.11',
           packages_to_install=["pandas", "scikit-learn", "joblib"])  # joblib restores the fitted scaler
def standardize_val(
    X_val_input: Input[Dataset],
    X_val_output: Output[Dataset],
    scaler_input_info: Input[Artifact]
):
    """Scale the validation set's continuous features with the training scaler.

    The scaler is unpickled from ``scaler_input_info``; unpickling imports
    the scikit-learn class itself, so no explicit sklearn import is needed.
    """
    import pandas as pd
    import joblib

    features = ['age', 'sysBP', 'glucose']
    frame = pd.read_csv(X_val_input.path)
    scaler = joblib.load(scaler_input_info.path)

    frame[features] = pd.DataFrame(scaler.transform(frame[features]), columns=features)

    frame.to_csv(X_val_output.path, index=False)


### Standardization X_test

In [21]:
@component(base_image='python:3.11',
           packages_to_install=["pandas", "scikit-learn", "joblib"])  # joblib restores the fitted scaler
def standardize_test(
    X_test_input: Input[Dataset],
    X_test_output: Output[Dataset],
    scaler_input_info: Input[Artifact]
):
    """Scale the test set's continuous features with the training scaler.

    The scaler is unpickled from ``scaler_input_info``; unpickling imports
    the scikit-learn class itself, so no explicit sklearn import is needed.
    """
    import pandas as pd
    import joblib

    features = ['age', 'sysBP', 'glucose']
    frame = pd.read_csv(X_test_input.path)
    scaler = joblib.load(scaler_input_info.path)

    frame[features] = pd.DataFrame(scaler.transform(frame[features]), columns=features)

    frame.to_csv(X_test_output.path, index=False)

## Apply oversampling method (SMOTE) only on training set

In [22]:
@component(base_image='python:3.11',
           packages_to_install=["pandas", "scikit-learn", "imbalanced-learn"])  # pip name of imblearn
def oversampling(
    X_train_input: Input[Dataset],
    y_train_input: Input[Dataset],
    X_train_output: Output[Dataset],
    y_train_output: Output[Dataset],
    random_state: int = 42
):
    """Balance the training classes with SMOTE (training data only).

    Reads the feature and label CSVs, resamples them together with a seeded
    SMOTE, and writes the resampled features and labels to their respective
    outputs.
    """
    import pandas as pd
    from imblearn.over_sampling import SMOTE

    features = pd.read_csv(X_train_input.path)
    labels = pd.read_csv(y_train_input.path)

    resampled_X, resampled_y = SMOTE(random_state=random_state).fit_resample(features, labels)

    for frame, artifact in ((resampled_X, X_train_output), (resampled_y, y_train_output)):
        frame.to_csv(artifact.path, index=False)


# Modeling

## Logistic regression

### Train lr

In [23]:
from kfp.dsl import Metrics
from kfp.dsl import Model

@component(base_image='python:3.11', packages_to_install=["pandas", "scikit-learn", "joblib"])
def train_logistic_regression(
    X_train_input: Input[Dataset],
    y_train_input: Input[Dataset],
    logistic_model_output: Output[Model]
):
    """Fit a default scikit-learn LogisticRegression and save it as a Model artifact.

    Args:
        X_train_input: feature CSV (in this pipeline, the SMOTE-resampled
            training features).
        y_train_input: single-column label CSV, row-aligned with the features.
        logistic_model_output: Model artifact treated as a directory; the
            fitted estimator is written to ``model.joblib`` inside it.
    """
    import os

    import joblib
    import pandas as pd
    from sklearn.linear_model import LogisticRegression

    X_train = pd.read_csv(X_train_input.path)
    # read_csv yields a one-column DataFrame. scikit-learn wants a 1-D target
    # and emits DataConversionWarning for a column vector, so use a Series.
    y_train = pd.read_csv(y_train_input.path).iloc[:, 0]

    logistic_model = LogisticRegression()
    logistic_model.fit(X_train, y_train)

    # Downstream steps load "<artifact path>/model.joblib".
    os.makedirs(logistic_model_output.path, exist_ok=True)
    joblib.dump(logistic_model, os.path.join(logistic_model_output.path, "model.joblib"))


### Evaluate lr on val

In [24]:
from kfp.dsl import Metrics

@component(base_image='python:3.11',
           packages_to_install=["pandas", "scikit-learn", "joblib"])
def evaluate_logistic_regression(
    X_val_input: Input[Dataset],
    y_val_input: Input[Dataset],
    logistic_model_input: Input[Model],
    metrics: Output[Metrics]
):
    """Score the trained logistic-regression model on the validation set.

    Logs the same support-weighted Accuracy/Precision/Recall/F1, AUC-ROC and
    AUC-PR metrics as before. It also logs positive-class precision, recall
    and F1, plus the four confusion-matrix counts.

    Why the extra metrics: for a binary target, support-weighted recall is
    mathematically equal to accuracy. "Recall" alone therefore says nothing
    about how many positive cases the model catches. Also, the confusion
    matrix used to be computed and then discarded.
    """
    import os

    import joblib
    import pandas as pd
    from sklearn.metrics import (accuracy_score, auc, confusion_matrix, f1_score,
                                 precision_recall_curve, precision_score,
                                 recall_score, roc_auc_score)

    X_val_scaled = pd.read_csv(X_val_input.path)
    # Single-column label CSV -> 1-D Series, the target shape sklearn expects.
    y_val = pd.read_csv(y_val_input.path).iloc[:, 0]

    logistic_trained_model = joblib.load(
        os.path.join(logistic_model_input.path, "model.joblib"))

    y_val_pred = logistic_trained_model.predict(X_val_scaled)
    y_val_probs = logistic_trained_model.predict_proba(X_val_scaled)[:, 1]  # positive-class probabilities

    # Support-weighted scores (metric names and values unchanged).
    logistic_accuracy = accuracy_score(y_val, y_val_pred)
    logistic_precision = precision_score(y_val, y_val_pred, average='weighted')
    logistic_recall = recall_score(y_val, y_val_pred, average='weighted')
    logistic_f1 = f1_score(y_val, y_val_pred, average='weighted')
    logistic_auc = roc_auc_score(y_val, y_val_probs)

    precision, recall, _ = precision_recall_curve(y_val, y_val_probs)
    logistic_auc_pr = auc(recall, precision)

    # Positive-class (label 1) scores. precision_recall_curve above already
    # requires 1 to be the positive label.
    pos_precision = precision_score(y_val, y_val_pred, average='binary')
    pos_recall = recall_score(y_val, y_val_pred, average='binary')
    pos_f1 = f1_score(y_val, y_val_pred, average='binary')

    # roc_auc_score above only succeeds for a two-class y_val, so the matrix
    # is 2x2 and ravels to (tn, fp, fn, tp).
    tn, fp, fn, tp = confusion_matrix(y_val, y_val_pred).ravel()

    metrics.log_metric("Accuracy", logistic_accuracy)
    metrics.log_metric("Precision", logistic_precision)
    metrics.log_metric("Recall", logistic_recall)
    metrics.log_metric("F1 Score", logistic_f1)
    metrics.log_metric("AUC-ROC", logistic_auc)
    metrics.log_metric("AUC-PR", logistic_auc_pr)
    metrics.log_metric("Positive Precision", pos_precision)
    metrics.log_metric("Positive Recall", pos_recall)
    metrics.log_metric("Positive F1 Score", pos_f1)
    # Plain ints: numpy integer scalars are not guaranteed to serialize.
    metrics.log_metric("True Negatives", int(tn))
    metrics.log_metric("False Positives", int(fp))
    metrics.log_metric("False Negatives", int(fn))
    metrics.log_metric("True Positives", int(tp))


## Random forest

### Train rf

In [25]:
@component(base_image='python:3.11', packages_to_install=["pandas", "scikit-learn", "joblib"])
def train_random_forest(
    X_train_input: Input[Dataset],
    y_train_input: Input[Dataset],
    rf_model_output: Output[Model],
    random_state: int = 42,
):
    """Fit a RandomForestClassifier and save it as a Model artifact.

    Args:
        X_train_input: feature CSV (in this pipeline, the SMOTE-resampled
            training features).
        y_train_input: single-column label CSV, row-aligned with the features.
        rf_model_output: Model artifact treated as a directory; the fitted
            estimator is written to ``model.joblib`` inside it.
        random_state: seed for the forest's bootstrap and feature sampling, so
            retraining on the same data reproduces the same model. The
            default of 42 matches the seeds of split_data and oversampling;
            the forest was previously unseeded.
    """
    import os

    import joblib
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier

    X_train = pd.read_csv(X_train_input.path)
    # read_csv yields a one-column DataFrame. RandomForestClassifier wants a
    # 1-D target and emits DataConversionWarning for a column vector.
    y_train = pd.read_csv(y_train_input.path).iloc[:, 0]

    rf_model = RandomForestClassifier(random_state=random_state)
    rf_model.fit(X_train, y_train)

    # Downstream steps load "<artifact path>/model.joblib".
    os.makedirs(rf_model_output.path, exist_ok=True)
    joblib.dump(rf_model, os.path.join(rf_model_output.path, "model.joblib"))

### Evaluate rf on val

In [26]:
@component(base_image='python:3.11',
           packages_to_install=["pandas", "scikit-learn", "joblib"])
def evaluate_random_forest(
    X_val_input: Input[Dataset],
    y_val_input: Input[Dataset],
    rf_model_input: Input[Model],
    metrics: Output[Metrics]
):
    """Score the trained random-forest model on the validation set.

    Logs the same support-weighted Accuracy/Precision/Recall/F1, AUC-ROC and
    AUC-PR metrics as before. It also logs positive-class precision, recall
    and F1, plus the four confusion-matrix counts.

    Why the extra metrics: for a binary target, support-weighted recall is
    mathematically equal to accuracy. "Recall" alone therefore says nothing
    about how many positive cases the model catches. Also, the confusion
    matrix used to be computed and then discarded.
    """
    import os

    import joblib
    import pandas as pd
    from sklearn.metrics import (accuracy_score, auc, confusion_matrix, f1_score,
                                 precision_recall_curve, precision_score,
                                 recall_score, roc_auc_score)

    X_val_scaled = pd.read_csv(X_val_input.path)
    # Single-column label CSV -> 1-D Series, the target shape sklearn expects.
    y_val = pd.read_csv(y_val_input.path).iloc[:, 0]

    rf_trained_model = joblib.load(os.path.join(rf_model_input.path, "model.joblib"))

    y_val_pred = rf_trained_model.predict(X_val_scaled)
    y_val_probs = rf_trained_model.predict_proba(X_val_scaled)[:, 1]  # positive-class probabilities

    # Support-weighted scores (metric names and values unchanged).
    rf_accuracy = accuracy_score(y_val, y_val_pred)
    rf_precision = precision_score(y_val, y_val_pred, average='weighted')
    rf_recall = recall_score(y_val, y_val_pred, average='weighted')
    rf_f1 = f1_score(y_val, y_val_pred, average='weighted')
    rf_auc = roc_auc_score(y_val, y_val_probs)

    precision, recall, _ = precision_recall_curve(y_val, y_val_probs)
    rf_auc_pr = auc(recall, precision)

    # Positive-class (label 1) scores. precision_recall_curve above already
    # requires 1 to be the positive label.
    pos_precision = precision_score(y_val, y_val_pred, average='binary')
    pos_recall = recall_score(y_val, y_val_pred, average='binary')
    pos_f1 = f1_score(y_val, y_val_pred, average='binary')

    # roc_auc_score above only succeeds for a two-class y_val, so the matrix
    # is 2x2 and ravels to (tn, fp, fn, tp).
    tn, fp, fn, tp = confusion_matrix(y_val, y_val_pred).ravel()

    metrics.log_metric("Accuracy", rf_accuracy)
    metrics.log_metric("Precision", rf_precision)
    metrics.log_metric("Recall", rf_recall)
    metrics.log_metric("F1 Score", rf_f1)
    metrics.log_metric("AUC-ROC", rf_auc)
    metrics.log_metric("AUC-PR", rf_auc_pr)
    metrics.log_metric("Positive Precision", pos_precision)
    metrics.log_metric("Positive Recall", pos_recall)
    metrics.log_metric("Positive F1 Score", pos_f1)
    # Plain ints: numpy integer scalars are not guaranteed to serialize.
    metrics.log_metric("True Negatives", int(tn))
    metrics.log_metric("False Positives", int(fp))
    metrics.log_metric("False Negatives", int(fn))
    metrics.log_metric("True Positives", int(tp))





# Predict and combine

In [27]:
@component(base_image='python:3.11',
           packages_to_install=["pandas", "scikit-learn", "joblib", "gcsfs"])
def predict_and_combine(
    test_features_path: InputPath('Dataset'),
    patient_ids_path: InputPath('Dataset'),
    model_path: str,
    predictions_path: OutputPath('Dataset')
):
    """Predict TenYearCHD for the test set and pair each prediction with its patientID.

    Args:
        test_features_path: CSV of preprocessed test features. These are
            presumably in the same column layout the model was trained on;
            verify against the feature_selection step.
        patient_ids_path: CSV with a 'patientID' column, row-aligned with the
            features.
        model_path: gs:// URI of a joblib-serialized estimator.
        predictions_path: destination of the two-column
            ('patientID', 'TenYearCHD') CSV.
    """
    import pandas as pd
    import joblib
    import gcsfs

    # The model lives in GCS, so open it through gcsfs rather than the local filesystem.
    with gcsfs.GCSFileSystem().open(model_path, 'rb') as model_file:
        model = joblib.load(model_file)

    features = pd.read_csv(test_features_path)
    ids = pd.read_csv(patient_ids_path)['patientID']

    submission = pd.DataFrame({
        'patientID': ids,
        'TenYearCHD': model.predict(features),
    })
    submission.to_csv(predictions_path, index=False)



# Define pipeline

In [32]:
@pipeline(name='CHD_pipeline')
def CHD_pipeline(
    model_path: str = 'gs://ise543_final_pj/125143825809/chd-pipeline-20240501211757/train-random-forest_3515890258916933632/rf_model_output/model.joblib',
):
    """End-to-end CHD pipeline: ingest, preprocess, train, evaluate, predict.

    Args:
        model_path: gs:// URI of the joblib model used for the final test-set
            predictions. The default is the path that was previously
            hard-coded inside the pipeline body.

    NOTE(review): that default points at a random-forest model produced by an
    earlier run (chd-pipeline-20240501211757), not at the model trained by
    ``train_random_forest`` in this run. ``predict_and_combine`` also has no
    dependency on the training task. Override ``model_path``, or rewire the
    step to consume the Model artifact, to predict with a freshly trained
    model.
    """
    # Import training and test data
    import_data_task = import_data()
    import_test_data_task = import_test_data()

    # Separate the patientID column from the remaining columns
    separate_id_task = separate_id(
        df_input=import_data_task.outputs['df_output']
    )

    separate_id_test_task = separate_id_test(
        df_test_input=import_test_data_task.outputs['df_test_output']
    )

    # Split the dataset into training and validation sets
    split_data_task = split_data(
        processed_data_input=separate_id_task.outputs['processed_data_output']
    )

    # Feature selection
    feature_selection_task = feature_selection(
        X_train_input=split_data_task.outputs['X_train_output'],
        X_val_input=split_data_task.outputs['X_val_output'],
        X_test_input=separate_id_test_task.outputs['processed_test_data_output']
    )

    # Imputation: statistics are fitted on train and reused for val/test
    impute_missing_value_train_task = impute_missing_value_train(
        X_train_input=feature_selection_task.outputs['X_train_output']
    )

    impute_missing_value_val_task = impute_missing_value_val(
        X_val_input=feature_selection_task.outputs['X_val_output'],
        mode_bpm_info=impute_missing_value_train_task.outputs['mode_bpm_output'],
        median_glucose_info=impute_missing_value_train_task.outputs['median_glucose_output']
    )

    impute_missing_value_test_task = impute_missing_value_test(
        X_test_input=feature_selection_task.outputs['X_test_output'],
        mode_bpm_info=impute_missing_value_train_task.outputs['mode_bpm_output'],
        median_glucose_info=impute_missing_value_train_task.outputs['median_glucose_output']
    )

    # Log transform + IQR capping: fences are fitted on train and reused for val/test
    data_quality_train_task = data_quality_train(
        X_train_input=impute_missing_value_train_task.outputs['X_train_output']
    )

    data_quality_val_task = data_quality_val(
        X_val_input=impute_missing_value_val_task.outputs['X_val_output'],
        IQR_train_input=data_quality_train_task.outputs['IQR_train_output']
    )

    data_quality_test_task = data_quality_test(
        X_test_input=impute_missing_value_test_task.outputs['X_test_output'],
        IQR_train_input=data_quality_train_task.outputs['IQR_train_output']
    )

    # Standardization: the scaler is fitted on train and reused for val/test
    standardize_train_task = standardize_train(
        X_train_input=data_quality_train_task.outputs['X_train_output']
    )

    standardize_val_task = standardize_val(
        X_val_input=data_quality_val_task.outputs['X_val_output'],
        scaler_input_info=standardize_train_task.outputs['scaler_output_info']
    )

    standardize_test_task = standardize_test(
        X_test_input=data_quality_test_task.outputs['X_test_output'],
        scaler_input_info=standardize_train_task.outputs['scaler_output_info']
    )

    # SMOTE on the training split only
    oversampling_task = oversampling(
        X_train_input=standardize_train_task.outputs['X_train_output'],
        y_train_input=split_data_task.outputs['y_train_output']
    )

    # Train the models
    train_logistic_regression_task = train_logistic_regression(
        X_train_input=oversampling_task.outputs['X_train_output'],
        y_train_input=oversampling_task.outputs['y_train_output']
    )

    train_random_forest_task = train_random_forest(
        X_train_input=oversampling_task.outputs['X_train_output'],
        y_train_input=oversampling_task.outputs['y_train_output']
    )

    # Evaluate the models on the validation split
    evaluate_logistic_regression_task = evaluate_logistic_regression(
        X_val_input=standardize_val_task.outputs['X_val_output'],
        y_val_input=split_data_task.outputs['y_val_output'],
        logistic_model_input=train_logistic_regression_task.outputs['logistic_model_output']
    )

    evaluate_random_forest_task = evaluate_random_forest(
        X_val_input=standardize_val_task.outputs['X_val_output'],
        y_val_input=split_data_task.outputs['y_val_output'],
        rf_model_input=train_random_forest_task.outputs['rf_model_output']
    )

    # Predict on the test set and attach patient IDs
    predict_and_combine_task = predict_and_combine(
        test_features_path=standardize_test_task.outputs['X_test_output'],
        patient_ids_path=separate_id_test_task.outputs['ID_test_output'],
        model_path=model_path
    )


# Compile and run pipeline

In [33]:
from kfp import compiler

# Compile the DSL into a pipeline spec file, then submit it to Vertex AI
# Pipelines; run() waits for the job to finish (see the log below).
spec_file = 'CHD_pipeline.json'
compiler.Compiler().compile(pipeline_func=CHD_pipeline, package_path=spec_file)

pipeline_job = aiplatform.PipelineJob(
    display_name='CHD_pipeline',
    template_path=spec_file,
    pipeline_root='gs://ise543_final_pj',
    enable_caching=True,
)
pipeline_job.run()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/125143825809/locations/us-central1/pipelineJobs/chd-pipeline-20240502034344
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/125143825809/locations/us-central1/pipelineJobs/chd-pipeline-20240502034344')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/chd-pipeline-20240502034344?project=125143825809
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/125143825809/locations/us-central1/pipelineJobs/chd-pipeline-20240502034344 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob run completed. Resource name: projects/125143825809/locations/us-central1/