diff --git a/.coveragerc b/.coveragerc index 0fa2dc08a..3bf7c9ea5 100644 --- a/.coveragerc +++ b/.coveragerc @@ -7,5 +7,6 @@ exclude_lines = pragma: no cover return NotImplemented raise NotImplementedError + \.\.\. [xml] output = coverage.xml diff --git a/rx/core/observable/observable.py b/rx/core/observable/observable.py index b8fb7dfd9..97e4e12b5 100644 --- a/rx/core/observable/observable.py +++ b/rx/core/observable/observable.py @@ -1,6 +1,6 @@ # By design, pylint: disable=C0302 import threading -from typing import Any, Callable, Optional, Union, cast +from typing import Any, Callable, Optional, Union, TypeVar, cast, overload from rx.disposable import Disposable from rx.concurrency import current_thread_scheduler @@ -8,6 +8,14 @@ from ..observer import AutoDetachObserver from .. import typing, abc +A = TypeVar('A') +B = TypeVar('B') +C = TypeVar('C') +D = TypeVar('D') +E = TypeVar('E') +F = TypeVar('F') +G = TypeVar('G') + class Observable(typing.Observable): """Observable base class. @@ -207,7 +215,85 @@ def set_disposable(_: abc.Scheduler = None, __: Any = None): # Hide the identity of the auto detach observer return Disposable(auto_detach_observer.dispose) - def pipe(self, *operators: Callable[['Observable'], 'Observable']) -> 'Observable': + @overload + def pipe(self, *operators: Callable[['Observable'], 'Observable']) -> 'Observable': # pylint: disable=no-self-use + """Compose multiple operators left to right. + + Composes zero or more operators into a functional composition. + The operators are composed to right. A composition of zero + operators gives back the original source. + + source.pipe() == source + source.pipe(f) == f(source) + source.pipe(g, f) == f(g(source)) + source.pipe(h, g, f) == f(g(h(source))) + ... + + Returns the composed observable. + """ + ... + + @overload + def pipe(self) -> 'Observable': # pylint: disable=function-redefined, no-self-use + ... # pylint: disable=pointless-statement + + @overload + def pipe(self, op1: Callable[['Observable'], A]) -> A: # pylint: disable=function-redefined, no-self-use + ... # pylint: disable=pointless-statement + + @overload + def pipe(self, # pylint: disable=function-redefined, no-self-use + op1: Callable[['Observable'], A], + op2: Callable[[A], B]) -> B: + ... # pylint: disable=pointless-statement + + @overload + def pipe(self, # pylint: disable=function-redefined, no-self-use + op1: Callable[['Observable'], A], + op2: Callable[[A], B], + op3: Callable[[B], C]) -> C: + ... # pylint: disable=pointless-statement + + @overload + def pipe(self, # pylint: disable=function-redefined, no-self-use + op1: Callable[['Observable'], A], + op2: Callable[[A], B], + op3: Callable[[B], C], + op4: Callable[[C], D]) -> D: + ... # pylint: disable=pointless-statement + + @overload + def pipe(self, # pylint: disable=function-redefined, no-self-use, too-many-arguments + op1: Callable[['Observable'], A], + op2: Callable[[A], B], + op3: Callable[[B], C], + op4: Callable[[C], D], + op5: Callable[[D], E]) -> E: + ... # pylint: disable=pointless-statement + + @overload + def pipe(self, # pylint: disable=function-redefined, no-self-use, too-many-arguments + op1: Callable[['Observable'], A], + op2: Callable[[A], B], + op3: Callable[[B], C], + op4: Callable[[C], D], + op5: Callable[[D], E], + op6: Callable[[E], F]) -> F: + ... # pylint: disable=pointless-statement + + @overload + def pipe(self, # pylint: disable=function-redefined, no-self-use, too-many-arguments + op1: Callable[['Observable'], A], + op2: Callable[[A], B], + op3: Callable[[B], C], + op4: Callable[[C], D], + op5: Callable[[D], E], + op6: Callable[[E], F], + op7: Callable[[F], G]) -> G: + ... # pylint: disable=pointless-statement + + # pylint: disable=function-redefined + def pipe(self, *operators: Callable[['Observable'], Any]) -> Any: """Compose multiple operators left to right. Composes zero or more operators into a functional composition. @@ -230,7 +316,7 @@ def run(self) -> Any: Subscribes to the observable source. Then blocks and waits for the observable source to either complete or error. Returns the - last value emitted, or thows exception if any error occured. + last value emitted, or throws exception if any error occurred. Examples: >>> result = run(source) @@ -238,7 +324,7 @@ def run(self) -> Any: Raises: SequenceContainsNoElementsError: if observable completes (on_completed) without any values being emitted. - Exception: raises exception if any error (on_error) occured. + Exception: raises exception if any error (on_error) occurred. Returns: The last element emitted from the observable. diff --git a/rx/core/pipe.py b/rx/core/pipe.py index 0489cfc3a..ed4ec5004 100644 --- a/rx/core/pipe.py +++ b/rx/core/pipe.py @@ -1,9 +1,90 @@ -from typing import Callable +from typing import Callable, Any, TypeVar, overload from functools import reduce -from .observable import Observable +A = TypeVar('A') +B = TypeVar('B') +C = TypeVar('C') +D = TypeVar('D') +E = TypeVar('E') +F = TypeVar('F') +G = TypeVar('G') -def pipe(*operators: Callable[[Observable], Observable]) -> Callable[[Observable], Observable]: + +@overload +def pipe(*operators: Callable[['Observable'], 'Observable']) -> Callable[['Observable'], 'Observable']: # type: ignore + """Compose multiple operators left to right. + + Composes zero or more operators into a functional composition. The + operators are composed to left to right. A composition of zero + operators gives back the source. + + Examples: + >>> pipe()(source) == source + >>> pipe(f)(source) == f(source) + >>> pipe(f, g)(source) == g(f(source)) + >>> pipe(f, g, h)(source) == h(g(f(source))) + ... + + Returns: + The composed observable. + """ + ... + +@overload +def pipe() -> Callable[[A], A]: # pylint: disable=function-redefined + ... # pylint: disable=pointless-statement + + +@overload +def pipe(op1: Callable[[A], B]) -> Callable[[A], B]: # pylint: disable=function-redefined + ... # pylint: disable=pointless-statement + + +@overload +def pipe(op1: Callable[[A], B], op2: Callable[[B], C]) -> Callable[[A], C]: # pylint: disable=function-redefined + ... # pylint: disable=pointless-statement + + +@overload +def pipe(op1: Callable[[A], B], # pylint: disable=function-redefined + op2: Callable[[B], C], + op3: Callable[[C], D] + ) -> Callable[[A], D]: + ... # pylint: disable=pointless-statement + + +@overload +def pipe(op1: Callable[[A], B], # pylint: disable=function-redefined + op2: Callable[[B], C], + op3: Callable[[C], D], + op4: Callable[[D], E] + ) -> Callable[[A], E]: + ... # pylint: disable=pointless-statement + + +@overload +def pipe(op1: Callable[[A], B], # pylint: disable=function-redefined + op2: Callable[[B], C], + op3: Callable[[C], D], + op4: Callable[[D], E], + op5: Callable[[E], F] + ) -> Callable[[A], F]: + ... # pylint: disable=pointless-statement + + +@overload +def pipe(op1: Callable[[A], B], # pylint: disable=function-redefined,too-many-arguments + op2: Callable[[B], C], + op3: Callable[[C], D], + op4: Callable[[D], E], + op5: Callable[[E], F], + op6: Callable[[F], G] + ) -> Callable[[A], G]: + ... # pylint: disable=pointless-statement + + +# pylint: disable=function-redefined +def pipe(*operators: Callable[[Any], Any]) -> Callable[[Any], Any]: """Compose multiple operators left to right. Composes zero or more operators into a functional composition. The @@ -21,6 +102,6 @@ def pipe(*operators: Callable[[Observable], Observable]) -> Callable[[Observable The composed observable. """ - def compose(source: Observable) -> Observable: + def compose(source: Any) -> Any: return reduce(lambda obs, op: op(obs), operators, source) return compose