From 658ab3a05661c3281bc4d78a7e81eec6f00c1d40 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 23 Dec 2014 21:01:55 -0800 Subject: [PATCH] RxRingBuffer with Inline Release Can release when ... - terminal event emitted - unsubscribe/release occurs and queue is empty - unsubscribe/release has occurred and poll/peek occurs Can result in a queue not being put back in the pool if an early unsubscribe occurs from a different thread and causes the poll/peek to not trigger the release. In this case GC will reclaim the pool so it still functions, it just misses the pooling optimization. There is a commented out test of using finalize(). It works as a safety net for the edge case, but at the cost of increased GC time. --- .../java/rx/internal/util/RxRingBuffer.java | 72 ++++++++++++++++--- 1 file changed, 62 insertions(+), 10 deletions(-) diff --git a/src/main/java/rx/internal/util/RxRingBuffer.java b/src/main/java/rx/internal/util/RxRingBuffer.java index 389793c1c3..c4268faa05 100644 --- a/src/main/java/rx/internal/util/RxRingBuffer.java +++ b/src/main/java/rx/internal/util/RxRingBuffer.java @@ -293,7 +293,9 @@ protected SpmcArrayQueue createObject() { } }; - + + private volatile boolean released = false; + private RxRingBuffer(Queue queue, int size) { this.queue = queue; this.pool = null; @@ -307,7 +309,19 @@ private RxRingBuffer(ObjectPool> pool, int size) { } public void release() { - if (pool != null) { + released = true; + // if the queue was not used it will not be null and will not have already returned + // itself to the pool so we want to reclaim immediately + if(queue != null && queue.isEmpty()) { + returnQueueToPool(); + } + } + + /** + * We only release once a terminal event is emitted. + */ + private void returnQueueToPool() { + if (pool != null && queue != null) { Queue q = queue; q.clear(); queue = null; @@ -331,7 +345,8 @@ public void unsubscribe() { * if more onNext are sent than have been requested */ public void onNext(Object o) throws MissingBackpressureException { - if (queue == null) { + if (released) { + // System.out.println("onNext: " + o); throw new IllegalStateException("This instance has been unsubscribed and the queue is no longer usable."); } if (!queue.offer(on.next(o))) { @@ -362,21 +377,33 @@ public int capacity() { } public int count() { - if (queue == null) { + if (released) { + return 0; + } + try { + return queue.size(); + } catch (NullPointerException npe) { + // if a race occurs between the if check and it actually being released we'll just default to 0 return 0; } - return queue.size(); } public boolean isEmpty() { - if (queue == null) { + if (released) { + return true; + } + try { + return queue.isEmpty(); + } catch (NullPointerException npe) { + // if a race occurs between the if check and it actually being released we'll just default to 0 return true; } - return queue.isEmpty(); } public Object poll() { - if (queue == null) { + if (released) { + // return to pool if not already done + returnQueueToPool(); // we are unsubscribed and have released the undelrying queue return null; } @@ -394,16 +421,25 @@ public Object poll() { * a +1 of the size, or -1 of how many onNext can be sent. See comment on 'terminalState' above for why it * is currently the way it is. */ + boolean shouldRelease = false; if (o == null && terminalState != null && queue.isEmpty()) { o = terminalState; // once emitted we clear so a poll loop will finish terminalState = null; + shouldRelease = true; + } + if (shouldRelease) { + // we emitted a terminal event so release resources + release(); + returnQueueToPool(); } return o; } public Object peek() { - if (queue == null) { + if (released) { + // return to pool if not already done + returnQueueToPool(); // we are unsubscribed and have released the undelrying queue return null; } @@ -438,7 +474,23 @@ public Throwable asError(Object o) { @Override public boolean isUnsubscribed() { - return queue == null; + return released || queue == null; } + + + /** + * Experimenting with finalize. This is an optional safety net, but it comes at a high cost. + * In a 1 minute test with Flight Recorder it increases GC time from 4ms to 28ms. + */ +// @Override +// protected void finalize() throws Throwable { +// try { +// if(queue != null) { +// returnQueueToPool(); +// } +// } finally { +// super.finalize(); +// } +// } }