# Vertex AI Pipelines & Tests

The scripts in this notebook were updated in January 2024 by <chi-charles.zhang@lloydsbanking.com>


References:   
- Google official docs: https://cloud.google.com/vertex-ai/docs/pipelines/introduction
- A demo on YouTube: https://bit.ly/ml-pipeline-3

## Environment & Configurations

### Dependencies

In [None]:
# updated to the latest dependencies in December 2023
!pip install google-cloud-aiplatform==1.37.0 --upgrade --quiet
!pip install google-cloud-pipeline-components==2.6.0 --upgrade --quiet
!pip install kfp==2.4.0 --upgrade --quiet
!pip install --upgrade --quiet pip

In [None]:
# Automatically restart kernel after installs so that your environment can access the new packages
import IPython

app = IPython.Application.instance()
app.kernel.do_shutdown(True)

In [1]:
# check the installed packages
!python3 -c "from google.cloud import aiplatform; print('aiplatform version: {}'.format(aiplatform.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"

aiplatform version: 1.37.0
google_cloud_pipeline_components version: 2.6.0
KFP SDK version: 2.4.0


In [2]:
import kfp

from typing import NamedTuple

from kfp.dsl import pipeline
from kfp.dsl import component
from kfp.dsl import OutputPath
from kfp.dsl import InputPath


from kfp.dsl import Output
from kfp.dsl import Metrics

from kfp import compiler

from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

from google_cloud_pipeline_components.v1.model import ModelUploadOp

### Constant variables
Set up the following constants:

- project ID
- Cloud Storage bucket
- pipeline root folder
- region
- service account

In [3]:
import os
PROJECT_ID = ""
# Get Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Project ID:  playpen-ed7014


In [4]:
# create a variable to store the bucket name
BUCKET_NAME="gs://" + "data-science-" + PROJECT_ID + "-bucket"
print("Bucket URL: ", BUCKET_NAME)

Bucket URL:  gs://data-science-playpen-ed7014-bucket


In [5]:
PIPELINE_ROOT = f"{BUCKET_NAME}/my_pipelines/"
print("Pipeline Root URL: ", PIPELINE_ROOT)

Pipeline Root URL:  gs://data-science-playpen-ed7014-bucket/my_pipelines/


In [6]:
REGION = 'europe-west2'  # London

In [7]:
SERVICE_ACCOUNT = "playpen-ed7014-consumer-sa@playpen-ed7014.iam.gserviceaccount.com"

### Clients

In [8]:
aiplatform.init(project=PROJECT_ID,
                location=REGION)

## Pipeline 1: A basic pipeline

### Components

In [25]:
@component(base_image="python:3.10")
def combine_two_strings(string1: str, string2: str) -> str:
    return string1 + " " + string2

In [26]:
@component(base_image="python:3.10")
def swap_words(original_string: str)->NamedTuple("output", [("before", str), ("after", str)]):
    words = original_string.split(" ") 
    swapped_string = words[1] + " " + words[0]
    return original_string, swapped_string
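A lightweight component's body is ordinary Python. One way to test that logic before compiling is to keep it in plain, undecorated functions and call them directly. The sketch below mirrors `combine_two_strings` and `swap_words`. The `_local` function names and the `SwapOutput` type are hypothetical and are not part of KFP.

```python
from typing import NamedTuple


class SwapOutput(NamedTuple):
    # Mirrors the ("before", str), ("after", str) outputs of swap_words
    before: str
    after: str


def combine_two_strings_local(string1: str, string2: str) -> str:
    # Same logic as the combine_two_strings component
    return string1 + " " + string2


def swap_words_local(original_string: str) -> SwapOutput:
    # Same logic as the swap_words component: swap the first two words
    words = original_string.split(" ")
    swapped_string = words[1] + " " + words[0]
    return SwapOutput(original_string, swapped_string)


# Chain the two steps the same way the pipeline below chains the tasks
combined = combine_two_strings_local("Hello", "Pipeline")
result = swap_words_local(combined)
print(result.before, "->", result.after)  # Hello Pipeline -> Pipeline Hello
```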

### Basic Pipeline

In [None]:
@pipeline(name="pipeline_1",
          pipeline_root=PIPELINE_ROOT + "pipeline_1")
def pipeline_1(first_string: str='Hello', second_string: str='Pipeline'):
    combine_task = combine_two_strings(string1=first_string, string2=second_string)
    swap_task = swap_words(original_string=combine_task.output)

### Compile

The compiler takes a pipeline (or component) function and compiles it into a pipeline specification, written as a YAML file. This YAML file can then be used to create the pipeline (or load the component) in Vertex AI Pipelines.

In [None]:
compiler.Compiler().compile(
    pipeline_func=combine_two_strings, 
    package_path="component_combine_two_strings.yaml",
)

compiler.Compiler().compile(
    pipeline_func=swap_words, 
    package_path="component_swap_words.yaml",
)


compiler.Compiler().compile(
    pipeline_func=pipeline_1, 
    package_path="pipeline_1.yaml",
)

In [None]:
# view component file
# !cat ./component_combine_two_strings.yaml

### Run

Create the pipeline job with the Vertex AI SDK.
You can also upload the compiled pipeline YAML file directly in the Vertex AI console.

In [None]:
pipeline_job_1 = pipeline_jobs.PipelineJob(
    display_name="pipeline_1",
    template_path="pipeline_1.yaml",
    parameter_values={"first_string": "Hello", "second_string": "Pipeline"}
)
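The keys of `parameter_values` must match the pipeline function's parameter names. One way to sanity-check them locally before compiling is to compare them against `inspect.signature` of a plain Python function that has the same parameters. `check_parameter_values` and `pipeline_1_signature` are hypothetical names. This sketch does not assume that `inspect.signature` works on the KFP-decorated object itself.

```python
import inspect
from typing import Any, Callable, Dict, List


def check_parameter_values(func: Callable, values: Dict[str, Any]) -> List[str]:
    """Return a list of problems with `values` for calling `func` (empty if none)."""
    params = inspect.signature(func).parameters
    problems = []
    # Keys that the function does not accept
    for key in values:
        if key not in params:
            problems.append(f"unknown parameter: {key}")
    # Parameters without a default that were not supplied
    for name, param in params.items():
        if param.default is inspect.Parameter.empty and name not in values:
            problems.append(f"missing parameter: {name}")
    return problems


# Plain stand-in with the same signature as pipeline_1 (hypothetical)
def pipeline_1_signature(first_string: str = 'Hello', second_string: str = 'Pipeline'):
    pass


print(check_parameter_values(pipeline_1_signature,
                             {"first_string": "Hello", "second_string": "Pipeline"}))  # []
print(check_parameter_values(pipeline_1_signature, {"frist_string": "Hello"}))
# ['unknown parameter: frist_string']
```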

In [None]:
pipeline_job_1.run(service_account=SERVICE_ACCOUNT,
                   sync=False)

## Pipeline 2: Retrieve and reuse a component
In this example we reuse a component that was compiled to a YAML file (`component_combine_two_strings.yaml`) and stored there persistently.

In [None]:
@component(base_image="python:3.10")
def swap_words(original_string: str)->NamedTuple("output", [("before", str), ("after", str)]):
    words = original_string.split(" ") 
    swapped_string = words[1] + " " + words[0]
    return original_string, swapped_string

In [None]:
# load the component that was compiled to YAML in Pipeline 1
component_combine_two_strings = kfp.components.load_component_from_file('./component_combine_two_strings.yaml')

@pipeline(name="pipeline_2",
          pipeline_root=PIPELINE_ROOT + "pipeline_2")
def reuse_component_pipeline(first_string: str='Reuse', second_string: str='Component'):
    combine_task = component_combine_two_strings(string1=first_string, string2=second_string)
    swap_task = swap_words(original_string=combine_task.output)

In [None]:
compiler.Compiler().compile(
    pipeline_func=reuse_component_pipeline, 
    package_path="pipeline_2.yaml"
)

pipeline_job_2 = pipeline_jobs.PipelineJob(
    display_name="pipeline_2",
    template_path="pipeline_2.yaml",
    parameter_values={"first_string": "Reuse", "second_string": "Component"}
)

In [None]:
pipeline_job_2.run(service_account=SERVICE_ACCOUNT,
                   sync=False)

## Schedule a pipeline (using Pipeline 1 as example)

There are two ways to create a pipeline run schedule:
1. Create a schedule with the `PipelineJobSchedule.create` method.
2. Create a schedule from a `PipelineJob` with the `PipelineJob.create_schedule` method.

The following example demonstrates the first approach, the `PipelineJobSchedule.create` method.

In [None]:
# === Define components and pipeline ===

@component(base_image="python:3.10")
def combine_two_strings(string1: str, string2: str) -> str:
    return string1 + " " + string2

@component(base_image="python:3.10")
def swap_words(original_string: str)->NamedTuple("output", [("before", str), ("after", str)]):
    words = original_string.split(" ") 
    swapped_string = words[1] + " " + words[0]
    return original_string, swapped_string

@pipeline(name="pipeline_1_schedule",
          pipeline_root=PIPELINE_ROOT + "pipeline_1_schedule")
def schedule_pipeline_1(first_string: str='Hello', second_string: str='Schedule'):
    combine_task = combine_two_strings(string1=first_string, string2=second_string)
    swap_task = swap_words(original_string=combine_task.output)

In [None]:
# Compiling
compiler.Compiler().compile(
    pipeline_func=schedule_pipeline_1, 
    package_path="pipeline_1_schedule.yaml",
)

In [None]:
from google.cloud import aiplatform

pipeline_job_1_schedule = pipeline_jobs.PipelineJob(
    display_name="pipeline_1_schedule",
    template_path="pipeline_1_schedule.yaml",
    parameter_values={"first_string": "Hello", "second_string": "Pipeline"}
)

pipeline_job_schedule = aiplatform.PipelineJobSchedule(
    pipeline_job=pipeline_job_1_schedule,
    display_name="pipeline_1_schedule"
)

pipeline_job_schedule.create(
    cron="*/1 * * * *",   # runs every 1 minute; also see cron definition, https://en.wikipedia.org/wiki/Cron
    max_concurrent_run_count=3,  # The maximum number of concurrent runs for the schedule.
    max_run_count=5,  # The maximum number of pipeline runs that the schedule creates after which it's completed.
    service_account=SERVICE_ACCOUNT,
)
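The cron string `*/1 * * * *` consists of five fields: minute, hour, day of month, month, and day of week. `*/n` means "every n units". Combined with `max_run_count=5`, this schedule should create five runs, one per minute, and then complete. The sketch below is a simplified cron matcher that illustrates this. It is not Vertex AI's scheduler. It supports only `*`, `*/n`, single values, `a-b` ranges (optionally with `/n`), and comma lists, with day of week 0 to 6 (0 = Sunday). It also requires all five fields to match, whereas standard cron uses OR when both day fields are restricted. `parse_field` and `next_runs` are hypothetical helpers.

```python
from datetime import datetime, timedelta
from typing import List, Set


def parse_field(field: str, low: int, high: int) -> Set[int]:
    """Expand one cron field: '*', '*/n', 'a', 'a-b', 'a-b/n' or comma lists of these."""
    values: Set[int] = set()
    for part in field.split(","):
        step = 1
        has_step = "/" in part
        if has_step:
            part, step_str = part.split("/")
            step = int(step_str)
        if part == "*":
            start, end = low, high
        elif "-" in part:
            start_str, end_str = part.split("-")
            start, end = int(start_str), int(end_str)
        else:
            start = int(part)
            # treat 'a/n' as 'a-high/n' (a common extension)
            end = high if has_step else start
        values.update(range(start, end + 1, step))
    return values


def next_runs(cron: str, start: datetime, count: int) -> List[datetime]:
    """Return up to `count` whole minutes after `start` that match `cron`."""
    minute, hour, dom, month, dow = cron.split()
    minutes = parse_field(minute, 0, 59)
    hours = parse_field(hour, 0, 23)
    days = parse_field(dom, 1, 31)
    months = parse_field(month, 1, 12)
    weekdays = parse_field(dow, 0, 6)  # cron: 0 = Sunday
    runs: List[datetime] = []
    t = start.replace(second=0, microsecond=0) + timedelta(minutes=1)
    limit = t + timedelta(days=366)  # stop searching after about a year
    while len(runs) < count and t <= limit:
        cron_weekday = (t.weekday() + 1) % 7  # Python: 0 = Monday
        if (t.minute in minutes and t.hour in hours and t.day in days
                and t.month in months and cron_weekday in weekdays):
            runs.append(t)
        t += timedelta(minutes=1)
    return runs


# The schedule above: every minute, capped at max_run_count=5 runs
for run in next_runs("*/1 * * * *", datetime(2024, 1, 17, 11, 27, 30), 5):
    print(run.strftime("%H:%M"))
# prints 11:28, 11:29, 11:30, 11:31, 11:32 (one per line)
```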


## Pipeline 3: Using the TF GPU image ("tf2-gpu.2-6") in a component

- The following example shows how to run a component on a TensorFlow image with GPU support.
- This is useful, for example, for a training component that needs access to accelerators.
- This job takes about 20 minutes.

In [None]:
@component(base_image="gcr.io/deeplearning-platform-release/tf2-gpu.2-6")
def tf_gpu_training_image() -> bool:
    import logging
    import tensorflow as tf

    # make INFO-level messages visible (no-op if logging is already configured)
    logging.basicConfig(level=logging.INFO)

    gpus = tf.config.list_physical_devices('GPU')
    for gpu in gpus:
        logging.info('Name: {} Type: {} TF_version: {}'.format(gpu.name, gpu.device_type, tf.version.VERSION))
    print("TensorFlow version:", tf.version.VERSION)
    
    return True

In [None]:
@pipeline(name="pipeline_3",
          pipeline_root=PIPELINE_ROOT + "pipeline_3")
def tf_gpu_pipeline():
    gpu_training = tf_gpu_training_image()
    gpu_training.add_node_selector_constraint(accelerator="NVIDIA_TESLA_T4")

In [None]:
compiler.Compiler().compile(
    pipeline_func=tf_gpu_training_image, 
    package_path="tf_gpu_training_image.yaml"
)

compiler.Compiler().compile(
    pipeline_func=tf_gpu_pipeline, 
    package_path="pipeline_3.yaml"
)

pipeline_job_3 = pipeline_jobs.PipelineJob(
   display_name="pipeline_3",
   template_path="pipeline_3.yaml"
)

In [None]:
pipeline_job_3.run(service_account=SERVICE_ACCOUNT,
                   sync=False)

## Pipeline 4: Adding additional dependencies

In [None]:
@component(base_image="python:3.10",
           packages_to_install = ["pandas==2.1.4"],)
def additional_packages():
    import pandas
    print("Pandas version: ", pandas.__version__)

# This component is expected to fail, for demonstration purposes: pandas is not installed in the python:3.10 base image
@component(base_image="python:3.10")
def additional_packages_missing():
    import pandas
    print("Pandas version: ", pandas.__version__)

In [None]:
@pipeline(name="pipeline_4",
          pipeline_root=PIPELINE_ROOT + "pipeline_4")
def pipeline_4():
    additional_packages_task = additional_packages()
    additional_packages_missing_task = additional_packages_missing()

compiler.Compiler().compile(
    pipeline_func=pipeline_4, 
    package_path="pipeline_4.yaml"
)

pipeline_job_4 = pipeline_jobs.PipelineJob(
    display_name="pipeline_4",
    template_path="pipeline_4.yaml"
)

pipeline_job_4.run(service_account=SERVICE_ACCOUNT,
                   sync=False)
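`additional_packages_missing` fails because the `python:3.10` image does not include pandas, so `import pandas` raises `ModuleNotFoundError` at run time. A rough local analogue of that check uses `importlib.util.find_spec`, which returns `None` for a top-level module that cannot be found. `missing_modules` is a hypothetical helper. It only reflects the environment it runs in, not the container image.

```python
import importlib.util
from typing import Iterable, List


def missing_modules(module_names: Iterable[str]) -> List[str]:
    """Return the top-level module names that cannot be imported here."""
    return [name for name in module_names if importlib.util.find_spec(name) is None]


# "json" ships with Python; the second name is deliberately made up
print(missing_modules(["json", "surely_not_installed_module_xyz"]))
# ['surely_not_installed_module_xyz']
```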

## Pipeline 5: End-to-end XGBoost

### Dependencies

In [9]:
from typing import NamedTuple

from kfp import dsl
from kfp.dsl import (Artifact,
                     Dataset,
                     Input,
                     Model,
                     Output,
                     Metrics,
                     ClassificationMetrics,
                     component,
                     Markdown)

from kfp import compiler

### Data

In [10]:
@component(base_image="python:3.10",
           packages_to_install = [
                                   "pandas==2.1.4",  # 1.3.4
                                   "scikit-learn==1.3.2"  #1.0.1
                                 ],
)
def get_data(
    dataset_train: Output[Dataset],
    dataset_test: Output[Dataset],
):

    from sklearn import datasets
    from sklearn.model_selection import train_test_split
    import pandas as pd

    # dataset https://www.kaggle.com/uciml/breast-cancer-wisconsin-data
    data_raw = datasets.load_breast_cancer()
    data = pd.DataFrame(data_raw.data, columns=data_raw.feature_names)
    data["target"] = data_raw.target

    train, test = train_test_split(data, test_size=0.3)

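    # index=False stops the row index from being read back as an extra feature column
    train.to_csv(dataset_train.path, index=False)
    test.to_csv(dataset_test.path, index=False)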
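With a float `test_size`, `train_test_split` rounds the test-set size up, and the training set gets the remaining rows when `train_size` is not set. The worked example below applies this to the 569-row breast cancer dataset. It mirrors scikit-learn's rounding as implemented in recent versions. `split_sizes` is a hypothetical helper and does not call scikit-learn.

```python
import math


def split_sizes(n_samples: int, test_size: float) -> tuple:
    """Row counts (train, test) for a float test_size, rounding the test share up."""
    n_test = math.ceil(test_size * n_samples)
    n_train = n_samples - n_test
    return n_train, n_test


print(split_sizes(569, 0.3))  # (398, 171)
```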

### Training

In [11]:
@component(base_image="python:3.10",
           packages_to_install = [
                                   "pandas==2.1.4",  # 1.3.4
                                   "scikit-learn==1.3.2", #xgboost requires scikitlearn #1.0.1
                                   "xgboost==2.0.3",  # 1.5.1
                                 ],
)
def train_model(
    dataset: Input[Dataset],
    model_artifact: Output[Model]
):

    from xgboost import XGBClassifier
    import pandas as pd

    data = pd.read_csv(dataset.path)

    model = XGBClassifier(
        objective="binary:logistic"
    )
    model.fit(
        data.drop(columns=["target"]),
        data.target,
    )

    score = model.score(
        data.drop(columns=["target"]),
        data.target,
    )

    model_artifact.metadata["train_score"] = float(score)
    model_artifact.metadata["framework"] = "XGBoost"

    print(model_artifact.path)

    model.save_model(model_artifact.path)

### Evaluation

In [12]:
@component(base_image="python:3.10",
           packages_to_install = [
                                   "pandas==2.1.4",  # 1.3.4
                                   "scikit-learn==1.3.2", #xgboost requires scikitlearn #1.0.1
                                   "xgboost==2.0.3",  # 1.5.1
                                 ],
)
def eval_model(
    test_set: Input[Dataset],
    xgb_model: Input[Model],
    metrics: Output[ClassificationMetrics],
    # smetrics: Output[Metrics]
) -> NamedTuple("Outputs", [("deploy", str)]):
    
    from xgboost import XGBClassifier
    import pandas as pd

    data = pd.read_csv(test_set.path)
    model = XGBClassifier()
    model.load_model(xgb_model.path)

    score = model.score(
        data.drop(columns=["target"]),
        data.target,
    )

    from sklearn.metrics import roc_curve
    y_scores =  model.predict_proba(data.drop(columns=["target"]))[:, 1]
    fpr, tpr, thresholds = roc_curve(
         y_true=data.target.to_numpy(), y_score=y_scores, pos_label=True
    )
    # metrics.log_roc_curve(fpr.tolist(), tpr.tolist(), thresholds.tolist())

    from sklearn.metrics import confusion_matrix
    y_pred = model.predict(data.drop(columns=["target"]))

    metrics.log_confusion_matrix(
       ["False", "True"],
       confusion_matrix(
           data.target, y_pred
       ).tolist()
    )

#     xgb_model.metadata["test_score"] = float(score)
    # smetrics.log_metric("score", float(score))


    deploy = "true"
    # TODO: compare the score against a threshold or against the previously deployed model

    return (deploy,)
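The `deploy` flag is currently hard-coded to `"true"`. Below is a minimal sketch of the comparison the comment hints at. It assumes a hypothetical accuracy threshold of 0.95 and an optional score from a previously deployed model. Accuracy is computed from a confusion matrix in the `[[TN, FP], [FN, TP]]` layout that scikit-learn's `confusion_matrix` returns for labels 0 and 1. The decision is returned as a string because the component output is declared as `str` and the pipeline's `dsl.If` condition compares it to `"true"`. The matrix values are made up.

```python
from typing import List, Optional


def accuracy_from_confusion(matrix: List[List[int]]) -> float:
    """Accuracy = correct predictions (the diagonal) / all predictions."""
    correct = sum(matrix[i][i] for i in range(len(matrix)))
    total = sum(sum(row) for row in matrix)
    return correct / total


def should_deploy(score: float, threshold: float,
                  previous_score: Optional[float] = None) -> str:
    """Return "true" or "false" as a string, matching the `deploy` output type."""
    if score < threshold:
        return "false"
    if previous_score is not None and score <= previous_score:
        return "false"
    return "true"


# Hypothetical 2x2 matrix: [[TN, FP], [FN, TP]]
matrix = [[60, 4], [3, 104]]
acc = accuracy_from_confusion(matrix)
print(round(acc, 4))  # 0.9591
print(should_deploy(acc, threshold=0.95))  # true
print(should_deploy(acc, threshold=0.95, previous_score=0.97))  # false
```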

### Deployment

In [13]:
@component(base_image="python:3.10",
           packages_to_install=["google-cloud-aiplatform==1.37.0"])   # 1.3.0
def deploy(
    model: Input[Model],
    project: str,
    region: str,):

    import logging
    from google.cloud import aiplatform
    aiplatform.init(project=project, location=region)

    logging.basicConfig(level=logging.DEBUG)
    logging.debug(model)

    print("model: ", model)
    print("model.uri: ", model.uri)

    import os
    path,file = os.path.split(model.uri)

    import datetime

    # datetime.datetime.now().strftime('%Y%m%d%H%M%S')
    # serving image https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers#xgboost
    deployed_model = aiplatform.Model.upload(
          display_name="xgboost-pipeline",
          artifact_uri = path,
          serving_container_image_uri="europe-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-7:latest",  # google pre-built-containers#xgboost
    )
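`Model.upload` receives the directory that contains the model (`artifact_uri`). Vertex AI's pre-built XGBoost prediction containers look in that directory for a file named `model.bst`, `model.joblib` or `model.pkl`. `train_model` saves the model to `model_artifact.path`, a file named after the output rather than `model.bst`, so the upload as written may not find a usable model file. The stdlib sketch below shows what splitting the URI yields and whether the file name is one of the accepted names. `check_xgboost_artifact` and the bucket paths are hypothetical.

```python
import posixpath
from typing import Tuple

# File names the pre-built XGBoost prediction containers accept
ACCEPTED_XGBOOST_FILES = {"model.bst", "model.joblib", "model.pkl"}


def check_xgboost_artifact(model_uri: str) -> Tuple[str, bool]:
    """Split a gs:// model URI into (directory, is the file name accepted?)."""
    # posixpath keeps '/' separators regardless of the local OS
    directory, filename = posixpath.split(model_uri)
    return directory, filename in ACCEPTED_XGBOOST_FILES


print(check_xgboost_artifact("gs://my-bucket/run/train-model/model_artifact"))
# ('gs://my-bucket/run/train-model', False)
print(check_xgboost_artifact("gs://my-bucket/run/train-model/model.bst"))
# ('gs://my-bucket/run/train-model', True)
```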


### Pipeline

In [14]:
@pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT + "pipeline_5",
    # A name for the pipeline. Use to determine the pipeline Context.
    name="pipeline_5",
)
def pipeline_5():
    dataset_op = get_data()
    training_op = train_model(dataset=dataset_op.outputs["dataset_train"])
    eval_op = eval_model(
        test_set=dataset_op.outputs["dataset_test"],
        xgb_model=training_op.outputs["model_artifact"]
    )

    with dsl.If(
        eval_op.outputs["deploy"] == "true",
        name="deploy",
    ):
        deploy_op = deploy(
                            model=training_op.outputs["model_artifact"],
                            project=PROJECT_ID,
                            region=REGION,
                          )

    # TODO: the deploy component only uploads the XGBoost model; deploying it to an endpoint is described at
    # https://cloud.google.com/vertex-ai/docs/predictions/deploy-model-api#aiplatform_deploy_model_custom_trained_model_sample-python


### Compile

In [15]:
compiler.Compiler().compile(
    pipeline_func=pipeline_5,
    package_path='pipeline_5.yaml')

### Run Pipeline

In [16]:
pipeline_job_5 = pipeline_jobs.PipelineJob(
        display_name="pipeline_5",
        template_path="pipeline_5.yaml"
)

pipeline_job_5.run(service_account=SERVICE_ACCOUNT,
                   sync=False)

Creating PipelineJob
PipelineJob created. Resource name: projects/209587640097/locations/europe-west2/pipelineJobs/pipeline-5-20240117112703
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/209587640097/locations/europe-west2/pipelineJobs/pipeline-5-20240117112703')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/europe-west2/pipelines/runs/pipeline-5-20240117112703?project=209587640097
PipelineJob projects/209587640097/locations/europe-west2/pipelineJobs/pipeline-5-20240117112703 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/209587640097/locations/europe-west2/pipelineJobs/pipeline-5-20240117112703 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/209587640097/locations/europe-west2/pipelineJobs/pipeline-5-20240117112703 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/209587640097/locations/europe-west2/pipelineJobs/pipeline-5-20240117112703

# End-to-end pipeline (Hugging Face, sentiment analysis)
Work in progress.

## Dependencies

In [None]:
import kfp

from typing import NamedTuple

# KFP 2.x imports: AIPlatformClient (kfp.v2.google.client) and
# google_cloud_pipeline_components.aiplatform belong to the older SDKs
from kfp import dsl
from kfp.dsl import (Artifact,
                     Dataset,
                     Input,
                     InputPath,
                     Model,
                     Output,
                     OutputPath,
                     Metrics,
                     ClassificationMetrics,
                     component,
                     pipeline,
                     Markdown)

from kfp import compiler

from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs

from google_cloud_pipeline_components.v1.model import ModelUploadOp

In [None]:
PROJECT_ID = "sascha-playground-doit"
PIPELINE_ROOT = "gs://doit-vertex-demo/"

In [None]:
# initialise the Vertex AI SDK (replaces the KFP 1.x AIPlatformClient)
aiplatform.init(project=PROJECT_ID,
                location='us-central1')

## Train

In [None]:
@component(
    packages_to_install = [
        "transformers==4.1.1",
        "google-cloud-storage==1.35.0",
        "scikit-learn==0.24.0",
        "pandas==1.1.5"
    ],
    base_image="gcr.io/deeplearning-platform-release/tf2-gpu.2-6"
)
def train_model(
    epochs: int,
    model_artifact: Output[Model],
    smetrics: Output[Metrics]
):

    from sklearn.model_selection import train_test_split
    from transformers import DistilBertTokenizerFast
    from transformers import TFDistilBertForSequenceClassification
    from google.cloud import storage

    import tensorflow as tf
    import json
    import pandas as pd
    import numpy as np
    from io import StringIO

    print('load distilbert')
    # load model and tokenizer
    model = TFDistilBertForSequenceClassification.from_pretrained(
        "distilbert-base-uncased")
    tokenizer = DistilBertTokenizerFast.from_pretrained("distilbert-base-uncased")


    print('start training')
    file = tf.io.gfile.GFile(
        'gs://machine-learning-samples/datasets/sentiment/imdb/csv/dataset.csv', mode='r').read()
    df = pd.read_csv(StringIO(file))

    #df = df.head()

    sentiments = df['sentiment'].values.tolist()
    reviews = df['review'].values.tolist()

    training_sentences, validation_sentences, training_labels, validation_labels = train_test_split(
        reviews,
        sentiments,
        test_size=.2)

    train_encodings = tokenizer(training_sentences,
                                truncation=True,
                                padding=True)
    val_encodings = tokenizer(validation_sentences,
                              truncation=True,
                              padding=True)

    train_dataset = tf.data.Dataset.from_tensor_slices((
        dict(train_encodings),
        training_labels
    ))

    val_dataset = tf.data.Dataset.from_tensor_slices((
        dict(val_encodings),
        validation_labels
    ))

    optimizer = tf.keras.optimizers.Adam(learning_rate=5e-5)

    model.compile(optimizer=optimizer,
                  loss=model.compute_loss,
                  metrics=['accuracy'])

    model.fit(train_dataset.shuffle(100).batch(16),
              epochs=epochs,
              batch_size=16,)

    model.save_pretrained(model_artifact.path)
    print(model_artifact.path)

    print('eval')
    evaluation = model.evaluate(val_dataset.shuffle(100).batch(16),
               batch_size=16)
    print(evaluation)

    smetrics.log_metric("accuracy", float(evaluation[1]))


## Serving Container
For the serving container code, see https://github.com/SaschaHeyer/serving-custom-container

In [None]:
@component(packages_to_install=[
    "google-cloud-build==3.8.3"])
def build_serving_container(model_artifact: Input[Model]) -> NamedTuple("Outputs", [("container", str)]):
    from google.cloud.devtools import cloudbuild
    import time

    print('build serving container')
    print(model_artifact.uri)

    client = cloudbuild.CloudBuildClient()
    build = cloudbuild.Build()


    # version is current timestamp
    version = str(int(time.time()))
    container = "gcr.io/sascha-playground-doit/sentiment-fast-api-test:{}".format(version)


    # TODO: copy the model from the pipeline artifact (model_artifact.uri) instead of a fixed GCS path
    build.steps = [{"name": "gcr.io/cloud-builders/git",
                    "args": ["clone", "https://github.com/SaschaHeyer/serving-custom-container"]},
                   {"name": "gcr.io/cloud-builders/gsutil",
                    "args": ["cp", "-r", "gs://doit-vertex-demo/models/sentiment", "./serving-custom-container"]},
                   {"name": "gcr.io/cloud-builders/docker",
                    "args": ["build", "-t", container, "serving-custom-container" ]},
                   {"name": "gcr.io/cloud-builders/docker",
                    "args": ["push", container]}]

    #build.substitutions = {"_VERSION": version}

    operation = client.create_build(project_id="sascha-playground-doit", build=build)
    # Print the in-progress operation
    print("IN PROGRESS:")
    print(operation.metadata)

    result = operation.result()
    # Print the completed status
    print("RESULT:", result.status)

    return (container,)
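The Cloud Build request above is a list of steps, each of which pairs a builder image with its arguments. One way to make the hard-coded model path and image name configurable, and testable without calling Cloud Build, is to build that list in a pure function. `image_tag`, `make_build_steps` and their arguments are hypothetical, and the project and bucket names are placeholders. The step layout copies the one above.

```python
import time
from typing import Dict, List, Optional


def image_tag(repository: str, timestamp: Optional[float] = None) -> str:
    """Tag the image with a Unix timestamp, as the component above does."""
    ts = int(time.time() if timestamp is None else timestamp)
    return "{}:{}".format(repository, ts)


def make_build_steps(repo_url: str, model_gcs_uri: str, image: str,
                     workdir: str = "serving-custom-container") -> List[Dict]:
    """Clone the serving code, copy the model in, then build and push the image."""
    return [
        {"name": "gcr.io/cloud-builders/git", "args": ["clone", repo_url]},
        {"name": "gcr.io/cloud-builders/gsutil",
         "args": ["cp", "-r", model_gcs_uri, "./" + workdir]},
        {"name": "gcr.io/cloud-builders/docker", "args": ["build", "-t", image, workdir]},
        {"name": "gcr.io/cloud-builders/docker", "args": ["push", image]},
    ]


image = image_tag("gcr.io/my-project/sentiment-fast-api-test", timestamp=1705490823)
steps = make_build_steps("https://github.com/SaschaHeyer/serving-custom-container",
                         "gs://my-bucket/models/sentiment", image)
print(image)  # gcr.io/my-project/sentiment-fast-api-test:1705490823
print([s["name"].split("/")[-1] for s in steps])  # ['git', 'gsutil', 'docker', 'docker']
```

Inside the component, the result could be assigned with `build.steps = make_build_steps(...)`.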

## Deploy

In [None]:
@component(packages_to_install=["google-cloud-aiplatform==1.15.0"])
def deploy_model(
    project: str,
    region: str,
    container: str
):
  from google.cloud import aiplatform
  aiplatform.init(project=project, location=region)

  ENDPOINT_NAME = "sentiment"
  DISPLAY_NAME  = "sentiment"

  MODEL_TYPE = "query"
  MODEL_NAME = f"{MODEL_TYPE}_model"  # Used by the deployment container.

  def create_endpoint():
    print('create endpoint')
    endpoints = aiplatform.Endpoint.list(
      filter='display_name="{}"'.format(ENDPOINT_NAME),
      order_by='create_time desc',
      project=project,
      location=region,
    )

    if len(endpoints) > 0:
      endpoint = endpoints[0]
    else:
      endpoint = aiplatform.Endpoint.create(
        display_name=ENDPOINT_NAME, project=project, location=region
      )

    return endpoint

  models = aiplatform.Model.list(filter=("display_name={}").format(DISPLAY_NAME))

  PORT = 80
  HEALTH_ROUTE = "/health"
  PREDICT_ROUTE = "/predict"


  if len(models) == 0:
    # upload the initial model
    model_uploaded = aiplatform.Model.upload(
          display_name = DISPLAY_NAME,
          serving_container_image_uri = container,
          serving_container_health_route=HEALTH_ROUTE,
          serving_container_predict_route=PREDICT_ROUTE,
          serving_container_ports=[PORT]
    )
  else:
    #upload a new model version using the existing model resource ID
    parent_model = models[0].resource_name

    model_uploaded = aiplatform.Model.upload(
          parent_model = parent_model,
          display_name = DISPLAY_NAME,
          serving_container_image_uri = container,
          serving_container_health_route=HEALTH_ROUTE,
          serving_container_predict_route=PREDICT_ROUTE,
          serving_container_ports=[PORT]
    )

  endpoint = create_endpoint()

  model_deploy = model_uploaded.deploy(
        machine_type="n1-standard-4",
        endpoint=endpoint,
        traffic_split={"0": 100},
        deployed_model_display_name=DISPLAY_NAME,
  )

  # undeploy models without traffic for this specific endpoint
  for model in endpoint.list_models():
    print(model)
    if model.id not in endpoint.traffic_split:
      endpoint.undeploy(deployed_model_id = model.id)
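After `deploy(..., traffic_split={"0": 100})`, the new model receives all traffic. The final loop then undeploys every deployed model whose ID is absent from `endpoint.traffic_split`. The sketch below expresses that selection rule as a pure function over IDs. The IDs are made up, and the real loop works on `DeployedModel` objects. Treating an explicit 0% share as "no traffic" is an extra assumption not present in the original check.

```python
from typing import Dict, List


def models_to_undeploy(deployed_ids: List[str], traffic_split: Dict[str, int]) -> List[str]:
    """IDs of deployed models that receive no traffic on the endpoint."""
    # The original loop checks only membership; an explicit 0% share is
    # also treated as "no traffic" here (assumption).
    return [mid for mid in deployed_ids if traffic_split.get(mid, 0) == 0]


deployed = ["111", "222", "333"]
split = {"333": 100}  # the newest deployment gets all traffic
print(models_to_undeploy(deployed, split))  # ['111', '222']
```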


## Pipeline

In [None]:
@dsl.pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT + "sentiment-pipeline",
    # A name for the pipeline. Use to determine the pipeline Context.
    name="sentiment-pipeline",
)
def sentiment_pipeline(epochs: int):
    # KFP 2.x requires keyword arguments when calling components
    train_op = (train_model(epochs=epochs)
                .add_node_selector_constraint(accelerator="NVIDIA_TESLA_T4")
                .set_caching_options(False))

    # build custom serving container with latest model
    build_serving_container_op = build_serving_container(
        model_artifact=train_op.outputs["model_artifact"]).set_caching_options(False)

    # upload and deploy model to vertex ai
    deploy_op = deploy_model(project=PROJECT_ID,
                             region="us-central1",
                             container=build_serving_container_op.outputs["container"])


## Compile

In [None]:
compiler.Compiler().compile(
    pipeline_func=sentiment_pipeline,
    package_path='sentiment_pipeline.json')

## Run

In [None]:
job = pipeline_jobs.PipelineJob(
    display_name="sentiment-pipeline",
    template_path="sentiment_pipeline.json",
    parameter_values={
        'epochs': 3
    }
)

job.submit(experiment="sentiment")

# What about artifacts that are not a dataset or model?

For that we can use `OutputPath`, which, like an artifact, provides a path (backed by Google Cloud Storage) where a component can write data. Any intermediate object must first be serialized from memory to a file. The next component receives that file through `InputPath` and deserializes it back into an in-memory object.
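The serialize-then-deserialize pattern can be tried locally with `pickle`, using a temporary directory in place of the paths that KFP provides. `write_step` and `read_step` are hypothetical names. The sketch writes to `output_path` as the file itself, which is how KFP treats an `OutputPath` parameter.

```python
import os
import pickle
import tempfile


def write_step(output_path: str) -> None:
    # Serialize an in-memory object to the file at output_path
    animals = ['cat', 'dog']
    with open(output_path, 'wb') as file:
        pickle.dump(animals, file)


def read_step(input_path: str) -> list:
    # Load the serialized object back into memory
    with open(input_path, 'rb') as file:
        return pickle.load(file)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "output_path")  # stands in for the path KFP would pass
    write_step(path)
    print(read_step(path))  # ['cat', 'dog']
```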

In [None]:
@component()
def first(output_path: OutputPath()):

  # everything that can be serialized to a file can be stored
  # you are not limited to the artifact types like dataset or model

  # a common case is a fitted TF-IDF vectorizer

  animals = ['cat', 'dog']

  import pickle

  # output_path is the path of the output file itself, not a directory
  with open(output_path, 'wb') as file:
    pickle.dump(animals, file)

In [None]:
@component()
def second(input_path: InputPath()):

  import pickle
  with open(input_path, 'rb') as file:
    data = pickle.load(file)

  print(data)

In [None]:
@pipeline(name="output-path-pipeline",
          pipeline_root=PIPELINE_ROOT + "output-path-pipeline")
def output_pipeline():
    first_task = first()
    second_task = second(input_path=first_task.output)

In [None]:
compiler.Compiler().compile(
  pipeline_func=output_pipeline, package_path="output_pipeline.json"
)

In [None]:
job = pipeline_jobs.PipelineJob(
    display_name="output-pipeline",
    template_path="output_pipeline.json"
)

In [None]:
job.run(sync=False)

## Predefined Components

For a full list of predefined components, see https://cloud.google.com/vertex-ai/docs/pipelines/gcpc-list

Which predefined components to use depends on your use case.
