Skip to content

Commit

Permalink
Fix MpscArrayQueue.offerIfBelowThreshold issue (fix #120)
Browse files Browse the repository at this point in the history
  • Loading branch information
nitsanw committed Jun 22, 2016
1 parent 610ffc2 commit 18e18d3
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 17 deletions.
31 changes: 14 additions & 17 deletions jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java
Expand Up @@ -156,38 +156,36 @@ public MpscArrayQueue(final int capacity) {
}

/**
* Attempts {@link MpscArrayQueue#offer(E)}} if and only if the current availability is above the
* threshold.
* {@link MpscArrayQueue#offer(E)}} if {@link MpscArrayQueue#size()} is less than threshold.
*
* @param e
* the object to offer onto the queue.
* @param threshold
* as absolute number to be compared to the current queue size.
* @return whether the offer is successful.
* @param e the object to offer onto the queue.
* @param threshold the minimum number of available slots.
* @return true if the offer is successful, false if queue size exceeds threshold.
* @since 1.0.1
*/
public boolean offerIfBelowTheshold(final E e, int threshold) {
public boolean offerIfBelowThreshold(final E e, int threshold) {
if (null == e) {
throw new NullPointerException();
}
// use a cached view on consumer index (potentially updated in loop)
final long mask = this.mask;
final long capacity = mask + 1;

long producerLimit = lvProducerLimit(); // LoadLoad
long pIndex;
do {
pIndex = lvProducerIndex(); // LoadLoad

long available = producerLimit - pIndex;
if (available >= threshold) {
long size = capacity - available;
if (size >= threshold) {

This comment has been minimized.

Copy link
@philipa

philipa Jun 22, 2016

Contributor

The threshold is the "minimum number of available slots", so shouldn't this comparison be based on the available slots, not the current size?

This comment has been minimized.

Copy link
@nitsanw

nitsanw Jun 22, 2016

Author Contributor

see later fix to javadoc

This comment has been minimized.

Copy link
@philipa

philipa Jun 22, 2016

Contributor

D'oh - caught be the same trap!

final long cIndex = lvConsumerIndex(); // LoadLoad
final long capacity = mask + 1;
final long wrapPoint = pIndex - threshold;
if (cIndex <= wrapPoint) {
return false; // FULL :(
size = pIndex - cIndex;
if (size >= threshold) {
return false; // the size exceeds threshold
}
else {
// update producer limit to the next index that we must recheck the consumer index
producerLimit = cIndex + capacity;

// this is racy, but the race is benign
soProducerLimit(producerLimit);
}
Expand Down Expand Up @@ -254,8 +252,7 @@ public boolean offer(final E e) {
/**
* A wait free alternative to offer which fails on CAS failure.
*
* @param e
* new element, not null
* @param e new element, not null
* @return 1 if next element cannot be filled, -1 if CAS failed, 0 if successful
*/
public final int failFastOffer(final E e) {
Expand Down
@@ -0,0 +1,41 @@
package org.jctools.queues;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Queue;

import org.jctools.queues.spec.ConcurrentQueueSpec;
import org.jctools.queues.spec.Ordering;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class MpscOfferBelowThresholdQueueSanityTest extends QueueSanityTest {
/**
* This allows us to test the offersIfBelowThreshold through all the offer utilizing threads. The effect should be
* as if the queue capacity is halved.
*/

This comment has been minimized.

Copy link
@philipa

philipa Jun 22, 2016

Contributor

Maybe also add a test that checks offerIfBelowThreshold(e, size) behaves like offer(e) ?

This comment has been minimized.

Copy link
@nitsanw

nitsanw Jun 22, 2016

Author Contributor

See later commit, I've used several thresholds. The threshold serves as the observed size.

static class MpscArrayQueueOverride<E> extends MpscArrayQueue<E> {
int threshold;
public MpscArrayQueueOverride(int capacity) {
super(capacity);
threshold = capacity()/2;
}
@Override
public boolean offer(E e) {
return super.offerIfBelowThreshold(e, threshold);
}
}

@Parameterized.Parameters
public static Collection<Object[]> parameters() {
ArrayList<Object[]> list = new ArrayList<Object[]>();
list.add(makeQueue(0, 1, 8, Ordering.FIFO, new MpscArrayQueueOverride<Integer>(16)));
return list;
}

public MpscOfferBelowThresholdQueueSanityTest(ConcurrentQueueSpec spec, Queue<Integer> queue) {
super(spec, queue);
}

}

0 comments on commit 18e18d3

Please sign in to comment.