Concat (hot) error handling guarantees #3090

Closed
artem-zinnatullin opened this issue Jul 20, 2015 · 19 comments

@artem-zinnatullin
Contributor

According to the documentation of the Concat operator (http://reactivex.io/documentation/operators/concat.html), an error that happens on one of the source Observables may (keyword here) "jump the queue" and trigger onError of the Concat Observable.

Note that if the source Observables are “hot” and are therefore sending their emissions and notifications to the Concat concurrently, an onError notification from any of the source Observables may “jump the queue” and trigger an onError notification from the Observable that results from the Concat operation — even if the source Observable that was the source of the error notification has not yet begun to be concatenated to that resulting Observable.

[Image: marble diagram from the Concat documentation]

But this simple test fails (same as marble diagram):

@Test
public void errorOnOneOfTheSourceObservablesShouldStopConcat() {
  PublishSubject<String> o1 = PublishSubject.create();
  PublishSubject<String> o2 = PublishSubject.create();

  TestSubscriber<String> testSubscriber = new TestSubscriber<>();

  Observable
    .concat(o1, o2)
    .subscribe(testSubscriber);

  o1.onNext("red");
  o2.onNext("yellow");
  o1.onNext("green");
  o2.onError(new IllegalStateException("o2 error"));
  o1.onNext("blue");
  o1.onCompleted();

  // Actual behavior: "blue" is still delivered, and only then does the error arrive.
  testSubscriber.assertValues("red", "green", "blue");
  testSubscriber.assertError(IllegalStateException.class);

  // But according to the marble diagram at http://reactivex.io/documentation/operators/concat.html
  // an error on one of the Observables should stop concat immediately.
  testSubscriber.assertValues("red", "green"); // We should not see "blue" here; this assertion fails.
  testSubscriber.assertError(IllegalStateException.class);
}

Is this expected behavior, or is there no guarantee that an error will "jump the queue" and trigger onError on the Concat Observable? We just ran into this in production (not a critical problem, but we expected the behavior described in the documentation).

P.S. I checked on RxJava 1.0.12 and 1.0.13-SNAPSHOT (today) and also tried emitting values from different threads: no difference. I can submit this test as a PR if you want.

@akarnokd
Member

The documentation seems to be a bit confusing. Concat only subscribes to one source at a time, so if you have a second hot observable, concat won't see any of it until the previous source has completed. If the second observable is a PublishSubject that is in an error state, you'll receive that error immediately upon subscribing, but it won't jump the queue. In the marble diagram, you'd see red, green, blue, yellow, error if the second source was cold, and red, green, blue, error if it was a PublishSubject.
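The explanation above can be modelled in plain Java without RxJava on the classpath. This is only a sketch under stated assumptions, not RxJava's implementation: `HotSource`, `Listener`, and this `concat` are hypothetical stand-ins, with `HotSource` imitating just the two PublishSubject traits mentioned in the comment (events reach only current subscribers, and a terminal error is replayed to late subscribers). It replays the event sequence from the test in the issue.

```java
import java.util.ArrayList;
import java.util.List;

class ConcatHotSketch {
    /** Hypothetical listener, loosely mirroring onNext/onError/onCompleted. */
    interface Listener {
        void onNext(String value);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Hypothetical hot source: forwards events only to listeners subscribed at that moment. */
    static final class HotSource {
        private final List<Listener> listeners = new ArrayList<>();
        private Throwable error;   // remembered so that late subscribers still see it
        private boolean completed;

        void subscribe(Listener listener) {
            if (error != null) { listener.onError(error); return; }
            if (completed) { listener.onCompleted(); return; }
            listeners.add(listener);
        }

        void onNext(String value) {
            for (Listener l : new ArrayList<>(listeners)) l.onNext(value);
        }

        void onError(Throwable e) {
            if (error != null || completed) return;
            error = e;
            for (Listener l : drain()) l.onError(e);
        }

        void onCompleted() {
            if (error != null || completed) return;
            completed = true;
            for (Listener l : drain()) l.onCompleted();
        }

        // Detaches all listeners so no further events reach them after a terminal event.
        private List<Listener> drain() {
            List<Listener> copy = new ArrayList<>(listeners);
            listeners.clear();
            return copy;
        }
    }

    /** Sequential concat: the second source is subscribed only after the first completes. */
    static void concat(HotSource first, HotSource second, Listener downstream) {
        first.subscribe(new Listener() {
            public void onNext(String value) { downstream.onNext(value); }
            public void onError(Throwable e) { downstream.onError(e); }
            public void onCompleted() { second.subscribe(downstream); }
        });
    }

    /** Replays the event sequence from the issue's test and returns what downstream saw. */
    static List<String> run() {
        HotSource o1 = new HotSource();
        HotSource o2 = new HotSource();
        List<String> seen = new ArrayList<>();
        concat(o1, o2, new Listener() {
            public void onNext(String value) { seen.add(value); }
            public void onError(Throwable e) { seen.add("error: " + e.getMessage()); }
            public void onCompleted() { seen.add("completed"); }
        });
        o1.onNext("red");
        o2.onNext("yellow");  // nobody is subscribed to o2 yet, so this is lost
        o1.onNext("green");
        o2.onError(new IllegalStateException("o2 error")); // stored, not delivered yet
        o1.onNext("blue");
        o1.onCompleted();     // concat now subscribes to o2 and receives the stored error
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [red, green, blue, error: o2 error]
    }
}
```

In this model the error from `o2` never overtakes "blue": it surfaces only once `o1` has completed and `o2` is finally subscribed, which matches the "red, green, blue, error" sequence described above.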

@artem-zinnatullin
Contributor Author

That's good, because I was curious about the "jump the queue" implementation after playing with the RxJava sources and debugging.

So, let's fix the documentation! Or might other Rx implementations have this "jump the queue" effect?

@DavidMGross
Collaborator

I wrote this section of the docs, and I wish now I could remember why. It may be
that someone had reported this behavior in an earlier version of RxJava, or
maybe it was something I picked up from reading about Rx.NET or some other
implementation and just assumed it was generalizable. I don't think I
would have gratuitously added a note like this without having observed it
somewhere.

It sounds like it's just confusing and misleading at this point, though, so
I'll get rid of it.

David M. Gross
PLP Consulting

@artem-zinnatullin
Contributor Author

Okay, so let's just remove this marble diagram (the main diagram is okay) and the next paragraph?

Note that if the source Observables are “hot” and are therefore sending their emissions and notifications to the Concat concurrently, an onError notification from any of the source Observables may “jump the queue” and trigger an onError notification from the Observable that results from the Concat operation — even if the source Observable that was the source of the error notification has not yet begun to be concatenated to that resulting Observable.

I'd rather add this info about hot Observables and the Concat operator:

"Notice that if you pass a hot Observable to the Concat operator, you will miss all values emitted by that hot Observable before the previously concatenated Observables complete, because Concat only subscribes to one source at a time."

It's a really major piece of information.

@headinthebox
Contributor

Note that this is not a property of concat; it will happen whenever you have an argument of type Observable.

@DavidMGross
Collaborator

Sounds good.

@artem-zinnatullin
Contributor Author

Also (sorry for the long thread, but documentation is an important part of the library): the main marble diagram of the Concat operator is not fully correct and creates a bit of confusion (at least for my coworker; I was looking at the diagrams from the RxJava javadocs), though this is probably a limitation of the JS library used for the marble diagrams.

Here is how it looks:

[Screenshot: main marble diagram of the Concat operator, 2015-07-20]

Notice that the X axis (left to right) represents time.

As you can see, O2 emits before O1 completes, so the values it emits before the completion of O1 won't be visible to the Concat operator.

The correct version of the diagram is the one used in the RxJava javadocs:
[Image: concat marble diagram from the RxJava javadocs]

It would be nice to fix this too (I am not sure that I can make a PR for this).

@staltz
Member

staltz commented Jul 20, 2015

@artem-zinnatullin

the main marble diagram of the Concat operator is not fully correct and creates a bit of confusion (at least for my coworker; I was looking at the diagrams from the RxJava javadocs), though this is probably a limitation of the JS library used for the marble diagrams.

It is correct because the observables are cold.

@DavidMGross
Collaborator

There's an ambiguity, I think, about how time is represented in marble
diagrams. If time proceeds from left to right and is the same for all
observers (cue spooky floating E=mc² from the Twilight Zone intro), then
the second (cold) Observable shouldn't begin emitting items until it is
subscribed to, which won't happen until the first Observable
completes, which means its emissions should appear to the right of the
first Observable's onComplete notification.

If instead you interpret the left-to-right time as being relative to each
Observable but not the same between Observables, there's no problem with
the diagram as shown. But that approach seems to lead to misunderstandings.

I'd expect if I look over the diagrams I made for RxJava and for the
reactivex.io site, I'd find that I was inconsistent in which of the above
approaches I used. My original concat() diagram was like your rxmarbles
version, but I later changed it after I got complaints about this very
issue.

@artem-zinnatullin
Contributor Author

@staltz cold or hot does not matter in this case.

@DavidMGross I guess marble diagrams will be more obvious for Rx users if time is the same for all Observables. Let's fix it, if that's possible?

@headinthebox
Contributor

@DavidMGross' diagram looks correct to me. If the second observable is cold, I guess there is no issue. But even if it is hot, there is no way to tell if any events happened before you subscribed, and hence it is correct as well.

I am confused why @artem-zinnatullin is confused, and changing @DavidMGross' diagram will make people even more confused.

@artem-zinnatullin
Contributor Author

I am confused why @artem-zinnatullin is confused, and changing @DavidMGross' diagram will make people even more confused.

Because the X axis represents time. Maybe I am wrong, but I think that time should be the same for all Observables; this is how we (me and my coworkers) think about concurrent processes, and it's probably the only good way to show this behavior:

Notice that if you pass a hot Observable to the Concat operator, you will miss all values emitted by that hot Observable before the previously concatenated Observables complete, because Concat only subscribes to one source at a time.

Otherwise, if we think that time is relative to each Observable, there will be no way to show that something can be missed, and no way to show that processes (Observables) are really concurrent on any marble diagram.

@headinthebox
Contributor

Time is the same; for the second observable you do not see any marbles because the concat operator has not yet subscribed to it, so it would not make sense to show any.

@staltz
Member

staltz commented Jul 21, 2015

I am convinced this issue is a symptom of not understanding cold observables.

An Observable is not an event emitter/bus, even though you can implement one with a hot observable. But even "hot observables" are just shared cold observables, hence it's vital to understand cold observables.

if we think that time is relative to each Observable, there will be no way to show that something can be missed, and no way to show that processes (Observables) are really concurrent on any marble diagram.

"Missing events" can only happen with hot observables. And cold Observables are not concurrent processes per se (their executions might be). That's the wrong abstraction. Observables are best understood as videos. You can watch (subscribe) them, but nothing really happens until you subscribe. If you think of them as videos, then the RxMarbles concat diagram makes a lot of sense. And if that diagram is assuming cold behavior, then I'd rather even keep the diagram there to help people see the cold behavior. Watch this for more.
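The "video" analogy for cold Observables can be sketched in plain Java. This is a hypothetical model rather than RxJava code: `ColdSource`, `just`, and this `concat` are illustrative names, and error handling is left out for brevity. The point is that the producer runs from the start on every subscription, so a sequential concat of cold sources never misses a value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ColdSourceSketch {
    /** Hypothetical cold source: subscribing is what runs the producer, from the beginning. */
    interface ColdSource {
        void subscribe(Consumer<String> onNext, Runnable onCompleted);
    }

    /** A cold source that emits the given values to each subscriber, then completes. */
    static ColdSource just(String... values) {
        return (onNext, onCompleted) -> {
            for (String v : values) onNext.accept(v);
            onCompleted.run();
        };
    }

    /** Sequential concat: the second source starts only when the first has completed. */
    static ColdSource concat(ColdSource first, ColdSource second) {
        return (onNext, onCompleted) ->
            first.subscribe(onNext, () -> second.subscribe(onNext, onCompleted));
    }

    /** Subscribes twice to show that every subscriber gets the full sequence. */
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        ColdSource both = concat(just("red", "green", "blue"), just("yellow"));
        both.subscribe(seen::add, () -> seen.add("completed"));
        both.subscribe(seen::add, () -> seen.add("completed"));
        return seen;
    }

    public static void main(String[] args) {
        // [red, green, blue, yellow, completed, red, green, blue, yellow, completed]
        System.out.println(run());
    }
}
```

Because nothing is produced before `subscribe` is called, "yellow" cannot be emitted early and lost here; that loss is only possible with a hot source.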

@artem-zinnatullin
Contributor Author

I am convinced this issue is a symptom of not understanding cold observables.

I understand what a cold Observable is and what a hot Observable is.

"Missing events" can only happen with hot observables.

This is true, of course.

And cold Observables are not concurrent processes per se (their executions might be). That's the wrong abstraction.

Your abstraction is okay as long as we think about Observables as separate entities, but when we apply operators we need to understand how execution will be performed. In the case of Concat we have a strong rule: the time of emission matters. Otherwise, we need to indicate somehow on the diagram that all Observables are cold, and create a separate diagram for hot Observables.

Observables are best understood as videos.

When you think about the video itself, no problem, it's a cold Observable. But you can play videos concurrently, and this is how things work in real life.

Programs are entities that get executed in time, marble diagrams represent emissions (execution) in time -> it's a process, not a static thing.

Okay, now how would you correctly represent the Merge operator on a marble diagram with your abstraction? Merge is concurrent: it subscribes to all source Observables in parallel and merges their emissions. Without the same time axis for all Observables, it is impossible to represent the actual behavior of the Merge operator.

And… surprise, the marble diagram for the Merge operator uses the same time for all Observables:
[Screenshot: marble diagram for the Merge operator, 2015-07-21]

It's correct and works for both cold and hot Observables; this is what I want to fix in the Concat diagram. It does not matter for the Concat operator whether Observables are hot or cold, so why should it matter for the diagram?
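For contrast with Concat, here is a minimal plain-Java model of the concurrent subscription that Merge performs. It is a sketch under assumptions, not RxJava's implementation: `HotSource`, `emit`, `fail`, and this `merge` are hypothetical names, completion is omitted, and a simple flag stands in for unsubscription. All sources are subscribed up front, so events from every source are seen in the order they occur on one shared timeline, and the first error ends the merged stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class MergeHotSketch {
    /** Hypothetical hot source: events go only to whoever is subscribed right now. */
    static final class HotSource {
        private final List<Consumer<String>> nextHandlers = new ArrayList<>();
        private final List<Consumer<Throwable>> errorHandlers = new ArrayList<>();

        void subscribe(Consumer<String> onNext, Consumer<Throwable> onError) {
            nextHandlers.add(onNext);
            errorHandlers.add(onError);
        }

        void emit(String value) {
            for (Consumer<String> h : new ArrayList<>(nextHandlers)) h.accept(value);
        }

        void fail(Throwable error) {
            List<Consumer<Throwable>> handlers = new ArrayList<>(errorHandlers);
            nextHandlers.clear();
            errorHandlers.clear();
            for (Consumer<Throwable> h : handlers) h.accept(error);
        }
    }

    /** Concurrent merge: subscribes to every source immediately; the first error terminates. */
    static void merge(List<HotSource> sources, Consumer<String> onNext, Consumer<Throwable> onError) {
        boolean[] terminated = {false}; // stands in for unsubscribing from the other sources
        for (HotSource source : sources) {
            source.subscribe(
                value -> { if (!terminated[0]) onNext.accept(value); },
                error -> { if (!terminated[0]) { terminated[0] = true; onError.accept(error); } });
        }
    }

    /** Same event sequence as the concat test in this issue, but merged. */
    static List<String> run() {
        HotSource o1 = new HotSource();
        HotSource o2 = new HotSource();
        List<String> seen = new ArrayList<>();
        merge(Arrays.asList(o1, o2), seen::add, e -> seen.add("error: " + e.getMessage()));
        o1.emit("red");
        o2.emit("yellow");  // seen, because o2 is already subscribed
        o1.emit("green");
        o2.fail(new IllegalStateException("o2 error")); // ends the merged stream right away
        o1.emit("blue");    // ignored: the merged stream has already terminated
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [red, yellow, green, error: o2 error]
    }
}
```

With the same inputs, the sequential model of Concat yields "red, green, blue, error", which is why the two operators need a shared time axis to be compared on a diagram.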

P.S. Thanks for the video!

@headinthebox
Contributor

IMHO (my team and I actually invented marble diagrams) the RxMarbles diagram for concat is not correct. Showing a marble at a certain point on the timeline means the operator is subscribed to that stream. You don't know anything about a stream until you subscribe.

@staltz
Member

staltz commented Jul 22, 2015

Showing a marble at a certain point on the timeline means the operator is subscribed to that stream.

Alright, I get that argument. It was hard to talk about the x axis in terms of "same time". I'll fix the concat diagram then.

@artem-zinnatullin
Contributor Author

Yay!

@artem-zinnatullin
Contributor Author

I guess this issue can be closed. The documentation is fixed; I hope RxMarbles will be fixed too, but on the RxJava side everything now looks consistent. Thanks, everybody!
