Skip to content

Commit

Permalink
Fix typing for pipe using overloads.
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrattli committed May 10, 2019
1 parent 93f94ca commit c649237
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 11 deletions.
79 changes: 72 additions & 7 deletions rx/core/observable/observable.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
# By design, pylint: disable=C0302
import threading
from typing import Any, Callable, Optional, Union, cast
from typing import Any, Callable, Optional, Union, TypeVar, cast, overload

from rx.disposable import Disposable
from rx.concurrency import current_thread_scheduler

from ..observer import AutoDetachObserver
from .. import typing, abc

A = TypeVar('A')
B = TypeVar('B')
C = TypeVar('C')
D = TypeVar('D')
E = TypeVar('E')
F = TypeVar('F')
G = TypeVar('G')


class Observable(typing.Observable):
"""Observable base class.
Expand Down Expand Up @@ -145,7 +153,6 @@ def subscribe(self, # pylint: disable=too-many-arguments,arguments-differ

return self.subscribe_(on_next, on_error, on_completed, scheduler)


def subscribe_(self,
on_next: typing.OnNext = None,
on_error: typing.OnError = None,
Expand Down Expand Up @@ -208,8 +215,67 @@ def set_disposable(_: abc.Scheduler = None, __: Any = None):
# Hide the identity of the auto detach observer
return Disposable(auto_detach_observer.dispose)


def pipe(self, *operators: Callable[['Observable'], 'Observable']) -> 'Observable':
@overload
def pipe(self) -> 'Observable': # pylint: disable=no-self-use
... # pylint: disable=pointless-statement

@overload
def pipe(self, op1: Callable[['Observable'], A]) -> A: # pylint: disable=function-redefined, no-self-use
... # pylint: disable=pointless-statement

@overload
def pipe(self,
op1: Callable[['Observable'], A],
op2: Callable[[A], B]) -> B: # pylint: disable=function-redefined, no-self-use
... # pylint: disable=pointless-statement

@overload
def pipe(self,
op1: Callable[['Observable'], A],
op2: Callable[[A], B],
op3: Callable[[B], C]) -> C: # pylint: disable=function-redefined, no-self-use
... # pylint: disable=pointless-statement

@overload
def pipe(self,
op1: Callable[['Observable'], A],
op2: Callable[[A], B],
op3: Callable[[B], C],
op4: Callable[[C], D]) -> D: # pylint: disable=function-redefined, no-self-use
... # pylint: disable=pointless-statement

@overload
def pipe(self,
op1: Callable[['Observable'], A],
op2: Callable[[A], B],
op3: Callable[[B], C],
op4: Callable[[C], D],
op5: Callable[[D], E]) -> E: # pylint: disable=function-redefined, no-self-use
... # pylint: disable=pointless-statement

@overload
def pipe(self,
op1: Callable[['Observable'], A],
op2: Callable[[A], B],
op3: Callable[[B], C],
op4: Callable[[C], D],
op5: Callable[[D], E],
op6: Callable[[E], F]) -> F: # pylint: disable=function-redefined, no-self-use
... # pylint: disable=pointless-statement

@overload
def pipe(self,
op1: Callable[['Observable'], A],
op2: Callable[[A], B],
op3: Callable[[B], C],
op4: Callable[[C], D],
op5: Callable[[D], E],
op6: Callable[[E], F],
op7: Callable[[E], G]) -> G: # pylint: disable=function-redefined, no-self-use
... # pylint: disable=pointless-statement

# pylint: disable=function-redefined
def pipe(self, *operators: Callable[['Observable'], Any]) -> Any: # type: ignore
"""Compose multiple operators left to right.
Composes zero or more operators into a functional composition.
Expand All @@ -227,21 +293,20 @@ def pipe(self, *operators: Callable[['Observable'], 'Observable']) -> 'Observabl
from ..pipe import pipe
return pipe(*operators)(self)


def run(self) -> Any:
"""Run source synchronously.
Subscribes to the observable source. Then blocks and waits for the
observable source to either complete or error. Returns the
last value emitted, or thows exception if any error occured.
last value emitted, or throws exception if any error occurred.
Examples:
>>> result = run(source)
Raises:
SequenceContainsNoElementsError: if observable completes
(on_completed) without any values being emitted.
Exception: raises exception if any error (on_error) occured.
Exception: raises exception if any error (on_error) occurred.
Returns:
The last element emitted from the observable.
Expand Down
69 changes: 65 additions & 4 deletions rx/core/pipe.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,70 @@
from typing import Callable
from typing import Callable, Any, TypeVar, overload
from functools import reduce
from .observable import Observable

A = TypeVar('A')
B = TypeVar('B')
C = TypeVar('C')
D = TypeVar('D')
E = TypeVar('E')
F = TypeVar('F')
G = TypeVar('G')

def pipe(*operators: Callable[[Observable], Observable]) -> Callable[[Observable], Observable]:

@overload
def pipe() -> Callable[[A], A]:
... # pylint: disable=pointless-statement


@overload
def pipe(op1: Callable[[A], B]) -> Callable[[A], B]: # pylint: disable=function-redefined
... # pylint: disable=pointless-statement


@overload
def pipe(op1: Callable[[A], B], op2: Callable[[B], C]) -> Callable[[A], C]: # pylint: disable=function-redefined
... # pylint: disable=pointless-statement


@overload
def pipe(op1: Callable[[A], B],
op2: Callable[[B], C],
op3: Callable[[C], D]
) -> Callable[[A], D]: # pylint: disable=function-redefined
... # pylint: disable=pointless-statement


@overload
def pipe(op1: Callable[[A], B],
op2: Callable[[B], C],
op3: Callable[[C], D],
op4: Callable[[D], E]
) -> Callable[[A], E]: # pylint: disable=function-redefined
... # pylint: disable=pointless-statement


@overload
def pipe(op1: Callable[[A], B],
op2: Callable[[B], C],
op3: Callable[[C], D],
op4: Callable[[D], E],
op5: Callable[[E], F]
) -> Callable[[A], F]: # pylint: disable=function-redefined
... # pylint: disable=pointless-statement


@overload
def pipe(op1: Callable[[A], B],
op2: Callable[[B], C],
op3: Callable[[C], D],
op4: Callable[[D], E],
op5: Callable[[E], F],
op6: Callable[[F], G]
) -> Callable[[A], G]: # pylint: disable=function-redefined
... # pylint: disable=pointless-statement


# pylint: disable=function-redefined
def pipe(*operators: Callable[[Any], Any]) -> Callable[[Any], Any]: # type: ignore
"""Compose multiple operators left to right.
Composes zero or more operators into a functional composition. The
Expand All @@ -21,6 +82,6 @@ def pipe(*operators: Callable[[Observable], Observable]) -> Callable[[Observable
The composed observable.
"""

def compose(source: Observable) -> Observable:
def compose(source: Any) -> Any:
return reduce(lambda obs, op: op(obs), operators, source)
return compose

0 comments on commit c649237

Please sign in to comment.