In [None]:
#meta 9/13/2022 G CodeLab: Intro to Vertex Pipelines
#origin: G Community event 9/12/2022 MLOps with Skander
#refer to https://codelabs.developers.google.com/vertex-pipelines-intro?hl=en 
#Written by Sara Robinson, Last updated Jul 18, 2022

#infra: Qwiklabs -> Vertex AI
# User-managed nb ->  TensorFlow Enterprise 2.3 (with LTS) instance type without GPUs

#history
# 9/13/2022 CODELAB
#      Refer to full Codelab at
#  https://onedrive.live.com/redir?resid=838B6473B6EE9788%211476&page=Edit&wd=target%28GCommunity_202206.one%7C5c4c7ff9-c092-42dc-abfd-991b638f83fa%2FMLOps%20Codelab%20Intro%20to%20Vertex%20Pipelines%7C4b48ed42-71a1-43fb-bda7-81b5b80ed28e%2F%29&wdorigin=703

# Intro to Vertex Pipelines

In this lab, you will learn how to create and run ML pipelines with Vertex Pipelines.

What you'll learn
You'll learn how to:

- Use the Kubeflow Pipelines SDK to build scalable ML pipelines  
- Create and run a 3-step intro pipeline that takes text input  
- Create and run a pipeline that trains, evaluates, and deploys an AutoML classification model  
- Use pre-built components for interacting with Vertex AI services, provided through the google_cloud_pipeline_components library  
- Schedule a pipeline job with Cloud Scheduler

## 4. Vertex Pipelines setup

### Step 1: Create Python notebook and install libraries

In [2]:
# Extra flag passed to the pip installs below: `--user` installs packages into
# the current user's site-packages instead of the system environment.
USER_FLAG = "--user"

In [3]:
!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0

Collecting google-cloud-pipeline-components==0.2.0
  Downloading google_cloud_pipeline_components-0.2.0-py3-none-any.whl (135 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m135.0/135.0 kB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m
Collecting google-cloud-notebooks>=0.4.0
  Downloading google_cloud_notebooks-1.4.2-py2.py3-none-any.whl (355 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m355.3/355.3 kB[0m [31m19.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting google-api-core<2dev,>=1.26.0
  Downloading google_api_core-1.33.0-py3-none-any.whl (115 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m115.2/115.2 kB[0m [31m20.2 MB/s[0m eta [36m0:00:00[0m
Collecting protobuf<4,>=3.13.0
  Downloading protobuf-3.20.1-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl (1.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.0/1.0 MB[0m [31m50.0 MB/s[0m eta [36m0:00:00[0m
Collecting proto-plus>=1.10.1
  Download

In [4]:
import os

# Restart the kernel so the packages installed above are importable.
# Skipped when the IS_TESTING environment variable is set.
_running_under_test = os.getenv("IS_TESTING")
if not _running_under_test:
    import IPython

    # do_shutdown(True) requests a restart rather than a plain shutdown.
    IPython.Application.instance().kernel.do_shutdown(True)

In [2]:
# Import in the kernel itself (not a `!python3` subprocess, which may be a
# different interpreter) so the versions shown are the ones this notebook uses.
import kfp
import google_cloud_pipeline_components

print("KFP SDK version: {}".format(kfp.__version__))
print(
    "google_cloud_pipeline_components version: {}".format(
        google_cloud_pipeline_components.__version__
    )
)

KFP SDK version: 1.8.9
google_cloud_pipeline_components version: 0.2.0


### Step 2: Set your project ID and bucket

In [3]:
import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Project ID:  qwiklabs-gcp-02-3b82046ef315


In [4]:
# No project ID was detected above: replace this placeholder with your own.
if not PROJECT_ID:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

In [9]:
# Cloud Storage bucket named after the project, e.g. gs://<project>-bucket.
BUCKET_NAME = f"gs://{PROJECT_ID}-bucket"

### Step 3: Import libraries

In [None]:
import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

### Step 4: Define constants
`PIPELINE_ROOT` is the Cloud Storage path where the artifacts created by our pipeline will be written.

In [10]:
import os

# Add the `pip install --user` script directory to PATH. The membership check
# keeps re-runs of this cell from appending the same entry again.
LOCAL_BIN = "/home/jupyter/.local/bin"
PATH = os.environ.get("PATH", "")
if LOCAL_BIN not in PATH.split(os.pathsep):
    os.environ["PATH"] = f"{PATH}{os.pathsep}{LOCAL_BIN}" if PATH else LOCAL_BIN
print("env: PATH=" + os.environ["PATH"])
REGION = "us-central1"

# Cloud Storage prefix under which pipeline artifacts are written.
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

env: PATH=/usr/local/cuda/bin:/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/jupyter/.local/bin:/home/jupyter/.local/bin


'gs://qwiklabs-gcp-02-3b82046ef315-bucket/pipeline_root/'

## 5. Creating your first pipeline
Create a short pipeline using the KFP SDK (this pipeline doesn't do anything ML-related). You'll learn:

- How to create custom components in the KFP SDK
- How to run and monitor a pipeline in Vertex Pipelines

### Step 1: Create a Python function based component
Our first pipeline has three components:

1. `product_name` takes a string as input and returns that string unchanged.

In [11]:
@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    """First pipeline step: echo the product name back unchanged."""
    return text

If you wanted to share this component with someone, you could send them the generated yaml file and have them load it with the following:

In [12]:
# Rebuild the component from the YAML spec written by the decorator above,
# the same way a collaborator who received the file would load it.
product_name_component = kfp.components.load_component_from_file(
    "./first-component.yaml"
)

### Step 2: Create two additional components
To complete our pipeline, we'll create two more components. 

2. `emoji` takes a string as input and converts it to its corresponding emoji, if there is one. It returns a tuple containing the input text and the resulting emoji:

In [13]:
@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    """Emojize `text` by treating it as an emoji alias (e.g. "sparkles").

    Returns the input text together with the result of
    `emoji.emojize(":<text>:", language="alias")`.
    """
    import emoji as emoji_lib

    emojized = emoji_lib.emojize(f":{text}:", language="alias")
    print(f"output one: {text}; output_two: {emojized}")
    return (text, emojized)

3. `build_sentence` consumes the outputs of the first two components and combines them into a single string:

In [14]:
@component
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    """Return "<product> is <emoji>", using `emojitext` when `emoji` is empty."""
    print("We completed the pipeline, hooray!")
    suffix = emoji if emoji else emojitext
    return product + " is " + suffix

### Step 3: Putting the components together into a pipeline

In [None]:
# `dsl.pipeline` instead of the bare `pipeline` import: a later cell defines a
# function named `pipeline`, which would shadow the decorator on a re-run.
@dsl.pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    """Three-step intro pipeline: product name + emoji -> sentence.

    You can change the `text` and `emoji_str` defaults to update the pipeline output.

    Args:
        text: Product name passed to the `product_name` step.
        emoji_str: Emoji alias passed to the `emoji` step.
    """
    product_task = product_name(text)
    emoji_task = emoji(emoji_str)
    # Calling the component adds the final step to the pipeline graph.
    build_sentence(
        product_task.output,
        emoji_task.outputs["emoji"],
        emoji_task.outputs["emoji_text"],
    )

### Step 4: Compile and run the pipeline

In [16]:
# Serialize the intro pipeline to a JSON job spec
# (the keyword is `pipeline_func`; the misspelled name raised a TypeError).
compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)



In [17]:
from datetime import datetime

# Local time as YYYYMMDDhhmmss; appended to job IDs so each run's ID is distinct.
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"

In [18]:
# Pipeline run built from the compiled intro spec; the job ID embeds TIMESTAMP.
job = aiplatform.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    job_id=f"hello-world-pipeline-{TIMESTAMP}",
    enable_caching=True,
)

Submit the job to create a new pipeline execution; the logs include a link to view the pipeline run in your console.  
(The run takes 5–6 minutes.)

In [19]:
# Create the pipeline run on Vertex Pipelines; the logged output includes a
# console link for monitoring the run.
job.submit()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/105288858048/locations/us-central1/pipelineJobs/hello-world-pipeline-20220913182929
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/105288858048/locations/us-central1/pipelineJobs/hello-world-pipeline-20220913182929')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/hello-world-pipeline-20220913182929?project=105288858048


Now that you're familiar with how the KFP SDK and Vertex Pipelines work, you're ready to build a pipeline that creates and deploys an ML model using other Vertex AI services. Let's dive in!

## 6. Creating an end-to-end ML pipeline
This ML pipeline will:

- Create a Dataset in Vertex AI  
- Train a tabular classification model with AutoML  
- Get evaluation metrics on this model  
- Based on the evaluation metrics, decide whether to deploy the model using conditional logic in Vertex Pipelines  
- Deploy the model to an endpoint using Vertex Prediction  

Dataset (tabular): UCI Machine Learning Repository Dry Bean dataset, from: KOKLU, M. and OZKAN, I.A. (2020), "Multiclass Classification of Dry Beans Using Computer Vision and Machine Learning Techniques." *Computers and Electronics in Agriculture*, 174, 105507. DOI: 10.1016/j.compag.2020.105507.

### Step 1: A custom component for model evaluation
The custom component we'll define will be used towards the end of our pipeline once model training has completed. This component will do a few things:

- Get the evaluation metrics from the trained AutoML classification model  
- Parse the metrics and render them in the Vertex Pipelines UI  
- Compare the metrics to a threshold to determine whether the model should be deployed  

In [20]:
@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.
    """Log a classification model's evaluation metrics and decide whether to deploy it.

    Lists the model evaluations of the Vertex AI model referenced by `model`,
    logs the first evaluation's ROC curve and confusion matrix to `metricsc`
    and its other metrics (as JSON strings) to `metrics`, then checks the
    first evaluation against the thresholds.

    Args:
        project: Google Cloud project ID.
        location: Vertex AI region, e.g. "us-central1".
        api_endpoint: Regional Vertex AI API endpoint, e.g.
            "us-central1-aiplatform.googleapis.com".
        thresholds_dict_str: JSON object mapping metric names to minimum
            values, e.g. '{"auRoc": 0.95}'. Only "auRoc" and "auPrc" are checked.
        model: Model artifact; metadata["resourceName"] is the model resource name.
        metrics: Output artifact receiving textual metrics.
        metricsc: Output artifact receiving the ROC curve and confusion matrix.

    Returns:
        A one-field tuple (dep_decision,), "true" if every checked threshold is
        met and "false" otherwise.

    Raises:
        ValueError: If the model has no evaluations.
    """
    import json
    import logging

    from google.cloud import aiplatform as aip

    # Fetch model eval info
    def get_eval_info(client, model_name):
        """Return (last evaluation name, metrics dicts, metrics JSON strings)."""
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        eval_name = None
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            # Named eval_metrics so it is not confused with the `metrics` artifact.
            eval_metrics = MessageToDict(evaluation._pb.metrics)
            for metric_name, metric_value in eval_metrics.items():
                logging.info("metric: %s, value: %s", metric_name, metric_value)
            metrics_list.append(eval_metrics)
            metrics_string_list.append(json.dumps(eval_metrics))
            eval_name = evaluation.name

        if not metrics_list:
            # Fail with a clear message rather than an UnboundLocalError here
            # or an IndexError later in log_metrics.
            raise ValueError(
                "No model evaluations found for model: {}".format(model_name)
            )

        return (
            eval_name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        """Return True only if every checked (higher-is-better) metric meets its threshold."""
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if k not in metrics_dict:
                    # A metric we cannot verify counts as a failed check.
                    logging.info("metric %s missing from evaluation; returning False", k)
                    return False
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        """Log ROC curve, confusion matrix and textual metrics of the first evaluation."""
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well (writes to the `metrics` output artifact)
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project, location=location)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    # The pipeline's dsl.Condition compares this string output against "true".
    dep_decision = "true" if deploy else "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

### Step 2: Adding Google Cloud pre-built components

In [None]:
import time

# Epoch-seconds suffix gives each run's dataset/model a distinct display name.
DISPLAY_NAME = f"automl-beans{int(time.time())}"
print(DISPLAY_NAME)

automl-beans1663093982


In [22]:
# Use `dsl.pipeline`, not the bare `pipeline` import: the function below is
# itself named `pipeline`, so after the first run a bare `@pipeline` would call
# this pipeline function as a decorator and fail when the cell is re-run.
@dsl.pipeline(name="automl-tab-beans-training-v2",
              pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    """AutoML tabular pipeline: create dataset, train, evaluate, conditionally deploy.

    Args:
        bq_source: BigQuery table URI used to create the tabular dataset.
        display_name: Display name for the dataset and the training job.
        project: Google Cloud project ID.
        gcp_region: Region for the evaluation and endpoint-creation steps.
        api_endpoint: Regional Vertex AI API endpoint for the evaluation step.
        thresholds_dict_str: JSON thresholds passed to the evaluation step;
            the model is deployed only when it reports "true".
    """
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )

    # budget_milli_node_hours=1000 is a training budget of one node hour.
    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            # NOTE(review): "AspectRation" presumably matches the source table's
            # column name as spelled there; confirm before "correcting" it.
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classification_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    # Only create an endpoint and deploy when the evaluation step approves.
    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name="train-automl-beans",
        )

        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type="n1-standard-4",
        )

### Step 3: Compile and run the end-to-end ML pipeline
This pipeline takes a little over an hour to run.

In [23]:
# Serialize the end-to-end pipeline to the JSON job spec used by the next cell.
compiler.Compiler().compile(
    package_path="tab_classif_pipeline.json",
    pipeline_func=pipeline,
)

In [24]:
# Run of the compiled AutoML pipeline; `project` and `display_name` are passed
# explicitly so this session's values override the compiled defaults.
ml_pipeline_job = aiplatform.PipelineJob(
    display_name="automl-tab-beans-training",
    template_path="tab_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values=dict(project=PROJECT_ID, display_name=DISPLAY_NAME),
    enable_caching=True,
)

In [25]:
# Create the end-to-end pipeline run; the logged output includes a console
# link for monitoring it.
ml_pipeline_job.submit()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/105288858048/locations/us-central1/pipelineJobs/automl-tab-beans-training-v2-20220913183332
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/105288858048/locations/us-central1/pipelineJobs/automl-tab-beans-training-v2-20220913183332')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/automl-tab-beans-training-v2-20220913183332?project=105288858048


#### More
View and analyze your Vertex Pipelines graph in the console

Refer to https://onedrive.live.com/redir?resid=838B6473B6EE9788%211476&page=Edit&wd=target%28GCommunity_202206.one%7C5c4c7ff9-c092-42dc-abfd-991b638f83fa%2FMLOps%20Codelab%20Intro%20to%20Vertex%20Pipelines%7C4b48ed42-71a1-43fb-bda7-81b5b80ed28e%2F%29&wdorigin=703  
Examples of:  
- pipeline (graph)  
- metrics -> confusion matrix  
- lineage

### Step 4: Comparing metrics across pipeline runs

In [26]:
# Gather parameters/metrics of runs of the AutoML pipeline into a DataFrame
# and display just its first two rows.
pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.iloc[:2]
small_pipeline_df

Unnamed: 0,pipeline_name,run_name,param.input:gcp_region,param.input:api_endpoint,param.input:thresholds_dict_str,param.input:project,param.input:display_name,param.input:bq_source
0,automl-tab-beans-training-v2,automl-tab-beans-training-v2-20220913183332,us-central1,us-central1-aiplatform.googleapis.com,"{""auRoc"": 0.95}",qwiklabs-gcp-02-3b82046ef315,automl-beans1663093982,bq://aju-dev-demos.beans.beans1


### Done
You've learned how to use Vertex AI to:

- Use the Kubeflow Pipelines SDK to build end-to-end pipelines with custom components  
- Run your pipelines on Vertex Pipelines and kick off pipeline runs with the SDK  
- View and analyze your Vertex Pipelines graph in the console  
- Use pre-built pipeline components to add Vertex AI services to your pipeline  
- Schedule recurring pipeline jobs (covered in the full codelab; not shown in this notebook)