In [None]:
import os

# The Vertex AI Workbench Notebook product has specific requirements
IS_WORKBENCH_NOTEBOOK = os.getenv("DL_ANACONDA_HOME") and not os.getenv("VIRTUAL_ENV")
IS_USER_MANAGED_WORKBENCH_NOTEBOOK = os.path.exists(
    "/opt/deeplearning/metadata/env_version"
)

# Vertex AI Notebook requires dependencies to be installed with '--user'
USER_FLAG = ""
if IS_WORKBENCH_NOTEBOOK:
    USER_FLAG = "--user"

! pip3 install --upgrade google-cloud-aiplatform {USER_FLAG} -q
! pip3 install -U google-cloud-storage {USER_FLAG} -q
! pip3 install {USER_FLAG} kfp google-cloud-pipeline-components --upgrade -q

In [None]:
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

In [58]:
import kfp
from kfp.v2 import dsl
from kfp.v2.dsl import component
from kfp.v2.dsl import (
    Input,
    Output,
    Artifact,
    Dataset,
)

In [59]:
#The Google Cloud project that this pipeline runs in.
project_id = "de2022-362608"
# The region that this pipeline runs in
region = "us-west1"
# Specify a Cloud Storage URI that your pipelines service account can access. The artifacts of your pipeline runs are stored within the pipeline root.
pipeline_root_path = "gs://de_jads_temp_2109377"

In [60]:
from typing import Dict

def download_data(project_id: str, bucket: str, file_name: str) -> Dict:
    '''download data'''
    from google.cloud import storage
    import pandas as pd
    import logging 
    import sys
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    # Downloaing the file from a google bucket 
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(bucket)
    blob = bucket.blob(file_name)
    local_path = '/tmp/'+ file_name
    blob.download_to_filename(local_path)
    logging.info('Downloaded Data!')
    
    # Convert the data to a dictiory object    
    dict_from_csv = pd.read_csv(local_path, index_col=None, squeeze=True).to_dict()
    logging.info('Returning Data as Dictionary Object!')
    return dict_from_csv

In [61]:
# create a KFP component for data ingestion
data_ingestion_comp = kfp.components.create_component_from_func(
    download_data, output_component_file='data_ingestion.yaml', packages_to_install=['google-cloud-storage', 'pandas'])

In [62]:
from typing import NamedTuple, Dict
def train_svm (features: Dict, project_id: str, model_repo: str) -> Dict:
    import pandas as pd
    from google.cloud import storage
    import json
    import logging 
    import sys
    import os
    import numpy
    import pickle
    from sklearn.svm import SVC
    from sklearn.metrics import confusion_matrix
    from sklearn.model_selection import cross_val_score

    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    df = pd.DataFrame.from_dict(features)  
    
    logging.info(df.columns)
        
    # split into input (X) and output (Y) variables
    X = df.loc[:, ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']].values
    Y = df.loc[:, ['species']].values 
    Y = numpy.ravel(Y)
    
    #define model
    model = SVC(gamma='auto')
    model.fit(X, Y)
           
    # evaluate the model
    #cm = confusion_matrix(y_test, y_pred)
    #print(cm)
    accuracies = cross_val_score(estimator = model, X = X, y = Y, cv = 10)
    #print("Accuracy: {:.2f} %".format(accuracies.mean()*100))
    #print("Standard Deviation: {:.2f} %".format(accuracies.std()*100))
    
    # Save the model localy
    local_file = '/tmp/local_model.sav'
    pickle.dump(model, open(local_file, 'wb'))

    # Save to GCS as model.h5
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(model_repo)
    blob = bucket.blob('model.sav')
    # Upload the locally saved model
    blob.upload_from_filename(local_file)

    print("Saved the model to GCP bucket : " + model_repo)
    return ("Accuracy: {:.2f} %".format(accuracies.mean()*100))

In [63]:
# create a KFP component for training 
train_svm_com = kfp.components.create_component_from_func(
    train_svm, output_component_file='training_svm.yaml', packages_to_install=['google-cloud-storage', 'pandas', 'keras', 'tensorflow', 'h5py', 'scikit-learn', 'numpy'])

In [64]:
def predict(project_id: str, model_repo: str, features: Dict) -> Dict:
    import pandas as pd
    from keras.models import load_model
    from google.cloud import storage
    import json
    import logging
    import sys
    import os
    import pickle
    from sklearn.metrics import confusion_matrix, accuracy_score 
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    df = pd.DataFrame.from_dict(features)
    
    xNew = df.loc[:, ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']].values
    ynew = df.loc[:, ['species']].values 
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(model_repo)
    blob = bucket.blob('model.sav')
    blob.download_to_filename('/tmp/local_model.sav')
    model = pickle.load(open('/tmp/local_model.sav', 'rb'))
    dfcp = df.copy()
    result = model.predict(xNew)   
    dfcp['class'] = result.tolist()
    dic = dfcp.to_dict(orient='records')
    
    accuracy = accuracy_score(ynew, result)
    
    return ("Accuracy: {:.2f} %".format(accuracy))

In [65]:
# create a KFP component for prediction 
prediction_svm_com = kfp.components.create_component_from_func(
    predict, output_component_file='prediction_svm_com.yaml', packages_to_install=['google-cloud-storage', 'pandas', 'keras', 'tensorflow', 'h5py', 'scikit-learn', 'numpy'])

In [66]:
# Define the workflow of the pipeline.
@kfp.dsl.pipeline(
    name="petal-predictor-training-pipeline",
    pipeline_root=pipeline_root_path)
def pipeline(project_id: str, data_bucket: str, trainset_filename: str, model_repo: str, testset_filename: str, ):
    
    
    di_op = data_ingestion_comp(
        project_id=project_id,
        bucket=data_bucket,
        file_name=trainset_filename
    )

 
    training_svm_job_run_op = train_svm_com(
        project_id=project_id,
        model_repo=model_repo,       
        features=di_op.output
    )
    
    
    pre_di_op = data_ingestion_comp(
        project_id=project_id,
        bucket=data_bucket,
        file_name=testset_filename
    ).after(training_svm_job_run_op)
    

    predict_svm_job_run_op = prediction_svm_com(
        project_id=project_id,
        model_repo=model_repo,       
        features=pre_di_op.output
    )

In [67]:
from kfp.v2 import compiler
compiler.Compiler().compile(pipeline_func=pipeline,
        package_path='petal_predictor_training_pipeline.json')

In [68]:
import google.cloud.aiplatform as aip

job = aip.PipelineJob(
    display_name="petal-predictor",
    enable_caching=False,
    template_path="petal_predictor_training_pipeline.json",
    pipeline_root=pipeline_root_path,
    parameter_values={
        'project_id': project_id, # makesure to use your project id 
        'data_bucket': 'data_de2022_2109377',  # makesure to use your data bucket name 
        'trainset_filename': 'training_set.csv',     # makesure to upload these to your data bucket from DE2022/lab4/data
        'testset_filename': 'prediction_set.csv',    # makesure to upload these to your data bucket from DE2022/lab4/data
        'model_repo':'model_repo_de2022_2109377' # makesure to use your model bucket name 
    }
)

job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/962669557637/locations/us-central1/pipelineJobs/petal-predictor-training-pipeline-20221024181223
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/962669557637/locations/us-central1/pipelineJobs/petal-predictor-training-pipeline-20221024181223')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/petal-predictor-training-pipeline-20221024181223?project=962669557637
PipelineJob projects/962669557637/locations/us-central1/pipelineJobs/petal-predictor-training-pipeline-20221024181223 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/962669557637/locations/us-central1/pipelineJobs/petal-predictor-training-pipeline-20221024181223 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/962669557637/locations/us-central1/pipelineJobs/petal-predictor-training-pipeline-20221024181223 current state:
