Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 62 additions & 10 deletions src/main/java/rx/internal/util/RxRingBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,9 @@ protected SpmcArrayQueue<Object> createObject() {
}

};


private volatile boolean released = false;

private RxRingBuffer(Queue<Object> queue, int size) {
this.queue = queue;
this.pool = null;
Expand All @@ -307,7 +309,19 @@ private RxRingBuffer(ObjectPool<Queue<Object>> pool, int size) {
}

public void release() {
if (pool != null) {
released = true;
// if the queue was not used it will not be null and will not have already returned
// itself to the pool so we want to reclaim immediately
if(queue != null && queue.isEmpty()) {
returnQueueToPool();
}
}

/**
* We only release once a terminal event is emitted.
*/
private void returnQueueToPool() {
if (pool != null && queue != null) {
Queue<Object> q = queue;
q.clear();
queue = null;
Expand All @@ -331,7 +345,8 @@ public void unsubscribe() {
* if more onNext are sent than have been requested
*/
public void onNext(Object o) throws MissingBackpressureException {
if (queue == null) {
if (released) {
// System.out.println("onNext: " + o);
throw new IllegalStateException("This instance has been unsubscribed and the queue is no longer usable.");
}
if (!queue.offer(on.next(o))) {
Expand Down Expand Up @@ -362,21 +377,33 @@ public int capacity() {
}

public int count() {
if (queue == null) {
if (released) {
return 0;
}
try {
return queue.size();
} catch (NullPointerException npe) {
// if a race occurs between the if check and it actually being released we'll just default to 0
return 0;
}
return queue.size();
}

public boolean isEmpty() {
if (queue == null) {
if (released) {
return true;
}
try {
return queue.isEmpty();
} catch (NullPointerException npe) {
// if a race occurs between the if check and it actually being released we'll just default to 0
return true;
}
return queue.isEmpty();
}

public Object poll() {
if (queue == null) {
if (released) {
// return to pool if not already done
returnQueueToPool();
// we are unsubscribed and have released the undelrying queue
return null;
}
Expand All @@ -394,16 +421,25 @@ public Object poll() {
* a +1 of the size, or -1 of how many onNext can be sent. See comment on 'terminalState' above for why it
* is currently the way it is.
*/
boolean shouldRelease = false;
if (o == null && terminalState != null && queue.isEmpty()) {
o = terminalState;
// once emitted we clear so a poll loop will finish
terminalState = null;
shouldRelease = true;
}
if (shouldRelease) {
// we emitted a terminal event so release resources
release();
returnQueueToPool();
}
return o;
}

public Object peek() {
if (queue == null) {
if (released) {
// return to pool if not already done
returnQueueToPool();
// we are unsubscribed and have released the undelrying queue
return null;
}
Expand Down Expand Up @@ -438,7 +474,23 @@ public Throwable asError(Object o) {

@Override
public boolean isUnsubscribed() {
return queue == null;
return released || queue == null;
}


/**
* Experimenting with finalize. This is an optional safety net, but it comes at a high cost.
* In a 1 minute test with Flight Recorder it increases GC time from 4ms to 28ms.
*/
// @Override
// protected void finalize() throws Throwable {
// try {
// if(queue != null) {
// returnQueueToPool();
// }
// } finally {
// super.finalize();
// }
// }

}