In [1]:
import sys
sys.path.append('../../')
sys.path.append('../')
from processingNetwork import ProcessingNode
from processingNetwork import ProcessingNetwork
from processingNetwork.PipeProcessingNode import PipeProcessingNode
from processingNetwork import test

import numpy as np

# Pipes
Pipes are like (tensor,operation,(input|output),context=None) tuples in tensorflow. Pipes combine the concept of input, output, operation, and context into one place.  In fact, when implementing a pipe, it is common to lean on several libraries to implement a well behaved function. That might be, tensorflow for a variable, tensorflow for operations, which return a tensor, distributed.dash for a cloud operation, or spark for map reduce. No matter the method, A pipe is cosidered as a well behaved sequential process that manages a logical operation in sequence.

In [2]:
# See run_tests for an exhaustive list of use cases.
test.run_tests()
# No errors means the Pipe framework is working as expected

KeyError: 'output_echo'

In [3]:

# An output only pipe. This pipe reads some settings, if present, and alters the tensors passed with an operation 
def MultiplyPipe(tensors,context= None):
    
    try:
        multiplier = context['multiplier']
    except:
        multiplier = 10
    
    ## In general Operations should support np.array, or sets {label:np.array}
    
    if isinstance(tensors,dict): # Support  sets {label:np.array}
        output = {}
        for tkey in tensors.keys():
            if type(tensors) is not np.ndarray:
                tensors[tkey] = np.array(tensors[tkey])
                output[tkey] = tensors[tkey]*multiplier
    else: # Support np.array
        if tensors is not None: # Support None
            if type(tensors) is not np.ndarray:
                tensors = np.array(tensors)
            output = tensors*multiplier
        else:
            output = None
    return output

# A configuration only pipe. Alters some settings, if present, and alters the tensors passed with an operation 
def GeneralConfigure(tensors,context= None):
    try:
        context.update(tensors)
    except:
        pass
    return context


# An output pipe that does some processing. In this case we just output data. 
#In general, output can be decorated with logging, saving, gating, signal amplification, and many other tasks
def GeneralOutput(tensors,context= None):
    print(tensors)
    return tensors


In [4]:
class ExamplePipeNode(PipeProcessingNode):    
    def pipe_construct(self):
        self.pipes = {}
        
        self.pipes['parameters']= {'name':'parameters',
                               'type': GeneralConfigure, 
                               'io':'input',
                               'examples':[{'multiply':10},{'multiply':np.array([2,2])}],
                               'dependency':None,
                                }
        self.pipes['input']= {'name':'input',
                                 'type': MultiplyPipe, 
                                 'io':'input',
                                 'examples':[{'a':10,'b':np.array([1,2,3])},10,np.array([2,3])],
                                 'dependency':None,
                                }

        self.pipes['output']= {'name':'output',
                               'type': GeneralOutput, 
                               'io':'output',
                                'examples':[{'a':10,'b':np.array([1,2,3])},10,np.array([2,3])],
                               'dependency':'input',
                              }        
        

In [6]:
ep = ExamplePipeNode()
ep.pipe_help(0)


inputs:  ['parameters', 'input']

outputs:  ['output']

Interface:  {'name': 'parameters', 'type': <function GeneralConfigure at 0x7f0a47ed7c80>, 'io': 'input', 'examples': [{'multiply': 10}, {'multiply': array([2, 2])}], 'dependency': None}

Usage Examples:  [{'multiply': 10}, {'multiply': array([2, 2])}]

Usage Full Example: ' parameters ':  {'multiply': 10}


In [7]:
ep.pipe_help(1)


inputs:  ['parameters', 'input']

outputs:  ['output']

Interface:  {'name': 'input', 'type': <function MultiplyPipe at 0x7f0a47ed7d08>, 'io': 'input', 'examples': [{'a': 10, 'b': array([1, 2, 3])}, 10, array([2, 3])], 'dependency': None}

Usage Examples:  [{'a': 10, 'b': array([1, 2, 3])}, 10, array([2, 3])]

Usage Full Example: ' input ':  {'a': 10, 'b': array([1, 2, 3])}


## Create and run a data_driven node

In [18]:
data_in= {'parameters_in':{'multiply': 10},
          'input_in':  {'a': 10, 'b': np.array([1, 2, 3])}}
ndef = {}
hardcoded1 = 'hardcoded'

## Tuples, are pointers to data in the datastream. You expect to find tagged data in the data strea,
ndef = {}
ndef['default_node'] = {'type': ExamplePipeNode,
              'dependencies': {'parameters':('parameters_in',),
                              'input':('input_in',)}
                       }
pn=ProcessingNetwork(ndef)
out = pn.process(data_in)
print(out)

{'a': 100, 'b': array([10, 20, 30])}
{'parameters_in': {'multiply': 10}, 'input_in': {'a': array(10), 'b': array([1, 2, 3])}, 'default_node': {'output': {'a': 100, 'b': array([10, 20, 30])}}}


## Create and run an automatic node

In [19]:
data_in= {}
ndef = {}
hardcoded1 = 'hardcoded'

## Tuples, are pointers to data in the datastream. You expect to find tagged data in the data strea,
ndef = {}
ndef['default_node'] = {'type': ExamplePipeNode,
              'dependencies': {'parameters':{'multiply': 10},
                              'input': {'a': 10, 'b': np.array([1, 2, 3])}}
                       }
pn=ProcessingNetwork(ndef)
out = pn.process(data_in)
print(out)

{'a': 100, 'b': array([10, 20, 30])}
{'default_node': {'output': {'a': 100, 'b': array([10, 20, 30])}}}


None
{'default_node': {'output': None}}
