### Helper function to compile and run the pipeline job

In [None]:
def run_pipeline_job(p_name, p_func):
    from datetime import datetime
    from kfp.v2 import compiler
    from kfp.v2.google.client import AIPlatformClient

    TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

    PROJECT_ID = 'cloud-sandbox-danielw'
    BUCKET = 'gs://ai-demo-uscentral'
    REGION = "us-central1"

    PIPELINE_DEF = p_name + ".json"
    PIPELINE_ROOT = "{bucket}/pipeline_root/{pipeline}/{timestamp}".format(
        bucket = BUCKET, 
        pipeline = p_name, 
        timestamp = TIMESTAMP
    )

    # compile the pipeline
    compiler.Compiler().compile(
        pipeline_func = p_func, 
        package_path = PIPELINE_DEF
    )

    # connect to Vertex AI platform
    api_client = AIPlatformClient(
        project_id = PROJECT_ID,
        region = REGION,
    )

    # submit the pipeline
    response = api_client.create_run_from_job_spec(
        job_spec_path = PIPELINE_DEF, 
        pipeline_root = PIPELINE_ROOT
    )
    
    return response

### Conditional execution
You can use the `with dsl.Condition(task1.outputs["output_name"] = "value"):` context to execute parts of the pipeline conditionally

In [None]:
from typing import NamedTuple

import kfp
from kfp import dsl
from kfp.components import func_to_container_op, InputPath, OutputPath

In [None]:
@func_to_container_op
def get_random_int_op(minimum: int, maximum: int) -> int:
    """Generate a random number between minimum and maximum (inclusive)."""
    import random
    result = random.randint(minimum, maximum)
    print(result)
    return result

@func_to_container_op
def flip_coin_op() -> str:
    """Flip a coin and output heads or tails randomly."""
    import random
    result = random.choice(['heads', 'tails'])
    print(result)
    return result

@func_to_container_op
def print_op(message: str):
    """Print a message."""
    print(message)

In [None]:
@dsl.pipeline(
    name='conditional-pipeline',
    description='Shows how to use dsl.Condition().'
)
def conditional_pipeline():
    flip = flip_coin_op()
    with dsl.Condition(flip.output == 'heads', name = "heads"):
        random_num_head = get_random_int_op(0, 9)
        with dsl.Condition(random_num_head.output > 5):
            print_op('heads and %s > 5!' % random_num_head.output)
        with dsl.Condition(random_num_head.output <= 5):
            print_op('heads and %s <= 5!' % random_num_head.output)

    with dsl.Condition(flip.output == 'tails', name = "tails"):
        random_num_tail = get_random_int_op(10, 19)
        with dsl.Condition(random_num_tail.output > 15):
            print_op('tails and %s > 15!' % random_num_tail.output)
        with dsl.Condition(random_num_tail.output <= 15):
            print_op('tails and %s <= 15!' % random_num_tail.output)

In [None]:
result = run_pipeline_job("conditional_pipeline", conditional_pipeline)

### Exit handlers
You can use `with dsl.ExitHandler(exit_task):` context to execute a task when the rest of the pipeline finishes (succeeds or fails)

In [None]:
@func_to_container_op
def fail_op(message: str):
    """Fails."""
    import sys
    print(message)    
    sys.exit(1)


@dsl.pipeline(
    name='exit-handler-pipeline',
    description='Shows how to use dsl.Condition() and dsl.ExitHandler().'
)
def exit_handler_pipeline():
    exit_task = print_op('Exit handler has worked!')
    
    with dsl.ExitHandler(exit_task):
        flip = flip_coin_op()
        
        with dsl.Condition(flip.output == 'heads'):
            fail_op("Failing the run to demonstrate that exit handler still gets executed.")

        with dsl.Condition(flip.output == 'tails'):
            fail_op("Failing the run to demonstrate that exit handler still gets executed.")

In [None]:
result = run_pipeline_job("exit_handler_pipeline", exit_handler_pipeline)

### Loop
You can use `with dsl.ParallelFor():` context to loop through a list or array and execute tasks in parallel

In [None]:
from kfp import components

@components.create_component_from_func
def args_generator_op() -> str:
    return '[1.1, 1.2, 1.3]'


@components.create_component_from_func
def print_op(s: float):
    print(s)


@dsl.pipeline(name='pipeline-with-loop-output')
def loop_pipeline():
    args_generator = args_generator_op()
    with dsl.ParallelFor(args_generator.output) as item:
        print_op(item)

In [None]:
result = run_pipeline_job("loop_pipeline", loop_pipeline)

### Loop with after
You can use `with dsl.ParallelFor():` and `task.after()` context to loop through a list or array and execute tasks in parallel

In [None]:
from kfp import components
from kfp import dsl
from typing import List

@components.create_component_from_func
def print_op(text: str) -> str:
    print(text)
    return text


@components.create_component_from_func
def concat_op(a: str, b: str) -> str:
    print(a + b)
    return a + b

@components.create_component_from_func
def generate_op() -> str:
    import json
    return json.dumps([{'a': i, 'b': i * 10} for i in range(1, 5)])

@dsl.pipeline(name='pipeline-with-loop-parameter')
def loop_after_pipeline(greeting:str='this is a test for looping through parameters'):
    print_task = print_op(text=greeting)

    generate_task = generate_op()
    with dsl.ParallelFor(generate_task.output) as item:
        concat_task = concat_op(a=item.a, b=item.b)
        concat_task.after(print_task)
        print_task_2 = print_op(concat_task.output)

In [None]:
result = run_pipeline_job("loop_after_pipeline", loop_after_pipeline)

### Loop with subgraph
You can use `with dsl.ParallelFor():` and `with dsl.SubGraph()` context to loop through a list or array and execute tasks in parallel

In [None]:
@kfp.components.create_component_from_func
def print_op(s: str):
    import time
    time.sleep(3)
    print(s)

@dsl.pipeline(name='pipeline-subgraph')
def subgraph_pipeline():
    loop_args = [{'A_a': 1, 'B_b': 2}, {'A_a': 10, 'B_b': 20}]
    with dsl.SubGraph(parallelism=2):
        with dsl.ParallelFor(loop_args) as item:
            print_op(item)
            print_op(item.A_a)
            print_op(item.B_b)

In [None]:
result = run_pipeline_job("subgraph_pipeline", subgraph_pipeline)

### A three-step pipeline with first two running in parallel.

In [None]:
from kfp import components
from kfp import dsl
from typing import List

@components.create_component_from_func
def print_op(text: str) -> str:
    print(text)
    return text

@components.create_component_from_func
def concat_op(a: str, b: str) -> str:
    print(a + b)
    return a + b

@dsl.pipeline(
  name='join-pipeline',
  description='run in parallel and prints the concatenated result.'
)
def join_pipeline():
    """A three-step pipeline with first two running in parallel."""

    print_task_1 = print_op("task 1")
    print_task_2 = print_op("task 2")

    concat_task = concat_op(print_task_1.output, print_task_2.output)
    
    print_task_3 = print_op(concat_task.output)
    print_task_3.after(concat_task)

In [None]:
result = run_pipeline_job("join_pipeline", join_pipeline)

### Pipeline Parallelism

dsl.get_pipeline_conf().set_parallelism(1)

In [None]:
import kfp
from kfp import dsl
import kfp.components as comp

@comp.create_component_from_func
def print_op(msg: str):
    """Print a message."""
    print(msg)

@dsl.pipeline(
    name='pipeline-parallelism',
    description='The pipeline shows how to set the max number of parallel pods in a pipeline.'
)
def pipeline_parallelism():
    op1 = print_op('hey, what are you up to?')
    op2 = print_op('train my model.').
    dsl.get_pipeline_conf().set_parallelism(1)

In [None]:
result = run_pipeline_job("pipeline_parallelism", pipeline_parallelism)

### add_pod_annotation and add_op_transformer

In [None]:
import kfp
from kfp import dsl
import kfp.components as comp

@comp.create_component_from_func
def print_op(msg: str):
    """Print a message."""
    print(msg)

def add_annotation(op):
  op.add_pod_annotation(name='hobby', value='football')
  return op

@dsl.pipeline(
    name='pipeline-transformer',
    description='The pipeline shows how to apply functions to all ops in the pipeline by pipeline transformers'
)
def transform_pipeline():
  op1 = print_op('hey, what are you up to?').set_display_name("Display Name")
  op2 = print_op('train my model.').add_pod_annotation(name='tag', value='print task 2')
  # dsl.get_pipeline_conf().add_op_transformer(add_annotation)

In [None]:
result = run_pipeline_job("transform_pipeline", transform_pipeline)

### Retry

In [None]:
import kfp
from kfp import dsl
import kfp.components as comp

@comp.create_component_from_func
def random_failure_op(exit_codes: str):
    """A component that fails randomly."""
    import random
    import sys
    
    exit_code = int(random.choice(exit_codes.split(","))) 
    print(exit_code)
    sys.exit(exit_code)

@dsl.pipeline(
    name='retry-random-failures',
    description='The pipeline includes two steps which fail randomly. It shows how to use ContainerOp(...).set_retry(...).'
)
def retry_pipeline():
    op1 = random_failure_op('0,1,2,3').set_retry(10)
    op2 = random_failure_op('0,1').set_retry(5)

In [None]:
result = run_pipeline_job("retry_pipeline", retry_pipeline)