## 02 - Custom model training on Vertex Pipelines

This demo uses Vertex AI, Google Cloud's end-to-end managed ML platform. Vertex AI integrates Google's ML offerings across Google Cloud into a single development experience. In addition to model training and deployment services, Vertex AI includes a variety of MLOps products, such as Vertex Pipelines (the focus of this lab), Model Monitoring, and Feature Store. The diagram below shows the full set of Vertex AI product offerings.

<img src="images/vertex-overview_1920.png"/>

#### Why are ML pipelines useful?

Before we dive in, let's first understand why you would want to use a pipeline. Imagine you're building an ML workflow that includes processing data, training a model, hyperparameter tuning, evaluation, and model deployment. Each of these steps may have different dependencies, which can become unwieldy if you treat the entire workflow as a monolith. As you begin to scale your ML process, you might want to share your ML workflow with others on your team so they can run it and contribute code. Without a reliable, reproducible process, this can become difficult. With pipelines, each step in your ML process runs in its own container. This lets you develop steps independently and track the inputs and outputs of each step in a reproducible way. You can also schedule or trigger pipeline runs based on other events in your Cloud environment, such as kicking off a run when new training data is available.

#### Vertex Pipelines setup

There are a few additional libraries we'll need to install in order to use Vertex Pipelines:

- __Kubeflow Pipelines__: This is the SDK we'll use to build our pipeline. Vertex Pipelines supports running pipelines built with either Kubeflow Pipelines or TFX.
- __Google Cloud Pipeline Components__: This library provides pre-built components that make it easier to interact with Vertex AI services from your pipeline steps.

In [None]:
USER_FLAG = "--user"

In [None]:
# !pip3 install {USER_FLAG} google-cloud-aiplatform --upgrade

In [None]:
# !pip3 install {USER_FLAG} google-cloud-pipeline-components --upgrade

In [None]:
# !pip3 install {USER_FLAG} kfp --upgrade

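In [None]:
# Alternatively, install all three packages in one step (uncomment to run):
# !pip3 install {USER_FLAG} --upgrade google-cloud-aiplatform kfp google-cloud-pipeline-components

In [None]:
# Optionally upgrade pip itself first (uncomment to run):
# !pip3 install {USER_FLAG} --upgrade pip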

__Note:__ you may need to restart the kernel to use updated packages.
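To confirm which versions were actually installed, a small check with the standard library's `importlib.metadata` can help. This is a sketch: `installed_versions` is a hypothetical helper, and the distribution names are the ones used in the pip commands above. It reads installed package metadata, so it may disagree with modules already imported into a running kernel until you restart it.

```python
from importlib.metadata import PackageNotFoundError, version

# Distribution names as used in the pip commands above.
PACKAGES = ["google-cloud-aiplatform", "kfp", "google-cloud-pipeline-components"]


def installed_versions(packages=PACKAGES):
    """Return {package: version string, or None if it is not installed}."""
    result = {}
    for name in packages:
        try:
            result[name] = version(name)
        except PackageNotFoundError:
            result[name] = None
    return result


for name, ver in installed_versions().items():
    print(f"{name}: {ver or 'not installed'}")
```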

#### Set your project ID and bucket

In [None]:
PROJECT_ID = "ibnd-argls-cstmr-demos"
BUCKET_NAME = "gs://ibnd-argls-ml-demos-storage/02_mlops_scikit_demo"

In [None]:
import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
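# Pre-built Vertex AI components used by the end-to-end pipeline below.
# This import path is from google-cloud-pipeline-components 1.x; newer releases
# expose these ops under google_cloud_pipeline_components.v1 instead.
from google_cloud_pipeline_components import aiplatform as gcc_aip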
from typing import NamedTuple

In [None]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

In [None]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/{TIMESTAMP}/pipeline_root/"
PIPELINE_ROOT
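The `PIPELINE_ROOT` cell above joins `BUCKET_NAME` and `TIMESTAMP` with an f-string, so a trailing slash on the bucket path would produce a double slash. A minimal sketch of a more defensive version (`make_pipeline_root` is a hypothetical helper, not part of any SDK):

```python
from datetime import datetime


def make_pipeline_root(bucket_uri: str, timestamp: str) -> str:
    """Build a per-run pipeline root under a Cloud Storage prefix."""
    if not bucket_uri.startswith("gs://"):
        raise ValueError(f"Expected a gs:// URI, got {bucket_uri!r}")
    # Drop any trailing slash so the result never contains '//' after the bucket.
    return f"{bucket_uri.rstrip('/')}/{timestamp}/pipeline_root/"


ts = datetime(2024, 2, 1, 12, 0, 0).strftime("%Y%m%d%H%M%S")
print(make_pipeline_root("gs://my-bucket/demo/", ts))  # → gs://my-bucket/demo/20240201120000/pipeline_root/
```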

#### Create a Python function based component
Using the KFP SDK, we can create components from Python functions. We'll use this approach for all three components in our first pipeline, starting with `product_name`, which simply takes a string as input and returns it.

In [None]:
@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    return text

Let's take a closer look at the syntax here:

- The `@component` decorator turns this Python function into a pipeline component. You'll use it any time you write a custom Python-function component.
- The `base_image` parameter specifies the container image this component will run in.
- The `-> str` after the function definition specifies the output type for this component.
- The `output_component_file` parameter is optional and specifies the YAML file to write the compiled component to. After running the cell, you should see that file in your notebook instance. If you want to share this component with someone, you can send them the generated YAML file and have them load it with the following:

In [None]:
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')

#### Create two additional components
To complete our pipeline, we'll create two more components. The first one takes a string as input and converts it to the corresponding emoji, if there is one. It returns a tuple containing the input text and the resulting emoji:

In [None]:
@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    import emoji

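    emoji_text = text
    emoji_code = ':' + emoji_text + ':'
    emoji_str = emoji.emojize(emoji_code, language='alias')
    # emojize() returns unknown codes unchanged; return "" instead so that
    # build_sentence can fall back to the plain text.
    if emoji_str == emoji_code:
        emoji_str = ""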
    print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
    return (emoji_text, emoji_str)

This component is a bit more complex than our previous one. Let's break down what's new:

- The `packages_to_install` parameter lists any external libraries the component's container needs. In this case, we're using a library called emoji.
- This component returns a `NamedTuple` called `Outputs`. Notice that each value in this tuple has a name: `emoji_text` and `emoji`. We'll use these names in our next component to access the outputs.
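To see how named outputs behave, here is a plain-Python sketch using `typing.NamedTuple` with the same field names as the component. It doesn't use KFP at all, and it swaps `emoji.emojize` for a tiny hypothetical lookup table (`_ALIASES`) so it runs without the emoji package. The sketch returns an empty emoji string when there's no match, which is the case the next component's fallback branch is written for.

```python
from typing import NamedTuple


class Outputs(NamedTuple):
    """Mirrors the component's ("emoji_text", "emoji") named outputs."""
    emoji_text: str
    emoji: str


# Hypothetical stand-in for emoji.emojize(..., language="alias"): a tiny lookup
# table so this sketch runs without installing the emoji package.
_ALIASES = {"sparkles": "\u2728", "thumbsup": "\U0001F44D"}


def emoji_locally(text: str) -> Outputs:
    # Return "" when there is no match, so callers can fall back to the text.
    return Outputs(emoji_text=text, emoji=_ALIASES.get(text, ""))


result = emoji_locally("sparkles")
# Fields are addressable by name, like emoji_task.outputs["emoji"] in the pipeline.
print(result.emoji_text, result.emoji)
print(result._asdict())
```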

The final component in this pipeline will consume the output of the first two and combine them to return a string:

In [None]:
@component
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    print("We completed the pipeline, hooray!")
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji
    else:
        end_str += emojitext
    return end_str
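Because a component body is ordinary Python, its branching logic can be checked locally before anything runs on Vertex AI. A sketch that copies the body of `build_sentence` into a plain function (`build_sentence_local` is a hypothetical name chosen so it doesn't shadow the component):

```python
def build_sentence_local(product: str, emoji: str, emojitext: str) -> str:
    """Same logic as the build_sentence component body."""
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji      # an emoji was found
    else:
        end_str += emojitext  # fall back to the text code
    return end_str


print(build_sentence_local("Vertex Pipelines", "\u2728", "sparkles"))  # → Vertex Pipelines is ✨
print(build_sentence_local("Vertex Pipelines", "", "sparkles"))        # → Vertex Pipelines is sparkles
```

The fallback branch runs only when `emoji` is an empty string. Since `emoji.emojize` leaves an unrecognized code such as `:not_a_real_emoji:` unchanged, the `emoji` component has to return an empty string explicitly for this branch to be reached.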

#### Putting the components together into a pipeline
The components we defined above are factory functions that create pipeline steps when called inside a pipeline definition. To set up a pipeline, use the `@pipeline` decorator, give the pipeline a name and description, and provide the root path where your pipeline's artifacts should be written. By artifacts, we mean any output files generated by your pipeline. This intro pipeline doesn't generate any, but our next pipeline will.

In the next block of code we define an `intro_pipeline` function. This is where we specify the inputs to our initial pipeline steps, and how steps connect to each other:

- `product_task` takes a product name as input. Here we're passing "Vertex Pipelines", but you can change this to whatever you'd like.
- `emoji_task` takes the text code for an emoji as input. You can also change this to whatever you'd like. For example, "party_face" refers to the 🥳 emoji. Since neither this step nor `product_task` receives input from another step, we specify their inputs manually when we define our pipeline.
- The last step in our pipeline, `consumer_task`, has three input parameters:
> - The output of `product_task`. Since this step produces only one output, we can reference it via `product_task.output`.
> - The `emoji` output of our `emoji_task` step. Recall that we named these output parameters in the `emoji` component defined above.
> - Similarly, the `emoji_text` named output from the `emoji` component. If our pipeline is passed text that doesn't correspond to an emoji, `build_sentence` uses this text to construct the sentence instead.
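The step order follows from these data dependencies: `product_task` and `emoji_task` have no upstream inputs, so they can run in parallel, and `consumer_task` waits for both. As an illustration only (this uses the standard library's `graphlib`, not how Vertex Pipelines actually schedules work), the same graph can be grouped into stages:

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each step maps to the steps whose outputs it consumes (mirrors intro_pipeline).
dependencies = {
    "product_task": set(),
    "emoji_task": set(),
    "consumer_task": {"product_task", "emoji_task"},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()
stages = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # steps whose inputs are all available
    stages.append(ready)
    sorter.done(*ready)

for i, stage in enumerate(stages):
    print(f"stage {i}: {stage}")
```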

In [None]:
# You can change the `text` and `emoji_str` parameters here to update the pipeline output
@pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    # Keyword arguments work across KFP SDK versions (KFP 2.x requires them).
    product_task = product_name(text=text)
    emoji_task = emoji(text=emoji_str)
    consumer_task = build_sentence(
        product=product_task.output,
        emoji=emoji_task.outputs["emoji"],
        emojitext=emoji_task.outputs["emoji_text"],
    )

#### Compile and run the pipeline
With your pipeline defined, you're ready to compile it. The following will generate a JSON file that you'll use to run the pipeline:

In [None]:
compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)

Then define your pipeline job:

In [None]:
job = aiplatform.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
    enable_caching=True
)
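Vertex AI validates the `job_id` you pass to `PipelineJob`. Our understanding is that it must start with a lowercase letter and contain at most 128 lowercase letters, digits, and hyphens (roughly the pattern `[a-z][-a-z0-9]{0,127}`); treat that rule as an assumption and check the current SDK documentation. A hypothetical helper that builds IDs in that shape:

```python
import re

# Assumed job ID rule: a lowercase letter, then up to 127 lowercase letters,
# digits, or hyphens (128 characters max in total).
_JOB_ID_PATTERN = re.compile(r"^[a-z][-a-z0-9]{0,127}$")


def make_job_id(prefix: str, timestamp: str) -> str:
    """Build a job ID, replacing characters outside [a-z0-9-] with hyphens."""
    raw = f"{prefix}-{timestamp}".lower()
    job_id = re.sub(r"[^a-z0-9-]", "-", raw)[:128]
    if not _JOB_ID_PATTERN.match(job_id):
        raise ValueError(f"Cannot build a valid job ID from {raw!r}")
    return job_id


print(make_job_id("hello-world-pipeline", "20240201120000"))  # → hello-world-pipeline-20240201120000
```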

Finally, run the job to create a new pipeline execution:

In [None]:
job.submit()

<img src="images/Screenshot_2024-02-01.png" />

## 02 - End-to-End ML Pipeline

In this pipeline, we'll use the Dry Bean dataset from the UCI Machine Learning Repository. Source: KOKLU, M. and OZKAN, I.A. (2020), "Multiclass Classification of Dry Beans Using Computer Vision and Machine Learning Techniques." In Computers and Electronics in Agriculture, 174, 105507. DOI.

This is a tabular dataset, and in our pipeline we'll use the dataset to train, evaluate, and deploy an AutoML model that classifies beans into one of 7 types based on their characteristics.

This pipeline will:

- Create a Dataset in Vertex AI
- Train a tabular classification model with AutoML
- Get evaluation metrics on this model
- Based on the evaluation metrics, decide whether to deploy the model using conditional logic in Vertex Pipelines
- Deploy the model to an endpoint using Vertex Prediction

Each of the steps outlined above will be a component. Most of the steps use pre-built components for Vertex AI services from the `google_cloud_pipeline_components` library listed in the setup section. In this section, we'll first define one custom component, then define the rest of the pipeline steps using pre-built components, which make it easier to access Vertex AI services such as model training and deployment.

#### A custom component for model evaluation
The custom component we'll define will be used towards the end of our pipeline once model training has completed. This component will do a few things:

- Get the evaluation metrics from the trained AutoML classification model
- Parse the metrics and render them in the Vertex Pipelines UI
- Compare the metrics to a threshold to determine whether the model should be deployed

Before we define the component, let's look at its input and output parameters. As input, this component takes some metadata about our Cloud project, the trained model (produced by the training step we'll define later), and a `thresholds_dict_str`. It writes the model's evaluation metrics to two output artifacts. We'll define `thresholds_dict_str` when we run our pipeline. For this classification model, it holds the minimum area under the ROC curve (`auRoc`) the model must reach to be deployed. For example, if we pass in 0.95, the pipeline deploys the model only if its auRoc is at least 0.95.
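The threshold check boils down to a per-metric comparison. Here is a plain-Python sketch of the same rule as `classification_thresholds_check` in the component below (`deploy_decision` is a hypothetical name, and the metric values are made up for illustration). The component only fails a check when the metric is strictly below the threshold, so a value exactly equal to the threshold still deploys, and keys other than `auRoc` and `auPrc` are ignored.

```python
import json


def deploy_decision(metrics: dict, thresholds_dict_str: str) -> str:
    """Return "true" only if every higher-is-better metric (auRoc, auPrc)
    listed in the thresholds meets its minimum, mirroring the component."""
    thresholds = json.loads(thresholds_dict_str)
    for name, minimum in thresholds.items():
        if name in ("auRoc", "auPrc") and metrics[name] < minimum:
            return "false"
    return "true"


thresholds = '{"auRoc": 0.95}'
print(deploy_decision({"auRoc": 0.97}, thresholds))  # → true
print(deploy_decision({"auRoc": 0.93}, thresholds))  # → false
print(deploy_decision({"auRoc": 0.95}, thresholds))  # → true (ties pass)
```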

Our evaluation component returns a string indicating whether or not to deploy the model. Add the following in a notebook cell to create this custom component:

In [None]:
@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    import json
    import logging

    from google.cloud import aiplatform as aip

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)
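The helper `log_metrics` assumes a particular shape for each evaluation dictionary: a `confidenceMetrics` list of points and a `confusionMatrix` with `annotationSpecs` and `rows`. Below is a sketch of the same extraction, run against a hypothetical evaluation dict (the class names and numbers are illustrative, not real results). Like the component, it reads each ROC field with `.get(..., 0.0)`, so a missing key counts as 0.0.

```python
def roc_points(evaluation: dict):
    """Extract parallel fpr/tpr/threshold lists, as log_metrics does."""
    fpr, tpr, thresholds = [], [], []
    for item in evaluation.get("confidenceMetrics", []):
        fpr.append(item.get("falsePositiveRate", 0.0))
        tpr.append(item.get("recall", 0.0))
        thresholds.append(item.get("confidenceThreshold", 0.0))
    return fpr, tpr, thresholds


def confusion_labels(evaluation: dict):
    """Class display names, in confusion-matrix row order."""
    return [spec["displayName"] for spec in evaluation["confusionMatrix"]["annotationSpecs"]]


# Hypothetical evaluation dict with made-up numbers, shaped like metrics_list[0].
sample = {
    "auRoc": 0.97,
    "confidenceMetrics": [
        {"confidenceThreshold": 0.9, "recall": 0.8, "falsePositiveRate": 0.01},
        {"confidenceThreshold": 0.5, "recall": 0.95},  # falsePositiveRate omitted
    ],
    "confusionMatrix": {
        "annotationSpecs": [{"displayName": "SEKER"}, {"displayName": "BOMBAY"}],
        "rows": [[10, 1], [0, 12]],
    },
}
print(roc_points(sample))        # → ([0.01, 0.0], [0.8, 0.95], [0.9, 0.5])
print(confusion_labels(sample))  # → ['SEKER', 'BOMBAY']
```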

#### Adding Google Cloud pre-built components
In this step we'll define the rest of our pipeline components and see how they all fit together. First, define the display name for your pipeline run using a timestamp:

In [None]:
import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)

In [None]:
@dsl.pipeline(name="automl-tab-beans-training-v2",
              pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://ibnd-argls-cstmr-demos.mlops_demo.beans",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
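    # Keyword arguments work across KFP SDK versions (KFP 2.x requires them).
    model_eval_task = classification_model_eval_metrics(
        project=project,
        location=gcp_region,
        api_endpoint=api_endpoint,
        thresholds_dict_str=thresholds_dict_str,
        model=training_op.outputs["model"],
    )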

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name="train-automl-beans",
        )

        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type="n1-standard-4",
        )

Let's see what's happening in this code:

- First, just as in our previous pipeline, we define the input parameters this pipeline takes. We need to set these manually since they don't depend on the output of other steps in the pipeline.
- The rest of the pipeline uses a few pre-built components for interacting with Vertex AI services:
> - `TabularDatasetCreateOp` creates a tabular dataset in Vertex AI from a data source in either Cloud Storage or BigQuery. In this pipeline, we're passing the data via a BigQuery table URI.
> - `AutoMLTabularTrainingJobRunOp` kicks off an AutoML training job for a tabular dataset. We pass a few configuration parameters to this component, including the model type (in this case, classification), the column transformations, the training budget, and a pointer to the dataset. Notice that to pass the dataset to this component, we're providing the output of the _previous component_ via `dataset_create_op.outputs["dataset"]`.
> - `EndpointCreateOp` creates an endpoint in Vertex AI. The endpoint created in this step is passed as input to the next component.
> - `ModelDeployOp` deploys a given model to an endpoint in Vertex AI. In this case, we're using the endpoint created in the previous step. Additional configuration options are available, but here we provide the machine type and replica counts for the endpoint, plus the model we'd like to deploy. We pass in the model by accessing the outputs of the training step in our pipeline.
- This pipeline also makes use of conditional logic, a feature of Vertex Pipelines that lets you define a condition along with branches that run depending on its result. Remember that when we defined our pipeline we passed a `thresholds_dict_str` parameter. This is the auRoc threshold that determines whether to deploy our model to an endpoint. To implement this, we use the `Condition` class from the KFP SDK, passing in the `dep_decision` output of the custom eval component we defined earlier. If this condition is true, the pipeline goes on to run the `EndpointCreateOp` and `ModelDeployOp` steps inside the condition block. If the metric doesn't meet our predefined threshold, those steps are skipped and no model is deployed.
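The 17-entry `column_transformations` list in the pipeline follows a simple pattern, so it could also be generated from lists of column names. A sketch, with `build_column_transformations` as a hypothetical helper; the column names, including the `AspectRation` and `roundness` spellings, are copied verbatim from the pipeline definition.

```python
NUMERIC_COLUMNS = [
    "Area", "Perimeter", "MajorAxisLength", "MinorAxisLength", "AspectRation",
    "Eccentricity", "ConvexArea", "EquivDiameter", "Extent", "Solidity",
    "roundness", "Compactness", "ShapeFactor1", "ShapeFactor2", "ShapeFactor3",
    "ShapeFactor4",
]


def build_column_transformations(numeric, categorical):
    """Produce the list-of-dicts format passed to column_transformations."""
    transformations = [{"numeric": {"column_name": c}} for c in numeric]
    transformations += [{"categorical": {"column_name": c}} for c in categorical]
    return transformations


column_transformations = build_column_transformations(NUMERIC_COLUMNS, ["Class"])
print(len(column_transformations))  # → 17
```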

#### Compile and run the end-to-end ML pipeline
With our full pipeline defined, it's time to compile it:

In [None]:
compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)

Next, define the job:

In [None]:
ml_pipeline_job = aiplatform.PipelineJob(
    display_name="automl-tab-beans-training",
    template_path="tab_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
    enable_caching=True
)

And finally, run the job:

In [None]:
ml_pipeline_job.submit()

<img src="images/Screenshot_2024-02-01_12.14.02 _AM.png" />

#### Comparing metrics across pipeline runs
If you run this pipeline multiple times, you may want to compare metrics across runs. You can use the `aiplatform.get_pipeline_df()` method to access run metadata. Here, we'll get metadata for all runs of this pipeline and load it into a pandas DataFrame:

In [None]:
pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df
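To rank runs by a metric, you can select the metric columns from the DataFrame. This sketch assumes the frame has a `run_name` column and that metric columns are prefixed with `metric.`; check `pipeline_df.columns` for the names your SDK version actually produces. Because the evaluation component logs each metric via `json.dumps`, the values may arrive as strings, so the sketch coerces them with `pd.to_numeric`. The example frame and the `rank_runs` helper are hypothetical.

```python
import pandas as pd


def rank_runs(df: pd.DataFrame, metric_col: str) -> pd.DataFrame:
    """Return run_name plus all metric.* columns, best metric_col first."""
    metric_cols = [c for c in df.columns if c.startswith("metric.")]
    out = df[["run_name"] + metric_cols].copy()
    # Metrics may be logged as JSON strings, so coerce them to numbers.
    for col in metric_cols:
        out[col] = pd.to_numeric(out[col], errors="coerce")
    return out.sort_values(metric_col, ascending=False).reset_index(drop=True)


# Hypothetical frame with made-up values, standing in for get_pipeline_df() output.
example = pd.DataFrame({
    "run_name": ["run-a", "run-b"],
    "param.thresholds_dict_str": ['{"auRoc": 0.95}', '{"auRoc": 0.95}'],
    "metric.auRoc": ["0.93", "0.97"],
})
print(rank_runs(example, "metric.auRoc"))
```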