In [1]:
# Install pinned versions of the Vertex AI SDK, the Kubeflow Pipelines SDK and
# the Google Cloud pipeline components.
# USER_FLAG is interpolated into both pip commands, so the packages are
# installed with pip's --user option.
USER_FLAG = "--user"
!pip3 install {USER_FLAG} google-cloud-aiplatform==1.18.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.10 google-cloud-pipeline-components==1.0

Collecting google-cloud-aiplatform==1.18.0
  Downloading google_cloud_aiplatform-1.18.0-py2.py3-none-any.whl (2.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.3/2.3 MB[0m [31m26.2 MB/s[0m eta [36m0:00:00[0m00:01[0m0:01[0m
Collecting protobuf<5.0.0dev,>=3.20.2
  Downloading protobuf-3.20.3-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl (1.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.0/1.0 MB[0m [31m64.8 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: protobuf, google-cloud-aiplatform
[0mSuccessfully installed google-cloud-aiplatform-1.18.0 protobuf-3.20.3
Collecting kfp==1.8.10
  Downloading kfp-1.8.10.tar.gz (298 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m298.3/298.3 kB[0m [31m7.9 MB/s[0m eta [36m0:00:00[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting google-cloud-pipeline-components==1.0
  Downloading google_cloud_pipeline_components-1.0.0-py

In [1]:
# Restart the kernel once the packages are installed so that the new versions
# are the ones picked up by later imports.
# The restart is skipped when the IS_TESTING environment variable holds a
# non-empty value.
import os

if os.getenv("IS_TESTING") in (None, ""):
    import IPython

    app = IPython.Application.instance()
    # The True argument is what makes this a restart (see the cell's purpose above).
    app.kernel.do_shutdown(True)

In [1]:
#Importing the packages for pipeline creation
import kfp
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics
from typing import NamedTuple
from google_cloud_pipeline_components import aiplatform as gcc_aip
from google.cloud import aiplatform

In [2]:
#To set the project ID
import os
PROJECT_ID = ""
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID is set to : ", PROJECT_ID)
if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "vertex-ai-gcp-1"
    print("Project ID is set manually")

Project ID is set to :  vertex-ai-gcp-1


In [3]:
#Defining bucket to store the artifacts
bucket_name_arti="gs://" + PROJECT_ID + "-pipeline-automl-artifacts"
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"
pipeline_folder = f"{bucket_name_arti}/pipeline_automl/"
pipeline_folder

env: PATH=/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/jupyter/.local/bin


'gs://vertex-ai-gcp-1-pipeline-automl-artifacts/pipeline_automl/'

In [33]:
@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-5:latest",
    output_component_file="model_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def image_classification_model_eval_metrics(
    project: str,
    location: str,
    api_endpoint: str,
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metrics_classification: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):
    """Decide whether a trained model should be deployed, based on its evaluation.

    Lists the evaluations of the model, logs the first evaluation's ROC curve,
    confusion matrix and remaining metrics to the output artifacts, and compares
    the metrics against the supplied thresholds.

    Args:
        project: Project passed to ``aiplatform.init``.
        location: Region passed to ``aiplatform.init``.
        api_endpoint: API endpoint used for the ``ModelServiceClient``.
        thresholds_dict_str: JSON object mapping metric names to minimum
            values. Only the "auRoc" and "auPrc" keys are checked; other keys
            are logged and ignored.
        model: Input artifact whose ``metadata["resourceName"]`` holds the
            model resource name.
        metrics: Output artifact that receives every metric except
            "confidenceMetrics", each serialised as a JSON string.
        metrics_classification: Output artifact that receives the ROC curve
            and the confusion matrix.

    Returns:
        A one-element tuple holding "true" when every checked threshold is
        met and "false" otherwise.

    Raises:
        ValueError: If the model has no evaluations.
    """
    import json
    import logging

    from google.cloud import aiplatform as aip

    def fetch_eval_info(client_name, model_name):
        """Collect the metrics of every evaluation attached to ``model_name``.

        Returns a tuple of (name of the last evaluation listed, list of metric
        dicts, list of the same metrics as JSON strings).
        """
        from google.protobuf.json_format import MessageToDict

        eval_name = None
        metrics_list_value = []
        metrics_list_string = []

        for model_eval in client_name.list_model_evaluations(parent=model_name):
            print("trained model evaluation")
            print("metric name:", model_eval.name)
            print(" metrics_schema_uri:", model_eval.metrics_schema_uri)
            model_metrics = MessageToDict(model_eval._pb.metrics)
            for metric in model_metrics.keys():
                logging.info("metric: %s, value: %s", metric, model_metrics[metric])
            metrics_list_value.append(model_metrics)
            metrics_list_string.append(json.dumps(model_metrics))
            eval_name = model_eval.name

        # Without this guard an empty listing surfaced as an UnboundLocalError
        # on the loop variable instead of a message naming the real problem.
        if eval_name is None:
            raise ValueError(
                "no model evaluations found for model {}".format(model_name)
            )

        return (eval_name, metrics_list_value, metrics_list_string)

    def metrics_log_check(metrics_list_value, metrics_classification, thresholds_dict_str):
        """Log the first evaluation's metrics and check them against thresholds.

        Scalar metrics are written to the enclosing component's ``metrics``
        output artifact. Returns True when every checked threshold is met.
        """
        eval_metrics = metrics_list_value[0]
        test_confusion_matrix = eval_metrics["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        false_pos_rate = []
        true_pos_rate = []
        thresholds = []
        for item in eval_metrics["confidenceMetrics"]:
            false_pos_rate.append(item.get("falsePositiveRate", 0.0))
            true_pos_rate.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"false_pos_rate: {false_pos_rate}")
        print(f"true_pos_rate: {true_pos_rate}")
        print(f"thresholds: {thresholds}")
        metrics_classification.log_roc_curve(false_pos_rate, true_pos_rate, thresholds)

        # log the confusion matrix
        annotations = [
            item["displayName"] for item in test_confusion_matrix["annotationSpecs"]
        ]
        logging.info("confusion matrix annotations: %s", annotations)
        metrics_classification.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in eval_metrics.keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(eval_metrics[metric])
                metrics.log_metric(metric, val_string)

        thresholds_dict = json.loads(thresholds_dict_str)
        for key, value in thresholds_dict.items():
            logging.info("key {}, value {}".format(key, value))
            if key in ["auRoc", "auPrc"]:  # higher is better
                if key not in eval_metrics:
                    # A threshold that cannot be verified must not let the
                    # model through, and must not crash with a KeyError.
                    logging.warning(
                        "metric %s missing from evaluation; returning False", key
                    )
                    return False
                if eval_metrics[key] < value:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(eval_metrics[key], value))
                    return False
        logging.info("threshold checks passed.")
        return True

    logging.getLogger().setLevel(logging.INFO)
    # Pass the region as well; previously the `location` argument was accepted
    # but never used.
    aip.init(project=project, location=location)

    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)

    # Fetch the evaluation information for this specific model.
    eval_name, metrics_list_value, metrics_str_list = fetch_eval_info(client, model_resource_path)
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list_value)

    # Log the metrics and decide on deployment in one pass.
    deploy = metrics_log_check(metrics_list_value, metrics_classification, thresholds_dict_str)
    dep_decision = "true" if deploy else "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

In [34]:
# Display name shared by the dataset, the training job and the pipeline run.
DISPLAY_NAME = 'image_boat_classification'


@kfp.dsl.pipeline(name="image-classification", pipeline_root=pipeline_folder)
def pipeline(
    gcs_source: str = "gs://pipeline_automl/class_labels.csv",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auPrc": 0.60}',
):
    """AutoML image classification pipeline with a metric-gated deployment.

    Steps: create an image dataset, train an AutoML image classification
    model, evaluate it against the thresholds, and only when the evaluation
    step outputs "true" create an endpoint and deploy the model to it.

    Args:
        gcs_source: Cloud Storage URI of the import file for the dataset.
        display_name: Display name for the dataset and the training job.
        project: Project in which every step runs.
        gcp_region: Region in which every step runs.
        api_endpoint: API endpoint handed to the evaluation component.
        thresholds_dict_str: JSON object of minimum metric values handed to
            the evaluation component.
    """
    # First component: create the image dataset.
    # `location` is passed explicitly so the dataset is created in the same
    # region as the endpoint instead of the component's default location.
    dataset_create_op = gcc_aip.ImageDatasetCreateOp(
        project=project,
        location=gcp_region,
        display_name=display_name,
        gcs_source=gcs_source,
        import_schema_uri=aiplatform.schema.dataset.ioformat.image.single_label_classification,
    )

    # Second component: AutoML training, in the same region as the dataset.
    training_op = gcc_aip.AutoMLImageTrainingJobRunOp(
        project=project,
        location=gcp_region,
        display_name=display_name,
        prediction_type="classification",
        budget_milli_node_hours=8000,
        dataset=dataset_create_op.outputs["dataset"],
    )

    # Third component: evaluate the trained model against the thresholds.
    model_eval_task = image_classification_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):
        # Fourth component: create the endpoint, only if the condition is met.
        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name="train-automl-vision",
        )
        # Fifth component: deploy the model on the endpoint just created.
        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            automatic_resources_min_replica_count=1,
            automatic_resources_max_replica_count=1,
        )

In [35]:
# Compile the pipeline function into image_classif_pipeline.json, the file the
# PipelineJob below loads through its template_path.
compiler.Compiler().compile(pipeline_func=pipeline, package_path="image_classif_pipeline.json")

In [36]:
# Build the pipeline job from the compiled spec.
# Project and region are pinned to the notebook's own PROJECT_ID / REGION so
# the job does not depend on whatever defaults the SDK picks up from the
# environment (aiplatform.init is never called in this notebook). The same
# region is forwarded to the pipeline so all steps and the evaluation endpoint
# agree with the job's location.
ml_pipeline_job = aiplatform.PipelineJob(
    display_name="automl-image-training",
    template_path="image_classif_pipeline.json",
    pipeline_root=pipeline_folder,
    parameter_values={
        "project": PROJECT_ID,
        "display_name": DISPLAY_NAME,
        "gcp_region": REGION,
        "api_endpoint": f"{REGION}-aiplatform.googleapis.com",
    },
    enable_caching=True,
    project=PROJECT_ID,
    location=REGION,
)

In [37]:
# Submit the pipeline job; the cell output reports the job's resource name and
# a console link to the run.
ml_pipeline_job.submit()

Creating PipelineJob
PipelineJob created. Resource name: projects/953811166431/locations/us-central1/pipelineJobs/image-classification-20221029190409
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/953811166431/locations/us-central1/pipelineJobs/image-classification-20221029190409')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/image-classification-20221029190409?project=953811166431
