Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

zip operator should complete if a single upstream source completes #525

Closed
MichaelSchneeberger opened this issue Jun 26, 2020 · 2 comments · Fixed by #537
Closed

zip operator should complete if a single upstream source completes #525

MichaelSchneeberger opened this issue Jun 26, 2020 · 2 comments · Fixed by #537

Comments

@MichaelSchneeberger
Copy link

The following observable y ...

import rx
from rx import operators as rxop

y = rx.range(2).pipe(
    rxop.zip(rx.defer(lambda _: y)),
    rxop.merge(rx.just(0)),
    rxop.share(),
)

y.subscribe(print, on_completed=lambda: print('completed'))

... never completes.

My suggestion: The zip operator should complete if any upstream source completed and the corresponding queue is empty.

That is how it is implemented in RxJava.

@MainRo
Copy link
Collaborator

MainRo commented Jul 5, 2020

I agree that zip should complete in such cases, because no item will be emitted after one source completes. Some applications may use the current behavior to wait for all streams to complete, but probably most people expect it to complete immediately.

@hoc081098
Copy link
Contributor

Hi @MichaelSchneeberger, I created a PR #537

MainRo pushed a commit that referenced this issue Sep 23, 2020
MainRo added a commit to MainRo/RxPY that referenced this issue Jan 22, 2022
The original fix for ReactiveX#525 breaks sequences where some observables emit
item faster than other ones. By completing too soon, the remaining
observables cannot catchup later. A very simple case is in ReactiveX#578 where
the two observables to zip emit their items sequentially.

We can fix both issues by completing whenever an observable completes
and there is no queued item. Otherwise we let the remaining observables
a chance to emit new items before completion.
MainRo added a commit that referenced this issue Feb 6, 2022
The original fix for #525 breaks sequences where some observables emit
items faster than other ones. By completing too soon, the remaining
observables cannot catch up later. A very simple case is in #578 where
the two observables to zip emit their items sequentially.

We can fix both issues by completing whenever an observable completes
and there is no queued item. Otherwise, we let the remaining observables
a chance to emit new items before completion.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants