# E2E ML Vertex KFP Pipeline with AutoML

### Daataset
UCI Machine Learning [Dry beans dataset](https://archive.ics.uci.edu/dataset/602/dry+bean+dataset)

### Objectives:

- Create a Dataset in {{vertex_ai_name}}
- Train a tabular classification model with AutoML
- Get evaluation metrics on this model
- Based on the evaluation metrics, decide whether to deploy the model using conditional logic in Vertex Pipelines
- Deploy the model to an endpoint using Vertex Prediction

In [None]:
YOUR_NAME="your-name" # TO-DO: store your name in this var

In [None]:
!pip install --user google-cloud-aiplatform==1.50.0 kfp==2.7.0

In [None]:
!pip install --user google-cloud-pipeline-components --upgrade

In [None]:
import kfp

from kfp import compiler, dsl
from kfp.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from typing import NamedTuple

In [None]:
PROJECT_ID='mlops-421501'
BUCKET=f"gs://{PROJECT_ID}-{YOUR_NAME}-bucket"

In [None]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET}/pipeline_root/"
PIPELINE_ROOT

In [None]:
from google_cloud_pipeline_components import v1 as gcc_aip

### Build Custom Component for Model Evaluation


Function of the component:
- Get the evaluation metrics from the trained AutoML classification model
- Parse the metrics and render them in the Vertex Pipelines UI
- Compare the metrics to a threshold to determine whether the model should be deployed

In [None]:
@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    import json
    import logging

    from google.cloud import aiplatform as aip

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

### Define Pipeline

In [None]:
import time
DISPLAY_NAME = '{}-automl-beans{}'.format(YOUR_NAME,str(int(time.time())))
print(DISPLAY_NAME)

In [None]:
PIPELINE_NAME= f"{YOUR_NAME}-automl-tab-beans-training"

In [None]:
@kfp.dsl.pipeline(name=PIPELINE_NAME, pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str,
    DATASET_DISPLAY_NAME: str,
    TRAINING_DISPLAY_NAME: str,
    MODEL_DISPLAY_NAME: str,
    ENDPOINT_DISPLAY_NAME: str,
    MACHINE_TYPE: str,
    project: str,
    gcp_region: str,
    thresholds_dict_str: str,
):
    from google_cloud_pipeline_components.v1.automl.training_job import \
        AutoMLTabularTrainingJobRunOp
    from google_cloud_pipeline_components.v1.dataset.create_tabular_dataset.component import \
        tabular_dataset_create as TabularDatasetCreateOp
    from google_cloud_pipeline_components.v1.endpoint.create_endpoint.component import \
        endpoint_create as EndpointCreateOp
    from google_cloud_pipeline_components.v1.endpoint.deploy_model.component import \
        model_deploy as ModelDeployOp

    dataset_create_op = TabularDatasetCreateOp(
        project=project,
        location=gcp_region,
        display_name=DATASET_DISPLAY_NAME,
        bq_source=bq_source,
    ).set_display_name("Create Tabular Dataset")

    training_op = AutoMLTabularTrainingJobRunOp(
        project=project,
        location=gcp_region,
        display_name=TRAINING_DISPLAY_NAME,
        optimization_prediction_type="classification",
        optimization_objective="minimize-log-loss",
        budget_milli_node_hours=1000,
        model_display_name=MODEL_DISPLAY_NAME,
        column_specs={
            "Area": "numeric",
            "Perimeter": "numeric",
            "MajorAxisLength": "numeric",
            "MinorAxisLength": "numeric",
            "AspectRation": "numeric",
            "Eccentricity": "numeric",
            "ConvexArea": "numeric",
            "EquivDiameter": "numeric",
            "Extent": "numeric",
            "Solidity": "numeric",
            "roundness": "numeric",
            "Compactness": "numeric",
            "ShapeFactor1": "numeric",
            "ShapeFactor2": "numeric",
            "ShapeFactor3": "numeric",
            "ShapeFactor4": "numeric",
            "Class": "categorical",
        },
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    ).set_display_name("AutoML Training Job")

    model_eval_task = classification_model_eval_metrics(
        project=project,
        location="us-central1",
        thresholds_dict_str=thresholds_dict_str,
        model=training_op.outputs["model"],
        api_endpoint="us-central1-aiplatform.googleapis.com"
    ).set_display_name("Evaluation Metrics")

    with dsl.If(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        endpoint_op = EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name=ENDPOINT_DISPLAY_NAME,
        ).set_display_name("Create Endpoint")

        ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type=MACHINE_TYPE,
        ).set_display_name("Deploy to Endpoint")

### Compile Pipeline

In [None]:
compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)

### Submit Pipeline Job

In [None]:
bq_source="bq://aju-dev-demos.beans.beans1"
gcp_region= "us-central1"
api_endpoint="us-central1-aiplatform.googleapis.com"
thresholds_dict_str='{"auRoc": 0.95}'
DATASET_DISPLAY_NAME=f"{YOUR_NAME}_dataset_beans"
TRAINING_DISPLAY_NAME=f"{YOUR_NAME}_automl_training_beans"
MODEL_DISPLAY_NAME=f"{YOUR_NAME}_model_beans"
ENDPOINT_DISPLAY_NAME=f"{YOUR_NAME}_endpoint_beans"
MACHINE_TYPE="n1-standard-4"

In [None]:
ml_pipeline_job = aiplatform.PipelineJob(
    display_name=f"{YOUR_NAME}automl-tab-beans-training",
    template_path="tab_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={
        "project": PROJECT_ID,
        "gcp_region": gcp_region,
        "bq_source": bq_source,
        "thresholds_dict_str": '{"auRoc": 0.95}',
        "DATASET_DISPLAY_NAME": DATASET_DISPLAY_NAME,
        "TRAINING_DISPLAY_NAME": TRAINING_DISPLAY_NAME,
        "MODEL_DISPLAY_NAME": MODEL_DISPLAY_NAME,
        "ENDPOINT_DISPLAY_NAME": ENDPOINT_DISPLAY_NAME,
        "MACHINE_TYPE": MACHINE_TYPE,
    },
    enable_caching=True
)

In [None]:
ml_pipeline_job.submit()