Skip to content

Commit

Permalink
add test cases, bugfix in reduce multicast operator, add flowable ope…
Browse files Browse the repository at this point in the history
…rator mixin
  • Loading branch information
MichaelSchneeberger committed Dec 13, 2019
1 parent 198c507 commit 735a3da
Show file tree
Hide file tree
Showing 107 changed files with 2,405 additions and 1,626 deletions.
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ able to back-pressure on an `on_next` method call.

A `MultiCast` is used when a *Flowable* emits elements to more than
one `Observer`, and can be though of a nested *Flowable* of type
`Flowable[T[Flowable]]`.
`rx.Observable[T[Flowable]]`.

In RxPY, there are operators called `publish` and `share`,
which create a multicast observable that can then be subscribed
Expand Down Expand Up @@ -127,7 +127,7 @@ The previous code outputs:
To get rid of this drawback, *rxbackpressure* introduces the `MultiCast`
type.
A `MultiCast` represents a collection of *Flowable* and can
be though of as `Flowable[T[Flowable]]` where T is defined by the user.
be though of as `rx.Observable[T[Flowable]]` where T is defined by the user.
Operators on *MultiCasts* are exposed through `rxbp.multicast.op`.

```python
Expand Down Expand Up @@ -278,7 +278,6 @@ by a *Flowable*
- `map` - maps each *Multicast* value by applying a given function.
- `merge` - merges two or more *Multicast* streams together.
- `reduce` - creates a *Multicast* that emits a single value
- `share` - define an expressions with the current *Multicast* that contains the *Multicast* several times
- `zip` - zips *Multicast*s emitting a single *Flowable* to a *Multicast* emitting a single value

### Other operators
Expand Down
15 changes: 15 additions & 0 deletions examples/matchexample.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import rxbp

f1 = rxbp.range(10).pipe(
rxbp.op.filter(lambda v: v%2),
)

f2 = rxbp.range(10)

f3 = rxbp.match(f1, f2).pipe(
rxbp.op.filter(lambda v: v[0]%5),
)

result = rxbp.match(f3, f2).run()

print(result)
30 changes: 30 additions & 0 deletions examples/multicast/lifeexample.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import rxbp
from rxbp.multicast.multicast import MultiCast


def and_zip(multicast: MultiCast):
return rxbp.multicast.zip(
multicast,
multicast,
).pipe(
rxbp.multicast.op.map(lambda t: t[0].zip(t[1]))
)


def merge_and_reduce(multicast: MultiCast):
return rxbp.multicast.merge(
multicast,
multicast,
).pipe(
rxbp.multicast.op.reduce(),
)


result = rxbp.multicast.from_flowable(rxbp.range(10)).pipe(
rxbp.multicast.op.lift(lambda m: m),
rxbp.multicast.op.map(and_zip),
rxbp.multicast.op.map(merge_and_reduce),
rxbp.multicast.op.flat_map(lambda m: m),
).to_flowable().run()

print(result)
5 changes: 3 additions & 2 deletions examples/multicast/reduceexample.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
# emits a single element: {'val1': flowable1, 'val2': flowable2} where
# flowable1 emits all elements associated to 'val1' and flowable2 emits
# all elements associated to 'val2'
base1 = {'val1': rxbp.range(5), 'val2': rxbp.range(5)}
base2 = {'val1': rxbp.range(3), 'val2': rxbp.range(3)}
base1 = {'val1': rxbp.range(5), 'val2': rxbp.range(5).map(lambda v: v+100)}
base2 = {'val1': rxbp.range(3).map(lambda v: v+10), 'val2': rxbp.range(3).map(lambda v: v+110)}

result = rxbp.multicast.from_flowable(base1).pipe(
rxbp.multicast.op.merge(
rxbp.multicast.from_flowable(base2)
),
# rxbp.multicast.op.debug('d1'),
rxbp.multicast.op.reduce(),
rxbp.multicast.op.map(lambda v: v['val1'].zip(v['val2'])),
).to_flowable().run()
Expand Down
2 changes: 1 addition & 1 deletion examples/multicast/shareexample.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ def merge_and_reduce(multicast: MultiCast):
rxbp.multicast.op.share(merge_and_reduce),
).to_flowable().run()

print(result)
print(result)
4 changes: 2 additions & 2 deletions rxbp/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from . import op
from . import multicast
from . import multicast

from . import op
from .source import from_iterable, from_range, from_list, return_value, from_rx, concat, zip, match, \
merge, empty

# from .multicast.source import from_flowables as to_multicast


Expand Down
3 changes: 0 additions & 3 deletions rxbp/ack/acksubject.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
import threading

from rx.core import typing

from typing import Any, Optional, List

from rx.disposable import Disposable
Expand Down
2 changes: 1 addition & 1 deletion rxbp/ack/merge.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from rxbp.ack.ackimpl import Continue, Stop
from rxbp.ack.ackbase import AckBase
from rxbp.ack.ackimpl import Continue, Stop
from rxbp.ack.map import _map
from rxbp.ack.zip import _zip

Expand Down
2 changes: 1 addition & 1 deletion rxbp/ack/zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def _zip(*args: AckBase) -> AckBase:
class ZipAck(AckBase):
def subscribe(self, single: Single):
n = len(sources)
queues : List[List] = [[] for _ in range(n)]
queues: List[List] = [[] for _ in range(n)]

def next():
if all([len(q) for q in queues]):
Expand Down
65 changes: 37 additions & 28 deletions rxbp/flowable.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,41 @@
from typing import Callable, Any, Generic, Tuple

import rx
import rxbp
from rxbp.flowablebase import FlowableBase
from rxbp.flowableopmixin import FlowableOpMixin
from rxbp.flowables.anonymousflowablebase import AnonymousFlowableBase
from rxbp.flowables.bufferflowable import BufferFlowable

from rxbp.flowables.concatflowable import ConcatFlowable
from rxbp.flowables.controlledzipflowable import ControlledZipFlowable
from rxbp.flowables.debugflowable import DebugFlowable
from rxbp.flowables.executeonflowable import ExecuteOnFlowable
from rxbp.flowables.fastfilterflowable import FastFilterFlowable
from rxbp.flowables.filterflowable import FilterFlowable
from rxbp.flowables.firstflowable import FirstFlowable
from rxbp.flowables.flatmapflowable import FlatMapFlowable
from rxbp.flowables.mapflowable import MapFlowable
from rxbp.flowables.matchflowable import MatchFlowable
from rxbp.flowables.mergeflowable import MergeFlowable
from rxbp.flowables.observeonflowable import ObserveOnFlowable
from rxbp.flowables.fastfilterflowable import FastFilterFlowable
from rxbp.flowables.pairwiseflowable import PairwiseFlowable
from rxbp.flowables.refcountflowable import RefCountFlowable
from rxbp.flowables.repeatfirstflowable import RepeatFirstFlowable
from rxbp.flowables.scanflowable import ScanFlowable
from rxbp.flowables.subscribeonflowable import SubscribeOnFlowable
from rxbp.flowables.tolistflowable import ToListFlowable
from rxbp.flowables.zip2flowable import Zip2Flowable
from rxbp.flowables.zipwithindexflowable import ZipWithIndexFlowable
from rxbp.pipe import pipe
from rxbp.scheduler import Scheduler
from rxbp.selectors.bases import Base
from rxbp.subscriber import Subscriber
from rxbp.subscription import Subscription, SubscriptionInfo
from rxbp.toiterator import to_iterator
from rxbp.torx import to_rx
from rxbp.flowables.controlledzipflowable import ControlledZipFlowable
from rxbp.flowables.filterflowable import FilterFlowable
from rxbp.flowables.flatmapflowable import FlatMapFlowable
from rxbp.flowables.refcountflowable import RefCountFlowable
from rxbp.flowables.zip2flowable import Zip2Flowable
from rxbp.scheduler import Scheduler
from rxbp.subscriber import Subscriber
from rxbp.flowablebase import FlowableBase
from rxbp.typing import ValueType


class Flowable(Generic[ValueType], FlowableBase[ValueType]):
class Flowable(FlowableOpMixin, FlowableBase, Generic[ValueType]):
""" A `Flowable` implements a `subscribe` method allowing to describe a
data flow from source to sink. The "description" is
done with *rxbackpressure* operators exposed to `rxbp.op`.
Expand All @@ -55,36 +55,43 @@ class Flowable(Generic[ValueType], FlowableBase[ValueType]):
able to back-pressure an `on_next` method call.
"""

def __init__(self, flowable: FlowableBase[ValueType]):
def __init__(self, flowable: FlowableBase):
super().__init__()

self.subscriptable = flowable

def unsafe_subscribe(self, subscriber: Subscriber) -> Subscription:
return self.subscriptable.unsafe_subscribe(subscriber=subscriber)

def buffer(self, buffer_size: int):
def buffer(self, buffer_size: int) -> 'Flowable':
flowable = BufferFlowable(source=self, buffer_size=buffer_size)
return Flowable(flowable)

def concat(self, *sources: FlowableBase):
def concat(self, *sources: FlowableBase) -> 'Flowable':
all_sources = itertools.chain([self], sources)
flowable = ConcatFlowable(sources=all_sources)
return Flowable(flowable)

def controlled_zip(self, right: FlowableBase,
request_left: Callable[[Any, Any], bool] = None,
request_right: Callable[[Any, Any], bool] = None,
match_func: Callable[[Any, Any], bool] = None) -> 'Flowable[ValueType]':
def controlled_zip(
self,
right: FlowableBase,
request_left: Callable[[Any, Any], bool] = None,
request_right: Callable[[Any, Any], bool] = None,
match_func: Callable[[Any, Any], bool] = None,
) -> 'Flowable[ValueType]':
""" Creates a new Flowable from two Flowables by combining their item in pairs in a strict sequence.
:param selector: a mapping function applied over the generated pairs
:return: zipped Flowable
"""

flowable = ControlledZipFlowable(left=self, right=right, request_left=request_left,
request_right=request_right,
match_func=match_func)
flowable = ControlledZipFlowable(
left=self,
right=right,
request_left=request_left,
request_right=request_right,
match_func=match_func,
)
return Flowable(flowable)

def debug(self, name=None, on_next=None, on_subscribe=None, on_ack=None, on_raw_ack=None, on_ack_msg=None):
Expand Down Expand Up @@ -125,9 +132,11 @@ def filter_with_index(self, predicate: Callable[[Any, int], bool]) -> 'Flowable[
:return: filtered Flowable
"""

flowable = self.zip_with_index() \
.filter(lambda t2: predicate(t2[0], t2[1])) \
.map(lambda t2: t2[0])
flowable = self.pipe(
rxbp.op.zip_with_index(),
rxbp.op.filter(lambda t2: predicate(t2[0], t2[1])),
rxbp.op.map(lambda t2: t2[0]),
)

return flowable

Expand Down Expand Up @@ -175,7 +184,7 @@ def _(right: Flowable = None, left: Flowable = source):
def inner_result_selector(v1: Any, v2: Tuple[Any]):
return (v1,) + v2

flowable = MatchFlowable(left=left, right=right, func=inner_result_selector)
flowable = MatchFlowable(left=left, right=right, func=inner_result_selector)
return flowable

yield _
Expand Down Expand Up @@ -227,7 +236,7 @@ def pairwise(self):

return Flowable(PairwiseFlowable(source=self))

def pipe(self, *operators: Callable[[FlowableBase], FlowableBase]) -> 'Flowable':
def pipe(self, *operators: Callable[[FlowableOpMixin], FlowableOpMixin]) -> 'Flowable':
return Flowable(pipe(*operators)(self))

def run(self, scheduler: Scheduler = None):
Expand Down Expand Up @@ -299,7 +308,7 @@ def _(right: Flowable = None, left: Flowable = source):
def inner_result_selector(v1: Any, v2: Tuple[Any]):
return (v1,) + v2

flowable = Zip2Flowable(left=left, right=right, func=inner_result_selector)
flowable = Zip2Flowable(left=left, right=right, func=inner_result_selector)
return flowable

yield _
Expand Down
16 changes: 8 additions & 8 deletions rxbp/flowablebase.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import traceback
from abc import ABC, abstractmethod
from typing import Callable, Any, Tuple, Generic
from typing import Callable, Any

from rx.disposable import Disposable
from rxbp.ack.ackimpl import continue_ack, stop_ack
from rxbp.flowableopmixin import FlowableOpMixin
from rxbp.observer import Observer
from rxbp.observerinfo import ObserverInfo
from rxbp.scheduler import Scheduler
from rxbp.schedulers.trampolinescheduler import TrampolineScheduler
from rxbp.subscription import Subscription
from rxbp.subscriber import Subscriber
from rxbp.typing import ValueType
from rxbp.subscription import Subscription


class FlowableBase(Generic[ValueType], ABC):
class FlowableBase(ABC):
""" See `Flowable` for more information.
Two class are used to implement `Flowable`. `FlowableBase` implements the basic interface including the `subscribe`
Expand All @@ -30,10 +30,10 @@ def subscribe(
) -> Disposable:
""" Calling `subscribe` method starts some kind of process that
start a chain reaction where downsream `Flowables`
call the `subscribe` method of their linked upstream `Flowable` until
the sources start emitting data. Once a `Flowable` is subscribed, we
allow it to have mutable states where it make sense.
start a chain reaction where downsream `Flowables`
call the `subscribe` method of their linked upstream `Flowable` until
the sources start emitting data. Once a `Flowable` is subscribed, we
allow it to have mutable states where it make sense.
"""

subscribe_scheduler_ = subscribe_scheduler or TrampolineScheduler()
Expand Down
6 changes: 3 additions & 3 deletions rxbp/flowableoperator.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from typing import Callable

from rxbp.flowablebase import FlowableBase
from rxbp.flowableopmixin import FlowableOpMixin


class FlowableOperator:
def __init__(self, func: Callable[[FlowableBase], FlowableBase]):
def __init__(self, func: Callable[[FlowableOpMixin], FlowableOpMixin]):
self.func = func

def __call__(self, flowable: FlowableBase):
def __call__(self, flowable: FlowableOpMixin):
return self.func(flowable)

0 comments on commit 735a3da

Please sign in to comment.