-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathrx.py
90 lines (69 loc) · 2.76 KB
/
rx.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from collections import namedtuple, OrderedDict
from reactivex.subject import Subject
# Record returned by setup(): the entry point's sinks, the sources handed to
# it, and the run callable that subscribes the sinks to their proxies.
Program = namedtuple(
    'Program',
    ['sinks', 'sources', 'run'],
)
def make_sink_proxies(drivers):
    ''' Create one Subject proxy per sink field of every driver.

    The result is a two-level structure. The outer level is an OrderedDict
    keyed by driver name; each value is that driver's ``input`` type
    instantiated with one fresh Subject per sink field:

    drv1-->sink1
     |  |->sink2
     |
    drv2-->sink1
        |->sink2

    Drivers whose ``input`` is None get no entry. When ``drivers`` is None,
    an empty OrderedDict is returned.
    '''
    proxies = OrderedDict()
    if drivers is None:
        return proxies

    for driver_name in drivers._fields:
        driver = getattr(drivers, driver_name)
        sink_type = driver.input
        if sink_type is None:
            # This driver declares no sinks: nothing to proxy.
            continue

        # One fresh Subject per declared sink field, in declaration order.
        subjects = OrderedDict(
            (field, Subject()) for field in sink_type._fields)
        proxies[driver_name] = sink_type(**subjects)

    return proxies
def call_drivers(drivers, sink_proxies, source_factory):
    ''' Call every driver and gather the sources they return.

    parameters:
    - drivers: an object exposing ``_fields`` (e.g. a namedtuple) with one
      attribute per driver, each having a ``call`` callable. None is
      treated as "no drivers", matching make_sink_proxies.
    - sink_proxies: mapping of driver name to that driver's sink proxies.
      A driver present in this mapping is called with its proxies; any
      other driver is called without arguments.
    - source_factory: callable receiving the collected sources as keyword
      arguments (one per driver that returned a non-None source), or None.

    returns: the result of ``source_factory(**sources)``, or None when
    source_factory is None.

    raises: RuntimeError, chained to the original exception, when a driver
    fails during its call.
    '''
    sources = OrderedDict()
    # A missing driver collection means there is nothing to call.
    driver_names = drivers._fields if drivers is not None else ()
    for name in driver_names:
        try:
            driver = getattr(drivers, name)
            # Source drivers have no sink
            if name in sink_proxies:
                source = driver.call(sink_proxies[name])
            else:
                source = driver.call()
        except Exception as e:
            raise RuntimeError('Unable to initialize {} driver'.format(name)) from e

        # sink drivers have no source
        if source is not None:
            sources[name] = source

    if source_factory is None:
        return None
    return source_factory(**sources)
def subscribe_sinks(sinks, sink_proxies, scheduler):
    ''' Subscribe each sink proxy to the matching sink of the entry point.

    parameters:
    - sinks: object exposing ``_fields`` with one attribute per driver; each
      attribute itself exposes ``_fields`` with one attribute per sink.
    - sink_proxies: mapping of driver name to an object holding the proxy
      for each of that driver's sinks, under the same attribute names.
    - scheduler: forwarded as the ``scheduler`` keyword of every subscribe
      call.

    The values returned by the subscribe calls are not kept.
    '''
    for driver_name in sinks._fields:
        driver_sinks = getattr(sinks, driver_name)
        for sink_name in driver_sinks._fields:
            # Resolve the sink first, then its proxy, then connect them.
            subscribe = getattr(driver_sinks, sink_name).subscribe
            proxy = getattr(sink_proxies[driver_name], sink_name)
            subscribe(proxy, scheduler=scheduler)
def setup(entry_point, drivers):
    ''' Wire an entry point to its drivers without starting the program.

    Sink proxies are created for the drivers, the drivers are called with
    those proxies to obtain the sources, and the entry point is called with
    the sources to obtain its sinks. Nothing is subscribed until the ``run``
    field of the returned Program is called.

    parameters:
    - entry_point: object with an ``input`` attribute (passed to
      call_drivers as the source factory) and a ``call`` callable taking
      the sources and returning the sinks.
    - drivers: the collection of drivers, forwarded to make_sink_proxies
      and call_drivers.

    returns: a Program holding the sinks, the sources and the run callable.
    '''
    proxies = make_sink_proxies(drivers)
    program_sources = call_drivers(drivers, proxies, entry_point.input)
    program_sinks = entry_point.call(program_sources)

    def _run(scheduler=None):
        ''' Subscribe the sinks to their proxies; return a dispose callable.
        '''
        subscribe_sinks(program_sinks, proxies, scheduler)

        def dispose():
            # Currently a no-op: nothing is released when it is called.
            return None

        return dispose

    return Program(
        sinks=program_sinks,
        sources=program_sources,
        run=_run,
    )
def run(entry_point, drivers):
    '''
    Connects an entry point circularly to the given drivers and starts the
    resulting program immediately, with no explicit scheduler.

    parameters:
    - entry_point (Component): the function to call once the streams are
      configured.
    - drivers: the drivers to connect, one attribute per driver (the
      collection is read through its ``_fields``, as a namedtuple of
      Component would provide).

    returns: whatever the program's run callable returns.
    '''
    return setup(entry_point, drivers).run()