In [None]:
import itertools
import re
import uuid
from typing import Tuple, Any, List

from owlrl import DeductiveClosure, OWLRL_Semantics
from pyshacl import validate

from common import *

# Pipeline generation algorithm

In [None]:
# Shared ontology graph; the algorithm cells below pass it to every helper.
ontology = get_ontology_graph()

### 1. Functions to obtain intent information

In [None]:
def get_intent_iri(intent_graph):
    """Return the IRI of the single ``dtbox:Intent`` individual in ``intent_graph``.

    Raises AssertionError unless the query finds exactly one intent.
    """
    query_text = f"""
PREFIX dtbox: <{dtbox}>
SELECT ?iri
WHERE {{
    ?iri a dtbox:Intent .
}}
"""
    matches = intent_graph.query(query_text).bindings
    assert len(matches) == 1
    first_match = matches[0]
    return first_match['iri']


def get_intent_dataset_problem(intent_graph, intent_iri):
    """Look up the dataset (``dtbox:overData``) and problem (``dtbox:tackles``) of an intent.

    Returns a ``(dataset, problem)`` pair taken from the first query solution.
    Raises IndexError if the intent has no such pair.
    """
    subject = intent_iri.n3()
    query_text = f"""
    PREFIX dtbox: <{dtbox}>
    SELECT ?dataset ?problem
    WHERE {{
        {subject} a dtbox:Intent .
        {subject} dtbox:overData ?dataset .
        {subject} dtbox:tackles ?problem .
    }}
"""
    solution = intent_graph.query(query_text).bindings[0]
    dataset, problem = solution['dataset'], solution['problem']
    return dataset, problem


def get_intent_params(intent_graph, intent_iri):
    """Return the parameter values specified by an intent.

    Each result row binds ``param`` (via ``dtbox:forParameter``) and ``value``
    (via ``dtbox:has_value``) for every ``dtbox:usingParameter`` node attached
    to the intent.

    The intent is matched as ``dtbox:Intent``. This is the same class that
    ``get_intent_iri`` and ``get_intent_dataset_problem`` require, and the one
    the example intent graph is built with. Matching a different class would
    silently return no parameters for those intents.
    """
    params_query = f"""
    PREFIX dtbox: <{dtbox}>
    SELECT ?param ?value
    WHERE {{
        {intent_iri.n3()} a dtbox:Intent .
        {intent_iri.n3()} dtbox:usingParameter ?param_value .
        ?param_value dtbox:forParameter ?param .
        ?param_value dtbox:has_value ?value .
    }}
"""
    return intent_graph.query(params_query).bindings


def get_intent_info(intent_graph, intent_iri=None) -> Tuple[Any, Any, List[Any], Any]:
    """Collect what the generator needs from an intent graph.

    If ``intent_iri`` is falsy, the single intent in the graph is located first.
    Returns ``(dataset, problem, params, intent_iri)``.
    """
    resolved_iri = intent_iri if intent_iri else get_intent_iri(intent_graph)
    dataset, problem = get_intent_dataset_problem(intent_graph, resolved_iri)
    return dataset, problem, get_intent_params(intent_graph, resolved_iri), resolved_iri

### 2. Functions to obtain loaders

### 3. Obtain Main component dataset

In [None]:
def get_implementation_input_specs(ontology, implementation):
    """Return the input shapes of ``implementation``, ordered by ``dtbox:has_position``.

    Each element corresponds to one input spec and is the list produced by
    ``flatten_shape``, so ``sh:and`` conjunctions appear as their member shapes.
    """
    input_spec_query = f"""
        PREFIX dtbox: <{dtbox}>
        SELECT ?shape
        WHERE {{
            {implementation.n3()} dtbox:specifiesInput ?spec .
            ?spec a dtbox:IOSpec ;
                dtbox:hasTag ?shape ;
                dtbox:has_position ?position .
            ?shape a dtbox:DataTag .
        }}
        ORDER BY ?position
    """
    flattened = []
    for row in ontology.query(input_spec_query).bindings:
        flattened.append(flatten_shape(ontology, row['shape']))
    return flattened


def get_implementation_output_specs(ontology, implementation):
    """Return the output shapes of ``implementation``, ordered by ``dtbox:has_position``.

    Each element corresponds to one output spec and is the list produced by
    ``flatten_shape``, so ``sh:and`` conjunctions appear as their member shapes.
    """
    output_spec_query = f"""
        PREFIX dtbox: <{dtbox}>
        SELECT ?shape
        WHERE {{
            {implementation.n3()} dtbox:specifiesOutput ?spec .
            ?spec a dtbox:IOSpec ;
                dtbox:hasTag ?shape ;
                dtbox:has_position ?position .
            ?shape a dtbox:DataTag .
        }}
        ORDER BY ?position
    """
    rows = ontology.query(output_spec_query).bindings
    return list(map(lambda row: flatten_shape(ontology, row['shape']), rows))


def flatten_shape(graph, shape):
    """Expand a SHACL shape into the list of its atomic member shapes.

    A shape with an ``sh:and`` list is replaced by the recursively flattened
    members of that list. Any other shape is returned as ``[shape]``.
    """
    if (shape, SH['and'], None) not in graph:
        return [shape]

    subshapes_query = f"""
            PREFIX sh: <{SH}>
            PREFIX rdf: <{RDF}>

            SELECT ?subshape
            WHERE {{
                {shape.n3()} sh:and ?andNode .
                ?andNode rdf:rest*/rdf:first ?subshape .
            }}
        """
    members = []
    for row in graph.query(subshapes_query).bindings:
        members.extend(flatten_shape(graph, row['subshape']))
    return members


def get_potential_implementations(ontology, problem_iri, intent_parameters=None) -> List[Tuple[Any, List[Any]]]:
    """Find non-applier implementations whose algorithm solves ``problem_iri`` or one of its sub-problems.

    Only implementations that declare every parameter in ``intent_parameters``
    (``dtbox:hasParameter``) are kept. Implementations typed
    ``dtbox:ApplierImplementation`` are excluded.

    Returns ``(implementation, input_specs)`` pairs, where ``input_specs`` comes
    from ``get_implementation_input_specs``.
    """
    required_params = intent_parameters if intent_parameters is not None else []
    param_clauses = '            \n'.join(f'dtbox:hasParameter {param.n3()} ;' for param in required_params)
    main_implementation_query = f"""
    PREFIX dtbox: <{dtbox}>
    SELECT ?implementation
    WHERE {{
        ?implementation a dtbox:Implementation ;
            {param_clauses}
            dtbox:implements ?algorithm .
        ?algorithm a dtbox:Algorithm ;
            dtbox:solves ?problem .
        ?problem dtbox:subProblemOf* {problem_iri.n3()} .
        FILTER NOT EXISTS{{
            ?implementation a dtbox:ApplierImplementation.
        }}
    }}
"""
    pairs = []
    for row in ontology.query(main_implementation_query).bindings:
        found = row['implementation']
        pairs.append((found, get_implementation_input_specs(ontology, found)))
    return pairs


def get_implementation_components(ontology, implementation) -> List[Any]:
    """Return every component linked to ``implementation`` through ``dtbox:hasImplementation``."""
    components_query = f"""
        PREFIX dtbox: <{dtbox}>
        SELECT ?component
        WHERE {{
            ?component dtbox:hasImplementation {implementation.n3()} .
        }}
    """
    found = []
    for row in ontology.query(components_query).bindings:
        found.append(row['component'])
    return found

In [None]:
def find_components_to_satisfy_shape(ontology, shape, only_appliers=False):
    """Return the components whose implementation outputs data tagged with ``shape``.

    With ``only_appliers`` the search is limited to ``dtbox:ApplierImplementation``.
    Otherwise any ``dtbox:Implementation`` qualifies.
    """
    class_prefix = 'Applier' if only_appliers else ''
    implementation_query = f"""
        PREFIX dtbox: <{dtbox}>
        SELECT ?implementation
        WHERE {{
            ?implementation a dtbox:{class_prefix}Implementation ;
                dtbox:specifiesOutput ?spec .
            ?spec dtbox:hasTag {shape.n3()} .
        }}
    """
    components = []
    for row in ontology.query(implementation_query).bindings:
        components.extend(get_implementation_components(ontology, row['implementation']))
    return components

In [None]:
def identify_data_io(ontology: Graph, ios: List[Any], return_index=False) -> Any:
    """Locate the first IO slot with a shape whose ``sh:targetClass`` is exactly ``dmop:TabularDataset``.

    ``ios`` holds one list of shapes per IO slot. Returns the slot's index when
    ``return_index`` is true, otherwise the slot's shape list. Returns None if
    no slot matches.
    """
    for position, shapes in enumerate(ios):
        if any((candidate, SH.targetClass, dmop.TabularDataset) in ontology for candidate in shapes):
            return position if return_index else shapes
    return None


def _shape_targets_model(ontology: Graph, shape) -> bool:
    """True if ``shape``'s ``sh:targetClass`` is ``dd:Model`` or a transitive ``rdfs:subClassOf`` of it."""
    ask_query = f'''
    PREFIX sh: <{SH}>
    PREFIX rdfs: <{RDFS}>
    PREFIX ddata: <{dd}>

    ASK {{
      {shape.n3()} sh:targetClass ?targetClass .

      ?targetClass rdfs:subClassOf* ddata:Model .
    }}
'''
    return ontology.query(ask_query).askAnswer


def identify_model_io(ontology: Graph, ios: List[Any], return_index=False) -> Any:
    """Locate the first IO slot with a shape targeting a model class.

    ``ios`` holds one list of shapes per IO slot. Returns the slot's index when
    ``return_index`` is true, otherwise the slot's shape list. Returns None if
    no slot matches.
    """
    for position, shapes in enumerate(ios):
        if any(_shape_targets_model(ontology, candidate) for candidate in shapes):
            return position if return_index else shapes
    return None

In [None]:
def satisfies_shape(data_graph, shacl_graph, shape, focus):
    """Check whether the ``focus`` node conforms to the single SHACL ``shape``.

    Validation is restricted to ``shape`` (pyshacl's ``use_shapes`` option) and
    to the ``focus`` node (pyshacl's ``focus_nodes`` option). These are the
    keyword names pyshacl's ``validate`` recognises for those filters. Other
    spellings are not recognised, so every call would validate the whole graph
    against all shapes.

    Returns the ``conforms`` flag reported by pyshacl.
    """
    conforms, _report_graph, _report_text = validate(
        data_graph,
        shacl_graph=shacl_graph,
        use_shapes=[shape],
        focus_nodes=[focus],
    )
    return conforms


def get_shape_target_class(ontology, shape):
    """Return the first ``sh:targetClass`` of ``shape`` (raises IndexError if it has none)."""
    target_query = f"""
        PREFIX sh: <{SH}>
        SELECT ?targetClass
        WHERE {{
            <{shape}> sh:targetClass ?targetClass .
        }}
    """
    first_row = ontology.query(target_query).bindings[0]
    return first_row['targetClass']

In [None]:
def get_implementation_parameters(ontology, implementation) -> List:
    """Return ``{'parameter', 'value'}`` dicts with the default values of ``implementation``'s parameters.

    Results are ordered by ``dtbox:has_position``. Parameters lacking either a
    ``dtbox:hasDefaultValue`` or a ``dtbox:has_position`` are not returned,
    because the query requires both.
    """
    parameters_query = f"""
        PREFIX dtbox: <{dtbox}>
        SELECT ?parameter ?value
        WHERE {{
            <{implementation}> dtbox:hasParameter ?parameter .
            ?parameter dtbox:hasDefaultValue ?value ;
                       dtbox:has_position ?order .
        }}
        ORDER BY ?order
    """
    defaults = []
    for row in ontology.query(parameters_query).bindings:
        defaults.append(dict(parameter=row['parameter'], value=row['value']))
    return defaults

In [None]:
def _attach_io_nodes(graph, step, link_predicate, data_nodes):
    """Link one positioned ``dtbox:IO`` blank node per entry of ``data_nodes`` to ``step`` via ``link_predicate``."""
    for position, data_node in enumerate(data_nodes):
        io_node = BNode()
        graph.add((io_node, RDF.type, dtbox.IO))
        graph.add((io_node, dtbox.hasData, data_node))
        graph.add((io_node, dtbox.has_position, Literal(position)))
        graph.add((step, link_predicate, io_node))


def add_step(graph, pipeline, task_name, implementation, parameters, order, previous_task=None, inputs=None,
             outputs=None):
    """Add a ``dtbox:Step`` named ``dw:<task_name>`` to ``pipeline`` in ``graph`` and return it.

    The step runs ``implementation`` at position ``order``. Inputs and outputs get
    one positioned ``dtbox:IO`` node each. Each entry of ``parameters`` (a dict
    with ``'parameter'`` and optionally ``'value'``) becomes a
    ``dtbox:hasParameterValue`` node; a missing value is recorded as the literal
    ``'REQUIRES USER INPUT'``. ``previous_task`` (a node or a list of nodes) is
    linked to the new step with ``dtbox:followedBy``.
    """
    step = dw.term(task_name)
    for triple in (
            (pipeline, dtbox.hasStep, step),
            (step, RDF.type, dtbox.Step),
            (step, dtbox.runs, implementation),
            (step, dtbox.has_position, Literal(order)),
    ):
        graph.add(triple)

    _attach_io_nodes(graph, step, dtbox.hasInput, [] if inputs is None else inputs)
    _attach_io_nodes(graph, step, dtbox.hasOutput, [] if outputs is None else outputs)

    for param in parameters:
        value_node = BNode()
        graph.add((step, dtbox.hasParameterValue, value_node))
        graph.add((value_node, dtbox.forParameter, param['parameter']))
        value = param['value'] if 'value' in param else Literal('REQUIRES USER INPUT')
        graph.add((value_node, dtbox.has_value, value))

    if previous_task:
        predecessors = previous_task if isinstance(previous_task, list) else [previous_task]
        for predecessor in predecessors:
            graph.add((predecessor, dtbox.followedBy, step))
    return step

In [None]:
def get_implementation_transformations(ontology, implementation) -> List:
    """Return the transformations of ``implementation`` in list order.

    ``dtbox:hasTransformation`` points to an RDF collection. Its members are
    applied one after another by ``run_implementation_transformation``, so the
    order matters.

    The collection is walked with ``Graph.items``, which follows
    ``rdf:first``/``rdf:rest`` from the head. That guarantees list order, which a
    SPARQL ``rdf:rest*/rdf:first`` path does not. It also avoids depending on an
    ``rdf:`` prefix binding.

    If several collections are attached, their members are concatenated one
    collection after another.
    """
    transformations = []
    for transformation_list in ontology.objects(implementation, dtbox.hasTransformation):
        transformations.extend(ontology.items(transformation_list))
    return transformations

In [None]:
def copy_subgraph(source_graph: Graph, source_node: URIRef, destination_graph: Graph, destination_node: URIRef,
                  replace_nodes: bool = True):
    """Copy the outgoing triples of ``source_node`` into ``destination_graph`` under ``destination_node``.

    Subjects are renamed through ``mappings``, starting with
    ``source_node -> destination_node``. ``owl:sameAs`` triples are skipped.

    With ``replace_nodes`` true, objects that are nodes (``IdentifiedNode``) are handled as follows:
      * objects of ``rdf:type``, and objects inside the ``dmop`` namespace, are copied
        unchanged (presumably shared vocabulary rather than part of the copied description);
      * any other node object is replaced by a blank node and is itself traversed, so its
        outgoing triples are copied under that blank node as well.
    With ``replace_nodes`` false, every object is copied unchanged and only
    ``source_node``'s own triples are copied (nothing else is queued).

    ``run_copy_transformation`` calls this with the same graph as source and destination.
    """
    visited_nodes = set()
    nodes_to_visit = [source_node]
    mappings = {source_node: destination_node}

    while nodes_to_visit:
        # LIFO pop: nodes are processed depth-first.
        current_node = nodes_to_visit.pop()
        visited_nodes.add(current_node)
        for predicate, object in source_graph.predicate_objects(current_node):
            if predicate == OWL.sameAs:
                continue
            if replace_nodes and isinstance(object, IdentifiedNode):
                if predicate == RDF.type or object in dmop:
                    # NOTE(review): this overwrites a blank-node mapping the same object may have
                    # received earlier through a non-type predicate — confirm that is intended.
                    mappings[object] = object
                else:
                    # `visited_nodes` is only checked when queuing, so a node reached from two
                    # parents before being popped is processed twice (re-adding the same triples).
                    if object not in visited_nodes:
                        nodes_to_visit.append(object)
                    # Reuse an existing mapping so repeated references point to the same blank node.
                    if object not in mappings:
                        mappings[object] = BNode()
                destination_graph.add((mappings[current_node], predicate, mappings[object]))
            else:
                destination_graph.add((mappings[current_node], predicate, object))


def annotate_io_with_spec(ontology: Graph, workflow_graph: Graph, io: URIRef, io_spec: List[URIRef]):
    """Type ``io`` in ``workflow_graph`` with the first ``sh:targetClass`` of each shape in ``io_spec``.

    Shapes without a target class are skipped, and types already present are not re-added.
    """
    for shape in io_spec:
        target_class = next(ontology.objects(shape, SH.targetClass, True), None)
        if target_class is not None and (io, RDF.type, target_class) not in workflow_graph:
            workflow_graph.add((io, RDF.type, target_class))


def annotate_ios_with_specs(ontology: Graph, workflow_graph: Graph, io: List[URIRef], specs: List[List[URIRef]]):
    """Apply ``annotate_io_with_spec`` pairwise to each IO node and its shape list.

    Raises AssertionError when ``io`` and ``specs`` differ in length.
    """
    assert len(io) == len(specs), 'Number of IOs and specs must be the same'
    for io_node, shape_list in zip(io, specs):
        annotate_io_with_spec(ontology, workflow_graph, io_node, shape_list)


def run_copy_transformation(ontology: Graph, workflow_graph: Graph, transformation, inputs, outputs):
    """Execute a copy transformation: copy one input's description onto one output.

    ``dtbox:copy_input`` and ``dtbox:copy_output`` hold 1-based positions into
    ``inputs`` and ``outputs``. The copy happens within ``workflow_graph``.
    """
    def _first_value(predicate):
        return next(ontology.objects(transformation, predicate, True)).value

    source_position = _first_value(dtbox.copy_input)
    target_position = _first_value(dtbox.copy_output)
    source_node = inputs[source_position - 1]
    target_node = outputs[target_position - 1]

    copy_subgraph(workflow_graph, source_node, workflow_graph, target_node)


def run_implementation_transformation(ontology: Graph, workflow_graph: Graph, implementation, inputs, outputs,
                                      parameters):
    """Apply every transformation declared by ``implementation`` to ``workflow_graph``.

    Transformations typed ``dtbox:CopyTransformation`` are delegated to
    ``run_copy_transformation``. Any other transformation provides a SPARQL
    Update template in ``dtbox:transformation_query``. The template is prefixed
    with the common namespace declarations, its placeholders are filled, and the
    update runs against ``workflow_graph``.

    Placeholders are 1-based:
      * ``$inputN`` / ``$outputN`` become the N3 form of ``inputs[N-1]`` / ``outputs[N-1]``;
      * ``$paramN`` / ``$parameterN`` become ``parameters[N-1].toPython()``.
    All placeholders are filled in one regex pass that reads the full index, so
    ``$input1`` cannot clobber the prefix of ``$input10``. Placeholders whose
    index is out of range are left untouched.
    """
    prefixes = f'''
PREFIX dtbox: <{dtbox}>
PREFIX rdf: <{RDF}>
PREFIX rdfs: <{RDFS}>
PREFIX owl: <{OWL}>
PREFIX xsd: <{XSD}>
PREFIX dmop: <{dmop}>
'''
    # Placeholder kind -> (values it indexes, how one value is rendered into the query).
    substitutions = {
        'input': (inputs, lambda node: node.n3()),
        'output': (outputs, lambda node: node.n3()),
        'param': (parameters, lambda value: f'{value.toPython()}'),
        'parameter': (parameters, lambda value: f'{value.toPython()}'),
    }

    def fill_placeholder(match):
        values, render = substitutions[match.group(1)]
        index = int(match.group(2)) - 1
        if 0 <= index < len(values):
            return render(values[index])
        return match.group(0)

    for transformation in get_implementation_transformations(ontology, implementation):
        if (transformation, RDF.type, dtbox.CopyTransformation) in ontology:
            run_copy_transformation(ontology, workflow_graph, transformation, inputs, outputs)
            continue
        template = next(ontology.objects(transformation, dtbox.transformation_query, True)).value
        # 'parameter' is listed before 'param' so the longer keyword wins when both could match.
        query = re.sub(r'\$(parameter|param|input|output)([0-9]+)', fill_placeholder, prefixes + template)
        workflow_graph.update(query)

In [None]:
def step_name(workflow_name, task_order, implementation):
    """Build the step identifier ``<workflow>-step_<order>_<fragment>``, with dashes in the fragment replaced by underscores."""
    safe_fragment = implementation.fragment.replace("-", "_")
    return f'{workflow_name}-step_{task_order}_{safe_fragment}'


def build_workflow_train_test(workflow_name, ontology, dataset, main_component, split_component, transformations):
    """Build the workflow graph for one (split, transformations, main component) combination.

    Layout:
      1. A split step (``split_component``) turns a copy of ``dataset`` into a train
         and a test dataset node.
      2. Each entry of ``transformations``, followed by ``main_component``, gets a
         train step and a test step:
         * the train step runs the entry's ``dtbox:hasLearner`` (or the entry itself
           when it has none) on the current train dataset;
         * the test step runs the entry on the current test dataset and receives the
           train step's model output when it declares a model input.
    Every step's IO nodes are typed from the implementation's IO specs, and the
    implementation's transformations are run on the workflow graph.

    Returns ``(workflow_graph, workflow)``, where ``workflow`` is the ``dtbox:Workflow`` node.

    Raises:
        ValueError: an implementation has no tabular-dataset input, or a test
            implementation expects a model its train counterpart does not output.
    """
    workflow_graph = get_graph()
    workflow = dw.term(workflow_name)
    workflow_graph.add((workflow, RDF.type, dtbox.Workflow))
    task_order = 0

    # Copy the dataset description into the workflow graph under a workflow-specific node.
    dataset_node = dw.term(f'{workflow_name}-original_dataset')

    copy_subgraph(ontology, dataset, workflow_graph, dataset_node)

    split_step_name = step_name(workflow_name, task_order, split_component)
    split_outputs = [dw[f'{split_step_name}-output_train'], dw[f'{split_step_name}-output_test']]
    split_parameters = get_implementation_parameters(ontology, split_component)
    split_step = add_step(workflow_graph, workflow,
                          split_step_name,
                          split_component,
                          split_parameters,
                          task_order,
                          None,
                          [dataset_node],
                          split_outputs)
    run_implementation_transformation(ontology, workflow_graph, split_component,
                                      [dataset_node], split_outputs,
                                      [p['value'] for p in split_parameters])

    task_order += 1

    train_dataset_node = split_outputs[0]
    test_dataset_node = split_outputs[1]

    previous_train_step = split_step
    previous_test_step = split_step

    for test_implementation in [*transformations, main_component]:
        # Without a dtbox:hasLearner the same implementation runs on both train and test data.
        train_counterpart = next(ontology.objects(test_implementation, dtbox.hasLearner, True), test_implementation)
        same = train_counterpart == test_implementation

        train_step_name = step_name(workflow_name, task_order, train_counterpart)
        test_step_name = step_name(workflow_name, task_order + 1, test_implementation)

        # ---- train step ----
        train_input_specs = get_implementation_input_specs(ontology, train_counterpart)
        train_input_data_index = identify_data_io(ontology, train_input_specs, return_index=True)
        if train_input_data_index is None:
            raise ValueError(f'{train_counterpart} has no tabular dataset input')
        train_transformation_inputs = [dw[f'{train_step_name}-input_{i}'] for i in range(len(train_input_specs))]
        train_transformation_inputs[train_input_data_index] = train_dataset_node
        annotate_ios_with_specs(ontology, workflow_graph, train_transformation_inputs,
                                train_input_specs)

        train_output_specs = get_implementation_output_specs(ontology, train_counterpart)
        train_output_model_index = identify_model_io(ontology, train_output_specs, return_index=True)
        train_output_data_index = identify_data_io(ontology, train_output_specs, return_index=True)
        train_transformation_outputs = [dw[f'{train_step_name}-output_{i}'] for i in range(len(train_output_specs))]
        annotate_ios_with_specs(ontology, workflow_graph, train_transformation_outputs,
                                train_output_specs)

        train_step = add_step(workflow_graph, workflow,
                              train_step_name,
                              train_counterpart, [], task_order, previous_train_step, train_transformation_inputs,
                              train_transformation_outputs)

        previous_train_step = train_step

        run_implementation_transformation(ontology, workflow_graph, train_counterpart, train_transformation_inputs,
                                          train_transformation_outputs, [])

        # Only advance the train dataset when this step outputs one.
        if train_output_data_index is not None:
            train_dataset_node = train_transformation_outputs[train_output_data_index]

        task_order += 1

        # ---- test step ----
        test_input_specs = get_implementation_input_specs(ontology, test_implementation)
        test_input_data_index = identify_data_io(ontology, test_input_specs, return_index=True)
        if test_input_data_index is None:
            raise ValueError(f'{test_implementation} has no tabular dataset input')
        test_input_model_index = identify_model_io(ontology, test_input_specs, return_index=True)
        test_transformation_inputs = [dw[f'{test_step_name}-input_{i}'] for i in range(len(test_input_specs))]
        test_transformation_inputs[test_input_data_index] = test_dataset_node
        # Wire the model produced by the train step, but only when this implementation takes one.
        if test_input_model_index is not None:
            if train_output_model_index is None:
                raise ValueError(f'{test_implementation} expects a model input, '
                                 f'but {train_counterpart} has no model output')
            test_transformation_inputs[test_input_model_index] = train_transformation_outputs[train_output_model_index]
        annotate_ios_with_specs(ontology, workflow_graph, test_transformation_inputs,
                                test_input_specs)

        test_output_specs = get_implementation_output_specs(ontology, test_implementation)
        test_output_data_index = identify_data_io(ontology, test_output_specs, return_index=True)
        test_transformation_outputs = [dw[f'{test_step_name}-output_{i}'] for i in range(len(test_output_specs))]
        annotate_ios_with_specs(ontology, workflow_graph, test_transformation_outputs,
                                test_output_specs)

        # The test step also depends on the train step when they run different implementations.
        previous_test_steps = [previous_test_step, train_step] if not same else [previous_test_step]
        test_step = add_step(workflow_graph, workflow,
                             test_step_name,
                             test_implementation, [], task_order, previous_test_steps, test_transformation_inputs,
                             test_transformation_outputs)

        run_implementation_transformation(ontology, workflow_graph, test_implementation, test_transformation_inputs,
                                          test_transformation_outputs, [])

        # Mirror the train side: only advance the test dataset when this step outputs one.
        if test_output_data_index is not None:
            test_dataset_node = test_transformation_outputs[test_output_data_index]
        previous_test_step = test_step
        task_order += 1

    return workflow_graph, workflow

## Algorithm

In [None]:
# Example intent: tackle the dabox:Description problem over the penguins.csv dataset.
ins = Namespace('https://diviloper.dev/intent#')
intent_graph = get_graph()

intent_graph.add((ins.DescriptionIntent, RDF.type, dtbox.Intent))
intent_graph.add((ins.DescriptionIntent, dtbox.overData, dd.term('penguins.csv')))
intent_graph.add((ins.DescriptionIntent, dtbox.tackles, dabox.Description))

In [None]:
# Print progress details while generating workflows; set to False for quiet runs.
log = True

In [None]:
dataset, problem, intent_params, intent_iri = get_intent_info(intent_graph)

if log:
    print(f'Dataset: {dataset.fragment}')
    print(f'Problem: {problem.fragment}')
    print(f'Intent params: {intent_params}')
    print('-------------------------------------------------')

# Candidate (non-applier) implementations for the problem, expanded to the components that use them.
comps = get_potential_implementations(ontology, problem, [x['param'] for x in intent_params])
components = [
    (c, impl, inputs)
    for impl, inputs in comps
    for c in get_implementation_components(ontology, impl)
]
if log:
    for component, implementation, inputs in components:
        print(f'Component: {component.fragment} ({implementation.fragment})')
        for im_input in inputs:
            print(f'\tInput: {[x.fragment for x in im_input]}')
    print('-------------------------------------------------')

workflow_order = 0

split_components = [
    dabox.term('component-random_absolute_train_test_split'),
    dabox.term('component-random_relative_train_test_split'),
    dabox.term('component-top_k_absolute_train_test_split'),
    dabox.term('component-top_k_relative_train_test_split'),
]

for component, implementation, inputs in components:
    if log:
        print(f'Component: {component.fragment} ({implementation.fragment})')
    shapes_to_satisfy = identify_data_io(ontology, inputs)
    assert shapes_to_satisfy is not None and len(shapes_to_satisfy) > 0
    if log:
        print(f'\tData input: {[x.fragment for x in shapes_to_satisfy]}')

    # Data-input shapes the intent's dataset does not already conform to.
    unsatisfied_shapes = [shape for shape in shapes_to_satisfy if
                          not satisfies_shape(ontology, ontology, shape, dataset)]

    # For each unsatisfied shape, the applier components whose output is tagged with it.
    available_transformations = {
        shape: find_components_to_satisfy_shape(ontology, shape, only_appliers=True)
        for shape in unsatisfied_shapes
    }

    if log:
        print(f'\tUnsatisfied shapes: ')
        for shape, shape_components in available_transformations.items():
            print(f'\t\t{shape.fragment}: {[x.fragment for x in shape_components]}')

    # Each combination is (split component, one applier per unsatisfied shape).
    transformation_combinations = list(itertools.product(split_components, *available_transformations.values()))
    # TODO - check if the combination is valid and whether further transformations are needed

    if log:
        print(f'\tTransformation combinations: ')
        for combination in transformation_combinations:
            print(f'\t\t{[x.fragment for x in combination]}')

    for transformation_combination in transformation_combinations:
        workflow_name = f'workflow_{workflow_order}_{intent_iri.fragment}_{uuid.uuid4()}'.replace('-', '_')
        # Element 0 of this combination is the split component; the remaining elements are its appliers.
        wg, w = build_workflow_train_test(workflow_name, ontology, dataset, component,
                                          transformation_combination[0], transformation_combination[1:])
        if log:
            print(f'\t\tWorkflow {workflow_order}: {w.fragment}')
        wg.serialize(f'{workflow_name}.ttl', format='turtle')
        workflow_order += 1

In [None]:
# Inspect the (component, dtbox:hasImplementation, implementation) links in the ontology.
[*ontology.triples((None, dtbox.hasImplementation, None))]