## Objective
Create a pipeline which:
- Loads data from a Cloud Storage bucket
- Trains a scikit-learn model
- Performs validation
- Uploads the model
- Deploys the model

## Setup
- Using workbench with `Pytorch:2.0` image
- Create bucket (ideally in same region)
`gsutil mb -l europe-west4 gs://<BUCKET NAME>`
- Download the training data file (white wine quality dataset) to upload to the bucket
`wget http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-white.csv`
- Upload training file to bucket
`gcloud storage cp winequality-white.csv gs://<BUCKET NAME>/data/wine.csv`
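- Upload the test file to the bucket (`test.csv` is the input of the "Batch prediction" section; it is not created by the steps above, so prepare it beforehand)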
`gcloud storage cp test.csv gs://<BUCKET NAME>/data/test.csv`
- Install `google_cloud_pipeline_components` (1.0.44)
`pip install -q --upgrade "google-cloud-pipeline-components<2"`

## Dependencies

In [1]:
# Google Cloud region used for the pipeline job and the Vertex AI calls below.
REGION = "europe-west4"
# Placeholder: replace with your Google Cloud project ID.
PROJECT_ID = "<PROJECT ID>"
# Placeholder: replace with your bucket, keeping the gs:// prefix.
BUCKET_NAME = "gs://<BUCKET NAME>"
# Folder inside the bucket used as the default pipeline root.
PIPELINE_ROOT = BUCKET_NAME + "/pipeline_root/"

In [2]:
from typing import NamedTuple
import typing
from kfp.v2 import dsl
from kfp.v2.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        OutputPath, 
                        InputPath)

from kfp.v2 import compiler
from google.cloud import bigquery
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
from google_cloud_pipeline_components import aiplatform as gcc_aip

## Step 1 - Read from cloud storage bucket

In [12]:
@component(
    base_image="europe-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest",
)
def load_data_from_bucket(
    url: str,
    dataset_train: Output[Dataset],
    dataset_test: Output[Dataset]
):
    """Read the wine CSV, derive a binary target and write train/test splits.

    Args:
        url: Location of a ';'-delimited CSV that contains at least the
            ``quality`` and ``total sulfur dioxide`` columns.
        dataset_train: Output artifact; the 70% training split is written to
            ``dataset_train.path + ".csv"``.
        dataset_test: Output artifact; the 30% test split is written to
            ``dataset_test.path + ".csv"``.
    """
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(url, delimiter=";")

    # Binary label: 1 when quality is 7 or higher, otherwise 0.
    df["target"] = (df["quality"] >= 7).astype(int)
    # The raw score would leak the label; 'total sulfur dioxide' is excluded
    # from the feature set as well.
    df = df.drop(["quality", "total sulfur dioxide"], axis=1)

    # Fixed seed keeps the split reproducible between pipeline runs.
    train, test = train_test_split(df, test_size=0.3, random_state=42)
    train.to_csv(dataset_train.path + ".csv", index=False, encoding='utf-8-sig')
    test.to_csv(dataset_test.path + ".csv", index=False, encoding='utf-8-sig')

## Step 2 - Train model

In [13]:
@component(
    base_image="europe-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest",
)
def train_model(
    dataset: Input[Dataset],
    model: Output[Model], 
):
    """Fit a random forest on the training split and pickle it.

    Args:
        dataset: Input artifact; the CSV is read from
            ``dataset.path + ".csv"`` and must contain a ``target`` column.
        model: Output artifact; the fitted estimator is pickled to
            ``model.path + ".pkl"`` and ``model_type`` is recorded in its
            metadata.
    """
    import pickle

    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier

    frame = pd.read_csv(dataset.path + ".csv")
    features = frame.drop(columns=["target"])
    labels = frame.target

    # Fixed seed so repeated runs on the same data give the same forest.
    classifier = RandomForestClassifier(n_estimators=10, random_state=42)
    classifier.fit(features, labels)

    model.metadata["model_type"] = "RandomForestClassifier"
    with open(model.path + ".pkl", 'wb') as handle:
        pickle.dump(classifier, handle)

## Step 3 - Validate the trained model

In [14]:
@component(
    base_image="europe-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest",
)
def validate_model(
    test_set: Input[Dataset],
    trained_model: Input[Model],
    thresholds_dict_str: str,
    metrics: Output[ClassificationMetrics],
    kpi: Output[Metrics]
) -> NamedTuple("output", [("deploy", str)]):
    """Score the trained model on the test split and decide on deployment.

    Args:
        test_set: Input artifact; the CSV is read from
            ``test_set.path + ".csv"`` and must contain a ``target`` column.
        trained_model: Input artifact; the pickled estimator is read from
            ``trained_model.path + ".pkl"``. ``roc_auc`` and ``accuracy`` are
            written to its metadata.
        thresholds_dict_str: JSON object with a ``"roc"`` key holding the
            minimum ROC AUC required for deployment, e.g. ``'{"roc":0.8}'``.
        metrics: Declared output artifact; nothing is logged to it here.
        kpi: Output artifact that receives the ``accuracy`` metric.

    Returns:
        A one-element tuple whose ``deploy`` field is ``"true"`` when the
        ROC AUC reaches the threshold and ``"false"`` otherwise.
    """
    import json
    import pickle

    import pandas as pd
    from sklearn.metrics import accuracy_score, roc_auc_score

    data = pd.read_csv(test_set.path + ".csv")
    features = data.drop(columns=["target"])
    labels = data.target

    # NOTE: pickle.load can execute arbitrary code. Only feed this component
    # artifacts produced by this pipeline's own training step.
    with open(trained_model.path + ".pkl", 'rb') as file:
        model = pickle.load(file)

    predictions = model.predict(features)
    # Probability of the positive class (column 1) drives the ROC AUC.
    positive_scores = model.predict_proba(features)[:, 1]

    print("calculating roc")
    roc_auc = roc_auc_score(labels, positive_scores)
    trained_model.metadata["roc_auc"] = float(roc_auc)

    print("calculating accuracy score")
    accuracy = accuracy_score(labels, predictions)
    trained_model.metadata["accuracy"] = float(accuracy)
    kpi.log_metric("accuracy", float(accuracy))

    thresholds = json.loads(thresholds_dict_str)
    # Compare as floats: converting the threshold with int() would truncate
    # a value such as 0.8 to 0 and make the gate pass for any model.
    deploy = "true" if float(roc_auc) >= float(thresholds["roc"]) else "false"
    return (deploy,)

## Step 4 - Deploy model

In [27]:
@component(
    packages_to_install=["google-cloud-aiplatform<2"],
    base_image="europe-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest",
)
def deploy_model(
    model: Input[Model],
    model_name: str,
    project: str,
    region: str,
    serving_container_image_uri : str, 
    vertex_endpoint: Output[Artifact],
    vertex_model: Output[Model]
):
    """Upload the trained model to Vertex AI and deploy it to an endpoint.

    Args:
        model: Input artifact produced by the training step.
        model_name: Base name; used as the model ID / display name and to
            derive the serving model name (``<name>_rf``) and the endpoint
            display name (``<name>_endpoint``).
        project: Google Cloud project ID.
        region: Vertex AI region.
        serving_container_image_uri: Prediction container image.
        vertex_endpoint: Output artifact; its ``uri`` is set to the resource
            name of the endpoint the model was deployed to.
        vertex_model: Output artifact; its ``uri`` is set to the resource
            name of the uploaded model.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project, location=region)

    display_name = model_name
    serving_model_name = f"{model_name}_rf"
    endpoint_name = f"{model_name}_endpoint"

    def get_or_create_endpoint():
        """Return the newest endpoint named ``endpoint_name``, creating it if absent."""
        endpoints = aiplatform.Endpoint.list(
            filter='display_name="{}"'.format(endpoint_name),
            order_by='create_time desc',
            project=project,
            location=region
        )
        if endpoints:
            return endpoints[0]  # most recently created
        return aiplatform.Endpoint.create(
            display_name=endpoint_name, project=project, location=region
        )

    # The helper must return the endpoint; without a return value `endpoint`
    # would be None and the lookup above would have no effect on deploy().
    endpoint = get_or_create_endpoint()

    # Import the model programmatically.
    uploaded_model = aiplatform.Model.upload(
        # NOTE(review): a fixed model_id may collide when the pipeline is run
        # more than once - confirm whether this should be dropped.
        model_id=display_name,
        # The notebook's later cells filter models on display_name.
        display_name=display_name,
        # Drops the last five characters of the artifact URI; presumably this
        # strips the trailing artifact name "model" to leave the folder that
        # holds the pickle - TODO confirm.
        artifact_uri=model.uri[:-5],
        serving_container_image_uri=serving_container_image_uri,
        serving_container_health_route=f"/v1/models/{serving_model_name}",
        serving_container_predict_route=f"/v1/models/{serving_model_name}:predict",
        serving_container_environment_variables={"MODEL_NAME": serving_model_name}
    )
    # Model.deploy returns the endpoint that now serves the model.
    deployed_endpoint = uploaded_model.deploy(
        machine_type="n1-standard-4",
        endpoint=endpoint,
        # Key "0" stands for the model being deployed in this call.
        traffic_split={"0": 100},
        deployed_model_display_name=display_name
    )

    # Save data to the output params
    vertex_endpoint.uri = deployed_endpoint.resource_name
    vertex_model.uri = uploaded_model.resource_name

## Creating the pipeline

In [33]:
from datetime import datetime

# Timestamp (YYYYMMDDHHMMSS, local time) taken once when the cell runs.
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"
# Job display name made unique by the timestamp suffix.
DISPLAY_NAME = f"pipeline-test-job{TIMESTAMP}"

In [34]:
@dsl.pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT,
    # A name for the pipeline. Use to determine the pipeline Context.
    name="pipeline-test"
)
def pipeline(
    url: str = "/gcs/<BUCKET NAME>/data/wine.csv",
    project: str = PROJECT_ID,
    region: str = REGION,
    thresholds_dict_str: str = '{"roc":0.8}',
    serving_container_image_uri: str = "europe-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest"
    ):
    """Load data, train, validate and - if validation passes - deploy.

    Args:
        url: Path of the training CSV handed to the load step.
        project: Google Cloud project used by the deploy step.
        region: Region used by the deploy step.
        thresholds_dict_str: JSON string with the validation threshold(s).
        serving_container_image_uri: Prediction image for the deployed model.
    """
    load_task = load_data_from_bucket(url=url)
    train_task = train_model(dataset=load_task.outputs["dataset_train"])
    validation_task = validate_model(
        test_set=load_task.outputs["dataset_test"],
        trained_model=train_task.outputs["model"],
        thresholds_dict_str=thresholds_dict_str,
    )

    # Deployment only runs when the validation step returned "true".
    with dsl.Condition(
        validation_task.outputs["deploy"] == "true",
        name="deploy-test",
    ):
        deploy_model(
            model=train_task.outputs["model"],
            model_name="pipeline-test",
            project=project,
            region=region,
            serving_container_image_uri=serving_container_image_uri,
        )

## Compile and run the pipeline

In [35]:
# Compile the pipeline function into a JSON spec written to the working directory.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path='pipeline_model.json'
)

In [36]:
# Build a pipeline job from the compiled spec, with caching turned off.
job = pipeline_jobs.PipelineJob(
    display_name="pipeline-test",
    template_path="pipeline_model.json",
    enable_caching=False,
    location=REGION
)

In [37]:
# Start the job with sync=False.
job.run(sync=False)

## Check results

In [38]:
# List the models in the region, filtered on "pipeline-test".
! gcloud ai models list --region={REGION} --filter="pipeline-test"

Using endpoint [https://europe-west4-aiplatform.googleapis.com/]
MODEL_ID             DISPLAY_NAME
3730097595276591104  pipeline-test
5058659485350887424  pipeline-test
5538855795619266560  pipeline-test


## Batch prediction

In [39]:
# Define variables
job_display_name = "pipeline-batch-prediction-job"
MODEL_NAME="pipeline-test"
ENDPOINT_NAME=f"{MODEL_NAME}_endpoint"
BUCKET_URI="gs://<BUCKET NAME>/data"
input_file_name="test.csv"

# Get model id
# The shell magic captures the gcloud output as a list of lines.
MODEL_ID=!(gcloud ai models list --region=$REGION \
           --filter=display_name=$MODEL_NAME)
# NOTE(review): index 2 assumes two leading non-data lines (endpoint banner and
# table header), so this picks the first listed model - fragile, TODO confirm.
MODEL_ID=MODEL_ID[2].split(" ")[0]

# Full resource name of the model plus input/output locations for the batch job.
model_resource_name = f"projects/{PROJECT_ID}/locations/{REGION}/models/{MODEL_ID}"
gcs_source= [f"{BUCKET_URI}/{input_file_name}"]
gcs_destination_prefix=f"{BUCKET_URI}/output"

def batch_prediction_job(
    project: str,
    location: str,
    model_resource_name: str,
    job_display_name: str,
    gcs_source: typing.Union[str, typing.Sequence[str]],
    gcs_destination_prefix: str,
    machine_type: str,
    starting_replica_count: int = 1, # The number of nodes for this batch prediction job. 
    max_replica_count: int = 1,    
):   
    """Run a CSV batch prediction job and wait for it to finish.

    Args:
        project: Google Cloud project ID.
        location: Vertex AI region.
        model_resource_name: Full resource name of the model to use.
        job_display_name: Display name given to the batch prediction job.
        gcs_source: Cloud Storage URI, or list of URIs, of the input CSV file(s).
        gcs_destination_prefix: Cloud Storage prefix for the prediction output.
        machine_type: Machine type for the prediction nodes (required here).
        starting_replica_count: Number of nodes the job starts with.
        max_replica_count: Upper bound on the number of nodes.

    Returns:
        The batch prediction job object returned by ``Model.batch_predict``.
    """
    aiplatform.init(project=project, location=location)

    model = aiplatform.Model(model_resource_name)

    # Use the arguments passed in; reading notebook globals here would make
    # the function silently ignore what the caller asked for.
    job = model.batch_predict(
        job_display_name=job_display_name,
        instances_format="csv", #json
        gcs_source=gcs_source,
        gcs_destination_prefix=gcs_destination_prefix,
        machine_type=machine_type, # must be present
        starting_replica_count=starting_replica_count,
        max_replica_count=max_replica_count,
    )
    job.wait()
    print(job.display_name)
    print(job.state)
    return job

# Run the batch prediction with the variables defined above.
batch_prediction_job(PROJECT_ID, REGION, model_resource_name, job_display_name, gcs_source, gcs_destination_prefix, machine_type="n1-standard-2")

pipeline-batch-prediction-job
JobState.JOB_STATE_SUCCEEDED


<google.cloud.aiplatform.jobs.BatchPredictionJob object at 0x7fbcdab060e0> 
resource name: projects/840635250828/locations/europe-west4/batchPredictionJobs/1160580152850120704

## Using Endpoint for prediction

In [42]:
# Five sample rows of ten feature values each.
# NOTE(review): presumably in the training column order (all wine features
# except 'total sulfur dioxide') - confirm against the training data.
instance = [[1,2,3,2,1,2,3,6,7,10],
            [6.2,0.33,0.14,4.8,0.052,27,0.99475,3.21,0.48,9.4],
            [7.1,0.21,0.27,8.6,0.056,26,0.9956,2.95,0.52,9.5],
            [6.1,0.16,0.24,1.4,0.046,17,0.99319,3.66,0.57,10.3],
            [6.3,0.29,0.23,14.2,0.037,24,0.99528,3.08,0.38,10.6]
           ]
# Look up the ID of the endpoint named ENDPOINT_NAME; `tail -1` keeps the last
# row after sorting by creation time.
ENDPOINT_ID = !(gcloud ai endpoints list --region=$REGION \
              --format='value(ENDPOINT_ID)'\
              --filter=display_name=$ENDPOINT_NAME \
              --sort-by=creationTimeStamp | tail -1)
# NOTE(review): index 1 assumes the first captured line is the gcloud
# "Using endpoint" banner - fragile, TODO confirm.
ENDPOINT_ID = ENDPOINT_ID[1]

def endpoint_predict(
    project: str, location: str, instances: list, endpoint: str
):
    """Send ``instances`` to a Vertex AI endpoint and return its prediction.

    Args:
        project: Google Cloud project ID.
        location: Vertex AI region.
        instances: Rows to predict on.
        endpoint: Identifier passed to ``aiplatform.Endpoint``.

    Returns:
        The object returned by ``Endpoint.predict``.
    """
    aiplatform.init(project=project, location=location)

    # Separate name so the `endpoint` argument is not rebound.
    endpoint_client = aiplatform.Endpoint(endpoint)
    return endpoint_client.predict(instances=instances)

# Request predictions for the sample rows from the endpoint looked up above.
endpoint_predict(PROJECT_ID, REGION, instance, ENDPOINT_ID)

Prediction(predictions=[0.0, 0.0, 0.0, 0.0, 0.0], deployed_model_id='6636392700511780864', model_version_id='1', model_resource_name='projects/840635250828/locations/europe-west4/models/3730097595276591104', explanations=None)