# Image Classification Pipeline

This is an example of a fairly sophisticated Vertex AI pipeline for training and deploying an image classification model.
The focus lies on training and hyperparameter tuning, and not as much on deployment strategies. 

It is based on Keras, can use any base model from tfhub.dev, and can be applied to any dataset that follows a few minor conventions.

Running the whole pipeline **will incur costs** (**about 5€ per run with the current config**). Also, make sure to delete all created resources afterwards to avoid additional costs.


## 1. Setup

First we need to install all requirements and configure our environment properly.

### 1.1. Requirements

In [None]:
!pip install kfp==1.8.11
!pip install google-cloud-aiplatform==1.9.0
!pip install google-cloud-pipeline-components==0.2.6
!pip install tensorflow==2.8.0
!pip install scikit-learn==1.0.2
!pip install cloudml-hypertune==0.1.0.dev6
!pip install tensorflow_hub==0.12.0


### 1.2. Configuration

In [None]:
PROJECT = "YOUR_PROJECT_ID"
LOCATION = "YOUR_REGION"
SERVICE_ACCOUNT_NAME = "vertex-pipeline-sa"

CONTAINER_REGISTRY_BASE = f"eu.gcr.io/{PROJECT}/mlpipelines/vertex"

ARTIFACTS_ROOT_DIR = f"gs://{PROJECT}_vertex-ai-pipelines_artifacts"
COMPILED_JOBS_DIR = f"gs://{PROJECT}_vertex-ai-pipelines_compiled-jobs"
DATA_ROOT_DIR = f"gs://{PROJECT}-data"
MODEL_ROOT_DIR = f"gs://{PROJECT}-model"
TENSORBOARD_LOGS_DIR = f"gs://{PROJECT}-tensorboard-logs"

In [None]:
SERVICE_ACCOUNT = f"{SERVICE_ACCOUNT_NAME}@{PROJECT}.iam.gserviceaccount.com"

In [None]:
TRAINING_IMAGE = f"{CONTAINER_REGISTRY_BASE}/components/training:latest"
SERVING_IMAGE = "europe-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-7:latest"

### 1.3. Set Up GCP Infrastructure

Based on the above configuration, we set up the GCP infrastructure. This is for demonstration purposes only. In practice, consider provisioning it with a tool like Terraform, so that it can be created and destroyed in a consistent and repeatable way.

If you are on Colab, the next cell asks you to log in automatically. If you run this notebook on a Vertex AI Workbench notebook, configure gcloud beforehand.

The cells below:

- enable the Cloud Build and Vertex AI (`aiplatform`) APIs
- create a service account to run the pipeline with on Vertex AI
- grant IAM roles to that service account
- create GCS buckets for pipeline artifacts, compiled pipeline jobs, data, pretrained models and TensorBoard logs

In [None]:
import sys
if "google.colab" in sys.modules:
    from google.colab import auth
    auth.authenticate_user()
else:
    print("Run `gcloud auth login` in your terminal to authenticate")

In [None]:
if "google.colab" in sys.modules:
    !gcloud config set project $PROJECT


In [None]:
!gcloud services enable aiplatform.googleapis.com
!gcloud services enable cloudbuild.googleapis.com
!gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
!gcloud projects add-iam-policy-binding $PROJECT \
    --member serviceAccount:$SERVICE_ACCOUNT \
    --role=roles/aiplatform.admin
!gcloud projects add-iam-policy-binding $PROJECT \
    --member serviceAccount:$SERVICE_ACCOUNT \
    --role=roles/storage.admin
!gcloud projects add-iam-policy-binding $PROJECT \
    --member serviceAccount:$SERVICE_ACCOUNT \
    --role=roles/iam.serviceAccountUser
!gsutil mb $ARTIFACTS_ROOT_DIR
!gsutil mb $COMPILED_JOBS_DIR
!gsutil mb $DATA_ROOT_DIR
!gsutil mb $MODEL_ROOT_DIR
!gsutil mb $TENSORBOARD_LOGS_DIR

### 1.4. Set Up gcsfuse

To make it easier to interact with GCS from the notebook, we configure gcsfuse. On Colab, we install gcsfuse first. On a Vertex AI Workbench notebook it is already installed, so we only configure a dynamic mount.
Because no bucket name is given, the mount gives you access to all GCS buckets you have permissions for, each as a subdirectory of the mount point.
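Throughout this notebook, GCS URIs are mapped onto fuse mount paths with `uri.replace("gs://", "/gcs/")` (or `"gcs/"` for the local mount created here). The helper below is a minimal sketch of the same mapping, assuming the mount point is passed in; the name `to_fuse_path` is hypothetical. Unlike `str.replace`, it only rewrites the leading scheme and rejects non-GCS paths.

```python
def to_fuse_path(uri: str, mount_point: str = "/gcs") -> str:
    """Map a gs:// URI onto a gcsfuse mount path (hypothetical helper).

    Only the leading "gs://" is rewritten, so a "gs://" substring later in the
    path is left untouched.
    """
    prefix = "gs://"
    if not uri.startswith(prefix):
        raise ValueError(f"Not a GCS URI: {uri!r}")
    return f"{mount_point.rstrip('/')}/{uri[len(prefix):]}"


# Vertex AI jobs use /gcs as mount point; the notebook mount above is ./gcs
print(to_fuse_path("gs://my-project-data/flower_data"))         # /gcs/my-project-data/flower_data
print(to_fuse_path("gs://my-project-data/flower_data", "gcs"))  # gcs/my-project-data/flower_data
```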

In [None]:
if "google.colab" in sys.modules:
    !echo "deb http://packages.cloud.google.com/apt gcsfuse-bionic main" > /etc/apt/sources.list.d/gcsfuse.list
    !curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add -
    !apt -qq update
    !apt -qq install gcsfuse 

In [None]:
!mkdir -p gcs
!gcsfuse --debug_gcs --debug_fuse --implicit-dirs gcs

## 2. Pipeline Preparation

Let's use the well-known flowers dataset as an example.
Here we download the data locally, extract it and copy it to GCS. We use gsutil for the copy because it is faster for large amounts of data than writing through the gcsfuse mount.

This dataset follows exactly the convention this pipeline relies on: under its root directory, it has one subdirectory per class, containing that class's images.

### 2.1. Prepare Data

In [None]:
from pathlib import Path

In [None]:
DATA_DIR_LOCAL = "data/flower_data"
DATA_DIR = f"{DATA_ROOT_DIR}/flower_data"

In [None]:
Path(DATA_DIR_LOCAL).mkdir(parents=True, exist_ok=True)
Path(DATA_DIR.replace("gs://", "gcs/")).mkdir(exist_ok=True)

In [None]:
import urllib.request
DOWNLOAD_URL = 'http://download.tensorflow.org/example_images/flower_photos.tgz'
urllib.request.urlretrieve(DOWNLOAD_URL, f"{DATA_DIR_LOCAL}/flower_photos.tgz")
!tar xfz $DATA_DIR_LOCAL/flower_photos.tgz -C $DATA_DIR_LOCAL
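Before copying the data to GCS, it can be worth checking that the extracted directory really follows the one-subdirectory-per-class convention. The function below is a minimal sketch of such a check; the name `check_class_layout` and the set of image extensions are assumptions for illustration. Files directly under the root (such as a license file) are ignored, and class directories are searched recursively.

```python
from pathlib import Path

# Assumed image extensions; adjust to your data
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".bmp", ".gif"}


def check_class_layout(root: str) -> dict:
    """Return {class_name: image_count} for a root with one subdirectory per class.

    Raises ValueError if there are no class subdirectories or a class has no images.
    """
    class_dirs = sorted(p for p in Path(root).iterdir() if p.is_dir())
    if not class_dirs:
        raise ValueError(f"No class subdirectories found under {root}")

    counts = {}
    for class_dir in class_dirs:
        n_images = sum(
            1
            for f in class_dir.rglob("*")
            if f.is_file() and f.suffix.lower() in IMAGE_EXTENSIONS
        )
        if n_images == 0:
            raise ValueError(f"Class directory {class_dir.name!r} contains no images")
        counts[class_dir.name] = n_images
    return counts
```

In this notebook it could be called as `check_class_layout(f"{DATA_DIR_LOCAL}/flower_photos")`, since the archive extracts into a `flower_photos` directory.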

In [None]:
!gsutil -m cp -r $DATA_DIR_LOCAL $DATA_DIR

### 2.2. Define Training Script

Now we move on to the more interesting part: building a Docker image that contains our training script, so that the pipeline's training jobs can run it later on.

The name of the image is already defined in the configuration above. Here we create a `training` directory, which contains the training source code, a Dockerfile and a requirements.txt file.

Together, the training script and the utils file load the training dataset, split it into training and validation sets, create a model, and configure Keras callbacks that write TensorBoard logs and checkpoints and report metrics to Vertex AI hyperparameter tuning jobs.

Also note the environment variables the training script relies on (`AIP_MODEL_DIR`, `AIP_CHECKPOINT_DIR`, `CLOUD_ML_JOB_ID`, `CLOUD_ML_TRIAL_ID` and `CLOUD_ML_HP_METRIC_TAG`). They are set by Vertex AI when it runs the training job or a hyperparameter tuning trial.
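The snippet below isolates how `trainer.py` derives its output locations from those variables, so the behaviour can be tried without launching a job. It is a sketch that mirrors the script's logic; the function name `resolve_output_dirs` is hypothetical, the environment is passed in as a plain dict, and the example paths are placeholders.

```python
import os


def resolve_output_dirs(env, tensorboard_log_root=None, save_checkpoints=False):
    """Mirror of the output-directory logic in trainer.py (hypothetical helper).

    `env` stands in for os.environ. Returns
    (tensorboard_log_dir, checkpoints_dir, trained_model_dir).
    """
    if tensorboard_log_root:
        # job and trial IDs give every tuning trial its own TensorBoard subdirectory;
        # missing IDs leave empty path components, whose trailing slashes are stripped
        tensorboard_log_dir = os.path.join(
            tensorboard_log_root,
            env.get("CLOUD_ML_JOB_ID", ""),
            env.get("CLOUD_ML_TRIAL_ID", ""),
        ).rstrip("/")
    else:
        tensorboard_log_dir = None

    checkpoints_dir = env.get("AIP_CHECKPOINT_DIR") if save_checkpoints else None
    trained_model_dir = env.get("AIP_MODEL_DIR")
    return tensorboard_log_dir, checkpoints_dir, trained_model_dir


trial_env = {
    "CLOUD_ML_JOB_ID": "123",
    "CLOUD_ML_TRIAL_ID": "4",
    "AIP_MODEL_DIR": "gs://bucket/job/4/model/",
    "AIP_CHECKPOINT_DIR": "gs://bucket/job/4/checkpoints/",
}
print(resolve_output_dirs(trial_env, "gs://bucket/logs", save_checkpoints=True))
# ('gs://bucket/logs/123/4', 'gs://bucket/job/4/checkpoints/', 'gs://bucket/job/4/model/')
```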

In [None]:
!mkdir -p training/src

In [None]:
%%writefile training/src/utils.py
import os
import pickle  # nosec
import shutil
from pathlib import Path

import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2
from tensorflow.keras.layers import Dense, Dropout, Input, Resizing
from tensorflow.keras.models import Model

import tensorflow_hub as hub

import hypertune


class HyperTuneCallback(tf.keras.callbacks.Callback):
    def __init__(self, metric=None) -> None:
        super().__init__()
        self.metric = metric
        self.hpt = hypertune.HyperTune()

    def on_epoch_end(self, epoch, logs=None):
        if logs and self.metric in logs:
            self.hpt.report_hyperparameter_tuning_metric(
                hyperparameter_metric_tag=self.metric,
                metric_value=logs[self.metric],
                global_step=epoch,
            )


def prepare_dataset(
    train_dataset_uri, compression, shuffle, shuffle_buffer, seed, batch_size, val_size
):
    with open(
        os.path.join(train_dataset_uri.replace("gs://", "/gcs/"), "element_spec.pickle"), "rb"
    ) as fh:
        element_spec = pickle.load(fh)  # nosec

    dataset = tf.data.experimental.load(
        train_dataset_uri, element_spec=element_spec, compression=compression
    )

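    if shuffle:
        # shuffle once in a fixed order, so that take() and skip() below see the same
        # ordering and the train/validation split does not overlap
        dataset = dataset.shuffle(shuffle_buffer, seed=seed, reshuffle_each_iteration=False)

    dataset_size = dataset.cardinality().numpy()
    # take() and skip() expect an integer count
    num_train_samples = int((1.0 - val_size) * dataset_size)

    train_data, val_data = dataset.take(num_train_samples), dataset.skip(num_train_samples)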

    if shuffle:
        train_data = train_data.shuffle(shuffle_buffer, seed=seed)
        val_data = val_data.shuffle(shuffle_buffer, seed=seed)

    train_data = train_data.batch(batch_size).prefetch(tf.data.experimental.AUTOTUNE)
    val_data = val_data.batch(batch_size).prefetch(tf.data.experimental.AUTOTUNE)
    return train_data, val_data


def create_model(
    image_size, num_classes, base_model_dir, num_neurons, 
    dropout, activation, learning_rate
):
    inputs = Input(shape=(None, None, 3))
    x = Resizing(*image_size)(inputs)

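    base_model = None
    base_model_dir = base_model_dir or ""  # --base-model-dir is optional and defaults to None

    if base_model_dir.startswith("gs://"):
        base_model = tf.keras.models.load_model(base_model_dir.replace("gs://", "/gcs/"))

    if base_model_dir.startswith("https://tfhub.dev"):
        base_model = hub.KerasLayer(
            base_model_dir,
            trainable=False,
            input_shape=(*image_size, 3),
        )

    # fall back to an ImageNet-pretrained MobileNetV2 if no base model was loaded
    if base_model is None: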
        base_model = MobileNetV2(weights="imagenet", input_tensor=x)
        base_model.trainable = False
        outputs = base_model.layers[-2].output
    else:
        outputs = base_model(x)

    model = Model(inputs=inputs, outputs=outputs)

    outputs = Dense(num_neurons, activation=activation)(model.output)
    outputs = Dropout(dropout)(outputs)
    outputs = Dense(num_classes, activation="softmax")(outputs)
    model = Model(inputs=model.input, outputs=outputs)

    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    model.compile(optimizer=optimizer, loss="sparse_categorical_crossentropy", metrics=["accuracy"])
    return model


def configure_keras_callbacks(
    tensorboard_log_dir,
    tensorboard_kwargs,
    checkpoints_dir,
    checkpoint_kwargs,
    hypertune,
    hypertune_kwargs,
):
    callbacks = []

    if hypertune:
        callbacks.append(HyperTuneCallback(**hypertune_kwargs))

    if tensorboard_log_dir:
        fuse_path = tensorboard_log_dir.replace("gs://", "/gcs/")
        if Path(fuse_path).exists():
            shutil.rmtree(fuse_path)
        callbacks.append(
            tf.keras.callbacks.TensorBoard(log_dir=tensorboard_log_dir, **tensorboard_kwargs)
        )
    if checkpoints_dir:
        callbacks.append(tf.keras.callbacks.ModelCheckpoint(checkpoints_dir, **checkpoint_kwargs))

    return callbacks


In [None]:
%%writefile training/src/trainer.py

import argparse
import json
import os

from utils import create_model, prepare_dataset, configure_keras_callbacks
import tensorflow as tf


def get_args():
    """Parses args. Must include all hyperparameters you want to tune."""

    def custom_int(value):
        return int(float(value))

    def str_to_bool(value):
        # argparse's type=bool would turn any non-empty string, including "False", into True
        return str(value).strip().lower() in ("true", "1", "yes")

    parser = argparse.ArgumentParser()
    parser.add_argument("--train-dataset-uri", type=str)
    parser.add_argument("--num-classes", type=custom_int)
    parser.add_argument("--compression", type=str, default="NONE")
    parser.add_argument("--base-model-dir", type=str, default=None)
    parser.add_argument("--val-size", type=float, default=0.2)
    parser.add_argument("--image-size", type=json.loads, default=[124, 124])
    parser.add_argument("--epochs", type=custom_int, default=1)
    parser.add_argument("--steps-per-epoch", type=custom_int, default=100)
    parser.add_argument("--validation-steps", type=custom_int, default=100)
    parser.add_argument("--batch-size", type=custom_int, default=1)
    parser.add_argument("--shuffle", type=str_to_bool, default=True)
    parser.add_argument("--shuffle-buffer", type=custom_int, default=64)
    parser.add_argument("--seed", type=custom_int, default=None)
    parser.add_argument("--save-checkpoints", type=str_to_bool, default=False)
    parser.add_argument("--checkpoint-kwargs", type=json.loads, default={})
    parser.add_argument("--tensorboard-log-root", type=str, default=None)
    parser.add_argument("--tensorboard-kwargs", type=json.loads, default={})

    parser.add_argument("--hypertune", type=str_to_bool, default=False)

    # tunable parameters
    parser.add_argument("--num-neurons", type=custom_int, default=64)
    parser.add_argument("--dropout", type=float, default=0.5)
    parser.add_argument("--activation", type=str, default="relu")
    parser.add_argument("--learning-rate", type=float, default=0.001)

    args = parser.parse_args()
    return args


if __name__ == "__main__":
    args = get_args()

    print(args)
    print(os.environ)

    if args.tensorboard_log_root:
        job_subdir = os.environ.get("CLOUD_ML_JOB_ID", "")
        trial_subdir = os.environ.get("CLOUD_ML_TRIAL_ID", "")
        tensorboard_log_dir = os.path.join(
            args.tensorboard_log_root, job_subdir, trial_subdir
        ).rstrip("/")
    else:
        tensorboard_log_dir = None

    checkpoints_dir = os.environ.get("AIP_CHECKPOINT_DIR") if args.save_checkpoints else None

    trained_model_dir = os.environ.get("AIP_MODEL_DIR")

    print(tensorboard_log_dir, checkpoints_dir, trained_model_dir)

    model = create_model(
        image_size=args.image_size,
        num_classes=args.num_classes,
        base_model_dir=args.base_model_dir,
        num_neurons=args.num_neurons,
        dropout=args.dropout,
        activation=args.activation,
        learning_rate=args.learning_rate,
    )

    callbacks = configure_keras_callbacks(
        tensorboard_log_dir=tensorboard_log_dir,
        tensorboard_kwargs=args.tensorboard_kwargs,
        checkpoints_dir=checkpoints_dir,
        checkpoint_kwargs=args.checkpoint_kwargs,
        hypertune=args.hypertune,
        hypertune_kwargs=dict(metric=os.environ.get("CLOUD_ML_HP_METRIC_TAG")),
    )

    train_data, val_data = prepare_dataset(
        train_dataset_uri=args.train_dataset_uri,
        compression=args.compression,
        shuffle=args.shuffle,
        shuffle_buffer=args.shuffle_buffer,
        seed=args.seed,
        batch_size=args.batch_size,
        val_size=args.val_size,
    )

    model.fit(
        train_data,
        epochs=args.epochs,
        steps_per_epoch=args.steps_per_epoch,
        validation_data=val_data,
        validation_steps=args.validation_steps,
        callbacks=callbacks,
    )

    metric_values = model.evaluate(val_data)

    metrics = {k: v for k, v in zip(model.metrics_names, metric_values)}

    metadata = dict(framework=f"Tensorflow {tf.__version__}")

    model.save(trained_model_dir)

    trained_model_dir_fuse = trained_model_dir.replace("gs://", "/gcs/")

    with open(os.path.join(trained_model_dir_fuse, "metrics.json"), "w") as fh:
        json.dump(metrics, fh)
    with open(os.path.join(trained_model_dir_fuse, "metadata.json"), "w") as fh:
        json.dump(metadata, fh)


In [None]:
%%writefile training/requirements.txt
tensorflow==2.8
cloudml-hypertune==0.1.0.dev6
tensorflow_hub==0.12.0

In [None]:
%%writefile training/Dockerfile
FROM python:3.8-slim

COPY requirements.txt .

RUN pip install -r requirements.txt --quiet --no-cache-dir 

COPY src /src

Now let's build the image with Cloud Build and push it to the container registry.

In [None]:
!gcloud builds submit training -t "{CONTAINER_REGISTRY_BASE}/components/training:latest"

### 2.3. Define Components

Finally, we can define the Kubeflow pipeline components.

In [None]:
from typing import NamedTuple

from kfp.v2 import dsl

from kfp.v2.dsl import Metrics, Model, Output, Artifact, Input, Dataset, ClassificationMetrics

#### 2.3.1. Reusable Components

We start with a few generally reusable components I created.
You can use them as they are in other pipelines.

##### Get Worker Pool Spec

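This component takes a list of worker pool specs and appends command-line arguments (from `args` and `hyperparams`) and environment variables (from `env`) to each container spec. All three are provided as dicts; keys are converted from `snake_case` into `--kebab-case` flags.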

In [None]:
@dsl.component(base_image="python:3.8-slim")
def GetWorkerPoolSpecsOp(
    worker_pool_specs: list,
    args: dict = {},
    hyperparams: dict = {},
    env: dict = {},
) -> list:

    for spec in worker_pool_specs:
        if "args" not in spec["container_spec"]:
            spec["container_spec"]["args"] = []
        for k, v in args.items():
            spec["container_spec"]["args"].append(f"--{k.replace('_', '-')}={v}")
        for k, v in hyperparams.items():
            spec["container_spec"]["args"].append(f"--{k.replace('_', '-')}={v}")

        if env:
            if "env" not in spec["container_spec"]:
                spec["container_spec"]["env"] = []
            for k, v in env.items():
                spec["container_spec"]["env"].append(dict(name=k, value=v))

    return worker_pool_specs
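Because `@dsl.component` turns the function into a pipeline component, it cannot simply be called in the notebook to inspect its output. The sketch below repeats the same logic as a plain function (hypothetical name `build_worker_pool_specs`, working on a deep copy) to show what the resulting spec looks like; the image URI is a placeholder.

```python
import copy


def build_worker_pool_specs(worker_pool_specs, args=None, hyperparams=None, env=None):
    """Plain-Python replica of GetWorkerPoolSpecsOp's logic (hypothetical helper)."""
    specs = copy.deepcopy(worker_pool_specs)  # work on a copy, the caller's list stays unchanged
    for spec in specs:
        container_spec = spec["container_spec"]
        container_args = container_spec.setdefault("args", [])
        # args first, then hyperparams; snake_case keys become --kebab-case flags
        for source in (args or {}, hyperparams or {}):
            for k, v in source.items():
                container_args.append(f"--{k.replace('_', '-')}={v}")
        if env:
            container_spec.setdefault("env", []).extend(
                dict(name=k, value=v) for k, v in env.items()
            )
    return specs


specs = build_worker_pool_specs(
    [dict(
        machine_spec=dict(machine_type="n1-standard-4"),
        replica_count=1,
        container_spec=dict(image_uri="IMAGE_URI", command=["python", "/src/trainer.py"]),
    )],
    args=dict(num_classes=5, image_size=[224, 224], hypertune=False),
    hyperparams={"learning-rate": 0.001},
)
print(specs[0]["container_spec"]["args"])
# ['--num-classes=5', '--image-size=[224, 224]', '--hypertune=False', '--learning-rate=0.001']
```

Note that every value reaches the training container as text, so booleans arrive as the strings `True`/`False` and lists in their Python `str()` form.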

##### Get Custom Job Results

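This one takes the `gcp_resources` output of a CustomJob, moves all results that job has written to GCS into the pipeline's artifacts bucket, and outputs a model and a metrics artifact.

It only assumes that your training script writes a `metadata.json` and a `metrics.json` file into the same directory it saves the model to. Since Vertex AI sets `AIP_MODEL_DIR` to the `model/` subdirectory of the job's base output directory, copying that base directory into the parent directory of the output artifact places the saved model at the `model` artifact's URI.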
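The component finds the job through the `gcp_resources` string emitted by the training step: a JSON-serialized `GcpResources` message whose first entry holds the job's REST URI. The sketch below does the same lookup with plain `json`; the payload shape (`resources[0].resourceUri`) and the IDs are assumptions for illustration, and the helper name is hypothetical. It adds an explicit check, because `str.find` returns -1 instead of raising when the substring is missing.

```python
import json

# Illustrative gcp_resources payload; shape and IDs are assumptions for this example
EXAMPLE_GCP_RESOURCES = json.dumps({
    "resources": [{
        "resourceType": "CustomJob",
        "resourceUri": "https://europe-west4-aiplatform.googleapis.com/v1/"
                       "projects/123/locations/europe-west4/customJobs/456",
    }]
})


def job_name_from_gcp_resources(payload: str) -> str:
    """Return the 'projects/.../customJobs/...' resource name (hypothetical helper)."""
    resource_uri = json.loads(payload)["resources"][0]["resourceUri"]
    start = resource_uri.find("projects/")
    if start == -1:  # str.find returns -1 when the substring is missing
        raise ValueError(f"Unexpected resource URI: {resource_uri}")
    return resource_uri[start:]


print(job_name_from_gcp_resources(EXAMPLE_GCP_RESOURCES))
# projects/123/locations/europe-west4/customJobs/456
```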

In [None]:
@dsl.component(
    base_image="python:3.8-slim",
    packages_to_install=[
        "google-cloud-pipeline-components==0.2.6",
        "google-cloud-aiplatform==1.9.0",
    ],
)
def GetCustomJobResultsOp(
    project: str,
    location: str,
    job_resource: str,
    model: Output[Model],
    metrics: Output[Metrics],
):
    import json
    import shutil
    from pathlib import Path
    import google.cloud.aiplatform as aip
    from google.protobuf.json_format import Parse
    from google_cloud_pipeline_components.proto.gcp_resources_pb2 import GcpResources

    aip.init(project=project, location=location)

    training_gcp_resources = Parse(job_resource, GcpResources())
    custom_job_id = training_gcp_resources.resources[0].resource_uri
    custom_job_name = custom_job_id[custom_job_id.find("project"):]

    job_resource = aip.CustomJob.get(custom_job_name).gca_resource
    job_base_dir = job_resource.job_spec.base_output_directory.output_uri_prefix

    job_base_dir_fuse = job_base_dir.replace("gs://", "/gcs/")
    model_uri_fuse = model.uri.replace("gs://", "/gcs/")

    shutil.copytree(job_base_dir_fuse, Path(model_uri_fuse).parent, dirs_exist_ok=True)
    shutil.rmtree(job_base_dir_fuse)

    with open(f"{model_uri_fuse}/metadata.json") as fh:
        model_metadata = json.load(fh)

    with open(f"{model_uri_fuse}/metrics.json") as fh:
        metrics_dict = json.load(fh)

    for k, v in metrics_dict.items():
        metrics.log_metric(k, v)

    model.metadata = model_metadata

##### Get Hyperparameter-Tuning Job Results

This component does the same for the hyperparameter tuning job, but instead of artifacts it outputs a dict with the hyperparameters of the best trial, chosen according to the study spec metric provided.
Only a single metric is supported; multi-objective studies raise an error.

In [None]:
@dsl.component(
    base_image="python:3.8-slim",
    packages_to_install=[
        "google-cloud-pipeline-components==0.2.6",
        "google-cloud-aiplatform==1.9.0",
    ],
)
def GetHyperparameterTuningJobResultsOp(
    project: str, location: str, job_resource: str, study_spec_metrics: list
) -> dict:
    import google.cloud.aiplatform as aip
    from google_cloud_pipeline_components.proto.gcp_resources_pb2 import GcpResources
    from google.protobuf.json_format import Parse
    from google.cloud.aiplatform_v1.types import study

    aip.init(project=project, location=location)

    gcp_resources_proto = Parse(job_resource, GcpResources())
    tuning_job_id = gcp_resources_proto.resources[0].resource_uri
    tuning_job_name = tuning_job_id[tuning_job_id.find("project"):]

    job_resource = aip.HyperparameterTuningJob.get(tuning_job_name).gca_resource

    trials = job_resource.trials

    if len(study_spec_metrics) > 1:
        raise RuntimeError(
            "Unable to determine best parameters for multi-objective hyperparameter tuning."
        )

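    goal = study_spec_metrics[0]["goal"]
    if goal == study.StudySpec.MetricSpec.GoalType.MAXIMIZE:
        best_fn = max
    elif goal == study.StudySpec.MetricSpec.GoalType.MINIMIZE:
        best_fn = min
    else:
        raise RuntimeError(f"Unsupported optimization goal: {goal}")

    # ignore trials without a final measurement (e.g. failed or infeasible trials)
    completed_trials = [trial for trial in trials if trial.final_measurement.metrics]
    if not completed_trials:
        raise RuntimeError("No hyperparameter tuning trial reported a final measurement.")
    best_trial = best_fn(
        completed_trials, key=lambda trial: trial.final_measurement.metrics[0].value
    )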

    return {p.parameter_id: p.value for p in best_trial.parameters}
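The selection logic reduces to a `max` or `min` over the trials' final metric values. The sketch below replays it on plain dicts; the dict layout and the `final_metric` key are simplifications assumed for illustration (not the `aiplatform` Trial type), the helper name is hypothetical, and the metric values are made up. Trials without a final measurement, e.g. failed ones, are skipped because they have nothing to compare.

```python
def best_trial_parameters(trials, goal):
    """Return the parameters of the best trial (plain-dict sketch, hypothetical helper).

    Each trial is {"parameters": {...}, "final_metric": float or None}, a simplified
    stand-in for Vertex AI Trial objects.
    """
    if goal == "MAXIMIZE":
        best_fn = max
    elif goal == "MINIMIZE":
        best_fn = min
    else:
        raise ValueError(f"Unsupported goal: {goal!r}")

    completed = [t for t in trials if t["final_metric"] is not None]
    if not completed:
        raise RuntimeError("No trial reported a final metric.")
    best = best_fn(completed, key=lambda t: t["final_metric"])
    return dict(best["parameters"])


trials = [
    {"parameters": {"learning-rate": 0.01, "activation": "relu"}, "final_metric": 0.81},
    {"parameters": {"learning-rate": 0.001, "activation": "swish"}, "final_metric": 0.88},
    {"parameters": {"learning-rate": 0.5, "activation": "elu"}, "final_metric": None},
]
print(best_trial_parameters(trials, "MAXIMIZE"))
# {'learning-rate': 0.001, 'activation': 'swish'}
```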

##### Add Serving Config

This component takes a generic model artifact and adds the provided serving config to its metadata. This is required because the model upload component provided by Google expects this information there.

In [None]:
@dsl.component(base_image="python:3.8-slim")
def AddServingConfigOp(
    trained_model: Input[Model],
    configured_model: Output[Artifact],
    serving_config: dict,
):
    configured_model.uri = trained_model.uri
    configured_model.metadata = trained_model.metadata
    configured_model.metadata.update(serving_config)

#### 2.3.2. Pipeline-Specific Components

We continue with the pipeline-specific components. If you want to adapt this pipeline to other machine learning problems or add particular preprocessing steps, these are the components you have to replace, along with the training script.

##### Data Processing

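The preprocessing component takes the dataset, applies some processing, splits it into a training and a test set, and saves both as `tf.data` datasets. Next to each saved dataset it pickles the `element_spec`, which is needed to load the dataset again, and it stores the class names in the artifact metadata.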
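Both `image_dataset_from_directory` calls use the same `seed` and `validation_split`. That is what keeps the two subsets disjoint: each call shuffles the same file list in the same way and then takes complementary slices. The sketch below illustrates the idea in plain Python; it is a simplified model of that behaviour, not Keras' actual implementation, and `split_files` is a hypothetical name.

```python
import random


def split_files(file_paths, validation_split, seed):
    """Deterministically split file paths into (training, validation) (illustrative sketch).

    Shuffling with the same seed in two separate calls yields the same permutation,
    so slicing off the last `validation_split` fraction gives disjoint subsets.
    """
    paths = sorted(file_paths)          # fixed starting order
    random.Random(seed).shuffle(paths)  # same seed -> same permutation
    num_val = int(validation_split * len(paths))
    return paths[: len(paths) - num_val], paths[len(paths) - num_val:]


files = [f"img_{i}.jpg" for i in range(10)]
train_a, _ = split_files(files, validation_split=0.1, seed=1)  # "training" call
_, val_b = split_files(files, validation_split=0.1, seed=1)    # separate "validation" call
print(len(train_a), len(val_b), set(train_a) & set(val_b))
# 9 1 set()
```

With different seeds (or no seed) in the two calls, the slices would come from different permutations and could overlap.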

In [None]:
@dsl.component(base_image="tensorflow/tensorflow:2.8.0")
def PreprocessingOp(
    dataset: Input[Dataset],
    train_dataset: Output[Dataset],
    test_dataset: Output[Dataset],
    compression: str = "GZIP",
    test_size: float = 0.1,
    data_load_kwargs: dict = {},
    seed: int = None,
) -> NamedTuple("Outputs", [("train_dataset_uri", str), ("test_dataset_uri", str)]):
    import logging
    import os
    import pickle  # nosec
    from collections import namedtuple

    import tensorflow as tf

    logging.info("Start Processing.")

    data_dir_fuse = dataset.uri.replace("gs://", "/gcs/")

    train_dataset_dir_fuse = train_dataset.uri.replace("gs://", "/gcs/")
    test_dataset_dir_fuse = test_dataset.uri.replace("gs://", "/gcs/")

    train_data = tf.keras.utils.image_dataset_from_directory(
        data_dir_fuse, subset="training", validation_split=test_size, seed=seed, **data_load_kwargs
    )
    test_data = tf.keras.utils.image_dataset_from_directory(
        data_dir_fuse,
        subset="validation",
        validation_split=test_size,
        seed=seed,
        **data_load_kwargs
    )

    train_dataset.metadata["classes"] = train_data.class_names
    test_dataset.metadata["classes"] = test_data.class_names

    # the map is currently an identity function: put your own per-batch processing here
    train_data = train_data.map(lambda x, y: (x, y), num_parallel_calls=tf.data.AUTOTUNE).unbatch()
    test_data = test_data.map(lambda x, y: (x, y), num_parallel_calls=tf.data.AUTOTUNE).unbatch()

    tf.data.experimental.save(dataset=train_data, path=train_dataset.uri, compression=compression)
    with open(os.path.join(train_dataset_dir_fuse, "element_spec.pickle"), "wb") as fh:
        pickle.dump(train_data.element_spec, fh)  # nosec

    tf.data.experimental.save(dataset=test_data, path=test_dataset.uri, compression=compression)
    with open(os.path.join(test_dataset_dir_fuse, "element_spec.pickle"), "wb") as fh:
        pickle.dump(test_data.element_spec, fh)  # nosec

    logging.info("Finished Processing.")

    output = namedtuple("Outputs", ["train_dataset_uri", "test_dataset_uri"])
    return output(train_dataset_uri=train_dataset.uri, test_dataset_uri=test_dataset.uri)


##### Evaluation

In the evaluation component, we take the test dataset and the trained model and compute metrics.
This example computes some standard classification metrics (accuracy and macro-averaged precision, recall and F1) and creates a confusion matrix.

Finally, it outputs the string "true" or "false" (a string because Kubeflow requires strings for conditionals), which represents the decision whether or not to upload the model to the Vertex AI Model Registry. The decision is based on per-metric thresholds, provided to this component as a dict.
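The upload rule can be tried in isolation. The function below is a standalone sketch of the same check the component performs (hypothetical name `upload_decision`; the metric values are made up). A metric without a threshold defaults to 0, and a threshold for a metric that is never logged is silently ignored, which is worth keeping in mind when choosing the keys of `upload_thresholds`.

```python
def upload_decision(metrics: dict, upload_thresholds: dict) -> str:
    """Same rule as EvaluateOp, as a standalone function (hypothetical helper).

    Every logged metric must reach its threshold; metrics without a threshold
    default to 0, and thresholds for metrics that are never logged are ignored.
    """
    passed = all(value >= upload_thresholds.get(name, 0) for name, value in metrics.items())
    return "true" if passed else "false"  # a string, because it feeds a pipeline condition


metrics = {"accuracy": 0.91, "precision": 0.88, "recall": 0.86, "f1": 0.87}
print(upload_decision(metrics, {"accuracy": 0.9, "f1": 0.85}))  # true
print(upload_decision(metrics, {"recall": 0.9}))                # false
print(upload_decision(metrics, {"auc": 0.99}))                  # true ("auc" is never logged)
```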

In [None]:
@dsl.component(base_image="tensorflow/tensorflow:2.8.0", packages_to_install=["scikit-learn==1.0.2"])
def EvaluateOp(
    test_dataset: Input[Dataset],
    trained_model: Input[Model],
    metrics: Output[Metrics],
    confusion_matrix: Output[ClassificationMetrics],
    batch_size: int = 16,
    compression: str = "GZIP",
    upload_thresholds: dict = {},
) -> NamedTuple("Outputs", [("upload_decision", str)]):
    import logging
    import os
    import pickle  # nosec
    from collections import namedtuple

    import numpy as np
    import tensorflow as tf
    from sklearn.metrics import accuracy_score
    from sklearn.metrics import confusion_matrix as conf_mat
    from sklearn.metrics import f1_score, precision_score, recall_score

    logging.info("Starting Evaluation.")

    model = tf.keras.models.load_model(trained_model.uri.replace("gs://", "/gcs/"))

    with open(
        os.path.join(test_dataset.uri.replace("gs://", "/gcs/"), "element_spec.pickle"), "rb"
    ) as fh:
        element_spec = pickle.load(fh)  # nosec

    dataset = tf.data.experimental.load(
        test_dataset.uri, element_spec=element_spec, compression=compression
    )

    test_data = dataset.batch(batch_size).prefetch(tf.data.experimental.AUTOTUNE)

    y_pred_probs = model.predict(test_data)
    y_pred = y_pred_probs.argmax(axis=1)
    y_true = np.concatenate([y for _, y in test_data], axis=0)

    metrics.log_metric("accuracy", accuracy_score(y_true=y_true, y_pred=y_pred))
    metrics.log_metric("precision", precision_score(y_true=y_true, y_pred=y_pred, average="macro"))
    metrics.log_metric("recall", recall_score(y_true=y_true, y_pred=y_pred, average="macro"))
    metrics.log_metric("f1", f1_score(y_true=y_true, y_pred=y_pred, average="macro"))

    confusion = conf_mat(y_true=y_true, y_pred=y_pred)

    confusion_matrix.log_confusion_matrix(
        categories=test_dataset.metadata["classes"], matrix=confusion.tolist()
    )

    logging.info("Evaluation completed.")

    decision = True
    for metric, value in metrics.metadata.items():
        required = upload_thresholds.get(metric, 0)
        if value < required:
            decision = False

    output = namedtuple("Outputs", ["upload_decision"])
    return output(upload_decision="true" if decision else "false")


##### Training Args

This minor component is required because Kubeflow has a limitation when it comes to passing dict inputs to components that contain pipeline parameters (or outputs from previous components). So we create a component that takes all the training arguments and returns them as a dict. Note that dicts nested inside this dict need to be serialized explicitly (here with `json.dumps`).
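The reason for the explicit serialization: `GetWorkerPoolSpecsOp` renders each value with an f-string, and the `str()` form of a Python dict uses single quotes (and `True` instead of `true`), which `json.loads`, the parser `trainer.py` uses for `--checkpoint-kwargs` and `--tensorboard-kwargs`, rejects. A quick check, using the checkpoint settings from the pipeline below as sample data:

```python
import json

checkpoint_kwargs = dict(monitor="val_accuracy", mode="max", save_best_only=True)

# What the container argument would contain without explicit serialization
raw_arg = f"{checkpoint_kwargs}"
# What it contains when the dict is serialized with json.dumps first
json_arg = f"{json.dumps(checkpoint_kwargs)}"

try:
    json.loads(raw_arg)
except json.JSONDecodeError as err:
    print("str(dict) is not valid JSON:", err.msg)

print(json.loads(json_arg) == checkpoint_kwargs)  # True
```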

In [None]:
@dsl.component(base_image="python:3.8-slim")
def GetTrainingArgsDictOp(
    base_model_dir: str,
    train_dataset: Input[Dataset],
    compression: str,
    val_size: float,
    image_size: list,
    epochs: int,
    steps_per_epoch: int,
    validation_steps: int,
    batch_size: int,
    shuffle: bool,
    shuffle_buffer: int,
    seed: int,
    save_checkpoints: bool,
    checkpoint_kwargs: dict,
    tensorboard_log_root: str,
    tensorboard_kwargs: dict,
    hypertune: bool,
) -> dict:
    import json

    train_dataset_uri = train_dataset.uri
    num_classes = len(train_dataset.metadata["classes"])

    return dict(
        base_model_dir=base_model_dir,
        train_dataset_uri=train_dataset_uri,
        num_classes=num_classes,
        compression=compression,
        val_size=val_size,
        image_size=image_size,
        epochs=epochs,
        steps_per_epoch=steps_per_epoch,
        validation_steps=validation_steps,
        batch_size=batch_size,
        shuffle=shuffle,
        shuffle_buffer=shuffle_buffer,
        seed=seed,
        save_checkpoints=save_checkpoints,
        checkpoint_kwargs=json.dumps(checkpoint_kwargs),
        tensorboard_log_root=tensorboard_log_root,
        tensorboard_kwargs=json.dumps(tensorboard_kwargs),
        hypertune=hypertune,
    )


## 3. Pipeline Definition

Finally we can compose the created components into a pipeline. 

In [None]:
from google_cloud_pipeline_components.experimental import custom_job
from google_cloud_pipeline_components.experimental import hyperparameter_tuning_job
import google_cloud_pipeline_components.aiplatform as gcc_aip
from google.cloud.aiplatform import hyperparameter_tuning as hpt

In [None]:
# define the workerpool spec for the custom jobs 
# (https://cloud.google.com/vertex-ai/docs/reference/rest/v1/CustomJobSpec)
WORKER_POOL_SPECS = [
    dict(
        machine_spec=dict(
            machine_type="n1-standard-4",
        ),
        replica_count=1,
        container_spec=dict(
            image_uri=TRAINING_IMAGE,
            command=["python", "/src/trainer.py"],
        ),
    )
]

# define the metric spec for hyperparameter tuning
# for details: https://cloud.google.com/vertex-ai/docs/reference/rest/v1/StudySpec#MetricSpec
METRIC_SPEC = dict(val_accuracy="maximize")

# define the parameter specs for tuning
# for details: https://cloud.google.com/vertex-ai/docs/reference/rest/v1/StudySpec#ParameterSpec
PARAMETER_SPEC = {
    "learning-rate": hpt.DoubleParameterSpec(min=0.0001, max=1, scale="log"),
    "activation": hpt.CategoricalParameterSpec(values=["swish", "relu", "elu"]),
    "num-neurons": hpt.DiscreteParameterSpec(values=[64, 128, 512], scale=None),
    "dropout": hpt.DiscreteParameterSpec(values=[0.3, 0.4, 0.5, 0.6], scale=None),
}

# create the serving config
# details: https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/types/artifact_types.py
# container spec: https://cloud.google.com/vertex-ai/docs/reference/rest/v1/ModelContainerSpec
# predict schema: https://cloud.google.com/vertex-ai/docs/reference/rest/v1/PredictSchemata
SERVING_CONFIG = dict(containerSpec=dict(imageUri=SERVING_IMAGE))


PIPELINE_NAME = "classifier-tuning-training-pipeline"


@dsl.pipeline(
    name=PIPELINE_NAME,
    description="A training pipeline for an image classifier.",
)
def pipeline(
    project: str = PROJECT,
    location: str = LOCATION,
    base_model_dir: str = MODEL_ROOT_DIR,
    dataset_dir: str = DATA_ROOT_DIR,
    compression: str = "GZIP",
    deploy: str = "false",
    seed: int = 1,
):

    # loads dataset
    import_dataset_step = dsl.importer(
        artifact_uri=dataset_dir,
        artifact_class=dsl.Dataset,
        reimport=False,
    ).set_display_name("Load-Dataset")

    # preprocesses data
    preprocessing_step = PreprocessingOp(
        dataset=import_dataset_step.output,
        compression=compression,
        test_size=0.1,
        data_load_kwargs=dict(
            labels="inferred",
            label_mode="int",
            color_mode="rgb",
            image_size=[224, 224],
            batch_size=6,
            shuffle=True,
        ),
        seed=seed,
    ).set_display_name("Process-Data")

    # define training args for tuning (hypertune = True and e.g. fewer epochs)
    args = dict(
        base_model_dir=base_model_dir,
        train_dataset=preprocessing_step.outputs["train_dataset"],
        compression=compression,
        val_size=0.2,
        image_size=[224, 224],
        epochs=5,
        steps_per_epoch=100,
        validation_steps=100,
        batch_size=3,
        shuffle=True,
        shuffle_buffer=64,
        seed=seed,
        save_checkpoints=True,
        checkpoint_kwargs=dict(monitor="val_accuracy", mode="max", save_best_only=True),
        tensorboard_log_root=f"{TENSORBOARD_LOGS_DIR}/{PIPELINE_NAME}",
        tensorboard_kwargs=dict(),
        hypertune=True,
    )

    # create the args dict to pass to next component
    hypertune_args_step = GetTrainingArgsDictOp(**args).set_display_name("Get-Hypertune-Args")

    # create the workerpool spec for hyperparameter tuning
    # don't provide hyperparams, because they are defined in the PARAMETER_SPEC
    # and directly passed to the hyperparameter tuning job
    hypertune_worker_pool_specs_step = GetWorkerPoolSpecsOp(
        worker_pool_specs=WORKER_POOL_SPECS,
        args=hypertune_args_step.output,
    ).set_display_name("Get-Hypertune-Worker-Pool-Spec")

    # create the actual hyperparameter tuning job
    # here you can choose how many trials to do and how many to run in parallel
    hypertune_step = hyperparameter_tuning_job.HyperparameterTuningJobRunOp(
        display_name="hypertune-job",
        project=project,
        location=location,
        service_account=SERVICE_ACCOUNT,
        worker_pool_specs=hypertune_worker_pool_specs_step.output,
        study_spec_metrics=hyperparameter_tuning_job.utils.serialize_metrics(METRIC_SPEC),
        study_spec_parameters=hyperparameter_tuning_job.utils.serialize_parameters(PARAMETER_SPEC),
        max_trial_count=6,
        parallel_trial_count=2,
        base_output_directory=f"{ARTIFACTS_ROOT_DIR}/{PIPELINE_NAME}/hypertune-job"
    ).set_display_name("Hypertune-Job")

    # now we can extract the results of the hyperparameter tuning job
    hypertune_results_step = GetHyperparameterTuningJobResultsOp(
        project=project,
        location=location,
        job_resource=hypertune_step.output,
        study_spec_metrics=hyperparameter_tuning_job.utils.serialize_metrics(METRIC_SPEC),
    ).set_display_name("Get-Hypertune-Results")

    # update our args dict for training
    args.update(dict(hypertune=False))

    # create the args dict again
    training_args_step = GetTrainingArgsDictOp(**args).set_display_name("Get-Training-Args")

    # create the worker pool spec for the final training
    # this time we provide the hyperparams from the tuning job results
    training_worker_pool_specs_step = GetWorkerPoolSpecsOp(
        worker_pool_specs=WORKER_POOL_SPECS,
        hyperparams=hypertune_results_step.output,
        args=training_args_step.output,
    ).set_display_name("Get-Training-Worker-Pool-Spec")

    # here we run the final training job
    training_step = custom_job.CustomTrainingJobOp(
        display_name="training-job",
        project=project,
        location=location,
        service_account=SERVICE_ACCOUNT,
        worker_pool_specs=training_worker_pool_specs_step.output,
        base_output_directory=f"{ARTIFACTS_ROOT_DIR}/{PIPELINE_NAME}/training-job",
        labels=dict(),
    ).set_display_name("Training-Job")

    # now we can extract the training results
    training_results_step = GetCustomJobResultsOp(
        project=project, location=location, job_resource=training_step.output
    ).set_display_name("Get-Training-Results")

    # we evaluate the model performance
    # provide thresholds for all the metrics you compute in the evaluation component
    evaluation_step = EvaluateOp(
        test_dataset=preprocessing_step.outputs["test_dataset"],
        trained_model=training_results_step.outputs["model"],
        compression=compression,
        batch_size=2,
        upload_thresholds=dict(accuracy=0.90, precision=0.90, recall=0.90),
    ).set_display_name("Evaluation")

    # the result of the evaluation is a decision about model uploading
    with dsl.Condition(evaluation_step.outputs["upload_decision"] == "true"):

        # we add the serving config to the trained model artifact
        add_serving_config_step = AddServingConfigOp(
            trained_model=training_results_step.outputs["model"],
            serving_config=SERVING_CONFIG,
        ).set_display_name("Add-Serving-Config")

        # upload the model to the Vertex model registry
        upload_model_step = gcc_aip.ModelUploadOp(
            project=project,
            location=location,
            display_name="rps-classifier",
            unmanaged_container_model=add_serving_config_step.outputs["configured_model"],
        ).set_display_name("Upload-Model")

        # again a condition, based on the pipeline parameter "deploy":
        # only deploy if this parameter is "true"
        with dsl.Condition(deploy == "true"):
            # creates an endpoint to deploy model to
            endpoint_create_step = gcc_aip.EndpointCreateOp(
                project=project, location=location, display_name="classifier-endpoint"
            ).set_display_name("Create-Deployment-Endpoint")
            
            # deploys model to endpoint
            gcc_aip.ModelDeployOp(
                model=upload_model_step.outputs["model"],
                endpoint=endpoint_create_step.outputs["endpoint"],
                dedicated_resources_machine_type="n1-standard-4",
                dedicated_resources_min_replica_count=1,
            ).set_display_name("Deploy-Model")
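The evaluation step only has to hand back a string, because the `dsl.Condition` above compares `upload_decision` against `"true"`. A minimal sketch of such a threshold check, assuming metrics and thresholds are plain dicts keyed by metric name; `decide_upload` is a hypothetical helper, not the `EvaluateOp` defined earlier, and it treats a thresholded metric that is missing from the results as a failure.

```python
# Hypothetical helper: a sketch of the kind of check an evaluation component
# could perform. The real EvaluateOp is defined earlier and may differ.
def decide_upload(metrics: dict, thresholds: dict) -> str:
    """Return "true" only if every thresholded metric meets its minimum."""
    for name, minimum in thresholds.items():
        # a metric without a computed value counts as a failure
        if metrics.get(name, float("-inf")) < minimum:
            return "false"
    # returned as a string because the pipeline condition compares against "true"
    return "true"


thresholds = dict(accuracy=0.90, precision=0.90, recall=0.90)
print(decide_upload({"accuracy": 0.95, "precision": 0.93, "recall": 0.91}, thresholds))  # true
print(decide_upload({"accuracy": 0.95, "precision": 0.88, "recall": 0.91}, thresholds))  # false
```

Failing on a missing metric is a deliberate choice in this sketch: it keeps a model from being uploaded just because a metric was not computed.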

## 4. Compile and Run Pipeline

The last step is to compile the pipeline and run it on Vertex AI Pipelines.

I also open TensorBoard directly, pointing it at the TensorBoard logs bucket. This lets you monitor both the hyperparameter tuning trials and the final training job.

In [None]:
from kfp.v2 import compiler
from google.cloud import aiplatform as aip

In [None]:
COMPILED_JOB_PATH = f"{COMPILED_JOBS_DIR.replace('gs://', 'gcs/')}/{PIPELINE_NAME}/{PIPELINE_NAME}.json"

In [None]:
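from pathlib import Path

# several nested folders may not exist yet, so create all missing parents
Path(COMPILED_JOB_PATH).parent.mkdir(parents=True, exist_ok=True)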
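Note that `COMPILED_JOB_PATH` is not a `gs://` URI: replacing `gs://` with `gcs/` turns the bucket name into a folder relative to the notebook's working directory. A small sketch with example values (the bucket and pipeline names are made up) shows the resulting path and why creating its parent folder needs `parents=True` when none of the levels exist yet.

```python
import tempfile
from pathlib import Path

# example values; in the notebook these come from the configuration cells
COMPILED_JOBS_DIR = "gs://my-project_vertex-ai-pipelines_compiled-jobs"
PIPELINE_NAME = "image-classification"  # hypothetical pipeline name

# "gs://bucket" becomes the relative path "gcs/bucket"
relative = Path(f"{COMPILED_JOBS_DIR.replace('gs://', 'gcs/')}/{PIPELINE_NAME}/{PIPELINE_NAME}.json")
print(relative)  # gcs/my-project_vertex-ai-pipelines_compiled-jobs/image-classification/image-classification.json

# resolve against a scratch directory instead of the real working directory
local_path = Path(tempfile.mkdtemp()) / relative
# three folder levels are missing here, so parents=True is required
local_path.parent.mkdir(parents=True, exist_ok=True)
print(local_path.parent.is_dir())  # True
```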

In [None]:
# overwrite pipeline parameters
pipeline_parameters = dict(
    project=PROJECT,
    location=LOCATION,
    base_model_dir="https://tfhub.dev/google/imagenet/mobilenet_v1_100_224/classification/5",
    dataset_dir=f"{DATA_ROOT_DIR}/flower_data/flower_photos",
    compression="GZIP",
    deploy="true",
    seed=1,
)
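A misspelled key in `pipeline_parameters` is easy to overlook, so it can be worth checking the overrides against the pipeline function's signature before compiling. A minimal sketch, assuming the `@dsl.pipeline`-decorated `pipeline` still exposes its original Python signature to `inspect.signature`; `check_pipeline_parameters` and `example_pipeline` are hypothetical names used only for illustration.

```python
import inspect


def check_pipeline_parameters(pipeline_func, parameters: dict) -> None:
    """Raise ValueError if `parameters` does not match the function's signature."""
    signature = inspect.signature(pipeline_func)
    accepted = set(signature.parameters)

    # names the pipeline function does not declare (e.g. typos)
    unknown = sorted(set(parameters) - accepted)
    # declared parameters without a default that were not supplied
    missing = sorted(
        name
        for name, param in signature.parameters.items()
        if param.default is inspect.Parameter.empty and name not in parameters
    )
    if unknown or missing:
        raise ValueError(f"unknown parameters: {unknown}, missing parameters: {missing}")


# hypothetical stand-in for the real `pipeline` function defined earlier
def example_pipeline(project: str, location: str, deploy: str = "false", seed: int = 1):
    pass


# a valid override passes silently
check_pipeline_parameters(example_pipeline, dict(project="p", location="l", deploy="true"))

try:
    check_pipeline_parameters(example_pipeline, dict(project="p", location="l", deplyo="true"))
except ValueError as err:
    print(err)  # unknown parameters: ['deplyo'], missing parameters: []
```

In the notebook you would call `check_pipeline_parameters(pipeline, pipeline_parameters)` right before compiling. Note that `deploy` is passed as the string `"true"` rather than a bool, matching the `deploy == "true"` condition in the pipeline.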

In [None]:
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path=COMPILED_JOB_PATH,
    pipeline_parameters=pipeline_parameters,
)

In [None]:
aip.init(project=PROJECT, location=LOCATION)

pipeline_job = aip.PipelineJob(
    display_name=PIPELINE_NAME,
    template_path=COMPILED_JOB_PATH,
    pipeline_root=f"{ARTIFACTS_ROOT_DIR}/{PIPELINE_NAME}",
    enable_caching=True
)

In [None]:
pipeline_job.submit(service_account=SERVICE_ACCOUNT)

In [None]:
%load_ext tensorboard

In [None]:
%tensorboard --logdir $TENSORBOARD_LOGS_DIR