In [1]:
import os

# The Vertex AI Workbench Notebook product has specific requirements
IS_WORKBENCH_NOTEBOOK = os.getenv("DL_ANACONDA_HOME") and not os.getenv("VIRTUAL_ENV")
IS_USER_MANAGED_WORKBENCH_NOTEBOOK = os.path.exists(
    "/opt/deeplearning/metadata/env_version"
)

# Vertex AI Notebook requires dependencies to be installed with '--user'
USER_FLAG = ""
if IS_WORKBENCH_NOTEBOOK:
    USER_FLAG = "--user"

! pip3 install --upgrade google-cloud-aiplatform {USER_FLAG} -q
! pip3 install -U google-cloud-storage {USER_FLAG} -q
! pip3 install {USER_FLAG} kfp google-cloud-pipeline-components --upgrade -q

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
kfp 1.8.14 requires google-cloud-storage<2,>=1.20.0, but you have google-cloud-storage 2.5.0 which is incompatible.
google-cloud-pipeline-components 1.0.25 requires google-cloud-storage<2,>=1.20.0, but you have google-cloud-storage 2.5.0 which is incompatible.[0m[31m
[0m

## Restart the kernel

In [2]:
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

In [57]:
import kfp
from kfp.v2 import dsl
from kfp.v2.dsl import component
from kfp.v2.dsl import (
    Input,
    Output,
    Artifact,
    Dataset,
)

### Pipeline cloud parameters

In [58]:
#The Google Cloud project that this pipeline runs in.
project_id = "deassignement1"
# The region that this pipeline runs in
region = "us-central1"
# Specify a Cloud Storage URI that your pipelines service account can access. The artifacts of your pipeline runs are stored within the pipeline root.
pipeline_root_path = "gs://temp-storage-group1/"

# Create Pipeline Components

#### Data Ingestion

In [59]:
from typing import Dict

def download_data(project_id: str, bucket: str, file_name: str) -> Dict:
    '''download data'''
    from google.cloud import storage
    import pandas as pd
    import logging 
    import sys
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(bucket)
    blob = bucket.blob(file_name)
    local_path = '/tmp/'+ file_name
    blob.download_to_filename(local_path)
    logging.info('Downloaded Data!')
    
    # Convert the data to a dictiory object    
    dict_from_csv = pd.read_csv(local_path, index_col="Id", squeeze=True).to_dict()
    logging.info('Returning Data as Dictionary Object!')
    return dict_from_csv

In [60]:
# create a KFP component for data ingestion
data_ingestion_comp = kfp.components.create_component_from_func(
    download_data, output_component_file='data_ingestion.yaml', packages_to_install=['google-cloud-storage', 'pandas'])

#### Train Test Split

In [61]:
from typing import Dict, NamedTuple

def make_train_test_split(input_data: Dict) -> NamedTuple(
  'Outputs',
  [
    ('train_data', Dict),
    ('test_data', Dict),
  ]):
    import pandas as pd
    from sklearn.model_selection import train_test_split
    import logging
    import sys
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    data = pd.DataFrame.from_dict(input_data) 
    
    train_data, test_data = train_test_split(data, test_size=0.2, random_state=42)
    
    logging.info("make train test split")
    
    from collections import namedtuple
    example_output = namedtuple(
      'Outputs',
      ['train_data', 'test_data'])
    return example_output(train_data.to_dict(), test_data.to_dict())

In [62]:
# create a KFP component for data ingestion
train_test_split_comp = kfp.components.create_component_from_func(
    make_train_test_split, output_component_file='train_test_split.yaml', packages_to_install=['scikit-learn', 'pandas'])

#### Model Training

In [78]:
from typing import Tuple, Dict
def train_decision_tree (features: Dict, project_id: str, model_repo: str) -> Dict:
    '''train a Decision Tree with default parameters'''
    import pandas as pd
    from google.cloud import storage
    from sklearn.tree import DecisionTreeClassifier
    import json
    import logging 
    import sys
    import os
    import joblib
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    df = pd.DataFrame.from_dict(features)  
    
    logging.info(df.columns)
        
    # split into input (X) and output (Y) variables
    X = df.loc[:, ['SepalLengthCm','SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm']].values
    y = df.loc[:, ['Species']].values
    
    # define model
    decision_tree_classifier = DecisionTreeClassifier(criterion = 'gini')
    decision_tree_classifier.fit(X, y)

    # evaluate the model
    score = decision_tree_classifier.score(X, y)
    logging.info("accuracy: " + str(score))
    metrics = {
        "accuracy": score,
    }
   
    # Save the model localy
    local_file = '/tmp/DecisionTree.pkl'
    joblib.dump(decision_tree_classifier, local_file)
     # write out output
  
    # Save to GCS as model.h5
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(model_repo)
    blob = bucket.blob('DecisionTree.pkl')
    # Upload the locally saved model
    blob.upload_from_filename(local_file)

    print("Saved the model to GCP bucket : " + model_repo)
    return metrics

In [79]:
train_decision_tree_com = kfp.components.create_component_from_func(
    train_decision_tree, output_component_file='training_decision_tree.yaml', packages_to_install=['google-cloud-storage', 'pandas', 'joblib', 'scikit-learn'])

In [80]:
from typing import Tuple, Dict
def train_random_forest (features: Dict, project_id: str, model_repo: str) -> Dict:
    '''train a Random Forest with default parameters'''
    import pandas as pd
    from google.cloud import storage
    from sklearn.ensemble import RandomForestClassifier
    import json
    import logging 
    import sys
    import os
    import joblib
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    df = pd.DataFrame.from_dict(features)  
    
    logging.info(df.columns)
        
    # split into input (X) and output (Y) variables
    X = df.loc[:, ['SepalLengthCm','SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm']].values
    y = df.loc[:, ['Species']].values
    
    # define model
    random_forest_classifier = RandomForestClassifier()
    random_forest_classifier.fit(X, y)

    # evaluate the model
    score = random_forest_classifier.score(X, y)
    logging.info("accuracy: " + str(score))
    metrics = {
        "accuracy": score,
    }
   
    # Save the model localy
    local_file = '/tmp/RandomForest.pkl'
    joblib.dump(random_forest_classifier, local_file)
     # write out output
  
    # Save to GCS as model.h5
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(model_repo)
    blob = bucket.blob('RandomForest.pkl')
    # Upload the locally saved model
    blob.upload_from_filename(local_file)

    print("Saved the model to GCP bucket : " + model_repo)
    return metrics

In [81]:
train_random_forest_com = kfp.components.create_component_from_func(
    train_random_forest, output_component_file='training_random_forest.yaml', packages_to_install=['google-cloud-storage', 'pandas', 'joblib', 'scikit-learn'])

#### Compare models

In [82]:
from typing import NamedTuple, Dict

def compare_model(dt_metric: Dict, rf_metric: Dict) -> str:
    import logging
    import json
    import sys
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    logging.info(dt_metric)
    logging.info(rf_metric)
    if dt_metrics.get("accuracy") > rf_metrics.get("accuracy"):
        return "DT"
    else :
        return "RF"

In [83]:
# create a KFP component for selecting between MLP and LR
compare_model_com = kfp.components.create_component_from_func(
    compare_model, output_component_file='model_selecion_com.yaml')

#### Prediction 

In [84]:
from typing import Tuple, Dict

def predict_decision_tree(project_id: str, model_repo: str, features: Dict) -> Dict:
    import pandas as pd
    import joblib
    from google.cloud import storage
    from sklearn.metrics import accuracy_score
    import json
    import logging
    import sys
    import os
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    df = pd.DataFrame.from_dict(features)    
    
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(model_repo)
    blob = bucket.blob('DecisionTree.pkl')
    filename = '/tmp/DecisionTree.pkl'
    blob.download_to_filename(filename)
        
    #Loading the saved model with joblib
    model = joblib.load(filename)

    xNew = df[['SepalLengthCm','SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm']]

    dfcp = df.copy()   
    y_classes = model.predict(xNew)
    logging.info(y_classes)
    
    accuracy = accuracy_score(df['Species'], y_classes)
    logging.info("accuracy: " + str(accuracy))
    metrics = {
        "accuracy": accuracy,
    }

    return metrics

In [85]:
# create a KFP component for prediction LR 
prediction_decision_tree_com = kfp.components.create_component_from_func(
    predict_decision_tree, output_component_file='prediction_decision_tree.yaml', packages_to_install=['google-cloud-storage', 'pandas', 'joblib', 'scikit-learn'])

In [86]:
from typing import Tuple, Dict

def predict_random_forest(project_id: str, model_repo: str, features: Dict) -> Dict:
    import pandas as pd
    from sklearn.metrics import accuracy_score
    import joblib
    from google.cloud import storage
    import json
    import logging
    import sys
    import os
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    df = pd.DataFrame.from_dict(features)    
    
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(model_repo)
    blob = bucket.blob('RandomForest.pkl')
    filename = '/tmp/RandomForest.pkl'
    blob.download_to_filename(filename)
        
    #Loading the saved model with joblib
    model = joblib.load(filename)

    xNew = df[['SepalLengthCm','SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm']]

    dfcp = df.copy()   
    y_classes = model.predict(xNew)
    logging.info(y_classes)
    
    accuracy = accuracy_score(df['Species'], y_classes)
    
    logging.info("accuracy: " + str(accuracy))
    metrics = {
        "accuracy": accuracy,
    }

    return metrics

In [87]:
# create a KFP component for prediction LR 
prediction_random_forest_com = kfp.components.create_component_from_func(
    predict_random_forest, output_component_file='prediction_random_forest.yaml', packages_to_install=['google-cloud-storage', 'pandas', 'joblib', 'scikit-learn'])

In [88]:
from typing import Dict

def save_best_model(project_id: str, model_repo: str, best_model: str) -> None:
    from google.cloud import storage
    import sys
    import os
    
    if best_model == "RF":
        
        client = storage.Client(project=project_id)
        bucket = client.get_bucket(model_repo)
        blob = bucket.blob('RandomForest.pkl')
        filename = '/tmp/RandomForest.pkl'
        blob.download_to_filename(filename)
        
        # Save to GCS
        client = storage.Client(project=project_id)
        bucket = client.get_bucket(model_repo)
        blob = bucket.blob('BestModel.pkl')
        blob.upload_from_filename(filename)
        
    if best_model == "DT":
        client = storage.Client(project=project_id)
        bucket = client.get_bucket(model_repo)
        blob = bucket.blob('DecisionTree.pkl')
        filename = '/tmp/DecisionTree.pkl'
        blob.download_to_filename(filename)
        
        # Save to GCS
        client = storage.Client(project=project_id)
        bucket = client.get_bucket(model_repo)
        blob = bucket.blob('BestModel.pkl')
        blob.upload_from_filename(filename)

In [89]:
save_best_model_com = kfp.components.create_component_from_func(
    save_best_model, output_component_file='save_best_model.yaml', packages_to_install=['google-cloud-storage'])

#### Define The Pipeline

In [90]:
@kfp.dsl.pipeline(
    name="iris-predictor-pipeline",
    pipeline_root=pipeline_root_path)
def pipeline(project_id: str, data_bucket: str, trainset_filename: str, model_repo: str):
    
    
    di_op = data_ingestion_comp(
        project_id=project_id,
        bucket=data_bucket,
        file_name=trainset_filename
    )

    train_test_split = train_test_split_comp(
        input_data = di_op.output
    ).after(di_op)

    training_dt_job_run_op = train_decision_tree_com(
        project_id=project_id,
        model_repo=model_repo,       
        features=train_test_split.outputs['train_data']
    )
    
    training_rf_job_run_op = train_random_forest_com(
        project_id=project_id,
        model_repo=model_repo,       
        features=train_test_split.outputs['train_data']
    )
    
    predict_dt_op = prediction_decision_tree_com(
        project_id=project_id,
        model_repo=model_repo,       
        features=train_test_split.outputs['test_data']  
    ).after(training_dt_job_run_op)
    
    predict_rf_job_run_op = prediction_random_forest_com(
        project_id=project_id,
        model_repo=model_repo,       
        features=train_test_split.outputs['test_data']
    ).after(training_rf_job_run_op)
    
    compare_model_op = compare_model_com(
        dt_metric=predict_dt_op.output,
        rf_metric=predict_rf_job_run_op.output
    )
    
    save_best_model_op = save_best_model_com(
        project_id=project_id,
        model_repo=model_repo,
        best_model=compare_model_op.output
    )
    

In [91]:
from kfp.v2 import compiler
compiler.Compiler().compile(pipeline_func=pipeline,
        package_path='iris_predictor_training_pipeline.json')

##### Run pipeline

In [92]:
import google.cloud.aiplatform as aip

job = aip.PipelineJob(
    display_name="iris-predictor",
    enable_caching=False,
    template_path="iris_predictor_training_pipeline.json",
    pipeline_root=pipeline_root_path,
    parameter_values={
        'project_id': project_id, # makesure to use your project id 
        'data_bucket': 'data-group1',  # makesure to use your data bucket name 
        'trainset_filename': 'Iris.csv',     # makesure to upload these to your data bucket from DE2022/lab4/data
     #   'testset_filename': 'prediction_set.csv',    # makesure to upload these to your data bucket from DE2022/lab4/data
        'model_repo':'model-repository-group1' # makesure to use your model bucket name 
    }
)

job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/406096928318/locations/us-central1/pipelineJobs/iris-predictor-pipeline-20221024143047
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/406096928318/locations/us-central1/pipelineJobs/iris-predictor-pipeline-20221024143047')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/iris-predictor-pipeline-20221024143047?project=406096928318
PipelineJob projects/406096928318/locations/us-central1/pipelineJobs/iris-predictor-pipeline-20221024143047 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/406096928318/locations/us-central1/pipelineJobs/iris-predictor-pipeline-20221024143047 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/406096928318/locations/us-central1/pipelineJobs/iris-predictor-pipeline-20221024143047 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/40

RuntimeError: Job failed with:
code: 9
message: "The DAG failed because some tasks failed. The failed tasks are: [compare-model].; Job (project_id = deassignement1, job_id = 2688025554447237120) is failed due to the above error.; Failed to handle the job: {project_number = 406096928318, job_id = 2688025554447237120}"
