Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate from SynchronizedObserver to SerializedObserver #962

Merged

Conversation

benjchristensen
Copy link
Member

This pull request replaces use of SynchronizedObserver with SerializedObserver.

Why?

1) Deadlocks

Holding a lock while emitting notifications (onNext, onCompleted, onError) allows for deadlocks if the event results in a cycle back to the parent.

While testing RxJava 0.17.0 in Netflix production we ran into one of these. The vulnerability has existed all along but we finally hit it.

This issue has also been reported before such as: http://twistedoakstudios.com/blog/Post8424_deadlocks-in-practice-dont-hold-locks-while-notifying

2) Blocking Threads

The use of synchronized can block threads. If it's used in areas such as modifying a data structure this can be okay. When emitting a notification however it is a problem as the result of an onNext can take a non-deterministically long time to complete. This means any other thread trying to emit will be blocked.

If the source threads are event loops (such as Vert.x or Netty) this will block the event loops.

For example, if two network calls off two Netty event loops are being merged (such as via flatMap) and one of them does further slow processing that causes onNext to be slow, it will block the other onNext which blocks the event loop and prevents any further IO on that thread. This is a significant problem for system scale and breaks the promise of Rx being a non-blocking, reactive library.

Solution

The synchronize, SynchronizedObserver and SynchronizedSubscriber operator and classes have been deprecated. They are replaced by serialize, SerializedObserver and SerializedSubscriber.

The SerializedObserver still ensures only a single thread can emit onNext, onCompleted, or onError at a time but does not hold a lock while doing so. Instead of blocking threads it will accept the incoming events into a buffer. Thus, it becomes an asynchronous operator.

The merge operator (which impacts flatMap) now uses SerializedObserver, along with any other place in RxJava that needed synchronization.

Implementation

3 implementatations were written and tested:

Performance testing revealed:

    /**
     * 1 streams emitting in a tight loop. Testing for single-threaded overhead.
     * 
     * -> blocking synchronization (SynchronizedObserver)
     * 
     * Run: 10 - 58,186,310 ops/sec
     * Run: 11 - 60,592,037 ops/sec
     * Run: 12 - 58,099,263 ops/sec
     * Run: 13 - 59,034,765 ops/sec
     * Run: 14 - 58,231,548 ops/sec
     * 
     * -> state machine technique (SerializedObserverViaStateMachine)
     * 
     * Run: 10 - 34,668,810 ops/sec
     * Run: 11 - 32,874,312 ops/sec
     * Run: 12 - 33,389,339 ops/sec
     * Run: 13 - 35,269,946 ops/sec
     * Run: 14 - 34,165,013 ops/sec
     * 
     * -> using queue and counter technique (SerializedObserverViaQueueAndCounter)
     * 
     * Run: 10 - 19,548,387 ops/sec
     * Run: 11 - 19,471,069 ops/sec
     * Run: 12 - 19,480,112 ops/sec
     * Run: 13 - 18,720,550 ops/sec
     * Run: 14 - 19,070,383 ops/sec
     * 
     * -> using queue and lock technique (SerializedObserverViaQueueAndLock)
     * 
     * Run: 10 - 51,295,152 ops/sec
     * Run: 11 - 50,317,937 ops/sec
     * Run: 12 - 51,126,331 ops/sec
     * Run: 13 - 52,418,291 ops/sec
     * Run: 14 - 51,694,710 ops/sec
     */

    /**
     * 2 streams emitting in tight loops so very high contention.
     * 
     * -> blocking synchronization (SynchronizedObserver)
     * 
     * Run: 10 - 8,361,252 ops/sec
     * Run: 11 - 7,184,728 ops/sec
     * Run: 12 - 8,249,685 ops/sec
     * Run: 13 - 6,831,595 ops/sec
     * Run: 14 - 8,003,358 ops/sec
     * 
     * (faster because it allows each thread to be "single threaded" while blocking the other)
     * 
     * -> state machine technique (SerializedObserverViaStateMachine)
     * 
     * Run: 10 - 4,060,062 ops/sec
     * Run: 11 - 3,561,131 ops/sec
     * Run: 12 - 3,721,387 ops/sec
     * Run: 13 - 3,693,909 ops/sec
     * Run: 14 - 3,516,324 ops/sec
     * 
     * -> using queue and counter technique (SerializedObserverViaQueueAndCounter)
     * 
     * Run: 10 - 4,300,229 ops/sec
     * Run: 11 - 4,395,995 ops/sec
     * Run: 12 - 4,551,550 ops/sec
     * Run: 13 - 4,443,235 ops/sec
     * Run: 14 - 4,158,475 ops/sec
     * 
     * -> using queue and lock technique (SerializedObserverViaQueueAndLock)
     * 
     * Run: 10 - 6,369,781 ops/sec
     * Run: 11 - 6,933,872 ops/sec
     * Run: 12 - 5,652,535 ops/sec
     * Run: 13 - 5,503,716 ops/sec
     * Run: 14 - 6,219,264 ops/sec
     */

    /**
     * 2 streams emitting once a millisecond. Slow emission so little to no contention.
     * 
     * -> blocking synchronization (SynchronizedObserver)
     * 
     * Run: 10 - 1,996 ops/sec
     * Run: 11 - 1,996 ops/sec
     * Run: 12 - 1,995 ops/sec
     * Run: 13 - 1,997 ops/sec
     * Run: 14 - 1,996 ops/sec
     * 
     * -> state machine technique (SerializedObserverViaStateMachine)
     * 
     * Run: 10 - 1,996 ops/sec
     * Run: 11 - 1,996 ops/sec
     * Run: 12 - 1,996 ops/sec
     * Run: 13 - 1,996 ops/sec
     * Run: 14 - 1,996 ops/sec
     * 
     * -> using queue and counter technique (SerializedObserverViaQueueAndCounter)
     * 
     * Run: 10 - 1,996 ops/sec
     * Run: 11 - 1,996 ops/sec
     * Run: 12 - 1,996 ops/sec
     * Run: 13 - 1,996 ops/sec
     * Run: 14 - 1,995 ops/sec
     * 
     * -> using queue and lock technique (SerializedObserverViaQueueAndLock)
     * 
     * Run: 10 - 1,996 ops/sec
     * Run: 11 - 1,996 ops/sec
     * Run: 12 - 1,997 ops/sec
     * Run: 13 - 1,996 ops/sec
     * Run: 14 - 1,995 ops/sec
     */

The state machine solution was tested in production but caused performance problems, most likely due to the immense object allocation it needs to do.

The elegant "queue and counter" solution does not perform well enough in the non-contended case.

The "queue and lock" model performs well in the non-contended case and under contention, despite not being a very elegant solution and requiring the use of mutex locks for the state changes (but it does not hold the locks during notification).

Considerations

This does allow unbounded buffer growth, the same as observeOn and zip instead of blocking the producer threads.

Conclusion

The implementation in this pull request can and likely will be improved over time. The other implementations are purposefully being shown to allow others to provide further insight on how to do this better.

This change is important to ensure RxJava is non-blocking and our canary testing of this change in the Netflix production environment suggests this change is both performant and functional.

@cloudbees-pull-request-builder

RxJava-pull-requests #902 SUCCESS
This pull request looks good

benjchristensen added a commit that referenced this pull request Mar 13, 2014
Migrate from SynchronizedObserver to SerializedObserver
@benjchristensen benjchristensen merged commit 4834d85 into ReactiveX:master Mar 13, 2014
@benjchristensen benjchristensen deleted the serialize-synchronize branch March 13, 2014 18:34
if (canEmit) {
// we won the right to emit
try {
drainQueue(list);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should onError be draining the queue before sending the error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we can, but if we're winning the right to emit immediately as it's doing here, it's highly unlikely there is anything in the queue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, by "yes we can" I meant we can clear it ... which means skip draining it in the onError case.

@akarnokd
Copy link
Member

How about the enqueue/drain approach with synchronized counter increment and queue replacement as drain similar to your code? (No ConcurrentLinkedQueue.)

@benjchristensen
Copy link
Member Author

Possibly, but it exposes a different problem: starvation. One thread could end up constantly looping and draining the queue and never get a chance to return and emit data that it needs to emit.

I considered setting a max number of loops, but when it finishes those loops then the other issue would still occur, possibly leaving items in the queue waiting for some period of time until another event.

The queue/drain model works far better when there is a single thread dedicated to draining like in observeOn. Stealing threads like this I'm not sure how to avoid having one of the two problems, and I'm debating which one is worse (obviously I've decided starvation is worse for now, and termination events always end up causing the queue to be drained).

@headinthebox
Copy link
Contributor

But streams are not guaranteed to terminate. They can be silent forever like Observable.never

@benjchristensen
Copy link
Member Author

They can be silent forever like Observable.never

In which case it would never emit anything and never queue anything.

But streams are not guaranteed to terminate.

Of course, in which case it's the next onNext that would drain the queue.

The most concerning type of use case in the current implementation is one where 2 events are emitted, one is delivered, the other is queued, then a long time passes before any further events occur. In this use case, the second onNext is just sitting in a queue waiting to be delivered.

The only model I can think of that solves this requires injecting extra concurrency - using a Scheduler (defaulting to the computation one) to drain the queue, but optimizing for using the providing thread in the non-contended case, only using the Scheduler to drain the queue when there is contention. I'm not a fan of requiring that though for merge since that means flatMap could result in additional concurrency - but perhaps it's an okay tradeoff to sometimes end up delivering the events via a different Scheduler (think "event loop") to ensure these two bad cases can't happen. It would be a significant departure to start allowing additional scheduling and concurrency on merge though.

@abersnaze
Copy link
Contributor

Could a fast producer see that it's starving another thread and proactively
steal the canEmit to take over draining for a bit.

On Thu, Mar 13, 2014 at 5:48 PM, Ben Christensen
notifications@github.comwrote:

They can be silent forever like Observable.never

In which case it would never emit anything and never queue anything.

But streams are not guaranteed to terminate.

Of course, in which case it's the next onNext that would drain the queue.

The most concerning type of use case in the current implementation is one
where 2 events are emitted, one is delivered, the other is queued, then a
long time passes before any further events occur. In this use case, the
second onNext is just sitting in a queue waiting to be delivered.

The only model I can think of that solves this requires injecting extra
concurrency - using a Scheduler (defaulting to the computation one) to
drain the queue, but optimizing for using the providing thread in the
non-contended case, only using the Scheduler to drain the queue when
there is contention. I'm not a fan of requiring that though for mergesince that means
flatMap could result in additional concurrency - but perhaps it's an okay
tradeoff to sometimes end up delivering the events via a different
Scheduler (think "event loop") to ensure these two bad cases can't
happen. It would be a significant departure to start allowing additional
scheduling and concurrency on merge though.


Reply to this email directly or view it on GitHubhttps://github.com//pull/962#issuecomment-37604736
.

@benjchristensen
Copy link
Member Author

Could a fast producer see that it's starving another thread and proactively
steal the canEmit to take over draining for a bit.

How would you do that, by keeping track of the number of onNext received by each Thread ID?

And how would it steal the canEmit without blocking?

@benjchristensen
Copy link
Member Author

I don't know if this issue will remain in a few weeks if we end up implementing the continuation back-pressure solution, as the inner queue would be bounded to a relatively small size (128? 512? 1024?) and the origin threads would end up parking themselves if they fill their buffers.

This would naturally cause a form of rescheduling and no single thread would starve out another as all would be required to limit the amount they emit.

@Strilanc
Copy link

Glad to see people taking notice of the deadlock issue.

Here are some tweaks that might help the queue-and-counter case:

  • Optimistically start operations by CompareSwapping 1 for 0 into the counter. If it succeeds, do your work without queueing it (then try to drain the queue and decrement out of that responsibility as usual). This saves two atomic queue operations when things don't overlap, but costs a CAS when they do.
  • Wrap the Observable instead of the Observer. This reduces the number of atomic operations when there are multiple subscriptions, but makes each serialized operation take longer (so will cause more overlap).
  • Since draining the queue can potentially live-lock a thread, keep a non-synchronized count of how much work you have drained. When it exceeds 10 or 100 or whatever, spin off a new thread pool task to do the work so the caller gets control back. (Handing off to another producer is hard)
  • Improve the concurrent queue, using the knowledge that our case is many-producer-single-consumer. For example, a long time ago I tried an idea where producers did a single atomic Exchange, the consumer had no atomic operations, but there were issues with when queued operations became visible and that problem complicated draining and ultimately made it slower.

@benjchristensen
Copy link
Member Author

Hi @Strilanc, thanks for getting involved

queue-and-counter case

Are you able to get it to perform as well as the queue-and-lock implementation?

Wrap the Observable instead of the Observer.

What do you mean by this? Here is the use case that needs to be solved: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/operators/OperatorMerge.java#L39

Since draining the queue can potentially live-lock a thread

We have chosen the other tradeoff of possibly delayed delivery right now so this doesn't happen. We discuss this tradeoff in the comments above. Your proposed solution is one of the possibilities discussed but involves adding additional concurrency which has traditionally been unwanted (for cases like merge/flatMap). I think this issue goes away if we end up implementation the back pressure design we're considering that would eliminate the unbounded buffers and thus naturally eliminate the live-lock/thread-starvation issue. (Example origin supporting continuation and parking itself: https://github.com/benjchristensen/RxBackpressure/blob/master/rx-bp-prototype/src/main/java/rx/operators/OnSubscribeFromIterable.java#L69)

Improve the concurrent queue

Definitely. As part of the continuation implementation for back pressure I expect us to end up using optimized ring buffers since we will have fixed size buffers. We may be able to use a multi-producer-single-consumer ring buffer in some places, but in this case where we're stealing producer threads it's not a traditional producer/consumer use case so likely will still need multi-producer-multi-consumer.

@Strilanc
Copy link

Hey @benjchristensen ,

Are you able to get it to perform as well as the queue-and-lock implementation?

I used the optimistic-CAS trick as part of fixing a performance problem in an Objective-C app that used ReactiveCocoa. However, that was essentially a single threaded case and I can't test on anything even approaching the scale NetFlix can.

What do you mean by this? Here is the use case that needs to be solved: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/operators/OperatorMerge.java#L39

You're right that it doesn't apply to that case. I was thinking of subscribing to a BehaviorSubject, where it would be wasteful to do the serialization once per observer instead of once inside the subject.

We have chosen the other tradeoff of possibly delayed delivery right now so this doesn't happen. We discuss this tradeoff in the comments above. [...]

I think those comments are all about passing the draining to another producer. The suggestion I'm giving is to transfer the work to the thread pool. This avoids issues where the queue stops until another producer comes along, and the corresponding tricky synchronization involved in that.

but in this case where we're stealing producer threads it's not a traditional producer/consumer use case so likely will still need multi-producer-multi-consumer.

Yes, it is a bit of a unique case. The consumer is only ever in one place, but that place keeps teleporting around.

@benjchristensen
Copy link
Member Author

I can't test on anything even approaching the scale NetFlix can.

I don't think the Netflix scale impacts this very much, as many application use cases regardless of horizontal or vertical scale apply. There are 3 primary use cases that occur when serializing:

  1. No contention

This needs to be optimized for as it's very common and the JVM does a great job with locks of recognizing there is no contention and performing well.

  1. Highly Contended

This happens when tight for/while loops are emitting and being merged. It's somewhat of a bad use case, but it happens. Locks perform rather well per-thread since they block the other threads.

  1. Intermittent contention

This case is easy with locks, queues or CAS and all perform about the same.

I was thinking of subscribing to a BehaviorSubject, where it would be wasteful to do the serialization once per observer instead of once inside the subject.

That makes sense.

The suggestion I'm giving is to transfer the work to the thread pool. This avoids issues where the queue stops until another producer comes along, and the corresponding tricky synchronization involved in that.

Agreed. It will be far easier in this case, and much more similar to how the queue-drain model works in observeOn. We've been trying to eliminate adding concurrency to serialize though, as the last thing anyone expects flatMap/merge to do is inject a new thread to consume the work.

In Rx it's fairly normal for the producer to move across threads (as long as the emissions are serialized) since merging of streams happens all the time or other things like onErrorResumeNext and timeout. Moving consumption to a new thread though only happens when explicitly asked for via observeOn.

That said, if the back-pressure work does not solve the trade-offs (thread-starvation or notification delay) we will likely end up pursuing the fallback solution of moving work to a Scheduler when the live-lock counter threshold is hit.

@benjchristensen
Copy link
Member Author

Optimistically start operations by CompareSwapping 1 for 0 into the counter. If it succeeds, do your work without queueing it

I had done this optimization on the state machine one but hadn't on the queue and counter implementation as I'm not quite sure the race condition is safe. It seems it should be good as it drains AFTER emitting, since this only occurs when count is 0, but I get multiple unit test failures when I attempt this optimization. I haven't spent the time to figure out why. Code updated to not queue on non-contended case here: https://github.com/benjchristensen/RxJava/blob/serialize-implementations/rxjava-core/src/main/java/rx/observers/SerializedObserverViaQueueAndCounter.java#L50

With this optimization here are the before and after on performance:

     * -> using queue and counter technique (SerializedObserverViaQueueAndCounter)
     * 
     * Run: 10 - 19,548,387 ops/sec
     * Run: 11 - 19,471,069 ops/sec
     * Run: 12 - 19,480,112 ops/sec
     * Run: 13 - 18,720,550 ops/sec
     * Run: 14 - 19,070,383 ops/sec
     * 
     * ... after optimizations
     * 
     * Run: 10 - 40,202,262 ops/sec
     * Run: 11 - 40,628,288 ops/sec
     * Run: 12 - 41,878,527 ops/sec
     * Run: 13 - 43,400,405 ops/sec
     * Run: 14 - 40,002,117 ops/sec

However, the queue-and-lock implementation is still faster:

     * Run: 10 - 51,295,152 ops/sec
     * Run: 11 - 50,317,937 ops/sec
     * Run: 12 - 51,126,331 ops/sec
     * Run: 13 - 52,418,291 ops/sec
     * Run: 14 - 51,694,710 ops/sec

... and this optimization makes the contended cases slower:

     * Run: 10 - 4,300,229 ops/sec
     * Run: 11 - 4,395,995 ops/sec
     * Run: 12 - 4,551,550 ops/sec
     * Run: 13 - 4,443,235 ops/sec
     * Run: 14 - 4,158,475 ops/sec
     * 
     * ... after "optimizations"
     * 
     * Run: 10 - 2,857,008 ops/sec
     * Run: 11 - 3,093,778 ops/sec
     * Run: 12 - 4,009,758 ops/sec
     * Run: 13 - 3,094,435 ops/sec
     * Run: 14 - 3,166,119 ops/sec

Even if the performance changes made it better than queue-and-lock I am not certain this is thread-safe and there are 6 unit tests across the codebase now failing, whereas queue-and-lock still performs better and all tests pass.

I will explore this more as we finish experimenting with back pressure as that will define how merge and the primary use case for serialization/synchronization can be implemented.

@benjchristensen
Copy link
Member Author

Thanks for continuing on this. I'm not surprised we can't beat the compiler :-) I'm curious why the CAS operations aren't similarly optimized.

This exploration isn't done yet, and I'll come back to it in coming weeks. We don't necessarily need to be faster than locks to end up using CAS, there are other benefits to CAS over locks.

@davidmoten
Copy link
Collaborator

I'm seeing a nasty bug related to this I believe where the use of merge + SerializedSubscriber has the result of onCompleted being called before onNext has finished. I thought I'd mention it now while I dig around trying to get a unit test in case it rings bells for those of you that a familiar with the changed code. My use case worked as expected in 0.17.0 then not in 0.17.1 at all. Unfortunately the code is too complex to present here without adding stupid amounts of noise so I will try to distill a unit test.

@Strilanc
Copy link

@benjchristensen @davidmoten I think the problem has to do with the queue not being drained properly. There's a copy-away-to-avoid-locking thing happening, but drainQueue only drains the copy. It doesn't go back and check if there's more to drain, which can leave the queue with items hanging.

Actually, I'm kind of surprised any concurrent completions are working. If terminated is set while the queue is being concurrently drained, I don't think anything forwards the completion ever!

There's also a race on emitting being cleared vs another producer taking up draining.

I think your particular bug is because of this:

    if (canEmit) {
        // we won the right to emit
        try {
            drainQueue(list);
            actual.onNext(t);

There's a race with an onComplete firing before that call to drainQueue runs, then onNext ruins everything. ... Except that list != queue, so I dunno...

Still, the hand-off process between producers is wrong in the current queue+lock implementation. It's rotten enough to consider starting from scratch to avoid contamination from the same subtle mistakes. There definitely need to be tests that detect these.

@davidmoten
Copy link
Collaborator

Thanks for that, I'll stop chasing a unit test. I'll leave it to the rxjava
experts to run with.

On 20 March 2014 11:22, Craig Gidney notifications@github.com wrote:

I think the problem has to do with the queue not being drained properly.
There's a copy-away-to-avoid-locking thing happening, but drainQueue only
drains the copy. It doesn't go back and check if there's more to drain,
which can leave the queue with items hanging.

Actually, I'm kind of surprised any concurrent completions are working. If
terminated is set while the queue is being concurrently drained, I don't
think anything forwards the completion ever!

There's also a race on emitting being cleared vs another producer taking
up draining.

Your particular bug is because of this:

if (canEmit) {
    // we won the right to emit
    try {
        drainQueue(list);
        actual.onNext(t);

There's a race with an onComplete firing before that call to drainQueueruns, then
onNext ruins everything.

Basically the entire hand-off process between producers is wrong in the
current queue+lock implementation.

Reply to this email directly or view it on GitHubhttps://github.com//pull/962#issuecomment-38123526
.

@benjchristensen
Copy link
Member Author

If terminated is set while the queue is being concurrently drained, I don't think anything forwards the completion ever!

It happens right here: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/observers/SerializedObserver.java#L160

@benjchristensen
Copy link
Member Author

Can one of you please provide me a test case? The only way I can cause a problem so far is if the source Observable breaks the Rx contract.

Here are examples where threads are racing each other:

    @Test
    public void testConcurrency() {

        Observable<Integer> o = Observable.create(new OnSubscribe<Integer>() {

            @Override
            public void call(final Subscriber<? super Integer> s) {
                Schedulers.newThread().schedule(new Action1<Inner>() {

                    @Override
                    public void call(Inner inner) {
                        for (int i = 0; i < 10000; i++) {
                            s.onNext(1);
                        }
                        s.onCompleted();
                    }

                });
            }
        });

        for (int i = 0; i < 1000; i++) {
            Observable<Integer> merge = Observable.merge(o, o, o);
            TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
            merge.subscribe(ts);

            ts.awaitTerminalEvent();
            assertEquals(1, ts.getOnCompletedEvents().size());
            assertEquals(30000, ts.getOnNextEvents().size());
            List<Integer> onNextEvents = ts.getOnNextEvents();
            System.out.println("onNext: " + onNextEvents.size() + " onCompleted: " + ts.getOnCompletedEvents().size());
        }
    }

Another variant this time with sleeps to cause random concurrent behavior:

    @Test
    public void testConcurrencyWithSleeping() {

        Observable<Integer> o = Observable.create(new OnSubscribe<Integer>() {

            @Override
            public void call(final Subscriber<? super Integer> s) {
                Schedulers.newThread().schedule(new Action1<Inner>() {

                    @Override
                    public void call(Inner inner) {
                        for (int i = 0; i < 100; i++) {
                            s.onNext(1);
                            try {
                                Thread.sleep(1);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        s.onCompleted();
                    }

                });
            }
        });

        for (int i = 0; i < 100; i++) {
            Observable<Integer> merge = Observable.merge(o, o, o);
            TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
            merge.subscribe(ts);

            ts.awaitTerminalEvent();
            assertEquals(1, ts.getOnCompletedEvents().size());
            assertEquals(300, ts.getOnNextEvents().size());
            List<Integer> onNextEvents = ts.getOnNextEvents();
            System.out.println("onNext: " + onNextEvents.size() + " onCompleted: " + ts.getOnCompletedEvents().size());
        }
    }

Both of those work correctly.

I am however able to get it to behave poorly when the Rx contract is broken:

    @Test
    public void testConcurrencyWithBrokenContract() {

        Observable<Integer> o = Observable.create(new OnSubscribe<Integer>() {

            @Override
            public void call(final Subscriber<? super Integer> s) {
                Schedulers.newThread().schedule(new Action1<Inner>() {

                    @Override
                    public void call(Inner inner) {
                        for (int i = 0; i < 10000; i++) {
                            s.onNext(1);
                        }
                        s.onCompleted();
                        for (int i = 0; i < 100; i++) {
                            s.onNext(1);
                        }
                        s.onCompleted();
                        for (int i = 0; i < 100; i++) {
                            s.onNext(1);
                        }
                    }

                });
            }
        });

        for (int i = 0; i < 1000; i++) {
            Observable<Integer> merge = Observable.merge(o, o, o);
            TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
            merge.subscribe(ts);

            ts.awaitTerminalEvent();
            assertEquals(1, ts.getOnCompletedEvents().size());
            assertEquals(30000, ts.getOnNextEvents().size());
            List<Integer> onNextEvents = ts.getOnNextEvents();
            System.out.println("onNext: " + onNextEvents.size() + " onCompleted: " + ts.getOnCompletedEvents().size());
        }
    }

This emits: java.lang.AssertionError: expected:<30000> but was:<29377>

The fix for this is to not decrement the count if onComplete is received more than once:

             final class InnerObserver extends Subscriber<T> {

+                private boolean innerCompleted = false;
+
                 public InnerObserver() {
                 }

                 @Override
                 public void onCompleted() {
-                    if (runningCount.decrementAndGet() == 0 && completed) {
-                        o.onCompleted();
+                    if (!innerCompleted) {
+                        // we check if already completed otherwise a misbehaving Observable that emits onComplete more than once
+                        // will cause the runningCount to decrement multiple times.
+                        innerCompleted = true;
+                        if (runningCount.decrementAndGet() == 0 && completed) {
+                            o.onCompleted();
+                        }
+                        cleanup();
                     }
-                    cleanup();
                 }

@benjchristensen
Copy link
Member Author

Just merged the fix for an Observable that emits multiple onComplete.

@davidmoten
Copy link
Collaborator

Ben, this serialized subscriber complicates my world for this reason:

I am merging asynchronous observables. Each observable has a resource that must remain open till all onNext calls have finished for that observable. The OnSubscribe for the observable get a fresh resource, calls onNext repeatedly then closes the resource and calls onComplete. Because the SerializedSubscriber has buffered stuff asynchronously I am encountering the situation where the resource has been closed before the onNext have finished and I get errors.

Unless I'm mistaken this is a most undesirable effect that will complicate the hell out of writing Observables.

@Strilanc
Copy link

@benjchristensen My guess was that if a subject sent a re-entrant onCompleted in its serialized onNext callback, that the onComplete would not forward. Something like:

Subject<Object> s = new Subject<>();
Observable<Object> r = s.Serialized();
bool[] a = new bool[1];
r.onCompleted(() -> a[0] = true);
r.onNext((x) -> s.sendCompleted());
s.sendNext(null);
assert(a[0]);

(I apologize if I got some of the method names wrong.)

@Strilanc
Copy link

@davidmoten It sounds like the problem is that the resource is being closed by the producer, instead of the consumer, when it's the consumer that relies upon it. Would it also break if delay was applied to it?

I don't know the conventions of RxJava well enough to suggest what should be done. My guess would be that the consumer should be responsible for disposing the subscription, even if it completes, and so you'd cleanup-on-dispose rather than cleanup-on-complete-sent.

@davidmoten
Copy link
Collaborator

@Strilanc good description. not sure about delay, I'll look it up.

Cleanup on completion I suspect is achieved with the Observable.using() method so I'll try that but cleanup on dispose is also a possibility. I wrote a quick test and unsubscribe is called even after onCompleted is called so that could work. I'm a bit concerned in the synchronous case that if I use unsubscribe to close resources then the subsequent observable (say in a flatMap) might commence and be blocked waiting for a resource from a singleton resource pool.

@Strilanc
Copy link

@davidmoten I don't know the specifics of the situation so this might be a ridiculous suggestion in context, but what about releasing producing rights after on-complete is sent but only release the actual resource based on a subscribed reference count?

So, consumer1 subscribes and the resource is allocated. The notifications get sent, and the resource is now no longer "busy" but does still have to exist. Consumer2 subscribes, and does not wait because the resource is not "busy" (despite already being allocated). Consumer2 unsubscribes, and the resource goes back to "allocated but not busy". Consumer1 unsubscribes and the resource gets released.

@benjchristensen
Copy link
Member Author

Each observable has a resource that must remain open till all onNext calls have finished for that observable.

@davidmoten Once an Observable has emitted all of its onNext it has no further control over the events. Anything downstream can be async (such as zip and observeOn) and onCompleted and cleanup will have been performed by the source Observable while the events are still being processed downstream. That is the nature of async push.

My guess was that if a subject sent a re-entrant onCompleted in its serialized onNext callback, that the onComplete would not forward.

@Strilanc The Rx contract is clear that once an onCompleted or onError is received no further events are to be sent. Those are terminal events. Thus your example breaks the contract by sending onComplete to Observable "s" and then expecting to send more events to it.

My guess would be that the consumer should be responsible for disposing the subscription, even if it completes, and so you'd cleanup-on-dispose rather than cleanup-on-complete-sent.

The unsubscribe always gets called at the end (after onComplete or onError) but events can not rely upon the source Observable not having been cleaned up because any number of compositions can cause the source Observable to be long gone by the time the events are processed. Some examples include: zip, observeOn, cache, replay, delay, window, buffer, and now merge.

The unsubscribe event also can occur well before the final Subscriber receives the events. For example, the merge operator eagerly invokes unsubscribe as soon as onComplete is received otherwise memory leaks can occur on long-lived streaming use cases since the final Subscriber may never actually complete.

From what I can tell the difference in behavior you're seeing is coincidental. Before the synchronized solution would allow a broken contract to flow through, such as: onNext, onCompleted, onNext. This would flow through until some operator or the final SafeSubscriber filtered out the last onNext that broke the contract. The new serialized solution is maintaining the contract and filtering out everything after a terminal event (according to the Rx contract). It doesn't need to be done on serialized but is an optimization to allow it to finish draining the queue. I could go either way on this one ... but no operator or Observable should depend on events propagating after onCompleted, onError or unsubscribe.

@davidmoten What is the use case you have where emitted events depend on their original Observable not having been cleaned up? That is not going to be reliable when it gets composed.

@davidmoten
Copy link
Collaborator

@benjchristensen https://github.com/benjchristensen thanks for your
detailed response. I've refactored my observable to internalize usage of
the resource to the Observable and it now only emits items that do not
depend on the resource. I did this on my first iteration of the code a
month ago but found nothing was pushing me to do this in particular
(clearly hadn't used zip, cache etc in my use cases and was deliberately
avoiding observeOn).

The use case was resource = jdbc connection and item = ResultSet. The first
action from the observable was always a map to somethiing not dependent on
the resultset/connection which was frequently flatmapped but
SerializedSubscriber threw a spanner in the works and quite rightly so I
realize. I've internalized that map now so that a ResultSet is not emitted
and all my unit tests pass with 0.17.1. At least now I'll be able to throw
async operators at it with impunity now. FYI the project is at
https://github.com/davidmoten/rxjava-jdbc.

Thanks again for your time and to @Strilanc, twas all me and was easily
fixed.
Dave

On 22 March 2014 01:17, Ben Christensen notifications@github.com wrote:

Each observable has a resource that must remain open till all onNext calls
have finished for that observable.

@davidmoten https://github.com/davidmoten Once an Observable has
emitted all of its onNext it has no further control over the events.
Anything downstream can be async (such as zip and observeOn) and
onCompleted and cleanup will have been performed by the source Observablewhile the events are still being processed downstream. That is the nature
of async push.

My guess was that if a subject sent a re-entrant onCompleted in its
serialized onNext callback, that the onComplete would not forward.

@Strilanc https://github.com/Strilanc The Rx contract is clear that
once an onCompleted or onError is received no further events are to be
sent. Those are terminal events. Thus your example breaks the contract by
sending onComplete to Observable "s" and then expecting to send more
events to it.

My guess would be that the consumer should be responsible for disposing
the subscription, even if it completes, and so you'd cleanup-on-dispose
rather than cleanup-on-complete-sent.

The unsubscribe always gets called at the end (after onComplete or onError)
but events can not rely upon the source Observable not having been
cleaned up because any number of compositions can cause the source
Observable to be long gone by the time the events are processed. Some
examples include: zip, observeOn, cache, replay, delay, window, buffer,
and now merge.

The unsubscribe event also can occur well before the final Subscriberreceives the events. For example, the
merge operator eagerly invokes unsubscribe as soon as onComplete is
received otherwise memory leaks can occur on long-lived streaming use cases
since the final Subscriber may never actually complete.

From what I can tell the difference in behavior you're seeing is
coincidental. Before the synchronized solution would allow a broken
contract to flow through, such as: onNext, onCompleted, onNext. This
would flow through until some operator or the final SafeSubscriberfiltered out the last
onNext that broke the contract. The new serialized solution is
maintaining the contract and filtering out everything after a terminal
event (according to the Rx contract). It doesn't need to be done on
serialized but is an optimization to allow it to finish draining the
queue. I could go either way on this one ... but no operator or Observable
should depend on events propagating after onCompleted, onError or
unsubscribe.

@davidmoten https://github.com/davidmoten What is the use case you have
where emitted events depend on their original Observable not having been
cleaned up? That is not going to be reliable when it gets composed.

Reply to this email directly or view it on GitHubhttps://github.com//pull/962#issuecomment-38278838
.

@headinthebox
Copy link
Contributor

Always keep a copy of http://go.microsoft.com/fwlink/?LinkID=205219 next to your keyboard (which reminds me, we should do a refresh of this, and for every language).

@benjchristensen
Copy link
Member Author

@davidmoten Glad it was easy to resolve.

benjchristensen added a commit to benjchristensen/RxJava that referenced this pull request Mar 26, 2014
@davidmoten
Copy link
Collaborator

@benjchristensen With reference to this problem:

The most concerning type of use case in the current implementation is one where 2 events are emitted, one is delivered, the other is queued, then a long time passes before any further events occur. In this use case, the second onNext is just sitting in a queue waiting to be delivered.

I'm encountering this problem now using merge. My use case is described here . My assumption at this point is that the problem is here to stay and I'm looking for the least objectionable way of handling it. Do we need to be able to parameterize the use of a SerializedSubscriber so that we can avoid the behaviour if we choose to? In SerializedSubscriber once winning the right to emit we currently drain the queue before the onNext is called. Is there any scope for draining the queue some limited/configurable number of times AFTER the onNext is called as well? I certainly see problems with this as well because the last drain of the queue still offers the chance of delaying an incoming onNext. For certain use cases however this could reduce the likelihood of the delay happening. Perhaps some probabilistic diminishment is the best we can hope for.

@Strilanc
Copy link

@davidmoten If an event sits in the queue until the next event, instead of eventually being forwarded even if another event doesn't come along, it's a bug.

Are you actually experiencing this bug?

@davidmoten
Copy link
Collaborator

@Strilanc Yeah I'm experiencing it as described here. My first para above quotes Ben on this one and is known issue. I realize on reviewing the conversation above that my rambly ideas seem to have been talked about already. I thought I'd push it along a little seeing as I'm bumping into the issue now (though not without a somewhat ugly workaround). It strikes me as a serious side-effect of work that is trying to ameliorate deadlocks and blocking.

@benjchristensen
Copy link
Member Author

I have created #998 for us to continue this discussion and determine how to move forward.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

7 participants