RxPY v3 Remaining Work #285

Closed
9 tasks done
dbrattli opened this issue Jan 21, 2019 · 26 comments

@dbrattli
Collaborator

dbrattli commented Jan 21, 2019

This issue is for tracking the remaining work for RxPY v3.

  • Fix type hints for pipe (#355)
  • Disposables. These have been removed in RxJS 6. I think we should keep disposables, but make them simpler: remove the static Disposable object with its methods and keep only the simpler creation functions such as create, empty etc.
  • Move all observer files to core/observer.
  • Fix testing/marbles.py.
  • Lazy load schedulers. We have many schedulers that are not being used. Perhaps eventloop schedulers should be imported explicitly (full path) so they don't add to startup time
  • Remove (result) mappers from combine_latest and with_latest_from.
  • Add a starmap operator based on itertools for use with combine_latest et al.
  • Remove BlockingObservable and methods and provide a blocking run() method instead.
  • Remove all first arguments that accept Iterable[Observable] for all operators. Use *args instead.

Later, i.e. v3.1

  • Merge internal and core to internal similar to RxJS.
  • Making Observables of type Generic[T] instead of Generic[Any]. The problem is what to do with vararg operators such as zip, combine_latest etc.
@jcafhe
Collaborator

jcafhe commented Jan 21, 2019

About Disposables, I must say that I'm really confused and don't quite understand what the RxJS way is.
I thought that:

subscription = source.subscribe()
subscription.unsubscribe()

was the same as:

disposable = source.subscribe()
disposable.dispose()

With your proposal, would we still be able to 'dispose' in the sense of "hey, I don't need your data anymore"?

@dbrattli
Collaborator Author

Yes, I want to keep disposables even if they are just an object wrapping a function that takes nothing and returns nothing, i.e. Callable[[], None]. In a functional programming language they would just be a function. In OO they become an object instantiated from a class inheriting from a base class that implements an interface with a single method.

They are extremely useful for controlling the life-time of resources such as subscriptions.

But we should remove the static disposable with methods and replace with create functions e.g. rx.disposable.empty() instead of:

from rx.core import Disposable 
Disposable.empty()

Similar to what we have done for static creation of Observables, e.g. rx.empty() instead of Observable.empty().
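
A minimal sketch of the idea described above, assuming nothing about the actual RxPY implementation: a disposable is just an object wrapping a `Callable[[], None]`, created through module-level functions rather than static methods. The class body and the `create` and `empty` helpers here are illustrative stand-ins, not the library's code.

```python
from typing import Callable, List, Optional


class Disposable:
    """Hypothetical minimal disposable wrapping a Callable[[], None]."""

    def __init__(self, action: Optional[Callable[[], None]] = None) -> None:
        self._action = action
        self.is_disposed = False

    def dispose(self) -> None:
        # Run the wrapped action at most once.
        if self.is_disposed:
            return
        self.is_disposed = True
        if self._action is not None:
            self._action()


def create(action: Callable[[], None]) -> Disposable:
    """Creation function replacing a static Disposable.create()."""
    return Disposable(action)


def empty() -> Disposable:
    """Creation function replacing a static Disposable.empty()."""
    return Disposable()


released: List[str] = []
subscription = create(lambda: released.append("closed"))
subscription.dispose()
subscription.dispose()  # second call is a no-op
```

Making `dispose()` idempotent is a design choice in this sketch, so that callers can release a resource without tracking whether someone else already did.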

@jcafhe
Collaborator

jcafhe commented Jan 21, 2019

Thank you very much for your insight !

@jcafhe
Collaborator

jcafhe commented Jan 22, 2019

A quick review of the removal of result_mapper:

generate, generate_with_relative_time, zip and zip_with_iterable no longer expose a result_mapper.

There are still operators with a result_mapper that could be removed (in addition to combine_latest and with_latest_from) but I need your opinions before doing anything.

for_in

I feel that if we remove result_mapper, nothing will distinguish for_in from concat. Maybe we should rename result_mapper to just mapper. We could even extend for_in with an optional predicate argument to mimic a Python list comprehension:

[expression for item in items if predicate]

join/group_join

I see no problem to remove result_mapper and return a tuple for these operators.

By the way, I agree with your idea of a starmap operator 👍
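
To illustrate why a starmap operator would pair well with operators that emit tuples, here is a small sketch using `itertools.starmap` on a plain list. The list stands in for the tuples that combine_latest might emit; no RxPY operator is involved, and the `add` function is purely illustrative.

```python
from itertools import starmap
from typing import List, Tuple

# Tuples standing in for the values a tuple-emitting operator would produce.
latest: List[Tuple[int, int]] = [(1, 10), (2, 10), (2, 20)]


def add(a: int, b: int) -> int:
    """A mapper written with separate arguments instead of one tuple."""
    return a + b


# starmap unpacks each tuple into positional arguments for the mapper.
sums = list(starmap(add, latest))
```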

@dbrattli
Collaborator Author

For for_in it's not really a result mapper since the mapping is not done on any results. It's just a mapper for the builtin Iterable map i.e

def for_in(values, mapper) -> Observable:
    """Concatenates the observable sequences obtained by running the
    specified result mapper for each element in source.

    Args:
        values: A list of values to turn into an observable sequence.
        mapper: A function to apply to each item in the values
            list to turn it into an observable sequence.

    Returns:
        An observable sequence from the concatenated observable
        sequences.
    """
    return concat(map(mapper, values))

I have removed the impl. file since it's so simple to construct and just added it to the __init__.py file so it's still available.

@erikkemperman
Collaborator

erikkemperman commented Jan 23, 2019

Lazy load schedulers. We have many schedulers that are not being used. Perhaps eventloop schedulers should be imported explicitly (full path) so they don't add to startup time

Makes sense. But also: I notice that all the top-level schedulers (not within mainloopscheduler) create a singleton instance. For things like ThreadPoolScheduler and (especially) the ProcessPoolScheduler I am trying to implement, such a potentially useless instantiation is expensive (probably much more so than simply parsing the module source).

What is the purpose of these? If it is only for the unit-tests, can I suggest to remove them?

@dbrattli
Collaborator Author

Several operators need a default scheduler and will use the singleton schedulers if no scheduler is supplied (immediate_scheduler, timeout_scheduler, current_thread_scheduler), and they must in many cases (current_thread_scheduler) use the same instance for things to work correctly. It makes sense if you dive into things, but it's really a confusing mess inherited from Rx.NET and RxJS. I would love to clean it up and reduce the number of schedulers. The main schedulers are:

  • ImmediateScheduler, the scheduler version of a plain function call
  • CurrentThreadScheduler, a trampolined version that queues if the scheduler is already running
  • EventLoopScheduler, trampolined but on a designated thread.
  • NewThreadScheduler, schedules on a new thread, but work scheduled from within already scheduled work goes on the same thread, using EventLoopScheduler.
  • ThreadPoolScheduler, wraps the NewThreadScheduler but uses the ThreadPoolExecutor to limit the number of concurrent threads.

If I remember correctly, only the immediate_scheduler, current_thread_scheduler and timeout_scheduler need the singleton.
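
The difference between the first two schedulers in the list can be sketched as follows. This is a simplified illustration, not RxPY's scheduler code: `ImmediateRunner` and `TrampolineRunner` are hypothetical names, and real schedulers also deal with time, state and disposables.

```python
from collections import deque
from typing import Callable, Deque, List

Action = Callable[[], None]


class ImmediateRunner:
    """Runs the action right away, like a plain function call."""

    def schedule(self, action: Action) -> None:
        action()


class TrampolineRunner:
    """Queues the action if the runner is already draining its queue."""

    def __init__(self) -> None:
        self._queue: Deque[Action] = deque()
        self._running = False

    def schedule(self, action: Action) -> None:
        self._queue.append(action)
        if self._running:
            return  # the active loop below will pick it up
        self._running = True
        try:
            while self._queue:
                self._queue.popleft()()
        finally:
            self._running = False


def trace(runner) -> List[str]:
    """Schedule an action that itself schedules another action."""
    log: List[str] = []

    def outer() -> None:
        log.append("outer start")
        runner.schedule(lambda: log.append("inner"))
        log.append("outer end")

    runner.schedule(outer)
    return log


immediate_log = trace(ImmediateRunner())    # inner runs inside outer
trampoline_log = trace(TrampolineRunner())  # inner runs after outer ends
```

The trampoline only works if nested calls reach the same instance, which is one way to see why a shared singleton matters for current_thread_scheduler.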

@erikkemperman
Collaborator

All right, thanks for clearing that up!

@jcafhe
Collaborator

jcafhe commented Jan 24, 2019

  • Making Observables of type Generic[T] instead of Generic[Any]. The problem is what to do with vararg operators such as zip, combine_latest etc. Perhaps look to RxJS and see how they have solved it?

I've just checked in rxjs but I'm not familiar with javascript. For merge operator, the signature is:

export function merge<T, R>(...observables: Array<ObservableInput<any> | SchedulerLike | number>): Observable<R>

So they have a specific ObservableInput of type <any> instead of the returned Observable<R>. It is defined in internal/types:

export type ObservableInput<T> = SubscribableOrPromise<T> | ArrayLike<T> | Iterable<T>;

Don't know if it can help.

@dbrattli
Collaborator Author

Latest RxPY v3 and pipelining works like a charm with Coconut.

[Screenshot, 2019-01-26 08:44:13]

@jcafhe
Collaborator

jcafhe commented Jan 26, 2019

I've dug into marbles to fix it but I think we need a complete rework.

What I would like is to make it compliant with ASCII diagrams as defined in rxjs (marble-testing). This could be useful for easily adding new tests as well as generating docs (rendering pictures as you suggested).

However, it would take me quite a lot of time but if it's ok, it could be great.

@MainRo
Collaborator

MainRo commented Jan 26, 2019

We can use the ASCII marble testing syntax for testing, but for generating diagrams it lacks several features:

  • a way to describe higher order observables (btw this could be useful also for testing).
  • a way to show items passing through the operator, as is shown on marble diagrams describing operators.

If this can be added on top of the existing syntax, then we would have a marble ascii syntax suitable for tests and doc.

@dbrattli
Collaborator Author

Btw, I've fixed testing/marbles. It's not as advanced as the ASCII diagrams in RxJS, but those look slightly overengineered (imo). I like the hot and cold operators. I don't think we need to support higher order streams, since only a few operators generate higher orders, like window and group_by. Instead we could tap the operators and inner streams for flat_map et al. and use a templating engine, e.g. jinja2, to set up the diagram the way we want.

@dbrattli
Collaborator Author

Regarding the issue with Generic[T] for Observable, I think we need to change merge, zip, combine_latest etc. to only take "plain" Observable arguments, i.e. *args: Observable[Any], and not accept the first argument as Iterable[Observable[Any]]. This is because we need to create overloads such as combine_latest(a: Observable[T1], b: Observable[T2]) -> Observable[Tuple[T1, T2]] and combine_latest(a: Observable[T1], b: Observable[T2], c: Observable[T3]) -> Observable[Tuple[T1, T2, T3]] etc. Having typed Iterables returning Observables of different types is not possible, so the Iterable would need to be Iterable[Observable[Any]]. My suggestion is that we do as RxJS and remove the support for having the first argument be Iterable[Observable].
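
A rough sketch of what such overloads could look like, using a stand-in `Observable` class that just holds a value instead of a stream. The class and the runtime implementation are assumptions made for illustration; only the overload signatures mirror the ones proposed above. The positional-only marker (`/`) needs Python 3.8 or later.

```python
from typing import Any, Generic, Tuple, TypeVar, overload

T = TypeVar("T")
T1 = TypeVar("T1")
T2 = TypeVar("T2")
T3 = TypeVar("T3")


class Observable(Generic[T]):
    """Hypothetical stand-in that holds one value instead of a stream."""

    def __init__(self, value: T) -> None:
        self.value = value


# Overloads are positional-only so they match the *sources implementation.
@overload
def combine_latest(
    a: Observable[T1], b: Observable[T2], /
) -> Observable[Tuple[T1, T2]]: ...


@overload
def combine_latest(
    a: Observable[T1], b: Observable[T2], c: Observable[T3], /
) -> Observable[Tuple[T1, T2, T3]]: ...


def combine_latest(*sources: Observable[Any]) -> Observable[Any]:
    # Single runtime implementation; the overloads only guide type checkers.
    return Observable(tuple(source.value for source in sources))


pair = combine_latest(Observable(1), Observable("a"))
triple = combine_latest(Observable(1), Observable("a"), Observable(2.5))
```

With an Iterable as first argument, a type checker would see a single element type for all sources, which is why the per-position type variables only work with plain positional arguments.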

@jcafhe
Collaborator

jcafhe commented Jan 27, 2019

I agree, from a user perspective, I prefer to write:

rx.zip(o1, o2, o3)

than

rx.zip([o1, o2, o3])

For a list of observables, we just have to add a *, so it's not that much effort:

rx.zip(*observables)

@dbrattli
Collaborator Author

Yes, I suggest we remove the support now, and add it later on demand with another name e.g zip_observables or something. That way we don't need another breaking change when/if we add it.

@MainRo
Collaborator

MainRo commented Jan 28, 2019

While working on the examples in the documentation I saw a change I was not aware of: the subscription function provided in the create operator (or Observable) now takes two parameters: the observer and a scheduler.
How should a subscription implementation use this scheduler parameter? The scheduler parameter is always provided, so my understanding is that the observer method should always be scheduled. However the scheduler can be None. Maybe things are not complete yet on that part?

@dbrattli
Collaborator Author

Yes, that is correct. This is the subscription default scheduler that you may use to set a scheduler once and for all for all operators in the chain. Check out timeflies_tkinter.py, which uses this feature. This means that when using RxPY with a UI library you don't need to set the scheduler on every operator.

For operators such as delay, it will choose the scheduler this way:

 _scheduler = scheduler or scheduler_ or timeout_scheduler

Where scheduler is the operator scheduler argument, scheduler_ is the scheduler given to subscribe (if any) and timeout_scheduler will be used if the other two are None.
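
The precedence can be sketched with plain objects. The `Scheduler` class, the `pick_scheduler` helper and the instance names below are illustrative stand-ins, not RxPY types; only the `or` chain is taken from the line above.

```python
from typing import Optional


class Scheduler:
    """Hypothetical stand-in; instances are truthy like any plain object."""

    def __init__(self, name: str) -> None:
        self.name = name


# Stand-in for the module-level singleton used as the last fallback.
timeout_scheduler = Scheduler("timeout")


def pick_scheduler(
    scheduler: Optional[Scheduler], scheduler_: Optional[Scheduler]
) -> Scheduler:
    """Operator argument first, then the subscribe scheduler, then default."""
    return scheduler or scheduler_ or timeout_scheduler


operator_arg = Scheduler("operator")
subscribe_arg = Scheduler("subscribe")

first = pick_scheduler(operator_arg, subscribe_arg)
second = pick_scheduler(None, subscribe_arg)
third = pick_scheduler(None, None)
```

Note that the `or` chain relies on truthiness, so it assumes a scheduler object never evaluates as false.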

For create it means that it may use the scheduler, or it may not. The scheduler may also be None if subscribe() is called without a scheduler. Create itself does not take a scheduler as an argument, but when creating custom logic it may have a scheduler (partially applied) from an outer scope (e.g. fromiterable.py). Every operator is really using create(), but they currently short-cut by calling Observable directly with their subscribe function; Observable(subscribe) is just the same as rx.create(subscribe).

If the subscribe function given to create() produces values (out of thin air) then it should probably schedule these values. If the values arrive from some other non-observable code, and create is used to wrap an observable around it to make it observable, then you should probably not schedule the values.

The confusing part is how this relates to subscribe_on(), which can be used to set the scheduler that e.g. create() runs on. This is correct, but it's not possible to schedule values in the future that way, and it's not possible to schedule every value if needed. The goal of RxPY v2 was to remove operator scheduler arguments incl. subscribe_on, observe_on, but now we have both options.

@MainRo
Collaborator

MainRo commented Jan 29, 2019

OK, thanks for the clarification. I still find this parameter confusing: it has to be declared as a parameter of the subscribe function, but its value is None unless a scheduler has been provided on subscription, and it is useful only when emitting items after the subscription call.
I like using subscribe_on and observe_on because it allows to control scheduling outside of the observable factory: The factory code can just call the observer, and the user of the factory can decide where to schedule emissions with subscribe_on (for items emitted during subscription) and observe_on (for items emitted later).
I suppose that depending on the structure of the code, one or the other solution is more convenient.

@dbrattli
Collaborator Author

It's not more confusing than the scheduler argument to every operator that currently needs a scheduler. Those operators cannot perform their action using subscribe_on or observe_on. It's also confusing, when using e.g. TkinterScheduler or IOLoopScheduler, that you need to specify the scheduler for every operator that deals with time, and that using subscribe_on or observe_on does not help in any way, e.g. for operators dealing with time such as delay(), timeout(), interval(), etc. Using subscribe_on does not control when to schedule things, only where. It just allows the subscribe logic to run in the context of a given scheduler (where). The subscribe logic still cannot schedule anything at a later point in time (when).

@jcafhe
Collaborator

jcafhe commented Feb 7, 2019

Remove all first arguments that accept Iterable[Observable] for all operators. Use *args instead.

We may have a problem with operators concat and catch.

Other operators rely on the fact that the first argument, as an Iterable, can be an Iterator of possibly infinite length, or of a length that is unknown when calling concat or catch.

*args is a tuple and cannot be mutated, so the length is set when calling the operator function.

    if repeat_count is None:
        gen = infinite()
    else:
        gen = range(repeat_count)
    return rx.defer(lambda _: rx.concat(source for _ in gen))
return repeat

The operators that expect this behaviour are listed below:

  • repeat depends on operators.concat
  • do_while depends on operators.concat
  • while_do depends on operators.concat
  • retry depends on rx.catch

For repeat, I've tried to re-implement (copy/paste/adapt) the iterator thing from concat to avoid a dependency to concat. It seems to work ca014b5 but I don't know if this is the right thing to do.

@dbrattli
Collaborator Author

dbrattli commented Feb 7, 2019

The right thing to do here is to create new functions (operators) that take Iterable[Observable] as argument. The name should be e.g. concat_iterable or concat_with_iterable. The same can be done for merge etc. Check out how they did it in F#, where overloading of functions is not possible. http://fsprojects.github.io/FSharp.Control.Reactive/reference/fsharp-control-reactive-observablemodule.html

@jcafhe
Collaborator

jcafhe commented Feb 7, 2019

good idea 👍
I'm going to rework my PR #302. However, I still have 2 questions:

  • is it ok to call the iterable version from the non iterable operator, e.g. something like:
def concat(*sources: Observable) -> Observable:
    return concat_with_iterable(sources)
  • Would you make the iterable version 'public', i.e. part of the rx / rx.operators namespaces ?
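
The delegation asked about in the first question, and the reason an iterable version is needed at all, can be sketched with plain Python lists standing in for observables. This is an analogy only: the functions below concatenate lists lazily and do not use RxPY, and `infinite_sources` is a hypothetical helper.

```python
from itertools import count, islice
from typing import Iterable, Iterator, List


def concat_with_iterable(sources: Iterable[List[int]]) -> Iterator[int]:
    """Pull one source at a time, so the iterable may be infinite."""
    for source in sources:
        yield from source


def concat(*sources: List[int]) -> Iterator[int]:
    """Vararg version; *sources is a tuple whose length is fixed at call time."""
    return concat_with_iterable(sources)


def infinite_sources() -> Iterator[List[int]]:
    """An endless supply of sources, similar to repeat without a count."""
    for i in count():
        yield [i, i]


# Works: sources are only requested as the result is consumed.
first_five = list(islice(concat_with_iterable(infinite_sources()), 5))

# Also works, but only because the number of sources is known up front.
finite = list(concat([1, 2], [3]))

# concat(*infinite_sources()) would never return, since unpacking
# into *sources has to exhaust the iterator first.
```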

@jcafhe
Collaborator

jcafhe commented Feb 13, 2019

I've started to merge core and internal to internal (mv-core-to-internal) but it leads to multiple circular dependencies, e.g.:

(clarification: each item in the list imports the next item, see stack trace below)

  • rx.internal.__init__.py
  • Observable -> rx.internal.observable.observable.py
  • rx.disposable.__init__.py
  • Disposable -> rx.disposable.disposable.py
  • rx.internal.noop -> Import error if noop is imported after rx.observable.Observable in rx.internal.__init__.py

The same thing happens with:

  • rx.internal.PriorityQueue and Observable->VirtualTimeScheduler->PriorityQueue
  • rx.internal.default_error and Observable->AnonymousObserver->default_error
  • ...

The first solution is to import 'isolated' modules (basic, priorityqueue, exceptions, ...) before other imports in rx.internal.__init__.py. But relying on import order doesn't feel sane.

Another solution is to import by full path (relative or absolute) everywhere, e.g. from .internal.basic import noop instead of from .internal import noop. This doesn't feel good either because a package should be allowed to import names declared in the __init__ of another package.

What do you think ?

EDIT: my comment is not very clear, a trace may be more appropriate:

>>> import rx.internal
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/jehf/DEV/pyprojects/RxPY/rx/__init__.py", line 6, in <module>
    from .internal import Observable, abc, typing, pipe
  File "/home/jehf/DEV/pyprojects/RxPY/rx/internal/__init__.py", line 4, in <module>
    from .observable import Observable, ConnectableObservable
  File "/home/jehf/DEV/pyprojects/RxPY/rx/internal/observable/__init__.py", line 1, in <module>
    from .observable import Observable
  File "/home/jehf/DEV/pyprojects/RxPY/rx/internal/observable/observable.py", line 5, in <module>
    from rx.disposable import Disposable
  File "/home/jehf/DEV/pyprojects/RxPY/rx/disposable/__init__.py", line 3, in <module>
    from .disposable import Disposable
  File "/home/jehf/DEV/pyprojects/RxPY/rx/disposable/disposable.py", line 3, in <module>
    from rx.internal import typing, noop
ImportError: cannot import name 'noop' from 'rx.internal' (/home/jehf/DEV/pyprojects/RxPY/rx/internal/__init__.py)
>>> 

dbrattli added this to the v3.0 milestone Apr 20, 2019
@dbrattli
Collaborator Author

Closing this as I don't see anything more that needs to be done for a v3 release from my side. There is some refactoring that can be done but nothing blocking a release IMO. At least we should postpone any further refactoring until a v3.1 release.

dbrattli added a commit that referenced this issue May 11, 2019
* Fix typing for pipe using overloads.

* Fix type typo in hint overload

* Move pylint disables to the right line for the multi-line overloads

* Add an Observable specific overload first with docstring to make pylint happy.

* Remove ellpises from coverage
@lock

lock bot commented May 20, 2020

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.

lock bot locked as resolved and limited conversation to collaborators May 20, 2020
4 participants