## Stream

## Review the Streams blog post...

In [49]:
from streams import Stream, combine_latest
# Root stream for the odd/even example; later cells attach nodes to it
# via ``source.map(...)``.
source = Stream()
from operator import add, sub, mul

In [5]:
def inc(x):
    """Return ``x`` incremented by one."""
    return x + 1


def isodd(x):
    """Return True when ``x`` is odd, i.e. ``x % 2`` is non-zero."""
    return bool(x % 2)


def iseven(x):
    """Return True when ``x`` is not odd."""
    return not isodd(x)

In [46]:
# Map ``inc`` over the root stream, then split the result into two branches.
s = source.map(inc)
# Even branch: values passing ``iseven`` are folded with ``add``.
evens = s.filter(iseven)
even_total = evens.accumulate(add)

# Odd branch: values passing ``isodd`` are folded with ``sub``.
# NOTE(review): the name says "total" but the fold is a subtraction --
# presumably carried over from the blog example; confirm this is intended.
odds = s.filter(isodd)
odd_total = odds.accumulate(sub)

# Combine both branches and hand the combined value to ``print``.
# ``s2`` is the node returned by ``.map(print)``, not the combined stream itself.
s2 = combine_latest(odd_total, even_total).map(print)

## Handle functions that expect multiple arguments

We could formulate all our functions to take in dictionaries and extract the keys that they need, but that imposes restrictions on user code. Can we non-invasively wrap user functions with arbitrary signatures?

In [92]:
import inspect
import functools

def dec(func):
    """Adapt ``func`` to take one ``inspect.BoundArguments`` object.

    The returned wrapper accepts the result of
    ``inspect.signature(func).bind(...)`` and calls ``func`` with the
    arguments recorded in it, so a function with any signature can be used
    where a single-argument callable is expected.

    Parameters
    ----------
    func : callable
        The function to wrap.

    Returns
    -------
    callable
        ``inner(bound_args)``, which returns whatever ``func`` returns.
    """
    @functools.wraps(func)
    def inner(bound_args):
        # Use .args/.kwargs rather than **.arguments: the latter passes
        # everything by keyword, which fails for *args and positional-only
        # parameters and nests **kwargs under its own parameter name.
        return func(*bound_args.args, **bound_args.kwargs)
    return inner

def multiply(a, b, c=1):
    """Return the product ``a * b * c``; ``c`` defaults to 1."""
    partial_product = a * b
    return partial_product * c
 
# Three-node pipeline: input stream -> wrapped ``multiply`` -> ``print``.
sin = Stream()
s2 =  sin.map(dec(multiply))
sout = s2.map(print)

# Bind the call arguments to multiply's signature before emitting, so that a
# single object travels through the stream; ``dec`` unpacks it for the call.
sin.emit(inspect.signature(multiply).bind(2, 3, c=1))

6


[]

What happens when we chain multiple user functions?

In [118]:
import inspect
import functools
import numpy as np

def dec(func):
    """Wrap ``func`` so it consumes and returns one dict of named values.

    The wrapper looks up each of ``func``'s parameter names in the incoming
    dict, calls ``func`` with those values, and returns a shallow copy of the
    dict in which the entry for ``func``'s *first* parameter is replaced by
    the return value. Keys that ``func`` does not use pass through untouched,
    so several wrapped functions can be chained on the same dict.

    Parameters
    ----------
    func : callable
        Function to wrap. It must declare at least one parameter.

    Returns
    -------
    callable
        ``inner(kwargs)`` taking a dict and returning a new dict.

    Raises
    ------
    KeyError
        From ``inner``, when a parameter without a default is missing from
        the incoming dict.
    ValueError
        From ``inner``, when ``func`` has no parameters and so offers no key
        under which to store its result.
    """
    # The signature never changes, so inspect it once here instead of on
    # every value that flows through the wrapper.
    params = inspect.signature(func).parameters
    first_param = next(iter(params), None)

    @functools.wraps(func)
    def inner(kwargs):
        if first_param is None:
            raise ValueError(
                "{!r} takes no parameters, so there is no key under which "
                "to store its result".format(func)
            )
        # Pass the kwargs that match the function signature. A parameter
        # with a default may be absent; a required one still raises KeyError.
        call_kwargs = {
            name: kwargs[name]
            for name, param in params.items()
            if name in kwargs or param.default is inspect.Parameter.empty
        }
        ret = func(**call_kwargs)
        # Store the result under the first parameter's name so the next
        # wrapped function sees it; copy first so the caller's dict is not
        # mutated.
        kwargs_copy = kwargs.copy()
        kwargs_copy[first_param] = ret
        return kwargs_copy
    return inner

def multiply(img, mask):
    """Print a trace marker, then return ``img * mask``."""
    print('multiple')
    masked = img * mask
    return masked
 
def ctsatcenter(img, center):
    """Print a trace marker, then return the element of ``img`` at index ``center``."""
    print('cts')
    value = img[center]
    return value

# Chain two wrapped user functions. Each step receives the dict returned by
# the previous one and pulls out the keys named in its own signature.
sin = Stream()
s2 =  sin.map(dec(multiply))
s3 = s2.map(dec(ctsatcenter))
sout = s3.map(print)
 
# Alternative select/merge-based design, left commented out (not executed).
#sin = Stream()
#s2 = sin.select(('image', None), ('mask', None))
#s_attr = sin.get_attributes().select('beam_origin')
#s3 = s2.merge(s_attr)
#s4 = s3.map(ctsatcenter)
#s4.map(print) # sink to printing the result
 
 
# Alternative StreamDoc-based input, left commented out (not executed).
#sdoc = StreamDoc(kwargs=(dict(image=img1, mask=mask1)), attributes=dict(name="Bob", beam_origin=(100,100)))
#sin.emit(sdoc)
# One event carrying every value either function asks for: ``img`` and
# ``mask`` for multiply, ``img`` and ``center`` for ctsatcenter.
x = dict(img=np.ones((3, 3)), mask=np.ones((3, 3)), center=(1, 1))
sin.emit(x)

multiple
cts
{'img': 1.0, 'mask': array([[ 1.,  1.,  1.],
       [ 1.,  1.,  1.],
       [ 1.,  1.,  1.]]), 'center': (1, 1)}


[]