From 1fa6ae3be23200c787cc1c25c0bb8cac99ae0d17 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Fri, 10 May 2013 13:20:33 -0700 Subject: [PATCH] fix concurrency bug in ScheduledObserver - found a concurrency bug while working on https://github.com/Netflix/Hystrix/issues/123 - the following code would lock up occasionally due to onCompleted not being delivered: ```java public class RunTest { public static void main(String[] args) { System.out.println("Starting test..."); final ArrayList strings = new ArrayList(200000); int num = 10000; while (true) { long start = System.currentTimeMillis(); final AtomicInteger count = new AtomicInteger(); for (int i = 0; i < num; i++) { new TestService1(2, 5).toObservable().forEach(new Action1() { @Override public void call(Integer v) { count.addAndGet(v); } }); new TestService2("hello").toObservable().forEach(new Action1() { @Override public void call(String v) { strings.add(v); } }); } long time = (System.currentTimeMillis() - start); long executions = num * 2; System.out.println("Time: " + time + "ms for " + executions + " executions (" + (time * 1000) / executions + " microseconds)"); System.out.println(" Count: " + count); System.out.println(" Strings: " + strings.size()); strings.clear(); } } } ``` - Also made OperationObserveOn not use ScheduledObserver if the `ImmediateScheduler` is chosen to allow an optimization. I believe this optimization is safe because ScheduledObserver does not require knowledge of a Scheduler (such as for now()) and all we do is emit data to the Observer on a scheduler and if we know it's Immediate we can go direct and skip the enqueuing step. This allows shaving off a noticable number of microseconds per execution in the loop above. --- .../java/rx/operators/OperationObserveOn.java | 8 +- .../java/rx/operators/ScheduledObserver.java | 74 ++++++++----------- 2 files changed, 37 insertions(+), 45 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java index 12276c7b8d..201b3c53f0 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java @@ -30,6 +30,7 @@ import rx.Observer; import rx.Scheduler; import rx.Subscription; +import rx.concurrency.ImmediateScheduler; import rx.concurrency.Schedulers; import rx.util.functions.Func1; @@ -50,7 +51,12 @@ public ObserveOn(Observable source, Scheduler scheduler) { @Override public Subscription call(final Observer observer) { - return source.subscribe(new ScheduledObserver(observer, scheduler)); + if (scheduler instanceof ImmediateScheduler) { + // do nothing if we request ImmediateScheduler so we don't invoke overhead + return source.subscribe(observer); + } else { + return source.subscribe(new ScheduledObserver(observer, scheduler)); + } } } diff --git a/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java b/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java index 7273a29f5c..c491760a1b 100644 --- a/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java +++ b/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,18 +18,13 @@ import rx.Notification; import rx.Observer; import rx.Scheduler; +import rx.concurrency.Schedulers; import rx.util.functions.Action0; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; - /* package */class ScheduledObserver implements Observer { private final Observer underlying; private final Scheduler scheduler; - private final ConcurrentLinkedQueue> queue = new ConcurrentLinkedQueue>(); - private final AtomicInteger counter = new AtomicInteger(0); - public ScheduledObserver(Observer underlying, Scheduler scheduler) { this.underlying = underlying; this.scheduler = scheduler; @@ -46,47 +41,38 @@ public void onError(final Exception e) { } @Override - public void onNext(final T args) { - enqueue(new Notification(args)); + public void onNext(final T v) { + enqueue(new Notification(v)); } - private void enqueue(Notification notification) { - int count = counter.getAndIncrement(); - - queue.offer(notification); + private void enqueue(final Notification notification) { - if (count == 0) { - processQueue(); - } - } - - private void processQueue() { - scheduler.schedule(new Action0() { + Schedulers.currentThread().schedule(new Action0() { @Override public void call() { - Notification not = queue.poll(); - - switch (not.getKind()) { - case OnNext: - underlying.onNext(not.getValue()); - break; - case OnError: - underlying.onError(not.getException()); - break; - case OnCompleted: - underlying.onCompleted(); - break; - default: - throw new IllegalStateException("Unknown kind of notification " + not); - - } - - int count = counter.decrementAndGet(); - if (count > 0) { - scheduler.schedule(this); - } + scheduler.schedule(new Action0() { + @Override + public void call() { + switch (notification.getKind()) { + case OnNext: + underlying.onNext(notification.getValue()); + break; + case OnError: + underlying.onError(notification.getException()); + break; + case OnCompleted: + underlying.onCompleted(); + break; + default: + throw new IllegalStateException("Unknown kind of notification " + notification); + + } + } + }); } + }); - } + }; + }