Install and import packages

In [27]:
import os

# The Vertex AI Workbench Notebook product has specific requirementss
IS_WORKBENCH_NOTEBOOK = os.getenv("DL_ANACONDA_HOME") and not os.getenv("VIRTUAL_ENV")
IS_USER_MANAGED_WORKBENCH_NOTEBOOK = os.path.exists(
    "/opt/deeplearning/metadata/env_version"
)

# Vertex AI Notebook requires dependencies to be installed with '--user'
USER_FLAG = ""
if IS_WORKBENCH_NOTEBOOK:
    USER_FLAG = "--user"

! pip3 install --upgrade google-cloud-aiplatform {USER_FLAG} -q
! pip3 install -U google-cloud-storage {USER_FLAG} -q
! pip3 install {USER_FLAG} kfp google-cloud-pipeline-components --upgrade -q

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
kfp 1.8.14 requires google-cloud-storage<2,>=1.20.0, but you have google-cloud-storage 2.5.0 which is incompatible.
google-cloud-pipeline-components 1.0.25 requires google-cloud-storage<2,>=1.20.0, but you have google-cloud-storage 2.5.0 which is incompatible.[0m[31m
[0m

In [286]:
import kfp
from kfp.v2 import dsl
from kfp.v2.dsl import component
from kfp.v2.dsl import (
    Input,
    Output,
    Artifact,
    Dataset,
)

Project and pipeline configuration

In [287]:
#The Google Cloud project that this pipeline runs in.
project_id = "de-2022-ng"
# The region that this pipeline runs in
region = "us-west1"
# Specify a Cloud Storage URI that your pipelines service account can access. The artifacts of your pipeline runs are stored within the pipeline root.
pipeline_root_path = "gs://test_data_de2022_ng"

Pipeline Component: Data Ingestion

In [288]:
from typing import Dict

def download_data(project_id: str, bucket: str, file_name: str) -> Dict:
    from google.cloud import storage
    import pandas as pd
    import logging 
    import sys
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # Download file from google bucket
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(bucket)
    blob = bucket.blob(file_name)
    local_path = '/tmp/'+ file_name
    blob.download_to_filename(local_path)
    logging.info('Downloaded Data!')

    # Create dataframe from downloaded data
    data_dict = pd.read_csv(local_path, index_col=None, squeeze=True).to_dict()
    logging.info('Built dict')
    return data_dict

In [289]:
# create a KFP component for data ingestion
data_ingestion_comp = kfp.components.create_component_from_func(
    download_data, output_component_file='data_ingestion.yaml', packages_to_install=['google-cloud-storage', 'pandas'])

Pipeline Component: Train RandomForestRegressor

In [290]:
from pandas import DataFrame
from typing import NamedTuple, Dict

def train_rfr(data: Dict, project_id: str, model_repo: str) -> Dict:
    import json
    import logging 
    import sys
    import os
    import joblib
    
    import pandas as pd
    from google.cloud import storage

    from sklearn.ensemble import RandomForestRegressor
    from sklearn.model_selection import train_test_split

    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

    data = pd.DataFrame.from_dict(data)  
    
    logging.info('Features:' + str(list(data.columns)))
    
    X = data.drop(['MEDV'], axis=1)
    y = data['MEDV']

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, 
                                                        random_state=101)
    
    model = RandomForestRegressor()
    model.fit(X_train, y_train)

    metrics = {
        "accuracy": model.score(X_test, y_test)
    }
    logging.info("RFR accuracy:" + str(metrics['accuracy']))

    # Save model locally
    local_file = '/tmp/local_rfr_model.pkl'
    joblib.dump(model, local_file)

    client = storage.Client(project=project_id)
    bucket = client.get_bucket(model_repo)
    blob = bucket.blob('rfr_model.pkl')
    # Upload the locally saved model
    blob.upload_from_filename(local_file)

    print("Saved the model to GCP bucket : " + model_repo)
    return metrics



In [291]:
# create a KFP component for training 
train_rfr_com = kfp.components.create_component_from_func(
    train_rfr, output_component_file='train_rfr_model.yaml', packages_to_install=['google-cloud-storage', 'pandas', 'joblib', 'scikit-learn'])

Pipeline Component: Train LinearRegressor

In [292]:
from typing import Dict

def train_lr (data: Dict, project_id: str, model_repo: str) -> Dict:
    '''train a LinearRegression with default parameters'''
    import json
    import logging 
    import sys
    import os
    import joblib

    import pandas as pd
    from google.cloud import storage

    from sklearn.linear_model import LinearRegression
    from sklearn.model_selection import train_test_split
        
    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
    
    data = pd.DataFrame.from_dict(data)  
    
    logging.info('Features:' + str(list(data.columns)))
    
    X = data.drop(['MEDV'], axis=1)
    y = data['MEDV']
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, 
                                                        random_state=101)
  
    model = LinearRegression()
    model.fit(X_train, y_train)

    metrics = {
        "accuracy": model.score(X_test, y_test)
    }
    logging.info("LR accuracy:" + str(metrics['accuracy']))
   
    # Save the model localy
    local_file = '/tmp/local_lr_model.pkl'
    joblib.dump(model, local_file)
    # write out output
  
    # Save to GCS as model.h5
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(model_repo)
    blob = bucket.blob('lr_model.pkl')
   # Upload the locally saved model
    blob.upload_from_filename(local_file)
  
    print("Saved the model to GCP bucket : " + model_repo)
    return metrics

In [293]:
# create a KFP component for training lr model
train_lr_com = kfp.components.create_component_from_func(
    train_lr, output_component_file='train_lr_model.yaml', packages_to_install=['google-cloud-storage', 'pandas', 'joblib', 'scikit-learn'])

Pipeline Component: Prediction RFR

In [294]:
from typing import Dict

def predict_rfr(project_id: str, model_repo: str, data: Dict) -> Dict:
    import pandas as pd
    from google.cloud import storage

    import json
    import logging
    import sys
    import os
    import joblib
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    data = pd.DataFrame.from_dict(data)  
    
    # Extract independent variables
    X = data.drop(['MEDV'], axis=1)
    
    # Download RFR model
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(model_repo)
    blob = bucket.blob('rfr_model.pkl')
    blob.download_to_filename('/tmp/local_rfr_model.pkl')

    # Load RandomForestRegressor model
    model = joblib.load('/tmp/local_rfr_model.pkl')

    pred = model.predict(X)   

    logging.info(pred)
    return pred

In [295]:
# create a KFP component for prediction LR 
prediction_rfr_comp = kfp.components.create_component_from_func(
    predict_rfr, output_component_file='prediction_rfr_comp.yaml', packages_to_install=['google-cloud-storage', 'pandas', 'joblib', 'scikit-learn'])

Pipeline Component: Prediction LR

In [296]:
from typing import Dict

def predict_lr(project_id: str, model_repo: str, data: Dict) -> Dict:
    import pandas as pd
    from google.cloud import storage
    
    from sklearn.metrics import r2_score, mean_squared_error, median_absolute_error

    import json
    import logging
    import sys
    import os
    import joblib
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    data = pd.DataFrame.from_dict(data)  
    
    # Split independent and dependent variables
    X = data.drop(['MEDV'], axis=1)
    y_true = data['MEDV']

    # Download LR model
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(model_repo)
    blob = bucket.blob('lr_model.pkl')
    blob.download_to_filename('/tmp/local_lr_model.pkl')

    # Load RandomForestRegressor model
    model = joblib.load('/tmp/local_lr_model.pkl')

    y_pred = model.predict(X)
    
    metrics = {
        "R2": r2_score(y_pred, y_true),
        "MSE": mean_squared_error(y_pred, y_true), 
        "MAE": median_absolute_error(y_pred, y_true)
    }
    
    return metrics


In [297]:
# create a KFP component for prediction LR 
prediction_lr_comp = kfp.components.create_component_from_func(
    predict_lr, output_component_file='prediction_lr_comp.yaml', packages_to_install=['google-cloud-storage', 'pandas', 'joblib', 'scikit-learn'])

#### Pipeline Component: Model Selection

In [306]:
from typing import Dict

def compare_model(rfr_metrics: Dict, lr_metrics: Dict) -> str:
    import logging
    import json
    import sys
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    logging.info(rfr_metrics)
    logging.info(lr_metrics)
    if rfr_metrics["accuracy"] > lr_metrics["accuracy"]:
        return "RFR"
    else :
        return "LR"

In [307]:
# create a KFP component for selecting between RFR and LR
compare_model_comp = kfp.components.create_component_from_func(
    compare_model, output_component_file='model_selection_comp.yaml')

#### Define Pipeline

In [308]:
# Define the workflow of the pipeline.
@kfp.dsl.pipeline(
    name="house-pricing-prediction-pipeline",
    pipeline_root=pipeline_root_path)

def pipeline(project_id: str, data_bucket: str, trainset_filename: str, model_repo: str, testset_filename: str, ):
    
    di_op = data_ingestion_comp(
        project_id=project_id,
        bucket=data_bucket,
        file_name=trainset_filename
    )

 
    training_rfr_job_run_op = train_rfr_com(
        project_id=project_id,
        model_repo=model_repo,       
        data=di_op.output
    )
    
     
    training_lr_job_run_op = train_lr_com(
        project_id=project_id,
        model_repo=model_repo,       
        data=di_op.output
    )
    
    pre_di_op = data_ingestion_comp(
        project_id=project_id,
        bucket=data_bucket,
        file_name=testset_filename
    ).after(training_rfr_job_run_op, training_lr_job_run_op)
        
        
    comp_model_op = compare_model_comp(training_rfr_job_run_op.output,
                                       training_lr_job_run_op.output).after(training_rfr_job_run_op, training_lr_job_run_op)  
    
    # defining the branching condition
    with dsl.Condition(comp_model_op.output=="RFR"):
        predict_rfr_job_run_op = prediction_rfr_comp(
        project_id=project_id,
        model_repo=model_repo,       
        data=pre_di_op.output
        )
    with dsl.Condition(comp_model_op.output=="LR"):
        predict_lr_job_run_op = prediction_lr_comp(
        project_id=project_id,
        model_repo=model_repo,       
        data=pre_di_op.output
       )

#### Compile Pipeline into JSON

In [309]:
from kfp.v2 import compiler
compiler.Compiler().compile(pipeline_func=pipeline,
        package_path='house_pricing_training_pipeline.json')

In [310]:
import google.cloud.aiplatform as aip

job = aip.PipelineJob(
    display_name="house-pricing",
    enable_caching=False,
    template_path="house_pricing_training_pipeline.json",
    pipeline_root=pipeline_root_path,
    parameter_values={
        'project_id': project_id, # makesure to use your project id 
        'data_bucket': 'test_data_de2022_ng',  # makesure to use your data bucket name 
        'trainset_filename': 'train_set.csv',     # makesure to upload these to your data bucket from DE2022/lab4/data
        'testset_filename': 'test_set.csv',    # makesure to upload these to your data bucket from DE2022/lab4/data
        'model_repo':'model_repo_de2022_ng' # makesure to use your model bucket name 
    }
)

job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/647289875126/locations/us-central1/pipelineJobs/house-pricing-prediction-pipeline-20221025161231
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/647289875126/locations/us-central1/pipelineJobs/house-pricing-prediction-pipeline-20221025161231')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/house-pricing-prediction-pipeline-20221025161231?project=647289875126
PipelineJob projects/647289875126/locations/us-central1/pipelineJobs/house-pricing-prediction-pipeline-20221025161231 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/647289875126/locations/us-central1/pipelineJobs/house-pricing-prediction-pipeline-20221025161231 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/647289875126/locations/us-central1/pipelineJobs/house-pricing-prediction-pipeline-20221025161231 current state:


RuntimeError: Job failed with:
code: 9
message: "The DAG failed because some tasks failed. The failed tasks are: [condition-1].; Job (project_id = de-2022-ng, job_id = 7219315282651643904) is failed due to the above error.; Failed to handle the job: {project_number = 647289875126, job_id = 7219315282651643904}"
