diff --git a/jctools-core/src/main/java/org/jctools/queues/MpmcUnboundedXaddArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/MpmcUnboundedXaddArrayQueue.java index f0839d8f..b7dafb7f 100644 --- a/jctools-core/src/main/java/org/jctools/queues/MpmcUnboundedXaddArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/MpmcUnboundedXaddArrayQueue.java @@ -25,6 +25,7 @@ import static org.jctools.util.UnsafeAccess.UNSAFE; import static org.jctools.util.UnsafeAccess.fieldOffset; +import static org.jctools.util.UnsafeRefArrayAccess.lpElement; abstract class MpmcProgressiveChunkedQueuePad1 extends AbstractQueue implements IndexedQueue @@ -437,20 +438,38 @@ private static E spinForElement(AtomicChunk chunk, int offset) return e; } - private void rotateConsumerBuffer(AtomicChunk consumerBuffer, AtomicChunk next) + private E rotateConsumerBuffer(AtomicChunk consumerBuffer, AtomicChunk next, int consumerOffset, long expectedChunkIndex) { + while (next == null) + { + next = consumerBuffer.lvNext(); + } + //prevent other consumers to use it, but need to await next != null + //or the producer won't be able to append next on a NIL_CHUNK_INDEX! + consumerBuffer.soIndex(AtomicChunk.NIL_CHUNK_INDEX); + //we can freely spin awaiting producer, because we are the only one in charge to + //rotate the consumer buffer and use next + final E e = spinForElement(next, consumerOffset); + final boolean pooled = next.isPooled(); + if (pooled) + { + while (next.lvSequence(consumerOffset) != expectedChunkIndex) + { + + } + } + next.soElement(consumerOffset, null); next.spPrev(null); //save from nepotism consumerBuffer.spNext(null); - //prevent other consumers to use it - consumerBuffer.soIndex(AtomicChunk.NIL_CHUNK_INDEX); if (consumerBuffer.isPooled()) { - final boolean pooled = freeBuffer.offer(consumerBuffer); - assert pooled; + final boolean offered = freeBuffer.offer(consumerBuffer); + assert offered; } //expose next to the other consumers soConsumerBuffer(next); + return e; } @Override @@ -465,6 +484,7 @@ public E poll() boolean firstElementOfNewChunk; E e = null; AtomicChunk next = null; + long pIndex = -1; // start with bogus value, hope we don't need it long chunkIndex; do { @@ -477,6 +497,12 @@ public E poll() { next = consumerBuffer.lvNext(); final long expectedChunkIndex = chunkIndex - 1; + //we don't care about < or >, because if: + //- consumerBuffer::index < expectedChunkIndex: another consumer has rotated consumerBuffer, + // but not reused (yet, if possible) + //- consumerBuffer::index > expectedChunkIndex: another consumer has rotated consumerBuffer, + // that has been pooled and reused again or + //In both cases we have a stale view of the world with a not reliable next value. if (expectedChunkIndex != consumerBuffer.lvIndex()) { //another consumer has already rotated the consumer buffer or is yet to rotating it @@ -484,56 +510,61 @@ public E poll() } if (next == null) { - if (lvProducerIndex() == consumerIndex) - { + if (consumerIndex >= pIndex && // test against cached pIndex + consumerIndex == (pIndex = lvProducerIndex())) + { // update pIndex if we must + // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] return null; } - //if another consumer rotate consumerBuffer, its chunkIndex will change - while ((next = consumerBuffer.lvNext()) == null) - { - if (expectedChunkIndex != consumerBuffer.lvIndex()) - { - //another consumer has already rotated the consumer buffer - break; - } - } - if (next == null) - { - continue; - } + //not empty: can attempt the cas } } else { - if (consumerBuffer.isPooled()) + final boolean pooled = consumerBuffer.isPooled(); + e = pooled ? null : consumerBuffer.lvElement(consumerOffset); + final long index = pooled ? consumerBuffer.lvSequence(consumerOffset) : consumerBuffer.lvIndex(); + if (index != chunkIndex) { - if (consumerBuffer.lvSequence(consumerOffset) != chunkIndex) + if (index < chunkIndex) { - if (lvProducerIndex() == consumerIndex) - { + // if pooled: + // a) consumerBuffer::index > chunkIndex: a chunk used in the past or at its first use + // b) consumerBuffer::index < chunkIndex: a rotation is in progress + // c) consumerBuffer::index == chunkIndex: rotation is happened, not yet an element in + // For a) there is no need to check q emptiness, but doing it isn't wrong, + // because the check will fail (ie q isn't empty re consumerIndex): + // consumerBuffer::index > chunkIndex means that others have proceeded consuming new elements. + // For b) and c) the emptiness check is necessary, because if there are no other elements + // poll *must* return null. + // + // if !pooled: + // - the rotation isn't happened yet + if (consumerIndex >= pIndex && // test against cached pIndex + consumerIndex == (pIndex = lvProducerIndex())) + { // update pIndex if we must + // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] return null; } continue; } - } - else - { - e = consumerBuffer.lvElement(consumerOffset); - if (chunkIndex != consumerBuffer.lvIndex()) + else { - //another consumer has already rotated the consumer buffer or is yet to rotating it + //Stale view of the world: retry! continue; } - if (e == null) - { - if (lvProducerIndex() == consumerIndex) - { - return null; - } - //if the buffer is not empty, another consumer could have already - //stolen it, incrementing consumerIndex too: better to check if it has happened - continue; + } + assert index == chunkIndex; + if (!pooled && e == null) + { + if (consumerIndex >= pIndex && // test against cached pIndex + consumerIndex == (pIndex = lvProducerIndex())) + { // update pIndex if we must + // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] + return null; } + //we are awaiting the producer here + continue; } } if (casConsumerIndex(consumerIndex, consumerIndex + 1)) @@ -545,19 +576,7 @@ public E poll() //if we are the firstElementOfNewChunk we need to rotate the consumer buffer if (firstElementOfNewChunk) { - //we can freely spin awaiting producer, because we are the only one in charge to - //rotate the consumer buffer and using next - e = spinForElement(next, consumerOffset); - final boolean pooled = next.isPooled(); - if (pooled) - { - while (next.lvSequence(consumerOffset) != chunkIndex) - { - - } - } - next.soElement(consumerOffset, null); - rotateConsumerBuffer(consumerBuffer, next); + e = rotateConsumerBuffer(consumerBuffer, next, consumerOffset, chunkIndex); } else { @@ -577,74 +596,47 @@ public E peek() { final int chunkMask = this.chunkMask; final int chunkShift = this.chunkShift; + final int chunkSize = chunkMask + 1; long consumerIndex; - AtomicChunk consumerBuffer; - int consumerOffset; - boolean firstElementOfNewChunk; E e; - AtomicChunk next; do { + e = null; consumerIndex = this.lvConsumerIndex(); - consumerBuffer = this.lvConsumerBuffer(); + AtomicChunk consumerBuffer = this.lvConsumerBuffer(); + final int consumerOffset = (int) (consumerIndex & chunkMask); final long chunkIndex = consumerIndex >> chunkShift; - consumerOffset = (int) (consumerIndex & chunkMask); - final int chunkSize = chunkMask + 1; - firstElementOfNewChunk = consumerOffset == 0 && consumerIndex >= chunkSize; + final boolean firstElementOfNewChunk = consumerOffset == 0 && consumerIndex >= chunkSize; if (firstElementOfNewChunk) { + AtomicChunk next = consumerBuffer.lvNext(); final long expectedChunkIndex = chunkIndex - 1; - next = consumerBuffer.lvNext(); if (expectedChunkIndex != consumerBuffer.lvIndex()) { - //another consumer has already rotated the consumer buffer or is yet to rotating it continue; } if (next == null) { - if (lvProducerIndex() == consumerIndex) - { - return null; - } - //if another consumer rotate consumerBuffer, its chunkIndex will change - while ((next = consumerBuffer.lvNext()) == null) - { - if (expectedChunkIndex != consumerBuffer.lvIndex()) - { - //another consumer has already rotated the consumer buffer - break; - } - } - if (next == null) - { - continue; - } + continue; } consumerBuffer = next; } - e = consumerBuffer.lvElement(consumerOffset); - if (e != null) + if (consumerBuffer.isPooled()) { - //validate the element read - if (chunkIndex != consumerBuffer.lvIndex()) + if (consumerBuffer.lvSequence(consumerOffset) != chunkIndex) { - //another consumer has already rotated the consumer buffer or is yet to rotating it continue; } - return e; } - else + e = consumerBuffer.lvElement(consumerOffset); + if (consumerBuffer.lvIndex() != chunkIndex) { - if (lvProducerIndex() == consumerIndex) - { - return null; - } - //if the q is not empty, another consumer could have already - //stolen it, incrementing consumerIndex too: better to check if it has happened + e = null; continue; } } - while (true); + while (e == null && consumerIndex != lvProducerIndex()); + return e; } @Override