From 8d83f698d1107b4557e91f0ef2c0f9fbb0adf207 Mon Sep 17 00:00:00 2001 From: Dag Brattli Date: Sun, 20 Jan 2019 17:02:15 +0100 Subject: [PATCH] Make tracebacks more readable by avoiding AnonymousObserver for the common case and flattening AutoDetachObserver --- rx/core/autodetachobserver.py | 50 ++++++++++++++++++++++-------- rx/core/observable/observable.py | 52 +++++++++++++++----------------- 2 files changed, 62 insertions(+), 40 deletions(-) diff --git a/rx/core/autodetachobserver.py b/rx/core/autodetachobserver.py index acd6e66c5..02e3c7f84 100644 --- a/rx/core/autodetachobserver.py +++ b/rx/core/autodetachobserver.py @@ -1,32 +1,48 @@ +from typing import Any + +from rx.internal import noop, default_error from rx.disposables import SingleAssignmentDisposable -from .observerbase import ObserverBase +from .typing import Observer -class AutoDetachObserver(ObserverBase): +class AutoDetachObserver(Observer): - def __init__(self, observer): - super(AutoDetachObserver, self).__init__() + def __init__(self, on_next=None, on_error=None, on_completed=None): + self._on_next = on_next or noop + self._on_error = on_error or default_error + self._on_completed = on_completed or noop - self._observer = observer self._subscription = SingleAssignmentDisposable() + self.is_stopped = False + + def on_next(self, value: Any) -> None: + if self.is_stopped: + return - def _on_next_core(self, value): try: - self._observer.on_next(value) + self._on_next(value) except Exception: self.dispose() raise - def _on_error_core(self, error): + def on_error(self, error) -> None: + if self.is_stopped: + return + self.is_stopped = True + try: - self._observer.on_error(error) + self._on_error(error) finally: self.dispose() - def _on_completed_core(self): + def on_completed(self) -> None: + if self.is_stopped: + return + self.is_stopped = True + try: - self._observer.on_completed() + self._on_completed() finally: self.dispose() @@ -35,6 +51,14 @@ def set_disposable(self, value): subscription = property(fset=set_disposable) - def dispose(self): - super().dispose() + def dispose(self) -> None: + self.is_stopped = True self._subscription.dispose() + + def fail(self, exn: Exception) -> bool: + if self.is_stopped: + return False + + self.is_stopped = True + self._on_error(exn) + return True diff --git a/rx/core/observable/observable.py b/rx/core/observable/observable.py index abfbf22b5..5ed55f7a9 100644 --- a/rx/core/observable/observable.py +++ b/rx/core/observable/observable.py @@ -125,34 +125,8 @@ def subscribe_(self, Disposable object representing an observer's subscription to the observable sequence. """ - observer = AnonymousObserver(on_next, on_error, on_completed) - return self.subscribe(observer, scheduler) - def subscribe(self, observer: typing.Observer = None, scheduler: typing.Scheduler = None) -> typing.Disposable: - """Subscribe an observer to the observable sequence. - - Examples: - >>> source.subscribe() - >>> source.subscribe(observer) - - Args: - observer: [Optional] The object that is to receive - notifications. You may subscribe using an observer or - callbacks, not both. - - Returns: - Disposable object representing an observer's subscription - to the observable sequence. - """ - - observer = observer or AnonymousObserver() - assert isinstance(observer, (abc.Observer, types.GeneratorType)) - - if isinstance(observer, types.GeneratorType): - if inspect.getgeneratorstate(observer) == inspect.GEN_CREATED: - observer.send(None) - - auto_detach_observer = AutoDetachObserver(observer) + auto_detach_observer = AutoDetachObserver(on_next, on_error, on_completed) def fix_subscriber(subscriber): """Fixes subscriber to make sure it returns a Disposable instead @@ -187,6 +161,30 @@ def set_disposable(_: abc.Scheduler = None, __: Any = None): # Hide the identity of the auto detach observer return Disposable.create(auto_detach_observer.dispose) + + def subscribe(self, observer: typing.Observer = None, scheduler: typing.Scheduler = None) -> typing.Disposable: + """Subscribe an observer to the observable sequence. + + Examples: + >>> source.subscribe() + >>> source.subscribe(observer) + + Args: + observer: [Optional] The object that is to receive + notifications. You may subscribe using an observer or + callbacks, not both. + + Returns: + Disposable object representing an observer's subscription + to the observable sequence. + """ + + observer = observer or AnonymousObserver() + assert isinstance(observer, abc.Observer) + + return self.subscribe_(observer.on_next, observer.on_error, observer.on_completed, scheduler) + + def pipe(self, *operators: Callable[['Observable'], 'Observable']) -> 'Observable': """Compose multiple operators left to right.