
Simplify zip operator behavior. #1666

Closed
GarthSnyder opened this issue Jun 3, 2018 · 6 comments

@GarthSnyder
Contributor

@GarthSnyder GarthSnyder commented Jun 3, 2018

Consider the following code:

import RxSwift

let strings = Observable.of("one", "two")
let gate = BehaviorSubject(value: true)

Observable.zip(strings, gate) { string, _ in string }
    .subscribe(onNext: { print($0) }, onCompleted: { print("Done!") })

// Output: one
gate.onNext(true)   // Output: two
gate.onNext(true)   // Output: Done!

My expectation was that the zip Observable would complete as soon as any one of its constituent sequences completed. However, it is in fact waiting for the gate sequence to produce an extraneous third value before terminating the zip sequence. If gate didn't produce an extra value, the zip wouldn't terminate at all.

The ReactiveX blurb for zip doesn't actually claim that the Observable created by zip will terminate, so perhaps the bug is in my expectations. (The blurb just says that zip "will only emit as many items as the number of items emitted by the source Observable that emits the fewest items." Fair enough, it's doing that. :-)

The following example seems even more directly strange:

let strings = Observable.of("one", "two")
let gate1 = BehaviorSubject(value: true)
let gate2 = BehaviorSubject(value: true)

Observable.zip(strings, gate1, gate2) { string, _, _ in string }
    .subscribe(onNext: { print($0) }, onCompleted: { print("Done!") })

// Output: one
gate1.onNext(true)
gate2.onNext(true)  // Output: two

gate1.onNext(true)
gate2.onNext(true)

gate1.onNext(true)
gate2.onNext(true)  // <crickets>...

No matter how many tokens you stuff in the gate1 and gate2 channels, the zip will never complete.
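The two transcripts above are consistent with a completion rule along these lines: zip completes when every source has completed, or when a source delivers a value that cannot be paired while every other source has already completed. The pure-Swift model below applies that assumed rule and reproduces both outputs. It has no RxSwift dependency and ignores threading, and `OldZipModel` and its methods are hypothetical names.

```swift
// Hypothetical, single-threaded model of the completion rule the outputs
// above are consistent with. It is not RxSwift's source code.
final class OldZipModel {
    private var queues: [[String]]   // buffered, not-yet-paired values per source
    private var done: [Bool]         // which sources have sent .completed
    private(set) var log: [String] = []
    private var finished = false

    init(arity: Int) {
        precondition(arity >= 2, "zip needs at least two sources")
        queues = Array(repeating: [], count: arity)
        done = Array(repeating: false, count: arity)
    }

    func next(_ index: Int, _ value: String) {
        guard !finished else { return }
        queues[index].append(value)
        if queues.allSatisfy({ !$0.isEmpty }) {
            // Every source has a buffered value: emit one tuple, keeping only
            // the first source's value as the examples' result selector does.
            log.append(queues[0].removeFirst())
            for i in 1..<queues.count { queues[i].removeFirst() }
        } else if done.indices.allSatisfy({ $0 == index || done[$0] }) {
            // This value can't be paired and every other source has completed.
            finished = true
            log.append("Done!")
        }
    }

    func completed(_ index: Int) {
        guard !finished else { return }
        done[index] = true
        if done.allSatisfy({ $0 }) {
            finished = true
            log.append("Done!")
        }
    }
}

// Example 1: strings emits and completes on subscription, the
// BehaviorSubject replays its initial value, then two onNext calls follow.
let two = OldZipModel(arity: 2)
two.next(0, "one")
two.next(0, "two")
two.completed(0)
for _ in 0..<3 { two.next(1, "true") }
print(two.log)  // ["one", "two", "Done!"]

// Example 2: two gates, each fed three more tokens after the initial value.
let three = OldZipModel(arity: 3)
three.next(0, "one")
three.next(0, "two")
three.completed(0)
three.next(1, "true")
three.next(2, "true")
for _ in 0..<3 {
    three.next(1, "true")
    three.next(2, "true")
}
print(three.log)  // ["one", "two"]
```

In the three-source run, every unpaired gate value arrives while the other gate is still live, so the "all other sources have completed" condition never holds and no Done! is ever logged.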

RxSwift/RxCocoa/RxBlocking/RxTest version/commit

Commit ba41245 of April 4, 2018

Platform/Environment

iOS and macOS, at least

How easy is it to reproduce? (chances of a successful reproduction after running the self-contained code)

  • easy, 100% repro

Xcode version:

9.4

Installation method:

  • Carthage

But it also happens if you just clone the RxSwift project and paste the code into one of the playgrounds.

I have multiple versions of Xcode installed:

Yes; Xcode 8.3.3 and Xcode 9.4. But I hardly ever touch 8.3.3.

Level of RxSwift knowledge:

  • just starting
@kzaher kzaher added the rxswift 5.0 label Jun 7, 2018
@kzaher

Member

@kzaher kzaher commented Jun 7, 2018

Hi @GarthSnyder ,

zip behaves this way because we've been incorporating some behaviors from Rx.NET.

Unfortunately, both the current behavior and the one you are suggesting can be improved.

We actually shouldn't complete zip until all of the sequences have completed. The reason is that even though in some cases a sequence theoretically can't generate another element, it can always generate an error.

Also, the current behavior assumes that the observable sequences used as sources for zip don't produce any side effects, and that the only reason somebody applies the zip operator is to zip the resulting values, which isn't true in general.

We'll remove all of the "playing smart" code from zip and combineLatest in RxSwift 5.0. There shouldn't be any more confusion then.
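The rule described above can be sketched the same way. In this hypothetical model (`AllCompleteZipModel` and `ZipEvent` are illustrative names, not RxSwift types), zip completes only after every source has completed, while an error from any source is forwarded immediately. Replaying the first example shows why this matters: the third gate value no longer ends the sequence, so a later error from the gate still reaches the subscriber.

```swift
// Hypothetical model: complete only when all sources have completed;
// forward an error from any source at once. Not RxSwift's source code.
enum ZipEvent: Equatable {
    case next(String)
    case error(String)
    case completed
}

final class AllCompleteZipModel {
    private var queues: [[String]]
    private var done: [Bool]
    private(set) var events: [ZipEvent] = []
    private var terminated = false

    init(arity: Int) {
        precondition(arity >= 2, "zip needs at least two sources")
        queues = Array(repeating: [], count: arity)
        done = Array(repeating: false, count: arity)
    }

    func next(_ index: Int, _ value: String) {
        guard !terminated else { return }
        queues[index].append(value)
        // Emit a tuple whenever every source has a buffered value; an
        // unpaired value never triggers completion under this rule.
        if queues.allSatisfy({ !$0.isEmpty }) {
            events.append(.next(queues[0].removeFirst()))
            for i in 1..<queues.count { queues[i].removeFirst() }
        }
    }

    func error(_ message: String) {
        guard !terminated else { return }
        terminated = true
        events.append(.error(message))
    }

    func completed(_ index: Int) {
        guard !terminated else { return }
        done[index] = true
        if done.allSatisfy({ $0 }) {
            terminated = true
            events.append(.completed)
        }
    }
}

let zipped = AllCompleteZipModel(arity: 2)
zipped.next(0, "one")
zipped.next(0, "two")
zipped.completed(0)
for _ in 0..<3 { zipped.next(1, "true") }   // the third value stays unpaired
zipped.error("gate failed")                  // still delivered downstream
// zipped.events now holds .next("one"), .next("two"), .error("gate failed")
```

With the behavior shown in the first example, the sequence had already completed at the third gate value, so an error arriving afterwards would have been dropped.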

@kzaher kzaher changed the title Zip doesn't complete in a timely manner (or sometimes, ever) Simplify zip operator behavior. Jun 7, 2018
@GarthSnyder

Contributor Author

@GarthSnyder GarthSnyder commented Jun 7, 2018

Ah, very interesting! Thanks for taking the time to write up this detailed explanation.

In my (current) use case, the extra things being zipped are just tokens (hence the name "gate" above). So, for example, if there are 20 tokens available, only 20 items can be in a particular stage of the pipeline at once. Items consume a token when they enter the critical section (by being zipped with the token stream) and put a token back in the token stream when they leave the critical section (by calling the token-stream Subject's onNext).
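The token-gate pattern just described can be simulated without RxSwift to check the concurrency bound. In this hypothetical sketch (`TokenGate`, `arrive`, and `leave` are illustrative names), pairing a waiting item with a free token plays the role of zip, and `leave` plays the role of calling onNext on the token Subject.

```swift
// Hypothetical, single-threaded simulation of the token-gate pattern:
// an item may enter the critical section only when paired with a token,
// and it returns the token when it leaves.
struct TokenGate {
    private var tokens: Int
    private var waiting: [Int] = []
    private(set) var inFlight: Set<Int> = []
    private(set) var maxInFlight = 0

    init(tokens: Int) { self.tokens = tokens }

    // An item arrives at the gate; it enters at once if a token is free.
    mutating func arrive(_ item: Int) {
        waiting.append(item)
        pairUp()
    }

    // An item leaves the critical section and puts its token back
    // (the analogue of calling onNext on the token Subject).
    mutating func leave(_ item: Int) {
        guard inFlight.remove(item) != nil else { return }
        tokens += 1
        pairUp()
    }

    // Zip-like pairing: each waiting item consumes exactly one token.
    private mutating func pairUp() {
        while tokens > 0 && !waiting.isEmpty {
            tokens -= 1
            inFlight.insert(waiting.removeFirst())
            maxInFlight = max(maxInFlight, inFlight.count)
        }
    }
}

var gate = TokenGate(tokens: 20)
for item in 0..<200 { gate.arrive(item) }
print(gate.inFlight.count)                    // 20
for item in 0..<200 { gate.leave(item) }
print(gate.maxInFlight, gate.inFlight.count)  // 20 0
```

The model has no completion event at all: the token supply is replenished indefinitely, which mirrors why a zip against a token Subject has no natural point at which to finish.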

It seems to work great except for the fact that the main pipeline can never complete. Is there a more idiomatic RxSwift way to achieve this effect?

It looks like the request() feature of RxJava (aka "reactive pull backpressure") addresses pretty much this exact need, but it's not available in RxSwift. Would you be interested in merging a PR for that feature if I were to work on it, or is there some underlying reason why it's not a good fit for RxSwift?
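For readers unfamiliar with pull backpressure: the consumer tells the producer how many more items it can accept, and the producer never delivers more than that. RxSwift has no such API, and the sketch below is not RxJava's Flowable/Subscription interface; it is a hypothetical pure-Swift illustration of the idea only (`DemandDrivenSource`, `request(_:)`, and `delivered` are illustrative names).

```swift
// Hypothetical sketch of pull-based ("request(n)") backpressure: the
// producer may only deliver as many items as the consumer has requested.
final class DemandDrivenSource {
    private var pending: [String]
    private var demand = 0
    private(set) var delivered: [String] = []

    init(items: [String]) {
        pending = items
    }

    // The consumer signals that it can accept `n` more items; unused
    // demand is remembered for items that are still pending.
    func request(_ n: Int) {
        precondition(n > 0, "demand must be positive")
        demand += n
        while demand > 0 && !pending.isEmpty {
            demand -= 1
            delivered.append(pending.removeFirst())
        }
    }
}

let source = DemandDrivenSource(items: ["a", "b", "c", "d"])
source.request(2)
print(source.delivered)  // ["a", "b"]
source.request(5)
print(source.delivered)  // ["a", "b", "c", "d"]
```

Here the consumer, not the producer, sets the pace; in the token-gate scenario, each request(1) would correspond to putting a token back.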

(Details that probably aren't all that relevant/important: the application gathers XML feeds for podcasts from the web. Search terms from a static list go in the head of the pipeline, where they are transformed into search results that point to individual feeds. Then later stages of the pipeline read and parse those feeds, emitting a stream of parsed XML documents.

Even though this is a totally static workload, it actually works very well as an RxSwift pipeline because RxSwift allows easy retrying of individual network requests with various parameters (e.g., HTTP user agent) and handles database deadlocks elegantly (just abort the current transaction, back up, and retry parsing). But the pipeline can't be allowed to kick off 200 URLSessionDataTasks at once. :-)

@kzaher

Member

@kzaher kzaher commented Jun 7, 2018

Hi @GarthSnyder ,

Backpressure adds tons of complexity and unpredictability, and would slow down the implementation considerably IMHO.

I believe that the primary motivation for backpressure was Netflix's streaming use cases. That's the only information I have. In that context it makes sense, and it seems to me that it's even optimized for that use case. That also means that it's not optimized for a bunch of other use cases which are far more common.

You could probably use backpressure to solve some of your problems, yes, but the possibility of doing that doesn't necessarily prove that you should.

I believe that the latest RxJava separates Flowable from Observable, and I think this is an awesome decision.

I don't believe that adding Flowable to this repo is an efficient investment of time, and it has far more downsides than upsides, so I would really like to stay away from Flowable for the foreseeable future.

Sorry.

@GarthSnyder

Contributor Author

@GarthSnyder GarthSnyder commented Jun 7, 2018

No worries. Thanks for all your work on RxSwift!

@kzaher kzaher closed this Jun 17, 2018
@c0diq


@c0diq c0diq commented May 30, 2019

If one wanted to prevent zip from completing if one observable completed, it would have been easier to concat(never()) the source expected to complete early. It seems that this change is going to introduce serious incompatibilities where code expected zip to complete as soon as one source completed.

How do you recommend one replicates the old behavior now?

@kzaher

This comment has been minimized.

Copy link
Member

@kzaher kzaher commented May 31, 2019

> If one wanted to prevent zip from completing if one observable completed, it would have been easier to concat(never()) the source expected to complete early.

No, the old behavior was wrong.

> How do you recommend one replicates the old behavior now?

Write your own implementation, or apply .take(x) to the sources.
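The effect of .take(x) can be illustrated with a static model of the rule described in the Jun 7, 2018 comment, in which zip completes only once every source completes. In this hypothetical sketch (`FiniteSource`, `take`, and `zipAllComplete` are illustrative names, not RxSwift API, and timing is ignored), limiting the never-ending gate to two elements makes it complete, so the zip completes too.

```swift
// Hypothetical static model: a source is the values it emits plus whether
// it ever sends .completed. Timing and errors are deliberately ignored.
struct FiniteSource {
    var values: [String]
    var completes: Bool
}

// Models take(n): pass at most n values, and complete once n have passed
// even if the upstream source itself never completes.
func take(_ n: Int, _ source: FiniteSource) -> FiniteSource {
    guard source.values.count >= n else { return source }
    return FiniteSource(values: Array(source.values.prefix(n)), completes: true)
}

// Models zip under the "complete only when all sources complete" rule:
// it pairs values up to the shortest source and keeps the first source's
// value from each tuple, like the result selector in the examples.
func zipAllComplete(_ sources: [FiniteSource]) -> (firsts: [String], completes: Bool) {
    let pairs = sources.map { $0.values.count }.min() ?? 0
    let firsts = Array((sources.first?.values ?? []).prefix(pairs))
    return (firsts, sources.allSatisfy { $0.completes })
}

let strings = FiniteSource(values: ["one", "two"], completes: true)
// A gate Subject that has emitted three tokens and never completes.
let gate = FiniteSource(values: ["true", "true", "true"], completes: false)

print(zipAllComplete([strings, gate]).completes)           // false
print(zipAllComplete([strings, take(2, gate)]).completes)  // true
```

This workaround assumes x is known in advance, i.e. how many elements the other sources will produce.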

3 participants