In [1]:
import sys
from pathlib import Path

import kfp
from kfp import dsl
from kfp.components import create_component_from_func, load_component_from_file

# Make the project's `constants` and `utils` modules (one directory up) importable.
sys.path.insert(0, "..")
from constants import HOST, NAMESPACE
from utils import get_session_cookie

In [2]:
# KFP experiment that groups every run submitted from this notebook.
EXPERIMENT_NAME: str = "mle-3-operators"

## Define pipeline components

In [4]:
# The 1st component to calculate a+b
def add(a: float, b: float) -> float:
    return a + b


# Other than using create_component_from_func, you can also use func_to_container_op (with an image arg),
# load_component_from_url and load_component_from_file, etc., as mentioned below
# https://www.kubeflow.org/docs/components/pipelines/v1/sdk/python-function-components/#getting-started-with-python-function-based-components
add_op = create_component_from_func(
    func=add,
    base_image="python:3.7",  # Base image to execute our code
    output_component_file="../../components/add/component.yaml",  # Optional
    packages_to_install=["pandas==0.24"],  # Optional
)


# The 2nd component to calculate a-b
def minus(a: float, b: float) -> float:
    print(a - b)
    return a - b


minus_op = create_component_from_func(
    func=minus,
    base_image="python:3.7",
    output_component_file="../../components/minus/component.yaml",  # Optional
    packages_to_install=["pandas==0.24"],  # Optional
)

## Define some pipelines

In [8]:
# Pipeline 1: conditional execution.
# `add` always runs; `minus` runs only when the sum produced by `add` is greater than 3.
# NOTE: with the hard-coded inputs 1 and 2 the sum is exactly 3, so `sum > 3` is false
# and the minus step is expected to be skipped in this run.
@dsl.pipeline(
    name="Condition",
    description="Run minus op if the condition is satisfied.",
)
def add_and_minus():
    # A sample pipeline showing dsl.Condition on an upstream task's output.
    # Arguments could also come from pipeline parameters, see
    # https://www.kubeflow.org/docs/components/pipelines/v1/sdk/python-function-components/#getting-started-with-python-function-based-components
    sum_task = add_op(1, 2)  # positional args map to add(a, b)
    total = sum_task.outputs["Output"]  # placeholder for the add step's result

    # Everything inside this context only runs when the guard holds.
    with dsl.Condition(total > 3):
        minus_op(total, 4)

In [14]:
# Pipeline 2: fan-out with dsl.ParallelFor.
# Each of the 3 input pairs becomes its own branch and the branches run in parallel;
# within a branch the add task runs first, then the minus task.
@dsl.pipeline(
    name="Parallel For",
    description="For each of three input pairs, run an add task then a minus task; pairs run in parallel.",
)
def parallel_add_and_minus():
    # A sample pipeline showing running tasks in parallel.

    # Besides ParallelFor, KFP also offers dsl.ExitHandler, e.g. to send a notification
    # when the pipeline fails. See this example: https://stackoverflow.com/a/57586000
    with dsl.ParallelFor(
        [{"a": 2, "b": 3}, {"a": 20, "b": 30}, {"a": 200, "b": 300}]
    ) as item:
        add_task = add_op(item.a, item.b)
        minus_task = minus_op(item.a, item.b)
        # minus does not consume add's output, so there is no implicit data dependency;
        # .after() adds an explicit ordering edge inside each branch.
        minus_task.after(add_task)


# Alternatively, build a similar pipeline from the component YAML files exported
# earlier via `output_component_file`. The function is renamed here so that, if
# uncommented, it does not silently shadow `parallel_add_and_minus` above.
# loaded_add_op = load_component_from_file("../../components/add/component.yaml")
# loaded_minus_op = load_component_from_file("../../components/minus/component.yaml")

# @dsl.pipeline(
#     name='Parallel For (from YAML)',
#     description='For each input pair, run two add tasks then a minus task, using components loaded from YAML.'
# )
# def parallel_add_and_minus_from_yaml():
#     # A sample pipeline showing running tasks in parallel.

#     with dsl.ParallelFor([{'a':2, 'b': 3}, {'a': 20, 'b': 30}, {'a': 200, 'b': 300}]) as item:
#         loaded_add_task_1 = loaded_add_op(item.a, item.b)
#         loaded_add_task_2 = loaded_add_op(item.a, item.b)
#         loaded_minus_task = loaded_minus_op(item.a, item.b)
#         loaded_minus_task.after(loaded_add_task_1, loaded_add_task_2) # Make minus task wait for both add tasks

## Run the pipelines

In [6]:
# Authenticate to the `ml-pipeline` service: the session cookie obtained from
# utils.get_session_cookie() is handed to the client as `authservice_session`.
session_cookie = get_session_cookie()
auth_cookie = "authservice_session={}".format(session_cookie)

# The KFP API is served under the `/pipeline` path of HOST.
client = kfp.Client(
    host="{}/pipeline".format(HOST),
    cookies=auth_cookie,
    namespace=NAMESPACE,
)

In [9]:
# Submit a run of the conditional pipeline under EXPERIMENT_NAME.
# Left as the cell's last expression so the returned RunPipelineResult is displayed.
client.create_run_from_pipeline_func(
    pipeline_func=add_and_minus,
    arguments={},
    experiment_name=EXPERIMENT_NAME,
    namespace=NAMESPACE,
)

RunPipelineResult(run_id=06c3d6f0-eee4-436c-a895-0b99617c6fc3)

In [15]:
# Submit a run of the ParallelFor pipeline into the same experiment.
# Left as the cell's last expression so the returned RunPipelineResult is displayed.
client.create_run_from_pipeline_func(
    pipeline_func=parallel_add_and_minus,
    arguments={},
    experiment_name=EXPERIMENT_NAME,
    namespace=NAMESPACE,
)

RunPipelineResult(run_id=62d94aab-9df0-4a7a-8d93-eba6d21cdd1a)