In [360]:
from fairworkflows import FairWorkflow, FairStep, FairVariable
import inspect
import typing
from typing import Callable, get_type_hints, List, Union
import functools
from noodles import schedule, run_single, run_parallel, get_workflow
from noodles.tutorial import display_workflows, run_and_print_log

def _extract_inputs_from_function(func) -> List[FairVariable]:
    """
    Extract inputs from function using inspection. The name of the argument will be the name of
    the fair variable, the corresponding type hint will be the type of the variable.

    The type is recorded as the annotation's ``__name__``; annotations that have no
    ``__name__`` (for example string annotations) are recorded as ``str(annotation)``.

    Raises:
        ValueError: if any argument in ``argspec.args`` has no type hint.
    """
    argspec = inspect.getfullargspec(func)
    hints = argspec.annotations

    # Validate up front instead of catching KeyError around the whole construction, so that
    # an unrelated KeyError raised while building a FairVariable is not misreported as a
    # missing type hint.
    if any(arg not in hints for arg in argspec.args):
        raise ValueError('Not all input arguments have type hinting, '
                         'FAIR step functions MUST have type hinting, '
                         'see https://docs.python.org/3/library/typing.html')

    return [FairVariable(name=arg, type=getattr(hints[arg], '__name__', str(hints[arg])))
            for arg in argspec.args]
        
def _extract_outputs_from_function(func) -> List[FairVariable]:
    """
    Extract outputs from function using inspection. The name will be
    {function_name}_output{output_number}, numbered from 1. The corresponding return type
    hint will be the type of the variable.

    If the return hint is a generic tuple (as decided by ``_is_generic_tuple``), one variable
    is produced per element of the tuple; otherwise a single variable is produced.

    The type is recorded as the annotation's ``__name__``; annotations that have no
    ``__name__`` (for example the ``...`` in ``Tuple[int, ...]``) are recorded as
    ``str(annotation)``.

    Raises:
        ValueError: if the function has no return type hint.
    """
    annotations = get_type_hints(func)
    if 'return' not in annotations:
        raise ValueError('The return of the function does not have type hinting, '
                         'FAIR step functions MUST have type hinting, '
                         'see https://docs.python.org/3/library/typing.html')
    return_annotation = annotations['return']

    # Normalise to a sequence of per-output annotations so both cases share one code path.
    if _is_generic_tuple(return_annotation):
        output_types = return_annotation.__args__
    else:
        output_types = (return_annotation,)

    return [FairVariable(name=func.__name__ + '_output' + str(number),
                         type=getattr(annotation, '__name__', str(annotation)))
            for number, annotation in enumerate(output_types, start=1)]

def _is_generic_tuple(type_):
    """
    Check whether a type annotation is Tuple
    """
    if hasattr(typing, '_GenericAlias'):
        # 3.7
        # _GenericAlias cannot be imported from typing, because it doesn't
        # exist in all versions, and it will fail the type check in those
        # versions as well, so we ignore it.
        return (isinstance(type_, typing._GenericAlias)
                and type_.__origin__ is tuple)
    else:
        # 3.6 and earlier
        # GenericMeta cannot be imported from typing, because it doesn't
        # exist in all versions, and it will fail the type check in those
        # versions as well, so we ignore it.
        return (isinstance(type_, typing.GenericMeta)
                and type_.__origin__ is typing.Tuple)

    
    
    
def fairstep(func):
    """
    Decorator that marks a function as a FAIR step.

    A FairStep describing the function is built from its source code and its type hints,
    stored on the function as ``func._fairstep``, and the function is then passed through
    noodles' ``schedule``, whose result is returned.
    """
    # NOTE(review): every step currently gets the same fixed label - confirm whether the
    # function name was intended here.
    step = FairStep(label='test_step',
                    # Description of step is the raw function code
                    description=inspect.getsource(func),
                    is_pplan_step=True,
                    is_manual_task=False,
                    is_script_task=True,
                    inputs=_extract_inputs_from_function(func),
                    outputs=_extract_outputs_from_function(func))
    func._fairstep = step

    return schedule(func)

def fairworkflow(name=None):
    """
    Decorator factory that turns a workflow-describing function into a workflow builder.

    Calling the decorated function runs the original function and wraps whatever it
    returns (stored as ``promise``) in a ``FairWorkflowP`` object, which offers
    ``display()``, ``execute()``, ``prov()`` and ``get_workflow()``.

    Args:
        name: value stored as the ``name`` attribute of the returned workflow object.
    """
    def fairworkflow_inner(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            promise = func(*args, **kwargs)

            class FairWorkflowP:
                def __init__(self):
                    self.name = name
                    self.promise = promise

                def display(self):
                    """Render the workflow via ``display_workflows`` with prefix 'control'."""
                    display_workflows(prefix='control', workflow=self.promise)

                def execute(self, num_threads=1):
                    """
                    Run the workflow and return its result.

                    Uses ``run_single`` when ``num_threads`` is 1 and ``run_parallel``
                    when it is greater than 1.

                    Raises:
                        ValueError: if ``num_threads`` is less than 1 (previously this
                            silently returned None without running anything).
                    """
                    if num_threads < 1:
                        raise ValueError(
                            'num_threads must be at least 1, got {!r}'.format(num_threads))
                    if num_threads == 1:
                        return run_single(self.promise)
                    return run_parallel(self.promise, num_threads)

                def prov(self):
                    """Run the workflow through ``run_and_print_log``."""
                    # Use this object's own promise; the previous code read a global
                    # named ``fw`` and only worked if the caller happened to use that name.
                    run_and_print_log(self.promise)

                def get_workflow(self):
                    """Print the FairStep of every node and the links between nodes."""
                    # Inside a method body the bare name resolves to the module-level
                    # get_workflow (imported from noodles), not to this method.
                    workflow = get_workflow(self.promise)

                    steps_dict = {}
                    for i, n in workflow.nodes.items():
                        steps_dict[i] = n.foo._fairstep
                        print(steps_dict[i])
                    print(steps_dict)
                    for i in workflow.links:
                        print(i, steps_dict[i])
                        for j in workflow.links[i]:
                            print("TO", str(j[0]), str(j[1].name))

            return FairWorkflowP()
        return wrapper
    return fairworkflow_inner

<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>
<br/>


# Define the steps of your workflow
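Each step should be its own function. Mark the function as such with the @fairstep decorator. Every argument and the return value must have a type hint; @fairstep raises a ValueError otherwise.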

In [361]:
@fairstep
def add(a:float, b:float) -> float:
    """Adding up numbers!"""
    return a + b

In [362]:
@fairstep
def sub(a: float, b: float) -> float:
    """Subtracting numbers."""
    return a - b

In [363]:
@fairstep
def mul(a: float, b: float) -> float:
    """Multiplying numbers."""
    return a * b

In [364]:
@fairstep
def weird(a: float, b:float) -> float:
    """A weird function"""
    return a * 2 + b * 4
    

# Define your workflow using @fairworkflow
Now write a function which describes your workflow. Mark this function with the @fairworkflow decorator.

In [365]:
@fairworkflow(name='My Workflow')
def my_workflow(in1, in2, in3):
    """
    A simple addition, subtraction, multiplication workflow.

    Combines the steps as ``mul(weird(add(in1, in2), in3), sub(in1, in2))``
    and returns the result of the final ``mul`` call.
    """
    # Both t1 and t2 depend only on the workflow inputs.
    t1 = add(in1, in2)
    t2 = sub(in1, in2)
    # The final step needs t1 (via weird) and t2.
    t3 = mul(weird(t1, in3), t2)
    return t3

In [366]:
fw = my_workflow(1, 4, 3)

In [367]:
fw.display()

| workflow |
| --- |
| ![workflow workflow](control-workflow.svg) |

# Execute your workflow using .execute()
Set num_threads greater than 1 if you wish to exploit parallelisation in your workflow.

In [368]:
answer = fw.execute(num_threads=2)

answer

-66

# Get retrospective provenance of run

In [369]:
fw.prov()

In [370]:
fw.get_workflow()

Step URI = None
@prefix bpmn: <http://dkm.fbk.eu/index.php/BPMN2_Ontology#> .
@prefix ns1: <http://purl.org/dc/terms/> .
@prefix pplan: <http://purl.org/net/p-plan#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .

_:Nb6004b69d42c4fc399a36f0ae4095b24 {
    [] a bpmn:ScriptTask,
            pplan:Step ;
        rdfs:label "test_step" ;
        ns1:description """@fairstep
def mul(a: float, b: float) -> float:
    \"\"\"Multiplying numbers.\"\"\"
    return a * b
""" ;
        pplan:hasInputVar [ a pplan:Variable,
                    "float" ],
            [ a pplan:Variable,
                    "float" ] ;
        pplan:hasOutputVar [ a pplan:Variable,
                    "float" ] .
}


Step URI = None
@prefix bpmn: <http://dkm.fbk.eu/index.php/BPMN2_Ontology#> .
@prefix ns1: <http://purl.org/dc/terms/> .
@prefix pplan: <http://purl.org/net/p-plan#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .

_:N8e80a15495644764995c3ab37c269834 {
    [] a bpmn:ScriptTask,
     