<a href="https://colab.research.google.com/github/03sarath/vertex-ai-mlops-kfp2/blob/main/Vertex_AI_kfp2_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Copyright & License (click to expand)



In [1]:
# @title Copyright & License (click to expand)
# Copyright 2023 Psitron Technologies Pvt Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

## Overview

This tutorial uses the Vertex AI Pipelines with KFP 2.x.


### Objective: **üç∑** **Predicting wine quality**

In this tutorial, you learn to use `Vertex AI Pipelines` and KFP 2.x version of `Google Cloud Pipeline Components` to train and deploy an RandomForestClassifier model.


This tutorial uses the following Google Cloud ML services:

- `Vertex AI Pipelines`
- `Google Cloud Pipeline Components`

The steps performed include:

- Create a KFP pipeline:
    - Data preprocessing.
    - Train an RandomForestClassifier `Model`.
    - Evaluation an `Model`.
    - Create an `Endpoint` resource.
    - Deploys the `Model` resource to the `Endpoint` resource.
- Compile the KFP pipeline.
- Execute the KFP pipeline using `Vertex AI Pipelines`

The components are [documented here](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-1.0.0/google_cloud_pipeline_components.aiplatform.html).

### Install Vertex AI SDK for Python and other required packages

In [None]:
! pip3 install --no-cache-dir --upgrade "kfp>2" \
                                        google-cloud-aiplatform

Check the KFP SDK version.

In [None]:

! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! pip3 freeze | grep aiplatform

KFP SDK version: 2.9.0
google-cloud-aiplatform==1.66.0


### Restart runtime (Colab only)

To use the newly installed packages, you must restart the runtime on Google Colab.

In [None]:
import sys

if "google.colab" in sys.modules:

    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

### Authenticate your notebook environment (Colab only)

Authenticate your environment on Google Colab.

In [None]:

import sys

if "google.colab" in sys.modules:

    from google.colab import auth

    auth.authenticate_user()

### Set Google Cloud project information

Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment).

In [None]:

PROJECT_ID = "vertexai-435412"  # @param {type:"string"}
LOCATION = "us-central1"
BQ_LOCATION = LOCATION.split("-")[0].upper()

### Create a Cloud Storage bucket

Create a storage bucket to store intermediate artifacts such as datasets.

In [None]:
BUCKET_URI = f"gs://your-bucket-name-{PROJECT_ID}-unique"  # @param {type:"string"}


In [None]:
# Create bucket
PIPELINE_ROOT = f"{BUCKET_URI}/pipeline_root_wine/"
PIPELINE_ROOT

'gs://your-bucket-name-vertexai-435412-unique/pipeline_root_wine/'

**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [None]:
! gsutil mb -l $LOCATION -p $PROJECT_ID $BUCKET_URI

Creating gs://your-bucket-name-vertexai-435412-unique/...
ServiceException: 409 A Cloud Storage bucket named 'your-bucket-name-vertexai-435412-unique' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.


#### Service Account

You use a service account to create Vertex AI Pipeline jobs.

In [None]:
SERVICE_ACCOUNT = "[your-service-account]"  # @param {type:"string"}

In [None]:
import sys

IS_COLAB = "google.colab" in sys.modules
if (
    SERVICE_ACCOUNT == ""
    or SERVICE_ACCOUNT is None
    or SERVICE_ACCOUNT == "[your-service-account]"
):
    # Get your service account from gcloud
    if not IS_COLAB:
        shell_output = !gcloud auth list 2>/dev/null
        SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()

    else:  # IS_COLAB:
        shell_output = ! gcloud projects describe  $PROJECT_ID
        project_number = shell_output[-1].split(":")[1].strip().replace("'", "")
        SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"

    print("Service Account:", SERVICE_ACCOUNT)

Service Account: 77267352267-compute@developer.gserviceaccount.com


#### Set service account access for Vertex AI Pipelines

Run the following commands to grant your service account access to read and write pipeline artifacts in the bucket that you created in the previous step. You only need to run this step once per service account.

In [None]:
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI

! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

### Import libraries and define constants

In [None]:
import google.cloud.aiplatform as aiplatform
import kfp
from kfp import compiler, dsl
from kfp.dsl import Artifact, Dataset, Input, Metrics, Model, Output, component, ClassificationMetrics
from typing import NamedTuple
from google.cloud.aiplatform import pipeline_jobs


## Initialize Vertex AI SDK for Python

Initialize the Vertex AI SDK for Python for your project and corresponding bucket.

In [None]:
aiplatform.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)

### Define Data preparation component


In [None]:
@component(
    packages_to_install=["pandas==1.3.5", "pyarrow", "scikit-learn==1.0.2"],
)

def get_wine_data(
    url: str,
    dataset_train: Output[Dataset],
    dataset_test: Output[Dataset]
):
    import pandas as pd
    import numpy as np
    from sklearn.model_selection import train_test_split as tts

    df_wine = pd.read_csv(url, delimiter=";")
    df_wine['best_quality'] = [ 1 if x>=7 else 0 for x in df_wine.quality]
    df_wine['target'] = df_wine.best_quality
    df_wine = df_wine.drop(['quality', 'total sulfur dioxide', 'best_quality'], axis=1)


    train, test = tts(df_wine, test_size=0.3)
    train.to_csv(dataset_train.path + ".csv" , index=False, encoding='utf-8-sig')
    test.to_csv(dataset_test.path + ".csv" , index=False, encoding='utf-8-sig')


  return component_factory.create_component_from_func(


### Define RandomForestClassifier Model training component


In [None]:
@component(
    packages_to_install = [
        "pandas==1.3.5",
        "scikit-learn==1.0.2"
    ]
)
def train_winequality(
    dataset:  Input[Dataset],
    model: Output[Model],
):

    from sklearn.ensemble import RandomForestClassifier
    import pandas as pd
    import pickle

    data = pd.read_csv(dataset.path+".csv")
    model_rf = RandomForestClassifier(n_estimators=10)
    model_rf.fit(
        data.drop(columns=["target"]),
        data.target,
    )
    model.metadata["framework"] = "RF"
    file_name = model.path + f".pkl"
    with open(file_name, 'wb') as file:
        pickle.dump(model_rf, file)

### Define Model evaluation component


In [None]:
@component(
    packages_to_install = [
        "pandas",
        "scikit-learn==1.0.2"
    ]
)
def winequality_evaluation(
    test_set:  Input[Dataset],
    rf_winequality_model: Input[Model],
    thresholds_dict_str: str,
    metrics: Output[ClassificationMetrics],
    kpi: Output[Metrics]
) -> NamedTuple("output", [("deploy", str)]):

    from sklearn.ensemble import RandomForestClassifier
    import pandas as pd
    import logging
    import pickle
    from sklearn.metrics import roc_curve, confusion_matrix, accuracy_score
    import json
    import typing


    def threshold_check(val1, val2):
        cond = "false"
        if val1 >= val2 :
            cond = "true"
        return cond

    data = pd.read_csv(test_set.path+".csv")
    model = RandomForestClassifier()
    file_name = rf_winequality_model.path + ".pkl"
    with open(file_name, 'rb') as file:
        model = pickle.load(file)

    y_test = data.drop(columns=["target"])
    y_target=data.target
    y_pred = model.predict(y_test)


    y_scores =  model.predict_proba(data.drop(columns=["target"]))[:, 1]
    fpr, tpr, thresholds = roc_curve(
         y_true=data.target.to_numpy(), y_score=y_scores, pos_label=True
    )
    metrics.log_roc_curve(fpr.tolist(), tpr.tolist(), thresholds.tolist())

    metrics.log_confusion_matrix(
       ["False", "True"],
       confusion_matrix(
           data.target, y_pred
       ).tolist(),
    )

    accuracy = accuracy_score(data.target, y_pred.round())
    thresholds_dict = json.loads(thresholds_dict_str)
    rf_winequality_model.metadata["accuracy"] = float(accuracy)
    kpi.log_metric("accuracy", float(accuracy))
    deploy = threshold_check(float(accuracy), int(thresholds_dict['roc']))
    return (deploy,)


### Define deploying the model component

Finally, you define a component to deploy the model.

In [None]:
@component(
    packages_to_install=["google-cloud-aiplatform", "scikit-learn==1.0.2",  "kfp"],
    output_component_file="model_winequality_coponent.yml"
)
def deploy_winequality(
    model: Input[Model],
    project: str,
    region: str,
    serving_container_image_uri : str,
    vertex_endpoint: Output[Artifact],
    vertex_model: Output[Model]
):
    from google.cloud import aiplatform
    aiplatform.init(project=project, location=region)

    DISPLAY_NAME  = "winequality"
    MODEL_NAME = "winequality-rf"
    ENDPOINT_NAME = "winequality_endpoint"

    def create_endpoint():
        endpoints = aiplatform.Endpoint.list(
        filter='display_name="{}"'.format(ENDPOINT_NAME),
        order_by='create_time desc',
        project=project,
        location=region,
        )
        if len(endpoints) > 0:
            endpoint = endpoints[0]  # most recently created
        else:
            endpoint = aiplatform.Endpoint.create(
            display_name=ENDPOINT_NAME, project=project, location=region
        )
    endpoint = create_endpoint()


    #Import a model programmatically
    model_upload = aiplatform.Model.upload(
        display_name = DISPLAY_NAME,
        artifact_uri = model.uri.replace("model", ""),
        serving_container_image_uri =  serving_container_image_uri,
        serving_container_health_route=f"/v1/models/{MODEL_NAME}",
        serving_container_predict_route=f"/v1/models/{MODEL_NAME}:predict",
        serving_container_environment_variables={
        "MODEL_NAME": MODEL_NAME,
    },
    )
    model_deploy = model_upload.deploy(
        machine_type="n1-standard-4",
        endpoint=endpoint,
        traffic_split={"0": 100},
        deployed_model_display_name=DISPLAY_NAME,
    )

    # Save data to the output params
    vertex_model.uri = model_deploy.resource_name

  @component(
  def deploy_winequality(


In [None]:
from datetime import datetime

TIMESTAMP =datetime.now().strftime("%Y%m%d%H%M%S")
DISPLAY_NAME = 'pipeline-winequality-job{}'.format(TIMESTAMP)

## Construct the RandomForestClassifier training pipeline

Now you define the pipeline, with the following steps:

- Data preparation.
- Train the model.
- Evaluate the model.
- Deploy the model.

In [None]:
@dsl.pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT,
    # A name for the pipeline. Use to determine the pipeline Context.
    name="pipeline-winequality",

)
def pipeline(
    url: str = "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-white.csv",
    project: str = PROJECT_ID,
    region: str = LOCATION,
    display_name: str = DISPLAY_NAME,
    api_endpoint: str = LOCATION+"-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"roc":0.8}',
    serving_container_image_uri: str = "europe-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest"
    ):

    data_op = get_wine_data(url=url)
    train_model_op = train_winequality(dataset=data_op.outputs["dataset_train"])
    model_evaluation_op = winequality_evaluation(
        test_set=data_op.outputs["dataset_test"],
        rf_winequality_model=train_model_op.outputs["model"],
        thresholds_dict_str = thresholds_dict_str, # I deploy the model anly if the model performance is above the threshold
    )

    with dsl.Condition(
        model_evaluation_op.outputs["deploy"]=="true",
        name="deploy-winequality",
    ):

        deploy_model_op = deploy_winequality(
        model=train_model_op.outputs['model'],
        project=project,
        region=region,
        serving_container_image_uri = serving_container_image_uri,
        )


  with dsl.Condition(


### Compile the pipeline

Next, you compile the pipeline.

In [None]:
compiler.Compiler().compile(pipeline_func=pipeline,
        package_path='ml_winequality.json')

### Run the pipeline using Vertex AI Pipelines

Now, run the compiled pipeline.

In [None]:
start_pipeline = pipeline_jobs.PipelineJob(
    display_name="winequality-pipeline",
    template_path="ml_winequality.json",
    enable_caching=False,
    location=LOCATION,
)

In [None]:
start_pipeline.run()


INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/77267352267/locations/us-central1/pipelineJobs/pipeline-winequality-20240912143357
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/77267352267/locations/us-central1/pipelineJobs/pipeline-winequality-20240912143357')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/pipeline-winequality-20240912143357?project=77267352267
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/77267352267/locations/us-central1/pipelineJobs/pipeline-winequality-20240912143357 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/77267352267/locations/us-central1/pip

# ‚è∞ **Schedule** your pipelines to run recurrently

In [None]:
#Schedule your pipelines to run recurrently

from google.cloud import aiplatform

pipeline_job = aiplatform.PipelineJob(
  template_path="ml_winequality.json",
  pipeline_root=PIPELINE_ROOT,
  display_name="ml_winequality",
)

pipeline_job_schedule = aiplatform.PipelineJobSchedule(
  pipeline_job=pipeline_job,
  display_name="ml_winequality"
)

pipeline_job_schedule.create(
  cron="0 0 * * *",
  max_concurrent_run_count=1,
  max_run_count=10,
)

INFO:google.cloud.aiplatform.pipeline_job_schedules:Creating PipelineJobSchedule
INFO:google.cloud.aiplatform.pipeline_job_schedules:PipelineJobSchedule created. Resource name: projects/1027839402068/locations/us-central1/schedules/7235938798951989248
INFO:google.cloud.aiplatform.pipeline_job_schedules:To use this PipelineJobSchedule in another session:
INFO:google.cloud.aiplatform.pipeline_job_schedules:schedule = aiplatform.PipelineJobSchedule.get('projects/1027839402068/locations/us-central1/schedules/7235938798951989248')
INFO:google.cloud.aiplatform.pipeline_job_schedules:View Schedule:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/schedules/7235938798951989248?project=1027839402068


#‚è∞List your all scheduled pipelines


In [None]:
#List your all scheduled pipelines
from google.cloud import aiplatform

aiplatform.PipelineJobSchedule.list(
  filter='display_name="ml_winequality"',
  order_by='create_time desc'
)

[<google.cloud.aiplatform.pipeline_job_schedules.PipelineJobSchedule object at 0x7fa2bcc226b0> 
 resource name: projects/1027839402068/locations/us-central1/schedules/7235938798951989248]

#Invoke **ML model deployed in vertex ai endpoint**

In [None]:
import subprocess
from google.cloud import aiplatform

ENDPOINT_NAME = "winequality_endpoint"

#Bad Qty Wine Data üëé
instance = [[7.2, 0.23, 0.32, 8.5, 0.058, 47, 132, 0.993, 3.12, 0.46]]

#Good Qty Wine Data üëç
#instance = [[6.7,0.23,0.31,2.1,0.046,30,0.9926,3.33,0.64,10.7]]


def endpoint_predict(project: str, location: str, instances: list, endpoint: str):
    aiplatform.init(project=project, location=location)
    endpoint = aiplatform.Endpoint(endpoint)
    prediction = endpoint.predict(instances=instances)
    return prediction

#prediction = endpoint_predict('<project_id>', 'us-central1', instance, '<endpoint_id>')
prediction = endpoint_predict('vertex-435406', 'us-central1', instance, '8295139031918837760')
print("Prediction from Deployed model_id:", prediction[1])
print("Prediction from Deployed model version_id:", prediction[3])
print("Predicted wine quality üç∑üëçüëé:", prediction[0])

Prediction from Deployed model_id: 2807054285223755776
Prediction from Deployed model version_id: 1
Predicted wine quality üç∑üëçüëé: [0.0]
