From 18e18d38db17be73c8ac693bd770d631ec61374d Mon Sep 17 00:00:00 2001 From: nitsanw Date: Wed, 22 Jun 2016 14:00:32 +0200 Subject: [PATCH] Fix MpscArrayQueue.offerIfBelowThreshold issue (fix #120) --- .../org/jctools/queues/MpscArrayQueue.java | 31 +++++++------- ...pscOfferBelowThresholdQueueSanityTest.java | 41 +++++++++++++++++++ 2 files changed, 55 insertions(+), 17 deletions(-) create mode 100644 jctools-core/src/test/java/org/jctools/queues/MpscOfferBelowThresholdQueueSanityTest.java diff --git a/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java index d4b0faad..4937c114 100755 --- a/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java @@ -156,38 +156,36 @@ public MpscArrayQueue(final int capacity) { } /** - * Attempts {@link MpscArrayQueue#offer(E)}} if and only if the current availability is above the - * threshold. + * {@link MpscArrayQueue#offer(E)}} if {@link MpscArrayQueue#size()} is less than threshold. * - * @param e - * the object to offer onto the queue. - * @param threshold - * as absolute number to be compared to the current queue size. - * @return whether the offer is successful. + * @param e the object to offer onto the queue. + * @param threshold the minimum number of available slots. + * @return true if the offer is successful, false if queue size exceeds threshold. * @since 1.0.1 */ - public boolean offerIfBelowTheshold(final E e, int threshold) { + public boolean offerIfBelowThreshold(final E e, int threshold) { if (null == e) { throw new NullPointerException(); } - // use a cached view on consumer index (potentially updated in loop) final long mask = this.mask; + final long capacity = mask + 1; + long producerLimit = lvProducerLimit(); // LoadLoad long pIndex; do { pIndex = lvProducerIndex(); // LoadLoad - long available = producerLimit - pIndex; - if (available >= threshold) { + long size = capacity - available; + if (size >= threshold) { final long cIndex = lvConsumerIndex(); // LoadLoad - final long capacity = mask + 1; - final long wrapPoint = pIndex - threshold; - if (cIndex <= wrapPoint) { - return false; // FULL :( + size = pIndex - cIndex; + if (size >= threshold) { + return false; // the size exceeds threshold } else { // update producer limit to the next index that we must recheck the consumer index producerLimit = cIndex + capacity; + // this is racy, but the race is benign soProducerLimit(producerLimit); } @@ -254,8 +252,7 @@ public boolean offer(final E e) { /** * A wait free alternative to offer which fails on CAS failure. * - * @param e - * new element, not null + * @param e new element, not null * @return 1 if next element cannot be filled, -1 if CAS failed, 0 if successful */ public final int failFastOffer(final E e) { diff --git a/jctools-core/src/test/java/org/jctools/queues/MpscOfferBelowThresholdQueueSanityTest.java b/jctools-core/src/test/java/org/jctools/queues/MpscOfferBelowThresholdQueueSanityTest.java new file mode 100644 index 00000000..374962ba --- /dev/null +++ b/jctools-core/src/test/java/org/jctools/queues/MpscOfferBelowThresholdQueueSanityTest.java @@ -0,0 +1,41 @@ +package org.jctools.queues; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Queue; + +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class MpscOfferBelowThresholdQueueSanityTest extends QueueSanityTest { + /** + * This allows us to test the offersIfBelowThreshold through all the offer utilizing threads. The effect should be + * as if the queue capacity is halved. + */ + static class MpscArrayQueueOverride extends MpscArrayQueue { + int threshold; + public MpscArrayQueueOverride(int capacity) { + super(capacity); + threshold = capacity()/2; + } + @Override + public boolean offer(E e) { + return super.offerIfBelowThreshold(e, threshold); + } + } + + @Parameterized.Parameters + public static Collection parameters() { + ArrayList list = new ArrayList(); + list.add(makeQueue(0, 1, 8, Ordering.FIFO, new MpscArrayQueueOverride(16))); + return list; + } + + public MpscOfferBelowThresholdQueueSanityTest(ConcurrentQueueSpec spec, Queue queue) { + super(spec, queue); + } + +}