In [13]:
# Example: passing small values directly between pipeline components
import kfp
from kfp.components import create_component_from_func as ccff
from typing import NamedTuple # import outside function and use inline

def sum2dList(L:list) -> float:
    '''Return the total of every element in a 2-D list (a list of rows).

    Each row is summed on its own, then the row totals are added together.
    An empty outer list yields 0.
    '''
    row_totals = map(sum, L)
    return sum(row_totals)

def sum2dDict(D:dict) -> float:
    '''Return the total of all numbers held in the dict's value lists.

    Keys are ignored; each value list is summed, then those sums are added.
    An empty dict yields 0.
    '''
    return sum(sum(values) for values in D.values())

def mydivmod(dividend:float, divisor:float) -> NamedTuple('MyDivmodOutput',
    [('quotient', float), ('remainder', float)]):
    '''Return (quotient, remainder) of dividend / divisor as a named pair.

    The division is delegated to numpy.divmod; the two fields are exactly the
    values numpy returns, packed into a MyDivmodOutput namedtuple whose field
    names match the component's declared outputs.
    '''
    # The component runs this function's source on its own, so every import
    # it relies on has to live inside the body.
    import numpy as np
    from collections import namedtuple

    MyDivmodOutput = namedtuple('MyDivmodOutput', ['quotient', 'remainder'])
    whole, rest = np.divmod(dividend, divisor)
    return MyDivmodOutput(whole, rest)

def add1(x:float) -> float: # cannot use lambda function
    '''Return x plus one.

    Written as a named def (not a lambda) so it can be turned into a component.
    '''
    incremented = x + 1
    return incremented

# Build one component per function; each also writes its spec to a YAML file.
list_comp = ccff(sum2dList, output_component_file='list_component.yaml')
dict_comp = ccff(sum2dDict, output_component_file='dict_component.yaml')
# mydivmod imports numpy inside its body, so numpy is installed into its container.
divmod_comp = ccff(
    mydivmod,
    output_component_file='divmod_component.yaml',
    base_image='python:3.6',
    packages_to_install=["numpy==1.19.5"],
)
add1_comp = ccff(add1, output_component_file='add1_component.yaml')

@kfp.dsl.pipeline(name='Addition pipeline', description='An example pipeline.')
def add_pipeline(L:list, D:dict):
    '''Sum a 2-D list and a dict of lists, divmod the two totals, then add 1 to each part.'''
    # Passing one step's output into the next also orders the steps.
    list_total = list_comp(L)
    dict_total = dict_comp(D)
    divmod_step = divmod_comp(list_total.output, dict_total.output)
    # A multi-output component exposes each named result via .outputs[...].
    for part in ("quotient", "remainder"):
        add1_comp(divmod_step.outputs[part])
    
client = kfp.Client(host="http://127.0.0.1:8080")
# Left as the cell's final expression so the RunPipelineResult is displayed.
client.create_run_from_pipeline_func(
    add_pipeline,
    arguments={
        'L': [[1.0, 2.0], [4.0, 5.0]],
        'D': {"A": [1, 2], "B": [4, 5]},
    },
)

RunPipelineResult(run_id=da214015-888e-4659-ab84-2815b33ae4d5)

In [14]:
# Example: passing a big value (a CSV file) between components through a shared volume
import kfp
from kfp.components import create_component_from_func as ccff

def write_csv()->float:
    '''Write a small two-column DataFrame (a, b) to example.csv and return 0.

    The CSV goes to a hard-coded absolute path, which big_pipeline mounts a
    volume over. The returned 0 is only consumed by print_csv as an ordering
    input.
    '''
    import pandas as pd
    csv_path = "/mnt/c/users/James/Desktop/code/work/kf/build_from_sdk_zfinal/example.csv"
    frame = pd.DataFrame(data={"a": [1, 2, 3], "b": [4, 5, 6]})
    frame.to_csv(csv_path)
    return 0

def print_csv(x:float)->None:
    '''Read example.csv from the hard-coded path and print the resulting DataFrame.

    x is not used by the body; it exists so the pipeline can wire this step
    after write_csv.
    '''
    import pandas as pd
    csv_path = "/mnt/c/users/James/Desktop/code/work/kf/build_from_sdk_zfinal/example.csv"
    contents = pd.read_csv(csv_path)
    print(contents)
    
# Both functions import pandas inside their bodies, so pandas is installed into
# each container. NOTE(review): pandas is unpinned here, unlike numpy above.
write_csv_comp = ccff(
    write_csv,
    output_component_file='write_csv_component.yaml',
    packages_to_install=["pandas"],
)
print_csv_comp = ccff(
    print_csv,
    output_component_file='print_csv_component.yaml',
    packages_to_install=["pandas"],
)

@kfp.dsl.pipeline(name='csv_pipeline', description='passing big value.')
def big_pipeline():
    '''Write a CSV in one step and print it in a later step via a shared PVC.

    A 1Gi ReadWriteOnce volume is created and mounted at the directory both
    components hard-code, so the file written by write_csv is visible to
    print_csv.
    '''
    # Use the fully-qualified kfp.dsl namespace: this cell only runs
    # `import kfp`, so a bare `dsl` raises NameError on a fresh kernel.
    vop = kfp.dsl.VolumeOp(name="create-pvc", resource_name="my-pvc",
        modes=kfp.dsl.VOLUME_MODE_RWO, size="1Gi")
    # Must match the directory hard-coded inside write_csv / print_csv.
    mount_dir = "/mnt/c/users/James/Desktop/code/work/kf/build_from_sdk_zfinal/"
    wc_task = write_csv_comp().add_pvolumes({mount_dir: vop.volume})
    # Consuming wc_task.output makes print_csv run after write_csv has finished.
    print_csv_comp(wc_task.output).add_pvolumes({mount_dir: vop.volume})
    
client = kfp.Client(host="http://127.0.0.1:8080")
# big_pipeline declares no parameters, so the run gets an empty argument mapping.
client.create_run_from_pipeline_func(big_pipeline, arguments=dict())

RunPipelineResult(run_id=17c0a865-12cb-43f7-945c-a1ddc0c29194)