Fix type hints for pipe #355

Closed
dbrattli opened this issue Apr 20, 2019 · 7 comments

Comments

@dbrattli
Collaborator

dbrattli commented Apr 20, 2019

The current type hints for the pipe operator are not correct, since they assume that the final type will always be an Observable. Thus there are problems when piping to functions such as to_future, which returns a Future. The current type hints are:

def pipe(*operators: Callable[[Observable], Observable]) -> Callable[[Observable], Observable]:
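To illustrate the mismatch, here is a minimal sketch using stand-ins rather than the real rx types: the Observable class, the to_future operator and the simplified pipe below are all hypothetical stubs. The code runs, but the annotations tell a type checker that the result is an Observable, while at runtime it is a Future.

```python
from concurrent.futures import Future
from functools import reduce
from typing import Any, Callable


class Observable:
    """Stand-in for rx's Observable; holds a single value for illustration."""

    def __init__(self, value: Any) -> None:
        self.value = value


def to_future(source: Observable) -> "Future[Any]":
    """Stand-in operator that leaves the Observable world."""
    future: "Future[Any]" = Future()
    future.set_result(source.value)
    return future


def pipe(*operators: Callable[[Observable], Observable]) -> Callable[[Observable], Observable]:
    """Simplified pipe carrying the current (too narrow) type hints."""

    def compose(source: Observable) -> Observable:
        return reduce(lambda obs, op: op(obs), operators, source)

    return compose


# A type checker is expected to reject to_future here (it does not return an
# Observable) and to treat `result` as an Observable, although it is a Future.
result = pipe(to_future)(Observable(42))  # type: ignore
print(type(result).__name__)
```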

The correct typing would be for the callables to be Callable[[A], B], Callable[[B], C], etc., but this is not possible when using the star operator as above. We would probably need to use overloads and specify the first N cases, as RxJS does in TypeScript, e.g.:

/* tslint:disable:max-line-length */
  pipe(): Observable<T>;
  pipe<A>(op1: OperatorFunction<T, A>): Observable<A>;
  pipe<A, B>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>): Observable<B>;
  pipe<A, B, C>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>): Observable<C>;
  pipe<A, B, C, D>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>): Observable<D>;
  pipe<A, B, C, D, E>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>): Observable<E>;
  pipe<A, B, C, D, E, F>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>): Observable<F>;
  pipe<A, B, C, D, E, F, G>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>): Observable<G>;
  pipe<A, B, C, D, E, F, G, H>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>, op8: OperatorFunction<G, H>): Observable<H>;
  pipe<A, B, C, D, E, F, G, H, I>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>, op8: OperatorFunction<G, H>, op9: OperatorFunction<H, I>): Observable<I>;
  pipe<A, B, C, D, E, F, G, H, I>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>, op8: OperatorFunction<G, H>, op9: OperatorFunction<H, I>, ...operations: OperatorFunction<any, any>[]): Observable<{}>;

But I'm unsure how to do this properly in Python so that IDEs, mypy and linters can actually handle it, so that you get proper completion on whatever comes out of the pipe.

@jcafhe
Collaborator

jcafhe commented Apr 20, 2019

I was thinking that when we chain operators, only the last one could have a different signature, exiting the Observable chain.

What about adding an extra optional keyword parameter for when one wants to transform the result into something else (list, future, etc.)? That would make it easy to guarantee that only this last operator is different. However, it would break the current API.

def pipe(*operators: Callable[[Observable], Observable], tail: Optional[Callable[[Observable], T]] = None,
         ) -> Union[Callable[[Observable], Observable], Callable[[Observable], T]]:

Below is a 'working' prototype that seems to work in IDEs and mypy.

from functools import reduce
from typing import Callable, Optional, TypeVar, Union

class Observable():
    def foo(self):
        pass

# mimics an operator that returns an Observable
def obs_obs(obs: Observable) -> Observable:
    return Observable()

# mimics an operator that returns a list
def obs_list(obs: Observable) -> list:
    return [0, 1, 2]


T = TypeVar('T')

def pipe(*operators: Callable[[Observable], Observable], tail: Optional[Callable[[Observable], T]] = None,
         ) -> Union[Callable[[Observable], Observable], Callable[[Observable], T]]:

    # apply the Observable -> Observable operators from left to right
    def compose(source: Observable) -> Observable:
        return reduce(lambda obs, op: op(obs), operators, source)

    # only the optional tail may leave the Observable chain
    if tail is not None:
        return lambda s: tail(compose(s))

    return compose

composition = pipe(obs_obs, obs_obs)
composition_to_list = pipe(obs_obs, obs_obs, tail=obs_list)

res_obs = composition(Observable())
res_list = composition_to_list(Observable())

EDIT:
I realize it would be more robust to just add an explicit function pipe_to(*operators, to=to_operator) with a non-optional to parameter.
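A minimal sketch of what such a pipe_to could look like, reusing the stub Observable, obs_obs and obs_list from the prototype above. The name pipe_to and the keyword to come from the suggestion; the body is an assumption and not existing library code.

```python
from functools import reduce
from typing import Callable, List, TypeVar

T = TypeVar("T")


class Observable:
    """Stand-in for the real Observable."""


def obs_obs(obs: Observable) -> Observable:
    """Mimics an operator that returns an Observable."""
    return Observable()


def obs_list(obs: Observable) -> List[int]:
    """Mimics an operator that returns a list."""
    return [0, 1, 2]


def pipe_to(*operators: Callable[[Observable], Observable],
            to: Callable[[Observable], T]) -> Callable[[Observable], T]:
    """Compose Observable -> Observable operators, then apply the mandatory `to`."""

    def compose(source: Observable) -> T:
        return to(reduce(lambda obs, op: op(obs), operators, source))

    return compose


composition_to_list = pipe_to(obs_obs, obs_obs, to=obs_list)
res_list = composition_to_list(Observable())
print(res_list)
```

Because to is keyword-only and has no default, the return type is always Callable[[Observable], T] and no Union is needed.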

@dbrattli
Collaborator Author

Actually, I like your first suggestion, and I would like to avoid a separate pipe_to method/function, since that will easily confuse people. The only problem is that we cannot pipe to some T and then back into an Observable, but that should be acceptable imo. It also looks much better than having tons of overloads for a fully generic pipe like RxJS.

@jcafhe
Collaborator

jcafhe commented Apr 21, 2019

The only problem is that we cannot pipe to some T and then back into an Observable, but that should be acceptable imo.

I hadn't thought about that. However, the creation operators return an Observable, so I may be wrong, but I believe they are not compatible with pipe anyway, since it expects a pipeable operator of type Callable[[Observable], Observable]?

EDIT: forget what I've said, it should be compatible with the reduce(lambda obs, op: op(obs), operators, source) thing.

So if we go this way, how should we name this keyword parameter? What about to?

@MainRo
Collaborator

MainRo commented Apr 22, 2019

I also prefer @jcafhe's solution to declaring many signatures. Alternatively, would it make sense to make these operators non-pipeable (since they do not follow the observable in -> observable out convention), and just expose them as regular functions?

for example:

'''
foo = obs.pipe(
map(),
filter(),
)
a, b = partition(foo, lambda i: i>2)
'''
So the core implementation of these operators would be public, and the pipe operator would have a single signature.
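A runnable sketch of that idea, with a list-backed stand-in for Observable. The names map_ and filter_ and the body of partition are hypothetical and only illustrate the shape of the API, not RxPY's implementation.

```python
from typing import Callable, List, Tuple


class Observable:
    """Stand-in backed by a list; not the real rx Observable."""

    def __init__(self, items: List[int]) -> None:
        self.items = items

    def pipe(self, *operators: Callable[["Observable"], "Observable"]) -> "Observable":
        # Single signature: every operator maps Observable -> Observable.
        result = self
        for op in operators:
            result = op(result)
        return result


def map_(mapper: Callable[[int], int]) -> Callable[[Observable], Observable]:
    return lambda source: Observable([mapper(i) for i in source.items])


def filter_(predicate: Callable[[int], bool]) -> Callable[[Observable], Observable]:
    return lambda source: Observable([i for i in source.items if predicate(i)])


def partition(source: Observable,
              predicate: Callable[[int], bool]) -> Tuple[Observable, Observable]:
    """Regular (non-pipeable) function: takes the Observable as its first argument."""
    matching = Observable([i for i in source.items if predicate(i)])
    rest = Observable([i for i in source.items if not predicate(i)])
    return matching, rest


foo = Observable([1, 2, 3, 4]).pipe(
    map_(lambda i: i + 1),
    filter_(lambda i: i != 3),
)
a, b = partition(foo, lambda i: i > 2)
print(a.items, b.items)
```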

@dbrattli
Collaborator Author

A note on the implementation by @jcafhe: you probably don't need the Union, since T can also be an Observable, right?
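One possible reading of this note, as a sketch only: keep the optional tail keyword but describe the two cases with overloads instead of a Union, so that the result type is Observable without a tail and T with one. The Observable stub and the helper operators are stand-ins, and whether every checker and IDE handles this combination of *args with a keyword-only parameter well is an assumption.

```python
from functools import reduce
from typing import Any, Callable, List, Optional, TypeVar, overload

T = TypeVar("T")


class Observable:
    """Stand-in for the real Observable."""


Operator = Callable[[Observable], Observable]


@overload
def pipe(*operators: Operator) -> Callable[[Observable], Observable]: ...
@overload
def pipe(*operators: Operator, tail: Callable[[Observable], T]) -> Callable[[Observable], T]: ...
def pipe(*operators: Operator,
         tail: Optional[Callable[[Observable], Any]] = None) -> Callable[[Observable], Any]:
    def compose(source: Observable) -> Any:
        result = reduce(lambda obs, op: op(obs), operators, source)
        # Only the tail may leave the Observable chain.
        return result if tail is None else tail(result)

    return compose


def obs_obs(obs: Observable) -> Observable:
    return Observable()


def obs_list(obs: Observable) -> List[int]:
    return [0, 1, 2]


res_obs = pipe(obs_obs, obs_obs)(Observable())
res_list = pipe(obs_obs, tail=obs_list)(Observable())
print(type(res_obs).__name__, res_list)
```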

Here is my attempt at an overloaded version. Mypy seems to like the overloads, but pylint only sees the first overload and complains with "Too many positional arguments for method call pylint(too-many-function-args)" when the pipe is used. See pylint-dev/pylint#2239.

@overload
def pipe() -> Callable[[A], A]:
    ...  # pylint: disable=pointless-statement
@overload
def pipe(op1: Callable[[A], B]) -> Callable[[A], B]:  # pylint: disable=function-redefined
    ...  # pylint: disable=pointless-statement
@overload
def pipe(op1: Callable[[A], B], op2: Callable[[B], C]) -> Callable[[A], C]:  # pylint: disable=function-redefined
    ...  # pylint: disable=pointless-statement
@overload
def pipe(op1: Callable[[A], B], op2: Callable[[B], C], op3: Callable[[C], D]) -> Callable[[A], D]:  # pylint: disable=function-redefined
    ... # pylint: disable=pointless-statement
@overload
def pipe(op1: Callable[[A], B], op2: Callable[[B], C], op3: Callable[[C], D], op4: Callable[[D], E]) -> Callable[[A], E]:  # pylint: disable=function-redefined
    ... # pylint: disable=pointless-statement
# pylint: disable=function-redefined
def pipe(*operators: Callable[[Any], Any]) -> Callable[[Any], Any]:  # type: ignore
    ...

and for Observable:

    @overload
    def pipe(self) -> 'Observable':  # pylint: disable=no-self-use
        ... # pylint: disable=pointless-statement
    @overload
    def pipe(self, op1: Callable[['Observable'], A]) -> A: # pylint: disable=function-redefined, no-self-use
        ... # pylint: disable=pointless-statement
    @overload
    def pipe(self, op1: Callable[['Observable'], A], op2: Callable[[A], B]) -> B: # pylint: disable=function-redefined, no-self-use
        ... # pylint: disable=pointless-statement
    @overload
    def pipe(self, op1: Callable[['Observable'], A], op2: Callable[[A], B], op3: Callable[[B], C]) -> C: # pylint: disable=function-redefined, no-self-use
        ... # pylint: disable=pointless-statement
    @overload
    def pipe(self, op1: Callable[['Observable'], A], op2: Callable[[A], B], op3: Callable[[B], C], op4: Callable[[C], D]) -> D: # pylint: disable=function-redefined, no-self-use
        ... # pylint: disable=pointless-statement
    # pylint: disable=function-redefined
    def pipe(self, *operators: Callable[['Observable'], Any]) -> Any:  # type: ignore
        ...
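The overloads above leave out the imports, the TypeVar declarations and the implementation body. A self-contained sketch of the free-function variant follows. It is cut down to two operators for brevity, the reduce-based body is borrowed from the earlier prototype in this thread, and double and describe are hypothetical helpers.

```python
from functools import reduce
from typing import Any, Callable, TypeVar, overload

A = TypeVar("A")
B = TypeVar("B")
C = TypeVar("C")


@overload
def pipe() -> Callable[[A], A]: ...
@overload
def pipe(op1: Callable[[A], B]) -> Callable[[A], B]: ...
@overload
def pipe(op1: Callable[[A], B], op2: Callable[[B], C]) -> Callable[[A], C]: ...
def pipe(*operators: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """Runtime implementation shared by all overloads."""

    def compose(source: Any) -> Any:
        # Apply the operators from left to right.
        return reduce(lambda acc, op: op(acc), operators, source)

    return compose


def double(x: int) -> int:
    return x * 2


def describe(x: int) -> str:
    return f"value={x}"


# With the overloads, a checker can infer Callable[[int], str] here.
composed = pipe(double, describe)
print(composed(21))

# With no operators, pipe() behaves as the identity function.
identity = pipe()
print(identity(7))
```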

@jcafhe
Collaborator

jcafhe commented Apr 23, 2019

A note on the implementation by @jcafhe: you probably don't need the Union, since T can also be an Observable, right?

Yes, you are right. I was a bit lost here: I had confused this with the to_xxx operators, which I believed returned a type like a set/list/dict, but in fact they return a Callable[[Observable], Observable] (except for to_future).

If it helps, here is a quick overview of operators in rx/operators with an 'exotic' return type:

| name | return type |
| --- | --- |
| multicast | Callable[[Observable], Union[Observable, ConnectableObservable]] |
| partition | Callable[[Observable], List[Observable]] |
| partition_indexed | Callable[[Observable], List[Observable]] |
| ref_count | Callable[[ConnectableObservable], Observable] |
| replay | Callable[[Observable], Union[Observable, ConnectableObservable]] |
| to_future | Callable[[Observable], Future] |

I believe some operators are incorrectly typed too, but they should have a 'classical' rtype:

| name | declared rtype | real rtype |
| --- | --- | --- |
| ignore_elements | Observable | Callable[[Observable], Observable] |
| publish | ConnectableObservable | TBD, relies on pipe & multicast operator |
| single_or_default_async | Observable | Callable[[Observable], Observable] |
| single_or_default | Observable | TBD, relies on pipe & single_or_default_async |
| skip_last | Observable | Callable[[Observable], Observable] |
| zip_with_iterable | - | Callable[[Observable], Observable] |

Alternatively, would it make sense to make these operators non pipable (since they do not follow the observable in -> observable out convention), and just expose them as regular functions ?

@MainRo that makes sense IMO.

dbrattli mentioned this issue May 5, 2019