In [None]:
!python3 -m pip install kfp --upgrade --user

In [159]:
# Name of the experiment that the pipeline run is filed under.
EXPERIMENT_NAME = "Simple notebook pipeline"
# Container image passed as base_image to every component built from the
# Python functions below; those functions import numpy inside the container.
BASE_IMAGE = "tensorflow/tensorflow:2.0.0b0-py3"

In [160]:
import json
import numpy as np
import kfp
import kfp.dsl as dsl
from kfp import compiler
from kfp import components

In [205]:
@dsl.python_component(
    name='mult_op',
    description='multiplies two matrix',
    base_image=BASE_IMAGE
)
def matrix_mult(matrices: str, x: int, y: int) -> str:
    """Multiply two matrices selected by index from a JSON-encoded list.

    Args:
        matrices: JSON string holding a list of matrices (nested lists).
        x: Index of the left operand within that list.
        y: Index of the right operand within that list.

    Returns:
        The matrix product, JSON-encoded as a nested list.
    """
    # Imports stay inside the body so the function remains self-contained
    # when it is converted into a container op.
    import json
    import numpy as np

    decoded = json.loads(matrices)
    print(decoded)
    X = np.array(decoded[x])
    Y = np.array(decoded[y])
    Z = np.matmul(X, Y)
    # Log the operands and the product for inspection.
    print(f"{X}\n@\n{Y}\n=\n{Z}")
    return json.dumps(Z.tolist())

@dsl.python_component(
    name='disassemble_op',
    description='disassembles two matrix',
    base_image=BASE_IMAGE
)
def disassemble(X_: str, Y_: str) -> str:
    """Split X into two row blocks and Y into two column blocks.

    Args:
        X_: JSON-encoded 2-D matrix (list of rows) with at least two rows.
        Y_: JSON-encoded 2-D matrix (list of rows).

    Returns:
        JSON-encoded list ``[A, B, C, D]``: ``A``/``B`` are the upper and
        lower row blocks of X, ``C``/``D`` the left and right column blocks
        of Y. For an odd size, the second block of a pair gets the extra
        row/column.

    Raises:
        ValueError: If either input is not 2-D, or X has fewer than two rows.
    """
    # Imports stay inside the body so the function remains self-contained
    # when it is converted into a container op.
    import json
    import numpy as np

    X, Y = np.array(json.loads(X_)), np.array(json.loads(Y_))
    if X.ndim != 2 or Y.ndim != 2:
        raise ValueError(
            "disassemble expects two 2-D matrices, got shapes "
            f"{X.shape} and {Y.shape}"
        )

    half_rows = X.shape[0] // 2
    if half_rows == 0:
        # An empty row block serialises to `[]`, which drops its column
        # count, so it could not be multiplied after the JSON round trip.
        raise ValueError(
            f"X must have at least two rows to be split, got shape {X.shape}"
        )
    half_cols = Y.shape[1] // 2

    A, B = X[:half_rows], X[half_rows:]
    C, D = Y[:, :half_cols], Y[:, half_cols:]
    return json.dumps([A.tolist(), B.tolist(), C.tolist(), D.tolist()])

@dsl.python_component(
    name='assemble_op',
    description='assembles results of multiplication',
    base_image=BASE_IMAGE
)
def assemble(A_:str, B_:str, C_:str, D_:str) -> str:
    """Join four JSON-encoded blocks into one matrix laid out as [[A, B], [C, D]].

    Args:
        A_: JSON-encoded upper-left block.
        B_: JSON-encoded upper-right block.
        C_: JSON-encoded lower-left block.
        D_: JSON-encoded lower-right block.

    Returns:
        The combined matrix, JSON-encoded as a nested list.
    """
    # Imports stay inside the body so the function remains self-contained
    # when it is converted into a container op.
    import json
    import numpy as np

    upper_left, upper_right, lower_left, lower_right = (
        np.array(json.loads(encoded)) for encoded in (A_, B_, C_, D_)
    )
    combined = np.block([[upper_left, upper_right], [lower_left, lower_right]])
    return json.dumps(combined.tolist())

  after removing the cwd from sys.path.


In [206]:
# Build one op factory per component function, all on the same base image.
# The generator is consumed left to right, so the factories are created in
# the order matrix_mult, disassemble, assemble.
mult_op, disassemble_op, assemble_op = (
    components.func_to_container_op(component_func, base_image=BASE_IMAGE)
    for component_func in (matrix_mult, disassemble, assemble)
)

In [207]:
@dsl.pipeline(
    name='My pipeline',
    description='A pipeline for matrix multiplication.'
)
def calc_pipeline(X, Y):
    """Multiply X by Y block-wise: split, multiply four block pairs, reassemble.

    Args:
        X: Left matrix, passed on to disassemble_op (submitted as a JSON string).
        Y: Right matrix, passed on to disassemble_op (submitted as a JSON string).
    """
    split_task = disassemble_op(X, Y)
    blocks = split_task.output

    # Index pairs into the [A, B, C, D] list produced by the split step:
    # each row block of X (0, 1) is multiplied by each column block of Y (2, 3).
    block_pairs = ((0, 2), (0, 3), (1, 2), (1, 3))
    product_tasks = [mult_op(blocks, left, right) for left, right in block_pairs]

    # The four products are passed in the order A@C, A@D, B@C, B@D.
    assemble_op(*[task.output for task in product_tasks])

In [208]:
# Compile the pipeline function into a package file named after the function.
pipeline_func = calc_pipeline
pipeline_filename = f'{pipeline_func.__name__}.pipeline.zip'
compiler.Compiler().compile(
    pipeline_func=pipeline_func,
    package_path=pipeline_filename,
)

In [209]:
# Client is built with its default connection settings; the experiment is
# what the pipeline run below is attached to.
client = kfp.Client()
experiment = client.create_experiment(name=EXPERIMENT_NAME)

In [210]:
# Pipeline parameters are submitted as JSON strings; Y is the 2x2 identity,
# so the assembled product should equal X.
arguments = {
    'X': json.dumps([[1,2], [4,5]]),
    'Y': json.dumps([[1,0], [0,1]]),
}

run_name = f'{pipeline_func.__name__} run'
run_result = client.run_pipeline(
    experiment_id=experiment.id,
    job_name=run_name,
    pipeline_package_path=pipeline_filename,
    params=arguments,
)