In [25]:
import os

# The Vertex AI Workbench Notebook product has specific requirements
IS_WORKBENCH_NOTEBOOK = os.getenv("DL_ANACONDA_HOME") and not os.getenv("VIRTUAL_ENV")
IS_USER_MANAGED_WORKBENCH_NOTEBOOK = os.path.exists(
    "/opt/deeplearning/metadata/env_version"
)

# Vertex AI Notebook requires dependencies to be installed with '--user'
USER_FLAG = ""
if IS_WORKBENCH_NOTEBOOK:
    USER_FLAG = "--user"

! pip3 install --upgrade google-cloud-aiplatform {USER_FLAG} -q
! pip3 install -U google-cloud-storage {USER_FLAG} -q
! pip3 install {USER_FLAG} kfp google-cloud-pipeline-components --upgrade -q

ERROR: Could not install packages due to an OSError: [WinError 5] Access is denied: 'C:\\Users\\ceesm\\anaconda3\\Lib\\site-packages\\~aml\\_yaml.cp39-win_amd64.pyd'
Consider using the `--user` option or check the permissions.



In [26]:
import os

# Freshly installed packages are only importable from a new kernel process, so
# restart the kernel here unless the IS_TESTING environment variable is set.
skip_restart = bool(os.environ.get("IS_TESTING"))
if not skip_restart:
    import IPython

    # The True argument asks for a restart rather than a plain shutdown.
    IPython.Application.instance().kernel.do_shutdown(True)

In [1]:
import kfp
from kfp.v2 import dsl
from kfp.v2.dsl import component
from kfp.v2.dsl import (
    Input,
    Output,
    Artifact,
    Dataset,
)

ModuleNotFoundError: No module named 'kfp'

In [None]:
# Google Cloud project that this pipeline runs in.
project_id = "data-engineering-jads"
# Region that this pipeline runs in.
region = "us-west1"
# Cloud Storage URI that the pipeline's service account can access. The
# artifacts of pipeline runs are stored within this pipeline root.
pipeline_root_path = "gs://jads_temp695341"

# Pipeline component: Data ingestion

In [None]:
from typing import Dict

def download_data(project_id: str, bucket: str, file_name: str) -> Dict:
    """Download a CSV file from Cloud Storage and return it as a dictionary.

    All imports live inside the function because it is wrapped as a
    function-based KFP component and therefore has to be self-contained.

    Args:
        project_id: Google Cloud project used to create the storage client.
        bucket: Name of the bucket that holds the file.
        file_name: Name of the blob (object) to download.

    Returns:
        The CSV content in the shape produced by ``DataFrame.to_dict()``:
        ``{column_name: {row_index: value}}``. A single-column file is
        squeezed to a Series first and yields ``{row_index: value}``.
    """
    import logging
    import os
    import sys

    import pandas as pd
    from google.cloud import storage

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # Download the file from the Google Cloud Storage bucket. The bucket object
    # gets its own name so the `bucket` argument (a string) is not shadowed.
    client = storage.Client(project=project_id)
    source_bucket = client.get_bucket(bucket)
    blob = source_bucket.blob(file_name)
    # Only the base name is used locally so that blob names containing a
    # prefix ("dir/file.csv") do not point into a non-existent /tmp subfolder.
    local_path = os.path.join('/tmp', os.path.basename(file_name))
    blob.download_to_filename(local_path)
    logging.info('Downloaded Data!')

    # Convert the data to a dictionary object. `read_csv(squeeze=True)` was
    # removed in pandas 2.0; `.squeeze("columns")` is the documented equivalent.
    dict_from_csv = pd.read_csv(local_path, index_col=None).squeeze("columns").to_dict()
    logging.info('Returning Data as Dictionary Object!')
    return dict_from_csv

In [None]:
# Wrap download_data as a KFP component; the component definition is also
# written to data_ingestion.yaml.
data_ingestion_component = kfp.components.create_component_from_func(
    download_data,
    output_component_file='data_ingestion.yaml',
    packages_to_install=['google-cloud-storage', 'pandas'],
)

# Pipeline component: Training SVC

In [None]:
from typing import NamedTuple, Dict

def train_svc(features: Dict, project_id: str, model_repo: str) -> Dict:
    """Train a linear SVC on the heart-attack data and upload the model to GCS.

    The function is wrapped as a function-based KFP component, so every import
    is local and the preprocessing is intentionally not shared with other
    training functions.

    Args:
        features: Dataset as ``{column_name: {row_index: value}}``; it must
            contain the categorical and continuous columns listed below plus
            the ``output`` target column.
        project_id: Google Cloud project used to create the storage client.
        model_repo: Name of the bucket the trained model is uploaded to.

    Returns:
        ``{"accuracy": <float>}`` measured on the held-out 30% test split.
    """
    import logging
    import sys

    import joblib
    import pandas as pd
    from google.cloud import storage
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import RobustScaler
    from sklearn.svm import SVC

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # reading data
    df = pd.DataFrame.from_dict(features)
    cat_cols = ['sex', 'exng', 'caa', 'cp', 'fbs', 'restecg', 'slp', 'thall']
    con_cols = ["age", "trtbps", "chol", "thalachh", "oldpeak"]

    # one-hot encode the categorical columns (get_dummies returns a new frame)
    encoded = pd.get_dummies(df, columns=cat_cols, drop_first=True)

    # defining features and target; the target is kept 1-D as scikit-learn expects
    y = encoded['output']
    X = encoded.drop(columns=['output'])

    # Split BEFORE scaling so that statistics of the test rows never leak into
    # the preprocessing used for training.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, random_state=42
    )
    X_train = X_train.copy()
    X_test = X_test.copy()

    # fit the scaler on the training rows only, then apply it to the test rows
    scaler = RobustScaler()
    X_train[con_cols] = scaler.fit_transform(X_train[con_cols])
    X_test[con_cols] = scaler.transform(X_test[con_cols])

    clf = SVC(kernel='linear', C=1, random_state=42).fit(X_train, y_train)
    y_pred = clf.predict(X_test)

    # Accuracy logging; float() keeps the returned value a plain Python number
    metrics = {
        "accuracy": float(accuracy_score(y_test, y_pred))
    }
    logging.info(metrics)

    # Save the model locally
    local_file = '/tmp/local_model.pkl'
    joblib.dump(clf, local_file)

    # Upload the locally saved model to GCS.
    # NOTE(review): the file is a joblib pickle although the blob name ends in
    # '.h5'; the name is kept unchanged so existing consumers keep working.
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(model_repo)
    blob = bucket.blob('svc_model.h5')
    blob.upload_from_filename(local_file)

    print("Saved the model to GCP bucket : " + model_repo)
    return metrics

In [None]:
# Wrap train_svc as a KFP component; the component definition is also written
# to train_svc_model.yaml.
train_svc_component = kfp.components.create_component_from_func(
    train_svc,
    output_component_file='train_svc_model.yaml',
    packages_to_install=['google-cloud-storage', 'pandas', 'joblib', 'scikit-learn'],
)

# Pipeline component: Training logistic regression

In [None]:
from typing import NamedTuple, Dict

def train_LR(features: Dict, project_id: str, model_repo: str) -> Dict:
    """Train a logistic regression on the heart-attack data and upload it to GCS.

    The function is wrapped as a function-based KFP component, so every import
    is local and the preprocessing is intentionally not shared with other
    training functions.

    Args:
        features: Dataset as ``{column_name: {row_index: value}}``; it must
            contain the categorical and continuous columns listed below plus
            the ``output`` target column.
        project_id: Google Cloud project used to create the storage client.
        model_repo: Name of the bucket the trained model is uploaded to.

    Returns:
        ``{"accuracy": <float>}`` measured on the held-out 30% test split.
    """
    import logging
    import sys

    import joblib
    import pandas as pd
    from google.cloud import storage
    from sklearn.linear_model import LogisticRegression
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import RobustScaler

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # reading data
    df = pd.DataFrame.from_dict(features)
    cat_cols = ['sex', 'exng', 'caa', 'cp', 'fbs', 'restecg', 'slp', 'thall']
    con_cols = ["age", "trtbps", "chol", "thalachh", "oldpeak"]

    # one-hot encode the categorical columns (get_dummies returns a new frame)
    encoded = pd.get_dummies(df, columns=cat_cols, drop_first=True)

    # defining features and target; the target is kept 1-D as scikit-learn expects
    y = encoded['output']
    X = encoded.drop(columns=['output'])

    # Split BEFORE scaling so that statistics of the test rows never leak into
    # the preprocessing used for training.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, random_state=42
    )
    X_train = X_train.copy()
    X_test = X_test.copy()

    # fit the scaler on the training rows only, then apply it to the test rows
    scaler = RobustScaler()
    X_train[con_cols] = scaler.fit_transform(X_train[con_cols])
    X_test[con_cols] = scaler.transform(X_test[con_cols])

    # instantiating and fitting the model
    logreg = LogisticRegression()
    logreg.fit(X_train, y_train)

    # predict() returns class labels directly; taking argmax over
    # predict_proba yields positional indices, which only coincide with the
    # labels when the classes happen to be exactly 0..k-1.
    y_pred = logreg.predict(X_test)

    # Accuracy logging; float() keeps the returned value a plain Python number
    metrics = {
        "accuracy": float(accuracy_score(y_test, y_pred))
    }
    logging.info(metrics)

    # Save the model locally
    local_file = '/tmp/local_model.pkl'
    joblib.dump(logreg, local_file)

    # Upload the locally saved model to GCS.
    # NOTE(review): the file is a joblib pickle although the blob name ends in
    # '.h5'; the name is kept unchanged so existing consumers keep working.
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(model_repo)
    blob = bucket.blob('logreg_model.h5')
    blob.upload_from_filename(local_file)

    print("Saved the model to GCP bucket : " + model_repo)
    return metrics

In [None]:
# Wrap train_LR as a KFP component; the component definition is also written
# to train_LR_model.yaml.
train_lr_component = kfp.components.create_component_from_func(
    train_LR,
    output_component_file='train_LR_model.yaml',
    packages_to_install=['google-cloud-storage', 'pandas', 'joblib', 'scikit-learn'],
)

# Model comparison component

In [None]:
from typing import NamedTuple, Dict

def compare_model_result(svc_metrics: Dict, lr_metrics: Dict) -> str:
    """Pick the better model by comparing the reported accuracies.

    Args:
        svc_metrics: Metrics of the SVC model; must contain ``"accuracy"``.
        lr_metrics: Metrics of the logistic regression model; must contain
            ``"accuracy"``.

    Returns:
        ``"SVC"`` when the SVC accuracy is strictly higher, otherwise ``"LR"``
        (ties therefore go to the logistic regression).

    Raises:
        ValueError: If either metrics dictionary has no ``"accuracy"`` value.
    """
    import logging
    import sys

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    logging.info(svc_metrics)
    logging.info(lr_metrics)

    svc_accuracy = svc_metrics.get("accuracy")
    lr_accuracy = lr_metrics.get("accuracy")

    # Fail with a clear message instead of an opaque `None > None` TypeError.
    missing = [
        label
        for label, value in (("svc_metrics", svc_accuracy), ("lr_metrics", lr_accuracy))
        if value is None
    ]
    if missing:
        raise ValueError('Missing "accuracy" in: ' + ", ".join(missing))

    return "SVC" if svc_accuracy > lr_accuracy else "LR"

In [None]:
# Wrap compare_model_result as a KFP component that selects between SVC and
# LR; the component definition is also written to model_comparison_result.yaml.
compare_model_comp = kfp.components.create_component_from_func(
    compare_model_result,
    output_component_file='model_comparison_result.yaml',
)

# Define pipeline

In [None]:
# Training pipeline: ingest the dataset, train the SVC and the logistic
# regression on the ingested features, then compare their metrics.
@kfp.dsl.pipeline(
    name="heart-attack-training-pipeline",
    pipeline_root=pipeline_root_path,
)
def pipeline(project_id: str, data_bucket: str, dataset_filename: str, model_repo: str, ):

    # Step 1: download the dataset from the data bucket.
    ingest_task = data_ingestion_component(
        project_id=project_id,
        bucket=data_bucket,
        file_name=dataset_filename,
    )

    # Step 2: both training steps consume the output of the ingestion step.
    svc_task = train_svc_component(
        project_id=project_id,
        model_repo=model_repo,
        features=ingest_task.output,
    )

    lr_task = train_lr_component(
        project_id=project_id,
        model_repo=model_repo,
        features=ingest_task.output,
    )

    # Step 3: compare the two metric dictionaries once both trainings are done.
    comparison_task = compare_model_comp(svc_task.output, lr_task.output)
    comparison_task.after(svc_task, lr_task)

# Compile pipeline to JSON

In [None]:
from kfp.v2 import compiler

# Compile the pipeline function into a JSON job specification file.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=pipeline,
    package_path='heart_attack_training_pipeline.json',
)

# Submit pipeline run

In [None]:
import google.cloud.aiplatform as aip

# Submit the compiled pipeline specification as a Vertex AI pipeline job.
job = aip.PipelineJob(
    display_name="heart_attack_training_pipeline",
    enable_caching=False,
    template_path="heart_attack_training_pipeline.json",
    pipeline_root=pipeline_root_path,
    parameter_values={
        'project_id': project_id,
        'data_bucket': 'data_695341',
        'dataset_filename': 'heart.csv',
        'model_repo': 'model_repo_de695341'
    },
    # Pin the job to the configured project and region; without these the SDK
    # falls back to its default project/location and `region` goes unused.
    project=project_id,
    location=region,
)

job.run()