<a href="https://colab.research.google.com/github/hrbolek/func2pipe/blob/master/notebooks/func2pipe.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Func to Pipe

## Source Code

### Functions Reducer

In [0]:
def createPipe(*F):
  def Fdiamond(sequence):
    result = sequence
    for Fx in F:
      result = Fx(result)
    return result
  return Fdiamond

### Subpipe

In [0]:
def createSub(assign = lambda source, result: {**source, **result}, reducer = None):

    def multipleResults(*F):
        buffer = {'data': None}
        def pre(gen):
            for item in gen:
                buffer['data'] = item
                yield item

        def inner(gen):
            e = gen
            for f in F:
                e = f(e)
            return e

        def post(gen):
            for item in gen:
                yield assign(buffer['data'], item)

        return lambda gen: post(inner(pre(gen)))

    def reducedResult(*F):
        def inner(gen):
            e = gen
            for f in F:
                e = f(e)
            return e

        def result(gen):
            for item in gen:
                itemResult = reducer(inner([item]))
                yield assign(item, itemResult)
        return result

    if reducer == None:
        return multipleResults
    else:
        return reducedResult

### Convertor

In [0]:
from functools import wraps, reduce, partial


In [0]:
from functools import wraps, reduce, partial
def convertToPipeFuncFull(func, with_yield = False, with_state = False, kwargs = {}):
    @wraps(func)
    def innerSelectSimple(generator):
        for i in generator:
            yield func(i, **kwargs)

    if (not with_yield) & (not with_state):
        return innerSelectSimple
    
    @wraps(func)
    def innerSelectWithYield(generator):
        for i in generator:
            for j in func(i, **kwargs):
                yield j


    if (with_yield) & (not with_state):
        return innerSelectWithYield

    @wraps(func)
    def innerSelectWithState(generator):
        state = None
        for i in generator:
            result = None
            if (state):
                result, state = func(i, state = state, **kwargs)
            else:
                result, state = func(i, **kwargs)
            if result:
                yield result

    if (not with_yield) & with_state:
        return innerSelectWithState

    def innerSelectWithYieldWithState(generator):
        state = None
        initState = True
        for i in generator:
            result = None
            if (initState):
                for result, statel in func(i, **kwargs):
                    state = statel
                    if result:
                        yield result
                initState = False
            else:
                for result, statel in func(i, state = state, **kwargs):
                    state = statel
                    if result:
                        yield result


    return innerSelectWithYieldWithState

### Convertor Decorator

In [0]:
def convertToPipeFunc(with_yield = False, with_state = False):
    def pipeit(__func):
        @wraps(__func)
        def binder(**kwargs):
            result = wraps(__func)(convertToPipeFuncFull(__func, with_yield, with_state, kwargs))
            return result
        return binder
    return pipeit

convertToPipeFunc2 = convertToPipeFunc 

## Tests

In [0]:
import unittest

class TestCase(unittest.TestCase):
  def __call__(self, *args, **kwargs):
    self.assertEqual(*args, **kwargs)
    return True

testEquality = TestCase()

### Simple Function

In [79]:
source = [{'value': 0}, {'value': 1}, {'value': 2}]
@convertToPipeFunc()
def add(item, amount):
  return {**item, 'result': item['value'] + amount}

expectedResult = [{'value': 0, 'result': 3}, {'value': 1, 'result': 4},
  {'value': 2, 'result': 5}]

pipe = createPipe(
    add(amount = 2),
    add(amount = 3),
    list)

result = pipe(source)
print('Source:', source)
print('Result:', result)
print('Valid: ', testEquality(expectedResult, result)) 

Source: [{'value': 0}, {'value': 1}, {'value': 2}]
Result: [{'value': 0, 'result': 3}, {'value': 1, 'result': 4}, {'value': 2, 'result': 5}]
Valid:  True


### Function with Yield

In [80]:
source = [{'value': ['A', 'B']}, {'value': ['A', 'C']}, {'value': ['D', 'E']}]

@convertToPipeFunc(with_yield = True)
def revealSubItem(item, itemName):
    for _ in item[itemName]:
        yield _

expectedResult = ['A', 'B', 'A', 'C', 'D', 'E']

pipe = createPipe(
    revealSubItem(itemName = 'value'),
    list)

result = pipe(source)
print('Source:', source)
print('Result:', result)
print('Valid: ', testEquality(expectedResult, result)) 

Source: [{'value': ['A', 'B']}, {'value': ['A', 'C']}, {'value': ['D', 'E']}]
Result: ['A', 'B', 'A', 'C', 'D', 'E']
Valid:  True


### Function with State

In [81]:
source = [{'value': ['A', 'B']}, {'value': ['A', 'C']}, {'value': ['D', 'E']}]

@convertToPipeFunc(with_state = True)
def assignId(item, state = 0, idName = 'id'):
    return {**item, idName: state}, state + 1

expectedResult = [{'value': ['A', 'B'], 'ID': 0}, 
    {'value': ['A', 'C'], 'ID': 1}, {'value': ['D', 'E'], 'ID': 2}]

pipe = createPipe(
    assignId(idName = 'ID'),
    list)

result = pipe(source)
print('Source:', source)
print('Result:', result)
print('Valid: ', testEquality(expectedResult, result)) 

Source: [{'value': ['A', 'B']}, {'value': ['A', 'C']}, {'value': ['D', 'E']}]
Result: [{'value': ['A', 'B'], 'ID': 0}, {'value': ['A', 'C'], 'ID': 1}, {'value': ['D', 'E'], 'ID': 2}]
Valid:  True


### Function with Yield and with State

In [82]:
source = [{'value': ['A', 'B']}, {'value': ['A', 'C']}, {'value': ['D', 'E']}]

@convertToPipeFunc(with_yield = True, with_state = True)
def assignIdToSubItem(item, state = -1, itemName = ''):
    for _ in item[itemName]:
        state = state + 1
        yield {itemName: _, 'id': state}, state

expectedResult = [{'value': 'A', 'id': 0}, {'value': 'B', 'id': 1}, 
  {'value': 'A', 'id': 2}, {'value': 'C', 'id': 3}, 
  {'value': 'D', 'id': 4}, {'value': 'E', 'id': 5}
  ]

pipe = createPipe(
    assignIdToSubItem(itemName = 'value'),
    list)

result = pipe(source)
print('Source:', source)
print('Result:', result)
print('Valid: ', testEquality(expectedResult, result)) 

Source: [{'value': ['A', 'B']}, {'value': ['A', 'C']}, {'value': ['D', 'E']}]
Result: [{'value': 'A', 'id': 0}, {'value': 'B', 'id': 1}, {'value': 'A', 'id': 2}, {'value': 'C', 'id': 3}, {'value': 'D', 'id': 4}, {'value': 'E', 'id': 5}]
Valid:  True


### Subpipe

In [83]:
source = [{'value': 0}, {'value': 1}, {'value': 2}]
@convertToPipeFunc()
def plus(item, amount):
  return item + amount

@convertToPipeFunc()
def selectIt(item, f):
  return f(item)

expectedResult = [
  {'value': 0, 'newvalue': 2}, 
  {'value': 1, 'newvalue': 3}, 
  {'value': 2, 'newvalue': 4}
  ]

pipe = createPipe(
    createSub(assign = lambda source, result: {**source, 'newvalue': result})(
      selectIt(f = lambda item: item['value']),
      plus(amount = 2),
    ),
    list)

result = pipe(source)
print('Source:', source)
print('Result:', result)
print('Valid: ', testEquality(expectedResult, result)) 

Source: [{'value': 0}, {'value': 1}, {'value': 2}]
Result: [{'value': 0, 'newvalue': 2}, {'value': 1, 'newvalue': 3}, {'value': 2, 'newvalue': 4}]
Valid:  True


### Subpipe II

In [84]:
source = [{'values': [0, 1]}, {'values': [2, 1]}, {'values': [5, 2]}]
@convertToPipeFunc()
def plus(item, amount):
    return item + amount

@convertToPipeFunc(with_yield=True)
def selectItAndEnum(item, f):
    for _ in f(item):
      yield _

def createReducer():
    def reducer(source):
        total = 0
        for item in source:
          total = total + item
        return total 
    return reducer

expectedResult = [
  {'values': [0, 1], 'sum': 1}, 
  {'values': [2, 1], 'sum': 3}, 
  {'values': [5, 2], 'sum': 7}
  ]

pipe = createPipe(
    createSub(assign = lambda source, result: {**source, 'sum': result},
      reducer = createReducer())(
      selectItAndEnum(f = lambda item: item['values']),
    ),
    list)

result = pipe(source)
print('Source:', source)
print('Result:', result)
print('Valid: ', testEquality(expectedResult, result)) 

Source: [{'values': [0, 1]}, {'values': [2, 1]}, {'values': [5, 2]}]
Result: [{'values': [0, 1], 'sum': 1}, {'values': [2, 1], 'sum': 3}, {'values': [5, 2], 'sum': 7}]
Valid:  True


In [85]:
source = [{'value': 0}, {'value': 1}, {'value': 2}]
@convertToPipeFunc()
def add(item, amount):
  return {**item, 'result': item['value'] + amount}

@convertToPipeFunc()
def calc(item, func, name):
  result = {}
  result[name] = func(item)
  return result

@convertToPipeFunc(with_yield = True)
def extra(item, extradata):
  for _ in extradata:
    yield _

def createReducer(accumulator = [], f = lambda item, accumulator: [*accumulator, item]):
  def reducer(gen):
    result = accumulator
    for item in gen:
      result = f(item, result)
    return result
  return reducer

pipe_01 = createPipe(
    add(amount = 2),
    createSub(assign = lambda source, result: {**source, 'value2': result}, reducer = createReducer())(
#    SUB(assign = lambda source, result: {**source, **result})(
#    SUB(assign = lambda source, result: {**source, 'value2': result})(
      calc(func = lambda item: 4, name = 'extraValue'),
      extra(extradata = ['A', 'B', 'C'])
    )
    )
result = list(pipe_01(source))
print(result)


[{'value': 0, 'result': 2, 'value2': ['A', 'B', 'C']}, {'value': 1, 'result': 3, 'value2': ['A', 'B', 'C']}, {'value': 2, 'result': 4, 'value2': ['A', 'B', 'C']}]


## Pipe Described by Graph 

In [0]:
def createArrayReducer(accumulator = [], f = lambda accumulator, item: [*accumulator, item]):
  result = lambda gen: reduce(f, gen, accumulator)
  return result

@convertToPipeFunc(with_yield=True)
def stopNotFound(item):
  if item:
    yield item

def graphToPipe(graph, currentnode):
  availableNodes = []
  for node in graph['nodes']:
    availableNodes.append(node)

  def innerBuilder(graph, currentnode, filterq = lambda item: True):
      availableNodes.remove(currentnode)
      descriptorPipe = graph['nodes'][currentnode]

      def buildRelation(relation):
        itemname = relation['to']
        if ('itemname' in relation):
            itemname = relation['itemname']
        filterq = lambda item: True
        if ('filter' in relation):
            filterq = relation['filter']

        innerPipe = innerBuilder(graph, relation['to'], filterq)

        sub = createSub(assign = lambda item, result: {**item, itemname: result}, reducer = createArrayReducer())
        return sub(relation['relation'], innerPipe)

      result = [descriptorPipe, stopNotFound()]
      nodesToRemove = []
      for relation in filter(lambda item: item['from'] == currentnode, graph['edges']):
          nodeName = relation['to']
          nodesToRemove.append(nodeName)
          if nodeName in availableNodes:
            result += [buildRelation(relation)]
          
      for node in nodesToRemove:
        if node in availableNodes:
          availableNodes.remove(node)

      return createPipe(*result)
  return innerBuilder(graph, currentnode)

### Database Model

In [0]:
studentTable = [
    {'id': 258, 'name': 'George Burden'},
    {'id': 263, 'name': 'Julia Seven'},
    {'id': 396, 'name': 'Anthony Previous'},
]

subjectTable = [
    {'id': 1024, 'name': 'Mathematics'},
    {'id': 1144, 'name': 'English'},
    {'id': 1194, 'name': 'History'},
    {'id': 1086, 'name': 'Physics'},
]

student2SubjectTable = [
    {'id': 1, 'studentId': 258, 'subjectId': 1024},
    {'id': 2, 'studentId': 258, 'subjectId': 1086},
    {'id': 3, 'studentId': 263, 'subjectId': 1144},
    {'id': 4, 'studentId': 263, 'subjectId': 1198},
    {'id': 5, 'studentId': 396, 'subjectId': 1024},
    {'id': 6, 'studentId': 396, 'subjectId': 1144},
]

teacherTable = [
    {'id': 2573, 'name': 'Paul Coleman'},
    {'id': 3168, 'name': 'Igor Mashevic'},
    {'id': 1934, 'name': 'Alex Moon'},
    {'id': 2379, 'name': 'Julia Newman'},
]

subject2Teacher = [
    {'id': 1, 'teacherId': 2573, 'subjectId': 1024},
    {'id': 2, 'teacherId': 2573, 'subjectId': 1086},
    {'id': 3, 'teacherId': 3168, 'subjectId': 1144},
    {'id': 4, 'teacherId': 3168, 'subjectId': 1194},
    {'id': 5, 'teacherId': 1934, 'subjectId': 1144},
    {'id': 6, 'teacherId': 2379, 'subjectId': 1086},
]

def SelectCommand(fromTable, whereLambda):
    for item in filter(whereLambda, fromTable):
        yield item

def SelectWhereId(fromTable, idValue):
  for item in SelectCommand(fromTable, lambda item: item['id'] == idValue):
      return item
  return None

### Database Descriptions as Graph

In [0]:
@convertToPipeFunc()
def getStudentRecord(id):
    return SelectWhereId(fromTable=studentTable, idValue=id)

@convertToPipeFunc()
def getTeacherRecord(id):
    return SelectWhereId(fromTable=teacherTable, idValue=id)

@convertToPipeFunc()
def getSubjectRecord(id):
    return SelectWhereId(fromTable=subjectTable, idValue=id)

@convertToPipeFunc(with_yield = True)
def StudentToSubject(item):
  #if item:
    id = item['id']
    for _ in SelectCommand(student2SubjectTable, lambda record: record['studentId'] == id):
        yield _['subjectId']

@convertToPipeFunc(with_yield = True)
def SubjectToStudent(item):
  #if item:
    id = item['id']
    for _ in SelectCommand(student2SubjectTable, lambda record: record['subjectId'] == id):
        yield _['studentId']

@convertToPipeFunc(with_yield = True)
def SubjectToTeacher(item):
  #if item:
    id = item['id']
    for _ in SelectCommand(subject2Teacher, lambda record: record['subjectId'] == id):
        yield _['teacherId']

@convertToPipeFunc(with_yield = True)
def TeacherToSubject(item):
  #if item:
    id = item['id']
    for _ in SelectCommand(subject2Teacher, lambda record: record['teacherId'] == id):
        yield _['subjectId']


### Usage

In [117]:
import json

grDefinition = {
    'nodes': {
        'Students': getStudentRecord(),
        'Teachers': getTeacherRecord(),
        'Subjects': getSubjectRecord(),
        },
    'edges': [
        {'from': 'Students', 'to': 'Subjects', 'relation': StudentToSubject()},
        {'from': 'Subjects', 'to': 'Students', 'relation': SubjectToStudent()},

        {'from': 'Teachers', 'to': 'Subjects', 'relation': TeacherToSubject()},
        {'from': 'Subjects', 'to': 'Teachers', 'relation': SubjectToTeacher()},
    ]
}

queryStudents = graphToPipe(grDefinition, 'Students')
queryTeachers = graphToPipe(grDefinition, 'Teachers')
querySubjects = graphToPipe(grDefinition, 'Subjects')

studentsIds = [259, 258]
queryStudentsResult = list(queryStudents(studentsIds))
#print(queryStudentsResult)
print(json.dumps(queryStudentsResult, indent=2))

[
  {
    "id": 258,
    "name": "George Burden",
    "Subjects": [
      {
        "id": 1024,
        "name": "Mathematics",
        "Teachers": [
          {
            "id": 2573,
            "name": "Paul Coleman"
          }
        ]
      },
      {
        "id": 1086,
        "name": "Physics",
        "Teachers": [
          {
            "id": 2573,
            "name": "Paul Coleman"
          },
          {
            "id": 2379,
            "name": "Julia Newman"
          }
        ]
      }
    ]
  }
]


### Usage II

In [118]:
studentsIds = [258, 396]
queryStudentsResult = list(queryStudents(studentsIds))
#print(queryStudentsResult)
print(json.dumps(queryStudentsResult, indent=2))

[
  {
    "id": 258,
    "name": "George Burden",
    "Subjects": [
      {
        "id": 1024,
        "name": "Mathematics",
        "Teachers": [
          {
            "id": 2573,
            "name": "Paul Coleman"
          }
        ]
      },
      {
        "id": 1086,
        "name": "Physics",
        "Teachers": [
          {
            "id": 2573,
            "name": "Paul Coleman"
          },
          {
            "id": 2379,
            "name": "Julia Newman"
          }
        ]
      }
    ]
  },
  {
    "id": 396,
    "name": "Anthony Previous",
    "Subjects": [
      {
        "id": 1024,
        "name": "Mathematics",
        "Teachers": [
          {
            "id": 2573,
            "name": "Paul Coleman"
          }
        ]
      },
      {
        "id": 1144,
        "name": "English",
        "Teachers": [
          {
            "id": 3168,
            "name": "Igor Mashevic"
          },
          {
            "id": 1934,
            "name": "Alex 

### Usage III (Teachers)

In [120]:
teachersIds = [2573]
queryTeachersResult = list(queryTeachers(teachersIds))
#print(queryStudentsResult)
print(json.dumps(queryTeachersResult, indent=2))

[
  {
    "id": 2573,
    "name": "Paul Coleman",
    "Subjects": [
      {
        "id": 1024,
        "name": "Mathematics",
        "Students": [
          {
            "id": 258,
            "name": "George Burden"
          },
          {
            "id": 396,
            "name": "Anthony Previous"
          }
        ]
      },
      {
        "id": 1086,
        "name": "Physics",
        "Students": [
          {
            "id": 258,
            "name": "George Burden"
          }
        ]
      }
    ]
  }
]


In [92]:

greatSource = [{'id': 'A'}, {'id': 'B'}, {'id': 'C'}]
smallSource = [{'id': 'a'}, {'id': 'b'}, {'id': 'c'}]
fromGreat2Small = {'A': 'a', 'B': 'b', 'C': 'c'}
fromSmall2Great = {'a': 'A', 'b': 'B', 'c': 'C'}

@convertToPipeFunc2()
def greatToSmallRelation(greatItem):
  print('G:', greatItem)
  return {'idS': fromGreat2Small[greatItem['id']]}

@convertToPipeFunc2()
def smallToGreatRelation(smallItem):
  #print('S:', smallItem)
  return {'id': fromSmall2Great[smallItem['id']]}

@convertToPipeFunc2(with_yield=True)
def smallToGreatRelation2(smallItem):
  #print('S:', smallItem)
  for i in greatSource:
    yield i
  #return {'idG': fromSmall2Great[smallItem['id']]}

@convertToPipeFunc2()
def identityPipe(item, description):
  return {**item, 'description': description}

@convertToPipeFunc2(with_yield=True)
def anyToNumbersRelation(item):
  for i in range(1,3):
    #yield {'value': i}
    yield i

@convertToPipeFunc2()
def numberRecord(item):
  return {'value': item}

@convertToPipeFunc2()
def printItem(item, description):
  print(description, item)
  return item

gr = {
    'nodes': {
        'Great': identityPipe(description = 'Great item'), 
        'Small': identityPipe(description = 'Small item'),
        'Numbers': numberRecord(),
        },
    'edges': [
        {'from': 'Great', 'to': 'Small', 'relation': greatToSmallRelation(), 'itemname' : 'little'},
        {'from': 'Small', 'to': 'Great', 'relation': smallToGreatRelation2(), 'itemname' : 'bigBC', 'filter': lambda item: item[id] > 'A'},
        {'from': 'Small', 'to': 'Great', 'relation': smallToGreatRelation2(), 'itemname' : 'bigA', 'filter': lambda item: item[id] <= 'A'},
        {'from': 'Small', 'to': 'Numbers', 'relation': anyToNumbersRelation(), 'itemname' : 'numbers'},
        {'from': 'Great', 'to': 'Numbers', 'relation': anyToNumbersRelation(), 'itemname' : 'numbers',
         'reducers' :[
            {'name': 'count', 
             'selector': lambda item: item['value'], 
             'reducer': lambda data: reduce(lambda x, y: (x + 1), data, 0)}]},
    ]
    }

def createArrayReducer(accumulator = [], f = lambda accumulator, item: [*accumulator, item]):
  def reducer(gen):
    result = accumulator
    for item in gen:
      result = f(result, item)
    return result
  return reducer

def createArrayReducer2(accumulator = [], f = lambda accumulator, item: [*accumulator, item]):
  result = lambda gen: reduce(f, gen, accumulator)
  return result

import json
graphPipe = graphToPipe(gr, 'Small')
result = list(graphPipe(smallSource))
print(json.dumps(result, indent=2))

[
  {
    "id": "a",
    "description": "Small item",
    "bigBC": [
      {
        "id": "A",
        "description": "Great item",
        "numbers": [
          {
            "value": 1
          },
          {
            "value": 2
          }
        ]
      },
      {
        "id": "B",
        "description": "Great item",
        "numbers": [
          {
            "value": 1
          },
          {
            "value": 2
          }
        ]
      },
      {
        "id": "C",
        "description": "Great item",
        "numbers": [
          {
            "value": 1
          },
          {
            "value": 2
          }
        ]
      }
    ]
  },
  {
    "id": "b",
    "description": "Small item",
    "bigBC": [
      {
        "id": "A",
        "description": "Great item",
        "numbers": [
          {
            "value": 1
          },
          {
            "value": 2
          }
        ]
      },
      {
        "id": "B",
        "description": "Great i

In [93]:
@convertToPipeFunc2()
def copy(item, description):
    """describes and converts item into dictionary."""
    return {'input': item, 'description': description}

@convertToPipeFunc2(with_yield = True)
def copyY(item, description):
    """describes and converts item into dictionary."""
    yield {'input': item, 'description': description}

@convertToPipeFunc2(with_yield = False, with_state = True)
def copyS(item, description, state = 0):
    """describes and converts item into dictionary."""
    return ({'input': item, 'description': description, 'state': state}, state+1)

@convertToPipeFunc2(with_yield = True, with_state = True)
def copyYS(item, description, state = 0):
    """describes and converts item into dictionary."""
    yield ({'input': item, 'description': description, 'state': state}, state+1)



print(copy.__doc__)
print(copyY.__doc__)
print(newcopyY.__doc__)
print(newcopyY2.__doc__)

data = [1, 2, 3]
#%%time
print(list(newcopy(data)))
print(list(newcopy2(data)))
print(list(newcopyY(data)))
print(list(newcopyY2(data)))
print(list(copyS(description = 'desc')(data)))
print(list(copyYS(description = 'desc')(data)))
data = range(0, 100000)
%timeit (list(newcopy(data)))
%timeit (list(newcopy2(data)))


describes and converts item into dictionary.
describes and converts item into dictionary.


NameError: ignored