# Control structures and more

This notebook shows how you can control the flow of a Kubeflow pipeline with conditions. It also chains several components together, so you can see how components are connected and what determines the order in which they run. Most, if not all, of the code was adapted from the Kubeflow samples available here:

https://github.com/kubeflow/pipelines/blob/master/samples/tutorials/Data%20passing%20in%20python%20components/Data%20passing%20in%20python%20components%20-%20Files.py

First, the usual imports, followed by creating a client that connects to Kubeflow Pipelines.

```python
import kfp
from kfp import dsl
from kfp.components import func_to_container_op, InputPath, OutputPath

host = 'http://localhost:8080'
client = kfp.Client(host=host)
```

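Next, the pipeline code. It flips a coin and then draws a random number between 10 and 50. The coin decides whether the "heads" or the "tails" half of the pipeline runs, and within that half the number decides which of two branches runs: one for values above 30 and one for values of 30 or less.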
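Before looking at the Kubeflow version, the decision logic can be sketched as ordinary Python running in a single process. This is only an illustration: `simulate_run` is a hypothetical helper, and its optional `coin` and `number` arguments exist so that each branch can be checked deterministically. In the real pipeline each step runs in its own container and each `if` becomes a `dsl.Condition`.

```python
import random
from typing import List, Optional


def simulate_run(coin: Optional[str] = None, number: Optional[int] = None) -> List[str]:
    """Hypothetical single-process stand-in for the pipeline's control flow.

    Pass coin/number to force a branch; otherwise they are drawn the same way
    the components draw them.
    """
    # Step 1: like flip_coin_op
    if coin is None:
        coin = random.choice(['heads', 'tails'])
    # Step 2: like get_random_int_op(10, 50); randint includes both bounds
    if number is None:
        number = random.randint(10, 50)
    # Step 3: the outer condition picks the half, the inner one picks the line
    side = 'Heads' if coin == 'heads' else 'Tails'
    if number > 30:
        line = f'{side} more than 30'
    else:
        line = f'{side} less than or equal to 30'
    # Step 4: repeat_line writes the line `number` times; print_text echoes it
    return [line] * number


for text in simulate_run():
    print(text)
```

For example, `simulate_run('heads', 42)` returns 42 copies of `'Heads more than 30'`, which is what the corresponding Kubeflow branch would print.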

Notice (and question) how Kubeflow knows in which order to run the pipeline components. The order is derived from how the components are connected: when one component's input is hooked up to another component's output, Kubeflow runs the producing component first. This example shows how that works.
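As a conceptual illustration only (not a description of Kubeflow's internals), the ordering rule can be modelled as a topological sort over "waits for" edges. The task names below are hypothetical labels for the "heads" half of this pipeline; besides direct output-to-input connections, a task inside a `dsl.Condition` also waits for the task whose output the condition tests. The sketch uses `graphlib`, which is in the standard library from Python 3.9.

```python
from graphlib import TopologicalSorter

# Each task maps to the tasks it waits for. Names are illustrative labels,
# not the names Kubeflow generates.
dependencies = {
    'flip-coin': set(),
    'random-int-heads': {'flip-coin'},          # gated by flip.output == 'heads'
    'repeat-line-gt-30': {'random-int-heads'},  # count=random_num_head.output
    'repeat-line-le-30': {'random-int-heads'},
    'print-text-gt-30': {'repeat-line-gt-30'},  # reads repeat_line's output file
    'print-text-le-30': {'repeat-line-le-30'},
}


def execution_stages(graph):
    """Group tasks into stages; tasks within one stage do not depend on each other."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        stages.append(ready)
        sorter.done(*ready)
    return stages


for stage in execution_stages(dependencies):
    print(stage)
```

Both inner branches appear in the same stage because neither depends on the other; in an actual run, only the branch whose condition holds executes and the other is skipped.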

```python
# A simple component that flips a coin, implemented as a Python function.
@func_to_container_op
def flip_coin_op() -> str:
    """Flip a coin and output heads or tails randomly."""
    import random
    result = random.choice(['heads', 'tails'])
    print(result)
    return result


# A component implemented as a Python function that accepts two parameters and outputs an integer.
@func_to_container_op
def get_random_int_op(minimum: int, maximum: int) -> int:
    """Generate a random number between minimum and maximum (inclusive)."""
    # Imports must be placed inside the function. The function body is copied
    # into the YAML, so everything it needs, including imports, has to be there.
    import random
    result = random.randint(minimum, maximum)
    print(result)
    return result


# A component that repeats the input line and writes it to a text file. It can be
# thought of as generating an "artifact" in an ML pipeline.
#
# OutputPath is a special Kubeflow type that is treated differently during execution:
# the function receives a file path to write to, and the written file becomes an
# output that can be passed to another component's InputPath, as print_text does below.
@func_to_container_op
def repeat_line(line: str, output_text_path: OutputPath(str), count: int = 10):
    """Repeat the line the specified number of times."""
    with open(output_text_path, 'w') as writer:
        for _ in range(count):
            writer.write(line + '\n')


# Accepts an InputPath, which is typically connected to the OutputPath of another component.
@func_to_container_op
def print_text(text_path: InputPath()):
    """Print the contents of a text file line by line."""
    with open(text_path, 'r') as reader:
        for line in reader:
            print(line, end='')


# The pipeline definition.
# flip_coin_op runs first. Depending on the result, one half of the pipeline draws a
# random number; a nested condition on that number picks the line that repeat_line
# writes (count times), and print_text then prints the file repeat_line produced.
@dsl.pipeline(
    name='Control structures and more',
    description='Generates numbers in a control flow.'
)
def control_structures_and_more():
    flip = flip_coin_op()

    with dsl.Condition(flip.output == 'heads'):
        random_num_head = get_random_int_op(10, 50)
        with dsl.Condition(random_num_head.output > 30):
            repeat_line_op = repeat_line(line="Heads more than 30", count=random_num_head.output)
            print_text_op = print_text(repeat_line_op.output)
        with dsl.Condition(random_num_head.output <= 30):
            repeat_line_op = repeat_line(line="Heads less than or equal to 30", count=random_num_head.output)
            print_text_op = print_text(repeat_line_op.output)

    with dsl.Condition(flip.output == 'tails'):
        random_num_tail = get_random_int_op(10, 50)
        with dsl.Condition(random_num_tail.output > 30):
            repeat_line_op = repeat_line(line="Tails more than 30", count=random_num_tail.output)
            print_text_op = print_text(repeat_line_op.output)
        with dsl.Condition(random_num_tail.output <= 30):
            repeat_line_op = repeat_line(line="Tails less than or equal to 30", count=random_num_tail.output)
            print_text_op = print_text(repeat_line_op.output)
```
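The `OutputPath`/`InputPath` handoff can be tried locally with plain, undecorated copies of the two file-handling functions. The idea it illustrates: the component does not choose where its output file lives; it is given a path, writes there, and the consumer is later given that file to read. In this sketch, `run_locally` is a hypothetical stand-in for the orchestrator and a temporary directory stands in for artifact storage; in Kubeflow the file is transferred between separate containers rather than shared in one directory.

```python
import os
import tempfile


def repeat_line(line: str, output_text_path: str, count: int = 10) -> None:
    """Undecorated copy of the repeat_line component."""
    with open(output_text_path, 'w') as writer:
        for _ in range(count):
            writer.write(line + '\n')


def print_text(text_path: str) -> str:
    """Like the print_text component, but also returns the text for inspection."""
    with open(text_path, 'r') as reader:
        text = reader.read()
    print(text, end='')
    return text


def run_locally(line: str, count: int) -> str:
    """Hypothetical orchestrator: allocate a path, run producer, then consumer."""
    with tempfile.TemporaryDirectory() as workdir:
        # The orchestrator, not the component, picks the output location.
        artifact_path = os.path.join(workdir, 'output_text')
        # The producer receives that location as its OutputPath argument.
        repeat_line(line, output_text_path=artifact_path, count=count)
        # The consumer receives the written file as its InputPath argument.
        return print_text(artifact_path)


run_locally('Heads more than 30', 3)
```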

Compile the pipeline to verify its syntax and to produce a YAML file you can inspect:

```python
kfp.compiler.Compiler().compile(control_structures_and_more, 'control_structures_and_more.yaml')
```

Then submit the pipeline to Kubeflow and start a run:

```python
client.create_run_from_pipeline_func(control_structures_and_more, arguments={})
```

RunPipelineResult(run_id=8d69cf9a-0c83-414c-9785-431ff0df70fb)

Follow the run link to inspect the pipeline run. Under "Input/Output" you will see the generated files, and clicking one shows its contents. This works because the file was passed using the `OutputPath` and `InputPath` types, so Kubeflow knows it should retrieve and show the contents of the file rather than just a path to it.