In [1]:
!python3 -m pip install kfp --upgrade

Defaulting to user installation because normal site-packages is not writeable
Requirement already up-to-date: kfp in ./.local/lib/python3.6/site-packages (0.2.5)


In [5]:
import kfp
import kfp.gcp
import kfp.dsl as dsl
import kfp.compiler
import kfp.components
from typing import NamedTuple

from kfp.components import InputPath, InputTextFile, OutputPath, OutputTextFile
from kfp.components import func_to_container_op

In [6]:
# Create the Kubeflow Pipelines client that the "Run Pipeline" cells below use to submit runs.
client = kfp.Client()

## Small data

### Component 1 - Consuming data and printing it

In [8]:
@func_to_container_op
def print_small_text(text: str):
    '''Print a small piece of text that is passed in directly as a string.

    Args:
        text: The string to print.
    '''
    print(text)


### Component 2 - Producing data

In [9]:
@func_to_container_op
def produce_one_small_output() -> str:
    '''Return a fixed short string, acting as the producer of small data.

    Returns:
        The literal greeting string.
    '''
    return 'Hello world'

### Define Pipeline

In [10]:
@dsl.pipeline(
    name='Small Data Passing',
    description='A demo for small data passing.'
)
def task_output_to_consumer_pipeline():
    '''Pass the producer's small output to two printing consumers.'''
    producer = produce_one_small_output()

    # The producer's output is handed to each consumer as its argument.
    # `.output` is a shortcut that only works for single-output components.
    printed_via_output = print_small_text(producer.output)
    # `.outputs[...]` looks the output up by name and always works.
    printed_via_outputs = print_small_text(producer.outputs['output'])

### Run Pipeline

In [12]:
# Pick the pipeline to run and the experiment the run is filed under.
experiment_name = 'python-functions'
pipeline_func = task_output_to_consumer_pipeline
run_name = pipeline_func.__name__ + ' run'

# Pipeline argument values; none are supplied for this run.
arguments = {}

# Submit the run directly from the pipeline function.
run_result = client.create_run_from_pipeline_func(
    pipeline_func,
    arguments=arguments,
    run_name=run_name,
    experiment_name=experiment_name,
)

## Bigger data (files)

- InputPath: tells the system that the function wants to consume the corresponding input data as a file.
- OutputPath: tells the system that the function wants to produce the corresponding output data as a file.

**The paths for the input and output files are chosen by the system and are passed into the function (as strings).**

You can pass a **type argument** to InputPath and OutputPath; this helps users match the types of connected inputs and outputs.

- OutputPath('TFModel') means the function declares that the data it writes to the file has type 'TFModel'.
- InputPath('TFModel') means the function declares that it expects the data it reads from the file to have type 'TFModel'.


### Example: Pipeline that generates then sums many numbers

The input and output names generally follow the parameter names,
but the **"_path"** and **"_file"** suffixes are stripped from file/path inputs and outputs.
E.g. the `number_file_path: InputPath(int)` parameter becomes the `number: int` input. This makes argument passing look more natural: `number=42` instead of `number_file_path=42`.

In [17]:
# Reading bigger data
@func_to_container_op
def print_text(text_path: InputPath()):  # "text" is left untyped so that any data can be printed
    '''Print the contents of the input file, line by line.

    Args:
        text_path: Path of the file whose contents are printed.
    '''
    with open(text_path, 'r') as source:
        for current_line in source:
            # Each line already carries its own newline, so none is appended.
            print(current_line, end='')

# Writing many numbers
@func_to_container_op
def write_numbers(numbers_path: OutputPath(str), start: int = 0, count: int = 10):
    '''Write `count` consecutive integers to a file, one per line.

    Args:
        numbers_path: Path of the output file to write.
        start: First integer to write.
        count: How many integers to write.
    '''
    with open(numbers_path, 'w') as writer:
        # The upper bound is start + count: using `count` itself as the bound
        # would write too few numbers (or none) whenever start is not 0.
        for i in range(start, start + count):
            writer.write(str(i) + '\n')


# Reading and summing many numbers
@func_to_container_op
def sum_numbers(numbers_path: InputPath(str)) -> int:
    '''Add up the integers stored one per line in a text file.

    Args:
        numbers_path: Path of the file to read.

    Returns:
        The sum of all integers found in the file (0 if there are none).

    Raises:
        ValueError: If a non-blank line cannot be parsed as an integer.
    '''
    total = 0  # named `total` so the built-in sum() is not shadowed
    with open(numbers_path, 'r') as reader:
        for line in reader:
            # Skip whitespace-only lines (e.g. a trailing empty line) rather
            # than letting int() raise on them.
            if line.strip():
                total += int(line)
    return total


@dsl.pipeline(
    name='Sum numbers',
    description='A pipeline to sum 100000 numbers.'
)
def sum_pipeline(count: 'Integer' = 100000):
    '''Write numbers to a file, print them, then sum them and print the total.'''
    writer_task = write_numbers(count=count)
    print_text(writer_task.output)

    # The output is called "numbers": the "_path" suffix of the `numbers_path`
    # parameter is removed.
    total_task = sum_numbers(writer_task.outputs['numbers'])
    print_text(total_task.output)


### Run Pipeline

In [18]:
# Pick the pipeline to run and the experiment the run is filed under.
experiment_name = 'python-functions'
pipeline_func = sum_pipeline
run_name = pipeline_func.__name__ + ' run'

# Pipeline argument values; none are supplied here (sum_pipeline declares a
# default for `count`).
arguments = {}

# Submit the run directly from the pipeline function.
run_result = client.create_run_from_pipeline_func(
    pipeline_func,
    arguments=arguments,
    run_name=run_name,
    experiment_name=experiment_name,
)