In [None]:
! pip install kfp
! pip install google-cloud-pipeline-components
! pip install gcsfs
! pip install fsspec

In [1]:
# Set parameters
# Project ID and location that the next cell passes to aiplatform.init().
project_id = 'flash-parity-420600'
location = 'us-central1'

In [2]:
import kfp
from google.cloud import aiplatform
aiplatform.init(project=project_id, location=location)
from kfp.v2.dsl import pipeline
from kfp.v2.dsl import component
from kfp.v2.dsl import OutputPath, InputPath, Dataset, Model, Metrics, Input, Output, Artifact, ClassificationMetrics

  from kfp.v2.dsl import pipeline


# Drop A1C, PatientID

In [14]:
@component(packages_to_install=["pandas", "gcsfs", "fsspec"])
def drop_columns_component(
    input_dataset_path: str,
    output_dataset_path: OutputPath('Dataset'),
    columns_to_drop: list
):
    """Remove the requested columns from a CSV dataset.

    Args:
        input_dataset_path: Location of the source CSV, handed directly to
            ``pandas.read_csv``.
        output_dataset_path: Destination the reduced CSV is written to.
        columns_to_drop: Names of the columns to remove. pandas raises
            ``KeyError`` when one of them is not present.
    """
    import pandas as pd

    source_frame = pd.read_csv(input_dataset_path)

    # Write the frame back without its index so the CSV keeps only data columns.
    reduced_frame = source_frame.drop(columns=columns_to_drop)
    reduced_frame.to_csv(output_dataset_path, index=False)

  return component_factory.create_component_from_func(


# Split Data

In [15]:
@component(packages_to_install=["pandas", "scikit-learn", "gcsfs", "fsspec"])
def split_component(
    input_dataset_path: InputPath('Dataset'),
    train_dataset_path: OutputPath('Dataset'),
    validation_dataset_path: OutputPath('Dataset'),
    test_size: float = 0.1,
    random_state: int = 42):
    """Split a CSV dataset into a training part and a validation part.

    Args:
        input_dataset_path: CSV file to split.
        train_dataset_path: Destination of the training rows.
        validation_dataset_path: Destination of the held-out rows.
        test_size: Forwarded to ``train_test_split``; the share of rows that
            ends up in the validation file.
        random_state: Seed forwarded to ``train_test_split``.
    """
    import pandas as pd
    from sklearn.model_selection import train_test_split

    full_frame = pd.read_csv(input_dataset_path)

    # train_test_split returns (train, held-out); the held-out part is the
    # validation set here, no separate test set is produced.
    partitions = train_test_split(full_frame, test_size=test_size, random_state=random_state)

    destinations = (train_dataset_path, validation_dataset_path)
    for partition, destination in zip(partitions, destinations):
        partition.to_csv(destination, index=False)


# Impute train

In [16]:
@component(packages_to_install=["pandas", "scikit-learn", "numpy", "gcsfs", "fsspec"])
def train_impute_data_component(
    train_dataset_path: InputPath('Dataset'),
    imputed_train_dataset_path: OutputPath('Dataset'),
    imputation_params_path: OutputPath(),
    imputation_metadata: Output[Artifact]  # Output for metadata
):
    """Impute missing values in the training data and record the fill values.

    Args:
        train_dataset_path: CSV with the training rows.
        imputed_train_dataset_path: Destination of the imputed CSV.
        imputation_params_path: Destination of a JSON file holding
            ``median_cigs``, ``median_values`` and ``mode_values``.
        imputation_metadata: Artifact whose metadata receives the same values.
    """
    import pandas as pd
    import numpy as np
    import json
    from sklearn.impute import SimpleImputer

    train_df = pd.read_csv(train_dataset_path)

    # 'cigsPerDay' is filled per smoking status: 0 for non-smokers, the
    # smokers' median for smokers.
    non_smoker_rows = train_df['currentSmoker'] == 0
    smoker_rows = train_df['currentSmoker'] == 1

    train_df.loc[non_smoker_rows, 'cigsPerDay'] = train_df.loc[non_smoker_rows, 'cigsPerDay'].fillna(0)
    median_cigs = train_df.loc[smoker_rows, 'cigsPerDay'].median()
    train_df.loc[smoker_rows, 'cigsPerDay'] = train_df.loc[smoker_rows, 'cigsPerDay'].fillna(median_cigs)

    # Column-wise median / most-frequent imputation for the remaining features.
    imputation_plan = (
        ('median', ['glucose', 'totChol', 'BMI', 'heartRate']),
        ('most_frequent', ['education', 'BPMeds']),
    )
    fitted_imputers = {}
    for strategy, columns in imputation_plan:
        imputer = SimpleImputer(strategy=strategy)
        train_df[columns] = imputer.fit_transform(train_df[columns])
        fitted_imputers[strategy] = imputer

    # The mode-imputed columns are stored as integers.
    for column in ('BPMeds', 'education'):
        train_df[column] = train_df[column].astype(int)

    imputation_params = {
        'median_cigs': median_cigs,
        'median_values': fitted_imputers['median'].statistics_.tolist(),
        'mode_values': fitted_imputers['most_frequent'].statistics_.tolist()
    }
    with open(imputation_params_path, 'w') as params_file:
        json.dump(imputation_params, params_file)

    imputation_metadata.metadata = imputation_params
    train_df.to_csv(imputed_train_dataset_path, index=False)




# Impute Validation

In [17]:
@component(packages_to_install=["pandas", "scikit-learn", "numpy", "gcsfs", "fsspec"])
def validation_impute_data_component(
    validation_dataset_path: InputPath('Dataset'),
    imputation_params_path: InputPath(),
    imputed_validation_dataset_path: OutputPath('Dataset'),
    validation_imputation_metadata: Output[Artifact]  # Output for metadata
):
    """Impute the validation data with the fill values learned on the training data.

    Args:
        validation_dataset_path: CSV with the validation rows.
        imputation_params_path: JSON file with the keys ``median_cigs``,
            ``median_values`` and ``mode_values``.
        imputed_validation_dataset_path: Destination of the imputed CSV.
        validation_imputation_metadata: Artifact whose metadata records the
            fill values that were applied.
    """
    import pandas as pd
    import json

    # Load the validation dataset
    validation_df = pd.read_csv(validation_dataset_path)

    # Load imputation parameters from the JSON file
    with open(imputation_params_path, 'r') as f:
        imputation_params = json.load(f)

    # 'cigsPerDay': 0 for non-smokers, the training median for smokers.
    non_smoker_rows = validation_df['currentSmoker'] == 0
    smoker_rows = validation_df['currentSmoker'] == 1
    validation_df.loc[non_smoker_rows, 'cigsPerDay'] = validation_df.loc[non_smoker_rows, 'cigsPerDay'].fillna(0)
    validation_df.loc[smoker_rows, 'cigsPerDay'] = validation_df.loc[smoker_rows, 'cigsPerDay'].fillna(imputation_params['median_cigs'])

    # Apply the stored training statistics directly. Assigning `statistics_`
    # to an unfitted SimpleImputer is not a supported way to restore it and
    # fails on scikit-learn releases whose transform() reads other state that
    # only fit() sets; a per-column fillna yields the same values.
    median_cols = ['glucose', 'totChol', 'BMI', 'heartRate']
    mode_cols = ['education', 'BPMeds']
    median_fill = dict(zip(median_cols, imputation_params['median_values']))
    mode_fill = dict(zip(mode_cols, imputation_params['mode_values']))
    # The cast to float keeps the column dtype the median imputer used to return.
    validation_df[median_cols] = validation_df[median_cols].astype(float).fillna(value=median_fill)
    validation_df[mode_cols] = validation_df[mode_cols].fillna(value=mode_fill)

    # Convert 'BPMeds' and 'education' to integer
    validation_df['BPMeds'] = validation_df['BPMeds'].astype(int)
    validation_df['education'] = validation_df['education'].astype(int)

    # Save the imputed validation dataset
    validation_df.to_csv(imputed_validation_dataset_path, index=False)

    # Saving metadata for imputation
    validation_imputation_metadata.metadata = {
        "median_cigs_used": imputation_params['median_cigs'],
        "median_values_used": imputation_params['median_values'],
        "mode_values_used": imputation_params['mode_values']
    }




# Impute Evaluation

In [130]:
@component(packages_to_install=["pandas", "scikit-learn", "numpy", "gcsfs", "fsspec"])
def evaluation_impute_data_component(
    evaluation_dataset_path: InputPath('Dataset'),
    imputation_params_path: str,
    imputed_evaluation_dataset_path: OutputPath('Dataset'),
    evaluation_imputation_metadata: Output[Artifact]  # Output for metadata
):
    """Impute the evaluation data with the fill values learned on the training data.

    Args:
        evaluation_dataset_path: CSV with the evaluation rows.
        imputation_params_path: Path, opened through ``gcsfs``, of the JSON
            file with the keys ``median_cigs``, ``median_values`` and
            ``mode_values``.
        imputed_evaluation_dataset_path: Destination of the imputed CSV.
        evaluation_imputation_metadata: Artifact whose metadata records the
            fill values that were applied.
    """
    import pandas as pd
    import json
    import gcsfs

    # Load the evaluation dataset
    evaluation_df = pd.read_csv(evaluation_dataset_path)

    # Load imputation parameters from the JSON file
    fs = gcsfs.GCSFileSystem()
    with fs.open(imputation_params_path, 'r') as f:
        imputation_params = json.load(f)

    # 'cigsPerDay': 0 for non-smokers, the training median for smokers.
    non_smoker_rows = evaluation_df['currentSmoker'] == 0
    smoker_rows = evaluation_df['currentSmoker'] == 1
    evaluation_df.loc[non_smoker_rows, 'cigsPerDay'] = evaluation_df.loc[non_smoker_rows, 'cigsPerDay'].fillna(0)
    evaluation_df.loc[smoker_rows, 'cigsPerDay'] = evaluation_df.loc[smoker_rows, 'cigsPerDay'].fillna(imputation_params['median_cigs'])

    # Apply the stored training statistics directly. Assigning `statistics_`
    # to an unfitted SimpleImputer is not a supported way to restore it and
    # fails on scikit-learn releases whose transform() reads other state that
    # only fit() sets; a per-column fillna yields the same values.
    median_cols = ['glucose', 'totChol', 'BMI', 'heartRate']
    mode_cols = ['education', 'BPMeds']
    median_fill = dict(zip(median_cols, imputation_params['median_values']))
    mode_fill = dict(zip(mode_cols, imputation_params['mode_values']))
    # The cast to float keeps the column dtype the median imputer used to return.
    evaluation_df[median_cols] = evaluation_df[median_cols].astype(float).fillna(value=median_fill)
    evaluation_df[mode_cols] = evaluation_df[mode_cols].fillna(value=mode_fill)

    # Convert 'BPMeds' and 'education' to integer
    evaluation_df['BPMeds'] = evaluation_df['BPMeds'].astype(int)
    evaluation_df['education'] = evaluation_df['education'].astype(int)

    # Save the imputed evaluation dataset
    evaluation_df.to_csv(imputed_evaluation_dataset_path, index=False)

    # Saving metadata for imputation
    evaluation_imputation_metadata.metadata = {
        "median_cigs_used": imputation_params['median_cigs'],
        "median_values_used": imputation_params['median_values'],
        "mode_values_used": imputation_params['mode_values']
    }

  return component_factory.create_component_from_func(


# Winsorize train

In [24]:
@component(packages_to_install=["pandas", "numpy", "gcsfs", "fsspec"])
def train_winsorize_component(
    train_dataset_path: InputPath('Dataset'),
    winsorized_train_dataset_path: OutputPath('Dataset'),
    winsorize_params_path: OutputPath(),
    winsorize_metadata: Output[Artifact]  # Output for metadata
):
    """Clamp the numeric training columns to limits taken from their own sorted values.

    For each column the lower limit is the sorted value at index
    ``int(0.01 * n)`` and the upper limit the one at ``int(0.99 * n) - 1``.

    Args:
        train_dataset_path: CSV with the training rows.
        winsorized_train_dataset_path: Destination of the clamped CSV.
        winsorize_params_path: Destination of a JSON file mapping each column
            to its ``(lower, upper)`` pair.
        winsorize_metadata: Artifact whose metadata receives the same limits.
    """
    import pandas as pd
    import numpy as np
    import json

    train_df = pd.read_csv(train_dataset_path)

    numerical_columns = ['age', 'cigsPerDay', 'totChol', 'income', 'sysBP', 'diaBP', 'BMI', 'heartRate', 'glucose']
    winsorize_thresholds = {}

    for col in numerical_columns:
        ordered_values = np.sort(train_df[col])
        sample_count = len(ordered_values)
        lower_bound = ordered_values[int(0.01 * sample_count)]
        upper_bound = ordered_values[int(0.99 * sample_count) - 1]
        winsorize_thresholds[col] = (float(lower_bound), float(upper_bound))

        # Raise values below the lower bound first, then cap those above the upper bound.
        train_df[col] = np.where(train_df[col] < lower_bound, lower_bound, train_df[col])
        train_df[col] = np.where(train_df[col] > upper_bound, upper_bound, train_df[col])

    with open(winsorize_params_path, 'w') as params_file:
        json.dump(winsorize_thresholds, params_file)

    winsorize_metadata.metadata = {
        col: [lower, upper] for col, (lower, upper) in winsorize_thresholds.items()
    }

    train_df.to_csv(winsorized_train_dataset_path, index=False)


# Winsorize validation

In [25]:
@component(packages_to_install=["pandas", "numpy", "gcsfs", "fsspec"])
def validation_winsorize_component(
    validation_dataset_path: InputPath('Dataset'),
    winsorize_params_path: InputPath(),
    winsorized_validation_dataset_path: OutputPath('Dataset'),
    validation_winsorize_metadata: Output[Artifact]  # Output for metadata
):
    """Clamp the numeric validation columns to previously stored limits.

    Args:
        validation_dataset_path: CSV with the validation rows.
        winsorize_params_path: JSON file mapping each column to its
            ``[lower, upper]`` pair.
        winsorized_validation_dataset_path: Destination of the clamped CSV.
        validation_winsorize_metadata: Artifact whose metadata records the
            limits read from the JSON file.
    """
    import pandas as pd
    import numpy as np
    import json

    validation_df = pd.read_csv(validation_dataset_path)

    with open(winsorize_params_path, 'r') as params_file:
        winsorize_thresholds = json.load(params_file)

    numerical_columns = ['age', 'cigsPerDay', 'totChol', 'income', 'sysBP', 'diaBP', 'BMI', 'heartRate', 'glucose']

    for col in numerical_columns:
        floor_value, ceiling_value = winsorize_thresholds[col]

        # Lower clamp first, then upper clamp on the already raised values.
        raised = np.where(validation_df[col] < floor_value, floor_value, validation_df[col])
        validation_df[col] = np.where(raised > ceiling_value, ceiling_value, raised)

    validation_winsorize_metadata.metadata = {
        col: list(limits) for col, limits in winsorize_thresholds.items()
    }

    validation_df.to_csv(winsorized_validation_dataset_path, index=False)

# Winsorize Evaluation

In [131]:
@component(packages_to_install=["pandas", "numpy", "gcsfs", "fsspec"])
def evaluation_winsorize_component(
    evaluation_dataset_path: InputPath('Dataset'),
    winsorize_params_path: str,
    winsorized_evaluation_dataset_path: OutputPath('Dataset'),
    evaluation_winsorize_metadata: Output[Artifact]  # Output for metadata
):
    """Clamp the numeric evaluation columns to previously stored limits.

    Args:
        evaluation_dataset_path: CSV with the evaluation rows.
        winsorize_params_path: Path, opened through ``gcsfs``, of the JSON
            file mapping each column to its ``[lower, upper]`` pair.
        winsorized_evaluation_dataset_path: Destination of the clamped CSV.
        evaluation_winsorize_metadata: Artifact whose metadata records the
            limits read from the JSON file.
    """
    import pandas as pd
    import numpy as np
    import json
    import gcsfs

    evaluation_df = pd.read_csv(evaluation_dataset_path)

    with gcsfs.GCSFileSystem().open(winsorize_params_path, 'r') as params_file:
        winsorize_thresholds = json.load(params_file)

    numerical_columns = ['age', 'cigsPerDay', 'totChol', 'income', 'sysBP', 'diaBP', 'BMI', 'heartRate', 'glucose']

    for col in numerical_columns:
        floor_value, ceiling_value = winsorize_thresholds[col]

        # Lower clamp first, then upper clamp on the already raised values.
        raised = np.where(evaluation_df[col] < floor_value, floor_value, evaluation_df[col])
        evaluation_df[col] = np.where(raised > ceiling_value, ceiling_value, raised)

    evaluation_winsorize_metadata.metadata = {
        col: list(limits) for col, limits in winsorize_thresholds.items()
    }

    evaluation_df.to_csv(winsorized_evaluation_dataset_path, index=False)

# Log Transformation Train

In [26]:
@component(packages_to_install=["pandas", "numpy", "gcsfs", "fsspec"])
def train_log_transform_component(
    train_dataset_path: InputPath('Dataset'),
    log_transformed_train_dataset_path: OutputPath('Dataset')
):
    """Apply ``log1p`` to a fixed list of training columns.

    Args:
        train_dataset_path: CSV with the training rows.
        log_transformed_train_dataset_path: Destination of the transformed CSV.
    """
    import pandas as pd
    import numpy as np

    train_df = pd.read_csv(train_dataset_path)

    skewed_features = ['age', 'BMI', 'heartRate', 'totChol', 'glucose']
    # assign() swaps each listed column for its log1p values and keeps the column order.
    transformed_df = train_df.assign(
        **{feature: np.log1p(train_df[feature]) for feature in skewed_features}
    )

    transformed_df.to_csv(log_transformed_train_dataset_path, index=False)


# Log Transformation Validation

In [27]:
@component(packages_to_install=["pandas", "numpy", "gcsfs", "fsspec"])
def validation_log_transform_component(
    validation_dataset_path: InputPath('Dataset'),
    log_transformed_validation_dataset_path: OutputPath('Dataset')
):
    """Apply ``log1p`` to a fixed list of validation columns.

    Args:
        validation_dataset_path: CSV with the validation rows.
        log_transformed_validation_dataset_path: Destination of the
            transformed CSV.
    """
    import pandas as pd
    import numpy as np

    validation_df = pd.read_csv(validation_dataset_path)

    skewed_features = ['age', 'BMI', 'heartRate', 'totChol', 'glucose']
    # assign() swaps each listed column for its log1p values and keeps the column order.
    transformed_df = validation_df.assign(
        **{feature: np.log1p(validation_df[feature]) for feature in skewed_features}
    )

    transformed_df.to_csv(log_transformed_validation_dataset_path, index=False)

# Log Transformation Evaluation

In [92]:
@component(packages_to_install=["pandas", "numpy", "gcsfs", "fsspec"])
def evaluation_log_transform_component(
    evaluation_dataset_path: InputPath('Dataset'),
    log_transformed_evaluation_dataset_path: OutputPath('Dataset')
):
    """Apply ``log1p`` to a fixed list of evaluation columns.

    Args:
        evaluation_dataset_path: CSV with the evaluation rows.
        log_transformed_evaluation_dataset_path: Destination of the
            transformed CSV.
    """
    import pandas as pd
    import numpy as np

    evaluation_df = pd.read_csv(evaluation_dataset_path)

    skewed_features = ['age', 'BMI', 'heartRate', 'totChol', 'glucose']
    # assign() swaps each listed column for its log1p values and keeps the column order.
    transformed_df = evaluation_df.assign(
        **{feature: np.log1p(evaluation_df[feature]) for feature in skewed_features}
    )

    transformed_df.to_csv(log_transformed_evaluation_dataset_path, index=False)

# Box-cox Transform Train

In [32]:
@component(packages_to_install=["pandas", "numpy", "scikit-learn", "scipy", "gcsfs", "fsspec"])
def train_boxcox_transform_component(
    train_dataset_path: InputPath('Dataset'),
    boxcox_transformed_train_dataset_path: OutputPath('Dataset'),
    boxcox_transformers_path: OutputPath(),
    boxcox_metadata: Output[Artifact]
):
    """Fit one Box-Cox transformer per selected column and apply it to the training data.

    Args:
        train_dataset_path: CSV with the training rows.
        boxcox_transformed_train_dataset_path: Destination of the transformed CSV.
        boxcox_transformers_path: Destination of the pickled dict mapping
            column name to its fitted ``PowerTransformer``.
        boxcox_metadata: Artifact whose metadata receives the fitted lambda
            of every column.
    """
    import pandas as pd
    import numpy as np
    from scipy.stats import boxcox
    from sklearn.preprocessing import PowerTransformer
    import pickle

    train_df = pd.read_csv(train_dataset_path)

    skewed_features_second = ['BMI', 'heartRate', 'glucose']
    transformers = {}
    for feature in skewed_features_second:
        # Each column gets its own transformer so its lambda is fitted independently.
        power = PowerTransformer(method='box-cox', standardize=False)
        train_df[feature] = power.fit_transform(train_df[[feature]].values)
        transformers[feature] = power

    with open(boxcox_transformers_path, 'wb') as transformers_file:
        pickle.dump(transformers, transformers_file)

    fitted_lambdas = {}
    for feature, power in transformers.items():
        fitted_lambdas[feature] = float(power.lambdas_[0])
    boxcox_metadata.metadata = fitted_lambdas

    train_df.to_csv(boxcox_transformed_train_dataset_path, index=False)

# Box-cox Transform Validation

In [33]:
@component(packages_to_install=["pandas", "numpy", "scikit-learn", "scipy", "gcsfs", "fsspec"])
def validation_boxcox_transform_component(
    validation_dataset_path: InputPath('Dataset'),
    boxcox_transformers_path: InputPath(),
    boxcox_transformed_validation_dataset_path: OutputPath('Dataset')
):
    """Apply previously fitted Box-Cox transformers to the validation data.

    Args:
        validation_dataset_path: CSV with the validation rows.
        boxcox_transformers_path: Pickle file holding a dict that maps column
            name to a fitted transformer.
        boxcox_transformed_validation_dataset_path: Destination of the
            transformed CSV.
    """
    import pandas as pd
    import numpy as np
    from scipy.stats import boxcox
    from sklearn.preprocessing import PowerTransformer
    import pickle

    validation_df = pd.read_csv(validation_dataset_path)

    # pickle.load executes whatever the file encodes; only feed it artifacts
    # produced by this pipeline.
    with open(boxcox_transformers_path, 'rb') as transformers_file:
        transformers = pickle.load(transformers_file)

    for feature in ('BMI', 'heartRate', 'glucose'):
        column_values = validation_df[[feature]].values
        validation_df[feature] = transformers[feature].transform(column_values)

    validation_df.to_csv(boxcox_transformed_validation_dataset_path, index=False)

# Box-cox Transform Evaluation

In [132]:
@component(packages_to_install=["pandas", "numpy", "scikit-learn", "scipy", "gcsfs", "fsspec"])
def evaluation_boxcox_transform_component(
    evaluation_dataset_path: InputPath('Dataset'),
    boxcox_transformers_path: str,
    boxcox_transformed_evaluation_dataset_path: OutputPath('Dataset')
):
    """Apply previously fitted Box-Cox transformers to the evaluation data.

    Args:
        evaluation_dataset_path: CSV with the evaluation rows.
        boxcox_transformers_path: Path, opened through ``gcsfs``, of the
            pickle file holding a dict that maps column name to a fitted
            transformer.
        boxcox_transformed_evaluation_dataset_path: Destination of the
            transformed CSV.
    """
    import pandas as pd
    import numpy as np
    from scipy.stats import boxcox
    from sklearn.preprocessing import PowerTransformer
    import pickle
    import gcsfs

    evaluation_df = pd.read_csv(evaluation_dataset_path)

    # pickle.load executes whatever the file encodes; only point this at
    # artifacts produced by this pipeline.
    with gcsfs.GCSFileSystem().open(boxcox_transformers_path, 'rb') as transformers_file:
        transformers = pickle.load(transformers_file)

    for feature in ('BMI', 'heartRate', 'glucose'):
        column_values = evaluation_df[[feature]].values
        evaluation_df[feature] = transformers[feature].transform(column_values)

    evaluation_df.to_csv(boxcox_transformed_evaluation_dataset_path, index=False)

# Binning train

In [37]:
@component(packages_to_install=["pandas", "numpy", "gcsfs", "fsspec"])
def train_binning_component(
    train_dataset_path: InputPath('Dataset'),
    binned_train_dataset_path: OutputPath('Dataset'),
    bin_edges_path: OutputPath(),
    binning_metadata: Output[Artifact]
):
    """Replace 'income' with a four-level quartile category and store the quartiles.

    Args:
        train_dataset_path: CSV with the training rows.
        binned_train_dataset_path: Destination of the CSV in which 'income'
            is replaced by 'income_category'.
        bin_edges_path: Destination of a JSON file ``{"q": [q25, q50, q75]}``.
        binning_metadata: Artifact whose metadata receives the same quartiles
            under ``bin_edges``.
    """
    import pandas as pd
    import numpy as np
    import json

    train_df = pd.read_csv(train_dataset_path)

    income = train_df['income']
    # Inner quartiles of the training income; these are what gets persisted.
    quartile_edges = income.quantile([0.25, 0.5, 0.75]).values.tolist()

    category_labels = ['Low', 'Middle', 'High', 'Very High']
    train_df['income_category'] = pd.qcut(income, q=[0, 0.25, 0.5, 0.75, 1], labels=category_labels)
    train_df = train_df.drop(columns='income')

    with open(bin_edges_path, 'w') as edges_file:
        json.dump({'q': quartile_edges}, edges_file)

    binning_metadata.metadata = {'bin_edges': quartile_edges}

    train_df.to_csv(binned_train_dataset_path, index=False)

# Binning validation

In [38]:
@component(packages_to_install=["pandas", "numpy", "gcsfs", "fsspec"])
def validation_binning_component(
    validation_dataset_path: InputPath('Dataset'),
    bin_edges_path: InputPath(),
    binned_validation_dataset_path: OutputPath('Dataset')
):
    """Replace 'income' with a four-level category using stored quartile edges.

    Args:
        validation_dataset_path: CSV with the validation rows.
        bin_edges_path: JSON file ``{"q": [...]}`` holding the three inner
            bin edges.
        binned_validation_dataset_path: Destination of the CSV in which
            'income' is replaced by 'income_category'.
    """
    import pandas as pd
    import json

    validation_df = pd.read_csv(validation_dataset_path)

    with open(bin_edges_path, 'r') as f:
        bin_edges = json.load(f)['q']

    # The outer edges are open-ended instead of this split's own min/max:
    #  * pd.cut uses right-closed bins and excludes the lowest edge by
    #    default, so the row holding the minimum income used to get NaN;
    #  * a min above the first stored edge (or a max below the last one)
    #    made the bins non-monotonic and pd.cut raised.
    # The stored inner edges alone decide the category of every row.
    bins = [float('-inf')] + list(bin_edges) + [float('inf')]
    validation_df['income_category'] = pd.cut(
        validation_df['income'],
        bins=bins,
        labels=['Low', 'Middle', 'High', 'Very High']
    )
    validation_df = validation_df.drop(columns='income')

    validation_df.to_csv(binned_validation_dataset_path, index=False)


# Binning evaluation

In [133]:
@component(packages_to_install=["pandas", "numpy", "gcsfs", "fsspec"])
def evaluation_binning_component(
    evaluation_dataset_path: InputPath('Dataset'),
    bin_edges_path: str,
    binned_evaluation_dataset_path: OutputPath('Dataset')
):
    """Replace 'income' with a four-level category using stored quartile edges.

    Args:
        evaluation_dataset_path: CSV with the evaluation rows.
        bin_edges_path: Path, opened through ``gcsfs``, of the JSON file
            ``{"q": [...]}`` holding the three inner bin edges.
        binned_evaluation_dataset_path: Destination of the CSV in which
            'income' is replaced by 'income_category'.
    """
    import pandas as pd
    import json
    import gcsfs

    fs = gcsfs.GCSFileSystem()

    evaluation_df = pd.read_csv(evaluation_dataset_path)

    with fs.open(bin_edges_path, 'r') as f:
        bin_edges = json.load(f)['q']

    # The outer edges are open-ended instead of this split's own min/max:
    #  * pd.cut uses right-closed bins and excludes the lowest edge by
    #    default, so the row holding the minimum income used to get NaN;
    #  * a min above the first stored edge (or a max below the last one)
    #    made the bins non-monotonic and pd.cut raised.
    # The stored inner edges alone decide the category of every row.
    bins = [float('-inf')] + list(bin_edges) + [float('inf')]
    evaluation_df['income_category'] = pd.cut(
        evaluation_df['income'],
        bins=bins,
        labels=['Low', 'Middle', 'High', 'Very High']
    )
    evaluation_df = evaluation_df.drop(columns='income')

    evaluation_df.to_csv(binned_evaluation_dataset_path, index=False)

# Feature Combine Train

In [36]:
@component(packages_to_install=["pandas", "gcsfs", "fsspec"])
def train_feature_combine_component(
    train_dataset_path: InputPath('Dataset'),
    combined_train_dataset_path: OutputPath('Dataset')
):
    """Derive 'smoking_status' and 'bp_category' for the training data and drop their source columns.

    Args:
        train_dataset_path: CSV with the training rows.
        combined_train_dataset_path: Destination of the CSV with the two
            derived columns.
    """
    import pandas as pd

    train_df = pd.read_csv(train_dataset_path)

    # (inclusive upper bound on cigsPerDay, label), checked in order.
    smoking_tiers = ((5, 'light smoker'), (20, 'moderate smoker'))

    def smoking_label(row):
        """Label one row by smoker flag and cigarettes per day."""
        if row['currentSmoker'] == 0:
            return 'non-smoker'
        for ceiling, label in smoking_tiers:
            if row['cigsPerDay'] <= ceiling:
                return label
        return 'heavy smoker'

    train_df['smoking_status'] = train_df.apply(smoking_label, axis=1)
    train_df = train_df.drop(columns=['currentSmoker', 'cigsPerDay'])

    def bp_label(row):
        """Label one row from the hypertension flag and both pressure readings."""
        if row['prevalentHyp'] == 1:
            return 'Hypertension'
        systolic, diastolic = row['sysBP'], row['diaBP']
        if systolic < 120 and diastolic < 80:
            return 'Normal'
        # NOTE(review): this test runs before the >= 140 / >= 90 test, so a
        # row with sysBP >= 140 and diaBP in [80, 90) (or diaBP >= 90 and
        # sysBP in [120, 140)) is labelled 'Prehypertension' - confirm intent.
        if 120 <= systolic < 140 or 80 <= diastolic < 90:
            return 'Prehypertension'
        if systolic >= 140 or diastolic >= 90:
            return 'Hypertension'
        # No comparison matched (e.g. a missing reading): leave the label empty.
        return None

    train_df['bp_category'] = train_df.apply(bp_label, axis=1)
    train_df = train_df.drop(columns=['sysBP', 'diaBP', 'prevalentHyp'])

    train_df.to_csv(combined_train_dataset_path, index=False)

# Feature Combine Validation

In [39]:
@component(packages_to_install=["pandas", "gcsfs", "fsspec"])
def validation_feature_combine_component(
    validation_dataset_path: InputPath('Dataset'),
    combined_validation_dataset_path: OutputPath('Dataset')
):
    """Derive 'smoking_status' and 'bp_category' for the validation data and drop their source columns.

    Args:
        validation_dataset_path: CSV with the validation rows.
        combined_validation_dataset_path: Destination of the CSV with the two
            derived columns.
    """
    import pandas as pd

    validation_df = pd.read_csv(validation_dataset_path)

    # (inclusive upper bound on cigsPerDay, label), checked in order.
    smoking_tiers = ((5, 'light smoker'), (20, 'moderate smoker'))

    def smoking_label(row):
        """Label one row by smoker flag and cigarettes per day."""
        if row['currentSmoker'] == 0:
            return 'non-smoker'
        for ceiling, label in smoking_tiers:
            if row['cigsPerDay'] <= ceiling:
                return label
        return 'heavy smoker'

    validation_df['smoking_status'] = validation_df.apply(smoking_label, axis=1)
    validation_df = validation_df.drop(columns=['currentSmoker', 'cigsPerDay'])

    def bp_label(row):
        """Label one row from the hypertension flag and both pressure readings."""
        if row['prevalentHyp'] == 1:
            return 'Hypertension'
        systolic, diastolic = row['sysBP'], row['diaBP']
        if systolic < 120 and diastolic < 80:
            return 'Normal'
        # NOTE(review): this test runs before the >= 140 / >= 90 test, so a
        # row with sysBP >= 140 and diaBP in [80, 90) (or diaBP >= 90 and
        # sysBP in [120, 140)) is labelled 'Prehypertension' - confirm intent.
        if 120 <= systolic < 140 or 80 <= diastolic < 90:
            return 'Prehypertension'
        if systolic >= 140 or diastolic >= 90:
            return 'Hypertension'
        # No comparison matched (e.g. a missing reading): leave the label empty.
        return None

    validation_df['bp_category'] = validation_df.apply(bp_label, axis=1)
    validation_df = validation_df.drop(columns=['sysBP', 'diaBP', 'prevalentHyp'])

    validation_df.to_csv(combined_validation_dataset_path, index=False)

# Feature Combine Evaluation

In [95]:
@component(packages_to_install=["pandas", "gcsfs", "fsspec"])
def evaluation_feature_combine_component(
    evaluation_dataset_path: InputPath('Dataset'),
    combined_evaluation_dataset_path: OutputPath('Dataset')
):
    """Derive 'smoking_status' and 'bp_category' for the evaluation data and drop their source columns.

    Args:
        evaluation_dataset_path: CSV with the evaluation rows.
        combined_evaluation_dataset_path: Destination of the CSV with the two
            derived columns.
    """
    import pandas as pd

    evaluation_df = pd.read_csv(evaluation_dataset_path)

    # (inclusive upper bound on cigsPerDay, label), checked in order.
    smoking_tiers = ((5, 'light smoker'), (20, 'moderate smoker'))

    def smoking_label(row):
        """Label one row by smoker flag and cigarettes per day."""
        if row['currentSmoker'] == 0:
            return 'non-smoker'
        for ceiling, label in smoking_tiers:
            if row['cigsPerDay'] <= ceiling:
                return label
        return 'heavy smoker'

    evaluation_df['smoking_status'] = evaluation_df.apply(smoking_label, axis=1)
    evaluation_df = evaluation_df.drop(columns=['currentSmoker', 'cigsPerDay'])

    def bp_label(row):
        """Label one row from the hypertension flag and both pressure readings."""
        if row['prevalentHyp'] == 1:
            return 'Hypertension'
        systolic, diastolic = row['sysBP'], row['diaBP']
        if systolic < 120 and diastolic < 80:
            return 'Normal'
        # NOTE(review): this test runs before the >= 140 / >= 90 test, so a
        # row with sysBP >= 140 and diaBP in [80, 90) (or diaBP >= 90 and
        # sysBP in [120, 140)) is labelled 'Prehypertension' - confirm intent.
        if 120 <= systolic < 140 or 80 <= diastolic < 90:
            return 'Prehypertension'
        if systolic >= 140 or diastolic >= 90:
            return 'Hypertension'
        # No comparison matched (e.g. a missing reading): leave the label empty.
        return None

    evaluation_df['bp_category'] = evaluation_df.apply(bp_label, axis=1)
    evaluation_df = evaluation_df.drop(columns=['sysBP', 'diaBP', 'prevalentHyp'])

    evaluation_df.to_csv(combined_evaluation_dataset_path, index=False)

# Encode train

In [43]:
@component(packages_to_install=["pandas", "gcsfs", "fsspec"])
def train_encoding_component(
    train_dataset_path: InputPath('Dataset'),
    encoded_train_dataset_path: OutputPath('Dataset')
):
    """One-hot encode the three categorical columns of the training data.

    Args:
        train_dataset_path: CSV with the training rows.
        encoded_train_dataset_path: Destination of the encoded CSV.
    """
    import pandas as pd

    # Labels the binning and feature-combine components of this notebook can
    # emit, listed alphabetically (the order get_dummies derives for plain
    # string columns). Declaring them up front gives every split the same
    # dummy columns even when a label does not occur in the data at hand.
    expected_levels = {
        'income_category': ['High', 'Low', 'Middle', 'Very High'],
        'smoking_status': ['heavy smoker', 'light smoker', 'moderate smoker', 'non-smoker'],
        'bp_category': ['Hypertension', 'Normal', 'Prehypertension'],
    }

    train_df = pd.read_csv(train_dataset_path)
    for column, levels in expected_levels.items():
        train_df[column] = pd.Categorical(train_df[column], categories=levels)
    train_df = pd.get_dummies(train_df, columns=['income_category', 'smoking_status', 'bp_category'])

    train_df.to_csv(encoded_train_dataset_path, index=False)

  return component_factory.create_component_from_func(


# Encode validation

In [44]:
@component(packages_to_install=["pandas", "gcsfs", "fsspec"])
def validation_encoding_component(
    validation_dataset_path: InputPath('Dataset'),
    encoded_validation_dataset_path: OutputPath('Dataset')
):
    """One-hot encode the three categorical columns of the validation data.

    Args:
        validation_dataset_path: CSV with the validation rows.
        encoded_validation_dataset_path: Destination of the encoded CSV.
    """
    import pandas as pd

    # Labels the binning and feature-combine components of this notebook can
    # emit, listed alphabetically (the order get_dummies derives for plain
    # string columns). Declaring them up front gives every split the same
    # dummy columns even when a label does not occur in the data at hand.
    expected_levels = {
        'income_category': ['High', 'Low', 'Middle', 'Very High'],
        'smoking_status': ['heavy smoker', 'light smoker', 'moderate smoker', 'non-smoker'],
        'bp_category': ['Hypertension', 'Normal', 'Prehypertension'],
    }

    validation_df = pd.read_csv(validation_dataset_path)
    for column, levels in expected_levels.items():
        validation_df[column] = pd.Categorical(validation_df[column], categories=levels)
    validation_df = pd.get_dummies(validation_df, columns=['income_category', 'smoking_status', 'bp_category'])

    validation_df.to_csv(encoded_validation_dataset_path, index=False)


# Encode evaluation

In [96]:
@component(packages_to_install=["pandas", "gcsfs", "fsspec"])
def evaluation_encoding_component(
    evaluation_dataset_path: InputPath('Dataset'),
    encoded_evaluation_dataset_path: OutputPath('Dataset')
):
    """One-hot encode the three categorical columns of the evaluation data.

    Args:
        evaluation_dataset_path: CSV with the evaluation rows.
        encoded_evaluation_dataset_path: Destination of the encoded CSV.
    """
    import pandas as pd

    # Labels the binning and feature-combine components of this notebook can
    # emit, listed alphabetically (the order get_dummies derives for plain
    # string columns). Declaring them up front gives every split the same
    # dummy columns even when a label does not occur in the data at hand.
    expected_levels = {
        'income_category': ['High', 'Low', 'Middle', 'Very High'],
        'smoking_status': ['heavy smoker', 'light smoker', 'moderate smoker', 'non-smoker'],
        'bp_category': ['Hypertension', 'Normal', 'Prehypertension'],
    }

    evaluation_df = pd.read_csv(evaluation_dataset_path)
    for column, levels in expected_levels.items():
        evaluation_df[column] = pd.Categorical(evaluation_df[column], categories=levels)
    evaluation_df = pd.get_dummies(evaluation_df, columns=['income_category', 'smoking_status', 'bp_category'])

    evaluation_df.to_csv(encoded_evaluation_dataset_path, index=False)

# Scaling train

In [63]:
@component(packages_to_install=["pandas", "scikit-learn", "gcsfs", "fsspec"])
def train_scaling_component(
    train_dataset_path: InputPath('Dataset'),
    scaled_train_features_path: OutputPath('Dataset'),
    scaled_train_target_path: OutputPath('Dataset'),
    scaler_path: OutputPath(),
    scaling_metadata: Output[Artifact]
):
    """Separate the 'TenYearCHD' target from the features and standardise the numeric features.

    Args:
        train_dataset_path: CSV with the training rows.
        scaled_train_features_path: Destination of the feature CSV.
        scaled_train_target_path: Destination of the target CSV.
        scaler_path: Destination of the pickled, fitted ``StandardScaler``.
        scaling_metadata: Artifact whose metadata receives the scaler's
            ``mean`` and ``scale`` vectors.
    """
    import pandas as pd
    from sklearn.preprocessing import StandardScaler
    import pickle

    train_df = pd.read_csv(train_dataset_path)

    target_column = 'TenYearCHD'
    y_train = train_df[target_column]
    X_train = train_df.drop(columns=target_column)

    # Only these columns are standardised; the others pass through unchanged.
    numerical_features = ['age', 'totChol', 'BMI', 'heartRate', 'glucose']
    scaler = StandardScaler()
    X_train[numerical_features] = scaler.fit_transform(X_train[numerical_features])

    with open(scaler_path, 'wb') as scaler_file:
        pickle.dump(scaler, scaler_file)

    scaling_metadata.metadata = {
        'mean': scaler.mean_.tolist(),
        'scale': scaler.scale_.tolist()
    }

    X_train.to_csv(scaled_train_features_path, index=False)
    y_train.to_csv(scaled_train_target_path, index=False)

# Scaling validation

In [62]:
@component(packages_to_install=["pandas", "scikit-learn", "gcsfs", "fsspec"])
def validation_scaling_component(
    validation_dataset_path: InputPath('Dataset'),
    scaler_path: InputPath(),
    scaled_validation_features_path: OutputPath('Dataset'),
    scaled_validation_target_path: OutputPath('Dataset')
):
    """Separate the 'TenYearCHD' target and scale the numeric validation features with a stored scaler.

    Args:
        validation_dataset_path: CSV with the validation rows.
        scaler_path: Pickle file holding the fitted scaler.
        scaled_validation_features_path: Destination of the feature CSV.
        scaled_validation_target_path: Destination of the target CSV.
    """
    import pandas as pd
    from sklearn.preprocessing import StandardScaler
    import pickle

    validation_df = pd.read_csv(validation_dataset_path)

    target_column = 'TenYearCHD'
    X_val = validation_df.drop(columns=target_column)
    y_val = validation_df[target_column]

    # pickle.load executes whatever the file encodes; only feed it artifacts
    # produced by this pipeline.
    with open(scaler_path, 'rb') as scaler_file:
        scaler = pickle.load(scaler_file)

    # Only these columns are scaled; the others pass through unchanged.
    numerical_features = ['age', 'totChol', 'BMI', 'heartRate', 'glucose']
    X_val[numerical_features] = scaler.transform(X_val[numerical_features])

    X_val.to_csv(scaled_validation_features_path, index=False)
    y_val.to_csv(scaled_validation_target_path, index=False)

# Scaling Evaluation

In [134]:
@component(packages_to_install=["pandas", "scikit-learn", "gcsfs", "fsspec"])
def evaluation_scaling_component(
    evaluation_dataset_path: InputPath('Dataset'),
    scaler_path: str,
    scaled_evaluation_features_path: OutputPath('Dataset')
):
    """Scale the numeric evaluation features with a stored scaler.

    Args:
        evaluation_dataset_path: CSV with the evaluation rows.
        scaler_path: Path, opened through ``gcsfs``, of the pickle file
            holding the fitted scaler.
        scaled_evaluation_features_path: Destination of the scaled CSV.
    """
    import pandas as pd
    from sklearn.preprocessing import StandardScaler
    import pickle
    import gcsfs

    evaluation_df = pd.read_csv(evaluation_dataset_path)

    # pickle.load executes whatever the file encodes; only point this at
    # artifacts produced by this pipeline.
    with gcsfs.GCSFileSystem().open(scaler_path, 'rb') as scaler_file:
        scaler = pickle.load(scaler_file)

    # Only these columns are scaled; the others pass through unchanged.
    numerical_features = ['age', 'totChol', 'BMI', 'heartRate', 'glucose']
    scaled_block = scaler.transform(evaluation_df[numerical_features])
    evaluation_df[numerical_features] = scaled_block

    evaluation_df.to_csv(scaled_evaluation_features_path, index=False)

# Voting Ensemble

In [72]:
from kfp.v2.dsl import Metrics
@component(packages_to_install=["pandas", "scikit-learn", "lightgbm", "imbalanced-learn==0.9.0", "gcsfs", "fsspec"])
def voting_ensemble_component(
    train_features_path: InputPath('Dataset'),
    train_target_path: InputPath('Dataset'),
    validation_features_path: InputPath('Dataset'),
    validation_target_path: InputPath('Dataset'),
    model_path: OutputPath('Model'),
    evaluation_metrics: Output[Metrics]):
    """Train a soft-voting ensemble and log its validation accuracy and weighted F1.

    Args:
        train_features_path: CSV with the training features.
        train_target_path: Single-column CSV with the training labels.
        validation_features_path: CSV with the validation features.
        validation_target_path: Single-column CSV with the validation labels.
        model_path: Destination of the pickled, fitted ``VotingClassifier``.
        evaluation_metrics: Metrics artifact receiving ``accuracy`` and
            ``f1_score``.
    """
    import pandas as pd
    from sklearn.ensemble import VotingClassifier
    from sklearn.linear_model import LogisticRegression
    from sklearn.svm import SVC
    from imblearn.ensemble import BalancedRandomForestClassifier
    import lightgbm as lgb
    from sklearn.naive_bayes import GaussianNB
    from sklearn.metrics import accuracy_score, f1_score
    import pickle

    X_train = pd.read_csv(train_features_path)
    X_val = pd.read_csv(validation_features_path)
    # The target files hold one column; take it as a 1-D Series so the
    # estimators and metrics get labels of shape (n_samples,) instead of a
    # column-vector DataFrame that scikit-learn has to ravel with a warning.
    y_train = pd.read_csv(train_target_path).iloc[:, 0]
    y_val = pd.read_csv(validation_target_path).iloc[:, 0]

    # Create instances of individual models
    lr = LogisticRegression(class_weight='balanced', max_iter=1000)
    svm = SVC(kernel='sigmoid', class_weight='balanced', probability=True, random_state=42)
    brf = BalancedRandomForestClassifier(n_estimators=100, random_state=42, class_weight='balanced')
    lgbm = lgb.LGBMClassifier(num_leaves=63, max_depth=-1, learning_rate=0.05, n_estimators=1000, scale_pos_weight=1, random_state=42)
    gn = GaussianNB()
    estimators = [
        ('lr', lr),
        ('svm', svm),
        ('brf', brf),
        ('lgbm', lgbm),
        ('gn', gn)
    ]
    # Soft voting averages the members' predicted class probabilities.
    model = VotingClassifier(estimators=estimators, voting='soft')
    model.fit(X_train, y_train)

    # Save the trained model
    with open(model_path, 'wb') as file:
        pickle.dump(model, file)

    # Predict and evaluate
    y_pred = model.predict(X_val)
    accuracy = accuracy_score(y_val, y_pred)
    f1 = f1_score(y_val, y_pred, average='weighted')
    # Log plain Python floats so the metric values are JSON-friendly.
    evaluation_metrics.log_metric('accuracy', float(accuracy))
    evaluation_metrics.log_metric('f1_score', float(f1))


# Prediction

In [137]:
@component(packages_to_install=["pandas", "scikit-learn", "lightgbm", "imbalanced-learn==0.9.0", "gcsfs", "fsspec"])
def prediction_component(
    evaluation_dataset_path: InputPath('Dataset'),
    original_evaluation_dataset_path: str,
    model_path: str,
    predictions_path: OutputPath('Dataset')
):
    """Predict 'TenYearCHD' for the preprocessed evaluation rows and pair each prediction with its patientID.

    Args:
        evaluation_dataset_path: CSV with the preprocessed evaluation features.
        original_evaluation_dataset_path: CSV that still carries the
            'patientID' column.
        model_path: Path, opened through ``gcsfs``, of the pickled model.
        predictions_path: Destination of the CSV with the columns
            'patientID' and 'TenYearCHD'.
    """
    import pandas as pd
    import pickle
    import gcsfs

    evaluation_data = pd.read_csv(evaluation_dataset_path)
    original_evaluation_data = pd.read_csv(original_evaluation_dataset_path)

    # pickle.load executes whatever the file encodes; only point this at a
    # model produced by this pipeline.
    with gcsfs.GCSFileSystem().open(model_path, 'rb') as model_file:
        model = pickle.load(model_file)

    predicted_labels = model.predict(evaluation_data)

    # IDs and predictions are combined as given; both inputs are assumed to
    # list the same patients in the same order - TODO confirm with the caller.
    predictions_df = pd.DataFrame({
        'patientID': original_evaluation_data['patientID'],
        'TenYearCHD': predicted_labels
    })

    predictions_df.to_csv(predictions_path, index=False)

  return component_factory.create_component_from_func(


# Define Initial Pipeline

In [112]:
@pipeline(name="chd-prediction-pipeline")
def chd_prediction_pipeline(input_dataset_path: str):
    """Training pipeline: preprocess the dataset, then fit and evaluate the voting ensemble.

    Every preprocessing stage runs a train component followed by its validation
    counterpart. Where a train component exposes a parameter artifact, that
    artifact is passed on to the matching validation component.

    Args:
        input_dataset_path: Location of the raw CSV dataset.
    """
    # --- Column removal and train/validation split ---
    drop_task = drop_columns_component(
        input_dataset_path=input_dataset_path,
        columns_to_drop=['patientID', 'a1c'],
    )
    split_task = split_component(input_dataset_path=drop_task.output)

    # --- Imputation ---
    train_imputed = train_impute_data_component(
        train_dataset_path=split_task.outputs['train_dataset_path'],
    )
    val_imputed = validation_impute_data_component(
        validation_dataset_path=split_task.outputs['validation_dataset_path'],
        imputation_params_path=train_imputed.outputs['imputation_params_path'],
    )

    # --- Winsorization ---
    train_winsorized = train_winsorize_component(
        train_dataset_path=train_imputed.outputs['imputed_train_dataset_path'],
    )
    val_winsorized = validation_winsorize_component(
        validation_dataset_path=val_imputed.outputs['imputed_validation_dataset_path'],
        winsorize_params_path=train_winsorized.outputs['winsorize_params_path'],
    )

    # --- Log transform (no artifact is passed from train to validation) ---
    train_logged = train_log_transform_component(
        train_dataset_path=train_winsorized.outputs['winsorized_train_dataset_path'],
    )
    val_logged = validation_log_transform_component(
        validation_dataset_path=val_winsorized.outputs['winsorized_validation_dataset_path'],
    )

    # --- Box-Cox transform ---
    train_boxcox = train_boxcox_transform_component(
        train_dataset_path=train_logged.outputs['log_transformed_train_dataset_path'],
    )
    val_boxcox = validation_boxcox_transform_component(
        validation_dataset_path=val_logged.outputs['log_transformed_validation_dataset_path'],
        boxcox_transformers_path=train_boxcox.outputs['boxcox_transformers_path'],
    )

    # --- Binning ---
    train_binned = train_binning_component(
        train_dataset_path=train_boxcox.outputs['boxcox_transformed_train_dataset_path'],
    )
    val_binned = validation_binning_component(
        validation_dataset_path=val_boxcox.outputs['boxcox_transformed_validation_dataset_path'],
        bin_edges_path=train_binned.outputs['bin_edges_path'],
    )

    # --- Feature combination (no artifact is passed from train to validation) ---
    train_combined = train_feature_combine_component(
        train_dataset_path=train_binned.outputs['binned_train_dataset_path'],
    )
    val_combined = validation_feature_combine_component(
        validation_dataset_path=val_binned.outputs['binned_validation_dataset_path'],
    )

    # --- Encoding (no artifact is passed from train to validation) ---
    train_encoded = train_encoding_component(
        train_dataset_path=train_combined.outputs['combined_train_dataset_path'],
    )
    val_encoded = validation_encoding_component(
        validation_dataset_path=val_combined.outputs['combined_validation_dataset_path'],
    )

    # --- Scaling ---
    train_scaled = train_scaling_component(
        train_dataset_path=train_encoded.outputs['encoded_train_dataset_path'],
    )
    val_scaled = validation_scaling_component(
        validation_dataset_path=val_encoded.outputs['encoded_validation_dataset_path'],
        scaler_path=train_scaled.outputs['scaler_path'],
    )

    # --- Model fitting and evaluation on the scaled features/targets ---
    voting_ensemble_component(
        train_features_path=train_scaled.outputs['scaled_train_features_path'],
        train_target_path=train_scaled.outputs['scaled_train_target_path'],
        validation_features_path=val_scaled.outputs['scaled_validation_features_path'],
        validation_target_path=val_scaled.outputs['scaled_validation_target_path'],
    )


# Compile and run Initial Pipeline

In [113]:
from kfp.v2 import compiler

# Compile the training pipeline function into a JSON pipeline spec in the
# current working directory.
compiler.Compiler().compile(
    pipeline_func=chd_prediction_pipeline,
    package_path = 'chd_prediction_pipeline.json'
)

# Build a Vertex AI pipeline job from the compiled spec. template_path must
# name the same file that the compile call above wrote.
pipeline_job = aiplatform.PipelineJob(
    display_name='chd-prediction-pipeline',
    template_path='chd_prediction_pipeline.json',
    # GCS bucket used as the pipeline root for this job.
    pipeline_root='gs://ise543-final-project-yfang',
    enable_caching=True,
    # Runtime value for the pipeline's single parameter: the raw training CSV.
    parameter_values={
        'input_dataset_path': 'gs://ise543-final-project-yfang/Final Project Dataset.csv',
    }
)

# Submit the job. The log captured under this cell shows the job being created
# and its state being reported until the run completes.
pipeline_job.run()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/990976262210/locations/us-central1/pipelineJobs/chd-prediction-pipeline-20240504034902
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/990976262210/locations/us-central1/pipelineJobs/chd-prediction-pipeline-20240504034902')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/chd-prediction-pipeline-20240504034902?project=990976262210
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/990976262210/locations/us-central1/pipelineJobs/chd-prediction-pipeline-20240504034902 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob run completed. Resource name: 

# Inference Pipeline

In [138]:
# GCS URIs of artifacts written by earlier runs of the training pipeline. They
# serve as the default parameter values of chd_prediction_inference_pipeline.
# NOTE(review): the URIs come from three different run IDs (...212758 for the
# imputation/winsorize/Box-Cox/binning artifacts, ...010640 for the scaler and
# ...013540 for the model). Presumably the later runs reused earlier
# preprocessing steps; confirm that every artifact matches the preprocessing
# the model was trained with.
imputation_params_path = 'gs://ise543-final-project-yfang/990976262210/chd-prediction-pipeline-20240503212758/train-impute-data-component_7933296970762813440/imputation_params_path'
winsorize_params_path = 'gs://ise543-final-project-yfang/990976262210/chd-prediction-pipeline-20240503212758/train-winsorize-component_2168689447728578560/winsorize_params_path'
boxcox_transformers_path = 'gs://ise543-final-project-yfang/990976262210/chd-prediction-pipeline-20240503212758/train-boxcox-transform-component_3321610952335425536/boxcox_transformers_path'
bin_edges_path = 'gs://ise543-final-project-yfang/990976262210/chd-prediction-pipeline-20240503212758/train-binning-component_-3595918075305656320/bin_edges_path'
scaler_path = 'gs://ise543-final-project-yfang/990976262210/chd-prediction-pipeline-20240504010640/train-scaling-component_471676813140033536/scaler_path'
model_path = 'gs://ise543-final-project-yfang/990976262210/chd-prediction-pipeline-20240504013540/voting-ensemble-component_-2551364436732411904/model_path'

@pipeline(name="chd-prediction-inference-pipeline")
def chd_prediction_inference_pipeline(
    evaluation_dataset_path: str,
    model_path: str = model_path,
    imputation_params_path: str = imputation_params_path,
    winsorize_params_path: str = winsorize_params_path,
    boxcox_transformers_path: str = boxcox_transformers_path,
    bin_edges_path: str = bin_edges_path,
    scaler_path: str = scaler_path
):
    """Inference pipeline: preprocess an evaluation dataset and predict with a stored model.

    The evaluation data passes through the evaluation variant of each
    preprocessing component, and the scaled features are then handed to the
    prediction component.

    Args:
        evaluation_dataset_path: Location of the raw evaluation CSV. It is also
            given to the prediction step as the original dataset.
        model_path: Location of the stored model for the prediction step.
        imputation_params_path: Parameter artifact for the imputation step.
        winsorize_params_path: Parameter artifact for the winsorization step.
        boxcox_transformers_path: Transformer artifact for the Box-Cox step.
        bin_edges_path: Bin-edge artifact for the binning step.
        scaler_path: Scaler artifact for the scaling step.
    """
    # --- Column removal ---
    drop_task = drop_columns_component(
        input_dataset_path=evaluation_dataset_path,
        columns_to_drop=['patientID', 'a1c'],
    )

    # --- Imputation ---
    imputed = evaluation_impute_data_component(
        evaluation_dataset_path=drop_task.output,
        imputation_params_path=imputation_params_path,
    )

    # --- Winsorization ---
    winsorized = evaluation_winsorize_component(
        evaluation_dataset_path=imputed.outputs['imputed_evaluation_dataset_path'],
        winsorize_params_path=winsorize_params_path,
    )

    # --- Log transform ---
    logged = evaluation_log_transform_component(
        evaluation_dataset_path=winsorized.outputs['winsorized_evaluation_dataset_path'],
    )

    # --- Box-Cox transform ---
    boxcoxed = evaluation_boxcox_transform_component(
        evaluation_dataset_path=logged.outputs['log_transformed_evaluation_dataset_path'],
        boxcox_transformers_path=boxcox_transformers_path,
    )

    # --- Binning ---
    binned = evaluation_binning_component(
        evaluation_dataset_path=boxcoxed.outputs['boxcox_transformed_evaluation_dataset_path'],
        bin_edges_path=bin_edges_path,
    )

    # --- Feature combination ---
    combined = evaluation_feature_combine_component(
        evaluation_dataset_path=binned.outputs['binned_evaluation_dataset_path'],
    )

    # --- Encoding ---
    encoded = evaluation_encoding_component(
        evaluation_dataset_path=combined.outputs['combined_evaluation_dataset_path'],
    )

    # --- Scaling ---
    scaled = evaluation_scaling_component(
        evaluation_dataset_path=encoded.outputs['encoded_evaluation_dataset_path'],
        scaler_path=scaler_path,
    )

    # --- Prediction: the raw dataset path is passed again as the source of patientID ---
    prediction_component(
        evaluation_dataset_path=scaled.outputs['scaled_evaluation_features_path'],
        original_evaluation_dataset_path=evaluation_dataset_path,
        model_path=model_path,
    )

# Compile and run Inference Pipeline

In [139]:
from kfp.v2 import compiler

# Compile the inference pipeline function into a JSON pipeline spec in the
# current working directory.
compiler.Compiler().compile(
    pipeline_func=chd_prediction_inference_pipeline,
    package_path='chd_prediction_inference_pipeline.json'
)

# Build a Vertex AI pipeline job from the compiled spec. template_path must
# name the same file that the compile call above wrote. Note that this rebinds
# the notebook-level name pipeline_job used by the training run cell.
pipeline_job = aiplatform.PipelineJob(
    display_name='chd-prediction-inference-pipeline',
    template_path='chd_prediction_inference_pipeline.json',
    # GCS bucket used as the pipeline root for this job.
    pipeline_root='gs://ise543-final-project-yfang',
    enable_caching=True,
    # Only the evaluation dataset is supplied here; the remaining pipeline
    # parameters fall back to the defaults declared on the pipeline function.
    parameter_values={
        'evaluation_dataset_path': 'gs://ise543-final-project-yfang/Final-Project-Evaluation-Dataset.csv'
    }
)

# Submit the job. The log captured under this cell shows the job being created
# and its state being reported.
pipeline_job.run()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/990976262210/locations/us-central1/pipelineJobs/chd-prediction-inference-pipeline-20240504050147
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/990976262210/locations/us-central1/pipelineJobs/chd-prediction-inference-pipeline-20240504050147')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/chd-prediction-inference-pipeline-20240504050147?project=990976262210
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/990976262210/locations/us-central1/pipelineJobs/chd-prediction-inference-pipeline-20240504050147 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:Pi