# Create and Deploy a Vertex AI Pipeline with Kubeflow Pipelines

Install the libraries required to run this notebook locally (uncomment the lines you need).

In [1]:
# !pip3 install google-cloud-aiplatform==1.0.0 --upgrade --user
# !pip install kfp --upgrade --user
# !pip3 install kfp google-cloud-pipeline-components==0.1.1 --upgrade --user
# !pip3 install scikit-learn --user
# !pip3 install google-cloud-aiplatform --upgrade --user
# !pip3 install pandas --user
# !pip3 install python-dotenv --user

You may need to restart the kernel after running the installation cell above for the first time.

In [56]:
import uuid
import kfp
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import component
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output, OutputPath, component, ClassificationMetrics, Metrics)
from kfp.v2.google.client import AIPlatformClient
from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple


Load preset environment variables saved in a local `.env` file. To create your own, follow these instructions: https://stackoverflow.com/a/54028874

In [57]:
#https://stackoverflow.com/a/54028874
%load_ext dotenv
%dotenv

import os
PROJECT_ID = "deep-learning-dlhlp" #os.environ['PROJECT_ID']
BUCKET_NAME = "deep-learning-dlhlp" #os.environ['BUCKET']

PIPELINE_ROOT = 'gs://{}/pipeline_root'.format(BUCKET_NAME)
REGION = 'us-central1'

print(PROJECT_ID)
print(BUCKET_NAME)
print(PIPELINE_ROOT)

The dotenv extension is already loaded. To reload it, use:
  %reload_ext dotenv
cannot find .env file
deep-learning-dlhlp
deep-learning-dlhlp
gs://deep-learning-dlhlp/pipeline_root


## 1. Create a component that reads data from BigQuery into a CSV file

In [90]:
@component(packages_to_install=["pandas", "google-cloud-aiplatform", "google-cloud-bigquery-storage","google-cloud-bigquery","pyarrow"], output_component_file="preprocess.yaml")
def preprocess(output_csv_path: OutputPath('CSV'),
               project: str = 'deep-learning-dlhlp',
               table: str = 'deep-learning-dlhlp.telco.churn'):
    """Read an entire BigQuery table and write it to a CSV file.

    Args:
        output_csv_path: Local path (supplied by KFP) where the CSV is written.
        project: GCP project used to run the BigQuery query job.
        table: Fully-qualified ``project.dataset.table`` to read in full.
    """
    # Imports are local because a KFP lightweight component must be self-contained.
    from google.cloud import bigquery
    import google.auth

    creds, _ = google.auth.default()
    client = bigquery.Client(project=project, credentials=creds)

    query = f"SELECT * FROM `{table}`"
    print(query)

    dataframe = client.query(query).to_dataframe()
    print(dataframe.head())

    # index=False: the pandas RangeIndex is not a feature. Writing it would add an
    # "Unnamed: 0" column when the CSV is read back downstream.
    dataframe.to_csv(output_csv_path, index=False)
    print("done")

## 2. Create a training component

In [91]:
@component(packages_to_install=["pandas", "scikit-learn", "imbalanced-learn", "google-cloud-aiplatform", "pyarrow"])
def train(wmetrics: Output[ClassificationMetrics], input_csv_path: InputPath('CSV'), saved_model: Output[Model], artifact_uri: OutputPath(str), accuracy: Output[Metrics], model_type: str, project_id: str, bucket: str):
    """Train a scikit-learn churn classifier and export it for Vertex AI serving.

    The exported ``model.joblib`` is a scikit-learn Pipeline (MinMaxScaler ->
    classifier). Prediction clients therefore send the label-encoded, *unscaled*
    feature values, in the order of ``feature_columns`` below.

    Args:
        wmetrics: Classification-metrics artifact. It receives an ROC curve
            (from 3-fold cross-validated probabilities on the training split)
            only when ``model_type == 'decision_tree'``.
        input_csv_path: CSV produced by the ``preprocess`` component.
        saved_model: Model artifact. ``model.joblib`` is written into the parent
            directory of this artifact's path, so that directory can serve as
            the model's ``artifact_uri``.
        artifact_uri: Output file receiving the URI of that parent directory
            (with a trailing slash).
        accuracy: Metrics artifact. It receives ``accuracy`` on the 30% held-out split.
        model_type: One of ``'mlp'``, ``'random_forest'``, ``'decision_tree'``.
        project_id: Not used by the body; kept for interface compatibility.
        bucket: Not used by the body; kept for interface compatibility.

    Raises:
        ValueError: If ``model_type`` is not one of the supported values.
    """
    # All imports are local: a KFP lightweight component runs only this function's
    # source, so notebook-level imports (including ``os``) are not in scope.
    import os

    import joblib
    import pandas as pd
    from sklearn.metrics import roc_curve
    from sklearn.model_selection import cross_val_predict, train_test_split
    from sklearn.pipeline import make_pipeline
    from sklearn.preprocessing import LabelEncoder, MinMaxScaler

    seed = 101  # shared by the split and the models for reproducible runs

    # Choose the model first so an invalid model_type fails fast with a clear message
    # instead of an UnboundLocalError after all the data work.
    if model_type == 'mlp':
        from sklearn.neural_network import MLPClassifier
        model = MLPClassifier(alpha=1, max_iter=1000, random_state=seed)
    elif model_type == 'random_forest':
        from sklearn.ensemble import RandomForestClassifier
        model = RandomForestClassifier(max_depth=5, n_estimators=10, max_features=1, random_state=seed)
    elif model_type == 'decision_tree':
        from sklearn.tree import DecisionTreeClassifier
        model = DecisionTreeClassifier(random_state=seed)
    else:
        raise ValueError(
            "Unsupported model_type {!r}; expected 'mlp', 'random_forest' or 'decision_tree'".format(model_type))

    df = pd.read_csv(input_csv_path)
    print(len(df))

    # Integer-encode every string (object) column, including the target.
    # NOTE(review): if TotalCharges arrives as a string column it is label-encoded
    # here rather than parsed as a number; confirm the BigQuery schema.
    for column in df.columns:
        if df[column].dtype == 'object':
            df[column] = LabelEncoder().fit_transform(df[column].values)
    print(df.head())  # to check the encoding

    feature_columns = ['Contract', 'tenure', 'TechSupport', 'OnlineSecurity', 'TotalCharges', 'PaperlessBilling',
                       'DeviceProtection', 'Dependents', 'OnlineBackup', 'SeniorCitizen', 'MonthlyCharges',
                       'PaymentMethod', 'Partner', 'PhoneService']
    X = df[feature_columns]
    y = df['Churn']

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=seed)

    # Scaling lives inside the pipeline for two reasons:
    # - the scaler is fit on the training split only, so no test-set leakage;
    # - it is exported with the classifier, so the endpoint applies the same scaling.
    estimator = make_pipeline(MinMaxScaler(feature_range=(0, 1)), model)
    estimator.fit(X_train, y_train)

    # Vertex AI's prebuilt scikit-learn container loads model.joblib from the model
    # directory, so write it next to (not into) the saved_model artifact path.
    model_dir = os.path.dirname(saved_model.path)
    joblib.dump(estimator, os.path.join(model_dir, 'model.joblib'))
    print(" saved_model.path: " + saved_model.path)
    print(" saved_model.uri: " + saved_model.uri)
    with open(artifact_uri, 'w') as f:
        f.write(saved_model.uri.rsplit('/', 1)[0] + '/')

    accuracy.log_metric('accuracy', estimator.score(X_test, y_test))

    if model_type == 'decision_tree':
        # cross_val_predict clones and refits the pipeline on each fold.
        y_scores = cross_val_predict(estimator, X_train, y_train, cv=3, method="predict_proba")
        fpr, tpr, thresholds = roc_curve(
            y_true=y_train, y_score=y_scores[:, 1], pos_label=True
        )
        wmetrics.log_roc_curve(fpr, tpr, thresholds)


## 3. Create an evaluation component

In [92]:
@component()
def evalaluation(baseline: float, accuracy: Input[Metrics], accuracy2: Input[Metrics], accuracy3: Input[Metrics]) -> bool:
    """Decide whether the first model's accuracy beats a baseline.

    Only ``accuracy`` decides the result. ``accuracy2`` and ``accuracy3`` are
    logged for comparison but do not affect the return value.

    Args:
        baseline: Accuracy threshold that must be strictly exceeded.
        accuracy: Metrics artifact whose metadata holds an ``'accuracy'`` value.
        accuracy2: Metrics artifact of a second candidate (logged only).
        accuracy3: Metrics artifact of a third candidate (logged only).

    Returns:
        True if ``accuracy.metadata['accuracy'] > baseline``.
    """
    print("baseline is :", str(baseline))

    new_val = float(accuracy.metadata['accuracy'])
    print("new_val is:", str(new_val))

    # .get() so that logging the comparison candidates can never fail the step.
    print("other candidates:", accuracy2.metadata.get('accuracy'), accuracy3.metadata.get('accuracy'))

    is_better = new_val > baseline
    print("isBetter: " + str(is_better))
    return is_better

## 4. Create a component that tests the prediction endpoint

In [93]:
#https://github.com/googleapis/python-aiplatform/blob/master/samples/snippets/predict_custom_trained_model_sample.py
#https://cloud.google.com/ai-platform/prediction/docs/online-predict
@component(packages_to_install=["google-cloud-aiplatform"])
def predict_endpoint_test(endpoint_id: Input[Artifact],
                          location: str,
                          project: str,
                          api_endpoint: str = "us-central1-aiplatform.googleapis.com"):
    """Send one sample request to a deployed Vertex AI endpoint and print the reply.

    Args:
        endpoint_id: Endpoint artifact. The endpoint ID is taken from the last
            path segment of its ``uri``.
        location: Region of the endpoint.
        project: GCP project that owns the endpoint.
        api_endpoint: Regional Vertex AI API host used by the prediction client.
    """
    from google.cloud import aiplatform
    from google.protobuf import json_format
    from google.protobuf.struct_pb2 import Value

    print(endpoint_id)
    short_id = endpoint_id.uri.split('/')[-1]
    print(short_id)

    prediction_client = aiplatform.gapic.PredictionServiceClient(
        client_options={"api_endpoint": api_endpoint}
    )

    #https://machinelearningmastery.com/make-predictions-scikit-learn/
    # NOTE(review): these values come from the linked example, not this dataset,
    # so the prediction is only a connectivity smoke test.
    sample_features = [1.74481176, 0.86540763, -1.07296862, -2.3015387, -2.06014071,
                       1.46210794, 0.3190391, -0.24937038, -0.61175641, -0.7612069,
                       -0.38405435, -0.52817175, -0.3224172, 1.62434536]
    payload = [json_format.ParseDict(sample_features, Value())]

    endpoint_path = prediction_client.endpoint_path(
        project=project, location=location, endpoint=short_id
    )
    response = prediction_client.predict(endpoint=endpoint_path, instances=payload)

    print("response")
    print(" deployed_model_id:", response.deployed_model_id)
    for prediction in response.predictions:
        print(" prediction:" + str(prediction))

In [94]:
from datetime import datetime

# Compact local timestamp (YYYYMMDDhhmmss) appended to spec files and resource names.
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"

In [95]:
@kfp.dsl.pipeline(name="train-scikit" + str(uuid.uuid4()))
def pipeline(
    project: str = PROJECT_ID,
    bucket: str = BUCKET_NAME,
    baseline_accuracy: float = 0.70
):
    """Train three scikit-learn models, gate on accuracy, then deploy and smoke-test.

    The decision-tree model's accuracy is compared with ``baseline_accuracy``.
    Only when it is higher is that model uploaded, deployed to a new endpoint,
    and sent one test prediction.

    Args:
        project: GCP project for training metadata, model upload, endpoint and deployment.
        bucket: Bucket name forwarded to the training components.
        baseline_accuracy: Accuracy the decision tree must strictly exceed to be deployed.
    """
    preprocess_task = preprocess()

    # One training task per model type, all reading the same preprocessed CSV.
    # Use the pipeline parameters (not the notebook globals) so run-time overrides take effect.
    train_task = train(preprocess_task.output, model_type='decision_tree', project_id=project, bucket=bucket)
    train_task2 = train(preprocess_task.output, model_type='random_forest', project_id=project, bucket=bucket)
    train_task3 = train(preprocess_task.output, model_type='mlp', project_id=project, bucket=bucket)

    eval_task = evalaluation(baseline_accuracy, train_task.outputs["accuracy"], train_task2.outputs["accuracy"], train_task3.outputs["accuracy"])

    with dsl.Condition(eval_task.output == "true", name="eval models"):
        model_upload_op = gcc_aip.ModelUploadOp(
            project=project,
            display_name="model" + TIMESTAMP,
            artifact_uri=train_task.outputs["artifact_uri"],  # GCS directory holding model.joblib
            serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest",
        )

        endpoint_create_op = gcc_aip.EndpointCreateOp(
            project=project,
            display_name="pipelines" + TIMESTAMP,
        )

        model_deploy_op = gcc_aip.ModelDeployOp(
            project=project,
            endpoint=endpoint_create_op.outputs["endpoint"],
            model=model_upload_op.outputs["model"],
            deployed_model_display_name="model_display_name",
            machine_type="n1-standard-4",
        )

        predict_endpoint_test(project=project, location=REGION, endpoint_id=model_deploy_op.outputs['endpoint'])

In [96]:
from kfp.v2 import compiler

# Serialize the pipeline into a timestamped Vertex AI job spec file.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path=f"dag-{TIMESTAMP}.json",
)

In [97]:
from kfp.v2.google.client import AIPlatformClient

# Client used to submit compiled job specs to Vertex AI Pipelines.
api_client = AIPlatformClient(region=REGION, project_id=PROJECT_ID)

In [98]:
# Submit the compiled spec without parameter overrides, so the pipeline defaults apply.
response = api_client.create_run_from_job_spec(
    f"dag-{TIMESTAMP}.json",
    parameter_values=dict(),
    pipeline_root=PIPELINE_ROOT,
)

# Create an AutoML training pipeline

Create a managed tabular dataset from a BigQuery table and train a classification model on it using AutoML Tabular Training.

Define the pipeline:

In [58]:
@kfp.dsl.pipeline(name="automl-tab-training-v2")
def pipeline(project: str = PROJECT_ID):
    """Create a tabular dataset from BigQuery, train an AutoML classifier, and deploy it.

    Args:
        project: GCP project in which the dataset, training job and deployment are created.
    """
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project,
        display_name="churn-automl",
        bq_source=["bq://deep-learning-dlhlp.telco.churn"],
    )

    # (transformation type, column) for every input feature, in the original order.
    # The target column "Churn" is deliberately absent: Vertex AI requires that the
    # target column have no transformation defined on it.
    feature_specs = [
        ("categorical", "Contract"),
        ("numeric", "tenure"),
        ("categorical", "TechSupport"),
        ("categorical", "OnlineSecurity"),
        ("numeric", "TotalCharges"),
        ("categorical", "PaperlessBilling"),
        ("categorical", "DeviceProtection"),
        ("categorical", "Dependents"),
        ("categorical", "OnlineBackup"),
        ("numeric", "SeniorCitizen"),
        ("numeric", "MonthlyCharges"),
        ("categorical", "PaymentMethod"),
        ("categorical", "Partner"),
        ("categorical", "PhoneService"),
    ]

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name="train-churn-automl_1",
        optimization_prediction_type="classification",
        column_transformations=[{kind: {"column_name": column}} for kind, column in feature_specs],
        optimization_objective="minimize-log-loss",
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Churn",
    )

    deploy_op = gcc_aip.ModelDeployOp(  # noqa: F841
        model=training_op.outputs["model"],
        project=project,
        machine_type="n1-standard-4",
    )

# Compile and run the pipeline
Now, you're ready to compile the pipeline:

In [59]:
from kfp.v2 import compiler  # noqa: F811

# Serialize the AutoML pipeline into the job spec file submitted below.
compiler.Compiler().compile(
    package_path="churn_classification_pipeline.json",
    pipeline_func=pipeline,
)


The pipeline compilation generates the churn_classification_pipeline.json job spec file.

Next, instantiate an API client object:

In [60]:
from kfp.v2.google.client import AIPlatformClient  # noqa: F811

# Client for submitting the AutoML job spec to Vertex AI Pipelines.
api_client = AIPlatformClient(
    region=REGION,
    project_id=PROJECT_ID,
)

Then run the compiled pipeline:

In [61]:
# Submit the AutoML spec, passing the project explicitly as a run-time parameter.
response = api_client.create_run_from_job_spec(
    "churn_classification_pipeline.json",
    parameter_values=dict(project=PROJECT_ID),
    pipeline_root=PIPELINE_ROOT,
)

In [62]:
#text
#categorical