diff --git a/jctools-core/src/main/java/org/jctools/queues/MpmcArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/MpmcArrayQueue.java index b4fb19fa..da509969 100755 --- a/jctools-core/src/main/java/org/jctools/queues/MpmcArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/MpmcArrayQueue.java @@ -94,6 +94,7 @@ protected final boolean casConsumerIndex(long expect, long newValue) { * field of the struct. There is a further alternative in the experimental project which uses iteration phase * markers to achieve the same algo and is closer structurally to the original, but sadly does not perform as * well as this implementation.
+ * * Tradeoffs to keep in mind: *
    *
  1. Padding for false sharing: counter fields and queue fields are all padded as well as either side of @@ -110,6 +111,8 @@ protected final boolean casConsumerIndex(long expect, long newValue) { public class MpmcArrayQueue extends MpmcArrayQueueConsumerField implements QueueProgressIndicators { long p01, p02, p03, p04, p05, p06, p07; long p10, p11, p12, p13, p14, p15, p16, p17; + final static int RECOMENDED_POLL_BATCH = Runtime.getRuntime().availableProcessors() * 4; + final static int RECOMENDED_OFFER_BATCH = Runtime.getRuntime().availableProcessors() * 4; public MpmcArrayQueue(final int capacity) { super(validateCapacity(capacity)); } @@ -122,49 +125,35 @@ private static int validateCapacity(int capacity) { @Override public boolean offer(final E e) { - if (null == e) { - throw new NullPointerException("Null is not a valid element"); - } - - // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; final long capacity = mask + 1; final long[] sBuffer = sequenceBuffer; - long currentProducerIndex; + + long pIndex; long seqOffset; + long seq; long cIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it - while (true) { - currentProducerIndex = lvProducerIndex(); // LoadLoad - seqOffset = calcSequenceOffset(currentProducerIndex, mask); - final long seq = lvSequence(sBuffer, seqOffset); // LoadLoad - final long delta = seq - currentProducerIndex; - - if (delta == 0) { - // this is expected if we see this first time around - if (casProducerIndex(currentProducerIndex, currentProducerIndex + 1)) { - // Successful CAS: full barrier - break; + do { + pIndex = lvProducerIndex(); + seqOffset = calcSequenceOffset(pIndex, mask); + seq = lvSequence(sBuffer, seqOffset); + if (seq < pIndex) { // consumer has not moved this value forward + if (pIndex - capacity <= cIndex && // test against cached cIndex + pIndex - capacity <= (cIndex = lvConsumerIndex())) { // test against latest cIndex + // Extra check required to ensure [Queue.offer == false iff queue is full] + return false; + } else { + seq = pIndex + 1; // hack to make it go around again } - // failed cas, retry 1 - } else if (delta < 0 && // poll has not moved this value forward - currentProducerIndex - capacity <= cIndex && // test against cached cIndex - currentProducerIndex - capacity <= (cIndex = lvConsumerIndex())) { // test against latest cIndex - // Extra check required to ensure [Queue.offer == false iff queue is full] - return false; } + } while (seq > pIndex || // another producer has moved the sequence + !casProducerIndex(pIndex, pIndex + 1)); // failed to increment - // another producer has moved the sequence by one, retry 2 - } - - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long elementOffset = calcElementOffset(currentProducerIndex, mask); - spElement(buffer, elementOffset, e); - - // increment sequence by 1, the value expected by consumer - // (seeing this value from a producer will lead to retry 2) - soSequence(sBuffer, seqOffset, currentProducerIndex + 1); // StoreStore - + assert null == lpElement(buffer, calcElementOffset(pIndex, mask)); + spElement(buffer, calcElementOffset(pIndex, mask), e); + soSequence(sBuffer, seqOffset, pIndex + 1); // seq++; return true; + } /** @@ -176,56 +165,49 @@ public boolean offer(final E e) { @Override public E poll() { // local load of field to avoid repeated loads after volatile reads - final long[] lSequenceBuffer = sequenceBuffer; + final long[] sBuffer = sequenceBuffer; final long mask = this.mask; - long currentConsumerIndex; + + long cIndex; + long seq; long seqOffset; + long expectedSeq; long pIndex = -1; // start with bogus value, hope we don't need it - while (true) { - currentConsumerIndex = lvConsumerIndex();// LoadLoad - seqOffset = calcSequenceOffset(currentConsumerIndex, mask); - final long seq = lvSequence(lSequenceBuffer, seqOffset);// LoadLoad - final long delta = seq - (currentConsumerIndex + 1); - - if (delta == 0) { - if (casConsumerIndex(currentConsumerIndex, currentConsumerIndex + 1)) { - // Successful CAS: full barrier - break; + do { + cIndex = lvConsumerIndex(); + seqOffset = calcSequenceOffset(cIndex, mask); + seq = lvSequence(sBuffer, seqOffset); + expectedSeq = cIndex + 1; + if (seq < expectedSeq) { // slot has not been moved by producer + if (cIndex >= pIndex && // test against cached pIndex + cIndex == (pIndex = lvProducerIndex())) { // update pIndex if we must + // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] + return null; + } else { + seq = expectedSeq + 1; // trip another go around } - // failed cas, retry 1 - } else if (delta < 0 && // slot has not been moved by producer - currentConsumerIndex >= pIndex && // test against cached pIndex - currentConsumerIndex == (pIndex = lvProducerIndex())) { // update pIndex if we must - // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] - return null; } - - // another consumer beat us and moved sequence ahead, retry 2 - } - - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(currentConsumerIndex, mask); + } while (seq > expectedSeq || // another consumer beat us to it + !casConsumerIndex(cIndex, cIndex + 1)); // failed the CAS + final long offset = calcElementOffset(cIndex, mask); final E e = lpElement(buffer, offset); + assert e != null; spElement(buffer, offset, null); - - // Move sequence ahead by capacity, preparing it for next offer - // (seeing this value from a consumer will lead to retry 2) - soSequence(lSequenceBuffer, seqOffset, currentConsumerIndex + mask + 1);// StoreStore - + soSequence(sBuffer, seqOffset, cIndex + mask + 1);// i.e. seq += capacity return e; } @Override public E peek() { - long currConsumerIndex; + long cIndex; E e; do { - currConsumerIndex = lvConsumerIndex(); + cIndex = lvConsumerIndex(); // other consumers may have grabbed the element, or queue might be empty - e = lpElement(buffer, calcElementOffset(currConsumerIndex)); + e = lpElement(buffer, calcElementOffset(cIndex)); // only return null if queue is empty - } while (e == null && currConsumerIndex != lvProducerIndex()); + } while (e == null && cIndex != lvProducerIndex()); return e; } @@ -267,82 +249,56 @@ public long currentConsumerIndex() { return lvConsumerIndex(); } - @Override public boolean relaxedOffer(E e) { if (null == e) { throw new NullPointerException("Null is not a valid element"); } - - // local load of field to avoid repeated loads after volatile reads final long mask = this.mask; - final long capacity = mask + 1; final long[] sBuffer = sequenceBuffer; - long currentProducerIndex; + + long pIndex; long seqOffset; - while (true) { - currentProducerIndex = lvProducerIndex(); // LoadLoad - seqOffset = calcSequenceOffset(currentProducerIndex, mask); - final long seq = lvSequence(sBuffer, seqOffset); // LoadLoad - final long delta = seq - currentProducerIndex; - - if (delta == 0) { - // this is expected if we see this first time around - if (casProducerIndex(currentProducerIndex, currentProducerIndex + 1)) { - // Successful CAS: full barrier - break; - } - // failed cas, retry 1 - } else if (delta < 0){ + long seq; + do { + pIndex = lvProducerIndex(); + seqOffset = calcSequenceOffset(pIndex, mask); + seq = lvSequence(sBuffer, seqOffset); + if (seq < pIndex) { // slot not cleared by consumer yet return false; } - // another producer has moved the sequence by one, retry 2 - } - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long elementOffset = calcElementOffset(currentProducerIndex, mask); - spElement(buffer, elementOffset, e); - - // increment sequence by 1, the value expected by consumer - // (seeing this value from a producer will lead to retry 2) - soSequence(sBuffer, seqOffset, currentProducerIndex + 1); // StoreStore + } while (seq > pIndex || // another producer has moved the sequence + !casProducerIndex(pIndex, pIndex + 1)); // failed to increment + spElement(buffer, calcElementOffset(pIndex, mask), e); + soSequence(sBuffer, seqOffset, pIndex + 1); return true; } @Override public E relaxedPoll() { - // local load of field to avoid repeated loads after volatile reads - final long[] lSequenceBuffer = sequenceBuffer; + final long[] sBuffer = sequenceBuffer; final long mask = this.mask; - long currentConsumerIndex; + + long cIndex; long seqOffset; - while (true) { - currentConsumerIndex = lvConsumerIndex();// LoadLoad - seqOffset = calcSequenceOffset(currentConsumerIndex, mask); - final long seq = lvSequence(lSequenceBuffer, seqOffset);// LoadLoad - final long delta = seq - (currentConsumerIndex + 1); - - if (delta == 0) { - if (casConsumerIndex(currentConsumerIndex, currentConsumerIndex + 1)) { - // Successful CAS: full barrier - break; - } - // failed cas, retry 1 - } else if (delta < 0) { + long seq; + long expectedSeq; + do { + cIndex = lvConsumerIndex(); + seqOffset = calcSequenceOffset(cIndex, mask); + seq = lvSequence(sBuffer, seqOffset); + expectedSeq = cIndex + 1; + if (seq < expectedSeq) { return null; } - // another consumer beat us and moved sequence ahead, retry 2 - } - // on 64bit(no compressed oops) JVM this is the same as seqOffset - final long offset = calcElementOffset(currentConsumerIndex, mask); + } while (seq > expectedSeq || // another consumer beat us to it + !casConsumerIndex(cIndex, cIndex + 1)); // failed the CAS + final long offset = calcElementOffset(cIndex, mask); final E e = lpElement(buffer, offset); spElement(buffer, offset, null); - - // Move sequence ahead by capacity, preparing it for next offer - // (seeing this value from a consumer will lead to retry 2) - soSequence(lSequenceBuffer, seqOffset, currentConsumerIndex + mask + 1);// StoreStore - + soSequence(sBuffer, seqOffset, cIndex + mask + 1); return e; } @@ -354,22 +310,58 @@ public E relaxedPeek() { @Override public int drain(Consumer c) { - final int limit = capacity(); - return drain(c,limit); + final int capacity = capacity(); + int sum = 0; + while (sum < capacity) { + int drained = 0; + if((drained = drain(c, MpmcArrayQueue.RECOMENDED_POLL_BATCH)) == 0) { + break; + } + sum+=drained; + } + return sum; } @Override public int fill(Supplier s) { - throw new UnsupportedOperationException(); + long result = 0;// result is a long because we want to have a safepoint check at regular intervals + final int capacity = capacity(); + do { + final int filled = fill(s, RECOMENDED_OFFER_BATCH); + if (filled == 0) { + return (int) result; + } + result += filled; + } while (result <= capacity); + return (int) result; } @Override public int drain(Consumer c, int limit) { - for (int i=0;i expectedSeq || // another consumer beat us to it + !casConsumerIndex(cIndex, cIndex + 1)); // failed the CAS + + final long offset = calcElementOffset(cIndex, mask); + final E e = lpElement(buffer, offset); + spElement(buffer, offset, null); + soSequence(sBuffer, seqOffset, cIndex + mask + 1); c.accept(e); } return limit; @@ -377,37 +369,55 @@ public int drain(Consumer c, int limit) { @Override public int fill(Supplier s, int limit) { - throw new UnsupportedOperationException(); + final long[] sBuffer = sequenceBuffer; + final long mask = this.mask; + final E[] buffer = this.buffer; + + long pIndex; + long seqOffset; + long seq; + for (int i = 0; i < limit; i++) { + do { + pIndex = lvProducerIndex(); + seqOffset = calcSequenceOffset(pIndex, mask); + seq = lvSequence(sBuffer, seqOffset); + if (seq < pIndex) { // slot not cleared by consumer yet + return i; + } + } while (seq > pIndex || // another producer has moved the sequence + !casProducerIndex(pIndex, pIndex + 1)); // failed to increment + + spElement(buffer, calcElementOffset(pIndex, mask), s.get()); + soSequence(sBuffer, seqOffset, pIndex + 1); + } + return limit; } @Override public void drain(Consumer c, - WaitStrategy wait, + WaitStrategy w, ExitCondition exit) { int idleCounter = 0; while (exit.keepRunning()) { - E e = relaxedPoll(); - if(e==null){ - idleCounter = wait.idle(idleCounter); + if(drain(c, MpmcArrayQueue.RECOMENDED_POLL_BATCH) == 0) { + idleCounter = w.idle(idleCounter); continue; } idleCounter = 0; - c.accept(e); } } @Override public void fill(Supplier s, - WaitStrategy wait, + WaitStrategy w, ExitCondition exit) { int idleCounter = 0; while (exit.keepRunning()) { - E e = s.get(); - while (!relaxedOffer(e)) { - idleCounter = wait.idle(idleCounter); + if (fill(s, MpmcArrayQueue.RECOMENDED_OFFER_BATCH) == 0) { + idleCounter = w.idle(idleCounter); continue; } - idleCounter = 0; + idleCounter = 0; } } } diff --git a/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java index 78d93c72..3370d4a8 100755 --- a/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java @@ -129,8 +129,6 @@ protected void soConsumerIndex(long l) { * @param */ public class MpscArrayQueue extends MpscArrayQueueConsumerField implements QueueProgressIndicators { - private final static int RECOMENDED_OFFER_BATCH = Runtime.getRuntime().availableProcessors() * 4; - long p01, p02, p03, p04, p05, p06, p07; long p10, p11, p12, p13, p14, p15, p16, p17; public MpscArrayQueue(final int capacity) { @@ -375,7 +373,7 @@ public int fill(Supplier s) { long result = 0;// result is a long because we want to have a safepoint check at regular intervals final int capacity = capacity(); do { - final int filled = fill(s, RECOMENDED_OFFER_BATCH); + final int filled = fill(s, MpmcArrayQueue.RECOMENDED_OFFER_BATCH); if (filled == 0) { return (int) result; } @@ -467,12 +465,12 @@ public void drain(Consumer c, @Override public void fill(Supplier s, - WaitStrategy wait, + WaitStrategy w, ExitCondition exit) { int idleCounter = 0; while (exit.keepRunning()) { - if (fill(s, RECOMENDED_OFFER_BATCH) == 0) { - idleCounter = wait.idle(idleCounter); + if (fill(s, MpmcArrayQueue.RECOMENDED_OFFER_BATCH) == 0) { + idleCounter = w.idle(idleCounter); continue; } idleCounter = 0; diff --git a/jctools-core/src/main/java/org/jctools/queues/SpmcArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/SpmcArrayQueue.java index 151564ae..263f36e5 100644 --- a/jctools-core/src/main/java/org/jctools/queues/SpmcArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/SpmcArrayQueue.java @@ -129,7 +129,6 @@ public SpmcArrayQueueL3Pad(int capacity) { } public class SpmcArrayQueue extends SpmcArrayQueueL3Pad implements QueueProgressIndicators { - private final static int RECOMENDED_POLL_BATCH = Runtime.getRuntime().availableProcessors() * 4; public SpmcArrayQueue(final int capacity) { super(capacity); } @@ -286,7 +285,7 @@ public int drain(final Consumer c) { int sum = 0; while (sum < capacity) { int drained = 0; - if((drained = drain(c, RECOMENDED_POLL_BATCH)) == 0) { + if((drained = drain(c, MpmcArrayQueue.RECOMENDED_POLL_BATCH)) == 0) { break; } sum+=drained; @@ -354,13 +353,13 @@ public void drain(final Consumer c, final WaitStrategy w, final ExitCondition final E[] buffer = this.buffer; final long mask = this.mask; - int counter = 0; + int idleCounter = 0; while (exit.keepRunning()) { - int drained = 0; - if((drained = drain(c, RECOMENDED_POLL_BATCH)) == 0) { - counter = w.idle(counter); + if(drain(c, MpmcArrayQueue.RECOMENDED_POLL_BATCH) == 0) { + idleCounter = w.idle(idleCounter); continue; } + idleCounter = 0; } }