Skip to content

Commit

Permalink
Merge b9fb578 into 93f94ca
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrattli committed May 11, 2019
2 parents 93f94ca + b9fb578 commit 188e74e
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 10 deletions.
95 changes: 89 additions & 6 deletions rx/core/observable/observable.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
# By design, pylint: disable=C0302
import threading
from typing import Any, Callable, Optional, Union, cast
from typing import Any, Callable, Optional, Union, TypeVar, cast, overload

from rx.disposable import Disposable
from rx.concurrency import current_thread_scheduler

from ..observer import AutoDetachObserver
from .. import typing, abc

A = TypeVar('A')
B = TypeVar('B')
C = TypeVar('C')
D = TypeVar('D')
E = TypeVar('E')
F = TypeVar('F')
G = TypeVar('G')


class Observable(typing.Observable):
"""Observable base class.
Expand Down Expand Up @@ -145,7 +153,6 @@ def subscribe(self, # pylint: disable=too-many-arguments,arguments-differ

return self.subscribe_(on_next, on_error, on_completed, scheduler)


def subscribe_(self,
on_next: typing.OnNext = None,
on_error: typing.OnError = None,
Expand Down Expand Up @@ -208,8 +215,85 @@ def set_disposable(_: abc.Scheduler = None, __: Any = None):
# Hide the identity of the auto detach observer
return Disposable(auto_detach_observer.dispose)

@overload
def pipe(self, *operators: Callable[['Observable'], 'Observable']) -> 'Observable': # pylint: disable=no-self-use
"""Compose multiple operators left to right.
Composes zero or more operators into a functional composition.
The operators are composed to right. A composition of zero
operators gives back the original source.
source.pipe() == source
source.pipe(f) == f(source)
source.pipe(g, f) == f(g(source))
source.pipe(h, g, f) == f(g(h(source)))
...
Returns the composed observable.
"""
...

def pipe(self, *operators: Callable[['Observable'], 'Observable']) -> 'Observable':
@overload
def pipe(self) -> 'Observable': # pylint: disable=function-redefined, no-self-use
... # pylint: disable=pointless-statement

@overload
def pipe(self, op1: Callable[['Observable'], A]) -> A: # pylint: disable=function-redefined, no-self-use
... # pylint: disable=pointless-statement

@overload
def pipe(self, # pylint: disable=function-redefined, no-self-use
op1: Callable[['Observable'], A],
op2: Callable[[A], B]) -> B:
... # pylint: disable=pointless-statement

@overload
def pipe(self, # pylint: disable=function-redefined, no-self-use
op1: Callable[['Observable'], A],
op2: Callable[[A], B],
op3: Callable[[B], C]) -> C:
... # pylint: disable=pointless-statement

@overload
def pipe(self, # pylint: disable=function-redefined, no-self-use
op1: Callable[['Observable'], A],
op2: Callable[[A], B],
op3: Callable[[B], C],
op4: Callable[[C], D]) -> D:
... # pylint: disable=pointless-statement

@overload
def pipe(self, # pylint: disable=function-redefined, no-self-use, too-many-arguments
op1: Callable[['Observable'], A],
op2: Callable[[A], B],
op3: Callable[[B], C],
op4: Callable[[C], D],
op5: Callable[[D], E]) -> E:
... # pylint: disable=pointless-statement

@overload
def pipe(self, # pylint: disable=function-redefined, no-self-use, too-many-arguments
op1: Callable[['Observable'], A],
op2: Callable[[A], B],
op3: Callable[[B], C],
op4: Callable[[C], D],
op5: Callable[[D], E],
op6: Callable[[E], F]) -> F:
... # pylint: disable=pointless-statement

@overload
def pipe(self, # pylint: disable=function-redefined, no-self-use, too-many-arguments
op1: Callable[['Observable'], A],
op2: Callable[[A], B],
op3: Callable[[B], C],
op4: Callable[[C], D],
op5: Callable[[D], E],
op6: Callable[[E], F],
op7: Callable[[F], G]) -> G:
... # pylint: disable=pointless-statement

# pylint: disable=function-redefined
def pipe(self, *operators: Callable[['Observable'], Any]) -> Any:
"""Compose multiple operators left to right.
Composes zero or more operators into a functional composition.
Expand All @@ -227,21 +311,20 @@ def pipe(self, *operators: Callable[['Observable'], 'Observable']) -> 'Observabl
from ..pipe import pipe
return pipe(*operators)(self)


def run(self) -> Any:
"""Run source synchronously.
Subscribes to the observable source. Then blocks and waits for the
observable source to either complete or error. Returns the
last value emitted, or thows exception if any error occured.
last value emitted, or throws exception if any error occurred.
Examples:
>>> result = run(source)
Raises:
SequenceContainsNoElementsError: if observable completes
(on_completed) without any values being emitted.
Exception: raises exception if any error (on_error) occured.
Exception: raises exception if any error (on_error) occurred.
Returns:
The last element emitted from the observable.
Expand Down
89 changes: 85 additions & 4 deletions rx/core/pipe.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,90 @@
from typing import Callable
from typing import Callable, Any, TypeVar, overload
from functools import reduce
from .observable import Observable

A = TypeVar('A')
B = TypeVar('B')
C = TypeVar('C')
D = TypeVar('D')
E = TypeVar('E')
F = TypeVar('F')
G = TypeVar('G')

def pipe(*operators: Callable[[Observable], Observable]) -> Callable[[Observable], Observable]:

@overload
def pipe(*operators: Callable[['Observable'], 'Observable']) -> Callable[['Observable'], 'Observable']: # type: ignore
"""Compose multiple operators left to right.
Composes zero or more operators into a functional composition. The
operators are composed to left to right. A composition of zero
operators gives back the source.
Examples:
>>> pipe()(source) == source
>>> pipe(f)(source) == f(source)
>>> pipe(f, g)(source) == g(f(source))
>>> pipe(f, g, h)(source) == h(g(f(source)))
...
Returns:
The composed observable.
"""
...

@overload
def pipe() -> Callable[[A], A]: # pylint: disable=function-redefined
... # pylint: disable=pointless-statement


@overload
def pipe(op1: Callable[[A], B]) -> Callable[[A], B]: # pylint: disable=function-redefined
... # pylint: disable=pointless-statement


@overload
def pipe(op1: Callable[[A], B], op2: Callable[[B], C]) -> Callable[[A], C]: # pylint: disable=function-redefined
... # pylint: disable=pointless-statement


@overload
def pipe(op1: Callable[[A], B], # pylint: disable=function-redefined
op2: Callable[[B], C],
op3: Callable[[C], D]
) -> Callable[[A], D]:
... # pylint: disable=pointless-statement


@overload
def pipe(op1: Callable[[A], B], # pylint: disable=function-redefined
op2: Callable[[B], C],
op3: Callable[[C], D],
op4: Callable[[D], E]
) -> Callable[[A], E]:
... # pylint: disable=pointless-statement


@overload
def pipe(op1: Callable[[A], B], # pylint: disable=function-redefined
op2: Callable[[B], C],
op3: Callable[[C], D],
op4: Callable[[D], E],
op5: Callable[[E], F]
) -> Callable[[A], F]:
... # pylint: disable=pointless-statement


@overload
def pipe(op1: Callable[[A], B], # pylint: disable=function-redefined,too-many-arguments
op2: Callable[[B], C],
op3: Callable[[C], D],
op4: Callable[[D], E],
op5: Callable[[E], F],
op6: Callable[[F], G]
) -> Callable[[A], G]:
... # pylint: disable=pointless-statement


# pylint: disable=function-redefined
def pipe(*operators: Callable[[Any], Any]) -> Callable[[Any], Any]:
"""Compose multiple operators left to right.
Composes zero or more operators into a functional composition. The
Expand All @@ -21,6 +102,6 @@ def pipe(*operators: Callable[[Observable], Observable]) -> Callable[[Observable
The composed observable.
"""

def compose(source: Observable) -> Observable:
def compose(source: Any) -> Any:
return reduce(lambda obs, op: op(obs), operators, source)
return compose

0 comments on commit 188e74e

Please sign in to comment.