In [1]:
import json
import os
from typing import List, Tuple

In [2]:
# Parse the node documentation JSON into `data`, decoding the file as UTF-8.
with open('sources/nodeDocumentation.json', encoding='utf8') as documentation_file:
    data = json.load(documentation_file)

In [3]:
def get_nodes(data, path):
    """Flatten a category tree into a list of ``(label, node)`` pairs.

    Args:
        data: A category mapping that may contain a ``'nodes'`` list and/or a
            ``'children'`` list of nested categories.
        path: Prefix accumulated so far for this category.

    Returns:
        A list of ``(path + node['name'], node)`` tuples for the nodes of this
        category, followed by the pairs of every child category. Children are
        visited with the prefix ``path + '/' + data['name']``.

    NOTE(review): the label is built by plain concatenation, so there is no
    separator between the prefix and the node name (e.g. ``'/root' + 'Cache'``
    gives ``'/rootCache'``) and the category's own name is only added for its
    children -- confirm whether that is intended.
    """
    found = []
    if 'nodes' in data:
        for node in data['nodes']:
            found.append((path + node['name'], node))
    if 'children' in data:
        for child in data['children']:
            found.extend(get_nodes(child, path + '/' + data['name']))
    return found

In [4]:
def get_name(node):
    """Derive a lower-case, underscore-separated name from ``node['name']``.

    The name is lower-cased and stripped of surrounding whitespace; spaces,
    hyphens and slashes become underscores and parentheses are removed.
    Every other character is passed through unchanged.
    """
    # One-pass equivalent of replacing each character in turn: no replacement
    # text is itself a key, so the order of substitutions cannot matter.
    table = str.maketrans({' ': '_', '-': '_', '/': '_', '(': None, ')': None})
    return node['name'].lower().strip().translate(table)


def get_implementation_code(node):
    """Generate the source of a ``KnimeImplementation`` declaration for a node.

    Args:
        node: Node description. ``'name'`` and ``'identifier'`` are required;
            ``'options'``, ``'inPorts'`` and ``'outPorts'`` are optional and
            treated as empty when missing or null.

    Returns:
        A ``(variable_name, code)`` tuple: the identifier produced by
        ``get_name`` and the generated Python snippet assigning a
        ``KnimeImplementation`` to it.

    Raises:
        KeyError: If a required key is missing from the node, or a port lacks
            ``'portObjectClass'`` / ``'name'``.
    """
    name = get_name(node)

    io_class_translation = {
        'org.knime.core.node.BufferedDataTable': 'ds.TabularDataset',
        'org.knime.base.data.normalize.NormalizerPortObject': 'ds.NormalizerModel',
    }

    # All three collections are optional; indexing them directly raised
    # KeyError for nodes that have no ports.
    options = node.get('options') or []
    in_ports = node.get('inPorts') or []
    out_ports = node.get('outPorts') or []

    def render_list(entries):
        """Format entries as the body of an indented, multi-line list literal."""
        return '[\n' + ''.join(f'        {entry}\n' for entry in entries) + '    ]'

    def render_ports(ports, kind):
        """Render one list entry per port; unknown port classes are emitted as None."""
        return render_list(
            f"{io_class_translation.get(port['portObjectClass'])},"
            f"  # TODO: check {kind}, original: '{port['portObjectClass']}'"
            for port in ports
        )

    def has_model_port(ports):
        """Tell whether any port name mentions 'model' (case-insensitive)."""
        return any('model' in port['name'].lower() for port in ports)

    # repr() yields a valid string literal even when the text contains quotes
    # or backslashes; for plain names it is identical to wrapping in '...'.
    params = render_list(
        f"Parameter({option['name']!r}, None, None),  # TODO: check parameter"
        for option in options
    )
    inputs = render_ports(in_ports, 'input')
    outputs = render_ports(out_ports, 'output')

    # A model on the output side wins over a model on the input side.
    if has_model_port(out_ports):
        implementation = 'do.LearnerImplementation'
    elif has_model_port(in_ports):
        implementation = 'do.ApplierImplementation'
    else:
        implementation = 'do.Implementation'

    display_name = repr(node['name'])
    factory = node['identifier']

    return name, f'''
{name} = KnimeImplementation(
    name={display_name},
    algorithm=None,  # TODO: check algorithm
    parameters={params},
    input={inputs},
    output={outputs},
    implementation_type={implementation},  # TODO: check implementation type
    knime_node_factory='{factory}',
    knime_bundle=KnimeBaseBundle,
)
'''


def get_component_code(node):
    """Placeholder: return an empty component name and empty component code.

    The ``node`` argument is accepted but not used.
    """
    component_name = ''
    component_code = ''
    return component_name, component_code

In [5]:
def build_file_from_nodes(nodes, path) -> Tuple[List[str], List[str]]:
    """Write ``<path>.py`` with the generated code of every node in a category.

    Args:
        nodes: Category mapping whose ``'nodes'`` entry lists the node
            descriptions to generate code for.
        path: Target path without the ``.py`` extension.

    Returns:
        ``(implementations, components)``: the names returned by
        ``get_implementation_code`` and ``get_component_code`` for each node,
        in input order.
    """
    file_path = f'{path}.py'
    implementations = []
    components = []
    # The documentation is read as UTF-8 and Python source files default to
    # UTF-8, so write with the same encoding instead of the platform default
    # (which may be unable to encode non-ASCII node names).
    with open(file_path, 'w', encoding='utf8') as f:
        f.write('from ontology_populator.implementations.core import *\n')
        f.write('from ontology_populator.implementations.knime.knime_implementation import *\n')
        f.write('from common import *\n')

        for node in nodes['nodes']:
            implementation, implementation_code = get_implementation_code(node)
            implementations.append(implementation)
            f.write(implementation_code)
            component, component_code = get_component_code(node)
            components.append(component)
            f.write(component_code)
    return implementations, components

In [6]:
def create_package(path):
    """Ensure a Python package directory exists at the lower-cased ``path``.

    Creates the directory (and any missing parents) and an ``__init__.py``
    inside it. Calling it again for an existing package is a no-op, so the
    notebook can be re-run without first deleting earlier output.
    """
    package_dir = path.lower()
    os.makedirs(package_dir, exist_ok=True)
    # Append mode creates the file when missing but never truncates an
    # existing one.
    with open(os.path.join(package_dir, '__init__.py'), 'a'):
        pass


def update_package_init(path, paths, implementations, components):
    """Rewrite a package's ``__init__.py`` to re-export its generated modules.

    Args:
        path: Package directory; the file written is
            ``<path.lower()>/__init__.py``.
        paths: Paths of the child modules (``*.py`` files) and sub-packages.
            Each is made relative to ``path`` and turned into a dotted name.
        implementations: Iterable of lists of implementation names, one list
            per child.
        components: Iterable of lists of component names, one list per child.

    Returns:
        ``(implementations, components)`` flattened into two plain lists.
    """
    flat_implementations = [i for impl in implementations for i in impl]
    flat_components = [c for comp in components for c in comp]

    # Explicit UTF-8 so generated names are written identically on every
    # platform, matching how the source documentation is read.
    with open(os.path.join(path.lower(), '__init__.py'), 'w', encoding='utf8') as f:
        for p in paths:
            module = os.path.relpath(p, path)
            # Drop only a trailing '.py'. Replacing every '.py' occurrence
            # after the separators became dots corrupted names such as
            # 'sub/python_nodes.py'.
            if module.endswith('.py'):
                module = module[:-len('.py')]
            module = module.replace('\\', '.').replace('/', '.')
            f.write(f'from .{module} import *\n')
        f.write('implementations = [\n')
        f.writelines(f"    '{i}',\n" for i in flat_implementations)
        f.write(']\n')
        f.write('components = [\n')
        f.writelines(f"    '{c}',\n" for c in flat_components)
        f.write(']\n')

    return flat_implementations, flat_components

In [7]:
def build_source_files(source, current_path):
    """Recursively generate modules and packages for a documentation category.

    A category with ``'nodes'`` becomes a single ``.py`` module; otherwise a
    category with ``'children'`` becomes a package whose ``__init__.py``
    re-exports everything generated for its children. Anything else is
    skipped.

    Args:
        source: Category mapping (needs ``'name'`` plus ``'nodes'`` or
            ``'children'``).
        current_path: Directory under which the module/package is created.

    Returns:
        ``(new_path, implementations, components)`` for generated output, or
        ``None`` when the category was skipped.
    """
    print('Current path', current_path)
    url_name = get_name(source)
    new_path = os.path.join(current_path, url_name)
    if 'nodes' in source:
        # NOTE(review): a category holding both 'nodes' and 'children' returns
        # here, so its children are never visited -- confirm this is intended.
        print('\t Create file')
        implementations, components = build_file_from_nodes(source, new_path)
        return new_path, implementations, components
    if 'children' in source:
        print('\t Create package')
        create_package(new_path)
        elements = []
        for child in source['children']:
            result = build_source_files(child, new_path)
            if result is not None:
                elements.append(result)
        if elements:
            child_paths, child_implementations, child_components = zip(*elements)
        else:
            # zip(*[]) yields nothing, which previously called
            # update_package_init without its required arguments.
            child_paths, child_implementations, child_components = (), (), ()
        implementations, components = update_package_init(
            new_path, child_paths, child_implementations, child_components
        )
        return new_path, implementations, components
    print('Skipping', source['name'])
    return None

In [9]:
# Walk the loaded documentation tree and generate the package under ./test.
# As the last expression of the cell, the returned tuple is displayed below.
build_source_files(data, './test')

Current path ./test
	 Create package
Current path ./test\root
	 Create file
Current path ./test\root
	 Create package
Current path ./test\root\manipulation
	 Create file
Current path ./test\root\manipulation
	 Create package
Current path ./test\root\manipulation\row
	 Create file
Current path ./test\root\manipulation\row
	 Create file
Current path ./test\root\manipulation\row
	 Create file
Current path ./test\root\manipulation
	 Create file
Current path ./test\root\manipulation
	 Create file
Current path ./test\root
	 Create package
Current path ./test\root\views
	 Create file
Current path ./test\root\views
	 Create file
Current path ./test\root\views
	 Create file
Current path ./test\root\views
	 Create file
Current path ./test\root\views
	 Create file
Current path ./test\root
	 Create package
Current path ./test\root\analytics
	 Create package
Current path ./test\root\analytics\mining
	 Create file
Current path ./test\root\analytics\mining
	 Create file
Current path ./test\root\analy

('./test\\root',
 ['cache',
  'interactive_hilite_collector',
  'table_manipulator',
  'table_validator',
  'table_validator_reference',
  'duplicate_row_filter',
  'filter_apply',
  'filter_apply_row_splitter',
  'filter_definition_merger',
  'hilite_row_splitter',
  'nominal_value_row_filter',
  'nominal_value_row_splitter',
  'numeric_row_splitter',
  'reference_row_filter',
  'reference_row_splitter',
  'row_filter',
  'row_splitter',
  'rule_based_row_filter',
  'rule_based_row_filter_dictionary',
  'rule_based_row_splitter',
  'rule_based_row_splitter_dictionary',
  'concatenate',
  'groupby',
  'ungroup',
  'partitioning',
  'pivoting',
  'unpivoting',
  'rank',
  'row_sampling',
  'bootstrap_sampling',
  'equal_size_sampling',
  'shuffle',
  'sorter',
  'top_k_selector',
  'add_empty_rows',
  'extract_column_header',
  'insert_column_header',
  'rowid',
  'rule_engine',
  'rule_engine_dictionary',
  'extract_table_dimension',
  'extract_table_spec',
  'row_to_column_header',
  