# Install packages

In [1]:
# ! pip install kfp

# Restart kernel

In [2]:
import os
import IPython

# Ask the running kernel to shut down and restart (restart=True); presumably
# needed so that freshly installed packages are importable.
IPython.Application.instance().kernel.do_shutdown(True)

{'status': 'ok', 'restart': True}

In [2]:
import os
from datetime import datetime
from typing import NamedTuple

import google.cloud.aiplatform as aip
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import component

# Variables

In [3]:
# Cloud Storage bucket (name only, without the gs:// scheme) used by the pipeline.
BUCKET_NAME = "my-simple-pipeline"
# Region used for both the bucket and the pipeline job.
REGION = "europe-west1"

In [4]:
# Full gs:// URI of the bucket.
BUCKET_URI = "gs://" + BUCKET_NAME
# Folder inside the bucket that is passed to the pipeline as its root.
PIPELINE_ROOT = BUCKET_URI + "/pipeline_root/simple"

In [5]:
# Get some variables from gcloud

shell_output = ! gcloud auth list 2>/dev/null
SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()

shell_output = ! gcloud config list --format 'value(core.project)' 2>/dev/null
PROJECT_ID = shell_output[0]

In [6]:
# Second-resolution timestamp (YYYYmmddHHMMSS) that distinguishes each run's display name.
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"
PIPELINE_DISPLAY_NAME = f"simple_{TIMESTAMP}"

# Create a Cloud Storage bucket

In [7]:
! gcloud storage buckets create $BUCKET_URI --location=$REGION

Creating gs://my-simple-pipeline/...
[1;31mERROR:[0m (gcloud.storage.buckets.create) HTTPError 409: Your previous request to create the named bucket succeeded and you already own it.


# Initialize Vertex AI SDK for Python

In [8]:
# Set project, region and staging bucket as SDK-wide defaults. Passing the region
# here makes SDK calls that omit `location` target REGION as well, instead of
# falling back to the SDK's own default region.
aip.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

# Define pipeline components

In [9]:
@component(base_image="python:3.9", output_component_file="hw.yaml")
def hello_world(text: str) -> str:
    """Print ``text`` and return it unchanged."""
    print(text)
    return text

In [10]:
@component(packages_to_install=["google-cloud-storage"])
def two_outputs(text: str) -> NamedTuple("Outputs", [("output_one", str), ("output_two", str)]):
    """Derive two strings from ``text``, print them, and return them as a pair.

    The returned tuple is positional: first ``output_one``, then ``output_two``.
    """
    # NOTE(review): `storage` is never used below; presumably the import only
    # demonstrates that `packages_to_install` made the package available - confirm.
    from google.cloud import storage

    first = f"output one from text: {text}"
    second = f"output two from text: {text}"
    print(f"output_one: {first}, output_two: {second}")
    return (first, second)

In [11]:
@component
def consumer(text1: str, text2: str, text3: str):
    """Print the three input strings on a single line."""
    message = f"text1: {text1}, text2: {text2}, text3: {text3}"
    print(message)

# Define a pipeline that uses the components

In [12]:
@dsl.pipeline(
    name="hello-world-pipeline",
    description="A simple pipeline",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(text: str = "hi there"):
    """Wire the three components together.

    ``hello_world`` and ``two_outputs`` both receive ``text``; ``consumer``
    receives the single output of the former and both named outputs of the latter.

    Args:
        text: String passed to ``hello_world`` and ``two_outputs``.
    """
    # Components are called with keyword arguments: this keeps the wiring
    # explicit and is required by newer KFP SDK versions.
    hw_task = hello_world(text=text)
    two_outputs_task = two_outputs(text=text)
    # The consumer task is not referenced afterwards, so it is not bound to a name.
    consumer(
        text1=hw_task.output,
        text2=two_outputs_task.outputs["output_one"],
        text3=two_outputs_task.outputs["output_two"],
    )

# Compile pipeline

In [13]:
# Compile the pipeline function into simple_pipeline.json, the file that the
# PipelineJob below loads through `template_path`.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="simple_pipeline.json",
)



# Run pipeline

In [19]:
# Build the job object from the compiled template; it is submitted by job.run().
job = aip.PipelineJob(
    template_path="simple_pipeline.json",
    display_name=PIPELINE_DISPLAY_NAME,
    location=REGION,
    pipeline_root=PIPELINE_ROOT,
)

In [20]:
%%time
# Submit the job under SERVICE_ACCOUNT; %%time reports how long the cell took.
job.run(service_account=SERVICE_ACCOUNT)

Creating PipelineJob
PipelineJob created. Resource name: projects/308578548194/locations/europe-west1/pipelineJobs/hello-world-pipeline-20221112191733
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/308578548194/locations/europe-west1/pipelineJobs/hello-world-pipeline-20221112191733')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/europe-west1/pipelines/runs/hello-world-pipeline-20221112191733?project=308578548194
PipelineJob run completed. Resource name: projects/308578548194/locations/europe-west1/pipelineJobs/hello-world-pipeline-20221112191733
CPU times: user 49.8 ms, sys: 8.4 ms, total: 58.2 ms
Wall time: 5.59 s


# Delete the pipeline job

In [17]:
# Delete the PipelineJob resource that `job` refers to.
job.delete()

Deleting PipelineJob : projects/308578548194/locations/europe-west1/pipelineJobs/hello-world-pipeline-20221112191702
Delete PipelineJob  backing LRO: projects/308578548194/locations/europe-west1/operations/6287342838669639680
PipelineJob deleted. . Resource name: projects/308578548194/locations/europe-west1/pipelineJobs/hello-world-pipeline-20221112191702


# 🧹 Clean up

In [21]:
delete_pipeline = True
delete_bucket = True

try:
    if delete_pipeline and "DISPLAY_NAME" in globals():
        pipelines = aip.PipelineJob.list(
            filter=f"display_name={DISPLAY_NAME}", order_by="create_time"
        )
        pipeline = pipelines[0]
        aip.PipelineJob.delete(pipeline.resource_name)
        print("Deleted pipeline:", pipeline)
except Exception as e:
    print(e)

if delete_bucket or os.getenv("IS_TESTING"):
    ! gsutil rm -r $BUCKET_URI

Removing gs://my-simple-pipeline/pipeline_root/#1668276958482306...
Removing gs://my-simple-pipeline/pipeline_root/simple/#1668276958604632...      
Removing gs://my-simple-pipeline/pipeline_root/simple/308578548194/#1668276958719416...
Removing gs://my-simple-pipeline/pipeline_root/simple/308578548194/hello-world-pipeline-20221112180440/#1668276958833891...
/ [4 objects]                                                                   
==> NOTE: You are performing a sequence of gsutil operations that may
run significantly faster if you instead use gsutil -m rm ... Please
see the -m section under "gsutil help options" for further information
about when gsutil -m can be advantageous.

Removing gs://my-simple-pipeline/pipeline_root/simple/308578548194/hello-world-pipeline-20221112180440/consumer_7519496250685652992/#1668277051676999...
Removing gs://my-simple-pipeline/pipeline_root/simple/308578548194/hello-world-pipeline-20221112180440/consumer_7519496250685652992/executor_output.json#