In [1]:
with open("requirements.txt", "w") as f:
    f.write("kfp==1.8.9\n")
    
!pip install -r requirements.txt  --upgrade --user



In [2]:
def add(a: float, b: float) -> float:
    """Return the sum of the two arguments.

    Args:
        a: First addend.
        b: Second addend.

    Returns:
        The value of ``a + b``.
    """
    total = a + b
    return total

from typing import NamedTuple
def my_divmod(dividend: float,
              divisor: float,
             ) -> NamedTuple('MyDivmodOutput', [('quotient', float), ('remainder', float),
                                                ('mlpipeline_ui_metadata', 'UI_metadata'),
                                                ('mlpipeline_metrics', 'Metrics')]):
    '''Divides two numbers and calculates the quotient and remainder.

    Besides the two numeric results, the function also returns two JSON
    strings: a sample UI-metadata document pointing at a tensorboard log
    directory and a metrics document reporting the quotient and remainder.

    Args:
        dividend: The number to be divided.
        divisor: The number to divide by.

    Returns:
        A ``MyDivmodOutput`` named tuple with the fields ``quotient``,
        ``remainder``, ``mlpipeline_ui_metadata`` (JSON string) and
        ``mlpipeline_metrics`` (JSON string).
    '''
    # The imports are deliberately kept inside the function body: the function
    # is converted into a pipeline component with func_to_container_op, so it
    # has to be self-contained and cannot rely on notebook-level imports.
    import json
    from collections import namedtuple

    import numpy as np

    raw_quotient, raw_remainder = np.divmod(dividend, divisor)
    # np.divmod yields numpy scalars; convert them to built-in floats so the
    # returned fields match the declared `float` output types.
    quotient = float(raw_quotient)
    remainder = float(raw_remainder)

    # Exports a sample tensorboard:
    metadata = {
      'outputs' : [{
        'type': 'tensorboard',
        'source': 'gs://ml-pipeline-dataset/tensorboard-train',
      }]
    }

    # Exports two sample metrics:
    metrics = {
      'metrics': [{
          'name': 'quotient',
          'numberValue': quotient,
        },{
          'name': 'remainder',
          'numberValue': remainder,
        }]}

    divmod_output = namedtuple('MyDivmodOutput',
                               ['quotient', 'remainder', 'mlpipeline_ui_metadata', 'mlpipeline_metrics'])
    return divmod_output(quotient, remainder, json.dumps(metadata), json.dumps(metrics))

In [3]:
import kfp
import kfp.components as comp
import kfp.dsl as dsl


# Turn the plain Python functions defined above into pipeline component factories.
add_op = comp.func_to_container_op(func=add)

# my_divmod imports numpy inside its body, so it is given an explicit base image
# (presumably chosen because it ships numpy — confirm before changing the image).
divmod_op = comp.func_to_container_op(
    my_divmod,
    base_image="tensorflow/tensorflow:1.15.0-py3",
)

@dsl.pipeline(
    name='Calculation pipeline',
    description='A toy pipeline that performs arithmetic calculations.'
)
def calc_pipeline(
    a='2',
    b='7',
    c='17',
):
    """Wire three arithmetic steps together: add, then divmod, then add.

    The first step adds the constant 4 to pipeline parameter `a`. The second
    step applies divmod to that sum and pipeline parameter `b`. The third step
    adds pipeline parameter `c` to the quotient produced by the second step.
    Every step requests "100m" of CPU.
    """
    cpu_request = "100m"

    # Operation arguments may be pipeline parameters or constant values.
    # The call returns a dsl.ContainerOp instance representing the task.
    sum_task = add_op(a, 4).set_cpu_request(cpu_request)

    # A task output reference can be passed as an operation argument.
    # For an operation with a single return value, the reference is available
    # as `task.output` or as `task.outputs['output_name']`.
    quotient_task = divmod_op(sum_task.output, b).set_cpu_request(cpu_request)

    # For an operation with multiple return values, each reference is accessed
    # with the `task.outputs['output_name']` syntax.
    final_task = add_op(quotient_task.outputs['quotient'], c).set_cpu_request(cpu_request)

In [6]:
# Compile the pipeline function into a deployable package archive.
kfp.compiler.Compiler().compile(
    pipeline_func=calc_pipeline,
    package_path='helloworld-3-1.zip',
)