Useful links:
* http://stackoverflow.com/questions/9708902/in-practice-what-are-the-main-uses-for-the-new-yield-from-syntax-in-python-3
* http://www.dabeaz.com/coroutines/
* http://www.slideshare.net/dabeaz/python-generator-hacking
* https://www.youtube.com/watch?v=aurOB4qYuFM&feature=youtu.be&t=53m8s
* http://excess.org/article/2013/02/itergen2/

Generators:
- `yield` one item of data at a time
- can get data from other generators via `next()` (or a `for` loop)
- the consumer "pulls" data through the chain
- pass *inputs* to their constructor (**pull data**)
- **multiple inputs** (via `next()`) / **single output** (via `yield`)
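
A minimal sketch of the pull model described above. The stage names `source`, `add_offset` and `keep_even` are made up for illustration. Each stage takes its input generator as a constructor argument, and nothing runs until the consumer at the end asks for a value.

```python
def source(num):
    """Produce the integers 0..num-1, one at a time."""
    for ii in range(num):
        yield ii

def add_offset(events, offset=100):
    """Pull each event from the upstream generator and add an offset."""
    for data in events:
        yield data + offset

def keep_even(events):
    """Pass on only the even values."""
    for data in events:
        if data % 2 == 0:
            yield data

# inputs are passed to the constructors; list() pulls the data through
chain = keep_even(add_offset(source(6)))
result = list(chain)
print(result)
```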

Coroutines:
- you ``send()`` them data, which they receive as the value of a ``yield`` expression
- they can send output to several sub-coroutines, so branching can be more elaborate
  - e.g. a routine ``thinsplit(out, thinout, 1000)`` could pass events through to `out` and send every 1000th event to `thinout` (a display, say)
- need to ``send(None)`` once to prime them before the first real `send()`; a decorator can do that automatically
- pass *outputs* to their constructor (**push data**)
- **single input** (via `(yield)`) / **multiple outputs** (via `out_n.send(data)`)
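
The `thinsplit` idea mentioned above could look roughly like this. It is only a sketch, assuming that every event goes to `out` and every `every`-th event is additionally copied to `thinout`; the `collect` sink is a hypothetical helper used here just to record what each branch receives.

```python
def coroutine(func):
    """Prime a generator-based coroutine so it is ready for send()."""
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)          # run up to the first yield
        return cr
    return start

@coroutine
def thinsplit(out, thinout, every=1000):
    """Pass every event to `out`; also copy every `every`-th one to `thinout`."""
    count = 0
    while True:
        data = (yield)
        if count % every == 0:
            thinout.send(data)
        out.send(data)
        count += 1

@coroutine
def collect(store):
    """Hypothetical sink: append everything received to the list `store`."""
    while True:
        store.append((yield))

everything, thinned = [], []
splitter = thinsplit(collect(everything), collect(thinned), every=1000)
for ii in range(2500):
    splitter.send(ii)     # push data in from the outside
splitter.close()
print(len(everything), thinned)
```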


In both cases:
- no need for ``init()`` and ``process()`` routines, both are encapsulated in one

Asyncio:
- lets code do other work while some part of it is blocked on I/O
- defines an event loop that runs fine-grained tasks (not event by event in a data sense!)
- coroutines in asyncio are called using `yield from routine(...)` (spelled `await routine(...)` since Python 3.5), where `routine` is some operation that may block
- improves throughput when many blocking operations run "in parallel" in a single thread
- while one coroutine is waiting for I/O, other unrelated ones can still execute
- hides I/O latency rather than removing it: waiting time is spent running other coroutines
- **question**: is this useful for us? Or are we more computation dominated? If all our I/O bottleneck happens at the start of a pipeline, then this probably doesn't help much...
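
To make the overlap of waiting times concrete, here is a small sketch using the `async def`/`await` spelling (Python 3.5+, with `asyncio.run` from 3.7) rather than `yield from`. The names `fetch` and `main` are illustrative, and `asyncio.sleep` merely stands in for a real blocking I/O operation.

```python
import asyncio

async def fetch(name, delay, log):
    """Stand-in for an I/O operation; the wait is simulated with asyncio.sleep."""
    log.append(f"{name} start")
    await asyncio.sleep(delay)    # control returns to the event loop here
    log.append(f"{name} done")
    return name

async def main():
    log = []
    # both coroutines run in a single thread; their waits overlap
    results = await asyncio.gather(
        fetch("slow", 0.2, log),
        fetch("fast", 0.05, log),
    )
    return results, log

results, log = asyncio.run(main())
print(results)   # gather() returns results in the order the coroutines were passed
print(log)
```

The "fast" coroutine finishes while "slow" is still waiting, which is the whole benefit. A stage that spends its time computing rather than waiting never yields to the event loop, so it would gain nothing from this.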

In [81]:
from functools import wraps

def coroutine(func):
    """
    Decorator that automatically "primes" a coroutine, i.e. sends None
    once so that the code before the first yield executes.
    """
    @wraps(func)   # keep the wrapped function's name and docstring
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        cr.send(None)   # advance to the first yield
        return cr
    return start

In [205]:
def pushdata( pipeline, num=10 ):
    """ 
    drive data through the output pipeline
    this is not a coroutine!
    """
    print("Init gen")
    try:
        for ii in range(num):
            print("  SEND",ii)
            pipeline.send(ii)
    except StopIteration:
        print("pushdata: Something asked me to stop!")
        
@coroutine
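def co_process( output ):
    print("Init process")
    try:
        while True:
            data = (yield)
            data += 100        
            #print("  process send",data)
            try:
                output.send(data)
            except StopIteration:
                # downstream stage finished: stop here too, instead of letting
                # StopIteration escape (a RuntimeError on Python 3.7+, PEP 479)
                return
    except GeneratorExit:
        print("PROCESS exited")
        output.close()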

@coroutine
def co_filter( output, otheroutput, every=3 ):
    while True:
        data = (yield)
        if int(data) % every != 0:
            output.send(data)
        else:
            otheroutput.send(data)

@coroutine
def co_split( output, otheroutput, every=2 ):
    count = 0
    while True:
        data = (yield)
        if count % every == 0:
            otheroutput.send(data)
        output.send(data) #send through
        count += 1

@coroutine    
def co_sink(name):
    print("Init sink",name)
    try:
        while True:
            data = (yield)
            print( "{0:>10s} RECEIVED: {1}".format(name,data ))
    except GeneratorExit:
        print("CLOSING SINK",name)

In [206]:
pipeline = co_process( 
                co_filter( 
                        co_split( co_sink("first"), co_sink("display")), 
                        co_sink("second") ))
# run the loop by driving it with the source 
pushdata(pipeline,10)
pipeline.close()


Init sink first
Init sink display
Init sink second
Init process
Init gen
  SEND 0
   display RECEIVED: 100
     first RECEIVED: 100
  SEND 1
     first RECEIVED: 101
  SEND 2
    second RECEIVED: 102
  SEND 3
   display RECEIVED: 103
     first RECEIVED: 103
  SEND 4
     first RECEIVED: 104
  SEND 5
    second RECEIVED: 105
  SEND 6
   display RECEIVED: 106
     first RECEIVED: 106
  SEND 7
     first RECEIVED: 107
  SEND 8
    second RECEIVED: 108
  SEND 9
   display RECEIVED: 109
     first RECEIVED: 109
PROCESS exited
CLOSING SINK first
CLOSING SINK display
CLOSING SINK second


In [207]:
@coroutine
def co_stopif(condition, target):
    while True:
        data = (yield)
        if condition(data):
            break
        else:
            target.send(data)
    print("STOP condition met, stopping")
    target.close()
   
            

pipeline = co_process( 
                co_stopif( lambda x: x>104, 
                    co_filter( 
                        co_split( co_sink("first"), co_sink("display")), 
                        co_sink("second") )
                    )
            )

pushdata(pipeline, 10 )

Init sink first
Init sink display
Init sink second
Init process
Init gen
  SEND 0
   display RECEIVED: 100
     first RECEIVED: 100
  SEND 1
     first RECEIVED: 101
  SEND 2
    second RECEIVED: 102
  SEND 3
   display RECEIVED: 103
     first RECEIVED: 103
  SEND 4
     first RECEIVED: 104
  SEND 5
STOP condition met, stopping
CLOSING SINK first
CLOSING SINK display
CLOSING SINK second
pushdata: Something asked me to stop!


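Doing both input and output through the same `yield` expression: the value passed to `send()` becomes the result of `yield` inside the coroutine, and the value the coroutine yields becomes the return value of `send()`: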

In [224]:
def running_avg():
    "coroutine that accepts numbers and yields their running average"
    total = float((yield))
    count = 1
    while True:
        i = yield total / count
        count += 1
        total += i
 
def sender(target):
    target.send(None)
    for ii in range(10):
        yield target.send(ii)

In [225]:
r = running_avg()
r.send(None)
print(r.send(10))
print(r.send(100))
print(r.send(100))

for x in range(10):
    print( r.send(x*10) )    

10.0
55.0
70.0
52.5
44.0
40.0
38.57142857142857
38.75
40.0
42.0
44.54545454545455
47.5
50.76923076923077


In [226]:
for x in sender(running_avg()):
    print( x )



0.0
0.5
1.0
1.5
2.0
2.5
3.0
3.5
4.0
4.5


In [227]:
next(d)

StopIteration: 

Input/Output data:
-----------------

The data passed between stages should be a dictionary, possibly nested. Values can be scalars or arrays; more complex types should be avoided. A flattening routine similar to the one below will transform it into the columns of an output table:

In [247]:
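import collections.abc

def flatten_dict(d, parent_key='', sep='.'):
    """Flatten a nested dictionary into a single level with sep-joined keys."""
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        # collections.MutableMapping was removed in Python 3.10; use collections.abc
        if isinstance(v, collections.abc.MutableMapping):
            items.extend(flatten_dict(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)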



In [248]:
d = dict(image=[1,2,3,4,5], hillasparams=dict(length=1,width=0.2,centroid=dict(x=0.2,y=1.0)) )
print(" Original:",d)
print("Flattened:",flatten_dict(d))

 Original: {'hillasparams': {'length': 1, 'width': 0.2, 'centroid': {'y': 1.0, 'x': 0.2}}, 'image': [1, 2, 3, 4, 5]}
Flattened: {'hillasparams.length': 1, 'hillasparams.centroid.x': 0.2, 'hillasparams.width': 0.2, 'hillasparams.centroid.y': 1.0, 'image': [1, 2, 3, 4, 5]}
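
As a rough illustration of the "columns in an output table" step, the sketch below collects a sequence of flattened events into one list per column. `to_columns` is a hypothetical helper, not part of any existing code, and it assumes every event carries the same set of keys (otherwise the columns would end up with different lengths).

```python
from collections import defaultdict
from collections.abc import MutableMapping

def flatten_dict(d, parent_key='', sep='.'):
    """Flatten nested mappings into one level with dotted keys."""
    items = {}
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, MutableMapping):
            items.update(flatten_dict(v, new_key, sep=sep))
        else:
            items[new_key] = v
    return items

def to_columns(events):
    """Hypothetical helper: turn a sequence of nested dicts into column lists."""
    columns = defaultdict(list)
    for event in events:
        for key, value in flatten_dict(event).items():
            columns[key].append(value)
    return dict(columns)

events = [
    dict(image=[1, 2, 3], hillasparams=dict(length=1.0, width=0.2)),
    dict(image=[4, 5, 6], hillasparams=dict(length=2.0, width=0.3)),
]
table = to_columns(events)
for name in sorted(table):
    print(name, table[name])
```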


Another possibility would be record arrays...

In [288]:
import numpy as np
from numpy.lib import recfunctions
a = np.array( [([1,2,3],2,3),([1,2,3],5,6)], dtype=[('x','3d'),('y',int),('z',float)] )


In [289]:
r = a.view(np.recarray)
print(r.x)
print(r.y)

[[ 1.  2.  3.]
 [ 1.  2.  3.]]
[2 5]


In [314]:
rr =recfunctions.append_fields(r, names="B", data=100, asrecarray=True,usemask=False )

TypeError: len() of unsized object

In [312]:
rr

rec.array([([1.0, 2.0, 3.0], 2, 3.0, [100.0, 2e-323, 2.5e-323]),
       ([1.0, 2.0, 3.0], 5, 6.0, [1e+20, 1e+20, 1e+20])], 
      dtype=[('x', '<f8', (3,)), ('y', '<i8'), ('z', '<f8'), ('B', '<f8', (3,))])
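
The `TypeError` above appears to come from passing a bare scalar as `data`. One way to sidestep the problem, sketched here without `recfunctions`, is to build the widened dtype by hand and copy the fields across. `add_scalar_field` is a hypothetical helper written for this note, and the `x` field is declared as `(float, (3,))`, which is assumed to be equivalent to the `'3d'` used earlier.

```python
import numpy as np

def add_scalar_field(arr, name, value, dtype=float):
    """Return a copy of structured array `arr` with a new field set to `value`."""
    new_dtype = np.dtype(arr.dtype.descr + [(name, dtype)])
    out = np.empty(arr.shape, dtype=new_dtype)
    for field in arr.dtype.names:
        out[field] = arr[field]   # copy existing columns, sub-arrays included
    out[name] = value             # the scalar is broadcast over all rows
    return out

a = np.array([([1, 2, 3], 2, 3), ([1, 2, 3], 5, 6)],
             dtype=[('x', float, (3,)), ('y', int), ('z', float)])
rr = add_scalar_field(a, "B", 100).view(np.recarray)
print(rr.dtype.names)
print(rr.B)
```

Unlike the `rr` shown above, the new `B` column here is a plain scalar field, so every row holds a single, fully initialised value.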