In [1]:
from typing import NamedTuple

from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import component

import google.cloud.aiplatform as aip

In [2]:
# If you are running this notebook in Colab, run this cell and follow the
# instructions to authenticate your GCP account. This provides access to your
# Cloud Storage bucket and lets you submit training jobs and prediction
# requests.

import os
import sys

# If on Google Cloud Notebook, then don't execute this code
if not os.path.exists("/opt/deeplearning/metadata/env_version"):
    if "google.colab" in sys.modules:
        from google.colab import auth as google_auth

        google_auth.authenticate_user()

    # If you are running this notebook locally, replace the string below with the
    # path to your service account key and run this cell to authenticate your GCP
    # account.
    elif not os.getenv("IS_TESTING"):
        %env GOOGLE_APPLICATION_CREDENTIALS ''

In [3]:
PROJECT_ID = "[your-project-id]" 
if PROJECT_ID == "" or PROJECT_ID is None or PROJECT_ID == "[your-project-id]":
    # Get your GCP project id from gcloud
    shell_output = ! gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID:", PROJECT_ID)
    
! gcloud config set project $PROJECT_ID

Project ID: even-impulse-332807
Updated property [core/project].


In [4]:
# Cloud Storage bucket URI; passed to aip.init() as the staging bucket.
BUCKET = "gs://dkpipe"

In [5]:
# Root location handed to the PipelineJob. Derived from BUCKET so that changing
# the bucket in one place keeps both settings in sync.
PIPELINE_ROOT = f"{BUCKET}/pipeline_root"

In [6]:
# Configure the Vertex AI SDK with the project and staging bucket chosen above.
aip.init(
    project=PROJECT_ID,
    staging_bucket=BUCKET,
)

In [7]:
@component(
    base_image="python:3.9",
    output_component_file="hw.yaml",
)
def hello_world(text: str) -> str:
    """Print ``text`` and return it unchanged.

    Args:
        text: The string to echo.

    Returns:
        The same string that was passed in.
    """
    print(text)
    return text

In [8]:
@component(packages_to_install=["google-cloud-storage"])
def two_outputs(
    text: str,
) -> NamedTuple("Outputs", [("output_one", str), ("output_two", str)]):
    """Build two strings from ``text``, print them, and return them as a pair.

    Args:
        text: Suffix appended to the literal ``"output"``.

    Returns:
        A 2-tuple matching the declared ``output_one`` / ``output_two`` fields.
    """
    # NOTE(review): `storage` is imported but not used in this body; presumably
    # it only demonstrates `packages_to_install` - confirm before removing.
    from google.cloud import storage

    # NOTE(review): both outputs are built from the same expression - confirm
    # that identical values are intended.
    first = f"output{text}"
    second = f"output{text}"
    print(f"{first}, {second}")
    return (first, second)

In [9]:
@component
def consumer(text1: str, text2: str, text3: str):
    """Print the three input strings on a single labelled line.

    Args:
        text1: First string to print.
        text2: Second string to print.
        text3: Third string to print.
    """
    message = f"text1 {text1}, text2 {text2}, text3 {text3}"
    print(message)

In [10]:
@dsl.pipeline(
    name="hello-world-pipe",
    description="simple pipeline",
)
def pipeline(text: str = "hi everyone"):
    """Wire the three components into a single pipeline.

    ``hello_world`` and ``two_outputs`` both receive ``text``; ``consumer``
    then receives the single output of ``hello_world`` and both named outputs
    of ``two_outputs``.

    Args:
        text: String forwarded to the first two components.
    """
    # Components are called with keyword arguments: this works with the
    # kfp.v2 API used here, and newer KFP SDK releases reject positional
    # arguments when instantiating components.
    hw_task = hello_world(text=text)
    two_outputs_task = two_outputs(text=text)
    consumer(
        text1=hw_task.output,
        text2=two_outputs_task.outputs["output_one"],
        text3=two_outputs_task.outputs["output_two"],
    )

In [11]:
from kfp.v2 import compiler

# Compile the pipeline function and write the result to intro_pipeline.json.
# The file name contains no spaces, so it is passed as-is.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="intro_pipeline.json",
)



In [None]:
# Create a Vertex AI pipeline job from the compiled definition and run it.
job = aip.PipelineJob(
    display_name="job1",
    # Must be the same file that the compile step wrote.
    template_path="intro_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    # Caching is disabled for this job.
    enable_caching=False,
)
job.run()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/72552027929/locations/us-central1/pipelineJobs/hello-world-pipe-20211202085417
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/72552027929/locations/us-central1/pipelineJobs/hello-world-pipe-20211202085417')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/hello-world-pipe-20211202085417?project=72552027929
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/72552027929/locations/us-central1/pipelineJobs/hello-world-pipe-20211202085417 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/72552027929/locations/us-central1/pipelineJobs/hello-