Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 108 additions & 2 deletions rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package rx.operators;

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

import rx.Scheduler;
Expand All @@ -24,7 +25,6 @@
import rx.schedulers.TestScheduler;
import rx.schedulers.TrampolineScheduler;
import rx.subscriptions.Subscriptions;
import rx.util.InterruptibleBlockingQueue;
import rx.util.functions.Action0;
import rx.util.functions.Action1;

Expand Down Expand Up @@ -112,7 +112,7 @@ private class ObserveOnSubscriber extends Subscriber<T> {
final Subscriber<? super T> observer;
private volatile Scheduler.Inner recursiveScheduler;

private final InterruptibleBlockingQueue queue = new InterruptibleBlockingQueue(bufferSize);
private final InterruptibleBlockingQueue<Object> queue = new InterruptibleBlockingQueue<Object>(bufferSize);
final AtomicLong counter = new AtomicLong(0);

public ObserveOnSubscriber(Subscriber<? super T> observer) {
Expand Down Expand Up @@ -220,4 +220,110 @@ private void pollQueue() {

}

/**
* Single-producer-single-consumer queue (only thread-safe for 1 producer thread with 1 consumer thread).
*
* This supports an interrupt() being called externally rather than needing to interrupt the thread. This allows
* unsubscribe behavior when this queue is being used.
*
* @param <E>
*/
private static class InterruptibleBlockingQueue<E> {

private final Semaphore semaphore;
private volatile boolean interrupted = false;

private final E[] buffer;

private AtomicLong tail = new AtomicLong();
private AtomicLong head = new AtomicLong();
private final int capacity;
private final int mask;

@SuppressWarnings("unchecked")
public InterruptibleBlockingQueue(final int size) {
this.semaphore = new Semaphore(size);
this.capacity = size;
this.mask = size - 1;
buffer = (E[]) new Object[size];
}

/**
* Used to unsubscribe and interrupt the producer if blocked in put()
*/
public void interrupt() {
interrupted = true;
semaphore.release();
}

public void addBlocking(final E e) throws InterruptedException {
if (interrupted) {
throw new InterruptedException("Interrupted by Unsubscribe");
}
semaphore.acquire();
if (interrupted) {
throw new InterruptedException("Interrupted by Unsubscribe");
}
if (e == null) {
throw new IllegalArgumentException("Can not put null");
}

if (offer(e)) {
return;
} else {
throw new IllegalStateException("Queue is full");
}
}

private boolean offer(final E e) {
final long _t = tail.get();
if (_t - head.get() == capacity) {
// queue is full
return false;
}
int index = (int) (_t & mask);
buffer[index] = e;
// move the tail forward
tail.lazySet(_t + 1);

return true;
}

public E poll() {
if (interrupted) {
return null;
}
final long _h = head.get();
if (tail.get() == _h) {
// nothing available
return null;
}
int index = (int) (_h & mask);

// fetch the item
E v = buffer[index];
// allow GC to happen
buffer[index] = null;
// increment and signal we're done
head.lazySet(_h + 1);
if (v != null) {
semaphore.release();
}
return v;
}

public int size()
{
int size;
do
{
final long currentHead = head.get();
final long currentTail = tail.get();
size = (int) (currentTail - currentHead);
} while (size > buffer.length);

return size;
}

}
}
111 changes: 0 additions & 111 deletions rxjava-core/src/main/java/rx/util/InterruptibleBlockingQueue.java

This file was deleted.