In [1]:
# Output-artifact metadata copied from the training pipeline's `train-imputation`
# step (run final-project-pipeline-20240503233914, per the URIs below).
# For each artifact, ['artifacts'][0]['metadata']['value'] holds the statistic
# that is later read out and passed as a default to the inference pipeline.
json_data = {
    "artifacts": {
        "imputed_dataset_path": {
            "artifacts": [{
                "name": "projects/49985438948/locations/us-central1/metadataStores/default/artifacts/6811526190828747751",
                "uri": "gs://ise543-module8-hw-yufei/49985438948/final-project-pipeline-20240503233914/train-imputation_-3274262545669554176/imputed_dataset_path",
                "metadata": {}
            }]
        },
        # NOTE(review): the two A1C means look like means of log(a1c + 1)
        # (they fill the `log_a1c` column downstream), not raw A1C — confirm.
        "mean_a1c_dia": {
            "artifacts": [{
                "name": "projects/49985438948/locations/us-central1/metadataStores/default/artifacts/1438184060222455176",
                "uri": "gs://ise543-module8-hw-yufei/49985438948/final-project-pipeline-20240503233914/train-imputation_-3274262545669554176/mean_a1c_dia",
                "metadata": {"value": 2.1493724929514166}
            }]
        },
        "mean_a1c_nondia": {
            "artifacts": [{
                "name": "projects/49985438948/locations/us-central1/metadataStores/default/artifacts/16561020661929813690",
                "uri": "gs://ise543-module8-hw-yufei/49985438948/final-project-pipeline-20240503233914/train-imputation_-3274262545669554176/mean_a1c_nondia",
                "metadata": {"value": 1.6347599082650512}
            }]
        },
        # Column name -> mean fill value.
        "mean_saved": {
            "artifacts": [{
                "name": "projects/49985438948/locations/us-central1/metadataStores/default/artifacts/12794769974312219209",
                "uri": "gs://ise543-module8-hw-yufei/49985438948/final-project-pipeline-20240503233914/train-imputation_-3274262545669554176/mean_saved",
                "metadata": {"value": {"log_tot_chol": 5.45446608070892, "log_bmi": 3.2765795514207583, "heartRate": 75.75385119632907}}
            }]
        },
        # Fill value for missing cigsPerDay among current smokers.
        "mode_cigs": {
            "artifacts": [{
                "name": "projects/49985438948/locations/us-central1/metadataStores/default/artifacts/16704505771573258107",
                "uri": "gs://ise543-module8-hw-yufei/49985438948/final-project-pipeline-20240503233914/train-imputation_-3274262545669554176/mode_cigs",
                "metadata": {"value": 20.0}
            }]
        },
        # Column name -> mode fill value.
        "mode_saved": {
            "artifacts": [{
                "name": "projects/49985438948/locations/us-central1/metadataStores/default/artifacts/4211559894150523597",
                "uri": "gs://ise543-module8-hw-yufei/49985438948/final-project-pipeline-20240503233914/train-imputation_-3274262545669554176/mode_saved",
                "metadata": {"value": {"BPMeds": 0.0, "education": 1.0}}
            }]
        }
    }
}

In [10]:
from kfp.v2.dsl import OutputPath, InputPath, component
from kfp.dsl import pipeline, Output, Dataset
@component(packages_to_install=['pandas','numpy','fsspec','gcsfs'])
def perform_initial_data_prep(input_dataset_path: str,
                              output_dataset_path: OutputPath('Dataset')):
    """Drop ``glucose`` and replace skewed raw columns with ``log(x + 1)`` versions.

    Args:
        input_dataset_path: CSV location passed straight to ``pd.read_csv``
            (in this notebook a ``gs://`` URI, hence fsspec/gcsfs).
        output_dataset_path: where the transformed CSV is written, without index.
    """
    import numpy as np
    import pandas as pd

    # raw column -> its log-transformed replacement; new columns are appended
    # in this order, then the raw columns are removed.
    log_columns = {
        'BMI': 'log_bmi',
        'totChol': 'log_tot_chol',
        'sysBP': 'log_sysbp',
        'a1c': 'log_a1c',
    }

    # feature selection: glucose is not used downstream
    frame = pd.read_csv(input_dataset_path).drop('glucose', axis=1)

    for raw_name, log_name in log_columns.items():
        frame[log_name] = np.log(frame[raw_name] + 1)
    frame = frame.drop(columns=list(log_columns))

    frame.to_csv(output_dataset_path, index=False)

In [3]:
# Data imputation
@component(packages_to_install=["pandas"])
def test_imputation(test_dataset_path: InputPath('Dataset'),
                    imputed_dataset_path: OutputPath('Dataset'),
                    mean_saved: dict,
                    mode_saved: dict,
                    mode_cigs: float,
                    a1c_mean_dia: float,
                    a1c_mean_nondia: float):
    """Fill missing values in the prepared test CSV with precomputed statistics.

    In this notebook the statistics are the metadata values of the training
    pipeline's ``train-imputation`` artifacts.

    Args:
        test_dataset_path: CSV produced by ``perform_initial_data_prep``.
        imputed_dataset_path: where the imputed CSV is written, without index.
        mean_saved: column name -> value used to fill that column's NaNs.
        mode_saved: column name -> value used to fill that column's NaNs
            (applied after ``mean_saved``).
        mode_cigs: ``cigsPerDay`` fill value for rows with ``currentSmoker == 1``;
            missing ``cigsPerDay`` with ``currentSmoker == 0`` becomes 0.
        a1c_mean_dia: ``log_a1c`` fill value for rows with ``diabetes == 1``.
        a1c_mean_nondia: ``log_a1c`` fill value for rows with ``diabetes == 0``.

    Rows whose grouping column (``currentSmoker`` / ``diabetes``) is itself
    missing or holds another value are left unfilled. A key in ``mean_saved``
    or ``mode_saved`` that is not a column raises ``KeyError``.
    """
    import pandas as pd

    # Load test dataset
    df = pd.read_csv(test_dataset_path)

    # Apply mean and mode imputations.
    # Assign the filled column back instead of ``df[col].fillna(..., inplace=True)``:
    # that is a chained in-place call on an intermediate Series, which pandas
    # warns about and which no longer updates ``df`` under Copy-on-Write.
    # pandas is not version-pinned above, so do not rely on the old behaviour.
    for fill_values in (mean_saved, mode_saved):
        for col, value in fill_values.items():
            df[col] = df[col].fillna(value)

    # cigsPerDay: non-smokers get 0, current smokers get the supplied mode.
    df.loc[(df['cigsPerDay'].isnull()) & (df['currentSmoker'] == 0), 'cigsPerDay'] = 0
    df.loc[(df['cigsPerDay'].isnull()) & (df['currentSmoker'] == 1), 'cigsPerDay'] = mode_cigs

    # Apply A1C imputation, split by diabetes status.
    df.loc[(df['log_a1c'].isnull()) & (df['diabetes'] == 1), 'log_a1c'] = a1c_mean_dia
    df.loc[(df['log_a1c'].isnull()) & (df['diabetes'] == 0), 'log_a1c'] = a1c_mean_nondia

    # Save the imputed dataframe
    df.to_csv(imputed_dataset_path, index=False)

In [4]:
# Data engineering
@component(packages_to_install=["pandas", "numpy", "scikit-learn","joblib",'fsspec','gcsfs'])
def test_engineering(test_dataset_path: InputPath('Dataset'),
                     updated_test_path: OutputPath('Dataset'),
                     scaler_path: str,
                     cluster_path: str,
                     drop_originals: bool = True):
    """Add a ``Demographic Cluster`` column using a scaler and k-means model from GCS.

    Args:
        test_dataset_path: imputed CSV produced upstream.
        updated_test_path: where the CSV with the cluster column is written,
            without index.
        scaler_path: GCS location of a joblib-serialized scaler (``.transform``).
        cluster_path: GCS location of a joblib-serialized clustering model
            (``.predict``).
        drop_originals: when True, ``age``/``education``/``income`` are removed
            after clustering.
    """
    import gcsfs
    import joblib
    import pandas as pd
    from sklearn.cluster import KMeans
    from sklearn.preprocessing import StandardScaler

    demographic_cols = ['age', 'education', 'income']

    frame = pd.read_csv(test_dataset_path)

    gcs = gcsfs.GCSFileSystem()

    def _load_joblib(uri):
        # NOTE(review): joblib.load unpickles arbitrary objects; only point
        # these paths at trusted training artifacts.
        with gcs.open(uri, 'rb') as handle:
            return joblib.load(handle)

    scaler = _load_joblib(scaler_path)
    kmeans = _load_joblib(cluster_path)

    # Scale the demographic columns, then assign each row its cluster label.
    frame['Demographic Cluster'] = kmeans.predict(
        scaler.transform(frame[demographic_cols]))

    if drop_originals:
        frame = frame.drop(demographic_cols, axis=1)

    frame.to_csv(updated_test_path, index=False)

In [28]:
# Prediction
@component(packages_to_install=["pandas", "numpy", "scikit-learn","joblib",'fsspec','gcsfs',"xgboost"])
def perform_prediction(dataset_for_predict:InputPath('Dataset'),
                       model_path:str,
                       predictions_path:OutputPath('Dataset')
                      ):
    """Score the engineered dataset with the trained model stored on GCS.

    Args:
        dataset_for_predict: engineered CSV; must contain ``patientID``. Every
            other column is passed to the model as a feature.
        model_path: GCS location of the joblib-serialized model.
        predictions_path: output CSV with columns ``patientID`` and
            ``pred_TenYearCHD``, without index.
    """
    import gcsfs
    import joblib
    import pandas as pd
    from sklearn.ensemble import VotingClassifier

    gcs = gcsfs.GCSFileSystem()

    scored = pd.read_csv(dataset_for_predict)
    features = scored.drop('patientID', axis=1)

    # NOTE(review): joblib.load unpickles; the model artifact must be trusted.
    with gcs.open(model_path, 'rb') as model_file:
        trained_model = joblib.load(model_file)

    scored['pred_TenYearCHD'] = trained_model.predict(features)
    scored[['patientID', 'pred_TenYearCHD']].to_csv(predictions_path, index=False)

  return component_factory.create_component_from_func(


In [29]:
# GCS locations of artifacts produced by earlier training-pipeline runs.
# NOTE(review): the scaler/k-means paths (and json_data) come from run
# ...20240503233914, while the model comes from run ...20240503234919 —
# confirm the model was trained on features from this same scaler/k-means.
kmean_cluster_path='gs://ise543-module8-hw-yufei/49985438948/final-project-pipeline-20240503233914/scale-and-cluster-train-data_-2121341041062707200/kmeans_path'
scaler_path='gs://ise543-module8-hw-yufei/49985438948/final-project-pipeline-20240503233914/scale-and-cluster-train-data_-2121341041062707200/scaler_path'
model_path='gs://ise543-module8-hw-yufei/49985438948/final-project-pipeline-20240503234919/train-voting-ensemble_5219526351551201280/output_ensemble_model'


def _artifact_value(artifact_name):
    """Return metadata['value'] of the first artifact stored under `artifact_name` in json_data."""
    first_artifact = json_data['artifacts'][artifact_name]['artifacts'][0]
    return first_artifact['metadata']['value']


# Imputation statistics fitted during training, used as pipeline defaults.
a1c_mean_dia = _artifact_value('mean_a1c_dia')
a1c_mean_nondia = _artifact_value('mean_a1c_nondia')
mean_saved = _artifact_value('mean_saved')
mode_cigs = _artifact_value('mode_cigs')
mode_saved = _artifact_value('mode_saved')

@pipeline(name='final_inference_pipeline')
def final_inference_pipeline(dataset_for_prediction: str,
                             a1c_mean_dia: float=a1c_mean_dia,
                             a1c_mean_nondia: float=a1c_mean_nondia,
                             mean_saved: dict=mean_saved,
                             mode_cigs: float=mode_cigs,
                             mode_saved: dict=mode_saved,
                             cluster_path: str=kmean_cluster_path,
                             scaler_path: str=scaler_path,
                             model_path: str=model_path
                             ):
    """Inference DAG: initial prep -> imputation -> clustering features -> prediction.

    All parameters except ``dataset_for_prediction`` default to the module-level
    values of the same names, bound when this function is defined.
    """
    prepped = perform_initial_data_prep(input_dataset_path=dataset_for_prediction)

    imputed = test_imputation(
        test_dataset_path=prepped.output,
        mean_saved=mean_saved,
        mode_saved=mode_saved,
        mode_cigs=mode_cigs,
        a1c_mean_dia=a1c_mean_dia,
        a1c_mean_nondia=a1c_mean_nondia,
    )

    engineered = test_engineering(
        test_dataset_path=imputed.output,
        scaler_path=scaler_path,
        cluster_path=cluster_path,
    )

    perform_prediction(
        dataset_for_predict=engineered.output,
        model_path=model_path,
    )

In [30]:
from kfp.v2 import compiler
from google.cloud import aiplatform

# Vertex AI project and region used for this session.
project_id = 'ise543-module8-421223'
location = 'us-central1'
aiplatform.init(project=project_id, location=location)

# Compile the pipeline to a JSON spec, then build (but do not yet submit) a job from it.
pipeline_spec_path = 'final_inference_pipeline.json'
compiler.Compiler().compile(pipeline_func=final_inference_pipeline,
                            package_path=pipeline_spec_path)

pipeline_job = aiplatform.PipelineJob(
    display_name='final_inference_pipeline',
    template_path=pipeline_spec_path,
    pipeline_root='gs://ise543-module8-hw-yufei',
    parameter_values={
        'dataset_for_prediction':
            'gs://ise543-module8-hw-yufei/Final Project Evaluation Dataset - Student.csv',
    },
    enable_caching=True,
)

In [31]:
# Submit the job; sync=True (the SDK default) blocks while polling the job state.
pipeline_job.run(sync=True)

Creating PipelineJob
PipelineJob created. Resource name: projects/49985438948/locations/us-central1/pipelineJobs/final-inference-pipeline-20240506180434
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/49985438948/locations/us-central1/pipelineJobs/final-inference-pipeline-20240506180434')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/final-inference-pipeline-20240506180434?project=49985438948
PipelineJob projects/49985438948/locations/us-central1/pipelineJobs/final-inference-pipeline-20240506180434 current state:
3
PipelineJob projects/49985438948/locations/us-central1/pipelineJobs/final-inference-pipeline-20240506180434 current state:
3
PipelineJob projects/49985438948/locations/us-central1/pipelineJobs/final-inference-pipeline-20240506180434 current state:
3
PipelineJob projects/49985438948/locations/us-central1/pipelineJobs/final-inference-pipeline-20240506180434 current state:
3
Pip

In [35]:
# pandas is only imported inside the KFP component bodies, never at notebook
# level, so import it here; otherwise this cell raises NameError on a fresh kernel.
import pandas as pd

# NOTE(review): the loaded frame shows an 'Unnamed: 0' column, i.e. the CSV was
# written with its index; consider pd.read_csv(..., index_col=0) if that is unwanted.
score_data=pd.read_csv('./score dataset.csv')

In [36]:
# Display the scored predictions (the rich repr truncates long frames with "...").
score_data

Unnamed: 0,patientID,pred_TenYearCHD
0,110399,0
1,189047,0
2,957019,0
3,208967,0
4,230935,0
...,...,...
419,186677,0
420,462501,1
421,802256,0
422,516993,1
