Skip to content

Commit

Permalink
add reduce operator, from_iterable sends its elements in one single b…
Browse files Browse the repository at this point in the history
…atch, add optimal control example
  • Loading branch information
MichaelSchneeberger committed Jan 18, 2020
1 parent 4dadfd2 commit 8950e4b
Show file tree
Hide file tree
Showing 51 changed files with 1,493 additions and 467 deletions.
12 changes: 6 additions & 6 deletions README.md
Expand Up @@ -136,7 +136,7 @@ import rxbp

f = rxbp.multicast.from_flowable(rxbp.range(10)).pipe(
rxbp.multicast.op.map(lambda base: base.pipe(
rxbp.op.connect_flowable(base.pipe(
rxbp.op.collect_flowables(base.pipe(
rxbp.op.map(lambda v: v + 1),
rxbp.op.filter(lambda v: v % 2 == 0)),
),
Expand Down Expand Up @@ -167,7 +167,7 @@ are called *selectors* and propagated internally when subscribing
to a *Flowable*.

If two Flowables have the same base,
they should match in the sense of the `connect_flowable` operator,
they should match in the sense of the `collect_flowables` operator,
e.g. every pair of elements that get zipped from the two
Flowables should belong together.

Expand Down Expand Up @@ -196,10 +196,10 @@ When to use a Flowable, when RxPY Observable?

A *Flowable* is used when some asynchronous stage cannot process the
data fast enough, or needs to synchronize the data with some other event.
Let's take the `connect_flowable` operator for instance. It gets elements from
Let's take the `collect_flowables` operator for instance. It gets elements from
two or more sources and emits a tuple once it received one
element from each source. But what happens if one source emits the
elements before the others do? Without back-pressure, the `connect_flowable` operation
elements before the others do? Without back-pressure, the `collect_flowables` operation
has to buffer the elements until it receives data from the other sources.
This might be ok depending on how much data needs to be buffered. But
often we can not risk having too much data buffered somewhere in our
Expand Down Expand Up @@ -242,7 +242,7 @@ index in addition to the value
- `controlled_zip` - combines the elements emitted by two Flowables
into pairs in a controlled sequence.
- `match` - combines the elements emitted by two Flowables into matching pairs.
- `connect_flowable` - combines the elements emitted by two Flowables into pairs in
- `collect_flowables` - combines the elements emitted by two Flowables into pairs in
a strict sequence.

### Other operators
Expand Down Expand Up @@ -277,7 +277,7 @@ by a *Flowable*
which takes MultiCast context (see `share` operator) as argument
- `merge` - merges two or more *Multicast* streams together.
- `reduce_flowable` - creates a *Multicast* that emits a single value
- `connect_flowable` - zips *Multicast*s emitting a single *Flowable* to a *Multicast* emitting a single value
- `collect_flowables` - zips *Multicast*s emitting a single *Flowable* to a *Multicast* emitting a single value

### Other operators

Expand Down
8 changes: 4 additions & 4 deletions docs/20191002_multicasting.md
Expand Up @@ -62,15 +62,15 @@ exposes a *multi-cast Flowable* as an argument to the function `func`. Inside
the function `func`, the *multi-cast Flowable* can used multiple
times as it the case for the *shared Observable* in *RxPY*. The following
example zips the elements from the same source but skips every second
element on the second connect_flowable input.
element on the second collect_flowables input.

``` python
import rxbp.depricated
import rxbp

rxbp.range(10).pipe(
rxbp.depricated.share(lambda f1: f1.pipe(
rxbp.op.connect_flowable(f1.pipe(
rxbp.op.collect_flowables(f1.pipe(
rxbp.op.filter(lambda v: v % 2 == 0)),
)
)),
Expand Down Expand Up @@ -107,7 +107,7 @@ tunneled_shared = rxbp.range(10).pipe(
# consume the shared Flowable in another place
tunneled_shared.pipe(
rxbp.op.flat_map(lambda f1: f1.pipe(
rxbp.op.connect_flowable(f1.pipe(
rxbp.op.collect_flowables(f1.pipe(
rxbp.op.filter(lambda v: v % 2 == 0)),
),
)),
Expand Down Expand Up @@ -151,7 +151,7 @@ source2 = source1.pipe(
rxbp.op.share(lambda source: source.pipe(
rxbp.op.flat_map(
lambda fdict: fdict["input"].pipe(
rxbp.op.connect_flowable(fdict["input"].pipe(
rxbp.op.collect_flowables(fdict["input"].pipe(
rxbp.op.filter(lambda v: v % 2 == 0)),
),
)
Expand Down
2 changes: 1 addition & 1 deletion examples/examplefromreadmemulticast.py
Expand Up @@ -3,7 +3,7 @@
It creates a MultiCast object from a Flowable by using the `from_flowable`
function. The Flowable boxed into a MultiCast object can now be
subscribed to more than one observer. In this example, we use it to connect_flowable
subscribed to more than one observer. In this example, we use it to collect_flowables
it with itself. The result is that a new Flowable is created that emits
paired elements from the same source.
Expand Down
8 changes: 5 additions & 3 deletions examples/matchoperator/matchrecords.py
Expand Up @@ -18,7 +18,7 @@


fdict = {
't_main': rxbp.from_rx(subject, base='t_main'), #.range(0, 1000, base='t_main').map(lambda i: 10 * i/1000 * 1.01 + 1.4),
't_main': rxbp.from_rx(subject, base='t_main'),
't_1': rxbp.range(0, 100, base='t_1').map(lambda i: 10 * i/100 * 0.9 + 2.1),
't_2': rxbp.range(0, 100, base='t_2').map(lambda i: 10 * i/100 * 1.05 + 0.9),
}
Expand Down Expand Up @@ -137,14 +137,16 @@ def sel_flow(fdict: Dict[str, Flowable]):
rxbp.multicast.op.map(interpolate_v1),
rxbp.multicast.op.map(interpolate_v2),
rxbp.multicast.op.map(sel_flow),
).to_flowable().subscribe(lambda v: result.append(v))
).to_flowable().subscribe(
on_next=lambda v: result.append(v),
on_completed=lambda: print('completed')
)


rx.range(0, 1000).pipe(
rxop.map(lambda i: 10 * i/1000 * 1.01 + 1.4),
).subscribe(subject)


t, v1, v2 = list(zip(*result))

pyplot.plot(t, v1)
Expand Down
2 changes: 1 addition & 1 deletion examples/multicast/liftexample.py
Expand Up @@ -10,7 +10,7 @@


def connect_and_zip(multicast: MultiCast):
return rxbp.multicast.connect_flowable(
return rxbp.multicast.collect_flowables(
multicast,
multicast,
).pipe(
Expand Down
33 changes: 33 additions & 0 deletions examples/multicast/shareexample.py
@@ -0,0 +1,33 @@
"""
This example demonstrates the use-case of the share operator defined
on MultiCast objects.
"""

import rxbp
from rxbp.multicast.multicast import MultiCast


def connect_and_zip(multicast: MultiCast):
return rxbp.multicast.collect_flowables(
multicast,
multicast,
).pipe(
rxbp.multicast.op.map(lambda t: t[0].zip(t[1]))
)


def merge_and_reduce(multicast: MultiCast):
return rxbp.multicast.merge(
multicast,
multicast,
).pipe(
rxbp.multicast.op.reduce_flowable(),
)


result = rxbp.multicast.from_flowable(rxbp.range(10)).pipe(
rxbp.multicast.op.share(connect_and_zip),
rxbp.multicast.op.share(merge_and_reduce),
).to_flowable().run()

print(result)
@@ -1,6 +1,6 @@
"""
This example demonstrates the use-case of the connect_flowable operator defined
on MultiCast objects. The connect_flowable operator pairs Flowables send
This example demonstrates the use-case of the collect_flowables operator defined
on MultiCast objects. The collect_flowables operator pairs Flowables send
through the MultiCast object at different point in time.
This example merges two Flowables by first zipping two MultiCast
Expand All @@ -9,7 +9,7 @@

import rxbp

result = rxbp.multicast.connect_flowable(
result = rxbp.multicast.collect_flowables(
rxbp.multicast.from_flowable(rxbp.range(4)),
rxbp.multicast.from_flowable(rxbp.range(4)),
).pipe(
Expand Down
3 changes: 2 additions & 1 deletion examples/multicastandflowable/loopflowableexample.py
Expand Up @@ -49,6 +49,7 @@
rxbp.multicast.op.loop_flowable(
func=lambda mc: mc.pipe(
rxbp.multicast.op.map(lambda t: {
**t,
'a': t['input'].pipe(
rxbp.op.zip(t['a'], t['b']),
rxbp.op.map(lambda v: sum(v)),
Expand All @@ -62,7 +63,7 @@
),
initial={'a': 1, 'b': 2},
),
rxbp.multicast.op.map(lambda t: t['a']),
# rxbp.multicast.op.map(lambda t: t['a']),
).to_flowable().run()

print(result)

0 comments on commit 8950e4b

Please sign in to comment.