Skip to content

Commit

Permalink
Merge pull request #268 from benjchristensen/observeOn-fixes
Browse files Browse the repository at this point in the history
Fix concurrency bug in ScheduledObserver
  • Loading branch information
benjchristensen committed May 10, 2013
2 parents 18b1362 + 1fa6ae3 commit 24b6b37
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.concurrency.ImmediateScheduler;
import rx.concurrency.Schedulers;
import rx.util.functions.Func1;

Expand All @@ -50,7 +51,12 @@ public ObserveOn(Observable<T> source, Scheduler scheduler) {

@Override
public Subscription call(final Observer<T> observer) {
return source.subscribe(new ScheduledObserver<T>(observer, scheduler));
if (scheduler instanceof ImmediateScheduler) {
// do nothing if we request ImmediateScheduler so we don't invoke overhead
return source.subscribe(observer);
} else {
return source.subscribe(new ScheduledObserver<T>(observer, scheduler));
}
}
}

Expand Down
74 changes: 30 additions & 44 deletions rxjava-core/src/main/java/rx/operators/ScheduledObserver.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2013 Netflix, Inc.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -18,18 +18,13 @@
import rx.Notification;
import rx.Observer;
import rx.Scheduler;
import rx.concurrency.Schedulers;
import rx.util.functions.Action0;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/* package */class ScheduledObserver<T> implements Observer<T> {
private final Observer<T> underlying;
private final Scheduler scheduler;

private final ConcurrentLinkedQueue<Notification<T>> queue = new ConcurrentLinkedQueue<Notification<T>>();
private final AtomicInteger counter = new AtomicInteger(0);

public ScheduledObserver(Observer<T> underlying, Scheduler scheduler) {
this.underlying = underlying;
this.scheduler = scheduler;
Expand All @@ -46,47 +41,38 @@ public void onError(final Exception e) {
}

@Override
public void onNext(final T args) {
enqueue(new Notification<T>(args));
public void onNext(final T v) {
enqueue(new Notification<T>(v));
}

private void enqueue(Notification<T> notification) {
int count = counter.getAndIncrement();

queue.offer(notification);
private void enqueue(final Notification<T> notification) {

if (count == 0) {
processQueue();
}
}

private void processQueue() {
scheduler.schedule(new Action0() {
Schedulers.currentThread().schedule(new Action0() {
@Override
public void call() {
Notification<T> not = queue.poll();

switch (not.getKind()) {
case OnNext:
underlying.onNext(not.getValue());
break;
case OnError:
underlying.onError(not.getException());
break;
case OnCompleted:
underlying.onCompleted();
break;
default:
throw new IllegalStateException("Unknown kind of notification " + not);

}

int count = counter.decrementAndGet();
if (count > 0) {
scheduler.schedule(this);
}

scheduler.schedule(new Action0() {
@Override
public void call() {
switch (notification.getKind()) {
case OnNext:
underlying.onNext(notification.getValue());
break;
case OnError:
underlying.onError(notification.getException());
break;
case OnCompleted:
underlying.onCompleted();
break;
default:
throw new IllegalStateException("Unknown kind of notification " + notification);

}
}
});
}

});
}
};

}

0 comments on commit 24b6b37

Please sign in to comment.