In [1]:
import os

# The Vertex AI Workbench Notebook product has specific requirements
IS_WORKBENCH_NOTEBOOK = os.getenv("DL_ANACONDA_HOME") and not os.getenv("VIRTUAL_ENV")
IS_USER_MANAGED_WORKBENCH_NOTEBOOK = os.path.exists(
    "/opt/deeplearning/metadata/env_version"
)

# Vertex AI Notebook requires dependencies to be installed with '--user'
USER_FLAG = ""
if IS_WORKBENCH_NOTEBOOK:
    USER_FLAG = "--user"

! pip3 install --upgrade google-cloud-aiplatform {USER_FLAG} -q
! pip3 install -U google-cloud-storage {USER_FLAG} -q
! pip3 install {USER_FLAG} kfp google-cloud-pipeline-components --upgrade -q
! pip3 install sklearn

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
kfp 1.8.14 requires google-cloud-storage<2,>=1.20.0, but you have google-cloud-storage 2.5.0 which is incompatible.
google-cloud-pipeline-components 1.0.25 requires google-cloud-storage<2,>=1.20.0, but you have google-cloud-storage 2.5.0 which is incompatible.[0m[31m
[0mCollecting sklearn
  Using cached sklearn-0.0-py2.py3-none-any.whl
Installing collected packages: sklearn
Successfully installed sklearn-0.0


In [2]:
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

In [1]:
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"

KFP SDK version: 1.8.14


In [1]:
import kfp
from kfp.v2 import dsl
from kfp.v2.dsl import component
from kfp.v2.dsl import (
    Input,
    Output,
    Artifact,
    Dataset,
)

In [2]:
#The Google Cloud project that this pipeline runs in.
project_id = "de2022-362611"
# The region that this pipeline runs in
region = "us-west1"
# Specify a Cloud Storage URI that your pipelines service account can access. The artifacts of your pipeline runs are stored within the pipeline root.
pipeline_root_path = "gs://jads_temp_de2022_michimalek"
MODEL_PATH = "model.pkl"

In [3]:
from typing import Dict

def download_data(project_id: str, bucket: str, file_name: str) -> Dict:
    '''download data'''
    from google.cloud import storage
    import pandas as pd
    import logging 
    import sys
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    # Downloaing the file from a google bucket 
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(bucket)
    blob = bucket.blob(file_name)
    local_path = '/tmp/'+ file_name
    blob.download_to_filename(local_path)
    logging.info('Downloaded Data!')
    
    # Convert the data to a dictiory object    
    dict_from_csv = pd.read_csv(local_path, index_col=None, squeeze=True).to_dict()
    logging.info('Returning Data as Dictionary Object!')
    return dict_from_csv

In [4]:
# create a KFP component for data ingestion
data_ingestion_comp = kfp.components.create_component_from_func(
    download_data, output_component_file='data_ingestion.yaml', packages_to_install=['google-cloud-storage', 'pandas'])

In [17]:
from typing import NamedTuple, Dict
def train_lr (features: Dict, project_id: str, model_repo: str) -> Dict:
    '''train a LogisticRegression with default parameters'''
    import pandas as pd
    from sklearn.linear_model import LogisticRegression
    from sklearn import metrics
    from sklearn.model_selection import train_test_split
    from google.cloud import storage
    import json
    import logging 
    import sys
    import os
    import joblib
        
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    df = pd.DataFrame.from_dict(features)  
    
    logging.info(f"Cols: {df.columns}")        
    
    x_train, x_test, y_train, y_test = train_test_split(df.drop(['target',"Unnamed: 0"],axis=1), 
                                                        df['target'], test_size=0.30, 
                                                        random_state=101)

    model = LogisticRegression(max_iter=1000)
    model.fit(x_train,y_train)

    metrics = {
        "accuracy": model.score(x_test, y_test)
    }
    logging.info(metrics)
   
    # Save the model localy
    local_file = '/tmp/local_model.pkl'
    joblib.dump(model, local_file)
    # write out output
  
    # Save to GCS as model.h5
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(model_repo)
    blob = bucket.blob('lrmodel.pkl')
   # Upload the locally saved model
    blob.upload_from_filename(local_file)
  
    print("Saved the model to GCP bucket : " + model_repo)
    return metrics

In [18]:
# create a KFP component for training lr model
trail_lr_com = kfp.components.create_component_from_func(
    train_lr, output_component_file='train_lr_model.yaml', packages_to_install=['google-cloud-storage', 'pandas', 'joblib', 'scikit-learn'])

In [19]:
def predict_lr(project_id: str, model_repo: str, features: Dict) -> Dict:
    import pandas as pd
    import joblib
    from google.cloud import storage
    import json
    import logging
    import sys
    import os
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    df = pd.DataFrame.from_dict(features)    
    
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(model_repo)
    blob = bucket.blob('lrmodel.pkl')
    filename = '/tmp/local_model.pkl'
    blob.download_to_filename(filename)
        
    #Loading the saved model with joblib
    model = joblib.load(filename)
    
    logging.info(df.columns)


    xNew = df[["sepal length (cm)","sepal width (cm)","petal length (cm)","petal width (cm)"]]

    dfcp = df.copy()   
    y_classes = model.predict(xNew)
    logging.info(y_classes)
    dfcp['pclass'] = y_classes.tolist()
    dic = dfcp.to_dict(orient='records') 
    return dic

In [20]:
# create a KFP component for prediction LR 
prediction_lr_com = kfp.components.create_component_from_func(
    predict_lr, output_component_file='prediction_lr_com.yaml', packages_to_install=['google-cloud-storage', 'pandas', 'joblib', 'scikit-learn'])

In [21]:
# Define the workflow of the pipeline.
@kfp.dsl.pipeline(
    name="iris-predictor-training-pipeline",
    pipeline_root=pipeline_root_path)
def pipeline(project_id: str, data_bucket: str, trainset_filename: str, model_repo: str, testset_filename: str, ):
    
    
    di_op = data_ingestion_comp(
        project_id=project_id,
        bucket=data_bucket,
        file_name=trainset_filename
    )
     
    training_lr_job_run_op = trail_lr_com(
        project_id=project_id,
        model_repo=model_repo,       
        features=di_op.output
    )
    
    pre_di_op = data_ingestion_comp(
        project_id=project_id,
        bucket=data_bucket,
        file_name=testset_filename
    ).after(training_lr_job_run_op)

    predict_lr_job_run_op = prediction_lr_com(
        project_id=project_id,
        model_repo=model_repo,       
        features=pre_di_op.output
    )

In [22]:
from kfp.v2 import compiler
compiler.Compiler().compile(pipeline_func=pipeline,
        package_path='iris_predictor_training_pipeline.json')

In [23]:
import google.cloud.aiplatform as aip

job = aip.PipelineJob(
    display_name="iris-predictor",
    enable_caching=False,
    template_path="iris_predictor_training_pipeline.json",
    pipeline_root=pipeline_root_path,
    parameter_values={
        'project_id': project_id, # makesure to use your project id 
        'data_bucket': 'data_de2022_michimalek',  # makesure to use your data bucket name 
        'trainset_filename': 'full_data.csv',     # makesure to upload these to your data bucket from DE2022/lab4/data
        'testset_filename': 'test.csv',    # makesure to upload these to your data bucket from DE2022/lab4/data
        'model_repo': 'model_de2022_michimalek' # makesure to use your model bucket name 
    }
)

job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/26340798564/locations/us-central1/pipelineJobs/iris-predictor-training-pipeline-20221023163513
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/26340798564/locations/us-central1/pipelineJobs/iris-predictor-training-pipeline-20221023163513')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/iris-predictor-training-pipeline-20221023163513?project=26340798564
PipelineJob projects/26340798564/locations/us-central1/pipelineJobs/iris-predictor-training-pipeline-20221023163513 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/26340798564/locations/us-central1/pipelineJobs/iris-predictor-training-pipeline-20221023163513 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/26340798564/locations/us-central1/pipelineJobs/iris-predictor-training-pipeline-20221023163513 current state:
PipelineStat