In [1]:
! pip install kfp

Collecting kfp
  Downloading kfp-2.11.0.tar.gz (345 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/345.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━[0m [32m266.2/345.4 kB[0m [31m7.8 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m345.4/345.4 kB[0m [31m7.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting kfp-pipeline-spec==0.6.0 (from kfp)
  Downloading kfp_pipeline_spec-0.6.0-py3-none-any.whl.metadata (293 bytes)
Collecting kfp-server-api<2.4.0,>=2.1.0 (from kfp)
  Downloading kfp_server_api-2.3.0.tar.gz (84 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.0/84.0 kB[0m [31m10.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting kubernetes<31,>=8.0.0 (from kfp)
  Downloading kubernetes-30.1.0-py2.py3-none-any.whl.metadat

In [2]:
!pip install google-cloud-pipeline-components

Collecting google-cloud-pipeline-components
  Downloading google_cloud_pipeline_components-2.18.0-py3-none-any.whl.metadata (6.0 kB)
Collecting kfp<2.11.0,>=2.6.0 (from google-cloud-pipeline-components)
  Downloading kfp-2.10.1.tar.gz (343 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m343.6/343.6 kB[0m [31m6.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting kfp-pipeline-spec==0.5.0 (from kfp<2.11.0,>=2.6.0->google-cloud-pipeline-components)
  Downloading kfp_pipeline_spec-0.5.0-py3-none-any.whl.metadata (293 bytes)
Downloading google_cloud_pipeline_components-2.18.0-py3-none-any.whl (1.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.5/1.5 MB[0m [31m46.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading kfp_pipeline_spec-0.5.0-py3-none-any.whl (9.1 kB)
Building wheels for collected packages: kfp
  Building wheel for kfp (setup.py) ... [?25l[?25hdone
  Created wheel for kfp: filename=kfp-2

In [3]:
!pip install gcsfs



In [4]:
!pip install fsspec



In [5]:
pip install scikit-learn



In [6]:
# Vertex AI target: GCP project id and region used by the SDK calls below.
project_id, location = 'gcp-final-project-444704', 'us-central1'

In [7]:
from google.cloud import aiplatform
# Set the default project and region for subsequent Vertex AI SDK calls
# (e.g. the aiplatform.PipelineJob submitted later in this notebook).
aiplatform.init(project=project_id, location=location)

from kfp.v2.dsl import pipeline, component, InputPath, OutputPath, Dataset
import joblib
import gcsfs
import fsspec
import pandas as pd
import numpy as np

  from kfp.v2.dsl import pipeline, component, InputPath, OutputPath, Dataset


In [8]:
 from kfp.v2.dsl import pipeline, component, InputPath, OutputPath

# Step 1: Initial Data Preparation (Reuse the same component as in training)
@component(packages_to_install=["pandas", "numpy", "fsspec", "gcsfs"])
def initial_data_preparation(
    input_dataset_path: str,  # Input: Raw dataset path
    prepared_dataset_path: OutputPath("Dataset")  # Output: Cleaned and prepared dataset path
):
    """Clean a raw readmissions CSV and write the prepared dataset.

    Imputes missing values, clips upper outliers, log-transforms
    'Length of Stay' and drops the 'Hospital ID' and 'Weight (kg)' columns.

    Args:
        input_dataset_path: Path or URI of the raw CSV, read with
            pandas.read_csv (gcsfs is installed so gs:// URIs work).
        prepared_dataset_path: Output path provided by KFP; the prepared
            dataset is written there as CSV without an index column.

    Raises:
        KeyError: if one of the columns used below is missing.
    """
    # KFP lightweight components run only this function's source inside the
    # container, so imports must live in the body.
    import pandas as pd
    import numpy as np

    df = pd.read_csv(input_dataset_path)

    # Handle missing values by assigning the filled column back. The previous
    # df[col].fillna(..., inplace=True) form is chained assignment: under
    # pandas Copy-on-Write (the default from pandas 3.0, and "pandas" is not
    # pinned above) it fills a temporary copy and leaves df unchanged.
    # NOTE(review): median/mean are computed from the dataset being processed
    # (the test batch at inference time), not from training data - confirm intended.
    df['Exercise Frequency'] = df['Exercise Frequency'].fillna('None')
    df['Type of Treatment'] = df['Type of Treatment'].fillna('None')
    df['Number of Prior Visits'] = df['Number of Prior Visits'].fillna(
        df['Number of Prior Visits'].median()
    )
    df['Medications Prescribed'] = df['Medications Prescribed'].fillna(
        df['Medications Prescribed'].mean()
    )

    # Handle outliers: cap each column at an upper quantile of its own values.
    # NOTE(review): as with imputation, thresholds are recomputed per input batch.
    upper_quantiles = {
        'Age': 0.99,
        'Adjusted Weight (kg)': 0.95,
        'Number of Prior Visits': 0.95,
    }
    for column, quantile in upper_quantiles.items():
        df[column] = df[column].clip(upper=df[column].quantile(quantile))

    # log(1 + x) transform of length of stay.
    df['Length of Stay'] = np.log1p(df['Length of Stay'])

    # Drop unnecessary columns.
    df = df.drop(columns=['Hospital ID', 'Weight (kg)'])

    # Save the prepared dataset.
    df.to_csv(prepared_dataset_path, index=False)

# Step 2: Perform Validation (Normalize the test dataset using the scaler from training pipeline)
@component(packages_to_install=["pandas", "numpy", "joblib", "gcsfs", "scikit-learn"])
def perform_validation(
    validation_dataset_path: InputPath("Dataset"),  # Input: Raw test dataset
    scaler_path: str,                              # Input: Path to saved scaler
    normalized_validation_dataset_path: OutputPath("Dataset")  # Output: Processed test dataset
):
    """Encode and scale a prepared dataset using the scaler saved by training.

    Args:
        validation_dataset_path: Input path provided by KFP; a CSV that must
            contain a 'PatientID' column.
        scaler_path: URI of the joblib-serialised scaler, opened via gcsfs.
        normalized_validation_dataset_path: Output path provided by KFP; the
            encoded and scaled CSV is written there with 'PatientID' as the
            first column and no index column.

    Raises:
        ValueError: if the input has no 'PatientID' column.
    """
    import pandas as pd
    import joblib
    import gcsfs

    frame = pd.read_csv(validation_dataset_path)

    # Set the identifier aside so it is neither encoded nor scaled.
    if 'PatientID' not in frame.columns:
        raise ValueError("PatientID column is missing in the input dataset.")
    ids = frame.pop('PatientID')

    # NOTE(review): joblib.load unpickles the file; scaler_path must point at a trusted artifact.
    with gcsfs.GCSFileSystem().open(scaler_path, 'rb') as handle:
        scaler = joblib.load(handle)

    # Binary encodings: Gender via an explicit mapping (other values become NaN),
    # Smoker cast to int.
    frame['Gender'] = frame['Gender'].map({'Male': 0, 'Female': 1})
    frame['Smoker'] = frame['Smoker'].astype(int)

    # One-hot encode the remaining categoricals, keeping every level.
    frame = pd.get_dummies(
        frame,
        columns=['Ethnicity', 'Diet Type', 'Type of Treatment', 'Exercise Frequency'],
        drop_first=False,
    )

    # Scale numeric features with the training scaler; these must be the
    # columns (and order) the scaler was fitted on.
    numeric_features = [
        'Height (m)', 'BMI', 'Adjusted Weight (kg)',
        'Number of Prior Visits', 'Medications Prescribed', 'Length of Stay',
    ]
    frame[numeric_features] = scaler.transform(frame[numeric_features])

    # No alignment to the training feature set happens here, so the one-hot
    # columns reflect only the categories present in this input.
    frame.insert(0, 'PatientID', ids)
    frame.to_csv(normalized_validation_dataset_path, index=False)




@component(packages_to_install=["pandas", "joblib", "gcsfs", "scikit-learn", "xgboost"])
def perform_predictions(
    dataset_for_prediction_path: InputPath("Dataset"),
    model_path: str,
    predictions_path: OutputPath("Dataset"),
    gcs_dump_path: str  # Additional input: Path to dump the test data
):
    """Score a normalized dataset with the trained model and write predictions.

    Args:
        dataset_for_prediction_path: Input path provided by KFP; a CSV that
            must contain a 'PatientID' column.
        model_path: URI of the joblib-serialised model, opened via gcsfs. The
            model must expose ``feature_names_in_`` (used to align columns).
        predictions_path: Output path provided by KFP; a CSV with columns
            'PatientID' and 'readmission_prediction' is written there.
        gcs_dump_path: GCS prefix; the aligned feature frame is written to
            ``<gcs_dump_path>/xgb_test_data.csv`` for debugging.

    Raises:
        ValueError: if the input has no 'PatientID' column.
    """
    import pandas as pd
    import joblib
    import gcsfs

    df = pd.read_csv(dataset_for_prediction_path)

    # Keep PatientID for the output, but do not feed it to the model.
    if 'PatientID' not in df.columns:
        raise ValueError("PatientID column is missing in the input dataset.")
    patient_ids = df['PatientID']
    features = df.drop(columns=['PatientID'])

    fs = gcsfs.GCSFileSystem()
    # NOTE(review): joblib.load unpickles the file; model_path must point at a trusted artifact.
    with fs.open(model_path, 'rb') as f:
        model = joblib.load(f)

    # Align columns to the model's training features. reindex() silently drops
    # unknown columns and zero-fills missing ones (including scaled numeric
    # features), so report both to make schema drift visible in the logs.
    expected_columns = model.feature_names_in_
    expected_set = set(expected_columns)
    present_set = set(features.columns)
    missing = [c for c in expected_columns if c not in present_set]
    unexpected = [c for c in features.columns if c not in expected_set]
    if missing:
        print(f"Warning: {len(missing)} model feature(s) absent from input, filled with 0: {missing}")
    if unexpected:
        print(f"Warning: dropping {len(unexpected)} column(s) not used by the model: {unexpected}")
    features = features.reindex(columns=expected_columns, fill_value=0)

    # Dump the aligned features for review; strip a trailing '/' so the object
    # name does not contain an empty path segment ('...//xgb_test_data.csv').
    dump_uri = f"{gcs_dump_path.rstrip('/')}/xgb_test_data.csv"
    with fs.open(dump_uri, 'w') as f:
        features.to_csv(f, index=False)
    print("Testing DataFrame dumped to GCS.")

    print("Testing DataFrame (features) Preview:")
    print(features.head())

    predictions = model.predict(features)

    output_df = pd.DataFrame({
        'PatientID': patient_ids,
        'readmission_prediction': predictions,
    })
    output_df.to_csv(predictions_path, index=False)


# Step 4: Define the Inference Pipeline
@pipeline(name="healthcare-readmissions-inference-pipeline")
def healthcare_inference_pipeline(
    input_test_dataset_path: str,   # Input: Test dataset
    scaler_uri: str,                # Input: Scaler from training pipeline
    model_uri: str,                 # Input: Model from training pipeline
    gcs_dump_path: str = "gs://healthcare_readmissions/debugdatasets",  # Debug dump prefix for the prediction step
):
    """Batch inference: prepare -> encode/scale -> predict.

    Args:
        input_test_dataset_path: GCS URI of the raw test CSV.
        scaler_uri: GCS URI of the scaler artifact produced by training.
        model_uri: GCS URI of the model artifact produced by training.
        gcs_dump_path: GCS prefix where the prediction step writes the aligned
            feature frame for debugging. Optional; callers that do not pass it
            get "gs://healthcare_readmissions/debugdatasets".
    """
    # Step 1: Prepare the test dataset.
    prepared_test_data = initial_data_preparation(
        input_dataset_path=input_test_dataset_path
    )

    # Step 2: Encode and normalize the test dataset with the training scaler.
    normalized_test_data = perform_validation(
        validation_dataset_path=prepared_test_data.outputs["prepared_dataset_path"],
        scaler_path=scaler_uri
    )

    # Step 3: Perform predictions.
    perform_predictions(
        dataset_for_prediction_path=normalized_test_data.outputs["normalized_validation_dataset_path"],
        model_path=model_uri,
        gcs_dump_path=gcs_dump_path
    )


# Compile and Run the Pipeline
# Compile the inference pipeline to a local spec file, then submit it to Vertex AI Pipelines.
from kfp import compiler  # kfp.v2 is a deprecated alias in the KFP 2.x SDK

PIPELINE_SPEC_PATH = "healthcare_inference_pipeline.json"

compiler.Compiler().compile(
    pipeline_func=healthcare_inference_pipeline,
    package_path=PIPELINE_SPEC_PATH,
)

# Artifacts from one specific training-pipeline run; update these after retraining.
SCALER_URI = "gs://healthcare_readmissions/853819008626/healthcare-readmissions-training-pipeline-20241216091749/split-and-preprocess-data_820267594517839872/scaler_path"
MODEL_URI = "gs://healthcare_readmissions/853819008626/healthcare-readmissions-training-pipeline-20241216091749/train-model_-8403104442336935936/trained_model_path"

# Run the pipeline
pipeline_job = aiplatform.PipelineJob(
    display_name="healthcare-readmissions-inference-pipeline",
    template_path=PIPELINE_SPEC_PATH,
    pipeline_root="gs://healthcare_readmissions",
    parameter_values={
        "input_test_dataset_path": "gs://healthcare_readmissions/healthcare_readmissions_dataset_test.csv",
        "scaler_uri": SCALER_URI,
        "model_uri": MODEL_URI,
    },
    # Vertex AI's execution cache matches on the component spec and input
    # values. Here the inputs are URI strings, not file contents. With caching
    # on, re-running after overwriting the test CSV, model or scaler at the same
    # URI would reuse stale predictions and skip the debug dump. Enable it only
    # when the referenced files are immutable.
    enable_caching=False,
)

pipeline_job.run()





  return component_factory.create_component_from_func(
INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/853819008626/locations/us-central1/pipelineJobs/healthcare-readmissions-inference-pipeline-20241218004423
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/853819008626/locations/us-central1/pipelineJobs/healthcare-readmissions-inference-pipeline-20241218004423')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/healthcare-readmissions-inference-pipeline-20241218004423?project=853819008626
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/853819008626/locations/us-central1/pipelineJobs/healthcare-readmissions-inference-pipeline-20241218004423 curren