# VS-Bank Inference Pipeline

## Install required packages

In [None]:
! pip install kfp



In [None]:
!pip install google-cloud-pipeline-components



In [None]:
!pip install gcsfs



In [None]:
!pip install fsspec



In [None]:
!pip install scikit-learn



# Set parameters, initialize aiplatform client library, and import needed libraries

In [None]:
# Set parameters
# Google Cloud project ID and region; both are passed to aiplatform.init in the next cell.
project_id = 'ise543-module7-homework-418819'
location = 'us-central1'

In [None]:
# All notebook-level imports, grouped ahead of any executable statement.
import fsspec
import gcsfs
import joblib
import numpy as np
import pandas as pd
from google.cloud import aiplatform
from kfp.v2.dsl import pipeline, component, InputPath, OutputPath, Dataset

# Point the Vertex AI SDK at the project and region defined in the parameters cell.
aiplatform.init(project=project_id, location=location)

  from kfp.v2.dsl import pipeline, component, component, InputPath, OutputPath, Dataset


## Configure and test connections to training pipeline

In [None]:
# GCS locations of artifacts written by training run fp-pipeline-20240501210458.
# The first two are executor_output.json files whose artifact metadata is read below;
# the last two are joblib files loaded in the artifact test cells.
# NOTE(review): the pipeline-definition cell later rebinds these four names to artifacts from
# run fp-pipeline-20240430182301 - confirm which training run inference should use.
iqr_artifact_path = "gs://finalproject_ise543/753516815850/fp-pipeline-20240501210458/outlier-training_-2892169060877860864/executor_output.json"
imputed_artifact_path = "gs://finalproject_ise543/753516815850/fp-pipeline-20240501210458/impute-training_7484124480583761920/executor_output.json"
scaler_path = 'gs://finalproject_ise543/753516815850/fp-pipeline-20240501210458/normalise-training_1719516957549527040/scaler_path'
model_path = 'gs://finalproject_ise543/753516815850/fp-pipeline-20240501210458/train-model_1143056205246103552/trained_model_artifact'


### Test access to IQR values

In [None]:
# Read the outlier-training executor output from GCS and convert it to a nested dict;
# the bare expression on the last line displays it for inspection.
iqr_dictionary = pd.read_json(iqr_artifact_path).to_dict()
iqr_dictionary

{'artifacts': {'iqr_values': {'artifacts': [{'name': 'projects/753516815850/locations/us-central1/metadataStores/default/artifacts/15269864420767101183',
     'uri': 'gs://finalproject_ise543/753516815850/fp-pipeline-20240501210458/outlier-training_-2892169060877860864/iqr_values',
     'metadata': {'columns': 'patientID, male, age, education, currentSmoker, cigsPerDay, BPMeds, prevalentStroke, prevalentHyp, diabetes, totChol, sysBP, diaBP, BMI, heartRate, TenYearCHD, a1c, income_log, education_2, education_3, education_4, education_5',
      'Q1': {'patientID': 332212.25,
       'male': 0.0,
       'age': 42.0,
       'education': 1.0,
       'currentSmoker': 0.0,
       'cigsPerDay': 10.0,
       'BPMeds': 0.0,
       'prevalentStroke': 0.0,
       'prevalentHyp': 0.0,
       'diabetes': 0.0,
       'totChol': 205.0,
       'sysBP': 117.0,
       'diaBP': 75.0,
       'BMI': 23.03,
       'heartRate': 68.0,
       'TenYearCHD': 0.0,
       'a1c': 3.733771801235036,
       'income_log

#### Extract parameter value from Python dictionary

In [None]:
# The three statistics live side by side in the metadata of the first 'iqr_values' artifact.
_iqr_metadata = iqr_dictionary['artifacts']['iqr_values']['artifacts'][0]['metadata']
Q1 = pd.Series(_iqr_metadata['Q1'])
Q3 = pd.Series(_iqr_metadata['Q3'])
IQR = pd.Series(_iqr_metadata['IQR'])

### Test paths to Median Values for imputation

In [None]:
# Read the impute-training executor output from GCS and convert it to a nested dict;
# the bare expression on the last line displays it for inspection.
impute_dictionary = pd.read_json(imputed_artifact_path).to_dict()
impute_dictionary

{'artifacts': {'imputed_dataset_path': {'artifacts': [{'name': 'projects/753516815850/locations/us-central1/metadataStores/default/artifacts/10730424141939125835',
     'uri': 'gs://finalproject_ise543/753516815850/fp-pipeline-20240501210458/impute-training_7484124480583761920/imputed_dataset_path',
     'metadata': {}}]},
  'imputed_values': {'artifacts': [{'name': 'projects/753516815850/locations/us-central1/metadataStores/default/artifacts/15584009842476833504',
     'uri': 'gs://finalproject_ise543/753516815850/fp-pipeline-20240501210458/impute-training_7484124480583761920/imputed_values',
     'metadata': {'columns': 'cigsPerDay, BPMeds, totChol, BMI, heartRate, a1c',
      'medians': {'cigsPerDay': 20.0,
       'BPMeds': 0.0,
       'totChol': 233.0,
       'BMI': 25.38,
       'heartRate': 75.0,
       'a1c': 4.124756767649438}}}]}}}

In [None]:
# Pull the per-column medians out of the first 'imputed_values' artifact's metadata
# and display them as a Series indexed by column name.
imputed_values = pd.Series(impute_dictionary['artifacts']['imputed_values']['artifacts'][0]['metadata']['medians'])
imputed_values

cigsPerDay     20.000000
BPMeds          0.000000
totChol       233.000000
BMI            25.380000
heartRate      75.000000
a1c             4.124757
dtype: float64

### Test paths to artifacts

In [None]:
# Create a GCS file system object
fs = gcsfs.GCSFileSystem()

# joblib.load unpickles the file, which can execute arbitrary code:
# only point scaler_path at artifacts produced by a trusted training run.
with fs.open(scaler_path, 'rb') as f:
    scaler = joblib.load(f)

# Display the loaded object to confirm the artifact is readable.
scaler

https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


In [None]:
# Create a GCS file system object
fs = gcsfs.GCSFileSystem()

# joblib.load unpickles the file, which can execute arbitrary code:
# only point model_path at artifacts produced by a trusted training run.
with fs.open(model_path, 'rb') as f:
    model = joblib.load(f)

# Display the loaded object to confirm the artifact is readable.
model

https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


# Define components

## Common dataset preparation steps

In [None]:
from kfp.v2.dsl import InputPath, OutputPath, Dataset

@component(packages_to_install=["pandas", "numpy", "fsspec", "gcsfs"])
def perform_initial_data_preparation(input_dataset_path: str, output_dataset_path: OutputPath(Dataset)):
    """Apply the first preparation steps to the raw inference dataset.

    Reads the CSV at ``input_dataset_path``, adds ``income_log`` as ``log1p`` of
    ``income``, drops the ``income`` and ``glucose`` columns, fills missing
    ``education`` values with 5, and writes the result as CSV.

    Args:
        input_dataset_path: Path or URL of the input CSV, read with ``pd.read_csv``.
        output_dataset_path: Destination path for the prepared CSV.
    """
    import pandas as pd
    import numpy as np

    df = pd.read_csv(input_dataset_path)

    df['income_log'] = np.log1p(df['income'])
    df = df.drop(columns=['income', 'glucose'])

    # Assign the filled column back instead of calling fillna(inplace=True) on the
    # selected column: the chained in-place form is deprecated and leaves the frame
    # unchanged when pandas Copy-on-Write is active.
    df['education'] = df['education'].fillna(5)

    df.to_csv(output_dataset_path, index=False)

  return component_factory.create_component_from_func(


## One-hot encoding

In [None]:
from kfp.v2.dsl import InputPath

@component(packages_to_install=["pandas", "scikit-learn"])
def onehot_encoding(dataset_path: InputPath('Dataset'),
                  output_path: OutputPath('Dataset')
                  ):
    """Append 0/1 indicator columns ``education_2`` .. ``education_5`` to the dataset.

    The indicators are built from explicit level comparisons rather than from
    ``pd.get_dummies(..., drop_first=True)`` followed by a positional rename. That
    form raises a length mismatch (or mislabels columns) whenever the inference
    data does not contain every education level.

    Args:
        dataset_path: Path of the input CSV; must contain an ``education`` column.
        output_path: Destination path for the CSV with the indicator columns appended.
    """
    import pandas as pd

    df = pd.read_csv(dataset_path)

    # NOTE(review): assumes education is coded 1..5 with level 1 as the dropped
    # baseline, matching the education_2..education_5 names - confirm against training.
    education_levels = (2, 3, 4, 5)
    df_education = pd.DataFrame(
        {f'education_{level}': (df['education'] == level).astype(int) for level in education_levels},
        index=df.index,
    )

    # The original 'education' column is kept; the indicators are appended after it.
    df = pd.concat([df, df_education], axis=1)
    df.to_csv(output_path, index=False)

## Outlier Handling

In [None]:
from kfp.v2.dsl import Input
from kfp.v2.dsl import Model

@component(packages_to_install=["pandas", "fsspec", "gcsfs"])
def outlier_test(test_dataset_path: InputPath('Dataset'),
                      test_outlier_output_path: OutputPath('Dataset'),
                      iqr_values: str):
    """Cap selected columns at IQR-based bounds read from a stored artifact.

    Values below ``Q1 - 1.5 * IQR`` are replaced by that lower bound and values
    above ``Q3 + 1.5 * IQR`` by that upper bound, for a fixed list of columns.

    Args:
        test_dataset_path: Path of the CSV to process.
        test_outlier_output_path: Destination path for the capped CSV.
        iqr_values: Path or URL of the JSON file holding the ``Q1``, ``Q3`` and
            ``IQR`` values under its first ``iqr_values`` artifact's metadata.
    """
    import pandas as pd
    import numpy as np

    # Only these columns are capped; every other column passes through untouched.
    capped_columns = ['age', 'a1c',  'cigsPerDay', 'income_log', 'totChol', 'sysBP', 'diaBP', 'BMI', 'heartRate']

    frame = pd.read_csv(test_dataset_path)

    stored = pd.read_json(iqr_values).to_dict()
    metadata = stored['artifacts']['iqr_values']['artifacts'][0]['metadata']
    first_quartile = pd.Series(metadata['Q1'])
    third_quartile = pd.Series(metadata['Q3'])
    spread = pd.Series(metadata['IQR'])

    floor = first_quartile - 1.5 * spread
    ceiling = third_quartile + 1.5 * spread

    for name in frame.columns:
        if name not in capped_columns:
            continue
        # Raise values under the floor, then lower values over the ceiling.
        frame[name] = np.where(frame[name] < floor[name], floor[name], frame[name])
        frame[name] = np.where(frame[name] > ceiling[name], ceiling[name], frame[name])

    # Write the capped dataset to the component's output path.
    frame.to_csv(test_outlier_output_path, index=False)

## Impute component

In [None]:
from kfp.v2.dsl import Input
from kfp.v2.dsl import Model

@component(packages_to_install=["pandas", "fsspec", "gcsfs"])
def impute_test(test_dataset_path: InputPath('Dataset'),
                      imputed_dataset_path: OutputPath('Dataset'),
                      imputed_values: str):
    """Fill missing values with the medians stored in an artifact file.

    Args:
        test_dataset_path: Path of the CSV to impute.
        imputed_dataset_path: Destination path for the imputed CSV.
        imputed_values: Path or URL of the JSON file holding the medians under its
            first ``imputed_values`` artifact's ``metadata['medians']``.

    Raises:
        ValueError: If a column has missing values but no median is stored for it.
    """
    import pandas as pd

    # Load the test dataset
    df = pd.read_csv(test_dataset_path)

    impute_dictionary = pd.read_json(imputed_values).to_dict()
    # Kept under a separate name so the `imputed_values` path parameter is not shadowed.
    medians = pd.Series(impute_dictionary['artifacts']['imputed_values']['artifacts'][0]['metadata']['medians'])

    # Columns to impute median values for
    columns_to_impute = ['age', 'cigsPerDay', 'BPMeds', 'totChol', 'BMI', 'heartRate', 'a1c','sysBP', 'diaBP', 'income_log']

    for col in columns_to_impute:
        if col in medians.index:
            # Assign back rather than fillna(inplace=True) on the selected column:
            # the chained in-place form has no effect under pandas Copy-on-Write.
            df[col] = df[col].fillna(medians[col])
        elif df[col].isna().any():
            # The stored medians may cover fewer columns than the list above. Fail
            # with a clear message instead of a bare KeyError, and only when a value
            # actually needs filling.
            raise ValueError(
                f"Column '{col}' has missing values but no median is stored in {imputed_values}"
            )

    # Save the imputed dataframe to the output path
    df.to_csv(imputed_dataset_path, index=False)

## Normalising component

In [None]:
from kfp.v2.dsl import Input
from kfp.v2.dsl import Artifact

@component(packages_to_install=["pandas", "scikit-learn","joblib", "fsspec", "gcsfs"])
def normalise_test(
                       test_dataset_path: InputPath('Dataset'),
                       scaler_path: str,
                      normalised_test_dataset_path: OutputPath('Dataset')
                       ):
    """Scale the continuous columns with a previously saved scaler.

    Args:
        test_dataset_path: Path of the CSV to normalise.
        scaler_path: GCS path of the joblib-serialised scaler.
        normalised_test_dataset_path: Destination path for the normalised CSV.
    """
    import pandas as pd
    # Not referenced by name below; kept so the import happens before the scaler is loaded.
    from sklearn.preprocessing import StandardScaler
    import joblib
    import gcsfs

    dataset = pd.read_csv(test_dataset_path)

    gcs = gcsfs.GCSFileSystem()
    with gcs.open(scaler_path, 'rb') as scaler_file:
        fitted_scaler = joblib.load(scaler_file)

    # NOTE(review): presumably this must match the column order the scaler was
    # fitted with - confirm against the training pipeline.
    scaled_columns = ['age',  'cigsPerDay', 'totChol', 'sysBP',  'BMI', 'heartRate',  'a1c', 'income_log', 'diaBP']

    # Transform a selection of the scaled columns and write the result back in place;
    # all other columns are left as they are.
    dataset[scaled_columns] = fitted_scaler.transform(dataset[scaled_columns])

    # Write the normalised dataset to the component's output path.
    dataset.to_csv(normalised_test_dataset_path, index=False)

## Perform Predictions


In [None]:
# @component(packages_to_install=["pandas", "numpy", "scikit-learn", "joblib", "fsspec", "gcsfs"])
# def perform_predictions(dataset_for_prediction_path: InputPath('Dataset'),
#                    model_path: str,
#                    predictions_path: OutputPath('Dataset')):

#     import pandas as pd
#     import joblib
#     import gcsfs

#     # Create a GCS file system object
#     fs = gcsfs.GCSFileSystem()

#     # Load the trained model
#     with fs.open(model_path, 'rb') as f:
#       trained_model = joblib.load(f)

#     # Load the test dataset
#     pred_df = pd.read_csv(dataset_for_prediction_path)

#     # Make predictions
#     y_pred = trained_model.predict(pred_df)
#     pred_df['TenYearCHD_pred'] = y_pred
#     pred_df = pred_df[['patientID', 'TenYearCHD_pred']]

#     # Save the predictions
#     pred_df.to_csv(predictions_path, index=False)

In [None]:
@component(packages_to_install=["pandas", "numpy", "scikit-learn", "joblib", "fsspec", "gcsfs"])
def perform_predictions(dataset_for_prediction_path: InputPath('Dataset'),
                   model_path: str,
                   predictions_path: str) -> str:
    """Score the prepared dataset with the trained model and write the predictions to GCS.

    Args:
        dataset_for_prediction_path: Path of the CSV to score.
        model_path: GCS path of the joblib-serialised model.
        predictions_path: GCS path the predictions CSV is written to.

    Returns:
        ``predictions_path``, so the destination is recorded as a component output.

    Raises:
        ValueError: If ``predictions_path`` is empty.
    """
    import pandas as pd
    import joblib
    import gcsfs

    if not predictions_path:
        raise ValueError("predictions_path must be a non-empty GCS destination path")

    # Create a GCS file system object
    fs = gcsfs.GCSFileSystem()

    # Load the trained model. joblib.load unpickles the file, so model_path must
    # point at a trusted artifact.
    with fs.open(model_path, 'rb') as f:
        trained_model = joblib.load(f)

    # Load the dataset to score
    pred_df = pd.read_csv(dataset_for_prediction_path)

    # NOTE(review): every column in the CSV, including patientID, is passed to
    # predict - confirm this matches the features the model was trained on.
    y_pred = trained_model.predict(pred_df)
    pred_df['TenYearCHD_pred'] = y_pred
    pred_df = pred_df[['patientID', 'TenYearCHD_pred']]

    # Write the predictions to the destination. The previous code opened
    # predictions_path for reading and copied it onto itself, so the frame
    # was never saved anywhere.
    with fs.open(predictions_path, 'w') as gcs_file:
        pred_df.to_csv(gcs_file, index=False)

    return predictions_path

# Define pipeline

## Define pipeline

In [None]:
from kfp.v2.dsl import pipeline, Output, Dataset
# NOTE(review): these artifacts come from training run fp-pipeline-20240430182301, while the
# connection-test cells earlier in the notebook read from run fp-pipeline-20240501210458 -
# confirm which run inference should use.
iqr_artifact_path = "gs://finalproject_ise543/753516815850/fp-pipeline-20240430182301/outlier-training_-9149797597806854144/executor_output.json"
imputed_artifact_path = "gs://finalproject_ise543/753516815850/fp-pipeline-20240430182301/impute-training_4469087675361525760/executor_output.json"
scaler_path = 'gs://finalproject_ise543/753516815850/fp-pipeline-20240430182301/normalise-training_9080773693788913664/scaler_path'
model_path = 'gs://finalproject_ise543/753516815850/fp-pipeline-20240430182301/grid-search_-2448441352279556096/best_model_path'
# Default destination for the predictions CSV, in the same bucket as the other
# artifacts; adjust as needed or override via the run's parameter_values.
prediction_path = 'gs://finalproject_ise543/fp_inference_pipeline/predictions.csv'

@pipeline(name='fp_inference_pipeline')
def fp_inference_pipeline(input_dataset_path: str,
                               iqr_values: str = iqr_artifact_path,
                               imputed_values:  str = imputed_artifact_path,
                               scaler_path:  str = scaler_path,
                               model_path: str = model_path,
                               predictions_path: str = prediction_path):
    """Chain the preparation components and score the result with the trained model.

    Args:
        input_dataset_path: Path or URL of the raw CSV to score.
        iqr_values: Location of the JSON file with the stored Q1/Q3/IQR values.
        imputed_values: Location of the JSON file with the stored medians.
        scaler_path: GCS path of the saved scaler.
        model_path: GCS path of the saved model.
        predictions_path: GCS destination passed to ``perform_predictions``.
    """
    # Process dataset - initial data preparation
    initial_prepared_dataset = perform_initial_data_preparation(input_dataset_path=input_dataset_path)

    # One-hot encoding
    one_hot_encoding = onehot_encoding(dataset_path=initial_prepared_dataset.outputs['output_dataset_path'])

    # Outlier handling
    outlier_dataset = outlier_test(test_dataset_path=one_hot_encoding.outputs['output_path'],
                                   iqr_values=iqr_values)

    # Process imputation
    imputed_data = impute_test(test_dataset_path=outlier_dataset.outputs['test_outlier_output_path'],
                               imputed_values=imputed_values)

    # Process normalisation
    normalised_result = normalise_test(test_dataset_path=imputed_data.outputs['imputed_dataset_path'],
                                       scaler_path=scaler_path)

    # perform_predictions declares predictions_path as a required string input,
    # so it has to be supplied here.
    perform_predictions(
        dataset_for_prediction_path=normalised_result.outputs['normalised_test_dataset_path'],
        model_path=model_path,
        predictions_path=predictions_path
    )

#  Compile and run pipeline

In [None]:
from kfp.v2 import compiler

# Compile the pipeline function into a JSON template on local disk.
template_file = 'fp_inference_pipeline.json'
compiler.Compiler().compile(pipeline_func=fp_inference_pipeline, package_path=template_file)

# Only the input dataset is supplied here; the remaining pipeline parameters
# fall back to their defaults.
run_parameters = {
    'input_dataset_path': 'gs://finalproject_ise543/Final Project Evaluation Dataset - Student(1).csv'
}

# Build the Vertex AI pipeline job from the compiled template.
pipeline_job = aiplatform.PipelineJob(
    display_name='fp_inference_pipeline',
    template_path=template_file,
    pipeline_root='gs://finalproject_ise543',
    parameter_values=run_parameters,
    enable_caching=True,
)

# Submit the job.
pipeline_job.run()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/753516815850/locations/us-central1/pipelineJobs/fp-inference-pipeline-20240502162234
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/753516815850/locations/us-central1/pipelineJobs/fp-inference-pipeline-20240502162234')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/fp-inference-pipeline-20240502162234?project=753516815850
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/753516815850/locations/us-central1/pipelineJobs/fp-inference-pipeline-20240502162234 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob run completed. Resource name: projects