### Installation
Install the packages required for executing this notebook.

In [1]:
import os

# The Vertex AI Workbench Notebook product has specific requirements
IS_WORKBENCH_NOTEBOOK = os.getenv("DL_ANACONDA_HOME") and not os.getenv("VIRTUAL_ENV")
IS_USER_MANAGED_WORKBENCH_NOTEBOOK = os.path.exists(
    "/opt/deeplearning/metadata/env_version"
)

# Vertex AI Notebook requires dependencies to be installed with '--user'
USER_FLAG = ""
if IS_WORKBENCH_NOTEBOOK:
    USER_FLAG = "--user"

! pip3 install --upgrade google-cloud-aiplatform {USER_FLAG} -q
! pip3 install -U google-cloud-storage {USER_FLAG} -q
! pip3 install {USER_FLAG} kfp google-cloud-pipeline-components --upgrade -q

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
kfp 1.8.14 requires google-cloud-storage<2,>=1.20.0, but you have google-cloud-storage 2.5.0 which is incompatible.
google-cloud-pipeline-components 1.0.25 requires google-cloud-storage<2,>=1.20.0, but you have google-cloud-storage 2.5.0 which is incompatible.[0m[31m
[0m

## Restart the kernel
Once you've installed the additional packages, you need to restart the notebook kernel so it can find the packages.

In [2]:
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

Check the versions of the packages you installed. The KFP SDK version should be >=1.6.

In [1]:
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"

KFP SDK version: 1.8.14


In [2]:
import kfp
from kfp.v2 import dsl
from kfp.v2.dsl import component
from kfp.v2.dsl import (
    Input,
    Output,
    Artifact,
    Dataset,
)

#### Project and Pipeline Configurations

In [3]:
#The Google Cloud project that this pipeline runs in.
project_id = "de-project-363307"
# The region that this pipeline runs in
region = "us-west1"
# Specify a Cloud Storage URI that your pipelines service account can access. The artifacts of your pipeline runs are stored within the pipeline root.
pipeline_root_path = "gs://bodyfat-temp"

#### Create Pipeline Components

We can create a component from Python functions (inline) and from a container. We will first try inline python functions. 

Step 1: Define the python function

Step 2:  Use **kfp.components.create_component_from_func** build the component. This function takes four parameters.

**1.func**: The Python function to convert.

**2.base_image**: (Optional.) Specify the Docker container image to run this function in. 

**3.output_component_file**: (Optional.) Writes your component definition to a file. 

**4.packages_to_install**: (Optional.) A list of versioned Python packages to install before running your function.

Another thing we need to consider is passing parameters between components. We can pass simple parameters such as integer, string, tuple, dict, and list by values. To pass the large datasets or complex configurations, we can use files. We can annotate the Python function’s parameters to indicate input or output files for the component. 

Refer to  https://www.kubeflow.org/docs/components/pipelines/sdk/python-function-components/ for more information.

#### Pipeline Component : Data Ingestion

In [4]:
from typing import Dict

def download_data(project_id: str, bucket: str, file_name: str) -> Dict:
    '''download data'''
    from google.cloud import storage
    import pandas as pd
    import logging 
    import sys
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    # Downloaing the file from a google bucket 
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(bucket)
    blob = bucket.blob(file_name)
    local_path = '/tmp/'+ file_name
    blob.download_to_filename(local_path)
    logging.info('Downloaded Data!')
    
    # Convert the data to a dictiory object    
    dict_from_csv = pd.read_csv(local_path, index_col=None, squeeze=True).to_dict()
    logging.info('Returning Data as Dictionary Object!')
    return dict_from_csv

In [5]:
# create a KFP component for data ingestion
data_ingestion_comp = kfp.components.create_component_from_func(
    download_data, output_component_file='data_ingestion.yaml', packages_to_install=['google-cloud-storage', 'pandas'])

#### Pipeline Component : Training Linear Regression

In [117]:
from typing import NamedTuple, Dict
def train_lr (features: Dict, project_id: str, model_repo: str) -> Dict:
    '''train a Linear Regression with default parameters'''
    import pandas as pd
    import numpy as np
    from sklearn.linear_model import LinearRegression
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import PowerTransformer
    from sklearn.metrics import r2_score
    from sklearn.metrics import mean_squared_error
    from google.cloud import storage
    import pickle
    import json
    import logging 
    import sys
    import os
    
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    df = pd.DataFrame.from_dict(features)  
    
    logging.info(df.columns)
        
    X = df.drop(['BodyFat', 'Density'], axis=1)
    y = df['Density']

    X['Bmi'] = 703 * X['Weight'] / (X['Height'] * X['Height'])
    X['ACratio'] = X['Abdomen'] / X['Chest']
    X['HTratio'] = X['Hip'] / X['Thigh']
    X.drop(['Weight', 'Height', 'Abdomen', 'Chest', 'Hip', 'Thigh'], axis=1, inplace=True)

    # Splitting the data for the model
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

    #Transformer
    trans = PowerTransformer()
    X_train = trans.fit_transform(X_train)
    X_test = trans.transform(X_test)

    # define and fit model
    model = LinearRegression()
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    # evaluate the model
    r2 = r2_score(y_test, y_pred)
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    lr_metrics = {
        "R2": r2,
    #    "RMSE": rmse,
    }
    logging.info(lr_metrics)
    
    # Save the model localy
    local_file = '/tmp/local_model.pkl'
    pickle.dump(model, open(local_file, 'wb'))
    
    #Save the transformer localy
    local_file_trans = '/tmp/transformer.pkl'
    pickle.dump(trans, open(local_file_trans, 'wb'))
     # write out output
  
    # Save to GCS as model.pkl
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(model_repo)
    blob = bucket.blob('model.pkl')
    blob_t = bucket.blob('transformer.pkl')
    # Upload the locally saved model
    blob.upload_from_filename(local_file)
    blob_t.upload_from_filename(local_file_trans)

    print("Saved the model to GCP bucket : " + model_repo)
    return lr_metrics

In [118]:
# create a KFP component for training 
train_lr_com = kfp.components.create_component_from_func(
    train_lr, output_component_file='training_lr.yaml', packages_to_install=['google-cloud-storage', 'pandas', 'sklearn', 'np'])

#### Pipeline Component: Train Ridge regression

In [119]:
from typing import NamedTuple, Dict
def train_ridge (features: Dict, project_id: str, model_repo: str) -> Dict:
    '''train a Ridge regression with default parameters'''
    import pandas as pd
    import numpy as np
    from sklearn.linear_model import Ridge
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import PowerTransformer
    from sklearn.metrics import r2_score
    from sklearn.metrics import mean_squared_error
    from google.cloud import storage
    import pickle
    import json
    import logging 
    import sys
    import os
    
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    df = pd.DataFrame.from_dict(features)  
    
    logging.info(df.columns)
        
    X = df.drop(['BodyFat', 'Density'], axis=1)
    y = df['Density']

    X['Bmi'] = 703 * X['Weight'] / (X['Height'] * X['Height'])
    X['ACratio'] = X['Abdomen'] / X['Chest']
    X['HTratio'] = X['Hip'] / X['Thigh']
    X.drop(['Weight', 'Height', 'Abdomen', 'Chest', 'Hip', 'Thigh'], axis=1, inplace=True)

    # Splitting the data for the model
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

    #Transformer
    trans = PowerTransformer()
    X_train = trans.fit_transform(X_train)
    X_test = trans.transform(X_test)

    # define and fit model
    model = Ridge()
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    # evaluate the model
    r2 = r2_score(y_test, y_pred)
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    ridge_metrics = {
        "R2": r2,
    #    "RMSE": rmse,
    }
    logging.info(ridge_metrics)
    
    # Save the model localy
    local_file = '/tmp/local_model_ridge.pkl'
    pickle.dump(model, open(local_file, 'wb'))
    
    #Save the transformer localy
    local_file_trans = '/tmp/transformer.pkl'
    pickle.dump(trans, open(local_file_trans, 'wb'))
     # write out output
  
    # Save to GCS as model.pkl
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(model_repo)
    blob = bucket.blob('model_ridge.pkl')
    blob_t = bucket.blob('transformer.pkl')
    # Upload the locally saved model
    blob.upload_from_filename(local_file)
    blob_t.upload_from_filename(local_file_trans)

    print("Saved the model to GCP bucket : " + model_repo)
    return ridge_metrics

In [120]:
# create a KFP component for training 
train_ridge_com = kfp.components.create_component_from_func(
    train_ridge, output_component_file='train_ridge.yaml', packages_to_install=['google-cloud-storage', 'pandas', 'sklearn', 'np'])

#### Pipeline Component : Prediction-LR

In [121]:
def predict_lr(project_id: str, model_repo: str, features: Dict) -> Dict:
    import pandas as pd
    from google.cloud import storage
    from sklearn.preprocessing import PowerTransformer
    import pickle 
    import json
    import logging
    import sys
    import os
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    df = pd.DataFrame.from_dict(features)    
    
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(model_repo)
    blob = bucket.blob('model.pkl')
    filename = '/tmp/local_model.pkl'
    blob.download_to_filename(filename)
    blob_t = bucket.blob('transformer.pkl')
    filename_t = '/tmp/transformer.pkl'
    blob_t.download_to_filename(filename_t)
        
    #Loading the saved model with joblib
    model = pickle.load(open(filename, 'rb'))
    transformer = pickle.load(open(filename_t, 'rb'))
    
    X = df.drop(['BodyFat', 'Density'], axis=1)

    X['Bmi'] = 703 * X['Weight'] / (X['Height'] * X['Height'])
    X['ACratio'] = X['Abdomen'] / X['Chest']
    X['HTratio'] = X['Hip'] / X['Thigh']
    X.drop(['Weight', 'Height', 'Abdomen', 'Chest', 'Hip', 'Thigh'], axis=1, inplace=True)

    #Transformer
    X = transformer.transform(X)    
    
    dfcp = df.copy()   
    y_classes = model.predict(X)
    logging.info(y_classes)
    dfcp['pclass'] = y_classes.tolist()
    dic = dfcp.to_dict(orient='records') 
    return dic

In [122]:
# create a KFP component for prediction LR 
prediction_lr_com = kfp.components.create_component_from_func(
    predict_lr, output_component_file='prediction_lr_com.yaml', packages_to_install=['google-cloud-storage', 'pandas', 'sklearn'])

#### Pipeline Component: Prediciton Ridge 

In [123]:
def predict_ridge(project_id: str, model_repo: str, features: Dict) -> Dict:
    import pandas as pd
    from google.cloud import storage
    from sklearn.preprocessing import PowerTransformer
    import pickle 
    import json
    import logging
    import sys
    import os
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    df = pd.DataFrame.from_dict(features)    
    
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(model_repo)
    blob = bucket.blob('model_ridge.pkl')
    filename = '/tmp/local_model_ridge.pkl'
    blob.download_to_filename(filename)
    blob_t = bucket.blob('transformer.pkl')
    filename_t = '/tmp/transformer.pkl'
    blob_t.download_to_filename(filename_t)
        
    #Loading the saved model with joblib
    model = pickle.load(open(filename, 'rb'))
    transformer = pickle.load(open(filename_t, 'rb'))
    
    X = df.drop(['BodyFat', 'Density'], axis=1)

    X['Bmi'] = 703 * X['Weight'] / (X['Height'] * X['Height'])
    X['ACratio'] = X['Abdomen'] / X['Chest']
    X['HTratio'] = X['Hip'] / X['Thigh']
    X.drop(['Weight', 'Height', 'Abdomen', 'Chest', 'Hip', 'Thigh'], axis=1, inplace=True)

    #Transformer
    X = transformer.transform(X)    
    
    dfcp = df.copy()   
    y_classes = model.predict(X)
    logging.info(y_classes)
    dfcp['pclass'] = y_classes.tolist()
    dic = dfcp.to_dict(orient='records') 
    return dic

In [124]:
# create a KFP component for prediction Ridge
prediction_ridge_com = kfp.components.create_component_from_func(
    predict_ridge, output_component_file='prediction_ridge_com.yaml', packages_to_install=['google-cloud-storage', 'pandas', 'sklearn'])

#### Pipeline Component: Algorithm Selection

In [125]:
def compare_model(lr_metrics: Dict, ridge_metrics: Dict) -> str:
    import logging
    import json
    import sys
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    logging.info(lr_metrics)
    logging.info(ridge_metrics)
    if lr_metrics.get("R2") > ridge_metrics.get("R2"):
        return "LR"
    else :
        return "RIDGE"

In [126]:
#create a KFP component for selecting between LR and Ridge
compare_model_com = kfp.components.create_component_from_func(
    compare_model, output_component_file='algo_selecion_com.yaml')

#### Define the Pipeline

In [127]:
# Define the workflow of the pipeline.
@kfp.dsl.pipeline(
    name="bodyfat-predictor-training-pipeline",
    pipeline_root=pipeline_root_path)
def pipeline(project_id: str, data_bucket: str, trainset_filename: str, model_repo: str, testset_filename: str, ):
    
    
    di_op = data_ingestion_comp(
        project_id=project_id,
        bucket=data_bucket,
        file_name=trainset_filename
    )
    
     
    training_lr_job_run_op = train_lr_com(
        project_id=project_id,
        model_repo=model_repo,       
        features=di_op.output
    )
    
    training_ridge_job_run_op = train_ridge_com(
        project_id=project_id,
        model_repo=model_repo,       
        features=di_op.output
    )
    
    pre_di_op = data_ingestion_comp(
        project_id=project_id,
        bucket=data_bucket,
        file_name=testset_filename
    ).after(training_lr_job_run_op, training_ridge_job_run_op)
        
        
    comp_model__op = compare_model_com(training_lr_job_run_op.output,
                                       training_ridge_job_run_op.output).after(training_lr_job_run_op, training_ridge_job_run_op)  
    
    # defining the branching condition
    with dsl.Condition(comp_model__op.output=="LR"):
        predict_lr_job_run_op = prediction_lr_com(
        project_id=project_id,
        model_repo=model_repo,       
        features=pre_di_op.output
        )
    with dsl.Condition(comp_model__op.output=="RIDGE"):
        predict_ridge_job_run_op = prediction_ridge_com(
        project_id=project_id,
        model_repo=model_repo,       
        features=pre_di_op.output
       )

#### Compile the pipeline into a JSON file

In [128]:
from kfp.v2 import compiler
compiler.Compiler().compile(pipeline_func=pipeline,
        package_path='bodyfat_predictor_training_pipeline.json')

#### Submit the pipeline run

In [129]:
import google.cloud.aiplatform as aip

job = aip.PipelineJob(
    display_name="bodyfat-predictor",
    enable_caching=False,
    template_path="bodyfat_predictor_training_pipeline.json",
    pipeline_root=pipeline_root_path,
    parameter_values={
        'project_id': project_id , # makesure to use your project id 
        'data_bucket': 'bodyfat-data',  # makesure to use your data bucket name 
        'trainset_filename': 'training_set.csv',     # makesure to upload these to your data bucket from DE2022/lab4/data
        'testset_filename': 'prediction_set.csv',    # makesure to upload these to your data bucket from DE2022/lab4/data
        'model_repo':'bodyfat-model' # makesure to use your model bucket name 
    }
)

job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/273668530892/locations/us-central1/pipelineJobs/bodyfat-predictor-training-pipeline-20221025103621
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/273668530892/locations/us-central1/pipelineJobs/bodyfat-predictor-training-pipeline-20221025103621')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/bodyfat-predictor-training-pipeline-20221025103621?project=273668530892
PipelineJob projects/273668530892/locations/us-central1/pipelineJobs/bodyfat-predictor-training-pipeline-20221025103621 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/273668530892/locations/us-central1/pipelineJobs/bodyfat-predictor-training-pipeline-20221025103621 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/273668530892/locations/us-central1/pipelineJobs/bodyfat-predictor-training-pipeline-20221025103621 cur