Skip to content

Commit

Permalink
Add some type hints to core.observer
Browse files Browse the repository at this point in the history
  • Loading branch information
erikkemperman committed Feb 18, 2019
1 parent f189412 commit c896009
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 13 deletions.
2 changes: 1 addition & 1 deletion rx/core/observable/observable.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def set_disposable(_: abc.Scheduler = None, __: Any = None):
return Disposable(auto_detach_observer.dispose)


def pipe(self, *operators: Callable[['ObservableBase'], 'ObservableBase']) -> 'ObservableBase':
def pipe(self, *operators: Callable[['Observable'], 'Observable']) -> 'Observable':
"""Compose multiple operators left to right.
Composes zero or more operators into a functional composition.
Expand Down
10 changes: 7 additions & 3 deletions rx/core/observer/autodetachobserver.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any
from typing import Any, Optional

from rx.internal import noop, default_error
from rx.disposable import SingleAssignmentDisposable
Expand All @@ -8,7 +8,11 @@

class AutoDetachObserver(typing.Observer):

def __init__(self, on_next=None, on_error=None, on_completed=None):
def __init__(self,
on_next: Optional[typing.OnNext] = None,
on_error: Optional[typing.OnError] = None,
on_completed: Optional[typing.OnCompleted] = None
) -> Any:
self._on_next = on_next or noop
self._on_error = on_error or default_error
self._on_completed = on_completed or noop
Expand Down Expand Up @@ -46,7 +50,7 @@ def on_completed(self) -> None:
finally:
self.dispose()

def set_disposable(self, value):
def set_disposable(self, value: typing.Disposable):
self._subscription.disposable = value

subscription = property(fset=set_disposable)
Expand Down
1 change: 1 addition & 0 deletions rx/core/observer/observeonobserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


class ObserveOnObserver(ScheduledObserver):

def _on_next_core(self, value):
super()._on_next_core(value)
self.ensure_active()
Expand Down
12 changes: 8 additions & 4 deletions rx/core/observer/observer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Callable, Any
from typing import Any, Callable, NoReturn, Optional

from .. import typing

Expand All @@ -9,7 +9,11 @@ class enforces the grammar of observers where OnError and
OnCompleted are terminal messages.
"""

def __init__(self, on_next=None, on_error=None, on_completed=None):
def __init__(self,
on_next: Optional[typing.OnNext] = None,
on_error: Optional[typing.OnError] = None,
on_completed: Optional[typing.OnCompleted] = None
) -> Any:
self.is_stopped = False
if on_next is not None:
self._on_next_core = on_next
Expand Down Expand Up @@ -37,7 +41,7 @@ def on_error(self, error: Exception) -> None:
self.is_stopped = True
self._on_error_core(error)

def _on_error_core(self, error: Exception) -> None:
def _on_error_core(self, error: Exception) -> NoReturn:
if isinstance(error, BaseException):
raise error
else:
Expand Down Expand Up @@ -66,7 +70,7 @@ def fail(self, exn: Exception) -> bool:

return False

def throw(self, error) -> None:
def throw(self, error: Exception) -> NoReturn:
import traceback
traceback.print_stack()
if error:
Expand Down
9 changes: 4 additions & 5 deletions rx/core/observer/scheduledobserver.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import threading
from typing import List, Any

from rx.core import abc
from rx.core.typing import Action
from rx.core import typing
from rx.disposable import SerialDisposable

from .observer import Observer


class ScheduledObserver(Observer):
def __init__(self, scheduler: abc.Scheduler, observer: abc.Observer) -> None:
def __init__(self, scheduler: typing.Scheduler, observer: typing.Observer) -> None:
super().__init__()

self.scheduler = scheduler
Expand All @@ -18,7 +17,7 @@ def __init__(self, scheduler: abc.Scheduler, observer: abc.Observer) -> None:
self.lock = threading.RLock()
self.is_acquired = False
self.has_faulted = False
self.queue: List[Action] = []
self.queue: List[typing.Action] = []
self.disposable = SerialDisposable()

# Note to self: list append is thread safe
Expand Down Expand Up @@ -50,7 +49,7 @@ def ensure_active(self) -> None:
if is_owner:
self.disposable.disposable = self.scheduler.schedule(self.run)

def run(self, scheduler: abc.Scheduler, state: Any) -> None:
def run(self, scheduler: typing.Scheduler, state: typing.TState) -> None:
parent = self

with self.lock:
Expand Down

0 comments on commit c896009

Please sign in to comment.