In [1]:
! pip install kfp



In [2]:
!pip install google-cloud-pipeline-components



In [3]:
!pip install gcsfs



In [12]:
!pip install fsspec



In [13]:
!pip install scikit-learn



In [14]:
# Google Cloud project and region used by every Vertex AI call in this notebook.
project_id = "final-project-ise-543"
location = "us-central1"

In [22]:
from google.cloud import aiplatform
# Point the aiplatform SDK at the project and region defined in the parameters cell.
aiplatform.init(project=project_id, location=location)

from kfp.v2.dsl import pipeline, component, component, InputPath, OutputPath, Dataset
import joblib
import gcsfs
import fsspec
import pandas as pd
import numpy as np

In [28]:
# Both artifacts come from the same training run, so share its GCS root.
_TRAINING_RUN_ROOT = "gs://final-project-ise-543/67845614716/final-project-pipeline-20240501022509"

# Executor output of the fill-missing-values step (holds the imputation statistics).
imputed_age_artifact_path = (
    f"{_TRAINING_RUN_ROOT}/fill-missing-values_8923059745900724224/executor_output.json"
)

# Serialized decision tree written by the train-decision-tree step.
model_path = (
    f"{_TRAINING_RUN_ROOT}/train-decision-tree_-8658993199353692160"
    "/trained_model_artifact/decision_tree_model.joblib"
)

In [29]:
# Parse the executor output JSON into a nested dict.
# This cell rebinds the name from the URI string to the parsed dict, so only
# parse while it is still a string; otherwise re-running the cell would hand a
# dict to pd.read_json and fail.
if isinstance(imputed_age_artifact_path, str):
    imputed_age_artifact_path = pd.read_json(imputed_age_artifact_path).to_dict()
imputed_age_artifact_path

{'artifacts': {'average_BMI': {'artifacts': [{'name': 'projects/67845614716/locations/us-central1/metadataStores/default/artifacts/16908359639672247856',
     'uri': 'gs://final-project-ise-543/67845614716/final-project-pipeline-20240501022509/fill-missing-values_8923059745900724224/average_BMI',
     'metadata': {'value': 25.785610800131707}}]},
  'average_a1c': {'artifacts': [{'name': 'projects/67845614716/locations/us-central1/metadataStores/default/artifacts/1215537158465249095',
     'uri': 'gs://final-project-ise-543/67845614716/final-project-pipeline-20240501022509/fill-missing-values_8923059745900724224/average_a1c',
     'metadata': {'value': 4.281971983139468}}]},
  'average_chol': {'artifacts': [{'name': 'projects/67845614716/locations/us-central1/metadataStores/default/artifacts/11000477488198328641',
     'uri': 'gs://final-project-ise-543/67845614716/final-project-pipeline-20240501022509/fill-missing-values_8923059745900724224/average_chol',
     'metadata': {'value': 241

In [30]:
# Look up every imputation statistic recorded in the executor output (a missing
# entry raises KeyError). Only the final expression is displayed by the cell.
_imputation_artifact_names = (
    'average_BMI',
    'median_BP',
    'average_chol',
    'median_education',
    'average_a1c',
    'average_cig',
    'average_glucose',
    'average_heart_rate',
)
_imputation_values = {
    artifact_name: imputed_age_artifact_path['artifacts'][artifact_name]['artifacts'][0]['metadata']['value']
    for artifact_name in _imputation_artifact_names
}
_imputation_values['average_heart_rate']

75.75385119632907

In [31]:
import gcsfs
import joblib

# Open the serialized model straight from GCS and deserialize it in this kernel.
fs = gcsfs.GCSFileSystem()

# NOTE(security): joblib.load unpickles the file, which can execute arbitrary
# code. Only load model artifacts from a bucket you trust.
with fs.open(model_path, 'rb') as model_file:
    model = joblib.load(model_file)

model

https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


In [32]:
from kfp.v2.dsl import InputPath, OutputPath, Dataset

@component(packages_to_install=["pandas", "numpy", "fsspec", "gcsfs"])
def perform_initial_data_preparation(input_dataset_path: str, output_dataset_path: OutputPath(Dataset)):
    # Feature engineering for the scoring dataset:
    #   1. bucket `age` into age_group and `cigsPerDay` into smoker_type,
    #   2. add log-transformed income and blood-pressure columns,
    #   3. one-hot encode the categorical columns (first level dropped),
    # then write the result as CSV to output_dataset_path.
    import numpy as np
    import pandas as pd

    frame = pd.read_csv(input_dataset_path)

    # Age bands are left-closed (right=False): [0, 35), [35, 55), [55, 100).
    frame['age_group'] = pd.cut(
        frame['age'],
        bins=[0, 35, 55, 100],
        labels=['Young', 'Middle-aged', 'Senior'],
        right=False,
    )

    # Smoking bands are right-closed: (-1, 0] is a non-smoker, then (0, 10],
    # (10, 20] and (20, inf).
    frame['smoker_type'] = pd.cut(
        frame['cigsPerDay'],
        bins=[-1, 0, 10, 20, float('inf')],
        labels=['Non-smoker', 'Light smoker', 'Moderate smoker', 'Heavy smoker'],
        right=True,
    )

    # Income is shifted by 1 so that a zero income does not hit log(0).
    frame['log_income'] = np.log(frame['income'] + 1)
    # Blood pressures are logged as-is (no zero guard is applied to them).
    for pressure_column in ('sysBP', 'diaBP'):
        frame[f'log_{pressure_column}'] = np.log(frame[pressure_column])

    # One-hot encode the categorical columns, dropping the first level of each.
    encoded = pd.get_dummies(frame, drop_first=True)

    encoded.to_csv(output_dataset_path, index=False)


  return component_factory.create_component_from_func(


In [33]:
from kfp.v2.dsl import Input
from kfp.v2.dsl import Model

@component(packages_to_install=["pandas"])
def impute_validation(validation_dataset_path: InputPath('Dataset'),
                        imputed_dataset_path: OutputPath('Dataset'),
                        average_cig: float,
                        median_BP: float,
                        median_education: float,
                        average_chol: float,
                        average_BMI:float,
                        average_glucose: float,
                        average_a1c: float,
                        average_heart_rate: float):
    """Fill missing values in the dataset with the supplied statistics.

    Reads the CSV at ``validation_dataset_path``, replaces missing values in
    eight columns with the corresponding argument, and writes the result as
    CSV to ``imputed_dataset_path``.

    Args:
        validation_dataset_path: CSV to impute.
        imputed_dataset_path: Destination for the imputed CSV.
        average_cig: Fill value for ``cigsPerDay``.
        median_BP: Fill value for ``BPMeds`` (note: the column is ``BPMeds``,
            not a blood-pressure reading).
        median_education: Fill value for ``education``.
        average_chol: Fill value for ``totChol``.
        average_BMI: Fill value for ``BMI``.
        average_glucose: Fill value for ``glucose``.
        average_a1c: Fill value for ``a1c``.
        average_heart_rate: Fill value for ``heartRate``.

    Raises:
        KeyError: If any of the eight columns is absent from the dataset.
    """
    import pandas as pd

    df = pd.read_csv(validation_dataset_path)

    # Column -> fill value, in the order the columns are imputed.
    fill_values = {
        'cigsPerDay': average_cig,
        'BPMeds': median_BP,
        'education': median_education,
        'totChol': average_chol,
        'BMI': average_BMI,
        'glucose': average_glucose,
        'a1c': average_a1c,
        'heartRate': average_heart_rate,
    }

    # Assign the filled column back rather than calling
    # df[col].fillna(..., inplace=True): the in-place form mutates a temporary
    # Series and leaves df untouched when pandas Copy-on-Write is active, which
    # would silently skip imputation because pandas is installed unpinned.
    for column, fill_value in fill_values.items():
        df[column] = df[column].fillna(fill_value)

    df.to_csv(imputed_dataset_path, index=False)

In [34]:
@component(packages_to_install=["pandas", "numpy", "scikit-learn", "joblib", "fsspec", "gcsfs"])
def perform_predictions(dataset_for_prediction_path: InputPath('Dataset'),
                   model_path: str,
                   predictions_path: OutputPath('Dataset')):
    # Score the prepared dataset with the model stored at model_path and write
    # a two-column CSV (patientID, pred) to predictions_path.
    import gcsfs
    import joblib
    import pandas as pd

    # Fetch the serialized model from GCS.
    # NOTE(security): joblib.load unpickles the file; only use trusted model URIs.
    gcs = gcsfs.GCSFileSystem()
    with gcs.open(model_path, 'rb') as model_file:
        trained_model = joblib.load(model_file)

    scoring_df = pd.read_csv(dataset_for_prediction_path)

    # patientID is an identifier, not a feature, so it is excluded from the
    # model input and carried through to the output instead.
    feature_frame = scoring_df.drop(['patientID'], axis=1)
    predicted = trained_model.predict(feature_frame)

    result_df = scoring_df[['patientID']].assign(pred=predicted)

    result_df.to_csv(predictions_path, index=False)

In [40]:
from kfp.v2.dsl import pipeline, Output, Dataset
imputed_artifact_path = "gs://final-project-ise-543/67845614716/final-project-pipeline-20240501022509/fill-missing-values_8923059745900724224/executor_output.json"
model_path = 'gs://final-project-ise-543/67845614716/final-project-pipeline-20240501022509/train-decision-tree_-8658993199353692160/trained_model_artifact/decision_tree_model.joblib'


def _recorded_statistic(artifact_name):
    # Pull one value out of the parsed executor output held in
    # imputed_age_artifact_path (a dict by this point in the notebook).
    return imputed_age_artifact_path['artifacts'][artifact_name]['artifacts'][0]['metadata']['value']


# Defaults for the pipeline parameters below. Despite the *_median /
# *_dictionary names, each of these is a single value read from the
# artifact named in the lookup.
training_bmi_median = _recorded_statistic('average_BMI')
imputed_bp_dictionary = _recorded_statistic('median_BP')
imputed_chol_dictionary = _recorded_statistic('average_chol')
imputed_edu_dictionary = _recorded_statistic('median_education')
imputed_a1c_dictionary = _recorded_statistic('average_a1c')
imputed_cig_dictionary = _recorded_statistic('average_cig')
imputed_glucose_dictionary = _recorded_statistic('average_glucose')
imputed_hr_dictionary = _recorded_statistic('average_heart_rate')
@pipeline(name='final-inference-pipeline')
def final_inference_pipeline(dataset_for_predictions_path: str,
                               training_bmi_median: float = training_bmi_median,
                               imputed_bp_dictionary: float = imputed_bp_dictionary,
                               imputed_chol_dictionary: float = imputed_chol_dictionary,
                               imputed_edu_dictionary: float = imputed_edu_dictionary,
                               imputed_a1c_dictionary: float = imputed_a1c_dictionary,
                               imputed_cig_dictionary: float = imputed_cig_dictionary,
                               imputed_glucose_dictionary: float = imputed_glucose_dictionary,
                               imputed_hr_dictionary: float = imputed_hr_dictionary,
                               model_uri: str = model_path):
    # Three chained steps: prepare features -> fill missing values -> predict.
    # The imputation parameters default to the values looked up at module level.

    # Step 1: feature engineering on the raw scoring dataset.
    prepare_task = perform_initial_data_preparation(
        input_dataset_path=dataset_for_predictions_path,
    )

    # Step 2: fill missing values in the prepared dataset with the supplied statistics.
    impute_task = impute_validation(
        validation_dataset_path=prepare_task.outputs['output_dataset_path'],
        average_cig=imputed_cig_dictionary,
        median_BP=imputed_bp_dictionary,
        median_education=imputed_edu_dictionary,
        average_chol=imputed_chol_dictionary,
        average_BMI=training_bmi_median,
        average_glucose=imputed_glucose_dictionary,
        average_a1c=imputed_a1c_dictionary,
        average_heart_rate=imputed_hr_dictionary,
    )

    # Step 3: score the imputed dataset with the stored model.
    perform_predictions(
        dataset_for_prediction_path=impute_task.outputs['imputed_dataset_path'],
        model_path=model_uri,
    )

In [41]:
from kfp.v2 import compiler

# The compiled pipeline spec is written here and then read back by PipelineJob.
PIPELINE_PACKAGE_PATH = 'final_inference_pipeline.json'

compiler.Compiler().compile(
    pipeline_func=final_inference_pipeline,
    package_path=PIPELINE_PACKAGE_PATH,
)

# Only the dataset location is supplied here; every other pipeline parameter
# falls back to the default declared on final_inference_pipeline.
pipeline_job = aiplatform.PipelineJob(
    display_name='final_inference_pipeline',
    template_path=PIPELINE_PACKAGE_PATH,
    pipeline_root='gs://final-project-ise-543',
    parameter_values={
        'dataset_for_predictions_path': 'gs://unscored_data/Final Project Evaluation Dataset.csv',
    },
    enable_caching=True,
)

In [42]:
# Submit the configured pipeline job; its progress is reported in the log output below.
pipeline_job.run()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/67845614716/locations/us-central1/pipelineJobs/final-inference-pipeline-20240501050200
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/67845614716/locations/us-central1/pipelineJobs/final-inference-pipeline-20240501050200')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/final-inference-pipeline-20240501050200?project=67845614716
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/67845614716/locations/us-central1/pipelineJobs/final-inference-pipeline-20240501050200 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/67845614716/locations