From e943e5719fbb5530e77c07b527c3334ad8955b46 Mon Sep 17 00:00:00 2001 From: mat Date: Mon, 6 Feb 2023 17:57:45 +0800 Subject: [PATCH 01/10] Run emits _T_out --- reactivex/observable/observable.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reactivex/observable/observable.py b/reactivex/observable/observable.py index b32e237e..8cf2086d 100644 --- a/reactivex/observable/observable.py +++ b/reactivex/observable/observable.py @@ -236,7 +236,7 @@ def pipe(self, *operators: Callable[[Any], Any]) -> Any: return pipe_(self, *operators) - def run(self) -> Any: + def run(self) -> _T_out: """Run source synchronously. Subscribes to the observable source. Then blocks and waits for the From 3487ae60707b38c680604cf0733f7c7879705234 Mon Sep 17 00:00:00 2001 From: mat Date: Mon, 6 Feb 2023 18:24:53 +0800 Subject: [PATCH 02/10] Throttle first curry flipped --- reactivex/operators/__init__.py | 18 ++++- reactivex/operators/_throttlefirst.py | 87 +++++++++++---------- tests/test_observable/test_all.py | 17 ++++ tests/test_observable/test_throttlefirst.py | 5 -- 4 files changed, 76 insertions(+), 51 deletions(-) diff --git a/reactivex/operators/__init__.py b/reactivex/operators/__init__.py index c4847454..8bd2141d 100644 --- a/reactivex/operators/__init__.py +++ b/reactivex/operators/__init__.py @@ -63,9 +63,14 @@ def all(predicate: Predicate[_T]) -> Callable[[Observable[_T]], Observable[bool] .. marble:: :alt: all - --1--2--3--4--5-| - [ all(i: i<10) ] - ----------------true-| + --1--2--3--4--5--6----| + [ all(i: i<8) ] + ------------------true| + + + --1--2--3--4--5--6----| + [ all(i: i<4) ] + ------false| Example: >>> op = all(lambda value: value.length > 3) @@ -78,6 +83,13 @@ def all(predicate: Predicate[_T]) -> Callable[[Observable[_T]], Observable[bool] returns an observable sequence containing a single element determining whether all elements in the source sequence pass the test in the specified predicate. + + If a predicate returns false, the result sequence emits false + and completes immediately, regardless of the state of the + source sequence. + + If all items pass the predicate test, the emission of true + will only happen as the source completes. """ from ._all import all_ diff --git a/reactivex/operators/_throttlefirst.py b/reactivex/operators/_throttlefirst.py index 58fec06a..f7df8fd6 100644 --- a/reactivex/operators/_throttlefirst.py +++ b/reactivex/operators/_throttlefirst.py @@ -3,55 +3,56 @@ from reactivex import Observable, abc, typing from reactivex.scheduler import TimeoutScheduler +from reactivex.curry import curry_flip _T = TypeVar("_T") +@curry_flip(1) def throttle_first_( - window_duration: typing.RelativeTime, scheduler: Optional[abc.SchedulerBase] = None + source: Observable[_T], + window_duration: typing.RelativeTime, + scheduler: Optional[abc.SchedulerBase] = None, ) -> Callable[[Observable[_T]], Observable[_T]]: - def throttle_first(source: Observable[_T]) -> Observable[_T]: - """Returns an observable that emits only the first item emitted - by the source Observable during sequential time windows of a - specified duration. - - Args: - source: Source observable to throttle. - - Returns: - An Observable that performs the throttle operation. - """ - - def subscribe( - observer: abc.ObserverBase[_T], - scheduler_: Optional[abc.SchedulerBase] = None, - ) -> abc.DisposableBase: - _scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton() - - duration = _scheduler.to_timedelta(window_duration or 0.0) - if duration <= _scheduler.to_timedelta(0): - raise ValueError("window_duration cannot be less or equal zero.") - last_on_next: Optional[datetime] = None - - def on_next(x: _T) -> None: - nonlocal last_on_next - emit = False - now = _scheduler.now - - with source.lock: - if not last_on_next or now - last_on_next >= duration: - last_on_next = now - emit = True - if emit: - observer.on_next(x) - - return source.subscribe( - on_next, observer.on_error, observer.on_completed, scheduler=_scheduler - ) - - return Observable(subscribe) - - return throttle_first + """Returns an observable that emits only the first item emitted + by the source Observable during sequential time windows of a + specified duration. + + Args: + source: Source observable to throttle. + + Returns: + An Observable that performs the throttle operation. + """ + + def subscribe( + observer: abc.ObserverBase[_T], + scheduler_: Optional[abc.SchedulerBase] = None, + ) -> abc.DisposableBase: + _scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton() + + duration = _scheduler.to_timedelta(window_duration or 0.0) + if duration <= _scheduler.to_timedelta(0): + raise ValueError("window_duration cannot be less or equal zero.") + last_on_next: Optional[datetime] = None + + def on_next(x: _T) -> None: + nonlocal last_on_next + emit = False + now = _scheduler.now + + with source.lock: + if not last_on_next or now - last_on_next >= duration: + last_on_next = now + emit = True + if emit: + observer.on_next(x) + + return source.subscribe( + on_next, observer.on_error, observer.on_completed, scheduler=_scheduler + ) + + return Observable(subscribe) __all__ = ["throttle_first_"] diff --git a/tests/test_observable/test_all.py b/tests/test_observable/test_all.py index 451b63a2..799fba85 100644 --- a/tests/test_observable/test_all.py +++ b/tests/test_observable/test_all.py @@ -2,6 +2,7 @@ from reactivex import operators as _ from reactivex.testing import ReactiveTest, TestScheduler +from reactivex.testing.subscription import Subscription on_next = ReactiveTest.on_next on_completed = ReactiveTest.on_completed @@ -24,6 +25,17 @@ def create(): res = scheduler.start(create=create).messages assert res == [on_next(250, True), on_completed(250)] + def test_all_no_emit(self): + """Should emit true if no item is emitted""" + scheduler = TestScheduler() + xs = scheduler.create_hot_observable(on_completed(250)) + + def create(): + return xs.pipe(_.all(lambda x: x > 0)) + + res = scheduler.start(create=create).messages + assert res == [on_next(250, True), on_completed(250)] + def test_all_return(self): scheduler = TestScheduler() msgs = [on_next(150, 1), on_next(210, 2), on_completed(250)] @@ -79,8 +91,12 @@ def create(): res = scheduler.start(create=create).messages assert res == [on_next(210, False), on_completed(210)] + assert xs.subscriptions == [Subscription(200, 210)] def test_all_some_all_match(self): + """Should emit true and complete after the source completes if all + items pass the predicate test + """ scheduler = TestScheduler() msgs = [ on_next(150, 1), @@ -119,6 +135,7 @@ def create(): res = scheduler.start(create=create).messages assert res == [] + assert xs.subscriptions == [Subscription(200, 1000)] if __name__ == "__main__": diff --git a/tests/test_observable/test_throttlefirst.py b/tests/test_observable/test_throttlefirst.py index 25f65d6a..dd063594 100644 --- a/tests/test_observable/test_throttlefirst.py +++ b/tests/test_observable/test_throttlefirst.py @@ -16,11 +16,6 @@ class RxException(Exception): pass -# Helper function for raising exceptions within lambdas -def _raise(ex): - raise RxException(ex) - - class TestThrottleFirst(unittest.TestCase): def test_throttle_first_completed(self): scheduler = TestScheduler() From 535683060d92449f09f3be94a3c281020580d78e Mon Sep 17 00:00:00 2001 From: mat Date: Mon, 6 Feb 2023 18:40:03 +0800 Subject: [PATCH 03/10] Timestamp curry flipped --- reactivex/operators/_throttlefirst.py | 2 +- reactivex/operators/_timestamp.py | 40 +++++++++++++------------ tests/test_observable/test_timestamp.py | 15 ++++++---- 3 files changed, 32 insertions(+), 25 deletions(-) diff --git a/reactivex/operators/_throttlefirst.py b/reactivex/operators/_throttlefirst.py index f7df8fd6..ff4ff9db 100644 --- a/reactivex/operators/_throttlefirst.py +++ b/reactivex/operators/_throttlefirst.py @@ -2,8 +2,8 @@ from typing import Callable, Optional, TypeVar from reactivex import Observable, abc, typing -from reactivex.scheduler import TimeoutScheduler from reactivex.curry import curry_flip +from reactivex.scheduler import TimeoutScheduler _T = TypeVar("_T") diff --git a/reactivex/operators/_timestamp.py b/reactivex/operators/_timestamp.py index 2ae40607..e09baaae 100644 --- a/reactivex/operators/_timestamp.py +++ b/reactivex/operators/_timestamp.py @@ -3,6 +3,7 @@ from typing import Callable, Generic, Optional, TypeVar from reactivex import Observable, abc, defer, operators +from reactivex.curry import curry_flip from reactivex.scheduler import TimeoutScheduler _T = TypeVar("_T") @@ -14,36 +15,37 @@ class Timestamp(Generic[_T]): timestamp: datetime +@curry_flip(1) def timestamp_( + source: Observable[_T], scheduler: Optional[abc.SchedulerBase] = None, ) -> Callable[[Observable[_T]], Observable[Timestamp[_T]]]: - def timestamp(source: Observable[_T]) -> Observable[Timestamp[_T]]: - """Records the timestamp for each value in an observable sequence. + """Records the timestamp for each value in an observable sequence. - Examples: - >>> timestamp(source) + Examples: + >>> timestamp(source) - Produces objects with attributes `value` and `timestamp`, where - value is the original value. + Produces objects with attributes `value` and `timestamp`, where + value is the original value. - Args: - source: Observable source to timestamp. + Args: + source: Observable source to timestamp. - Returns: - An observable sequence with timestamp information on values. - """ + Returns: + An observable sequence with timestamp information on values. + Each emitted item is a Timestamp object with `.value` and + `.timestamp` attributes + """ - def factory(scheduler_: Optional[abc.SchedulerBase] = None): - _scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton() + def factory(scheduler_: Optional[abc.SchedulerBase] = None): + _scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton() - def mapper(value: _T) -> Timestamp[_T]: - return Timestamp(value=value, timestamp=_scheduler.now) + def mapper(value: _T) -> Timestamp[_T]: + return Timestamp(value=value, timestamp=_scheduler.now) - return source.pipe(operators.map(mapper)) + return source.pipe(operators.map(mapper)) - return defer(factory) - - return timestamp + return defer(factory) __all__ = ["timestamp_"] diff --git a/tests/test_observable/test_timestamp.py b/tests/test_observable/test_timestamp.py index 87fff722..67ea0895 100644 --- a/tests/test_observable/test_timestamp.py +++ b/tests/test_observable/test_timestamp.py @@ -1,8 +1,10 @@ import unittest from datetime import datetime +from typing import Any, Union import reactivex from reactivex import operators as ops +from reactivex.operators._timestamp import Timestamp as OriginalTimestamp from reactivex.testing import ReactiveTest, TestScheduler on_next = ReactiveTest.on_next @@ -15,11 +17,11 @@ class Timestamp(object): - def __init__(self, value, timestamp): + def __init__(self, value: Any, timestamp: Union[datetime, int]): if isinstance(timestamp, datetime): - timestamp = timestamp - datetime.utcfromtimestamp(0) + time_delta = timestamp - datetime.utcfromtimestamp(0) timestamp = int( - timestamp.seconds + time_delta.seconds ) # FIXME: Must fix when tests run at fraction of seconds. self.value = value @@ -28,7 +30,7 @@ def __init__(self, value, timestamp): def __str__(self): return "%s@%s" % (self.value, self.timestamp) - def equals(self, other): + def equals(self, other: "Timestamp"): return other.timestamp == self.timestamp and other.value == self.value @@ -46,7 +48,7 @@ def test_timestamp_regular(self): ) def create(): - def mapper(x): + def mapper(x: OriginalTimestamp[int]): return Timestamp(x.value, x.timestamp) return xs.pipe( @@ -64,6 +66,8 @@ def mapper(x): on_completed(400), ] + assert xs.subscriptions == [subscribe(200, 400)] + def test_timestamp_empty(self): scheduler = TestScheduler() @@ -74,6 +78,7 @@ def create(): assert results.messages == [on_completed(200)] def test_timestamp_error(self): + """Should not timestamp errors""" ex = "ex" scheduler = TestScheduler() From fe265ef28af46738ba52cb3726f6b359251c6c80 Mon Sep 17 00:00:00 2001 From: mat Date: Wed, 8 Feb 2023 05:55:27 +0800 Subject: [PATCH 04/10] Remove average extra check Fix typing on timestamp --- reactivex/operators/_average.py | 67 +++++++++++++-------------- reactivex/operators/_take.py | 61 ++++++++++++------------ reactivex/operators/_throttlefirst.py | 4 +- reactivex/operators/_timestamp.py | 4 +- tests/test_observable/test_average.py | 5 +- 5 files changed, 68 insertions(+), 73 deletions(-) diff --git a/reactivex/operators/_average.py b/reactivex/operators/_average.py index 99ab672e..291de905 100644 --- a/reactivex/operators/_average.py +++ b/reactivex/operators/_average.py @@ -1,7 +1,8 @@ from dataclasses import dataclass -from typing import Any, Callable, Optional, TypeVar, cast +from typing import Any, Optional, TypeVar, cast from reactivex import Observable, operators, typing +from reactivex.curry import curry_flip _T = TypeVar("_T") @@ -12,51 +13,47 @@ class AverageValue: count: int +@curry_flip(1) def average_( + source: Observable[Any], key_mapper: Optional[typing.Mapper[_T, float]] = None, -) -> Callable[[Observable[_T]], Observable[float]]: - def average(source: Observable[Any]) -> Observable[float]: - """Partially applied average operator. +) -> Observable[float]: + """Partially applied average operator. - Computes the average of an observable sequence of values that - are in the sequence or obtained by invoking a transform - function on each element of the input sequence if present. + Computes the average of an observable sequence of values that + are in the sequence or obtained by invoking a transform + function on each element of the input sequence if present. - Examples: - >>> res = average(source) + Examples: + >>> res = average(source) - Args: - source: Source observable to average. + Args: + source: Source observable to average. - Returns: - An observable sequence containing a single element with the - average of the sequence of values. - """ + Returns: + An observable sequence containing a single element with the + average of the sequence of values. + """ - key_mapper_: typing.Mapper[_T, float] = key_mapper or ( - lambda x: float(cast(Any, x)) - ) + key_mapper_: typing.Mapper[_T, float] = key_mapper or ( + lambda x: float(cast(Any, x)) + ) - def accumulator(prev: AverageValue, cur: float) -> AverageValue: - return AverageValue(sum=prev.sum + cur, count=prev.count + 1) + def accumulator(prev: AverageValue, cur: float) -> AverageValue: + return AverageValue(sum=prev.sum + cur, count=prev.count + 1) - def mapper(s: AverageValue) -> float: - if s.count == 0: - raise Exception("The input sequence was empty") + def mapper(s: AverageValue) -> float: + return s.sum / float(s.count) - return s.sum / float(s.count) + seed = AverageValue(sum=0, count=0) - seed = AverageValue(sum=0, count=0) - - ret = source.pipe( - operators.map(key_mapper_), - operators.scan(accumulator, seed), - operators.last(), - operators.map(mapper), - ) - return ret - - return average + ret = source.pipe( + operators.map(key_mapper_), + operators.scan(accumulator, seed), + operators.last(), + operators.map(mapper), + ) + return ret __all__ = ["average_"] diff --git a/reactivex/operators/_take.py b/reactivex/operators/_take.py index 744d7492..19086985 100644 --- a/reactivex/operators/_take.py +++ b/reactivex/operators/_take.py @@ -1,53 +1,52 @@ -from typing import Callable, Optional, TypeVar +from typing import Optional, TypeVar, cast from reactivex import Observable, abc, empty +from reactivex.curry import curry_flip from reactivex.internal import ArgumentOutOfRangeException _T = TypeVar("_T") -def take_(count: int) -> Callable[[Observable[_T]], Observable[_T]]: +@curry_flip(1) +def take_(source: Observable[_T], count: int) -> Observable[_T]: if count < 0: raise ArgumentOutOfRangeException() - def take(source: Observable[_T]) -> Observable[_T]: - """Returns a specified number of contiguous elements from the start of - an observable sequence. + """Returns a specified number of contiguous elements from the start of + an observable sequence. - >>> take(source) + >>> take(source) - Keyword arguments: - count -- The number of elements to return. + Keyword arguments: + count -- The number of elements to return. - Returns an observable sequence that contains the specified number of - elements from the start of the input sequence. - """ + Returns an observable sequence that contains the specified number of + elements from the start of the input sequence. + """ - if not count: - return empty() + if not count: + return cast(Observable[_T], empty()) - def subscribe( - observer: abc.ObserverBase[_T], - scheduler: Optional[abc.SchedulerBase] = None, - ): - remaining = count + def subscribe( + observer: abc.ObserverBase[_T], + scheduler: Optional[abc.SchedulerBase] = None, + ): + remaining = count - def on_next(value: _T) -> None: - nonlocal remaining + def on_next(value: _T) -> None: + nonlocal remaining - if remaining > 0: - remaining -= 1 - observer.on_next(value) - if not remaining: - observer.on_completed() + if remaining > 0: + remaining -= 1 + observer.on_next(value) + if not remaining: + observer.on_completed() - return source.subscribe( - on_next, observer.on_error, observer.on_completed, scheduler=scheduler - ) + return source.subscribe( + on_next, observer.on_error, observer.on_completed, scheduler=scheduler + ) - return Observable(subscribe) - - return take + return Observable(subscribe) __all__ = ["take_"] diff --git a/reactivex/operators/_throttlefirst.py b/reactivex/operators/_throttlefirst.py index ff4ff9db..65b5ce13 100644 --- a/reactivex/operators/_throttlefirst.py +++ b/reactivex/operators/_throttlefirst.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import Callable, Optional, TypeVar +from typing import Optional, TypeVar from reactivex import Observable, abc, typing from reactivex.curry import curry_flip @@ -13,7 +13,7 @@ def throttle_first_( source: Observable[_T], window_duration: typing.RelativeTime, scheduler: Optional[abc.SchedulerBase] = None, -) -> Callable[[Observable[_T]], Observable[_T]]: +) -> Observable[_T]: """Returns an observable that emits only the first item emitted by the source Observable during sequential time windows of a specified duration. diff --git a/reactivex/operators/_timestamp.py b/reactivex/operators/_timestamp.py index e09baaae..cf3f6a54 100644 --- a/reactivex/operators/_timestamp.py +++ b/reactivex/operators/_timestamp.py @@ -1,6 +1,6 @@ from dataclasses import dataclass from datetime import datetime -from typing import Callable, Generic, Optional, TypeVar +from typing import Generic, Optional, TypeVar from reactivex import Observable, abc, defer, operators from reactivex.curry import curry_flip @@ -19,7 +19,7 @@ class Timestamp(Generic[_T]): def timestamp_( source: Observable[_T], scheduler: Optional[abc.SchedulerBase] = None, -) -> Callable[[Observable[_T]], Observable[Timestamp[_T]]]: +) -> Observable[Timestamp[_T]]: """Records the timestamp for each value in an observable sequence. Examples: diff --git a/tests/test_observable/test_average.py b/tests/test_observable/test_average.py index ebe75cbf..4bba22aa 100644 --- a/tests/test_observable/test_average.py +++ b/tests/test_observable/test_average.py @@ -1,6 +1,7 @@ import unittest from reactivex import operators as _ +from reactivex.internal.exceptions import SequenceContainsNoElementsError from reactivex.testing import ReactiveTest, TestScheduler on_next = ReactiveTest.on_next @@ -19,9 +20,7 @@ def test_average_int32_empty(self): xs = scheduler.create_hot_observable(msgs) res = scheduler.start(create=lambda: xs.pipe(_.average())).messages - assert len(res) == 1 - assert res[0].value.kind == "E" and res[0].value.exception != None - assert res[0].time == 250 + assert res == [on_error(250, SequenceContainsNoElementsError())] def test_average_int32_return(self): scheduler = TestScheduler() From 4a487e350e40c2457c90e7b5f77c58f04dcc4dc4 Mon Sep 17 00:00:00 2001 From: mat Date: Wed, 8 Feb 2023 06:39:29 +0800 Subject: [PATCH 05/10] amb typing to match amb_ (add Future) Curry flip amb Typing on fromfuture tests Doc: Remove 3rd observable in amb since it takes only 2 Doc: show that completion follows winner --- reactivex/operators/__init__.py | 3 +- reactivex/operators/_amb.py | 147 +++++++++++------------ tests/test_observable/test_fromfuture.py | 25 ++-- 3 files changed, 87 insertions(+), 88 deletions(-) diff --git a/reactivex/operators/__init__.py b/reactivex/operators/__init__.py index 8bd2141d..6f27e320 100644 --- a/reactivex/operators/__init__.py +++ b/reactivex/operators/__init__.py @@ -102,9 +102,8 @@ def amb(right_source: Observable[_T]) -> Callable[[Observable[_T]], Observable[_ .. marble:: :alt: amb - ---8--6--9-----------| + ---8--6--9---------| --1--2--3---5--------| - ----------10-20-30---| [ amb() ] --1--2--3---5--------| diff --git a/reactivex/operators/_amb.py b/reactivex/operators/_amb.py index 49bf94ad..3dc9398d 100644 --- a/reactivex/operators/_amb.py +++ b/reactivex/operators/_amb.py @@ -1,92 +1,91 @@ from asyncio import Future -from typing import Callable, List, Optional, TypeVar, Union +from typing import List, Optional, TypeVar, Union from reactivex import Observable, abc, from_future +from reactivex.curry import curry_flip from reactivex.disposable import CompositeDisposable, SingleAssignmentDisposable _T = TypeVar("_T") +@curry_flip(1) def amb_( - right_source: Union[Observable[_T], "Future[_T]"] -) -> Callable[[Observable[_T]], Observable[_T]]: + left_source: Observable[_T], right_source: Union[Observable[_T], "Future[_T]"] +) -> Observable[_T]: if isinstance(right_source, Future): obs: Observable[_T] = from_future(right_source) else: obs = right_source - def amb(left_source: Observable[_T]) -> Observable[_T]: - def subscribe( - observer: abc.ObserverBase[_T], - scheduler: Optional[abc.SchedulerBase] = None, - ) -> abc.DisposableBase: - choice: List[Optional[str]] = [None] - left_choice = "L" - right_choice = "R" - left_subscription = SingleAssignmentDisposable() - right_subscription = SingleAssignmentDisposable() - - def choice_left(): - if not choice[0]: - choice[0] = left_choice - right_subscription.dispose() - - def choice_right(): - if not choice[0]: - choice[0] = right_choice - left_subscription.dispose() - - def on_next_left(value: _T) -> None: - with left_source.lock: - choice_left() - if choice[0] == left_choice: - observer.on_next(value) - - def on_error_left(err: Exception) -> None: - with left_source.lock: - choice_left() - if choice[0] == left_choice: - observer.on_error(err) - - def on_completed_left() -> None: - with left_source.lock: - choice_left() - if choice[0] == left_choice: - observer.on_completed() - - left_d = left_source.subscribe( - on_next_left, on_error_left, on_completed_left, scheduler=scheduler - ) - left_subscription.disposable = left_d - - def send_right(value: _T) -> None: - with left_source.lock: - choice_right() - if choice[0] == right_choice: - observer.on_next(value) - - def on_error_right(err: Exception) -> None: - with left_source.lock: - choice_right() - if choice[0] == right_choice: - observer.on_error(err) - - def on_completed_right() -> None: - with left_source.lock: - choice_right() - if choice[0] == right_choice: - observer.on_completed() - - right_d = obs.subscribe( - send_right, on_error_right, on_completed_right, scheduler=scheduler - ) - right_subscription.disposable = right_d - return CompositeDisposable(left_subscription, right_subscription) - - return Observable(subscribe) - - return amb + def subscribe( + observer: abc.ObserverBase[_T], + scheduler: Optional[abc.SchedulerBase] = None, + ) -> abc.DisposableBase: + choice: List[Optional[str]] = [None] + left_choice = "L" + right_choice = "R" + left_subscription = SingleAssignmentDisposable() + right_subscription = SingleAssignmentDisposable() + + def choice_left(): + if not choice[0]: + choice[0] = left_choice + right_subscription.dispose() + + def choice_right(): + if not choice[0]: + choice[0] = right_choice + left_subscription.dispose() + + def on_next_left(value: _T) -> None: + with left_source.lock: + choice_left() + if choice[0] == left_choice: + observer.on_next(value) + + def on_error_left(err: Exception) -> None: + with left_source.lock: + choice_left() + if choice[0] == left_choice: + observer.on_error(err) + + def on_completed_left() -> None: + with left_source.lock: + choice_left() + if choice[0] == left_choice: + observer.on_completed() + + left_d = left_source.subscribe( + on_next_left, on_error_left, on_completed_left, scheduler=scheduler + ) + left_subscription.disposable = left_d + + def send_right(value: _T) -> None: + with left_source.lock: + choice_right() + if choice[0] == right_choice: + observer.on_next(value) + + def on_error_right(err: Exception) -> None: + with left_source.lock: + choice_right() + if choice[0] == right_choice: + observer.on_error(err) + + def on_completed_right() -> None: + with left_source.lock: + choice_right() + if choice[0] == right_choice: + observer.on_completed() + + right_d = obs.subscribe( + send_right, on_error_right, on_completed_right, scheduler=scheduler + ) + right_subscription.disposable = right_d + return CompositeDisposable(left_subscription, right_subscription) + + return Observable(subscribe) __all__ = ["amb_"] diff --git a/tests/test_observable/test_fromfuture.py b/tests/test_observable/test_fromfuture.py index 7f85d80b..ecdc94cd 100644 --- a/tests/test_observable/test_fromfuture.py +++ b/tests/test_observable/test_fromfuture.py @@ -1,6 +1,7 @@ import asyncio import unittest from asyncio import Future +from typing import Any import reactivex @@ -11,15 +12,15 @@ def test_future_success(self): success = [False, True, False] async def go(): - future = Future() + future: Future[int] = Future() future.set_result(42) source = reactivex.from_future(future) - def on_next(x): + def on_next(x: int): success[0] = x == 42 - def on_error(err): + def on_error(_err: Exception): success[1] = False def on_completed(): @@ -37,15 +38,15 @@ def test_future_failure(self): async def go(): error = Exception("woops") - future = Future() + future: Future[Any] = Future() future.set_exception(error) source = reactivex.from_future(future) - def on_next(x): + def on_next(x: Any): success[0] = False - def on_error(err): + def on_error(err: Exception): success[1] = str(err) == str(error) def on_completed(): @@ -61,13 +62,13 @@ def test_future_cancel(self): success = [True, False, True] async def go(): - future = Future() + future: Future[Any] = Future() source = reactivex.from_future(future) - def on_next(x): + def on_next(x: Any): success[0] = False - def on_error(err): + def on_error(err: Any): success[1] = type(err) == asyncio.CancelledError def on_completed(): @@ -84,15 +85,15 @@ def test_future_dispose(self): success = [True, True, True] async def go(): - future = Future() + future: Future[int] = Future() future.set_result(42) source = reactivex.from_future(future) - def on_next(x): + def on_next(x: int): success[0] = False - def on_error(err): + def on_error(err: Exception): success[1] = False def on_completed(): From cf1916b13649de9b8a2c1481dff6a93869866259 Mon Sep 17 00:00:00 2001 From: mat Date: Wed, 8 Feb 2023 07:13:01 +0800 Subject: [PATCH 06/10] todict curry flipped add test for no element mapper --- reactivex/operators/_todict.py | 87 ++++++++++++++-------------- tests/test_observable/test_todict.py | 30 +++++++++- 2 files changed, 72 insertions(+), 45 deletions(-) diff --git a/reactivex/operators/_todict.py b/reactivex/operators/_todict.py index 74971978..07f3ec9d 100644 --- a/reactivex/operators/_todict.py +++ b/reactivex/operators/_todict.py @@ -1,64 +1,65 @@ -from typing import Callable, Dict, Optional, TypeVar, cast +from typing import Dict, Optional, TypeVar, cast from reactivex import Observable, abc from reactivex.typing import Mapper +from reactivex.curry import curry_flip _T = TypeVar("_T") _TKey = TypeVar("_TKey") _TValue = TypeVar("_TValue") +@curry_flip(1) def to_dict_( - key_mapper: Mapper[_T, _TKey], element_mapper: Optional[Mapper[_T, _TValue]] = None -) -> Callable[[Observable[_T]], Observable[Dict[_TKey, _TValue]]]: - def to_dict(source: Observable[_T]) -> Observable[Dict[_TKey, _TValue]]: - """Converts the observable sequence to a Map if it exists. - - Args: - source: Source observable to convert. - - Returns: - An observable sequence with a single value of a dictionary - containing the values from the observable sequence. - """ - - def subscribe( - observer: abc.ObserverBase[Dict[_TKey, _TValue]], - scheduler: Optional[abc.SchedulerBase] = None, - ) -> abc.DisposableBase: - m: Dict[_TKey, _TValue] = dict() - - def on_next(x: _T) -> None: + source: Observable[_T], + key_mapper: Mapper[_T, _TKey], + element_mapper: Optional[Mapper[_T, _TValue]] = None, +) -> Observable[Dict[_TKey, _TValue]]: + """Converts the observable sequence to a Map if it exists. + + Args: + source: Source observable to convert. + + Returns: + An observable sequence with a single value of a dictionary + containing the values from the observable sequence. + """ + + def subscribe( + observer: abc.ObserverBase[Dict[_TKey, _TValue]], + scheduler: Optional[abc.SchedulerBase] = None, + ) -> abc.DisposableBase: + m: Dict[_TKey, _TValue] = dict() + + def on_next(x: _T) -> None: + try: + key = key_mapper(x) + except Exception as ex: # pylint: disable=broad-except + observer.on_error(ex) + return + + if element_mapper: try: - key = key_mapper(x) + element = element_mapper(x) except Exception as ex: # pylint: disable=broad-except observer.on_error(ex) return + else: + element = cast(_TValue, x) - if element_mapper: - try: - element = element_mapper(x) - except Exception as ex: # pylint: disable=broad-except - observer.on_error(ex) - return - else: - element = cast(_TValue, x) - - m[key] = cast(_TValue, element) - - def on_completed() -> None: - nonlocal m - observer.on_next(m) - m = dict() - observer.on_completed() + m[key] = cast(_TValue, element) - return source.subscribe( - on_next, observer.on_error, on_completed, scheduler=scheduler - ) + def on_completed() -> None: + nonlocal m + observer.on_next(m) + m = dict() + observer.on_completed() - return Observable(subscribe) + return source.subscribe( + on_next, observer.on_error, on_completed, scheduler=scheduler + ) - return to_dict + return Observable(subscribe) __all__ = ["to_dict_"] diff --git a/tests/test_observable/test_todict.py b/tests/test_observable/test_todict.py index 13e79abc..6b715ef5 100644 --- a/tests/test_observable/test_todict.py +++ b/tests/test_observable/test_todict.py @@ -74,7 +74,7 @@ def test_to_dict_keymapperthrows(self): ) def create(): - def key_mapper(x): + def key_mapper(x: int): if x < 4: return x * 2 else: @@ -102,7 +102,7 @@ def test_to_dict_elementmapperthrows(self): on_completed(600), ) - def value_mapper(x): + def value_mapper(x: int): if x < 4: return x * 4 else: @@ -136,3 +136,29 @@ def create(): assert res.messages == [] assert xs.subscriptions == [subscribe(200, 1000)] + + def test_to_dict_no_element_mapper(self): + scheduler = TestScheduler() + + xs = scheduler.create_hot_observable( + on_next(110, 1), + on_next(220, 2), + on_next(330, 3), + on_next(440, 4), + on_next(550, 5), + on_completed(660), + ) + + def key_mapper(x: int): + return x * 2 + + def create(): + return xs.pipe(ops.to_dict(key_mapper)) + + res = scheduler.start(create) + assert res.messages == [ + on_next(660, {4: 2, 6: 3, 8: 4, 10: 5}), + on_completed(660), + ] + + assert xs.subscriptions == [subscribe(200, 660)] From 2daae3dc861f44521ab22f068ddf58f690b63210 Mon Sep 17 00:00:00 2001 From: mat Date: Wed, 8 Feb 2023 09:50:03 +0800 Subject: [PATCH 07/10] Take while curry flipped Proposed rewrite of takewhile index to avoid duplicated code --- reactivex/operators/_takewhile.py | 185 ++++++++++-------------- tests/test_observable/test_takewhile.py | 18 +-- 2 files changed, 89 insertions(+), 114 deletions(-) diff --git a/reactivex/operators/_takewhile.py b/reactivex/operators/_takewhile.py index 1b395d03..a25a184d 100644 --- a/reactivex/operators/_takewhile.py +++ b/reactivex/operators/_takewhile.py @@ -1,121 +1,96 @@ -from typing import Any, Callable, Optional, TypeVar +from typing import Optional, TypeVar -from reactivex import Observable, abc +from reactivex import Observable, abc, operators from reactivex.typing import Predicate, PredicateIndexed +from reactivex.curry import curry_flip _T = TypeVar("_T") +@curry_flip(1) def take_while_( - predicate: Predicate[_T], inclusive: bool = False -) -> Callable[[Observable[_T]], Observable[_T]]: - def take_while(source: Observable[_T]) -> Observable[_T]: - """Returns elements from an observable sequence as long as a - specified condition is true. - - Example: - >>> take_while(source) - - Args: - source: The source observable to take from. - - Returns: - An observable sequence that contains the elements from the - input sequence that occur before the element at which the - test no longer passes. - """ - - def subscribe( - observer: abc.ObserverBase[_T], - scheduler: Optional[abc.SchedulerBase] = None, - ) -> abc.DisposableBase: - running = True - - def on_next(value: _T): - nonlocal running - - with source.lock: - if not running: - return - - try: - running = predicate(value) - except Exception as exn: - observer.on_error(exn) - return - - if running: + source: Observable[_T], predicate: Predicate[_T], inclusive: bool = False +) -> Observable[_T]: + """Returns elements from an observable sequence as long as a + specified condition is true. + + Example: + >>> take_while(source) + + Args: + source: The source observable to take from. + + Returns: + An observable sequence that contains the elements from the + input sequence that occur before the element at which the + test no longer passes. + """ + + def subscribe( + observer: abc.ObserverBase[_T], + scheduler: Optional[abc.SchedulerBase] = None, + ) -> abc.DisposableBase: + running = True + + def on_next(value: _T): + nonlocal running + + with source.lock: + if not running: + return + + try: + running = predicate(value) + except Exception as exn: + observer.on_error(exn) + return + + if running: + observer.on_next(value) + else: + if inclusive: observer.on_next(value) - else: - if inclusive: - observer.on_next(value) - observer.on_completed() + observer.on_completed() - return source.subscribe( - on_next, observer.on_error, observer.on_completed, scheduler=scheduler - ) + return source.subscribe( + on_next, observer.on_error, observer.on_completed, scheduler=scheduler + ) - return Observable(subscribe) - - return take_while + return Observable(subscribe) +@curry_flip(1) def take_while_indexed_( - predicate: PredicateIndexed[_T], inclusive: bool = False -) -> Callable[[Observable[_T]], Observable[_T]]: - def take_while_indexed(source: Observable[_T]) -> Observable[_T]: - """Returns elements from an observable sequence as long as a - specified condition is true. The element's index is used in the - logic of the predicate function. - - Example: - >>> take_while(source) - - Args: - source: Source observable to take from. - - Returns: - An observable sequence that contains the elements from the - input sequence that occur before the element at which the - test no longer passes. - """ - - def subscribe( - observer: abc.ObserverBase[_T], - scheduler: Optional[abc.SchedulerBase] = None, - ) -> abc.DisposableBase: - running = True - i = 0 - - def on_next(value: Any) -> None: - nonlocal running, i - - with source.lock: - if not running: - return - - try: - running = predicate(value, i) - except Exception as exn: - observer.on_error(exn) - return - else: - i += 1 - - if running: - observer.on_next(value) - else: - if inclusive: - observer.on_next(value) - observer.on_completed() - - return source.subscribe( - on_next, observer.on_error, observer.on_completed, scheduler=scheduler - ) - - return Observable(subscribe) - - return take_while_indexed + source: Observable[_T], predicate: PredicateIndexed[_T], inclusive: bool = False +) -> Observable[_T]: + """Returns elements from an observable sequence as long as a + specified condition is true. The element's index is used in the + logic of the predicate function. + + Example: + >>> take_while(source) + + Args: + source: Source observable to take from. + + Returns: + An observable sequence that contains the elements from the + input sequence that occur before the element at which the + test no longer passes. + """ + i = 0 + + def increment(_: _T): + nonlocal i + i += 1 + + def predicate_with_index(x: _T): + return predicate(x, i) + + return source.pipe( + take_while_(predicate_with_index, inclusive=inclusive), + operators.do_action(on_next=increment), + ) __all__ = ["take_while_", "take_while_indexed_"] diff --git a/tests/test_observable/test_takewhile.py b/tests/test_observable/test_takewhile.py index 268bd5b8..8ed2de15 100644 --- a/tests/test_observable/test_takewhile.py +++ b/tests/test_observable/test_takewhile.py @@ -33,7 +33,7 @@ def test_take_while_complete_Before(self): invoked = 0 def factory(): - def predicate(x): + def predicate(x: int): nonlocal invoked invoked += 1 @@ -72,7 +72,7 @@ def test_take_while_complete_after(self): invoked = 0 def factory(): - def predicate(x): + def predicate(x: int): nonlocal invoked invoked += 1 @@ -113,7 +113,7 @@ def test_take_while_error_before(self): invoked = 0 def factory(): - def predicate(x): + def predicate(x: int): nonlocal invoked invoked += 1 @@ -146,7 +146,7 @@ def test_take_while_error_after(self): invoked = 0 def factory(): - def predicate(x): + def predicate(x: int): nonlocal invoked invoked += 1 @@ -186,7 +186,7 @@ def test_take_while_dispose_before(self): invoked = 0 def create(): - def predicate(x): + def predicate(x: int): nonlocal invoked invoked += 1 @@ -218,7 +218,7 @@ def test_take_while_dispose_after(self): invoked = 0 def create(): - def predicate(x): + def predicate(x: int): nonlocal invoked invoked += 1 @@ -258,7 +258,7 @@ def test_take_while_zero(self): invoked = 0 def create(): - def predicate(x): + def predicate(x: int): nonlocal invoked invoked += 1 @@ -291,7 +291,7 @@ def test_take_while_on_error(self): invoked = 0 def factory(): - def predicate(x): + def predicate(x: int): nonlocal invoked invoked += 1 @@ -426,7 +426,7 @@ def test_take_while_complete_after_inclusive_true(self): invoked = 0 def factory(): - def predicate(x): + def predicate(x: int): nonlocal invoked invoked += 1 From 375f2d7c64a472236d015ec1d01f705269a5f643 Mon Sep 17 00:00:00 2001 From: mat Date: Thu, 9 Feb 2023 06:20:39 +0800 Subject: [PATCH 08/10] Add a test which uses both index and value --- tests/test_observable/test_takewhile.py | 29 +++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/tests/test_observable/test_takewhile.py b/tests/test_observable/test_takewhile.py index 8ed2de15..835fa9a4 100644 --- a/tests/test_observable/test_takewhile.py +++ b/tests/test_observable/test_takewhile.py @@ -374,6 +374,35 @@ def factory(): ] assert xs.subscriptions == [subscribe(200, 350)] + def test_take_while_index_use_index_and_value(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable( + on_next(90, -1), + on_next(110, -1), + on_next(205, 100), + on_next(210, 2), + on_next(260, 5), + on_next(290, 13), + on_next(320, 3), + on_next(350, 7), + on_next(390, 4), + on_completed(600), + ) + + def create(): + return xs.pipe(ops.take_while_indexed(lambda x, i: x > i)) + + results = scheduler.start(create) + + assert results.messages == [ + on_next(205, 100), + on_next(210, 2), + on_next(260, 5), + on_next(290, 13), + on_completed(320), + ] + assert xs.subscriptions == [subscribe(200, 320)] + def test_take_while_index_inclusive_true(self): scheduler = TestScheduler() xs = scheduler.create_hot_observable( From 51097a58e3bd9dd86f6af59427b6a1f81328a209 Mon Sep 17 00:00:00 2001 From: mat Date: Thu, 13 Apr 2023 15:17:32 +0800 Subject: [PATCH 09/10] More accurate documentation on single --- reactivex/operators/__init__.py | 23 ++++++++++++++++++++--- tests/test_observable/test_single.py | 19 ++++++++++++++++++- 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/reactivex/operators/__init__.py b/reactivex/operators/__init__.py index 6f27e320..2cae6c17 100644 --- a/reactivex/operators/__init__.py +++ b/reactivex/operators/__init__.py @@ -2622,7 +2622,7 @@ def scan( Applies an accumulator function over an observable sequence and returns each intermediate result. The optional seed value is used as the initial accumulator value. For aggregation behavior with no - intermediate results, see `aggregate()` or `Observable()`. + intermediate results, see `reduce()` or `Observable()`. .. marble:: :alt: scan @@ -2716,12 +2716,29 @@ def single( the condition in the optional predicate, and reports an exception if there is not exactly one element in the observable sequence. + If the predicates does not match any item, the resulting sequence + errors once the source completes. + + If the predicate matches more than one item, the resulting sequence + errors immediately, without waiting for the source to complete. + + If the source never completes, the resulting sequence does not + emit anything, nor completes. + .. marble:: :alt: single ----1--2--3--4-----| - [ single(3) ] - ----------3--------| + [ single(x==3) ] + -----------------3-| + + ----1--3--3--4-----| + [ single(x==3) ] + ----------x + + ----1--1--1--1-----| + [ single(x==3) ] + -------------------x Example: >>> res = single() diff --git a/tests/test_observable/test_single.py b/tests/test_observable/test_single.py index 7c39b197..9718453d 100644 --- a/tests/test_observable/test_single.py +++ b/tests/test_observable/test_single.py @@ -22,6 +22,18 @@ def _raise(ex): class TestSingle(unittest.TestCase): + def test_single_source_never_completes(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable(on_next(150, 1), on_next(250, 2)) + + def create(): + return xs.pipe(ops.single()) + + res = scheduler.start(create=create) + + assert [] == res.messages + assert xs.subscriptions == [subscribe(200, 1000)] + def test_single_async_empty(self): scheduler = TestScheduler() xs = scheduler.create_hot_observable(on_next(150, 1), on_completed(250)) @@ -52,9 +64,14 @@ def create(): assert xs.subscriptions == [subscribe(200, 250)] def test_single_async_many(self): + """Should error as soon as a second "valid" element is encountered.""" scheduler = TestScheduler() xs = scheduler.create_hot_observable( - on_next(150, 1), on_next(210, 2), on_next(220, 3), on_completed(250) + on_next(150, 1), + on_next(210, 2), + on_next(220, 3), + on_next(230, 3), + on_completed(250), ) def create(): From 8600ec21eaefc2d55910f4ba876d71936158e983 Mon Sep 17 00:00:00 2001 From: mat Date: Thu, 13 Apr 2023 15:24:32 +0800 Subject: [PATCH 10/10] Black fixes --- reactivex/operators/_takewhile.py | 2 +- reactivex/operators/_todict.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/reactivex/operators/_takewhile.py b/reactivex/operators/_takewhile.py index a25a184d..1cd92356 100644 --- a/reactivex/operators/_takewhile.py +++ b/reactivex/operators/_takewhile.py @@ -1,8 +1,8 @@ from typing import Optional, TypeVar from reactivex import Observable, abc, operators -from reactivex.typing import Predicate, PredicateIndexed from reactivex.curry import curry_flip +from reactivex.typing import Predicate, PredicateIndexed _T = TypeVar("_T") diff --git a/reactivex/operators/_todict.py b/reactivex/operators/_todict.py index 07f3ec9d..a40f04e3 100644 --- a/reactivex/operators/_todict.py +++ b/reactivex/operators/_todict.py @@ -1,8 +1,8 @@ from typing import Dict, Optional, TypeVar, cast from reactivex import Observable, abc -from reactivex.typing import Mapper from reactivex.curry import curry_flip +from reactivex.typing import Mapper _T = TypeVar("_T") _TKey = TypeVar("_TKey")