From 694ec3b021faa7ad27f59e9f86a36ccc734a4c49 Mon Sep 17 00:00:00 2001 From: Dag Brattli Date: Sat, 12 Mar 2022 07:53:56 +0100 Subject: [PATCH 1/2] WithLatestFrom: Subscribe children first Fixes #631 --- reactivex/observable/withlatestfrom.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/reactivex/observable/withlatestfrom.py b/reactivex/observable/withlatestfrom.py index 532c879d0..8aff8721f 100644 --- a/reactivex/observable/withlatestfrom.py +++ b/reactivex/observable/withlatestfrom.py @@ -41,15 +41,14 @@ def on_next(value: Any) -> None: result = (value,) + tuple(values) observer.on_next(result) + children_subscription = [ + subscribechild(i, child) for i, child in enumerate(children) + ] disp = parent.subscribe( on_next, observer.on_error, observer.on_completed, scheduler=scheduler ) parent_subscription.disposable = disp - children_subscription = [ - subscribechild(i, child) for i, child in enumerate(children) - ] - return [parent_subscription] + children_subscription return CompositeDisposable(subscribeall(parent, *sources)) From 038e89471177258c30ecedee108c4c34a175611d Mon Sep 17 00:00:00 2001 From: Dag Brattli Date: Sat, 12 Mar 2022 08:21:07 +0100 Subject: [PATCH 2/2] Add overloads for replay operator --- reactivex/operators/__init__.py | 24 +++++++++++++++++++++++- reactivex/operators/_replay.py | 2 +- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/reactivex/operators/__init__.py b/reactivex/operators/__init__.py index ea952eafa..5d2258309 100644 --- a/reactivex/operators/__init__.py +++ b/reactivex/operators/__init__.py @@ -2427,10 +2427,32 @@ def repeat( return repeat_(repeat_count) +@overload +def replay( + buffer_size: Optional[int] = None, + window: Optional[typing.RelativeTime] = None, + *, + scheduler: Optional[abc.SchedulerBase] = None, +) -> Callable[[Observable[_T1]], ConnectableObservable[_T1]]: + ... + + +@overload def replay( - mapper: Optional[Mapper[Observable[_T1], Observable[_T2]]] = None, buffer_size: Optional[int] = None, window: Optional[typing.RelativeTime] = None, + *, + mapper: Optional[Mapper[Observable[_T1], Observable[_T2]]], + scheduler: Optional[abc.SchedulerBase] = None, +) -> Callable[[Observable[_T1]], Observable[_T2]]: + ... + + +def replay( + buffer_size: Optional[int] = None, + window: Optional[typing.RelativeTime] = None, + *, + mapper: Optional[Mapper[Observable[_T1], Observable[_T2]]] = None, scheduler: Optional[abc.SchedulerBase] = None, ) -> Callable[[Observable[_T1]], Union[Observable[_T2], ConnectableObservable[_T1]]]: """The `replay` operator. diff --git a/reactivex/operators/_replay.py b/reactivex/operators/_replay.py index e5d600ddf..67a271fa7 100644 --- a/reactivex/operators/_replay.py +++ b/reactivex/operators/_replay.py @@ -58,7 +58,7 @@ def subject_factory( return ops.multicast(subject_factory=subject_factory, mapper=mapper) rs: ReplaySubject[_TSource] = ReplaySubject(buffer_size, window, scheduler) - return ops.multicast(rs) + return ops.multicast(subject=rs) __all__ = ["replay_"]