diff --git a/examples/multicast/mergeexample.py b/examples/multicast/mergeexample.py index d23f73a..3f91d86 100644 --- a/examples/multicast/mergeexample.py +++ b/examples/multicast/mergeexample.py @@ -5,7 +5,6 @@ result = rxbp.multicast.merge(m1, m2).pipe( - # rxbp.multicast.op.debug('d1'), rxbp.multicast.op.reduce(), ).to_flowable().run() diff --git a/examples/multicast/reduceexample.py b/examples/multicast/reduceexample.py index b0ca0b7..187ad88 100644 --- a/examples/multicast/reduceexample.py +++ b/examples/multicast/reduceexample.py @@ -22,7 +22,6 @@ rxbp.multicast.op.merge( rxbp.multicast.from_flowable(base2) ), - # rxbp.multicast.op.debug('d1'), rxbp.multicast.op.reduce(), rxbp.multicast.op.map(lambda v: v['val1'].zip(v['val2'])), ).to_flowable().run() diff --git a/rxbp/multicast/collectablemulticasts/collectablemulticast.py b/rxbp/multicast/collectablemulticasts/collectablemulticast.py index bed9452..747775f 100644 --- a/rxbp/multicast/collectablemulticasts/collectablemulticast.py +++ b/rxbp/multicast/collectablemulticasts/collectablemulticast.py @@ -74,8 +74,11 @@ def map(self, func: Callable[[MultiCastValue], MultiCastValue]): def pipe(self, *operators: MultiCastOperator) -> 'CollectableMultiCast': return reduce(lambda acc, op: op(acc), operators, self) - def reduce(self): - main = self._main.reduce() + def reduce( + self, + maintain_order: bool = None, + ): + main = self._main.reduce(maintain_order=maintain_order) return CollectableMultiCast(main=main, collected=self._collected) def share(self): diff --git a/rxbp/multicast/flowables/flatmapnobackpressureflowable.py b/rxbp/multicast/flowables/flatconcatnobackpressureflowable.py similarity index 66% rename from rxbp/multicast/flowables/flatmapnobackpressureflowable.py rename to rxbp/multicast/flowables/flatconcatnobackpressureflowable.py index 3d21737..ff3b4b2 100644 --- a/rxbp/multicast/flowables/flatmapnobackpressureflowable.py +++ b/rxbp/multicast/flowables/flatconcatnobackpressureflowable.py @@ -1,12 +1,12 @@ from typing import Any, Callable from rxbp.flowablebase import FlowableBase -from rxbp.multicast.observables.flatmapnobackpressureobservable import FlatMapNoBackpressureObservable +from rxbp.multicast.observables.flatconcatnobackpressureobservable import FlatConcatNoBackpressureObservable from rxbp.subscriber import Subscriber from rxbp.subscription import Subscription, SubscriptionInfo -class FlatMapNoBackpressureFlowable(FlowableBase): +class FlatConcatNoBackpressureFlowable(FlowableBase): def __init__(self, source: FlowableBase, selector: Callable[[Any], FlowableBase]): super().__init__() @@ -21,8 +21,8 @@ def observable_selector(elem: Any): subscription = flowable.unsafe_subscribe(subscriber=subscriber) return subscription.observable - observable = FlatMapNoBackpressureObservable(source=subscription.observable, selector=observable_selector, - scheduler=subscriber.scheduler, subscribe_scheduler=subscriber.subscribe_scheduler) + observable = FlatConcatNoBackpressureObservable(source=subscription.observable, selector=observable_selector, + scheduler=subscriber.scheduler, subscribe_scheduler=subscriber.subscribe_scheduler) # base becomes undefined after flat mapping base = None diff --git a/rxbp/multicast/multicast.py b/rxbp/multicast/multicast.py index afe4499..53af0a8 100644 --- a/rxbp/multicast/multicast.py +++ b/rxbp/multicast/multicast.py @@ -107,8 +107,11 @@ def get_source(_, info: MultiCastInfo) -> rx.typing.Observable: def map(self, func: Callable[[MultiCastValue], MultiCastValue]): return MultiCast(MapMultiCast(source=self, func=func)) - def reduce(self): - return MultiCast(ReduceMultiCast(source=self)) + def reduce( + self, + maintain_order: bool = None, + ): + return MultiCast(ReduceMultiCast(source=self, maintain_order=maintain_order)) def share(self): subject = Subject() diff --git a/rxbp/multicast/multicastopmixin.py b/rxbp/multicast/multicastopmixin.py index 4c71914..b035c5a 100644 --- a/rxbp/multicast/multicastopmixin.py +++ b/rxbp/multicast/multicastopmixin.py @@ -61,7 +61,10 @@ def map(self, func: Callable[[MultiCastValue], MultiCastValue]): ... @abstractmethod - def reduce(self): + def reduce( + self, + maintain_order: bool = None, + ): ... @abstractmethod diff --git a/rxbp/multicast/multicasts/defermulticast.py b/rxbp/multicast/multicasts/defermulticast.py index f8f1f3c..1f6b96c 100644 --- a/rxbp/multicast/multicasts/defermulticast.py +++ b/rxbp/multicast/multicasts/defermulticast.py @@ -54,7 +54,14 @@ def unsafe_subscribe(self, subscriber: Subscriber) -> Subscription: start = StartWithInitialValueFlowable() shared = RefCountFlowable(source=start) + # mutual curr_index variable is used to index the input Flowables + # as the sequal to the deferred Flowables + # {0: deferred_flowable_1, + # 1: input_flowable_1, + # 2: input_flowable_2,} curr_index = 0 + + # map initial value(s) to a dictionary if isinstance(initial, list): curr_index = len(initial) initial_dict = {idx: val for idx, val in enumerate(initial)} @@ -102,51 +109,57 @@ def map_to_flowable_dict(base: MultiCastBase): output = self.func(init) def map_func(base: MultiCastValue): - - def select_first_index(state): - class SingleFlowableDict(SingleFlowableMixin, FlowableDict): - def get_single_flowable(self) -> Flowable: - return state[0] - - return SingleFlowableDict(state) - - def select_none(state): - return FlowableDict(state) - match_error_message = f'defer function returned "{base}" which does not match initial "{initial}"' if isinstance(base, Flowable) and len(initial_dict) == 1: + + # if initial would be a dicitonary, then the input to the defer operator + # must be a dicitonary and not a Flowable. assert not isinstance(initial, dict), match_error_message - if isinstance(initial, list): - assert len(base) == 1, match_error_message + # create standard form + flowable_state = {0: base} - deferred_values = {list(initial_dict.keys())[0]: base} # deferred values refer to the values returned by the defer function - select_flowable_dict = select_none + # function that will map the resulting state back to a Flowable + from_state = lambda state: state[0] elif isinstance(base, list): assert isinstance(initial, list) and len(initial) == len(base) - deferred_values = {idx: val for idx, val in enumerate(base)} - select_flowable_dict = select_first_index + # create standard form + flowable_state = {idx: val for idx, val in enumerate(base)} + + # def select_first_index(state): + # class SingleFlowableDict(SingleFlowableMixin, FlowableDict): + # def get_single_flowable(self) -> Flowable: + # return state[0] + # + # return SingleFlowableDict(state) + + # function that will map the resulting state to a list + from_state = lambda state: list(state.values()) elif isinstance(base, dict) or isinstance(base, FlowableStateMixin): if isinstance(base, FlowableStateMixin): - deferred_values = base.get_flowable_state() + flowable_state = base.get_flowable_state() else: - deferred_values = base + flowable_state = base - match_error_message = f'defer function returned "{deferred_values.keys()}", ' \ + match_error_message = f'defer function returned "{flowable_state.keys()}", ' \ f'which does not match initial "{initial.keys()}"' - assert isinstance(initial, dict) and set(initial.keys()) <= set(deferred_values.keys()), match_error_message + assert isinstance(initial, dict) and set(initial.keys()) <= set(flowable_state.keys()), match_error_message - select_flowable_dict = select_none + # function that will map the resulting state to a FlowableDict + def select_none(state): + return FlowableDict(state) + from_state = select_none else: raise Exception(f'illegal case "{base}"') - shared_deferred_values = {key: RefCountFlowable(value) for key, value in deferred_values.items()} + # share flowables + shared_flowable_state = {key: RefCountFlowable(value) for key, value in flowable_state.items()} lock = threading.RLock() is_first = [True] @@ -164,14 +177,14 @@ def unsafe_subscribe(self, subscriber: Subscriber) -> Subscription: is_first[0] = False close_loop = True - # close defer loop only if first element has received + # close defer loop only if first subscribed if close_loop: def gen_index_for_each_deferred_state(): """ for each value returned by the defer function """ for key in initial_dict.keys(): def for_func(key=key): - return Flowable(MapFlowable(shared_deferred_values[key], selector=lambda v: (key, v))) + return Flowable(MapFlowable(shared_flowable_state[key], selector=lambda v: (key, v))) yield for_func() indexed_deferred_values = gen_index_for_each_deferred_state() @@ -231,9 +244,9 @@ def observe(self, observer_info: ObserverInfo): return Subscription(info=SubscriptionInfo(None), observable=defer_observable) # create a flowable for all deferred values - new_states = {k: Flowable(DeferFlowable(v, k)) for k, v in shared_deferred_values.items()} + new_states = {k: Flowable(DeferFlowable(v, k)) for k, v in shared_flowable_state.items()} - return select_flowable_dict(new_states) + return from_state(new_states) return output.get_source(info=info).pipe( rxop.map(map_func), diff --git a/rxbp/multicast/multicasts/reducemulticast.py b/rxbp/multicast/multicasts/reducemulticast.py index 4d35179..797ae16 100644 --- a/rxbp/multicast/multicasts/reducemulticast.py +++ b/rxbp/multicast/multicasts/reducemulticast.py @@ -7,7 +7,7 @@ from rxbp.flowable import Flowable from rxbp.flowables.refcountflowable import RefCountFlowable from rxbp.multicast.flowables.connectableflowable import ConnectableFlowable -from rxbp.multicast.flowables.flatmapnobackpressureflowable import FlatMapNoBackpressureFlowable +from rxbp.multicast.flowables.flatconcatnobackpressureflowable import FlatConcatNoBackpressureFlowable from rxbp.multicast.flowables.flatmergenobackpressureflowable import FlatMergeNoBackpressureFlowable from rxbp.multicast.flowablestatemixin import FlowableStateMixin from rxbp.multicast.multicastInfo import MultiCastInfo @@ -19,8 +19,13 @@ class ReduceMultiCast(MultiCastBase): - def __init__(self, source: MultiCastBase): + def __init__( + self, + source: MultiCastBase, + maintain_order: bool = None, + ): self.source = source + self.maintain_order = maintain_order def get_source(self, info: MultiCastInfo): source = self.source.get_source(info=info).pipe( @@ -30,7 +35,7 @@ def get_source(self, info: MultiCastInfo): or isinstance(v, list)), ) - def func(first: Union[FlowableStateMixin, dict], lifted_obs: Observable): + def func(lifted_obs: Observable, first: Union[FlowableStateMixin, dict]): if isinstance(first, dict): to_state = lambda s: s from_state = lambda s: s @@ -48,70 +53,73 @@ def func(first: Union[FlowableStateMixin, dict], lifted_obs: Observable): first_state = to_state(first) - class ReduceObservable(Observable): - def __init__(self, first: FlowableStateMixin): - super().__init__() - - self.first = first - - def _subscribe_core( - self, - observer: rx.typing.Observer, - scheduler: Optional[rx.typing.Scheduler] = None - ) -> rx.typing.Disposable: - # # share only if more than one elem in first state - # shared_source = lifted_obs.pipe( - # rxop.share(), - # ) - - # lifted_flowable = rxbp.from_rx(lifted_obs) - - conn_observer = ConnectableObserver( - underlying=None, - scheduler=info.multicast_scheduler, - subscribe_scheduler=info.multicast_scheduler, - ) - - # subscribe to source rx.Observables immediately - source_flowable = rxbp.from_rx(lifted_obs) - subscriber = Subscriber( - scheduler=info.multicast_scheduler, - subscribe_scheduler=info.multicast_scheduler, - ) - subscription = source_flowable.unsafe_subscribe(subscriber=subscriber) - subscription.observable.observe(ObserverInfo(conn_observer)) - - conn_flowable = ConnectableFlowable(conn_observer=conn_observer) - - if 1 < len(first_state): - shared_flowable = RefCountFlowable(conn_flowable) - else: - shared_flowable = conn_flowable - - def gen_flowables(): - for key in first_state.keys(): - def for_func(key=key, shared_flowable=shared_flowable): - def selector(v: FlowableStateMixin): - flowable = to_state(v)[key] - return flowable + # class ReduceObservable(Observable): + # def __init__( + # self, + # first: FlowableStateMixin, + # maintain_order: bool = None, + # ): + # super().__init__() + # + # self.first = first + # self.maintain_order = maintain_order + # + # def _subscribe_core( + # self, + # observer: rx.typing.Observer, + # scheduler: Optional[rx.typing.Scheduler] = None + # ) -> rx.typing.Disposable: + + conn_observer = ConnectableObserver( + underlying=None, + scheduler=info.multicast_scheduler, + subscribe_scheduler=info.multicast_scheduler, + ) + + # subscribe to source rx.Observables immediately + source_flowable = rxbp.from_rx(lifted_obs) + subscriber = Subscriber( + scheduler=info.multicast_scheduler, + subscribe_scheduler=info.multicast_scheduler, + ) + subscription = source_flowable.unsafe_subscribe(subscriber=subscriber) + subscription.observable.observe(ObserverInfo(conn_observer)) + + conn_flowable = ConnectableFlowable(conn_observer=conn_observer) + + if 1 < len(first_state): + shared_flowable = RefCountFlowable(conn_flowable) + else: + shared_flowable = conn_flowable - flattened_flowable = FlatMergeNoBackpressureFlowable(shared_flowable, selector) - # flattened_flowable = FlatMapNoBackpressureFlowable(shared_flowable, selector) - result = RefCountFlowable(flattened_flowable) + def gen_flowables(): + for key in first_state.keys(): + def for_func(key=key, shared_flowable=shared_flowable): + def selector(v: FlowableStateMixin): + flowable = to_state(v)[key] + return flowable - return key, Flowable(result) + if self.maintain_order: + flattened_flowable = FlatConcatNoBackpressureFlowable(shared_flowable, selector) + else: + flattened_flowable = FlatMergeNoBackpressureFlowable(shared_flowable, selector) - yield for_func() + result = RefCountFlowable(flattened_flowable) + flowable = Flowable(result) + return key, flowable - result_flowables = dict(gen_flowables()) - result = from_state(result_flowables) + yield for_func() - # def action(_, __): - observer.on_next(result) - observer.on_completed() + result_flowables = dict(gen_flowables()) + result = from_state(result_flowables) + return result - # info.multicast_scheduler.schedule(action) + # observer.on_next(result) + # observer.on_completed() - return ReduceObservable(first=first) + # return ReduceObservable( + # first=first, + # maintain_order=self.maintain_order, + # ) - return LiftObservable(source=source, func=func, subscribe_scheduler=info.multicast_scheduler) + return LiftObservable(source=source, func=func, scheduler=info.multicast_scheduler) diff --git a/rxbp/multicast/multicasts/zipmulticast.py b/rxbp/multicast/multicasts/zipmulticast.py index a564f9b..0ba2f23 100644 --- a/rxbp/multicast/multicasts/zipmulticast.py +++ b/rxbp/multicast/multicasts/zipmulticast.py @@ -5,7 +5,7 @@ from rxbp.flowable import Flowable from rxbp.flowables.refcountflowable import RefCountFlowable from rxbp.multicast.flowables.connectableflowable import ConnectableFlowable -from rxbp.multicast.flowables.flatmapnobackpressureflowable import FlatMapNoBackpressureFlowable +from rxbp.multicast.flowables.flatconcatnobackpressureflowable import FlatConcatNoBackpressureFlowable from rxbp.multicast.multicastInfo import MultiCastInfo from rxbp.multicast.multicastbase import MultiCastBase from rxbp.multicast.typing import MultiCastValue @@ -57,7 +57,7 @@ def for_func(source=source): conn_flowable = ConnectableFlowable(conn_observer=conn_observer) - flattened_flowable = FlatMapNoBackpressureFlowable(conn_flowable, to_flowable) + flattened_flowable = FlatConcatNoBackpressureFlowable(conn_flowable, to_flowable) ref_count_flowable = RefCountFlowable(flattened_flowable) diff --git a/rxbp/multicast/observables/flatmapnobackpressureobservable.py b/rxbp/multicast/observables/flatconcatnobackpressureobservable.py similarity index 82% rename from rxbp/multicast/observables/flatmapnobackpressureobservable.py rename to rxbp/multicast/observables/flatconcatnobackpressureobservable.py index 46b41a8..3ad1d6c 100644 --- a/rxbp/multicast/observables/flatmapnobackpressureobservable.py +++ b/rxbp/multicast/observables/flatconcatnobackpressureobservable.py @@ -1,12 +1,12 @@ from typing import Callable, Any -from rxbp.multicast.observer.flatmapnobackpressureobserver import FlatMapNoBackpressureObserver +from rxbp.multicast.observer.flatconcatnobackpressureobserver import FlatConcatNoBackpressureObserver from rxbp.observable import Observable from rxbp.observerinfo import ObserverInfo from rxbp.scheduler import Scheduler -class FlatMapNoBackpressureObservable(Observable): +class FlatConcatNoBackpressureObservable(Observable): def __init__( self, source: Observable, @@ -26,7 +26,7 @@ def observe(self, observer_info: ObserverInfo): scheduler = self.scheduler subscribe_scheduler = self.subscribe_scheduler - concat_observer = FlatMapNoBackpressureObserver( + concat_observer = FlatConcatNoBackpressureObserver( observer=observer, selector=self.selector, scheduler=scheduler, diff --git a/rxbp/multicast/observer/flatmapnobackpressureobserver.py b/rxbp/multicast/observer/flatconcatnobackpressureobserver.py similarity index 97% rename from rxbp/multicast/observer/flatmapnobackpressureobserver.py rename to rxbp/multicast/observer/flatconcatnobackpressureobserver.py index c9fc05f..253887e 100644 --- a/rxbp/multicast/observer/flatmapnobackpressureobserver.py +++ b/rxbp/multicast/observer/flatconcatnobackpressureobserver.py @@ -12,7 +12,7 @@ from rxbp.typing import ElementType -class FlatMapNoBackpressureObserver(Observer): +class FlatConcatNoBackpressureObserver(Observer): def __init__( self, observer: Observer, @@ -112,4 +112,4 @@ def on_completed(self): self.is_completed = True if len(conn_observers) == 0: - self.observer.on_completed() \ No newline at end of file + self.observer.on_completed() diff --git a/rxbp/multicast/op.py b/rxbp/multicast/op.py index 4fc091c..4cc43a9 100644 --- a/rxbp/multicast/op.py +++ b/rxbp/multicast/op.py @@ -178,12 +178,14 @@ def op_func(source: MultiCastOpMixin): return MultiCastOperator(op_func) -def reduce(): +def reduce( + maintain_order: bool = None, +): """ Lift the current `MultiCast[ReducableMixin[T]]` to a `MultiCast[ReducableMixin[T]]`. """ def op_func(source: MultiCastOpMixin): - return source.reduce() + return source.reduce(maintain_order=maintain_order) # return MultiCast(ReduceMultiCast(source=source)) return MultiCastOperator(op_func) diff --git a/rxbp/multicast/rxextensions/liftobservable.py b/rxbp/multicast/rxextensions/liftobservable.py index ba0eb10..42de4fe 100644 --- a/rxbp/multicast/rxextensions/liftobservable.py +++ b/rxbp/multicast/rxextensions/liftobservable.py @@ -1,51 +1,59 @@ +import threading +import types from typing import Any, Callable, Optional import rx from rx import Observable from rx.core import Observer from rx.core.typing import Scheduler -from rx.disposable import Disposable +from rx.disposable import Disposable, CompositeDisposable class LiftObservable(Observable): def __init__( self, source: Observable, - func: Callable[[Any, Observable], Any], - subscribe_scheduler: Scheduler, + func: Callable[[Observable, Any], Any], + scheduler: Scheduler, ): super().__init__() self.source = source self.func = func - self.subscribe_scheduler = subscribe_scheduler + self.scheduler = scheduler - class InnerObservable(Observable): - def __init__(self, first, subscribe_scheduler): + self.disposable = CompositeDisposable() + + class LiftedSingleObservable(Observable): + def __init__( + self, + first, + subscribe_scheduler, + disposable: CompositeDisposable, + ): super().__init__() self.first = first self.subscribe_scheduler = subscribe_scheduler + self.disposable = disposable self.observer = None def _subscribe_core(self, - observer: rx.typing.Observer, - scheduler: Optional[rx.typing.Scheduler] = None - ) -> rx.typing.Disposable: + observer: rx.typing.Observer, + scheduler: Optional[rx.typing.Scheduler] = None + ) -> rx.typing.Disposable: self.observer = observer - # def action(_, __): - # self.observer.on_next(self.first) - # - # return self.subscribe_scheduler.schedule(action) + return self.disposable - class LiftObserver(Observer): # replace by TestObserver? + class LiftObserver(Observer): # replace by TestObserver? def __init__( self, func: Callable[[Any, Observable], Any], observer: Observer, subscribe_scheduler: Scheduler, + disposable: CompositeDisposable, ): super().__init__() @@ -54,40 +62,83 @@ def __init__( self.subscribe_scheduler = subscribe_scheduler self.is_first = True + self.elements = [] + + self.observable: LiftObservable.LiftedSingleObservable = None - self.observable: LiftObservable.InnerObservable = None + self.disposable = disposable + + self.lock = threading.RLock() def on_next(self, val): if self.is_first: self.is_first = False - self.observable = LiftObservable.InnerObservable(val, self.subscribe_scheduler) - observable = self.func(val, self.observable) - disposable = observable.subscribe(self.observer) - # _ = self.observer.on_next(outer_val) + self.observable = LiftObservable.LiftedSingleObservable( + val, + self.subscribe_scheduler, + disposable=self.disposable, + ) + + value = self.func(self.observable, val) + + self.observer.on_next(value) + self.observer.on_completed() + + # observable didn't get subscribed + if self.observable.observer is None: + def on_next_if_not_subscribed(self, val): + pass + + self.on_next = types.MethodType(on_next_if_not_subscribed, self) - # else: - def action(_, __): + # if there is no inner subscriber, dispose the source + self.disposable.disposable() + return - if self.observable.observer is not None: - self.observable.observer.on_next(val) - else: - # observable didn't get subscribed - pass + self.elements.append(val) + + def action(_, __): + while True: - self.subscribe_scheduler.schedule(action) + has_elem = True + with self.lock: + if self.elements: + val = self.elements.pop(0) + else: + has_elem = False + + if has_elem: + self.observable.observer.on_next(val) + + else: + break + + schedule_disposable = self.subscribe_scheduler.schedule(action) + self.disposable.add(schedule_disposable) + + else: + has_elem = True + with self.lock: + if self.elements: + self.elements.append(val) + else: + has_elem = False + + if not has_elem: + def on_next_after_all_sent(self, val): + # if self.observable.observer is not None: + self.observable.observer.on_next(val) + + self.on_next = types.MethodType(on_next_after_all_sent, self) + on_next_after_all_sent(self, val) def on_error(self, exc: Exception): if self.is_first: self.observer.on_error(exc) else: def action(_, __): - - if self.observable.observer is not None: - self.observable.observer.on_error(exc) - else: - # observable didn't get subscribed - pass + self.observable.observer.on_error(exc) self.subscribe_scheduler.schedule(action) @@ -96,20 +147,34 @@ def on_completed(self): self.observer.on_completed() else: def action(_, __): - - if self.observable.observer is not None: - self.observable.observer.on_completed() - else: - # observable didn't get subscribed - pass + self.observable.observer.on_completed() self.subscribe_scheduler.schedule(action) - def _subscribe_core(self, - observer: rx.typing.Observer, - scheduler: Optional[rx.typing.Scheduler] = None - ) -> rx.typing.Disposable: + def _subscribe_core( + self, + observer: rx.typing.Observer, + scheduler: Optional[rx.typing.Scheduler] = None + ) -> rx.typing.Disposable: + + observer = self.LiftObserver( + func=self.func, + observer=observer, + subscribe_scheduler=self.scheduler, + disposable=self.disposable, + ) + + source_disposable = self.source.subscribe(observer=observer, scheduler=scheduler) + self.disposable.add(source_disposable) + + def dispose_func(): + + if observer.observable is None: + self.disposable.dispose() + + # if inner observable is subscribed, then it is the job of the inner subscriber + # to dispose the source + else: + pass - observer = self.LiftObserver(func=self.func, observer=observer, subscribe_scheduler=self.subscribe_scheduler) - disposable = self.source.subscribe(observer=observer, scheduler=scheduler) - return Disposable() + return Disposable(dispose_func) diff --git a/rxbp/multicast/testing/testrxobservable.py b/rxbp/multicast/testing/testrxobservable.py index 2c47624..44294f8 100644 --- a/rxbp/multicast/testing/testrxobservable.py +++ b/rxbp/multicast/testing/testrxobservable.py @@ -11,11 +11,26 @@ def __init__(self): self.observer = None + self.is_disposed = False + def _subscribe_core(self, observer: typing.Observer, scheduler: Optional[typing.Scheduler] = None ) -> typing.Disposable: self.observer = observer - return Disposable() + + def dispose_func(): + self.is_disposed = True + + return Disposable(dispose_func) + + def on_next(self, val): + self.observer.on_next(val) + + def on_completed(self): + self.observer.on_completed() + + def on_error(self, exc): + self.observer.on_error(exc) diff --git a/rxbp/observables/debugobservable.py b/rxbp/observables/debugobservable.py index d345f25..ea9724d 100644 --- a/rxbp/observables/debugobservable.py +++ b/rxbp/observables/debugobservable.py @@ -17,7 +17,7 @@ def __init__(self, source: Observable, name: str = None, on_next=None, on_comple self.on_next_func = on_next or (lambda v: print('{}.on_next {}'.format(name, v))) self.on_error_func = on_error or (lambda exc: print('{}.on_error {}'.format(name, exc))) self.on_completed_func = on_completed or (lambda: print('{}.on_completed'.format(name))) - self.on_subscribe_func = on_subscribe or (lambda v: print('{}.on_observe {}'.format(name, v))) + self.on_subscribe_func = on_subscribe or (lambda v: print('{}.on_observe {}'.format(name, v.observer))) self.on_sync_ack = on_ack or (lambda v: print('{}.on_sync_ack {}'.format(name, v))) self.on_async_ack = on_ack or (lambda v: print('{}.on_async_ack {}'.format(name, v))) self.on_raw_ack = on_raw_ack or (lambda v: print('{}.on_raw_ack {}'.format(name, v))) diff --git a/rxbp/observablesubjects/cacheservefirstosubject.py b/rxbp/observablesubjects/cacheservefirstosubject.py index ecafb99..5776b39 100644 --- a/rxbp/observablesubjects/cacheservefirstosubject.py +++ b/rxbp/observablesubjects/cacheservefirstosubject.py @@ -106,6 +106,20 @@ def on_error(self, exception: Exception) -> List: return inactive_subscriptions def should_dequeue(self, index: int): + + """ + Traceback (most recent call last): + File "/home/mike/workspace/python/rxbackpressure/rxbp/observers/backpressurebufferedobserver.py", line 93, in signal_next + ack = self.underlying.on_next(next) + File "/home/mike/workspace/python/rxbackpressure/rxbp/observablesubjects/cacheservefirstosubject.py", line 353, in on_next + dequeue_buffer = self.shared_state.should_dequeue(current_index) + File "/home/mike/workspace/python/rxbackpressure/rxbp/observablesubjects/cacheservefirstosubject.py", line 109, in should_dequeue + result = index <= min(self.current_index.values()) + ValueError: min() arg is an empty sequence + """ + if not self.current_index: + return False + result = index <= min(self.current_index.values()) return result diff --git a/tests/test_multicast/test_defermulticast.py b/tests/test_multicast/test_defermulticast.py new file mode 100644 index 0000000..74786f8 --- /dev/null +++ b/tests/test_multicast/test_defermulticast.py @@ -0,0 +1,134 @@ +import unittest + +import rxbp +from rxbp.flowable import Flowable +from rxbp.multicast.multicast import MultiCast +from rxbp.multicast.multicastInfo import MultiCastInfo +from rxbp.multicast.multicasts.defermulticast import DeferMultiCast +from rxbp.multicast.multicasts.reducemulticast import ReduceMultiCast +from rxbp.multicast.testing.testmulticast import TestMultiCast +from rxbp.multicast.testing.testrxobserver import TestRxObserver +from rxbp.observerinfo import ObserverInfo +from rxbp.subscriber import Subscriber +from rxbp.testing.testflowable import TestFlowable +from rxbp.testing.testobserver import TestObserver +from rxbp.testing.testscheduler import TestScheduler + + +class TestReduceMultiCast(unittest.TestCase): + def setUp(self) -> None: + self.multicast_scheduler = TestScheduler() + self.source_scheduler = TestScheduler() + self.info = MultiCastInfo( + multicast_scheduler=self.multicast_scheduler, + source_scheduler=self.source_scheduler, + ) + self.source_multicast = TestMultiCast() + self.rx_sink = TestRxObserver() + self.source1 = TestFlowable() + self.source2 = TestFlowable() + + def test_send_single_flowable(self): + reduce_multicast = DeferMultiCast( + source=self.source_multicast, + func=lambda m: MultiCast(m).pipe( + rxbp.multicast.op.map(lambda t: t[0]), + ), + initial=[0], + ) + reduce_multicast.get_source(self.info).subscribe(self.rx_sink) + + self.source_multicast.on_next(Flowable(self.source1)) + + self.assertEqual(1, len(self.rx_sink.received)) + + def test_subscribe_single_flowable(self): + reduce_multicast = DeferMultiCast( + source=self.source_multicast, + func=lambda m: MultiCast(m).pipe( + rxbp.multicast.op.map(lambda t: t[0]), + ), + initial=[10], + ) + reduce_multicast.get_source(self.info).subscribe(self.rx_sink) + self.source_multicast.on_next(Flowable(self.source1)) + + sink = TestObserver(immediate_coninue=0) + subscription = self.rx_sink.received[0].unsafe_subscribe(Subscriber( + scheduler=self.source_scheduler, subscribe_scheduler=self.source_scheduler, + )) + subscription.observable.observe(ObserverInfo(sink)) + + self.multicast_scheduler.advance_by(1) + self.source_scheduler.advance_by(1) + + # self.source1.on_next_single(0) + + print(sink.received) + + self.assertEqual([10], sink.received) + + # def test_send_dictionary(self): + # reduce_multicast = ReduceMultiCast(source=self.source_multicast) + # reduce_multicast.get_source(self.info).subscribe(self.rx_sink) + # + # self.source_multicast.on_next({'f1': Flowable(self.source1)}) + # + # self.assertEqual(1, len(self.rx_sink.received)) + # + # def test_reduce_single_flowables_without_maintaining_order(self): + # reduce_multicast = ReduceMultiCast(source=self.source_multicast) + # reduce_multicast.get_source(self.info).subscribe(self.rx_sink) + # self.source_multicast.on_next(Flowable(self.source1)) + # self.source_multicast.on_next(Flowable(self.source2)) + # self.source_multicast.on_completed() + # + # sink = TestObserver() + # subscription = self.rx_sink.received[0].unsafe_subscribe(Subscriber( + # scheduler=self.source_scheduler, + # subscribe_scheduler=self.source_scheduler + # )) + # subscription.observable.observe(ObserverInfo(observer=sink)) + # + # # sending the lifted flowable is scheduled on the multicast_scheduler + # self.multicast_scheduler.advance_by(1) + # + # self.source1.on_next_single(1) + # self.source2.on_next_single('a') + # self.source1.on_next_single(2) + # self.source1.on_completed() + # self.source2.on_next_single('b') + # self.source2.on_completed() + # + # self.assertEqual([1, 'a', 2, 'b'], sink.received) + # self.assertTrue(sink.is_completed) + # + # def test_reduce_single_flowables_with_maintaining_order(self): + # reduce_multicast = ReduceMultiCast( + # source=self.source_multicast, + # maintain_order=True, + # ) + # reduce_multicast.get_source(self.info).subscribe(self.rx_sink) + # self.source_multicast.on_next(Flowable(self.source1)) + # self.source_multicast.on_next(Flowable(self.source2)) + # self.source_multicast.on_completed() + # + # sink = TestObserver() + # subscription = self.rx_sink.received[0].unsafe_subscribe(Subscriber( + # scheduler=self.source_scheduler, + # subscribe_scheduler=self.source_scheduler + # )) + # subscription.observable.observe(ObserverInfo(observer=sink)) + # + # # sending the lifted flowable is scheduled on the multicast_scheduler + # self.multicast_scheduler.advance_by(1) + # + # self.source1.on_next_single(1) + # self.source2.on_next_single('a') + # self.source1.on_next_single(2) + # self.source1.on_completed() + # self.source2.on_next_single('b') + # self.source2.on_completed() + # + # self.assertEqual([1, 2, 'a', 'b'], sink.received) + # self.assertTrue(sink.is_completed) diff --git a/tests/test_multicast/test_observer/__init__.py b/tests/test_multicast/test_observer/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_multicast/test_observer/test_flatconcatnobackpressureobserver.py b/tests/test_multicast/test_observer/test_flatconcatnobackpressureobserver.py new file mode 100644 index 0000000..7244162 --- /dev/null +++ b/tests/test_multicast/test_observer/test_flatconcatnobackpressureobserver.py @@ -0,0 +1,148 @@ +import unittest + +from rxbp.ack.ackimpl import Continue +from rxbp.multicast.observer.flatconcatnobackpressureobserver import FlatConcatNoBackpressureObserver +from rxbp.observerinfo import ObserverInfo +from rxbp.testing.testobservable import TestObservable +from rxbp.testing.testobserver import TestObserver +from rxbp.testing.testscheduler import TestScheduler + + +class TestFlatConcatNoBackpressureObserver(unittest.TestCase): + def setUp(self) -> None: + self.scheduler = TestScheduler() + self.source = TestObservable() + + self.source1 = TestObservable() + self.source2 = TestObservable() + self.source3 = TestObservable() + + def test_initialize(self): + sink = TestObserver() + observer = FlatConcatNoBackpressureObserver( + observer=sink, + selector=lambda v: v, + scheduler=self.scheduler, + subscribe_scheduler=self.scheduler, + is_volatile=False, + ) + self.source.observe(ObserverInfo(observer=observer)) + + def test_on_next_does_not_backpressure(self): + sink = TestObserver() + observer = FlatConcatNoBackpressureObserver( + observer=sink, + selector=lambda v: v, + scheduler=self.scheduler, + subscribe_scheduler=self.scheduler, + is_volatile=False, + ) + self.source.observe(ObserverInfo(observer=observer)) + + ack1 = self.source.on_next_single(self.source1) + ack2 = self.source.on_next_single(self.source2) + + self.assertIsInstance(ack1, Continue) + self.assertIsInstance(ack2, Continue) + + def test_inner_on_next(self): + sink = TestObserver() + observer = FlatConcatNoBackpressureObserver( + observer=sink, + selector=lambda v: v, + scheduler=self.scheduler, + subscribe_scheduler=self.scheduler, + is_volatile=False, + ) + self.source.observe(ObserverInfo(observer=observer)) + self.source.on_next_single(self.source1) + + self.source1.on_next_single(1) + + self.assertEqual([1], sink.received) + + def test_on_next_on_second_source(self): + sink = TestObserver() + observer = FlatConcatNoBackpressureObserver( + observer=sink, + selector=lambda v: v, + scheduler=self.scheduler, + subscribe_scheduler=self.scheduler, + is_volatile=False, + ) + self.source.observe(ObserverInfo(observer=observer)) + self.source.on_next_single(self.source1) + self.source1.on_next_single(1) + self.source.on_next_single(self.source2) + + self.source2.on_next_single('a') + + self.assertEqual([1], sink.received) + + def test_complete_first_source(self): + sink = TestObserver() + observer = FlatConcatNoBackpressureObserver( + observer=sink, + selector=lambda v: v, + scheduler=self.scheduler, + subscribe_scheduler=self.scheduler, + is_volatile=False, + ) + self.source.observe(ObserverInfo(observer=observer)) + self.source.on_next_single(self.source1) + self.source1.on_next_single(1) + self.source.on_next_single(self.source2) + self.source.on_completed() + self.source2.on_next_single('a') + + self.source1.on_completed() + + self.assertEqual([1, 'a'], sink.received) + self.assertFalse(sink.is_completed) + + def test_complete_first_source2(self): + sink = TestObserver() + observer = FlatConcatNoBackpressureObserver( + observer=sink, + selector=lambda v: v, + scheduler=self.scheduler, + subscribe_scheduler=self.scheduler, + is_volatile=False, + ) + self.source.observe(ObserverInfo(observer=observer)) + self.source.on_next_single(self.source1) + self.source1.on_next_single(1) + self.source.on_next_single(self.source2) + self.source.on_completed() + self.source2.on_next_single('a') + self.source1.on_next_single(2) + self.source1.on_completed() + self.source2.on_next_single('b') + + self.source2.on_completed() + + self.assertEqual([1, 2, 'a', 'b'], sink.received) + self.assertTrue(sink.is_completed) + + def test_three_sources(self): + sink = TestObserver() + observer = FlatConcatNoBackpressureObserver( + observer=sink, + selector=lambda v: v, + scheduler=self.scheduler, + subscribe_scheduler=self.scheduler, + is_volatile=False, + ) + self.source.observe(ObserverInfo(observer=observer)) + self.source.on_next_single(self.source1) + self.source.on_next_single(self.source2) + self.source.on_next_single(self.source3) + + self.source1.on_next_single(1) + self.source2.on_next_single(2) + self.source3.on_next_single(3) + self.source1.on_completed() + self.source2.on_completed() + self.source3.on_completed() + + self.assertEqual([1, 2, 3], sink.received) \ No newline at end of file diff --git a/tests/test_multicast/test_observer/test_flatmergenobackpressureobserver.py b/tests/test_multicast/test_observer/test_flatmergenobackpressureobserver.py new file mode 100644 index 0000000..6e453c5 --- /dev/null +++ b/tests/test_multicast/test_observer/test_flatmergenobackpressureobserver.py @@ -0,0 +1,146 @@ +import unittest + +from rxbp.ack.ackimpl import Continue +from rxbp.multicast.observer.flatconcatnobackpressureobserver import FlatConcatNoBackpressureObserver +from rxbp.multicast.observer.flatmergenobackpressureobserver import FlatMergeNoBackpressureObserver +from rxbp.observerinfo import ObserverInfo +from rxbp.testing.testobservable import TestObservable +from rxbp.testing.testobserver import TestObserver +from rxbp.testing.testscheduler import TestScheduler + + +class TestFlatMergeNoBackpressureObserver(unittest.TestCase): + def setUp(self) -> None: + self.scheduler = TestScheduler() + self.source = TestObservable() + + self.source1 = TestObservable() + self.source2 = TestObservable() + self.source3 = TestObservable() + + def test_initialize(self): + sink = TestObserver() + observer = FlatMergeNoBackpressureObserver( + observer=sink, + selector=lambda v: v, + scheduler=self.scheduler, + subscribe_scheduler=self.scheduler, + is_volatile=False, + ) + self.source.observe(ObserverInfo(observer=observer)) + + def test_on_next_does_not_backpressure(self): + sink = TestObserver() + observer = FlatMergeNoBackpressureObserver( + observer=sink, + selector=lambda v: v, + scheduler=self.scheduler, + subscribe_scheduler=self.scheduler, + is_volatile=False, + ) + self.source.observe(ObserverInfo(observer=observer)) + + ack1 = self.source.on_next_single(self.source1) + ack2 = self.source.on_next_single(self.source2) + + self.assertIsInstance(ack1, Continue) + self.assertIsInstance(ack2, Continue) + + def test_inner_on_next(self): + sink = TestObserver() + observer = FlatMergeNoBackpressureObserver( + observer=sink, + selector=lambda v: v, + scheduler=self.scheduler, + subscribe_scheduler=self.scheduler, + is_volatile=False, + ) + self.source.observe(ObserverInfo(observer=observer)) + self.source.on_next_single(self.source1) + + self.source1.on_next_single(1) + + self.assertEqual([1], sink.received) + + def test_on_next_on_second_source(self): + sink = TestObserver() + observer = FlatMergeNoBackpressureObserver( + observer=sink, + selector=lambda v: v, + scheduler=self.scheduler, + subscribe_scheduler=self.scheduler, + is_volatile=False, + ) + self.source.observe(ObserverInfo(observer=observer)) + self.source.on_next_single(self.source1) + self.source1.on_next_single(1) + self.source.on_next_single(self.source2) + + self.source2.on_next_single('a') + + self.assertEqual([1, 'a'], sink.received) + + def test_complete_first_source(self): + sink = TestObserver() + observer = FlatMergeNoBackpressureObserver( + observer=sink, + selector=lambda v: v, + scheduler=self.scheduler, + subscribe_scheduler=self.scheduler, + is_volatile=False, + ) + self.source.observe(ObserverInfo(observer=observer)) + self.source.on_next_single(self.source1) + self.source1.on_next_single(1) + self.source.on_next_single(self.source2) + self.source.on_completed() + self.source2.on_next_single('a') + + self.source1.on_completed() + + self.assertEqual([1, 'a'], sink.received) + self.assertFalse(sink.is_completed) + + def test_complete_first_source2(self): + sink = TestObserver() + observer = FlatMergeNoBackpressureObserver( + observer=sink, + selector=lambda v: v, + scheduler=self.scheduler, + subscribe_scheduler=self.scheduler, + is_volatile=False, + ) + self.source.observe(ObserverInfo(observer=observer)) + self.source.on_next_single(self.source1) + self.source1.on_next_single(1) + self.source.on_next_single(self.source2) + self.source.on_completed() + self.source2.on_next_single('a') + self.source1.on_next_single(2) + self.source1.on_completed() + self.source2.on_next_single('b') + + self.source2.on_completed() + + self.assertEqual([1, 'a', 2, 'b'], sink.received) + self.assertTrue(sink.is_completed) + + def test_three_sources(self): + sink = TestObserver() + observer = FlatMergeNoBackpressureObserver( + observer=sink, + selector=lambda v: v, + scheduler=self.scheduler, + subscribe_scheduler=self.scheduler, + is_volatile=False, + ) + self.source.observe(ObserverInfo(observer=observer)) + self.source.on_next_single(self.source1) + self.source.on_next_single(self.source2) + self.source.on_next_single(self.source3) + + self.source1.on_next_single(1) + self.source2.on_next_single(2) + self.source3.on_next_single(3) + + self.assertEqual([1, 2, 3], sink.received) \ No newline at end of file diff --git a/tests/test_multicast/test_reducemulticast.py b/tests/test_multicast/test_reducemulticast.py index 3328800..78ef901 100644 --- a/tests/test_multicast/test_reducemulticast.py +++ b/tests/test_multicast/test_reducemulticast.py @@ -1,6 +1,5 @@ import unittest -from rx.testing import ReactiveTest from rxbp.flowable import Flowable from rxbp.multicast.multicastInfo import MultiCastInfo from rxbp.multicast.multicasts.reducemulticast import ReduceMultiCast @@ -21,44 +20,80 @@ def setUp(self) -> None: multicast_scheduler=self.multicast_scheduler, source_scheduler=self.source_scheduler, ) - # self.rx_sink = TestRxObserver() - # self.sink = TestObserver() + self.source_multicast = TestMultiCast() + self.rx_sink = TestRxObserver() + self.source1 = TestFlowable() + self.source2 = TestFlowable() - def test_1(self): - rx_sink = TestRxObserver() - source_multicast = TestMultiCast() - reduce_multicast = ReduceMultiCast(ReduceMultiCast(source=source_multicast)) - reduce_multicast.get_source(self.info).subscribe(rx_sink) - source_flowable = TestFlowable() + def test_send_single_flowable(self): + reduce_multicast = ReduceMultiCast(source=self.source_multicast) + reduce_multicast.get_source(self.info).subscribe(self.rx_sink) - source_multicast.on_next(Flowable(source_flowable)) + self.source_multicast.on_next(Flowable(self.source1)) - self.assertEqual(1, len(rx_sink.received)) + self.assertEqual(1, len(self.rx_sink.received)) + + def test_send_dictionary(self): + reduce_multicast = ReduceMultiCast(source=self.source_multicast) + reduce_multicast.get_source(self.info).subscribe(self.rx_sink) + + self.source_multicast.on_next({'f1': Flowable(self.source1)}) + + self.assertEqual(1, len(self.rx_sink.received)) + + def test_reduce_single_flowables_without_maintaining_order(self): + reduce_multicast = ReduceMultiCast(source=self.source_multicast) + reduce_multicast.get_source(self.info).subscribe(self.rx_sink) + self.source_multicast.on_next(Flowable(self.source1)) + self.source_multicast.on_next(Flowable(self.source2)) + self.source_multicast.on_completed() - def test_2(self): sink = TestObserver() - rx_sink = TestRxObserver() - source_multicast = TestMultiCast() - reduce_multicast = ReduceMultiCast(ReduceMultiCast(source=source_multicast)) - reduce_multicast.get_source(self.info).subscribe(rx_sink) - source_flowable1 = TestFlowable() - source_flowable2 = TestFlowable() - source_multicast.on_next(Flowable(source_flowable1)) - source_multicast.on_next(Flowable(source_flowable2)) - source_multicast.on_completed() - - subscription = rx_sink.received[0].unsafe_subscribe(Subscriber( + subscription = self.rx_sink.received[0].unsafe_subscribe(Subscriber( scheduler=self.source_scheduler, subscribe_scheduler=self.source_scheduler )) subscription.observable.observe(ObserverInfo(observer=sink)) + + # sending the lifted flowable is scheduled on the multicast_scheduler self.multicast_scheduler.advance_by(1) - source_flowable1.on_next_single(1) - source_flowable2.on_next_single('a') - source_flowable1.on_next_single(2) - source_flowable1.on_completed() - source_flowable2.on_next_single('b') - source_flowable2.on_completed() + + self.source1.on_next_single(1) + self.source2.on_next_single('a') + self.source1.on_next_single(2) + self.source1.on_completed() + self.source2.on_next_single('b') + self.source2.on_completed() self.assertEqual([1, 'a', 2, 'b'], sink.received) self.assertTrue(sink.is_completed) + + def test_reduce_single_flowables_with_maintaining_order(self): + reduce_multicast = ReduceMultiCast( + source=self.source_multicast, + maintain_order=True, + ) + reduce_multicast.get_source(self.info).subscribe(self.rx_sink) + self.source_multicast.on_next(Flowable(self.source1)) + self.source_multicast.on_next(Flowable(self.source2)) + self.source_multicast.on_completed() + + sink = TestObserver() + subscription = self.rx_sink.received[0].unsafe_subscribe(Subscriber( + scheduler=self.source_scheduler, + subscribe_scheduler=self.source_scheduler + )) + subscription.observable.observe(ObserverInfo(observer=sink)) + + # sending the lifted flowable is scheduled on the multicast_scheduler + self.multicast_scheduler.advance_by(1) + + self.source1.on_next_single(1) + self.source2.on_next_single('a') + self.source1.on_next_single(2) + self.source1.on_completed() + self.source2.on_next_single('b') + self.source2.on_completed() + + self.assertEqual([1, 2, 'a', 'b'], sink.received) + self.assertTrue(sink.is_completed) diff --git a/tests/test_multicast/test_rxextensions/test_liftobservable.py b/tests/test_multicast/test_rxextensions/test_liftobservable.py new file mode 100644 index 0000000..196a887 --- /dev/null +++ b/tests/test_multicast/test_rxextensions/test_liftobservable.py @@ -0,0 +1,77 @@ +import unittest + +import rx +from rx import operators as rxop + +from rxbp.multicast.rxextensions.liftobservable import LiftObservable +from rxbp.multicast.testing.testrxobservable import TestRxObservable +from rxbp.multicast.testing.testrxobserver import TestRxObserver +from rxbp.testing.testscheduler import TestScheduler + + +class TestLiftObservable(unittest.TestCase): + def setUp(self): + self.source = TestRxObservable() + self.sink = TestRxObserver() + self.scheduler = TestScheduler() + + def func(obs: rx.typing.Observable, first): + return obs + + self.disposable = LiftObservable( + self.source, + func=func, + scheduler=self.scheduler, + ).pipe( + rxop.flat_map(lambda s: s) + ).subscribe(self.sink) + + def test_scheduled_on_next(self): + self.source.on_next(1) + self.source.on_next(2) + + self.scheduler.advance_by(1) + + self.assertEqual([1, 2], self.sink.received) + self.assertFalse(self.sink.is_completed) + + def test_non_scheduled_on_next(self): + self.source.on_next(1) + self.source.on_next(2) + self.scheduler.advance_by(1) + + self.source.on_next(3) + self.source.on_next(4) + + self.assertEqual([1, 2, 3, 4], self.sink.received) + self.assertFalse(self.sink.is_completed) + + def test_on_completed(self): + self.source.on_completed() + + self.assertTrue(self.source.is_disposed) + self.assertTrue(self.sink.is_completed) + + def test_on_completed_after_on_next(self): + self.source.on_next(1) + self.source.on_next(2) + self.source.on_completed() + + self.scheduler.advance_by(1) + + self.assertEqual([1, 2], self.sink.received) + self.assertTrue(self.sink.is_completed) + + def test_dispose(self): + self.source.on_next(1) + self.source.on_next(2) + + self.disposable.dispose() + + self.assertEqual([], self.sink.received) + self.assertTrue(self.source.is_disposed) + + def test_dispose_without_subscriber(self): + self.disposable.dispose() + + self.assertTrue(self.source.is_disposed)