In [1]:
import copy
import importlib
import importlib.util
import os
import sys
import tempfile

def convert_source_to_func(function_source, component_name):
    '''Write `function_source` to a module file and return the function it defines.

    `kfp` module uses `inspect.getsource()` method which won't work unless
    function's source code is loaded from a file, so the source is saved as
    ``<tempdir>/component_functions/<component_name>.py`` and imported from there.

    Args:
        function_source: Python source defining a function named ``component_name``.
        component_name: Used both as the module (file) name and as the name of the
            function to return; must be a valid Python identifier.

    Returns:
        The ``component_name`` function from the freshly (re)loaded module.
    '''
    comp_tmp_dir = os.path.join(tempfile.gettempdir(), 'component_functions')
    os.makedirs(comp_tmp_dir, exist_ok=True)

    init_path = os.path.join(comp_tmp_dir, '__init__.py')
    if not os.path.exists(init_path):
        open(init_path, 'a').close()

    func_filepath = os.path.join(comp_tmp_dir, '%s.py' % component_name)
    with open(func_filepath, 'w') as f:
        f.write(function_source)

    # Cached bytecode is validated only by the source's whole-second mtime and
    # its size, so a same-size rewrite within one second could be served stale.
    try:
        stale_pyc = importlib.util.cache_from_source(func_filepath)
    except NotImplementedError:  # interpreter without bytecode caching
        stale_pyc = None
    if stale_pyc and os.path.exists(stale_pyc):
        os.remove(stale_pyc)

    # Register the directory once instead of appending it on every call.
    if comp_tmp_dir not in sys.path:
        sys.path.append(comp_tmp_dir)
    # Files created after the import system cached a directory listing may stay
    # invisible until the finders' caches are invalidated.
    importlib.invalidate_caches()

    if component_name in sys.modules:
        task_module = importlib.reload(sys.modules[component_name])
    else:
        task_module = importlib.import_module(component_name)
    return getattr(task_module, component_name)

# src = '''
# def aaa(x: int) -> int:
#     return x
# '''
# convert_source_to_func(src, 'aaa')(7)

In [2]:
import os
import re
import jupytext
import papermill

import nbconvert
from nbconvert import HTMLExporter
from traitlets.config import Config
import kfp.components as comp
import kfp.dsl as dsl

from jupytext.cli import jupytext
import pickle

import importlib # to import saved funcions

######################

import kfp
from kfp.containers import build_image_from_working_dir
from kfp.components import func_to_container_op

from kfp.components import InputPath, InputTextFile, OutputPath, OutputTextFile
#from kfp.components._structures import  ComponentSpec, InputSpec, OutputSpec

from collections import namedtuple
from typing import NamedTuple

from extra_code import KFNotebookRunner, exec_nb

In [3]:
class ExtraCodeBuilder:
    '''Accumulate Python source to ship with a component as its ``extra_code``.

    The collected text is exposed through the ``get_code`` property and is
    what ``NbComponentBuilder.build_op`` passes to ``func_to_container_op``.
    '''

    def __init__(self):
        self._extra_code = ''

    def inject_notebook(self, notebook_path):
        '''Append ``notebook_source = <text of notebook_path>`` to the code.

        The file text is embedded with ``repr()`` so the generated literal
        evaluates back to exactly the file contents, even when the notebook
        contains backslash escapes or triple quotes (a plain triple-quoted
        wrapper would reinterpret or break on those).
        '''
        with open(notebook_path, 'r', encoding='utf-8') as f:
            source = f.read()
        self._extra_code += '\nnotebook_source = ' + repr(source) + '\n\n'

    def add_extra_code(self, extra_code_path='extra_code.py'):
        '''Append the raw contents of ``extra_code_path`` to the code.'''
        with open(extra_code_path, 'r', encoding='utf-8') as f:
            self._extra_code += '\n' + f.read() + '\n'

    @property
    def get_code(self):
        '''The accumulated source text (a property despite the ``get_`` name).'''
        return self._extra_code

# extra_code = ExtraCodeBuilder().get_code 

In [6]:
#TODO: Add typing to class

class NbComponentBuilder:
    '''Build a Kubeflow Pipelines component that executes a notebook.

    Declare the component interface with the ``add_*`` methods, then call
    :meth:`build_op`. The builder generates the source of a function whose
    signature matches the declared interface and whose body forwards
    everything to ``exec_nb`` (expected to be defined by the component's extra
    code), and converts it with ``kfp.components.func_to_container_op``.
    '''

    def __init__(self, op_name, extra_code_instance=None, inject_notebook_path=None,
                 remote_notebook_path=None, remove_nb_inputs=False):
        '''
        Args:
            op_name: Component name. It is lower-cased and every character
                outside ``[a-zA-Z0-9_]`` (spaces included) is removed, e.g.
                ``'Task 1'`` -> ``'task1'``; the result names the generated
                function and its module.
            extra_code_instance: Optional ``ExtraCodeBuilder`` with code to ship
                with the component. It is never modified: when a notebook is
                injected, a copy of it is used.
            inject_notebook_path: Local path of a notebook source file whose
                text is embedded into the component's extra code.
            remote_notebook_path: Remote (e.g. Google Storage) notebook path
                handed to ``exec_nb``; mutually exclusive with
                ``inject_notebook_path``.
            remove_nb_inputs: Forwarded to ``exec_nb``.

        Raises:
            AssertionError: if neither or both notebook locations are given.
            ValueError: if ``op_name`` does not reduce to a Python identifier.
        '''
        assert inject_notebook_path or remote_notebook_path, \
            'You need to provide either path to google storage or local filename path ' + \
            'of the notebook that will be injected into component'
        assert not (inject_notebook_path and remote_notebook_path), \
            'Choose either notebook source or path, can\'t do both.'
        self.op_name = re.sub(r'[^a-zA-Z0-9_]+', '', op_name.lower())
        # The name becomes a `def` name and a module name, so it must be an identifier.
        if not self.op_name.isidentifier():
            raise ValueError('op_name %r reduces to %r, which is not a valid Python identifier'
                             % (op_name, self.op_name))
        self.input_params = []
        self.output_params = []
        self.input_artifacts = []
        self.output_artifacts = []
        self.extra_code_instance = extra_code_instance
        self.inject_notebook_path = inject_notebook_path
        self.remote_notebook_path = remote_notebook_path
        self.remove_nb_inputs = remove_nb_inputs

        if self.inject_notebook_path:
            # Inject into a private copy: a builder shared between several
            # components would otherwise accumulate every component's notebook.
            if self.extra_code_instance is not None:
                self.extra_code_instance = copy.copy(self.extra_code_instance)
            else:
                self.extra_code_instance = ExtraCodeBuilder()
            self.extra_code_instance.inject_notebook(self.inject_notebook_path)

    def add_input_param(self, param_name, param_type, default_value=None):
        '''Declare a notebook input parameter; ``default_value=None`` means required.'''
        self.input_params.append({
            'param_name': param_name,
            'param_type': param_type,
            'default_value': default_value
        })

    def add_output_param(self, param_name, param_type):
        '''Declare a scalar output returned in the component's NamedTuple.'''
        self.output_params.append({
            'param_name': param_name,
            'param_type': param_type
        })

    def add_input_artifact(self, name):
        '''Declare an input artifact, passed to the function as ``InputPath()``.'''
        self.input_artifacts.append(name)

    def add_output_artifact(self, name):
        '''Declare an output artifact, passed to the function as ``OutputPath(str)``.'''
        self.output_artifacts.append(name)

    def build_component_function(self):
        '''Generate (and print) the Python source of the component function.

        Signature order: output artifacts (``OutputPath(str)``), input artifacts
        (``InputPath()``), input params without a default, then input params with
        a default. The return annotation is a ``NamedTuple`` holding UI metadata,
        metrics and the declared output params.

        Returns:
            str: the generated source, including its ``kfp``/``typing`` imports.
        '''
        def has_default(p):
            # None is the "no default" sentinel; 0, '' and False are real defaults.
            return p.get('default_value') is not None

        def input_param_to_str(p):
            s = '%s: %s' % (p['param_name'], p['param_type'].__name__)
            if has_default(p):
                # repr() keeps string defaults quoted in the generated code.
                s += ' = %r' % (p['default_value'],)
            return s

        # `ouput_artifacts` (sic) is kept verbatim: exec_nb also receives locals().
        func_body = '''
    
    input_params = {input_params}
    output_params = {output_params}
    use_injected_nb_source_code = {inject_code}
    remote_notebook_path = {remote_notebook_path}
    ouput_artifacts = [{out_artif}]
    input_artifacts = [{in_artif}]
    remove_nb_inputs = {remove_nb_inputs}
    
    return exec_nb(locals(), input_params, output_params, use_injected_nb_source_code, 
                   remote_notebook_path, ouput_artifacts, input_artifacts, remove_nb_inputs)

'''
        func_body = func_body.format(
            input_params=self.notebook_inputs_params,
            output_params=[p['param_name'] for p in self.output_params],
            inject_code=bool(self.inject_notebook_path),
            # repr() escapes quotes/backslashes in the path.
            remote_notebook_path=repr(self.remote_notebook_path or ''),
            out_artif=', '.join("('{a}', {a})".format(a=a) for a in self.output_artifacts),
            in_artif=', '.join("('{a}', {a})".format(a=a) for a in self.input_artifacts),
            remove_nb_inputs=bool(self.remove_nb_inputs),
        )

        # Parameters with defaults must follow those without one.
        required = [p for p in self.input_params if not has_default(p)]
        optional = [p for p in self.input_params if has_default(p)]
        args = ['%s: OutputPath(str)' % a for a in self.output_artifacts]
        args += ['%s: InputPath()' % a for a in self.input_artifacts]
        args += [input_param_to_str(p) for p in required + optional]
        args_str = ', '.join(args)

        # Output artifacts are not listed here: they are produced through the
        # OutputPath arguments of the function.
        tuple_fields = ["('mlpipeline_ui_metadata', 'UI_metadata')",
                        "('mlpipeline_metrics', 'Metrics')"]
        tuple_fields += ["('%s', %s)" % (p['param_name'], p['param_type'].__name__)
                         for p in self.output_params]
        return_str = "NamedTuple('TaskOutput', [%s])" % ', '.join(tuple_fields)

        func_source = 'from kfp.components import InputPath, OutputPath\n'
        func_source += 'from typing import NamedTuple\n\n'
        func_source += f'def {self.op_name}({args_str}) -> {return_str}:\n{func_body}'
        print(func_source)
        return func_source

    def exec_component_function(self):
        '''Materialise the generated source as a module and return its function.'''
        function_source = self.build_component_function()
        return convert_source_to_func(function_source, self.op_name)

    def build_op(self, base_image, packages_to_install=None, *args, **kwargs):
        '''Create the container op via ``kfp.components.func_to_container_op``.

        Args:
            base_image: Container image the component runs in.
            packages_to_install: Extra pip packages; ``None`` means none.
            *args, **kwargs: Forwarded to ``func_to_container_op``.
        '''
        if packages_to_install is None:
            packages_to_install = []
        extra_code = self.extra_code_instance.get_code if self.extra_code_instance is not None else ''
        return comp.func_to_container_op(
            self.exec_component_function(),
            *args,
            base_image=base_image,
            packages_to_install=packages_to_install,
            extra_code=extra_code,
            **kwargs
        )

    @property
    def notebook_inputs_params(self):
        '''Names of all inputs (params, then input artifacts) as a list literal string.'''
        # NOTE(review): input artifact names are included as well — confirm exec_nb expects that.
        input_names = [i['param_name'] for i in self.input_params] + self.input_artifacts
        return repr(input_names)

    @property
    def notebook_source_code(self):
        '''Text of the injected notebook file.

        Raises:
            ValueError: if the builder was created with a remote notebook path.
        '''
        if not self.inject_notebook_path:
            raise ValueError('No notebook was injected; this builder uses a remote notebook path.')
        with open(self.inject_notebook_path, 'r', encoding='utf-8') as f:
            return f.read()


In [8]:
# Container settings shared by both notebook components.
base_image = 'asia.gcr.io/ppb-services/tensorflow-1.14.0-py3-jupytext-papermill:latest'
packages_to_install = ['pandas', 'matplotlib']
extra_code = ExtraCodeBuilder()
extra_code.add_extra_code('extra_code.py')

# Task 1: runs notebook_code.py with int inputs a, b, c, an int output d and
# an output artifact x.
task1_builder = NbComponentBuilder(
    'Task 1',
    inject_notebook_path='notebook_code.py',
    extra_code_instance=extra_code,
    remove_nb_inputs=True)
task1_builder.add_input_param('a', int, 1)
task1_builder.add_input_param('b', int, 2)
task1_builder.add_input_param('c', int, None)  # None: no default, the input is required
task1_builder.add_output_param('d', int)
task1_builder.add_output_artifact('x')
first_op = task1_builder.build_op(
    base_image=base_image,
    packages_to_install=packages_to_install)

# Task 2: runs notebook_code2.py on the artifact produced by task 1.
task2_builder = NbComponentBuilder('Task 2', inject_notebook_path='notebook_code2.py',
                                   extra_code_instance=extra_code)
task2_builder.add_input_artifact('o')
second_op = task2_builder.build_op(
    base_image=base_image,
    packages_to_install=packages_to_install)

# Run arguments; they double as the pipeline's default parameter values so the
# defaults have the int type the task 1 inputs declare.
arguments = {'a': 20, 'b': 30, 'c': 40}


@dsl.pipeline(name='Notebook pipeline example')
def notebook_run_pipeline(a=arguments['a'], b=arguments['b'], c=arguments['c']):
    op1 = first_op(a=a, b=b, c=c)
    second_op(o=op1.outputs['x'])


kfp.Client().create_run_from_pipeline_func(notebook_run_pipeline, arguments=arguments)

from kfp.components import InputPath, OutputPath
from typing import NamedTuple

def task1(x: OutputPath(str), c: int, a: int = 1, b: int = 2) -> NamedTuple('TaskOutput', [('mlpipeline_ui_metadata', 'UI_metadata'), ('mlpipeline_metrics', 'Metrics'), ('d', int)]):

    
    input_params = ['a', 'b', 'c']
    output_params = ['d']
    use_injected_nb_source_code = True
    remote_notebook_path = ''
    ouput_artifacts = [('x', x)]
    input_artifacts = []
    remove_nb_inputs = True
    
    return exec_nb(locals(), input_params, output_params, use_injected_nb_source_code, 
                   remote_notebook_path, ouput_artifacts, input_artifacts, remove_nb_inputs)


from kfp.components import InputPath, OutputPath
from typing import NamedTuple

def task2(o: InputPath()) -> NamedTuple('TaskOutput', [('mlpipeline_ui_metadata', 'UI_metadata'), ('mlpipeline_metrics', 'Metrics'), ]):

    
    input_params = ['o']
    output_params = []
    use_injected_nb_source_code = True
    remote_notebo

RunPipelineResult(run_id=27a7d913-d81e-4819-8afd-73726cf82310)