## Installation
Install the packages required for executing this notebook.

In [1]:
# Install (or upgrade) the Kubeflow Pipelines SDK, the Google Cloud pipeline
# components and the google-cloud-aiplatform package into the user site-packages.
# The kfp and google-cloud-pipeline-components requirements are constrained to
# versions above 2.
! pip3 install --user --no-cache-dir --upgrade "kfp>2" "google-cloud-pipeline-components>2" \
                                        google-cloud-aiplatform



## Restart the kernel
Once you've installed the additional packages, you need to restart the notebook kernel so it can find the packages.

In [2]:
import os

# The restart is skipped when the IS_TESTING environment variable is set to a
# non-empty value.
if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    # Shut the running kernel down; the True argument asks for it to be restarted.
    app.kernel.do_shutdown(True)

## Check the versions of the packages you installed. The KFP SDK version should be >2.

In [312]:
# Print the version of the installed KFP SDK.
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
# List the installed packages whose name contains "aiplatform", as reported by pip.
! pip3 freeze | grep aiplatform
# Print the version of the installed google_cloud_pipeline_components package.
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

KFP SDK version: 2.3.0
google-cloud-aiplatform==1.35.0
google_cloud_pipeline_components version: 2.4.1


In [387]:
import kfp
import typing
from typing import Dict
from typing import NamedTuple
from kfp import dsl
from kfp.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        OutputPath, 
                        InputPath)
import google.cloud.aiplatform as aip
from google_cloud_pipeline_components.types import artifact_types

#### Project and Pipeline Configurations

In [388]:
# The Google Cloud project that this pipeline runs in.
PROJECT_ID = "your-project-id"
# The region that this pipeline runs in.
REGION = "us-central1"
# A Cloud Storage URI that your pipeline's service account can access.
# The artifacts of your pipeline runs are stored within the pipeline root.
PIPELINE_ROOT = "gs://your-temp_de2023-bucket"

#### Pipeline Component : Data Ingestion

In [389]:
@dsl.component(
    packages_to_install=["google-cloud-storage"],
    base_image="python:3.10.7-slim"
)
def download_data(project_id: str, bucket: str, file_name: str, dataset: Output[Dataset]):
    '''Download a file from a Cloud Storage bucket into the `dataset` output artifact.

    Args:
        project_id: Google Cloud project used to create the storage client.
        bucket: Name of the bucket that holds the file.
        file_name: Name of the object (blob) to download.
        dataset: Output artifact; the file is written to `dataset.path + ".json"`.
    '''
    from google.cloud import storage
    import logging
    import sys

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # Download the file from the Cloud Storage bucket. The bucket handle gets its
    # own name so the `bucket` parameter keeps holding the bucket name.
    client = storage.Client(project=project_id)
    source_bucket = client.bucket(bucket)
    blob = source_bucket.blob(file_name)
    # The ".json" suffix is part of the file name the train/test split component
    # reads back, so it must stay in sync with that component.
    blob.download_to_filename(dataset.path + ".json")
    logging.info('Downloaded Data!')

#### Pipeline Component : Train and Test split

In [390]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn"],
    base_image="python:3.10.7-slim"
)
def train_test_split(dataset: Input[Dataset], X_train: Output[Dataset], X_test: Output[Dataset], Y_train: Output[Dataset], Y_test: Output[Dataset]):
    '''Split the downloaded dataset into train and test features and targets.

    Args:
        dataset: Input artifact; the records are read from `dataset.path + ".json"`.
        X_train: Output artifact receiving the training features.
        X_test: Output artifact receiving the test features.
        Y_train: Output artifact receiving the training targets.
        Y_test: Output artifact receiving the test targets.

    Each output is written as JSON to `<artifact>.path + ".json"`.
    '''
    import pandas as pd
    import logging
    import sys
    # Aliased so the scikit-learn helper is not confused with this component,
    # which carries the same name.
    from sklearn.model_selection import train_test_split as split_arrays

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    data = pd.read_json(dataset.path + ".json", orient='records')
    data_values = data.values

    # The first six columns are used as features and the seventh as the target.
    # NOTE(review): assumes the records have at least seven columns in that
    # order - confirm against the dataset.
    X = data_values[:, 0:6]
    Y = data_values[:, 6]

    # 80/20 split with a fixed seed so the split is reproducible.
    x_train, x_test, y_train, y_test = split_arrays(X, Y, test_size=0.2, random_state=42)

    # logging treats the first argument as a %-format string and the rest as its
    # arguments, so the shapes have to be passed through an explicit format string.
    logging.info(
        "x_train: %s, y_train: %s, x_test: %s, y_test: %s",
        x_train.shape, y_train.shape, x_test.shape, y_test.shape,
    )

    pd.DataFrame(x_train).to_json(X_train.path + '.json')
    pd.DataFrame(x_test).to_json(X_test.path + '.json')
    pd.DataFrame(y_train).to_json(Y_train.path + '.json')
    pd.DataFrame(y_test).to_json(Y_test.path + '.json')

#### Pipeline Component : Training Linear Regression

In [392]:
@dsl.component(
    packages_to_install=['pandas', 'scikit-learn'],
    base_image="python:3.10.7-slim"
)
def train_lr(X_train: Input[Dataset], Y_train: Input[Dataset], X_test: Input[Dataset], Y_test: Input[Dataset], out_model: Output[Model]) -> NamedTuple('outputs', metrics=dict):
    '''Train a linear regression model with default parameters and evaluate it.

    Args:
        X_train: Training features, read from `X_train.path + ".json"`.
        Y_train: Training targets, read from `Y_train.path + ".json"`.
        X_test: Test features, read from `X_test.path + ".json"`.
        Y_test: Test targets, read from `Y_test.path + ".json"`.
        out_model: Output artifact; the fitted model is pickled to `out_model.path`
            and `out_model.metadata["algo"]` is set to "lr".

    Returns:
        A named tuple whose `metrics` field is a dict with the keys "R^2" and
        "Mean absolute error", both computed on the test split.
    '''
    import pandas as pd
    from sklearn.linear_model import LinearRegression
    from sklearn import metrics
    import logging
    import sys
    import pickle

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    df_x_train = pd.read_json(X_train.path + ".json")
    df_y_train = pd.read_json(Y_train.path + ".json")
    df_x_test = pd.read_json(X_test.path + ".json")
    df_y_test = pd.read_json(Y_test.path + ".json")

    # define model
    model_lr = LinearRegression()

    # logging treats the first argument as a %-format string and the rest as its
    # arguments, so the shapes have to be passed through an explicit format string.
    logging.info(
        "x_train: %s, y_train: %s, x_test: %s, y_test: %s",
        df_x_train.shape, df_y_train.shape, df_x_test.shape, df_y_test.shape,
    )

    # Fit the model
    model_lr.fit(df_x_train, df_y_train)

    y_pred = model_lr.predict(df_x_test)
    # evaluate the model
    scores_1 = metrics.r2_score(df_y_test, y_pred)
    scores_2 = metrics.mean_absolute_error(df_y_test, y_pred)
    metrics_dict = {
        "R^2": scores_1,
        "Mean absolute error": scores_2
    }

    logging.info(metrics_dict)

    # The upload component uses this value to name the stored model.
    out_model.metadata["algo"] = "lr"

    # Save the model
    m_file = out_model.path
    with open(m_file, 'wb') as file:
        pickle.dump(model_lr, file)
    outputs = NamedTuple('outputs', metrics=dict)
    return outputs(metrics_dict)

#### Pipeline Component : Training Random Forest Regressor

In [393]:
@dsl.component(
    packages_to_install=['pandas', 'scikit-learn'],
    base_image="python:3.10.7-slim"
)
def train_rf(X_train: Input[Dataset], Y_train: Input[Dataset], X_test: Input[Dataset], Y_test: Input[Dataset], out_model: Output[Model]) -> NamedTuple('outputs', metrics=dict):
    '''Train a random forest regressor (default hyperparameters, fixed seed) and evaluate it.

    Args:
        X_train: Training features, read from `X_train.path + ".json"`.
        Y_train: Training targets, read from `Y_train.path + ".json"`.
        X_test: Test features, read from `X_test.path + ".json"`.
        Y_test: Test targets, read from `Y_test.path + ".json"`.
        out_model: Output artifact; the fitted model is pickled to `out_model.path`
            and `out_model.metadata["algo"]` is set to "rf".

    Returns:
        A named tuple whose `metrics` field is a dict with the keys "R^2" and
        "Mean absolute error", both computed on the test split.
    '''
    import pandas as pd
    from sklearn.ensemble import RandomForestRegressor
    from sklearn import metrics
    import logging
    import sys
    import pickle

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    df_x_train = pd.read_json(X_train.path + ".json")
    df_y_train = pd.read_json(Y_train.path + ".json")
    df_x_test = pd.read_json(X_test.path + ".json")
    df_y_test = pd.read_json(Y_test.path + ".json")

    # define model; the seed is fixed so repeated runs build the same forest and
    # the downstream model comparison is reproducible.
    model_rf = RandomForestRegressor(random_state=42)

    # logging treats the first argument as a %-format string and the rest as its
    # arguments, so the shapes have to be passed through an explicit format string.
    logging.info(
        "x_train: %s, y_train: %s, x_test: %s, y_test: %s",
        df_x_train.shape, df_y_train.shape, df_x_test.shape, df_y_test.shape,
    )

    # Fit the model. The target is flattened to 1-D because scikit-learn warns
    # when this estimator is given a column vector.
    # NOTE(review): assumes the target artifact holds a single column - confirm.
    model_rf.fit(df_x_train, df_y_train.values.ravel())

    y_pred = model_rf.predict(df_x_test)
    # evaluate the model
    scores_1 = metrics.r2_score(df_y_test, y_pred)
    scores_2 = metrics.mean_absolute_error(df_y_test, y_pred)
    metrics_dict = {
        "R^2": scores_1,
        "Mean absolute error": scores_2
    }

    logging.info(metrics_dict)

    # The upload component uses this value to name the stored model.
    out_model.metadata["algo"] = "rf"

    # Save the model
    m_file = out_model.path
    with open(m_file, 'wb') as file:
        pickle.dump(model_rf, file)
    outputs = NamedTuple('outputs', metrics=dict)
    return outputs(metrics_dict)

#### Pipeline Component: Training Gradient Boosting Regressor

In [394]:
@dsl.component(
    packages_to_install=['pandas', 'scikit-learn'],
    base_image="python:3.10.7-slim"
)
def train_gr(X_train: Input[Dataset], Y_train: Input[Dataset], X_test: Input[Dataset], Y_test: Input[Dataset], out_model: Output[Model]) -> NamedTuple('outputs', metrics=dict):
    '''Train a gradient boosting regressor (default hyperparameters, fixed seed) and evaluate it.

    Args:
        X_train: Training features, read from `X_train.path + ".json"`.
        Y_train: Training targets, read from `Y_train.path + ".json"`.
        X_test: Test features, read from `X_test.path + ".json"`.
        Y_test: Test targets, read from `Y_test.path + ".json"`.
        out_model: Output artifact; the fitted model is pickled to `out_model.path`
            and `out_model.metadata["algo"]` is set to "gr".

    Returns:
        A named tuple whose `metrics` field is a dict with the keys "R^2" and
        "Mean absolute error", both computed on the test split.
    '''
    import pandas as pd
    from sklearn.ensemble import GradientBoostingRegressor
    from sklearn import metrics
    import logging
    import sys
    import pickle

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    df_x_train = pd.read_json(X_train.path + ".json")
    df_y_train = pd.read_json(Y_train.path + ".json")
    df_x_test = pd.read_json(X_test.path + ".json")
    df_y_test = pd.read_json(Y_test.path + ".json")

    # define model; the seed is fixed so repeated runs produce the same model and
    # the downstream model comparison is reproducible.
    model_gr = GradientBoostingRegressor(random_state=42)

    # logging treats the first argument as a %-format string and the rest as its
    # arguments, so the shapes have to be passed through an explicit format string.
    logging.info(
        "x_train: %s, y_train: %s, x_test: %s, y_test: %s",
        df_x_train.shape, df_y_train.shape, df_x_test.shape, df_y_test.shape,
    )

    # Fit the model. The target is flattened to 1-D because scikit-learn warns
    # when this estimator is given a column vector.
    # NOTE(review): assumes the target artifact holds a single column - confirm.
    model_gr.fit(df_x_train, df_y_train.values.ravel())

    y_pred = model_gr.predict(df_x_test)
    # evaluate the model
    scores_1 = metrics.r2_score(df_y_test, y_pred)
    scores_2 = metrics.mean_absolute_error(df_y_test, y_pred)
    metrics_dict = {
        "R^2": scores_1,
        "Mean absolute error": scores_2
    }

    logging.info(metrics_dict)

    # The upload component uses this value to name the stored model.
    out_model.metadata["algo"] = "gr"

    # Save the model
    m_file = out_model.path
    with open(m_file, 'wb') as file:
        pickle.dump(model_gr, file)
    outputs = NamedTuple('outputs', metrics=dict)
    return outputs(metrics_dict)

#### Pipeline Component : Algorithm Selection 

In [395]:
# Picks the model with the highest "R^2" value among the three metric dicts and
# returns its label: "LR", "RF" or "GR".
@dsl.component(
    base_image="python:3.10.7-slim"
)
def compare_models(lr_metrics: dict, rf_metrics: dict, gr_metrics: dict) -> str:
    import logging
    import sys

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    for reported_metrics in (lr_metrics, rf_metrics, gr_metrics):
        logging.info(reported_metrics)

    r2_lr = lr_metrics.get("R^2")
    r2_rf = rf_metrics.get("R^2")
    r2_gr = gr_metrics.get("R^2")

    # LR is chosen only when it is strictly better than both other models.
    if r2_lr > r2_rf and r2_lr > r2_gr:
        return "LR"
    # Otherwise RF needs to beat GR strictly; every remaining case, ties
    # included, falls through to GR.
    if r2_rf > r2_gr:
        return "RF"
    return "GR"

#### Pipeline Component : Upload Model to a Google Cloud Storage Bucket

In [396]:
@dsl.component(
    packages_to_install=["google-cloud-storage"],
    base_image="python:3.10.7-slim"
)
def upload_model_to_gcs(project_id: str, model_repo: str, model: Input[Model]):
    '''Upload the file behind the `model` artifact to a Cloud Storage bucket.

    Args:
        project_id: Google Cloud project used to create the storage client.
        model_repo: Name of the destination bucket.
        model: Input artifact; the file at `model.path` is uploaded as
            "<algo>_model", where <algo> is `model.metadata["algo"]`.
    '''
    import logging
    import sys
    from google.cloud import storage

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # Resolve the destination bucket first, then derive the object name from the
    # algorithm tag stored in the artifact metadata.
    gcs_client = storage.Client(project=project_id)
    target_bucket = gcs_client.bucket(model_repo)
    blob_name = str(model.metadata["algo"]) + '_model'
    target_bucket.blob(blob_name).upload_from_filename(model.path)

    logging.info("Saved the model to GCP bucket : " + model_repo)

#### Define the Pipeline

In [397]:
# Workflow: download the data, split it, train three regressors on the same
# split, compare their metrics and upload only the selected model.
# NOTE(review): the pipeline name below is misspelled ("prdictor"); it is kept
# as-is because the name is part of the created pipeline job resource names.
@kfp.dsl.pipeline(
    name="insurance-prdictor-training-pipeline")
def pipeline(project_id: str, data_bucket: str, dataset_filename: str, model_repo: str):

    ingest_task = download_data(
        project_id=project_id,
        bucket=data_bucket,
        file_name=dataset_filename,
    )

    split_task = train_test_split(dataset=ingest_task.outputs["dataset"])

    # All three training components take the same four split artifacts.
    split_artifacts = {
        artifact_name: split_task.outputs[artifact_name]
        for artifact_name in ("X_train", "Y_train", "X_test", "Y_test")
    }

    lr_task = train_lr(**split_artifacts)
    rf_task = train_rf(**split_artifacts)
    gr_task = train_gr(**split_artifacts)

    selection_task = compare_models(
        lr_metrics=lr_task.outputs["metrics"],
        rf_metrics=rf_task.outputs["metrics"],
        gr_metrics=gr_task.outputs["metrics"],
    ).after(lr_task, rf_task, gr_task)

    # One conditional branch per candidate: the upload runs only for the model
    # whose label matches the output of the comparison step.
    candidates = (("LR", lr_task), ("RF", rf_task), ("GR", gr_task))
    for label, trained_task in candidates:
        with dsl.If(selection_task.output == label):
            upload_model_to_gcs(
                project_id=project_id,
                model_repo=model_repo,
                model=trained_task.outputs['out_model'],
            ).after(trained_task)

#### Compile the pipeline into a YAML file

In [398]:
from kfp import compiler
# Compile the `pipeline` function and write the resulting pipeline definition to
# a YAML file; the same file name is used as `template_path` when the job is
# submitted.
compiler.Compiler().compile(pipeline_func=pipeline,
        package_path='insurance_predictor_training_pipeline.yaml')

#### Submit the pipeline run

In [399]:
import google.cloud.aiplatform as aip

# Before initializing, make sure to set the GOOGLE_APPLICATION_CREDENTIALS
# environment variable to the path of your service account key file.
aip.init(
    project=PROJECT_ID,
    location=REGION,
)

# Prepare the pipeline job from the compiled pipeline file. Caching is disabled
# for this job. The keys of `parameter_values` match the parameters of the
# `pipeline` function.
job = aip.PipelineJob(
    display_name="insurance-predictor",
    enable_caching=False,
    template_path="insurance_predictor_training_pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
    location=REGION,
    parameter_values={
        'project_id': PROJECT_ID, # make sure to use your project id
        'data_bucket': 'your-data_de2023-bucket',  # make sure to use your data bucket name
        'dataset_filename': 'train_data.json',     # make sure to upload this file to your data bucket from DE2022/lab4/data
        'model_repo':'your-models_de2023-bucket' # make sure to use your model bucket name
    }
)

# Submit the pipeline job for execution.
job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/511734983126/locations/us-central1/pipelineJobs/insurance-prdictor-training-pipeline-20231019130316
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/511734983126/locations/us-central1/pipelineJobs/insurance-prdictor-training-pipeline-20231019130316')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/insurance-prdictor-training-pipeline-20231019130316?project=511734983126
PipelineJob projects/511734983126/locations/us-central1/pipelineJobs/insurance-prdictor-training-pipeline-20231019130316 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/511734983126/locations/us-central1/pipelineJobs/insurance-prdictor-training-pipeline-20231019130316 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/511734983126/locations/us-central1/pipelineJobs/insurance-prdictor-training-pipeline-202310191303