In [None]:
!pip install google-cloud-pipeline-components

In [None]:
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

In [None]:
PROJECT_ID="dt-tu-sandbox-dev"
BUCKET_NAME="gs://ovo-demos"

REGION="europe-west2"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

In [None]:
import kfp

from kfp import compiler, dsl
from kfp.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from typing import NamedTuple

In [None]:
@component()
def product_name(text: str) -> str:
    return text

In [None]:
@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    import emoji

    emoji_text = text
    emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
    print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
    return (emoji_text, emoji_str)

In [None]:
@component(base_image="python:3.9")
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    print("We completed the pipeline, hooray!")
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji
    else:
        end_str += emojitext
    return(end_str)

In [None]:
@pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)

# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    product_task = product_name(text=text)
    emoji_task = emoji(text=emoji_str)
    consumer_task = build_sentence(
        product=product_task.output,
        emoji=emoji_task.outputs["emoji"],
        emojitext=emoji_task.outputs["emoji_text"],
    )

In [None]:
compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)

In [None]:
job = aiplatform.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    enable_caching=True
)

In [None]:
job.submit(experiment="pipeline-experiment-intro")