Skip to content

Commit

Permalink
Make tracebacks more readable by avoiding AnonymousObserver for the c…
Browse files Browse the repository at this point in the history
…ommon case and flattening AutoDetachObserver
  • Loading branch information
dbrattli committed Jan 20, 2019
1 parent 9711f15 commit 8d83f69
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 40 deletions.
50 changes: 37 additions & 13 deletions rx/core/autodetachobserver.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,48 @@
from typing import Any

from rx.internal import noop, default_error
from rx.disposables import SingleAssignmentDisposable

from .observerbase import ObserverBase
from .typing import Observer


class AutoDetachObserver(ObserverBase):
class AutoDetachObserver(Observer):

def __init__(self, observer):
super(AutoDetachObserver, self).__init__()
def __init__(self, on_next=None, on_error=None, on_completed=None):
self._on_next = on_next or noop
self._on_error = on_error or default_error
self._on_completed = on_completed or noop

self._observer = observer
self._subscription = SingleAssignmentDisposable()
self.is_stopped = False

def on_next(self, value: Any) -> None:
if self.is_stopped:
return

def _on_next_core(self, value):
try:
self._observer.on_next(value)
self._on_next(value)
except Exception:
self.dispose()
raise

def _on_error_core(self, error):
def on_error(self, error) -> None:
if self.is_stopped:
return
self.is_stopped = True

try:
self._observer.on_error(error)
self._on_error(error)
finally:
self.dispose()

def _on_completed_core(self):
def on_completed(self) -> None:
if self.is_stopped:
return
self.is_stopped = True

try:
self._observer.on_completed()
self._on_completed()
finally:
self.dispose()

Expand All @@ -35,6 +51,14 @@ def set_disposable(self, value):

subscription = property(fset=set_disposable)

def dispose(self):
super().dispose()
def dispose(self) -> None:
self.is_stopped = True
self._subscription.dispose()

def fail(self, exn: Exception) -> bool:
if self.is_stopped:
return False

self.is_stopped = True
self._on_error(exn)
return True
52 changes: 25 additions & 27 deletions rx/core/observable/observable.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,34 +125,8 @@ def subscribe_(self,
Disposable object representing an observer's subscription
to the observable sequence.
"""
observer = AnonymousObserver(on_next, on_error, on_completed)
return self.subscribe(observer, scheduler)

def subscribe(self, observer: typing.Observer = None, scheduler: typing.Scheduler = None) -> typing.Disposable:
"""Subscribe an observer to the observable sequence.
Examples:
>>> source.subscribe()
>>> source.subscribe(observer)
Args:
observer: [Optional] The object that is to receive
notifications. You may subscribe using an observer or
callbacks, not both.
Returns:
Disposable object representing an observer's subscription
to the observable sequence.
"""

observer = observer or AnonymousObserver()
assert isinstance(observer, (abc.Observer, types.GeneratorType))

if isinstance(observer, types.GeneratorType):
if inspect.getgeneratorstate(observer) == inspect.GEN_CREATED:
observer.send(None)

auto_detach_observer = AutoDetachObserver(observer)
auto_detach_observer = AutoDetachObserver(on_next, on_error, on_completed)

def fix_subscriber(subscriber):
"""Fixes subscriber to make sure it returns a Disposable instead
Expand Down Expand Up @@ -187,6 +161,30 @@ def set_disposable(_: abc.Scheduler = None, __: Any = None):
# Hide the identity of the auto detach observer
return Disposable.create(auto_detach_observer.dispose)


def subscribe(self, observer: typing.Observer = None, scheduler: typing.Scheduler = None) -> typing.Disposable:
"""Subscribe an observer to the observable sequence.
Examples:
>>> source.subscribe()
>>> source.subscribe(observer)
Args:
observer: [Optional] The object that is to receive
notifications. You may subscribe using an observer or
callbacks, not both.
Returns:
Disposable object representing an observer's subscription
to the observable sequence.
"""

observer = observer or AnonymousObserver()
assert isinstance(observer, abc.Observer)

return self.subscribe_(observer.on_next, observer.on_error, observer.on_completed, scheduler)


def pipe(self, *operators: Callable[['Observable'], 'Observable']) -> 'Observable':
"""Compose multiple operators left to right.
Expand Down

0 comments on commit 8d83f69

Please sign in to comment.