# kubeflow pipeline with Pythorch model

In [None]:
# Install components

!pip3 install -q google-cloud-aiplatform
!pip3 install -q  kfp google-cloud-pipeline-components

Enable this apis
* Cloud Build API
* Artifact Registry
* Container Registry

In [None]:
# After installing these packages you'll need to restart the kernel
import os
if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython
    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

In [None]:
# Finally, check that you have correctly installed the packages. The KFP SDK version
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

In [None]:
import random
import string

# Generate a uuid of a specifed length(default=8)
def generate_uuid(length: int = 8) -> str:
    return "".join(random.choices(string.ascii_lowercase + string.digits, k=length))

UUID = generate_uuid()

In [None]:
# Google Cloud project ID cand Region
PROJECT_ID = "[your-project-id]"
# Get your Google Cloud project ID from gcloud
if PROJECT_ID == "" or PROJECT_ID is None or PROJECT_ID == "[your-project-id]":
    # Get your GCP project id from gcloud
    shell_output = !gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]

# Setting Region
PROJECT_REGION = "[your-region]"  # @param {type: "string"}
if PROJECT_REGION == "[your-region]":
    PROJECT_REGION = "us-central1"


In [None]:
# Setting Bucket 
BUCKET_URI = f"gs://aip-{PROJECT_ID}"
GENERATE_BUCKET_URI = False  # @param {type:"boolean"}


if GENERATE_BUCKET_URI:
    bucket_name = "gs://aip-{}".format(PROJECT_ID)
    !gsutil mb -p {PROJECT_ID} -l {REGION} {bucket_name}

    # set GCS bucket object TTL to 7 days
    !echo '{"rule":[{"action": {"type": "Delete"},"condition": {"age": 7}}]}' > gcs_lifecycle.tmp
    !gsutil lifecycle set gcs_lifecycle.tmp {bucket_name}
    !rm gcs_lifecycle.tmp

    BUCKET_URI = bucket_name
    print(f"changed BUCKET_URI to {BUCKET_URI} due to GENERATE_BUCKET_URI is True")

if BUCKET_URI == "" or BUCKET_URI is None or BUCKET_URI == "gs://[your-bucket-name]":
    BUCKET_URI = f"gs://aip-{PROJECT_ID}"

In [None]:
# Setup up the following constants for Vertex AI Pipelines:
PIPELINE_ROOT = "{}/pipeline_root/minst".format(BUCKET_URI)
PACKAGE_PATH = "tmp/image classification pipeline.json".replace(" ", "_")

In [None]:
# Create pipeline directory locally to save the component and pipeline specifications
!mkdir -p ./tmp/pipelines

### Import libraries


In [None]:
import os
from typing import NamedTuple

import google_cloud_pipeline_components
import kfp
from google.cloud import aiplatform
from google.cloud.aiplatform import gapic as aip
from google.cloud.aiplatform import pipeline_jobs
from google.protobuf.json_format import MessageToDict
from google_cloud_pipeline_components import aiplatform as aip_components
from google_cloud_pipeline_components.experimental import custom_job
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import Input, Metrics, Model, Output, component

### Creating Custom model
consider a pipeline with the following steps:

* Ingest data: This step loads training data into the pipeline.
* Preprocess data: This step preprocesses the ingested training data.
* Train model: This step uses the preprocessed training data to train a model.
* Evaluate model: This step evaluates the trained model.
* Deploy: This step deploys the trained model for predictions.

In [None]:
# usefull function to get the latest run of a pipeline
from datetime import datetime
def get_timestamp():
    return datetime.now().strftime("%Y%m%d%H%M%S")

TIMESTAMP = get_timestamp()
print(f"TIMESTAMP = {TIMESTAMP}")

### Copy training application code and Dockerfile from local path to GCS location

In [None]:

APP_NAME = "finetuned-bert-classifier"
# copy training Dockerfile
!gsutil -m cp ../models/finetuned-bert-classifier/container/Dockerfile {BUCKET_URI}/{APP_NAME}/train/
# copy training application code
!gsutil -m cp -r ../models/finetuned-bert-classifier/python_package/trainer/ {BUCKET_URI}/{APP_NAME}/train/
print(f"Copied training application code and Dockerfile to {BUCKET_URI}/{APP_NAME}/train/")

#### Define custom pipeline component to build custom training container

There are a few things to notice about the component specification:

* The standalone function defined is converted as a pipeline component using the @kfp.v2.dsl.component decorator.
* All the arguments in the standalone function must have data type annotations because KFP uses the function’s inputs and outputs to define the component’s interface.
* By default Python 3.7 is used as the base image to run the code defined. You can configure the @component decorator to override the default image by specifying base_image, install additional python packages using packages_to_install parameter and write the compiled component file as a YAML file using output_component_file to share or reuse the component.

In [None]:
@component(
    base_image="gcr.io/google.com/cloudsdktool/cloud-sdk:latest",
    packages_to_install=["google-cloud-build"],
    output_component_file="./tmp/pipelines/build_custom_train_image.yaml",
)
def build_train_image(
    project: str, gs_train_src_path: str, training_image_uri: str
) -> NamedTuple("Outputs", [("training_image_uri", str)]):
    """custom pipeline component to build custom training image using
    Cloud Build and the training application code and dependencies
    defined in the Dockerfile
    """

    import logging
    import os

    from google.cloud.devtools import cloudbuild_v1 as cloudbuild
    from google.protobuf.duration_pb2 import Duration

    # initialize client for cloud build
    logging.getLogger().setLevel(logging.INFO)
    client = cloudbuild.services.cloud_build.CloudBuildClient()

    # parse step inputs to get path to Dockerfile and training application code
    gs_dockerfile_path = os.path.join(gs_train_src_path, "Dockerfile")
    gs_train_src_path = os.path.join(gs_train_src_path, "trainer/")

    logging.info(f"training_image_uri: {training_image_uri}")

    # define build steps to pull the training code and Dockerfile
    # and build/push the custom training container image
    build = cloudbuild.Build()
    build.steps = [
        {
            "name": "gcr.io/cloud-builders/gsutil",
            "args": ["cp", "-r", gs_train_src_path, "."],
        },
        {
            "name": "gcr.io/cloud-builders/gsutil",
            "args": ["cp", gs_dockerfile_path, "Dockerfile"],
        },
        # enabling Kaniko cache in a Docker build that caches intermediate
        # layers and pushes image automatically to Container Registry
        # https://cloud.google.com/build/docs/kaniko-cache
        {
            "name": "gcr.io/kaniko-project/executor:latest",
            "args": [f"--destination={training_image_uri}", "--cache=true"],
        },
    ]
    # override default timeout of 10min
    timeout = Duration()
    timeout.seconds = 7200
    build.timeout = timeout

    # create build
    operation = client.create_build(project_id=project, build=build)
    logging.info("IN PROGRESS:")
    logging.info(operation.metadata)

    # get build status
    result = operation.result()
    logging.info("RESULT:", result.status)

    # return step outputs
    return (training_image_uri,)

## Component: Get Custom Training Job Details from Vertex AI
This step gets details from a custom training job from Vertex AI including training elapsed time, model performance metrics that will be used in the next step before the model deployment. The step additionally creates Model artifact with trained model artifacts.

NOTE: The pre-built custom job component used in the pipeline outputs CustomJob resource but not the model artifacts.

* Inputs:

    * job_resource: Custom job resource returned by pre-built CustomJob component
    * project: Project ID where the job ran
    * region: Region where the job ran
    * eval_metric_key: Evaluation metric key name such as eval_accuracy
    *  model_display_name: Model display name for saving model artifacts
Outputs:

* model: Trained model artifacts created by the training job with added model metadata
* metrics: Model performance metrics captured from the training job

In [None]:
@component(
    base_image="python:3.9",
    packages_to_install=[
        "google-cloud-pipeline-components",
        "google-cloud-aiplatform",
        "pandas",
        "fsspec",
    ],
    output_component_file="./tmp/pipelines/get_training_job_details.yaml",
)
def get_training_job_details(
    project: str,
    location: str,
    job_resource: str,
    eval_metric_key: str,
    model_display_name: str,
    metrics: Output[Metrics],
    model: Output[Model],
) -> NamedTuple(
    "Outputs", [("eval_metric", float), ("eval_loss", float), ("model_artifacts", str)]
):
    """custom pipeline component to get model artifacts and performance
    metrics from custom training job
    """
    import logging
    import shutil
    from collections import namedtuple

    import pandas as pd
    from google.cloud.aiplatform import gapic as aip
    from google.protobuf.json_format import Parse
    from google_cloud_pipeline_components.proto.gcp_resources_pb2 import \
        GcpResources

    # parse training job resource
    logging.info(f"Custom job resource = {job_resource}")
    training_gcp_resources = Parse(job_resource, GcpResources())
    custom_job_id = training_gcp_resources.resources[0].resource_uri
    custom_job_name = "/".join(custom_job_id.split("/")[-6:])
    logging.info(f"Custom job name parsed = {custom_job_name}")

    # get custom job information
    API_ENDPOINT = "{}-aiplatform.googleapis.com".format(location)
    client_options = {"api_endpoint": API_ENDPOINT}
    job_client = aip.JobServiceClient(client_options=client_options)
    job_resource = job_client.get_custom_job(name=custom_job_name)
    job_base_dir = job_resource.job_spec.base_output_directory.output_uri_prefix
    logging.info(f"Custom job base output directory = {job_base_dir}")

    # copy model artifacts
    logging.info(f"Copying model artifacts to {model.path}")
    destination = shutil.copytree(job_base_dir.replace("gs://", "/gcs/"), model.path)
    logging.info(destination)
    logging.info(f"Model artifacts located at {model.uri}/model/{model_display_name}")
    logging.info(f"Model artifacts located at model.uri = {model.uri}")

    # set model metadata
    start, end = job_resource.start_time, job_resource.end_time
    model.metadata["model_name"] = model_display_name
    model.metadata["framework"] = "pytorch"
    model.metadata["job_name"] = custom_job_name
    model.metadata["time_to_train_in_seconds"] = (end - start).total_seconds()

    # fetch metrics from the training job run
    metrics_uri = f"{model.path}/model/{model_display_name}/all_results.json"
    logging.info(f"Reading and logging metrics from {metrics_uri}")
    metrics_df = pd.read_json(metrics_uri, typ="series")
    for k, v in metrics_df.items():
        logging.info(f"     {k} -> {v}")
        metrics.log_metric(k, v)

    # capture eval metric and log to model metadata
    eval_metric = (
        metrics_df[eval_metric_key] if eval_metric_key in metrics_df.keys() else None
    )
    eval_loss = metrics_df["eval_loss"] if "eval_loss" in metrics_df.keys() else None
    logging.info(f"     {eval_metric_key} -> {eval_metric}")
    logging.info(f'     "eval_loss" -> {eval_loss}')

    model.metadata[eval_metric_key] = eval_metric
    model.metadata["eval_loss"] = eval_loss

    # return output parameters
    outputs = namedtuple("Outputs", ["eval_metric", "eval_loss", "model_artifacts"])

    return outputs(eval_metric, eval_loss, job_base_dir)

### Component: Create Model Archive (MAR) file using Torch Model Archiver
This step packages trained model artifacts and custom prediction handler  as a model archive (.mar) file usign Torch Model Archiver tool.
Example: https://github.com/pytorch/serve/tree/master/model-archiver
* Inputs:

    * model_display_name: Model display name for saving model archive file
    * model_version: Model version for saving model archive file
    * handler: Location of custom prediction handler
    * model: Trained model artifacts from the previous step

* Outputs:

    * model_mar: Packaged model archive file (artifact) on GCS
    * mar_env: A list of environment variables required for creating model resource
    * mar_export_uri: GCS path to the model archive file

Copy custom prediction handler code from local path to GCS location

Define custom pipeline component to create model archive file

Define custom pipeline component to build custom serving container

In [None]:
@component(
    base_image="python:3.9",
    packages_to_install=["torch-model-archiver"], 
    output_component_file="./tmp/pipelines/generate_mar_file.yaml",
)
def generate_mar_file(
    model_display_name: str,
    model_version: str,
    handler: str,
    model: Input[Model],
    model_mar: Output[Model],
) -> NamedTuple("Outputs", [("mar_env_var", list), ("mar_export_uri", str)]):
    """custom pipeline component to package model artifacts and custom
    handler to a model archive file using Torch Model Archiver tool
    """
    import logging
    import os
    import subprocess
    import time
    from collections import namedtuple
    from pathlib import Path

    logging.getLogger().setLevel(logging.INFO)

    # create directory to save model archive file
    model_output_root = model.path
    mar_output_root = model_mar.path
    export_path = f"{mar_output_root}/model-store"
    try:
        Path(export_path).mkdir(parents=True, exist_ok=True)
    except Exception as e:
        logging.warning(e)
        # retry after pause
        time.sleep(2)
        Path(export_path).mkdir(parents=True, exist_ok=True)

    # parse and configure paths for model archive config
    handler_path = (
        handler.replace("gs://", "/gcs/") + "predictor/handler.py"
        if handler.startswith("gs://")
        else handler
    )
    model_artifacts_dir = f"{model_output_root}/model/{model_display_name}"
    extra_files = [
        os.path.join(model_artifacts_dir, f)
        for f in os.listdir(model_artifacts_dir)
        if f != "pytorch_model.bin"
    ]

    # define model archive config
    mar_config = {
        "MODEL_NAME": model_display_name,
        "HANDLER": handler_path,
        "SERIALIZED_FILE": f"{model_artifacts_dir}/pytorch_model.bin",
        "VERSION": model_version,
        "EXTRA_FILES": ",".join(extra_files),
        "EXPORT_PATH": f"{model_mar.path}/model-store",
    }

    # generate model archive command
    archiver_cmd = (
        "torch-model-archiver --force "
        f"--model-name {mar_config['MODEL_NAME']} "
        f"--serialized-file {mar_config['SERIALIZED_FILE']} "
        f"--handler {mar_config['HANDLER']} "
        f"--version {mar_config['VERSION']}"
    )
    if "EXPORT_PATH" in mar_config:
        archiver_cmd += f" --export-path {mar_config['EXPORT_PATH']}"
    if "EXTRA_FILES" in mar_config:
        archiver_cmd += f" --extra-files {mar_config['EXTRA_FILES']}"
    if "REQUIREMENTS_FILE" in mar_config:
        archiver_cmd += f" --requirements-file {mar_config['REQUIREMENTS_FILE']}"

    # run archiver command
    logging.warning("Running archiver command: %s", archiver_cmd)
    with subprocess.Popen(
        archiver_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    ) as p:
        _, err = p.communicate()
        if err:
            raise ValueError(err)

    # set output variables
    mar_env_var = [{"name": "MODEL_NAME", "value": model_display_name}]
    mar_export_uri = f"{model_mar.uri}/model-store/"

    outputs = namedtuple("Outputs", ["mar_env_var", "mar_export_uri"])
    return outputs(mar_env_var, mar_export_uri)

In [None]:
# copy custom prediction handler
!gsutil cp ../models/finetuned-bert-classifier/predictor/handler.py {BUCKET_URI}/{APP_NAME}/serve/predictor/handler.py
!gsutil cp ../models/finetuned-bert-classifier/predictor/Dockerfile.serve {BUCKET_URI}/{APP_NAME}/serve/predictor/Dockerfile.serve
!gsutil cp ../models/finetuned-bert-classifier/predictor/index_to_name.json {BUCKET_URI}/{APP_NAME}/serve/predictor/index_to_name.json

# list copied files from GCS location
!gsutil ls -lR {BUCKET_URI}/{APP_NAME}/serve/

print(f"Copied custom prediction handler code to {BUCKET_URI}/{APP_NAME}/serve/")

In [None]:
@component(
    base_image="python:3.9",
    packages_to_install=["google-cloud-build"],
    output_component_file="./tmp/pipelines/build_custom_serving_image.yaml",
)
def build_custom_serving_image(
    project: str, gs_serving_dependencies_path: str, serving_image_uri: str
) -> NamedTuple("Outputs", [("serving_image_uri", str)],):
    """custom pipeline component to build custom serving image using
    Cloud Build and dependencies defined in the Dockerfile
    """
    import logging
    import os

    from google.cloud.devtools import cloudbuild_v1 as cloudbuild
    from google.protobuf.duration_pb2 import Duration

    logging.getLogger().setLevel(logging.INFO)
    build_client = cloudbuild.services.cloud_build.CloudBuildClient()

    logging.info(f"gs_serving_dependencies_path: {gs_serving_dependencies_path}")
    gs_dockerfile_path = os.path.join(gs_serving_dependencies_path, "Dockerfile.serve")

    logging.info(f"serving_image_uri: {serving_image_uri}")
    build = cloudbuild.Build()
    build.steps = [
        {
            "name": "gcr.io/cloud-builders/gsutil",
            "args": ["cp", gs_dockerfile_path, "Dockerfile"],
        },
        # enabling Kaniko cache in a Docker build that caches intermediate
        # layers and pushes image automatically to Container Registry
        # https://cloud.google.com/build/docs/kaniko-cache
        {
            "name": "gcr.io/kaniko-project/executor:latest",
            "args": [f"--destination={serving_image_uri}", "--cache=true"],
        },
    ]
    # override default timeout of 10min
    timeout = Duration()
    timeout.seconds = 7200
    build.timeout = timeout

    # create build
    operation = build_client.create_build(project_id=project, build=build)
    logging.info("IN PROGRESS:")
    logging.info(operation.metadata)

    # get build status
    result = operation.result()
    logging.info("RESULT:", result.status)

    # return step outputs
    return (serving_image_uri,)

#### Component: Create custom serving container running TorchServe

The step builds a custom serving container running TorchServe HTTP server to serve prediction requests for the models mounted. The output from this step is the Container registry URI to the custom serving container.

* Inputs:
    * project: Project ID to run
    * serving_image_uri: Custom serving container URI from Container registry
    * gs_serving_dependencies_path: Location of serving dependencies - Dockerfile
* Outputs:
    * serving_image_uri: Custom serving container URI from Container registry
    * Create Dockerfile from TorchServe CPU image as base, install required dependencies and run TorchServe serve command

In [None]:
@component(
    base_image="python:3.9",
    packages_to_install=[
        "google-cloud-aiplatform", 
        "google-cloud-pipeline-components"
    ],
    output_component_file="./tmp/pipelines/make_prediction_request.yaml",
)
def make_prediction_request(project: str, bucket: str, endpoint: str, instances: list):
    """custom pipeline component to pass prediction requests to Vertex AI
    endpoint and get responses
    """
    import base64
    import logging

    from google.cloud import aiplatform
    from google.protobuf.json_format import Parse
    from google_cloud_pipeline_components.proto.gcp_resources_pb2 import \
        GcpResources

    logging.getLogger().setLevel(logging.INFO)
    aiplatform.init(project=project, staging_bucket=bucket)

    # parse endpoint resource
    logging.info(f"Endpoint = {endpoint}")
    gcp_resources = Parse(endpoint, GcpResources())
    endpoint_uri = gcp_resources.resources[0].resource_uri
    endpoint_id = "/".join(endpoint_uri.split("/")[-8:-2])
    logging.info(f"Endpoint ID = {endpoint_id}")

    # define endpoint client
    _endpoint = aiplatform.Endpoint(endpoint_id)

    # call prediction endpoint for each instance
    for instance in instances:
        if not isinstance(instance, (bytes, bytearray)):
            instance = instance.encode()
        logging.info(f"Input text: {instance.decode('utf-8')}")
        b64_encoded = base64.b64encode(instance)
        test_instance = [{"data": {"b64": f"{str(b64_encoded.decode('utf-8'))}"}}]
        response = _endpoint.predict(instances=test_instance)
        logging.info(f"Prediction response: {response.predictions}")

## Define Pipeline Specification

In [None]:
import os
from datetime import datetime

REGION = os.getenv("REGION", "us-central1")
APP_NAME = os.getenv("APP_NAME", "finetuned-bert-classifier")
VERSION = datetime.now().strftime("%Y%m%d%H%M%S")
MODEL_NAME = APP_NAME
BUCKET = BUCKET_URI
MODEL_DISPLAY_NAME = f"{MODEL_NAME}-{VERSION}"

PIPELINE_NAME = f"pytorch-{APP_NAME}"
PIPELINE_ROOT = f"{BUCKET}/pipeline_root/{MODEL_NAME}"
GCS_STAGING = f"{BUCKET}/pipeline_root/{MODEL_NAME}"

TRAIN_IMAGE_URI = f"gcr.io/{PROJECT_ID}/pytorch_gpu_train_{MODEL_NAME}"
SERVE_IMAGE_URI = f"gcr.io/{PROJECT_ID}/pytorch_cpu_predict_{MODEL_NAME}"

MACHINE_TYPE = "n1-standard-8"
REPLICA_COUNT = "1"
ACCELERATOR_TYPE = "NVIDIA_TESLA_T4"
ACCELERATOR_COUNT = "1"
NUM_WORKERS = 1

SERVING_HEALTH_ROUTE = "/ping"
SERVING_PREDICT_ROUTE = f"/predictions/{MODEL_NAME}"
SERVING_CONTAINER_PORT= [{"containerPort": 7080}]
SERVING_MACHINE_TYPE = "n1-standard-4"
SERVING_MIN_REPLICA_COUNT = 1
SERVING_MAX_REPLICA_COUNT=1
SERVING_TRAFFIC_SPLIT='{"0": 100}'



In [None]:
@dsl.pipeline(name=f"pytorch-{APP_NAME}")
def pytorch_text_classifier_pipeline(
    pipeline_job_id: str,
    gs_train_script_path: str,
    gs_serving_dependencies_path: str,
    eval_acc_threshold: float,
    is_hp_tuning_enabled: str = "n",
):
    # ========================================================================
    # build custom training container image
    # ========================================================================
    # build custom container for training job passing the
    # GCS location of the training application code
    build_custom_train_image_task = (
        build_train_image(
            project=PROJECT_ID,
            gs_train_src_path=gs_train_script_path,
            training_image_uri=TRAIN_IMAGE_URI,
        )
        .set_caching_options(True)
        .set_display_name("Build custom training image")
    )


    # ========================================================================
    # model training
    # ========================================================================
    # train the model on Vertex AI by submitting a CustomJob
    # using the custom container (no hyper-parameter tuning)
    # define training code arguments
    training_args = ["--num-epochs", "2", "--model-name", MODEL_NAME]
    # define job name
    JOB_NAME = f"{MODEL_NAME}-train-pytorch-cstm-cntr-{TIMESTAMP}"
    GCS_BASE_OUTPUT_DIR = f"{GCS_STAGING}/{TIMESTAMP}"
    # define worker pool specs
    worker_pool_specs = [
        {
            "machine_spec": {
                "machine_type": MACHINE_TYPE,
                "accelerator_type": ACCELERATOR_TYPE,
                "accelerator_count": ACCELERATOR_COUNT,
            },
            "replica_count": REPLICA_COUNT,
            "container_spec": {"image_uri": TRAIN_IMAGE_URI, "args": training_args},
        }
    ]

    run_train_task = (
        custom_job.CustomTrainingJobOp(
            project=PROJECT_ID,
            location=REGION,
            display_name=JOB_NAME,
            base_output_directory=GCS_BASE_OUTPUT_DIR,
            worker_pool_specs=worker_pool_specs,
        )
        .set_display_name("Run custom training job")
        .after(build_custom_train_image_task)
    )

    # ========================================================================
    # get training job details
    # ========================================================================
    training_job_details_task = get_training_job_details(
        project=PROJECT_ID,
        location=REGION,
        job_resource=run_train_task.output,
        eval_metric_key="eval_accuracy",
        model_display_name=MODEL_NAME,
    ).set_display_name("Get custom training job details")

    # ========================================================================
    # model deployment when condition is met
    # ========================================================================
    with dsl.Condition(
        training_job_details_task.outputs["eval_metric"] > eval_acc_threshold,
        name="model-deploy-decision",
    ):
        # ===================================================================
        # create model archive file
        # ===================================================================
        create_mar_task = generate_mar_file(
            model_display_name=MODEL_NAME,
            model_version=VERSION,
            handler=gs_serving_dependencies_path,
            model=training_job_details_task.outputs["model"],
        ).set_display_name("Create MAR file")

        # ===================================================================
        # build custom serving container running TorchServe
        # ===================================================================
        # build custom container for serving predictions using
        # the trained model artifacts served by TorchServe
        build_custom_serving_image_task = build_custom_serving_image(
            project=PROJECT_ID,
            gs_serving_dependencies_path=gs_serving_dependencies_path,
            serving_image_uri=SERVE_IMAGE_URI,
        ).set_display_name("Build custom serving image")

        # ===================================================================
        # create model resource
        # ===================================================================
        # upload model to vertex ai
        model_upload_task = (
            aip_components.ModelUploadOp(
                project=PROJECT_ID,
                display_name=MODEL_DISPLAY_NAME,
                serving_container_image_uri=SERVE_IMAGE_URI,
                serving_container_predict_route=SERVING_PREDICT_ROUTE,
                serving_container_health_route=SERVING_HEALTH_ROUTE,
                serving_container_ports=SERVING_CONTAINER_PORT,
                serving_container_environment_variables=create_mar_task.outputs[
                    "mar_env_var"
                ],
                artifact_uri=create_mar_task.outputs["mar_export_uri"],
            )
            .set_display_name("Upload model")
            .after(build_custom_serving_image_task)
        )

        # ===================================================================
        # create Vertex AI Endpoint
        # ===================================================================
        # create endpoint to deploy one or more models
        # An endpoint provides a service URL where the prediction requests are sent
        endpoint_create_task = (
            aip_components.EndpointCreateOp(
                project=PROJECT_ID,
                display_name=MODEL_NAME + "-endpoint",
            )
            .set_display_name("Create endpoint")
            .after(create_mar_task)
        )

        # ===================================================================
        # deploy model to Vertex AI Endpoint
        # ===================================================================
        # deploy models to endpoint to associates physical resources with the model
        # so it can serve online predictions
        model_deploy_task = aip_components.ModelDeployOp(
            endpoint=endpoint_create_task.outputs["endpoint"],
            model=model_upload_task.outputs["model"],
            deployed_model_display_name=MODEL_NAME,
            dedicated_resources_machine_type=SERVING_MACHINE_TYPE,
            dedicated_resources_min_replica_count=SERVING_MIN_REPLICA_COUNT,
            dedicated_resources_max_replica_count=SERVING_MAX_REPLICA_COUNT,
            traffic_split=SERVING_TRAFFIC_SPLIT,
        ).set_display_name("Deploy model to endpoint")

        # ===================================================================
        # test model deployment
        # ===================================================================
        # test model deployment by making online prediction requests
        test_instances = [
            "Jaw dropping visual affects and action! One of the best I have seen to date.",
            "Take away the CGI and the A-list cast and you end up with film with less punch.",
        ]
        predict_test_instances_task = make_prediction_request(
            project=PROJECT_ID,
            bucket=BUCKET,
            endpoint=model_deploy_task.outputs["gcp_resources"],
            instances=test_instances,
        ).set_display_name("Test model deployment making online predictions")
        predict_test_instances_task


In [None]:
PIPELINE_JSON_SPEC_PATH = "./tmp/pipelines/pytorch_text_classifier_pipeline_spec.json"
compiler.Compiler().compile(
    pipeline_func=pytorch_text_classifier_pipeline,
    package_path=PIPELINE_JSON_SPEC_PATH
)

In [None]:
# initialize Vertex AI SDK
aiplatform.init(project=PROJECT_ID, location=PROJECT_REGION)

# define pipeline parameters
# NOTE: These parameters can be included in the pipeline config file as needed
PIPELINE_JOB_ID = f"pipeline-{APP_NAME}-{get_timestamp()}"
TRAIN_APP_CODE_PATH = f"{BUCKET}/{APP_NAME}/train/"
SERVE_DEPENDENCIES_PATH = f"{BUCKET}/{APP_NAME}/serve/"
print(PIPELINE_JOB_ID)
print(TRAIN_APP_CODE_PATH)

print(SERVE_DEPENDENCIES_PATH)

pipeline_params = {
    "pipeline_job_id": PIPELINE_JOB_ID,
    "gs_train_script_path": TRAIN_APP_CODE_PATH,
    "gs_serving_dependencies_path": SERVE_DEPENDENCIES_PATH,
    "eval_acc_threshold": 0.87,
    "is_hp_tuning_enabled": "n",
}


# define pipeline job
pipeline_job = pipeline_jobs.PipelineJob(
    display_name=PIPELINE_NAME,
    job_id=PIPELINE_JOB_ID,
    template_path=PIPELINE_JSON_SPEC_PATH,
    pipeline_root=PIPELINE_ROOT,
    parameter_values=pipeline_params,
    enable_caching=True,
)

In [None]:
# submit pipeline job for execution
response = pipeline_job.run(sync=True)
response