Skip to content

Commit

Permalink
Fix type hints for pipe using overloads. Fixes #355, #285 (#372)
Browse files Browse the repository at this point in the history
* Fix typing for pipe using overloads.

* Fix type typo in hint overload

* Move pylint disables to the right line for the multi-line overloads

* Add an Observable specific overload first with docstring to make pylint happy.

* Remove ellpises from coverage
  • Loading branch information
dbrattli committed May 11, 2019
1 parent 1de2850 commit 89df178
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 8 deletions.
1 change: 1 addition & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ exclude_lines =
pragma: no cover
return NotImplemented
raise NotImplementedError
\.\.\.
[xml]
output = coverage.xml
94 changes: 90 additions & 4 deletions rx/core/observable/observable.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
# By design, pylint: disable=C0302
import threading
from typing import Any, Callable, Optional, Union, cast
from typing import Any, Callable, Optional, Union, TypeVar, cast, overload

from rx.disposable import Disposable
from rx.concurrency import current_thread_scheduler

from ..observer import AutoDetachObserver
from .. import typing, abc

A = TypeVar('A')
B = TypeVar('B')
C = TypeVar('C')
D = TypeVar('D')
E = TypeVar('E')
F = TypeVar('F')
G = TypeVar('G')


class Observable(typing.Observable):
"""Observable base class.
Expand Down Expand Up @@ -207,7 +215,85 @@ def set_disposable(_: abc.Scheduler = None, __: Any = None):
# Hide the identity of the auto detach observer
return Disposable(auto_detach_observer.dispose)

def pipe(self, *operators: Callable[['Observable'], 'Observable']) -> 'Observable':
@overload
def pipe(self, *operators: Callable[['Observable'], 'Observable']) -> 'Observable': # pylint: disable=no-self-use
"""Compose multiple operators left to right.
Composes zero or more operators into a functional composition.
The operators are composed to right. A composition of zero
operators gives back the original source.
source.pipe() == source
source.pipe(f) == f(source)
source.pipe(g, f) == f(g(source))
source.pipe(h, g, f) == f(g(h(source)))
...
Returns the composed observable.
"""
...

@overload
def pipe(self) -> 'Observable': # pylint: disable=function-redefined, no-self-use
... # pylint: disable=pointless-statement

@overload
def pipe(self, op1: Callable[['Observable'], A]) -> A: # pylint: disable=function-redefined, no-self-use
... # pylint: disable=pointless-statement

@overload
def pipe(self, # pylint: disable=function-redefined, no-self-use
op1: Callable[['Observable'], A],
op2: Callable[[A], B]) -> B:
... # pylint: disable=pointless-statement

@overload
def pipe(self, # pylint: disable=function-redefined, no-self-use
op1: Callable[['Observable'], A],
op2: Callable[[A], B],
op3: Callable[[B], C]) -> C:
... # pylint: disable=pointless-statement

@overload
def pipe(self, # pylint: disable=function-redefined, no-self-use
op1: Callable[['Observable'], A],
op2: Callable[[A], B],
op3: Callable[[B], C],
op4: Callable[[C], D]) -> D:
... # pylint: disable=pointless-statement

@overload
def pipe(self, # pylint: disable=function-redefined, no-self-use, too-many-arguments
op1: Callable[['Observable'], A],
op2: Callable[[A], B],
op3: Callable[[B], C],
op4: Callable[[C], D],
op5: Callable[[D], E]) -> E:
... # pylint: disable=pointless-statement

@overload
def pipe(self, # pylint: disable=function-redefined, no-self-use, too-many-arguments
op1: Callable[['Observable'], A],
op2: Callable[[A], B],
op3: Callable[[B], C],
op4: Callable[[C], D],
op5: Callable[[D], E],
op6: Callable[[E], F]) -> F:
... # pylint: disable=pointless-statement

@overload
def pipe(self, # pylint: disable=function-redefined, no-self-use, too-many-arguments
op1: Callable[['Observable'], A],
op2: Callable[[A], B],
op3: Callable[[B], C],
op4: Callable[[C], D],
op5: Callable[[D], E],
op6: Callable[[E], F],
op7: Callable[[F], G]) -> G:
... # pylint: disable=pointless-statement

# pylint: disable=function-redefined
def pipe(self, *operators: Callable[['Observable'], Any]) -> Any:
"""Compose multiple operators left to right.
Composes zero or more operators into a functional composition.
Expand All @@ -230,15 +316,15 @@ def run(self) -> Any:
Subscribes to the observable source. Then blocks and waits for the
observable source to either complete or error. Returns the
last value emitted, or thows exception if any error occured.
last value emitted, or throws exception if any error occurred.
Examples:
>>> result = run(source)
Raises:
SequenceContainsNoElementsError: if observable completes
(on_completed) without any values being emitted.
Exception: raises exception if any error (on_error) occured.
Exception: raises exception if any error (on_error) occurred.
Returns:
The last element emitted from the observable.
Expand Down
89 changes: 85 additions & 4 deletions rx/core/pipe.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,90 @@
from typing import Callable
from typing import Callable, Any, TypeVar, overload
from functools import reduce
from .observable import Observable

A = TypeVar('A')
B = TypeVar('B')
C = TypeVar('C')
D = TypeVar('D')
E = TypeVar('E')
F = TypeVar('F')
G = TypeVar('G')

def pipe(*operators: Callable[[Observable], Observable]) -> Callable[[Observable], Observable]:

@overload
def pipe(*operators: Callable[['Observable'], 'Observable']) -> Callable[['Observable'], 'Observable']: # type: ignore
"""Compose multiple operators left to right.
Composes zero or more operators into a functional composition. The
operators are composed to left to right. A composition of zero
operators gives back the source.
Examples:
>>> pipe()(source) == source
>>> pipe(f)(source) == f(source)
>>> pipe(f, g)(source) == g(f(source))
>>> pipe(f, g, h)(source) == h(g(f(source)))
...
Returns:
The composed observable.
"""
...

@overload
def pipe() -> Callable[[A], A]: # pylint: disable=function-redefined
... # pylint: disable=pointless-statement


@overload
def pipe(op1: Callable[[A], B]) -> Callable[[A], B]: # pylint: disable=function-redefined
... # pylint: disable=pointless-statement


@overload
def pipe(op1: Callable[[A], B], op2: Callable[[B], C]) -> Callable[[A], C]: # pylint: disable=function-redefined
... # pylint: disable=pointless-statement


@overload
def pipe(op1: Callable[[A], B], # pylint: disable=function-redefined
op2: Callable[[B], C],
op3: Callable[[C], D]
) -> Callable[[A], D]:
... # pylint: disable=pointless-statement


@overload
def pipe(op1: Callable[[A], B], # pylint: disable=function-redefined
op2: Callable[[B], C],
op3: Callable[[C], D],
op4: Callable[[D], E]
) -> Callable[[A], E]:
... # pylint: disable=pointless-statement


@overload
def pipe(op1: Callable[[A], B], # pylint: disable=function-redefined
op2: Callable[[B], C],
op3: Callable[[C], D],
op4: Callable[[D], E],
op5: Callable[[E], F]
) -> Callable[[A], F]:
... # pylint: disable=pointless-statement


@overload
def pipe(op1: Callable[[A], B], # pylint: disable=function-redefined,too-many-arguments
op2: Callable[[B], C],
op3: Callable[[C], D],
op4: Callable[[D], E],
op5: Callable[[E], F],
op6: Callable[[F], G]
) -> Callable[[A], G]:
... # pylint: disable=pointless-statement


# pylint: disable=function-redefined
def pipe(*operators: Callable[[Any], Any]) -> Callable[[Any], Any]:
"""Compose multiple operators left to right.
Composes zero or more operators into a functional composition. The
Expand All @@ -21,6 +102,6 @@ def pipe(*operators: Callable[[Observable], Observable]) -> Callable[[Observable
The composed observable.
"""

def compose(source: Observable) -> Observable:
def compose(source: Any) -> Any:
return reduce(lambda obs, op: op(obs), operators, source)
return compose

0 comments on commit 89df178

Please sign in to comment.