(actualCapacity);
+ }
+
+ @Override
+ public String toString()
+ {
+ // Identify the queue by its concrete class name only; elements are not printed.
+ return this.getClass().getName();
+ }
+
+ @Override
+ public void clear()
+ {
+ // Single-consumer clear: poll until the queue reports empty, discarding elements.
+ while (poll() != null)
+ {
+ // toss it away
+ }
+ }
+
+ @Override
+ public final int capacity()
+ {
+ // The backing buffer length is a power of two, so capacity is mask + 1.
+ return (int) (mask + 1);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ */
+ @Override
+ public final int size()
+ {
+ // Size is derived from the producer/consumer indices. PLAIN_DIVISOR: indices
+ // advance by one per element (no resize/parity bit encoded in them).
+ return IndexedQueueSizeUtil.size(this, IndexedQueueSizeUtil.PLAIN_DIVISOR);
+ }
+
+ @Override
+ public final boolean isEmpty()
+ {
+ // Delegates to the shared index-based emptiness check.
+ return IndexedQueueSizeUtil.isEmpty(this);
+ }
+
+ @Override
+ public final long currentProducerIndex()
+ {
+ return lvProducerIndex();
+ }
+
+ @Override
+ public final long currentConsumerIndex()
+ {
+ return lvConsumerIndex();
+ }
+
+ /**
+ * Get an iterator for this queue. This method is thread safe.
+ *
+ * The iterator provides a best-effort snapshot of the elements in the queue.
+ * The returned iterator is not guaranteed to return elements in queue order,
+ * and races with the consumer thread may cause gaps in the sequence of returned elements.
+ * Like {@link #relaxedPoll}, the iterator may not immediately return newly inserted elements.
+ *
+ * @return The iterator.
+ */
+ @Override
+ public final Iterator iterator() {
+ // Snapshot both indices once; the iterator walks [cIndex, pIndex) best-effort.
+ final long cIndex = lvConsumerIndex();
+ final long pIndex = lvProducerIndex();
+
+ return new WeakIterator(cIndex, pIndex, mask, buffer);
+ }
+
+ private static class WeakIterator implements Iterator {
+
+ // Producer index at snapshot time; iteration never looks past this point.
+ private final long pIndex;
+ private final int mask;
+ private final AtomicReferenceArray buffer;
+ // Next slot to inspect; starts at the consumer index snapshot.
+ private long nextIndex;
+ // Element to hand out on the next call to next(); null means exhausted.
+ private E nextElement;
+
+ WeakIterator(long cIndex, long pIndex, int mask, AtomicReferenceArray buffer) {
+ this.nextIndex = cIndex;
+ this.pIndex = pIndex;
+ this.mask = mask;
+ this.buffer = buffer;
+ // Eagerly resolve the first element so hasNext() is a simple null check.
+ nextElement = getNext();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove");
+ }
+
+ @Override
+ public boolean hasNext() {
+ return nextElement != null;
+ }
+
+ @Override
+ public E next() {
+ final E e = nextElement;
+ if (e == null)
+ throw new NoSuchElementException();
+ // Pre-fetch the following element before returning the current one.
+ nextElement = getNext();
+ return e;
+ }
+
+ private E getNext() {
+ final int mask = this.mask;
+ final AtomicReferenceArray buffer = this.buffer;
+ while (nextIndex < pIndex) {
+ int offset = calcCircularRefElementOffset(nextIndex++, mask);
+ E e = lvRefElement(buffer, offset);
+ // Slots may read null when racing with the consumer; skip such gaps.
+ if (e != null) {
+ return e;
+ }
+ }
+ return null;
+ }
+ }
+}
diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/unpadded/BaseLinkedAtomicUnpaddedQueue.java b/jctools-core/src/main/java/org/jctools/queues/atomic/unpadded/BaseLinkedAtomicUnpaddedQueue.java
new file mode 100644
index 00000000..55d0962a
--- /dev/null
+++ b/jctools-core/src/main/java/org/jctools/queues/atomic/unpadded/BaseLinkedAtomicUnpaddedQueue.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jctools.queues.atomic.unpadded;
+
+import java.util.AbstractQueue;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.atomic.*;
+import org.jctools.queues.*;
+import static org.jctools.queues.atomic.unpadded.AtomicQueueUtil.*;
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedLinkedQueueGenerator
+ * which can found in the jctools-build module. The original source file is BaseLinkedQueue.java.
+ */
+abstract class BaseLinkedAtomicUnpaddedQueuePad0 extends AbstractQueue implements MessagePassingQueue {
+ // Unpadded variant: intentionally empty, no false-sharing padding fields.
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedLinkedQueueGenerator
+ * which can found in the jctools-build module. The original source file is BaseLinkedQueue.java.
+ */
+abstract class BaseLinkedAtomicUnpaddedQueueProducerNodeRef extends BaseLinkedAtomicUnpaddedQueuePad0 {
+
+ private static final AtomicReferenceFieldUpdater P_NODE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(BaseLinkedAtomicUnpaddedQueueProducerNodeRef.class, LinkedQueueAtomicNode.class, "producerNode");
+
+ private volatile LinkedQueueAtomicNode producerNode;
+
+ // Store plain (sp). NOTE(review): generated identically to soProducerNode (lazySet);
+ // the field-updater API offers no plain store, so an ordered store is used for both.
+ final void spProducerNode(LinkedQueueAtomicNode newValue) {
+ P_NODE_UPDATER.lazySet(this, newValue);
+ }
+
+ // Store ordered (so): release semantics without a full fence.
+ final void soProducerNode(LinkedQueueAtomicNode newValue) {
+ P_NODE_UPDATER.lazySet(this, newValue);
+ }
+
+ // Load volatile (lv).
+ final LinkedQueueAtomicNode lvProducerNode() {
+ return producerNode;
+ }
+
+ final boolean casProducerNode(LinkedQueueAtomicNode expect, LinkedQueueAtomicNode newValue) {
+ return P_NODE_UPDATER.compareAndSet(this, expect, newValue);
+ }
+
+ // Load plain (lp). NOTE(review): the field is volatile, so this is a volatile read too.
+ final LinkedQueueAtomicNode lpProducerNode() {
+ return producerNode;
+ }
+
+ // Atomically swap in the new producer node, returning the previous one.
+ protected final LinkedQueueAtomicNode xchgProducerNode(LinkedQueueAtomicNode newValue) {
+ return P_NODE_UPDATER.getAndSet(this, newValue);
+ }
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedLinkedQueueGenerator
+ * which can found in the jctools-build module. The original source file is BaseLinkedQueue.java.
+ */
+abstract class BaseLinkedAtomicUnpaddedQueuePad1 extends BaseLinkedAtomicUnpaddedQueueProducerNodeRef {
+ // Unpadded variant: intentionally empty, no false-sharing padding fields.
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedLinkedQueueGenerator
+ * which can found in the jctools-build module. The original source file is BaseLinkedQueue.java.
+ */
+abstract class BaseLinkedAtomicUnpaddedQueueConsumerNodeRef extends BaseLinkedAtomicUnpaddedQueuePad1 {
+
+ private static final AtomicReferenceFieldUpdater C_NODE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(BaseLinkedAtomicUnpaddedQueueConsumerNodeRef.class, LinkedQueueAtomicNode.class, "consumerNode");
+
+ private volatile LinkedQueueAtomicNode consumerNode;
+
+ // Store plain (sp). NOTE(review): implemented as an ordered store, the closest
+ // available through AtomicReferenceFieldUpdater.
+ final void spConsumerNode(LinkedQueueAtomicNode newValue) {
+ C_NODE_UPDATER.lazySet(this, newValue);
+ }
+
+ // Load volatile (lv).
+ @SuppressWarnings("unchecked")
+ final LinkedQueueAtomicNode lvConsumerNode() {
+ return consumerNode;
+ }
+
+ // Load plain (lp). NOTE(review): the field is volatile, so this is a volatile read too.
+ final LinkedQueueAtomicNode lpConsumerNode() {
+ return consumerNode;
+ }
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedLinkedQueueGenerator
+ * which can found in the jctools-build module. The original source file is BaseLinkedQueue.java.
+ */
+abstract class BaseLinkedAtomicUnpaddedQueuePad2 extends BaseLinkedAtomicUnpaddedQueueConsumerNodeRef {
+ // Unpadded variant: intentionally empty, no false-sharing padding fields.
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedLinkedQueueGenerator
+ * which can found in the jctools-build module. The original source file is BaseLinkedQueue.java.
+ *
+ * A base data structure for concurrent linked queues. For convenience also pulled in common single consumer
+ * methods since at this time there's no plan to implement MC.
+ */
+abstract class BaseLinkedAtomicUnpaddedQueue extends BaseLinkedAtomicUnpaddedQueuePad2 {
+
+ // Iteration is not supported for this linked queue family.
+ @Override
+ public final Iterator iterator() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String toString() {
+ return this.getClass().getName();
+ }
+
+ // Node factory: empty node (value set later by the producer).
+ protected final LinkedQueueAtomicNode newNode() {
+ return new LinkedQueueAtomicNode();
+ }
+
+ // Node factory: node pre-loaded with the offered element.
+ protected final LinkedQueueAtomicNode newNode(E e) {
+ return new LinkedQueueAtomicNode(e);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * IMPLEMENTATION NOTES:
+ * This is an O(n) operation as we run through all the nodes and count them.
+ * The accuracy of the value returned by this method is subject to races with producer/consumer threads. In
+ * particular when racing with the consumer thread this method may under estimate the size.
+ *
+ * @see java.util.Queue#size()
+ */
+ @Override
+ public final int size() {
+ // Read consumer first, this is important because if the producer node is 'older' than the consumer
+ // the consumer may overtake it (consume past it) invalidating the 'snapshot' notion of size.
+ LinkedQueueAtomicNode chaserNode = lvConsumerNode();
+ LinkedQueueAtomicNode producerNode = lvProducerNode();
+ int size = 0;
+ // must chase the nodes all the way to the producer node, but there's no need to count beyond expected head.
+ while (// don't go past the producer node
+ chaserNode != producerNode && // stop at last node
+ chaserNode != null && // stop at max int
+ size < Integer.MAX_VALUE) {
+ LinkedQueueAtomicNode next;
+ next = chaserNode.lvNext();
+ // check if this node has been consumed, if so return what we have
+ if (next == chaserNode) {
+ return size;
+ }
+ chaserNode = next;
+ size++;
+ }
+ return size;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * IMPLEMENTATION NOTES:
+ * Queue is empty when producerNode is the same as consumerNode. An alternative implementation would be to
+ * observe the producerNode.value is null, which also means an empty queue because only the
+ * consumerNode.value is allowed to be null.
+ *
+ * @see MessagePassingQueue#isEmpty()
+ */
+ @Override
+ public boolean isEmpty() {
+ LinkedQueueAtomicNode consumerNode = lvConsumerNode();
+ LinkedQueueAtomicNode producerNode = lvProducerNode();
+ return consumerNode == producerNode;
+ }
+
+ // Detach nextNode's value, retire currConsumerNode and advance the consumer to nextNode.
+ protected E getSingleConsumerNodeValue(LinkedQueueAtomicNode currConsumerNode, LinkedQueueAtomicNode nextNode) {
+ // we have to null out the value because we are going to hang on to the node
+ final E nextValue = nextNode.getAndNullValue();
+ // Fix up the next ref of currConsumerNode to prevent promoted nodes from keeping new ones alive.
+ // We use a reference to self instead of null because null is already a meaningful value (the next of
+ // producer node is null).
+ currConsumerNode.soNext(currConsumerNode);
+ spConsumerNode(nextNode);
+ // currConsumerNode is now no longer referenced and can be collected
+ return nextValue;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * IMPLEMENTATION NOTES:
+ * Poll is allowed from a SINGLE thread.
+ * Poll is potentially blocking here as the {@link Queue#poll()} does not allow returning {@code null} if the queue is not
+ * empty. This is very different from the original Vyukov guarantees. See {@link #relaxedPoll()} for the original
+ * semantics.
+ * Poll reads {@code consumerNode.next} and:
+ *
+ * - If it is {@code null} AND the queue is empty return {@code null}, if queue is not empty spin wait for
+ * value to become visible.
+ *
+ * - If it is not {@code null} set it as the consumer node and return its now evacuated value.
+ *
+ * This means the consumerNode.value is always {@code null}, which is also the starting point for the queue.
+ * Because {@code null} values are not allowed to be offered this is the only node with its value set to
+ * {@code null} at any one time.
+ *
+ * @see MessagePassingQueue#poll()
+ * @see java.util.Queue#poll()
+ */
+ @Override
+ public E poll() {
+ final LinkedQueueAtomicNode currConsumerNode = lpConsumerNode();
+ LinkedQueueAtomicNode nextNode = currConsumerNode.lvNext();
+ if (nextNode != null) {
+ return getSingleConsumerNodeValue(currConsumerNode, nextNode);
+ } else if (currConsumerNode != lvProducerNode()) {
+ nextNode = spinWaitForNextNode(currConsumerNode);
+ // got the next node...
+ return getSingleConsumerNodeValue(currConsumerNode, nextNode);
+ }
+ return null;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * IMPLEMENTATION NOTES:
+ * Peek is allowed from a SINGLE thread.
+ * Peek is potentially blocking here as the {@link Queue#peek()} does not allow returning {@code null} if the queue is not
+ * empty. This is very different from the original Vyukov guarantees. See {@link #relaxedPeek()} for the original
+ * semantics.
+ * Peek reads the next node from the consumerNode and:
+ *
+ * - If it is {@code null} AND the queue is empty return {@code null}, if queue is not empty spin wait for
+ * value to become visible.
+ *
+ * - If it is not {@code null} return its value.
+ *
+ *
+ * @see MessagePassingQueue#peek()
+ * @see java.util.Queue#peek()
+ */
+ @Override
+ public E peek() {
+ final LinkedQueueAtomicNode currConsumerNode = lpConsumerNode();
+ LinkedQueueAtomicNode nextNode = currConsumerNode.lvNext();
+ if (nextNode != null) {
+ return nextNode.lpValue();
+ } else if (currConsumerNode != lvProducerNode()) {
+ nextNode = spinWaitForNextNode(currConsumerNode);
+ // got the next node...
+ return nextNode.lpValue();
+ }
+ return null;
+ }
+
+ // Busy-spin until the producer's store to currNode.next becomes visible. Only
+ // called after establishing the queue is non-empty, so the store must arrive.
+ LinkedQueueAtomicNode spinWaitForNextNode(LinkedQueueAtomicNode currNode) {
+ LinkedQueueAtomicNode nextNode;
+ while ((nextNode = currNode.lvNext()) == null) {
+ // spin, we are no longer wait free
+ }
+ return nextNode;
+ }
+
+ @Override
+ public E relaxedPoll() {
+ // Relaxed semantics: may return null even when the queue is non-empty, no spin wait.
+ final LinkedQueueAtomicNode currConsumerNode = lpConsumerNode();
+ final LinkedQueueAtomicNode nextNode = currConsumerNode.lvNext();
+ if (nextNode != null) {
+ return getSingleConsumerNodeValue(currConsumerNode, nextNode);
+ }
+ return null;
+ }
+
+ @Override
+ public E relaxedPeek() {
+ // Relaxed semantics: may return null even when the queue is non-empty, no spin wait.
+ final LinkedQueueAtomicNode nextNode = lpConsumerNode().lvNext();
+ if (nextNode != null) {
+ return nextNode.lpValue();
+ }
+ return null;
+ }
+
+ @Override
+ public boolean relaxedOffer(E e) {
+ return offer(e);
+ }
+
+ @Override
+ public int drain(Consumer c, int limit) {
+ if (null == c)
+ throw new IllegalArgumentException("c is null");
+ if (limit < 0)
+ throw new IllegalArgumentException("limit is negative: " + limit);
+ if (limit == 0)
+ return 0;
+ LinkedQueueAtomicNode chaserNode = this.lpConsumerNode();
+ for (int i = 0; i < limit; i++) {
+ final LinkedQueueAtomicNode nextNode = chaserNode.lvNext();
+ // stop once the producer end of the list is reached
+ if (nextNode == null) {
+ return i;
+ }
+ // we have to null out the value because we are going to hang on to the node
+ final E nextValue = getSingleConsumerNodeValue(chaserNode, nextNode);
+ chaserNode = nextNode;
+ c.accept(nextValue);
+ }
+ return limit;
+ }
+
+ @Override
+ public int drain(Consumer c) {
+ return MessagePassingQueueUtil.drain(this, c);
+ }
+
+ @Override
+ public void drain(Consumer c, WaitStrategy wait, ExitCondition exit) {
+ MessagePassingQueueUtil.drain(this, c, wait, exit);
+ }
+
+ @Override
+ public int capacity() {
+ // Linked queue: grows without bound.
+ return UNBOUNDED_CAPACITY;
+ }
+}
diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/unpadded/BaseMpscLinkedAtomicUnpaddedArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/atomic/unpadded/BaseMpscLinkedAtomicUnpaddedArrayQueue.java
new file mode 100644
index 00000000..f78019ad
--- /dev/null
+++ b/jctools-core/src/main/java/org/jctools/queues/atomic/unpadded/BaseMpscLinkedAtomicUnpaddedArrayQueue.java
@@ -0,0 +1,650 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jctools.queues.atomic.unpadded;
+
+import org.jctools.queues.IndexedQueueSizeUtil.IndexedQueue;
+import org.jctools.util.PortableJvmInfo;
+import org.jctools.util.Pow2;
+import org.jctools.util.RangeUtil;
+import java.util.AbstractQueue;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.*;
+import org.jctools.queues.*;
+import static org.jctools.queues.atomic.unpadded.AtomicQueueUtil.*;
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedLinkedQueueGenerator
+ * which can found in the jctools-build module. The original source file is BaseMpscLinkedArrayQueue.java.
+ */
+abstract class BaseMpscLinkedAtomicUnpaddedArrayQueuePad1 extends AbstractQueue implements IndexedQueue {
+ // Unpadded variant: intentionally empty, no false-sharing padding fields.
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedLinkedQueueGenerator
+ * which can found in the jctools-build module. The original source file is BaseMpscLinkedArrayQueue.java.
+ */
+abstract class BaseMpscLinkedAtomicUnpaddedArrayQueueProducerFields extends BaseMpscLinkedAtomicUnpaddedArrayQueuePad1 {
+
+ private static final AtomicLongFieldUpdater P_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(BaseMpscLinkedAtomicUnpaddedArrayQueueProducerFields.class, "producerIndex");
+
+ // Producer index, doubled: advances by 2 per element; the low bit flags a resize in progress.
+ private volatile long producerIndex;
+
+ // Load volatile (lv).
+ @Override
+ public final long lvProducerIndex() {
+ return producerIndex;
+ }
+
+ // Store ordered (so): release semantics without a full fence.
+ final void soProducerIndex(long newValue) {
+ P_INDEX_UPDATER.lazySet(this, newValue);
+ }
+
+ final boolean casProducerIndex(long expect, long newValue) {
+ return P_INDEX_UPDATER.compareAndSet(this, expect, newValue);
+ }
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedLinkedQueueGenerator
+ * which can found in the jctools-build module. The original source file is BaseMpscLinkedArrayQueue.java.
+ */
+abstract class BaseMpscLinkedAtomicUnpaddedArrayQueuePad2 extends BaseMpscLinkedAtomicUnpaddedArrayQueueProducerFields {
+ // Unpadded variant: intentionally empty, no false-sharing padding fields.
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedLinkedQueueGenerator
+ * which can found in the jctools-build module. The original source file is BaseMpscLinkedArrayQueue.java.
+ */
+abstract class BaseMpscLinkedAtomicUnpaddedArrayQueueConsumerFields extends BaseMpscLinkedAtomicUnpaddedArrayQueuePad2 {
+
+ private static final AtomicLongFieldUpdater C_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(BaseMpscLinkedAtomicUnpaddedArrayQueueConsumerFields.class, "consumerIndex");
+
+ // Consumer index, doubled: advances by 2 per element to mirror the producer index encoding.
+ private volatile long consumerIndex;
+
+ // Mask/buffer for the chunk the consumer is currently draining; single-consumer access only.
+ protected long consumerMask;
+
+ protected AtomicReferenceArray consumerBuffer;
+
+ // Load volatile (lv).
+ @Override
+ public final long lvConsumerIndex() {
+ return consumerIndex;
+ }
+
+ // Load plain (lp). NOTE(review): the field is volatile, so this is a volatile read too.
+ final long lpConsumerIndex() {
+ return consumerIndex;
+ }
+
+ // Store ordered (so): release semantics without a full fence.
+ final void soConsumerIndex(long newValue) {
+ C_INDEX_UPDATER.lazySet(this, newValue);
+ }
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedLinkedQueueGenerator
+ * which can found in the jctools-build module. The original source file is BaseMpscLinkedArrayQueue.java.
+ */
+abstract class BaseMpscLinkedAtomicUnpaddedArrayQueuePad3 extends BaseMpscLinkedAtomicUnpaddedArrayQueueConsumerFields {
+ // Unpadded variant: intentionally empty, no false-sharing padding fields.
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedLinkedQueueGenerator
+ * which can found in the jctools-build module. The original source file is BaseMpscLinkedArrayQueue.java.
+ */
+abstract class BaseMpscLinkedAtomicUnpaddedArrayQueueColdProducerFields extends BaseMpscLinkedAtomicUnpaddedArrayQueuePad3 {
+
+ private static final AtomicLongFieldUpdater P_LIMIT_UPDATER = AtomicLongFieldUpdater.newUpdater(BaseMpscLinkedAtomicUnpaddedArrayQueueColdProducerFields.class, "producerLimit");
+
+ // Highest (doubled) producer index claimable without consulting the consumer index.
+ private volatile long producerLimit;
+
+ // Mask/buffer for the chunk producers are currently filling.
+ protected long producerMask;
+
+ protected AtomicReferenceArray producerBuffer;
+
+ // Load volatile (lv).
+ final long lvProducerLimit() {
+ return producerLimit;
+ }
+
+ final boolean casProducerLimit(long expect, long newValue) {
+ return P_LIMIT_UPDATER.compareAndSet(this, expect, newValue);
+ }
+
+ // Store ordered (so): release semantics without a full fence.
+ final void soProducerLimit(long newValue) {
+ P_LIMIT_UPDATER.lazySet(this, newValue);
+ }
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedLinkedQueueGenerator
+ * which can found in the jctools-build module. The original source file is BaseMpscLinkedArrayQueue.java.
+ *
+ * An MPSC array queue which starts at initialCapacity and grows to maxCapacity in linked chunks
+ * of the initial size. The queue grows only when the current buffer is full and elements are not copied on
+ * resize, instead a link to the new buffer is stored in the old buffer for the consumer to follow.
+ */
+abstract class BaseMpscLinkedAtomicUnpaddedArrayQueue extends BaseMpscLinkedAtomicUnpaddedArrayQueueColdProducerFields implements MessagePassingQueue, QueueProgressIndicators {
+
+ // No post padding here, subclasses must add
+ // Sentinel stored in a slot to tell the consumer the element lives in the next buffer.
+ private static final Object JUMP = new Object();
+
+ // Sentinel placed in the old buffer's next-array slot once the consumer has moved on.
+ private static final Object BUFFER_CONSUMED = new Object();
+
+ // offerSlowPath outcome codes:
+ private static final int CONTINUE_TO_P_INDEX_CAS = 0;
+
+ private static final int RETRY = 1;
+
+ private static final int QUEUE_FULL = 2;
+
+ private static final int QUEUE_RESIZE = 3;
+
+ /**
+ * @param initialCapacity the queue initial capacity. If chunk size is fixed this will be the chunk size.
+ * Must be 2 or more.
+ */
+ public BaseMpscLinkedAtomicUnpaddedArrayQueue(final int initialCapacity) {
+ RangeUtil.checkGreaterThanOrEqual(initialCapacity, 2, "initialCapacity");
+ int p2capacity = Pow2.roundToPowerOfTwo(initialCapacity);
+ // leave lower bit of mask clear
+ long mask = (p2capacity - 1) << 1;
+ // need extra element to point at next array
+ AtomicReferenceArray buffer = allocateRefArray(p2capacity + 1);
+ producerBuffer = buffer;
+ producerMask = mask;
+ consumerBuffer = buffer;
+ consumerMask = mask;
+ // we know it's all empty to start with
+ soProducerLimit(mask);
+ }
+
+ @Override
+ public int size() {
+ // IGNORE_PARITY_DIVISOR: indices advance by 2 per element (low bit is the resize flag).
+ return IndexedQueueSizeUtil.size(this, IndexedQueueSizeUtil.IGNORE_PARITY_DIVISOR);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ // Order matters!
+ // Loading consumer before producer allows for producer increments after consumer index is read.
+ // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is
+ // nothing we can do to make this an exact method.
+ return ((lvConsumerIndex() - lvProducerIndex()) / 2 == 0);
+ }
+
+ @Override
+ public String toString() {
+ return this.getClass().getName();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Safe for concurrent producers. The low bit of the producer index flags an
+ * in-progress resize; producers spin while it is set.
+ */
+ @Override
+ public boolean offer(final E e) {
+ if (null == e) {
+ throw new NullPointerException();
+ }
+ long mask;
+ AtomicReferenceArray buffer;
+ long pIndex;
+ while (true) {
+ long producerLimit = lvProducerLimit();
+ pIndex = lvProducerIndex();
+ // lower bit is indicative of resize, if we see it we spin until it's cleared
+ if ((pIndex & 1) == 1) {
+ continue;
+ }
+ // pIndex is even (lower bit is 0) -> actual index is (pIndex >> 1)
+ // mask/buffer may get changed by resizing -> only use for array access after successful CAS.
+ mask = this.producerMask;
+ buffer = this.producerBuffer;
+ // a successful CAS ties the ordering, lv(pIndex) - [mask/buffer] -> cas(pIndex)
+ // assumption behind this optimization is that queue is almost always empty or near empty
+ if (producerLimit <= pIndex) {
+ int result = offerSlowPath(mask, pIndex, producerLimit);
+ switch(result) {
+ case CONTINUE_TO_P_INDEX_CAS:
+ break;
+ case RETRY:
+ continue;
+ case QUEUE_FULL:
+ return false;
+ case QUEUE_RESIZE:
+ resize(mask, buffer, pIndex, e, null);
+ return true;
+ }
+ }
+ if (casProducerIndex(pIndex, pIndex + 2)) {
+ break;
+ }
+ }
+ // INDEX visible before ELEMENT
+ final int offset = modifiedCalcCircularRefElementOffset(pIndex, mask);
+ // release element e
+ soRefElement(buffer, offset, e);
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * This implementation is correct for single consumer thread use only.
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public E poll() {
+ final AtomicReferenceArray buffer = consumerBuffer;
+ final long cIndex = lpConsumerIndex();
+ final long mask = consumerMask;
+ final int offset = modifiedCalcCircularRefElementOffset(cIndex, mask);
+ Object e = lvRefElement(buffer, offset);
+ if (e == null) {
+ long pIndex = lvProducerIndex();
+ // isEmpty?
+ if ((cIndex - pIndex) / 2 == 0) {
+ return null;
+ }
+ // poll() == null iff queue is empty, null element is not strong enough indicator, so we must
+ // spin until element is visible.
+ do {
+ e = lvRefElement(buffer, offset);
+ } while (e == null);
+ }
+ // JUMP sentinel: the element lives in the next (resized) buffer.
+ if (e == JUMP) {
+ final AtomicReferenceArray nextBuffer = nextBuffer(buffer, mask);
+ return newBufferPoll(nextBuffer, cIndex);
+ }
+ // release element null
+ soRefElement(buffer, offset, null);
+ // release cIndex
+ soConsumerIndex(cIndex + 2);
+ return (E) e;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * This implementation is correct for single consumer thread use only.
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public E peek() {
+ final AtomicReferenceArray buffer = consumerBuffer;
+ final long cIndex = lpConsumerIndex();
+ final long mask = consumerMask;
+ final int offset = modifiedCalcCircularRefElementOffset(cIndex, mask);
+ Object e = lvRefElement(buffer, offset);
+ if (e == null) {
+ long pIndex = lvProducerIndex();
+ // isEmpty?
+ if ((cIndex - pIndex) / 2 == 0) {
+ return null;
+ }
+ // peek() == null iff queue is empty, null element is not strong enough indicator, so we must
+ // spin until element is visible.
+ do {
+ e = lvRefElement(buffer, offset);
+ } while (e == null);
+ }
+ // JUMP sentinel: the element lives in the next (resized) buffer.
+ if (e == JUMP) {
+ return newBufferPeek(nextBuffer(buffer, mask), cIndex);
+ }
+ return (E) e;
+ }
+
+ /**
+ * We do not inline resize into this method because we do not resize on fill.
+ *
+ * @return one of CONTINUE_TO_P_INDEX_CAS / RETRY / QUEUE_FULL / QUEUE_RESIZE
+ */
+ private int offerSlowPath(long mask, long pIndex, long producerLimit) {
+ final long cIndex = lvConsumerIndex();
+ long bufferCapacity = getCurrentBufferCapacity(mask);
+ if (cIndex + bufferCapacity > pIndex) {
+ if (!casProducerLimit(producerLimit, cIndex + bufferCapacity)) {
+ // retry from top
+ return RETRY;
+ } else {
+ // continue to pIndex CAS
+ return CONTINUE_TO_P_INDEX_CAS;
+ }
+ } else // full and cannot grow
+ if (availableInQueue(pIndex, cIndex) <= 0) {
+ // offer should return false;
+ return QUEUE_FULL;
+ } else // grab index for resize -> set lower bit
+ if (casProducerIndex(pIndex, pIndex + 1)) {
+ // trigger a resize
+ return QUEUE_RESIZE;
+ } else {
+ // failed resize attempt, retry from top
+ return RETRY;
+ }
+ }
+
+ /**
+ * @return available elements in queue * 2
+ */
+ protected abstract long availableInQueue(long pIndex, long cIndex);
+
+ // Follow the link in the last slot of the current buffer, mark the old buffer
+ // consumed and install the new buffer/mask for the consumer.
+ @SuppressWarnings("unchecked")
+ private AtomicReferenceArray nextBuffer(final AtomicReferenceArray buffer, final long mask) {
+ final int offset = nextArrayOffset(mask);
+ final AtomicReferenceArray nextBuffer = (AtomicReferenceArray) lvRefElement(buffer, offset);
+ consumerBuffer = nextBuffer;
+ consumerMask = (length(nextBuffer) - 2) << 1;
+ soRefElement(buffer, offset, BUFFER_CONSUMED);
+ return nextBuffer;
+ }
+
+ // Offset of the extra slot (past the mask) holding the link to the next buffer.
+ private static int nextArrayOffset(long mask) {
+ return modifiedCalcCircularRefElementOffset(mask + 2, Long.MAX_VALUE);
+ }
+
+ // Consume the element the JUMP sentinel pointed at, from the freshly installed buffer.
+ private E newBufferPoll(AtomicReferenceArray nextBuffer, long cIndex) {
+ final int offset = modifiedCalcCircularRefElementOffset(cIndex, consumerMask);
+ final E n = lvRefElement(nextBuffer, offset);
+ if (n == null) {
+ throw new IllegalStateException("new buffer must have at least one element");
+ }
+ soRefElement(nextBuffer, offset, null);
+ soConsumerIndex(cIndex + 2);
+ return n;
+ }
+
+ // Peek at the element the JUMP sentinel pointed at, without consuming it.
+ private E newBufferPeek(AtomicReferenceArray nextBuffer, long cIndex) {
+ final int offset = modifiedCalcCircularRefElementOffset(cIndex, consumerMask);
+ final E n = lvRefElement(nextBuffer, offset);
+ if (null == n) {
+ throw new IllegalStateException("new buffer must have at least one element");
+ }
+ return n;
+ }
+
+ @Override
+ public long currentProducerIndex() {
+ // Indices advance by 2 per element; divide to report element counts.
+ return lvProducerIndex() / 2;
+ }
+
+ @Override
+ public long currentConsumerIndex() {
+ // Indices advance by 2 per element; divide to report element counts.
+ return lvConsumerIndex() / 2;
+ }
+
+ @Override
+ public abstract int capacity();
+
+ @Override
+ public boolean relaxedOffer(E e) {
+ return offer(e);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public E relaxedPoll() {
+ final AtomicReferenceArray buffer = consumerBuffer;
+ final long cIndex = lpConsumerIndex();
+ final long mask = consumerMask;
+ final int offset = modifiedCalcCircularRefElementOffset(cIndex, mask);
+ Object e = lvRefElement(buffer, offset);
+ // Relaxed semantics: a null slot is returned as empty without spin waiting.
+ if (e == null) {
+ return null;
+ }
+ if (e == JUMP) {
+ final AtomicReferenceArray nextBuffer = nextBuffer(buffer, mask);
+ return newBufferPoll(nextBuffer, cIndex);
+ }
+ soRefElement(buffer, offset, null);
+ soConsumerIndex(cIndex + 2);
+ return (E) e;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public E relaxedPeek() {
+ final AtomicReferenceArray buffer = consumerBuffer;
+ final long cIndex = lpConsumerIndex();
+ final long mask = consumerMask;
+ final int offset = modifiedCalcCircularRefElementOffset(cIndex, mask);
+ Object e = lvRefElement(buffer, offset);
+ if (e == JUMP) {
+ return newBufferPeek(nextBuffer(buffer, mask), cIndex);
+ }
+ return (E) e;
+ }
+
+ @Override
+ public int fill(Supplier s) {
+ // result is a long because we want to have a safepoint check at regular intervals
+ long result = 0;
+ final int capacity = capacity();
+ do {
+ final int filled = fill(s, PortableJvmInfo.RECOMENDED_OFFER_BATCH);
+ if (filled == 0) {
+ return (int) result;
+ }
+ result += filled;
+ } while (result <= capacity);
+ return (int) result;
+ }
+
+ @Override
+ public int fill(Supplier s, int limit) {
+ if (null == s)
+ throw new IllegalArgumentException("supplier is null");
+ if (limit < 0)
+ throw new IllegalArgumentException("limit is negative:" + limit);
+ if (limit == 0)
+ return 0;
+ long mask;
+ AtomicReferenceArray buffer;
+ long pIndex;
+ int claimedSlots;
+ while (true) {
+ long producerLimit = lvProducerLimit();
+ pIndex = lvProducerIndex();
+ // lower bit is indicative of resize, if we see it we spin until it's cleared
+ if ((pIndex & 1) == 1) {
+ continue;
+ }
+ // pIndex is even (lower bit is 0) -> actual index is (pIndex >> 1)
+ // NOTE: mask/buffer may get changed by resizing -> only use for array access after successful CAS.
+ // Only by virtue of loading them between the lvProducerIndex and a successful casProducerIndex are they
+ // safe to use.
+ mask = this.producerMask;
+ buffer = this.producerBuffer;
+ // a successful CAS ties the ordering, lv(pIndex) -> [mask/buffer] -> cas(pIndex)
+ // we want 'limit' slots, but will settle for whatever is visible to 'producerLimit'
+ // -> producerLimit >= batchIndex
+ long batchIndex = Math.min(producerLimit, pIndex + 2l * limit);
+ if (pIndex >= producerLimit) {
+ int result = offerSlowPath(mask, pIndex, producerLimit);
+ switch(result) {
+ case CONTINUE_TO_P_INDEX_CAS:
+ // offer slow path verifies only one slot ahead, we cannot rely on indication here
+ case RETRY:
+ continue;
+ case QUEUE_FULL:
+ return 0;
+ case QUEUE_RESIZE:
+ resize(mask, buffer, pIndex, null, s);
+ return 1;
+ }
+ }
+ // claim limit slots at once
+ if (casProducerIndex(pIndex, batchIndex)) {
+ claimedSlots = (int) ((batchIndex - pIndex) / 2);
+ break;
+ }
+ }
+ // Publish elements into the claimed slots; each release store makes the slot visible.
+ for (int i = 0; i < claimedSlots; i++) {
+ final int offset = modifiedCalcCircularRefElementOffset(pIndex + 2l * i, mask);
+ soRefElement(buffer, offset, s.get());
+ }
+ return claimedSlots;
+ }
+
+ @Override
+ public void fill(Supplier s, WaitStrategy wait, ExitCondition exit) {
+ MessagePassingQueueUtil.fill(this, s, wait, exit);
+ }
+
+ @Override
+ public int drain(Consumer c) {
+ return drain(c, capacity());
+ }
+
+ @Override
+ public int drain(Consumer c, int limit) {
+ return MessagePassingQueueUtil.drain(this, c, limit);
+ }
+
+ @Override
+ public void drain(Consumer c, WaitStrategy wait, ExitCondition exit) {
+ MessagePassingQueueUtil.drain(this, c, wait, exit);
+ }
+
+ /**
+ * Get an iterator for this queue. This method is thread safe.
+ *
+ * The iterator provides a best-effort snapshot of the elements in the queue.
+ * The returned iterator is not guaranteed to return elements in queue order,
+ * and races with the consumer thread may cause gaps in the sequence of returned elements.
+ * Like {@link #relaxedPoll}, the iterator may not immediately return newly inserted elements.
+ *
+ * @return The iterator.
+ */
+ @Override
+ public Iterator iterator() {
+ // Snapshot buffer and both indices once; the iterator walks the snapshot best-effort.
+ return new WeakIterator(consumerBuffer, lvConsumerIndex(), lvProducerIndex());
+ }
+
+ /**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedLinkedQueueGenerator
+ * which can found in the jctools-build module. The original source file is BaseMpscLinkedArrayQueue.java.
+ */
+ private static class WeakIterator implements Iterator {
+
+ private final long pIndex;
+
+ private long nextIndex;
+
+ private E nextElement;
+
+ private AtomicReferenceArray currentBuffer;
+
+ private int mask;
+
+ WeakIterator(AtomicReferenceArray currentBuffer, long cIndex, long pIndex) {
+ this.pIndex = pIndex >> 1;
+ this.nextIndex = cIndex >> 1;
+ setBuffer(currentBuffer);
+ nextElement = getNext();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove");
+ }
+
+ @Override
+ public boolean hasNext() {
+ return nextElement != null;
+ }
+
+ @Override
+ public E next() {
+ final E e = nextElement;
+ if (e == null) {
+ throw new NoSuchElementException();
+ }
+ nextElement = getNext();
+ return e;
+ }
+
+ private void setBuffer(AtomicReferenceArray buffer) {
+ this.currentBuffer = buffer;
+ this.mask = length(buffer) - 2;
+ }
+
+ private E getNext() {
+ while (nextIndex < pIndex) {
+ long index = nextIndex++;
+ E e = lvRefElement(currentBuffer, calcCircularRefElementOffset(index, mask));
+ // skip removed/not yet visible elements
+ if (e == null) {
+ continue;
+ }
+ // not null && not JUMP -> found next element
+ if (e != JUMP) {
+ return e;
+ }
+ // need to jump to the next buffer
+ int nextBufferIndex = mask + 1;
+ Object nextBuffer = lvRefElement(currentBuffer, calcRefElementOffset(nextBufferIndex));
+ if (nextBuffer == BUFFER_CONSUMED || nextBuffer == null) {
+ // Consumer may have passed us, or the next buffer is not visible yet: drop out early
+ return null;
+ }
+ setBuffer((AtomicReferenceArray) nextBuffer);
+ // now with the new array retry the load, it can't be a JUMP, but we need to repeat same index
+ e = lvRefElement(currentBuffer, calcCircularRefElementOffset(index, mask));
+ // skip removed/not yet visible elements
+ if (e == null) {
+ continue;
+ } else {
+ return e;
+ }
+ }
+ return null;
+ }
+ }
+
+ private void resize(long oldMask, AtomicReferenceArray oldBuffer, long pIndex, E e, Supplier s) {
+ assert (e != null && s == null) || (e == null || s != null);
+ int newBufferLength = getNextBufferSize(oldBuffer);
+ final AtomicReferenceArray newBuffer;
+ try {
+ newBuffer = allocateRefArray(newBufferLength);
+ } catch (OutOfMemoryError oom) {
+ assert lvProducerIndex() == pIndex + 1;
+ soProducerIndex(pIndex);
+ throw oom;
+ }
+ producerBuffer = newBuffer;
+ final int newMask = (newBufferLength - 2) << 1;
+ producerMask = newMask;
+ final int offsetInOld = modifiedCalcCircularRefElementOffset(pIndex, oldMask);
+ final int offsetInNew = modifiedCalcCircularRefElementOffset(pIndex, newMask);
+ // element in new array
+ soRefElement(newBuffer, offsetInNew, e == null ? s.get() : e);
+ // buffer linked
+ soRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);
+ // ASSERT code
+ final long cIndex = lvConsumerIndex();
+ final long availableInQueue = availableInQueue(pIndex, cIndex);
+ RangeUtil.checkPositive(availableInQueue, "availableInQueue");
+ // Invalidate racing CASs
+ // We never set the limit beyond the bounds of a buffer
+ soProducerLimit(pIndex + Math.min(newMask, availableInQueue));
+ // make resize visible to the other producers
+ soProducerIndex(pIndex + 2);
+ // INDEX visible before ELEMENT, consistent with consumer expectation
+ // make resize visible to consumer
+ soRefElement(oldBuffer, offsetInOld, JUMP);
+ }
+
+ /**
+ * @return next buffer size(inclusive of next array pointer)
+ */
+ protected abstract int getNextBufferSize(AtomicReferenceArray buffer);
+
/**
 * Usable element capacity of the current buffer, in index units (i.e. element count * 2,
 * since producer/consumer indices advance in steps of 2).
 *
 * @param mask the current producer mask
 * @return current buffer capacity for elements (excluding next pointer and jump entry) * 2
 */
protected abstract long getCurrentBufferCapacity(long mask);
+}
diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/unpadded/BaseSpscLinkedAtomicUnpaddedArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/atomic/unpadded/BaseSpscLinkedAtomicUnpaddedArrayQueue.java
new file mode 100644
index 00000000..fd49e5f7
--- /dev/null
+++ b/jctools-core/src/main/java/org/jctools/queues/atomic/unpadded/BaseSpscLinkedAtomicUnpaddedArrayQueue.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jctools.queues.atomic.unpadded;
+
+import org.jctools.queues.IndexedQueueSizeUtil.IndexedQueue;
+import org.jctools.util.PortableJvmInfo;
+import java.util.AbstractQueue;
+import java.util.Iterator;
+import java.util.concurrent.atomic.*;
+import org.jctools.queues.*;
+import static org.jctools.queues.atomic.unpadded.AtomicQueueUtil.*;
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedLinkedQueueGenerator
+ * which can found in the jctools-build module. The original source file is BaseSpscLinkedArrayQueue.java.
+ */
+abstract class BaseSpscLinkedAtomicUnpaddedArrayQueuePrePad extends AbstractQueue implements IndexedQueue {
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedLinkedQueueGenerator
+ * which can found in the jctools-build module. The original source file is BaseSpscLinkedArrayQueue.java.
+ */
+abstract class BaseSpscLinkedAtomicUnpaddedArrayQueueConsumerColdFields extends BaseSpscLinkedAtomicUnpaddedArrayQueuePrePad {
+
+ protected long consumerMask;
+
+ protected AtomicReferenceArray consumerBuffer;
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedLinkedQueueGenerator
+ * which can found in the jctools-build module. The original source file is BaseSpscLinkedArrayQueue.java.
+ */
+abstract class BaseSpscLinkedAtomicUnpaddedArrayQueueConsumerField extends BaseSpscLinkedAtomicUnpaddedArrayQueueConsumerColdFields {
+
+ private static final AtomicLongFieldUpdater C_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(BaseSpscLinkedAtomicUnpaddedArrayQueueConsumerField.class, "consumerIndex");
+
+ private volatile long consumerIndex;
+
+ @Override
+ public final long lvConsumerIndex() {
+ return consumerIndex;
+ }
+
+ final long lpConsumerIndex() {
+ return consumerIndex;
+ }
+
+ final void soConsumerIndex(long newValue) {
+ C_INDEX_UPDATER.lazySet(this, newValue);
+ }
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedLinkedQueueGenerator
+ * which can found in the jctools-build module. The original source file is BaseSpscLinkedArrayQueue.java.
+ */
+abstract class BaseSpscLinkedAtomicUnpaddedArrayQueueL2Pad extends BaseSpscLinkedAtomicUnpaddedArrayQueueConsumerField {
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedLinkedQueueGenerator
+ * which can found in the jctools-build module. The original source file is BaseSpscLinkedArrayQueue.java.
+ */
+abstract class BaseSpscLinkedAtomicUnpaddedArrayQueueProducerFields extends BaseSpscLinkedAtomicUnpaddedArrayQueueL2Pad {
+
+ private static final AtomicLongFieldUpdater P_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(BaseSpscLinkedAtomicUnpaddedArrayQueueProducerFields.class, "producerIndex");
+
+ private volatile long producerIndex;
+
+ @Override
+ public final long lvProducerIndex() {
+ return producerIndex;
+ }
+
+ final void soProducerIndex(long newValue) {
+ P_INDEX_UPDATER.lazySet(this, newValue);
+ }
+
+ final long lpProducerIndex() {
+ return producerIndex;
+ }
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedLinkedQueueGenerator
+ * which can found in the jctools-build module. The original source file is BaseSpscLinkedArrayQueue.java.
+ */
+abstract class BaseSpscLinkedAtomicUnpaddedArrayQueueProducerColdFields extends BaseSpscLinkedAtomicUnpaddedArrayQueueProducerFields {
+
+ protected long producerBufferLimit;
+
+ // fixed for chunked and unbounded
+ protected long producerMask;
+
+ protected AtomicReferenceArray producerBuffer;
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedLinkedQueueGenerator
+ * which can found in the jctools-build module. The original source file is BaseSpscLinkedArrayQueue.java.
+ */
+abstract class BaseSpscLinkedAtomicUnpaddedArrayQueue extends BaseSpscLinkedAtomicUnpaddedArrayQueueProducerColdFields implements MessagePassingQueue, QueueProgressIndicators {
+
+ private static final Object JUMP = new Object();
+
+ @Override
+ public final Iterator iterator() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public final int size() {
+ return IndexedQueueSizeUtil.size(this, IndexedQueueSizeUtil.PLAIN_DIVISOR);
+ }
+
+ @Override
+ public final boolean isEmpty() {
+ return IndexedQueueSizeUtil.isEmpty(this);
+ }
+
+ @Override
+ public String toString() {
+ return this.getClass().getName();
+ }
+
+ @Override
+ public long currentProducerIndex() {
+ return lvProducerIndex();
+ }
+
+ @Override
+ public long currentConsumerIndex() {
+ return lvConsumerIndex();
+ }
+
+ protected final void soNext(AtomicReferenceArray curr, AtomicReferenceArray next) {
+ int offset = nextArrayOffset(curr);
+ soRefElement(curr, offset, next);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected final AtomicReferenceArray lvNextArrayAndUnlink(AtomicReferenceArray curr) {
+ final int offset = nextArrayOffset(curr);
+ final AtomicReferenceArray nextBuffer = (AtomicReferenceArray) lvRefElement(curr, offset);
+ // prevent GC nepotism
+ soRefElement(curr, offset, null);
+ return nextBuffer;
+ }
+
+ @Override
+ public boolean relaxedOffer(E e) {
+ return offer(e);
+ }
+
+ @Override
+ public E relaxedPoll() {
+ return poll();
+ }
+
+ @Override
+ public E relaxedPeek() {
+ return peek();
+ }
+
+ @Override
+ public int drain(Consumer c) {
+ return MessagePassingQueueUtil.drain(this, c);
+ }
+
+ @Override
+ public int fill(Supplier s) {
+ // result is a long because we want to have a safepoint check at regular intervals
+ long result = 0;
+ final int capacity = capacity();
+ do {
+ final int filled = fill(s, PortableJvmInfo.RECOMENDED_OFFER_BATCH);
+ if (filled == 0) {
+ return (int) result;
+ }
+ result += filled;
+ } while (result <= capacity);
+ return (int) result;
+ }
+
+ @Override
+ public int drain(Consumer c, int limit) {
+ return MessagePassingQueueUtil.drain(this, c, limit);
+ }
+
+ @Override
+ public int fill(Supplier s, int limit) {
+ if (null == s)
+ throw new IllegalArgumentException("supplier is null");
+ if (limit < 0)
+ throw new IllegalArgumentException("limit is negative:" + limit);
+ if (limit == 0)
+ return 0;
+ for (int i = 0; i < limit; i++) {
+ // local load of field to avoid repeated loads after volatile reads
+ final AtomicReferenceArray buffer = producerBuffer;
+ final long index = lpProducerIndex();
+ final long mask = producerMask;
+ final int offset = calcCircularRefElementOffset(index, mask);
+ // expected hot path
+ if (index < producerBufferLimit) {
+ writeToQueue(buffer, s.get(), index, offset);
+ } else {
+ if (!offerColdPath(buffer, mask, index, offset, null, s)) {
+ return i;
+ }
+ }
+ }
+ return limit;
+ }
+
+ @Override
+ public void drain(Consumer c, WaitStrategy wait, ExitCondition exit) {
+ MessagePassingQueueUtil.drain(this, c, wait, exit);
+ }
+
+ @Override
+ public void fill(Supplier s, WaitStrategy wait, ExitCondition exit) {
+ MessagePassingQueueUtil.fill(this, s, wait, exit);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * This implementation is correct for single producer thread use only.
+ */
+ @Override
+ public boolean offer(final E e) {
+ // Objects.requireNonNull(e);
+ if (null == e) {
+ throw new NullPointerException();
+ }
+ // local load of field to avoid repeated loads after volatile reads
+ final AtomicReferenceArray buffer = producerBuffer;
+ final long index = lpProducerIndex();
+ final long mask = producerMask;
+ final int offset = calcCircularRefElementOffset(index, mask);
+ // expected hot path
+ if (index < producerBufferLimit) {
+ writeToQueue(buffer, e, index, offset);
+ return true;
+ }
+ return offerColdPath(buffer, mask, index, offset, e, null);
+ }
+
+ abstract boolean offerColdPath(AtomicReferenceArray buffer, long mask, long pIndex, int offset, E v, Supplier extends E> s);
+
+ /**
+ * {@inheritDoc}
+ *
+ * This implementation is correct for single consumer thread use only.
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public E poll() {
+ // local load of field to avoid repeated loads after volatile reads
+ final AtomicReferenceArray buffer = consumerBuffer;
+ final long index = lpConsumerIndex();
+ final long mask = consumerMask;
+ final int offset = calcCircularRefElementOffset(index, mask);
+ final Object e = lvRefElement(buffer, offset);
+ boolean isNextBuffer = e == JUMP;
+ if (null != e && !isNextBuffer) {
+ // this ensures correctness on 32bit platforms
+ soConsumerIndex(index + 1);
+ soRefElement(buffer, offset, null);
+ return (E) e;
+ } else if (isNextBuffer) {
+ return newBufferPoll(buffer, index);
+ }
+ return null;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * This implementation is correct for single consumer thread use only.
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public E peek() {
+ final AtomicReferenceArray buffer = consumerBuffer;
+ final long index = lpConsumerIndex();
+ final long mask = consumerMask;
+ final int offset = calcCircularRefElementOffset(index, mask);
+ final Object e = lvRefElement(buffer, offset);
+ if (e == JUMP) {
+ return newBufferPeek(buffer, index);
+ }
+ return (E) e;
+ }
+
+ final void linkOldToNew(final long currIndex, final AtomicReferenceArray oldBuffer, final int offset, final AtomicReferenceArray newBuffer, final int offsetInNew, final E e) {
+ soRefElement(newBuffer, offsetInNew, e);
+ // link to next buffer and add next indicator as element of old buffer
+ soNext(oldBuffer, newBuffer);
+ soRefElement(oldBuffer, offset, JUMP);
+ // index is visible after elements (isEmpty/poll ordering)
+ // this ensures atomic write of long on 32bit platforms
+ soProducerIndex(currIndex + 1);
+ }
+
+ final void writeToQueue(final AtomicReferenceArray buffer, final E e, final long index, final int offset) {
+ soRefElement(buffer, offset, e);
+ // this ensures atomic write of long on 32bit platforms
+ soProducerIndex(index + 1);
+ }
+
+ private E newBufferPeek(final AtomicReferenceArray buffer, final long index) {
+ AtomicReferenceArray nextBuffer = lvNextArrayAndUnlink(buffer);
+ consumerBuffer = nextBuffer;
+ final long mask = length(nextBuffer) - 2;
+ consumerMask = mask;
+ final int offset = calcCircularRefElementOffset(index, mask);
+ return lvRefElement(nextBuffer, offset);
+ }
+
+ private E newBufferPoll(final AtomicReferenceArray buffer, final long index) {
+ AtomicReferenceArray nextBuffer = lvNextArrayAndUnlink(buffer);
+ consumerBuffer = nextBuffer;
+ final long mask = length(nextBuffer) - 2;
+ consumerMask = mask;
+ final int offset = calcCircularRefElementOffset(index, mask);
+ final E n = lvRefElement(nextBuffer, offset);
+ if (null == n) {
+ throw new IllegalStateException("new buffer must have at least one element");
+ } else {
+ // this ensures correctness on 32bit platforms
+ soConsumerIndex(index + 1);
+ soRefElement(nextBuffer, offset, null);
+ return n;
+ }
+ }
+}
diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/unpadded/LinkedQueueAtomicNode.java b/jctools-core/src/main/java/org/jctools/queues/atomic/unpadded/LinkedQueueAtomicNode.java
new file mode 100644
index 00000000..fa923e6a
--- /dev/null
+++ b/jctools-core/src/main/java/org/jctools/queues/atomic/unpadded/LinkedQueueAtomicNode.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jctools.queues.atomic.unpadded;
+
+import java.util.concurrent.atomic.AtomicReference;
+
/**
 * Node of the linked atomic queues: the inherited {@link AtomicReference} value is the
 * 'next' pointer, while {@code value} holds the element (plain loads/stores only,
 * publication is piggybacked on the next-pointer stores).
 */
public final class LinkedQueueAtomicNode<E> extends AtomicReference<LinkedQueueAtomicNode<E>>
{
    /** */
    private static final long serialVersionUID = 2404266111789071508L;

    private E value;

    LinkedQueueAtomicNode()
    {
    }

    LinkedQueueAtomicNode(E val)
    {
        spValue(val);
    }

    /**
     * Gets the current value and nulls out the reference to it from this node.
     *
     * @return value
     */
    public E getAndNullValue()
    {
        E temp = lpValue();
        spValue(null);
        return temp;
    }

    /** Plain load of the value. */
    public E lpValue()
    {
        return value;
    }

    /** Plain store of the value. */
    public void spValue(E newValue)
    {
        value = newValue;
    }

    /** Ordered (lazySet) store of the next pointer. */
    public void soNext(LinkedQueueAtomicNode<E> n)
    {
        lazySet(n);
    }

    /** Store of the next pointer; the atomic API offers no plain store, so lazySet is used. */
    public void spNext(LinkedQueueAtomicNode<E> n)
    {
        lazySet(n);
    }

    /** Volatile load of the next pointer. */
    public LinkedQueueAtomicNode<E> lvNext()
    {
        return get();
    }
}
diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/unpadded/MpmcAtomicUnpaddedArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/atomic/unpadded/MpmcAtomicUnpaddedArrayQueue.java
new file mode 100644
index 00000000..4f3b8a37
--- /dev/null
+++ b/jctools-core/src/main/java/org/jctools/queues/atomic/unpadded/MpmcAtomicUnpaddedArrayQueue.java
@@ -0,0 +1,510 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jctools.queues.atomic.unpadded;
+
+import org.jctools.util.RangeUtil;
+import java.util.concurrent.atomic.*;
+import org.jctools.queues.*;
+import static org.jctools.queues.atomic.unpadded.AtomicQueueUtil.*;
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedArrayQueueGenerator
+ * which can found in the jctools-build module. The original source file is MpmcArrayQueue.java.
+ */
+abstract class MpmcAtomicUnpaddedArrayQueueL1Pad extends SequencedAtomicReferenceArrayQueue {
+
+ MpmcAtomicUnpaddedArrayQueueL1Pad(int capacity) {
+ super(capacity);
+ }
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedArrayQueueGenerator
+ * which can found in the jctools-build module. The original source file is MpmcArrayQueue.java.
+ */
+abstract class MpmcAtomicUnpaddedArrayQueueProducerIndexField extends MpmcAtomicUnpaddedArrayQueueL1Pad {
+
+ private static final AtomicLongFieldUpdater P_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(MpmcAtomicUnpaddedArrayQueueProducerIndexField.class, "producerIndex");
+
+ private volatile long producerIndex;
+
+ MpmcAtomicUnpaddedArrayQueueProducerIndexField(int capacity) {
+ super(capacity);
+ }
+
+ @Override
+ public final long lvProducerIndex() {
+ return producerIndex;
+ }
+
+ final boolean casProducerIndex(long expect, long newValue) {
+ return P_INDEX_UPDATER.compareAndSet(this, expect, newValue);
+ }
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedArrayQueueGenerator
+ * which can found in the jctools-build module. The original source file is MpmcArrayQueue.java.
+ */
+abstract class MpmcAtomicUnpaddedArrayQueueL2Pad extends MpmcAtomicUnpaddedArrayQueueProducerIndexField {
+
+ MpmcAtomicUnpaddedArrayQueueL2Pad(int capacity) {
+ super(capacity);
+ }
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedArrayQueueGenerator
+ * which can found in the jctools-build module. The original source file is MpmcArrayQueue.java.
+ */
+abstract class MpmcAtomicUnpaddedArrayQueueConsumerIndexField extends MpmcAtomicUnpaddedArrayQueueL2Pad {
+
+ private static final AtomicLongFieldUpdater C_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(MpmcAtomicUnpaddedArrayQueueConsumerIndexField.class, "consumerIndex");
+
+ private volatile long consumerIndex;
+
+ MpmcAtomicUnpaddedArrayQueueConsumerIndexField(int capacity) {
+ super(capacity);
+ }
+
+ @Override
+ public final long lvConsumerIndex() {
+ return consumerIndex;
+ }
+
+ final boolean casConsumerIndex(long expect, long newValue) {
+ return C_INDEX_UPDATER.compareAndSet(this, expect, newValue);
+ }
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedArrayQueueGenerator
+ * which can found in the jctools-build module. The original source file is MpmcArrayQueue.java.
+ */
+abstract class MpmcAtomicUnpaddedArrayQueueL3Pad extends MpmcAtomicUnpaddedArrayQueueConsumerIndexField {
+
+ MpmcAtomicUnpaddedArrayQueueL3Pad(int capacity) {
+ super(capacity);
+ }
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedArrayQueueGenerator
+ * which can found in the jctools-build module. The original source file is MpmcArrayQueue.java.
+ *
+ * A Multi-Producer-Multi-Consumer queue based on a {@link org.jctools.queues.ConcurrentCircularArrayQueue}. This
+ * implies that any and all threads may call the offer/poll/peek methods and correctness is maintained.
+ * This implementation follows patterns documented on the package level for False Sharing protection.
+ * The algorithm for offer/poll is an adaptation of the one put forward by D. Vyukov (See here). The original
+ * algorithm uses an array of structs which should offer nice locality properties but is sadly not possible in
+ * Java (waiting on Value Types or similar). The alternative explored here utilizes 2 arrays, one for each
+ * field of the struct. There is a further alternative in the experimental project which uses iteration phase
+ * markers to achieve the same algo and is closer structurally to the original, but sadly does not perform as
+ * well as this implementation.
+ *
+ * Tradeoffs to keep in mind:
+ *
+ * - Padding for false sharing: counter fields and queue fields are all padded as well as either side of
+ * both arrays. We are trading memory to avoid false sharing(active and passive).
+ *
+ * <li>2 arrays instead of one: The algorithm requires an extra array of longs matching the size of the
+ * elements array. This is doubling/tripling the memory allocated for the buffer.
+ *
+ * <li>Power of 2 capacity: Actual elements buffer (and sequence buffer) is the closest power of 2 larger or
+ * equal to the requested capacity.
+ *
+ */
+public class MpmcAtomicUnpaddedArrayQueue extends MpmcAtomicUnpaddedArrayQueueL3Pad {
+
+ public static final int MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.mpmc.max.lookahead.step", 4096);
+
+ private final int lookAheadStep;
+
/**
 * @param capacity requested capacity; must be at least 2 (the MPMC algorithm needs 2 slots minimum)
 */
public MpmcAtomicUnpaddedArrayQueue(final int capacity) {
    super(RangeUtil.checkGreaterThanOrEqual(capacity, 2, "capacity"));
    // batching window for drain/fill: at least 2, at most a quarter of capacity, capped by the system property
    lookAheadStep = Math.max(2, Math.min(capacity() / 4, MAX_LOOK_AHEAD_STEP));
}
+
/**
 * {@inheritDoc}
 * <p>
 * Lock-free multi-producer offer: the per-slot sequence array arbitrates ownership and a
 * single CAS on the producer index claims the slot (adaptation of Vyukov's MPMC algorithm).
 */
@Override
public boolean offer(final E e) {
    if (null == e) {
        throw new NullPointerException();
    }
    final int mask = this.mask;
    final long capacity = mask + 1;
    final AtomicLongArray sBuffer = sequenceBuffer;
    long pIndex;
    int seqOffset;
    long seq;
    // start with bogus value, hope we don't need it
    long cIndex = Long.MIN_VALUE;
    do {
        pIndex = lvProducerIndex();
        seqOffset = calcCircularLongElementOffset(pIndex, mask);
        seq = lvLongElement(sBuffer, seqOffset);
        // consumer has not moved this seq forward, it's as last producer left
        if (seq < pIndex) {
            // Extra check required to ensure [Queue.offer == false iff queue is full]
            if (// test against cached cIndex
            pIndex - capacity >= cIndex && // test against latest cIndex
            pIndex - capacity >= (cIndex = lvConsumerIndex())) {
                return false;
            } else {
                // (+) hack to make it go around again without CAS
                seq = pIndex + 1;
            }
        }
    } while (// another producer has moved the sequence(or +)
    seq > pIndex || // failed to increment
    !casProducerIndex(pIndex, pIndex + 1));
    // casProducerIndex ensures correct construction
    spRefElement(buffer, calcCircularRefElementOffset(pIndex, mask), e);
    // seq++; publishing the sequence makes the element visible to consumers
    soLongElement(sBuffer, seqOffset, pIndex + 1);
    return true;
}
+
/**
 * {@inheritDoc}
 * <p>
 * Because return null indicates queue is empty we cannot simply rely on next element visibility for poll
 * and must test producer index when next element is not visible.
 * <p>
 * Lock-free multi-consumer poll: a CAS on the consumer index claims the slot, then the
 * sequence is bumped by capacity to hand the slot back to producers.
 */
@Override
public E poll() {
    // local load of field to avoid repeated loads after volatile reads
    final AtomicLongArray sBuffer = sequenceBuffer;
    final int mask = this.mask;
    long cIndex;
    long seq;
    int seqOffset;
    long expectedSeq;
    // start with bogus value, hope we don't need it
    long pIndex = -1;
    do {
        cIndex = lvConsumerIndex();
        seqOffset = calcCircularLongElementOffset(cIndex, mask);
        seq = lvLongElement(sBuffer, seqOffset);
        expectedSeq = cIndex + 1;
        if (seq < expectedSeq) {
            // slot has not been moved by producer
            if (// test against cached pIndex
            cIndex >= pIndex && // update pIndex if we must
            cIndex == (pIndex = lvProducerIndex())) {
                // strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
                return null;
            } else {
                // trip another go around
                seq = expectedSeq + 1;
            }
        }
    } while (// another consumer beat us to it
    seq > expectedSeq || // failed the CAS
    !casConsumerIndex(cIndex, cIndex + 1));
    final int offset = calcCircularRefElementOffset(cIndex, mask);
    final E e = lpRefElement(buffer, offset);
    spRefElement(buffer, offset, null);
    // i.e. seq += capacity; slot becomes writable by producers again
    soLongElement(sBuffer, seqOffset, cIndex + mask + 1);
    return e;
}
+
/**
 * {@inheritDoc}
 * <p>
 * Loops until it observes a stable slot: the element is re-validated by re-reading the
 * consumer index, since a racing consumer may recycle the slot between the two loads.
 */
@Override
public E peek() {
    // local load of field to avoid repeated loads after volatile reads
    final AtomicLongArray sBuffer = sequenceBuffer;
    final int mask = this.mask;
    long cIndex;
    long seq;
    int seqOffset;
    long expectedSeq;
    // start with bogus value, hope we don't need it
    long pIndex = -1;
    E e;
    while (true) {
        cIndex = lvConsumerIndex();
        seqOffset = calcCircularLongElementOffset(cIndex, mask);
        seq = lvLongElement(sBuffer, seqOffset);
        expectedSeq = cIndex + 1;
        if (seq < expectedSeq) {
            // slot has not been moved by producer
            if (// test against cached pIndex
            cIndex >= pIndex && // update pIndex if we must
            cIndex == (pIndex = lvProducerIndex())) {
                // strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
                return null;
            }
        } else if (seq == expectedSeq) {
            final int offset = calcCircularRefElementOffset(cIndex, mask);
            e = lvRefElement(buffer, offset);
            // only return if the consumer index is unchanged, i.e. the element read is valid
            if (lvConsumerIndex() == cIndex)
                return e;
        }
    }
}
+
/**
 * {@inheritDoc}
 * <p>
 * Same as {@link #offer} but without the strict full-queue re-check: may return false
 * spuriously when the slot merely appears unavailable.
 */
@Override
public boolean relaxedOffer(E e) {
    if (null == e) {
        throw new NullPointerException();
    }
    final int mask = this.mask;
    final AtomicLongArray sBuffer = sequenceBuffer;
    long pIndex;
    int seqOffset;
    long seq;
    do {
        pIndex = lvProducerIndex();
        seqOffset = calcCircularLongElementOffset(pIndex, mask);
        seq = lvLongElement(sBuffer, seqOffset);
        if (seq < pIndex) {
            // slot not cleared by consumer yet
            return false;
        }
    } while (// another producer has moved the sequence
    seq > pIndex || // failed to increment
    !casProducerIndex(pIndex, pIndex + 1));
    // casProducerIndex ensures correct construction
    spRefElement(buffer, calcCircularRefElementOffset(pIndex, mask), e);
    // publishing the sequence makes the element visible to consumers
    soLongElement(sBuffer, seqOffset, pIndex + 1);
    return true;
}
+
/**
 * {@inheritDoc}
 * <p>
 * Same as {@link #poll} but without the strict empty re-check against the producer index:
 * may return null spuriously when the next element is simply not visible yet.
 */
@Override
public E relaxedPoll() {
    final AtomicLongArray sBuffer = sequenceBuffer;
    final int mask = this.mask;
    long cIndex;
    int seqOffset;
    long seq;
    long expectedSeq;
    do {
        cIndex = lvConsumerIndex();
        seqOffset = calcCircularLongElementOffset(cIndex, mask);
        seq = lvLongElement(sBuffer, seqOffset);
        expectedSeq = cIndex + 1;
        if (seq < expectedSeq) {
            // slot has not been published by a producer yet
            return null;
        }
    } while (// another consumer beat us to it
    seq > expectedSeq || // failed the CAS
    !casConsumerIndex(cIndex, cIndex + 1));
    final int offset = calcCircularRefElementOffset(cIndex, mask);
    final E e = lpRefElement(buffer, offset);
    spRefElement(buffer, offset, null);
    // i.e. seq += capacity; slot becomes writable by producers again
    soLongElement(sBuffer, seqOffset, cIndex + mask + 1);
    return e;
}
+
/**
 * {@inheritDoc}
 * <p>
 * Same as {@link #peek} but returns null as soon as the slot appears unpublished,
 * without re-checking the producer index.
 */
@Override
public E relaxedPeek() {
    // local load of field to avoid repeated loads after volatile reads
    final AtomicLongArray sBuffer = sequenceBuffer;
    final int mask = this.mask;
    long cIndex;
    long seq;
    int seqOffset;
    long expectedSeq;
    E e;
    do {
        cIndex = lvConsumerIndex();
        seqOffset = calcCircularLongElementOffset(cIndex, mask);
        seq = lvLongElement(sBuffer, seqOffset);
        expectedSeq = cIndex + 1;
        if (seq < expectedSeq) {
            // slot has not been published by a producer yet
            return null;
        } else if (seq == expectedSeq) {
            final int offset = calcCircularRefElementOffset(cIndex, mask);
            e = lvRefElement(buffer, offset);
            // only return if the consumer index is unchanged, i.e. the element read is valid
            if (lvConsumerIndex() == cIndex)
                return e;
        }
    } while (true);
}
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Batch variant: claims up to {@code lookAheadStep} consecutive slots with a single CAS on
+     * the consumer index, then consumes them, spin-waiting per slot until the producer's
+     * sequence becomes visible. Falls back to the one-by-one path on contention.
+     */
+    @Override
+    public int drain(Consumer c, int limit) {
+        if (null == c)
+            throw new IllegalArgumentException("c is null");
+        if (limit < 0)
+            throw new IllegalArgumentException("limit is negative: " + limit);
+        if (limit == 0)
+            return 0;
+        final AtomicLongArray sBuffer = sequenceBuffer;
+        final int mask = this.mask;
+        final AtomicReferenceArray buffer = this.buffer;
+        // never claim more slots than the caller asked for
+        final int maxLookAheadStep = Math.min(this.lookAheadStep, limit);
+        int consumed = 0;
+        while (consumed < limit) {
+            final int remaining = limit - consumed;
+            final int lookAheadStep = Math.min(remaining, maxLookAheadStep);
+            final long cIndex = lvConsumerIndex();
+            final long lookAheadIndex = cIndex + lookAheadStep - 1;
+            final int lookAheadSeqOffset = calcCircularLongElementOffset(lookAheadIndex, mask);
+            final long lookAheadSeq = lvLongElement(sBuffer, lookAheadSeqOffset);
+            final long expectedLookAheadSeq = lookAheadIndex + 1;
+            // if the last slot of the batch is already published, claim the whole batch in one CAS
+            if (lookAheadSeq == expectedLookAheadSeq && casConsumerIndex(cIndex, expectedLookAheadSeq)) {
+                for (int i = 0; i < lookAheadStep; i++) {
+                    final long index = cIndex + i;
+                    final int seqOffset = calcCircularLongElementOffset(index, mask);
+                    final int offset = calcCircularRefElementOffset(index, mask);
+                    final long expectedSeq = index + 1;
+                    // producers may still be writing earlier slots of the claimed batch; spin until visible
+                    while (lvLongElement(sBuffer, seqOffset) != expectedSeq) {
+                    }
+                    final E e = lpRefElement(buffer, offset);
+                    spRefElement(buffer, offset, null);
+                    // release the slot to producers for the next lap around the buffer
+                    soLongElement(sBuffer, seqOffset, index + mask + 1);
+                    c.accept(e);
+                }
+                consumed += lookAheadStep;
+            } else {
+                if (lookAheadSeq < expectedLookAheadSeq) {
+                    // if even the first slot is not published there is nothing to drain
+                    if (notAvailable(cIndex, mask, sBuffer, cIndex + 1)) {
+                        return consumed;
+                    }
+                }
+                // partially available, or we lost the CAS race: drain slot by slot
+                return consumed + drainOneByOne(c, remaining);
+            }
+        }
+        return limit;
+    }
+
+    /**
+     * Drains up to {@code limit} elements one slot at a time, competing with other consumers
+     * via a CAS on the consumer index for every element.
+     *
+     * @param c     element consumer
+     * @param limit maximum number of elements to drain
+     * @return number of elements actually drained
+     */
+    private int drainOneByOne(Consumer c, int limit) {
+        final AtomicLongArray sBuffer = sequenceBuffer;
+        final int mask = this.mask;
+        final AtomicReferenceArray buffer = this.buffer;
+        long cIndex;
+        int seqOffset;
+        long seq;
+        long expectedSeq;
+        for (int i = 0; i < limit; i++) {
+            do {
+                cIndex = lvConsumerIndex();
+                seqOffset = calcCircularLongElementOffset(cIndex, mask);
+                seq = lvLongElement(sBuffer, seqOffset);
+                expectedSeq = cIndex + 1;
+                if (seq < expectedSeq) {
+                    // slot not yet published by a producer: queue observed empty
+                    return i;
+                }
+            } while (// another consumer beat us to it
+            seq > expectedSeq || // failed the CAS
+            !casConsumerIndex(cIndex, cIndex + 1));
+            final int offset = calcCircularRefElementOffset(cIndex, mask);
+            final E e = lpRefElement(buffer, offset);
+            spRefElement(buffer, offset, null);
+            // release the slot to producers for the next lap around the buffer
+            soLongElement(sBuffer, seqOffset, cIndex + mask + 1);
+            c.accept(e);
+        }
+        return limit;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Batch variant: claims up to {@code lookAheadStep} consecutive slots with a single CAS on
+     * the producer index, then fills them, spin-waiting per slot until consumers have released
+     * it. Falls back to the one-by-one path on contention.
+     */
+    @Override
+    public int fill(Supplier s, int limit) {
+        if (null == s)
+            throw new IllegalArgumentException("supplier is null");
+        if (limit < 0)
+            throw new IllegalArgumentException("limit is negative:" + limit);
+        if (limit == 0)
+            return 0;
+        final AtomicLongArray sBuffer = sequenceBuffer;
+        final int mask = this.mask;
+        final AtomicReferenceArray buffer = this.buffer;
+        // never claim more slots than the caller asked for
+        final int maxLookAheadStep = Math.min(this.lookAheadStep, limit);
+        int produced = 0;
+        while (produced < limit) {
+            final int remaining = limit - produced;
+            final int lookAheadStep = Math.min(remaining, maxLookAheadStep);
+            final long pIndex = lvProducerIndex();
+            final long lookAheadIndex = pIndex + lookAheadStep - 1;
+            final int lookAheadSeqOffset = calcCircularLongElementOffset(lookAheadIndex, mask);
+            final long lookAheadSeq = lvLongElement(sBuffer, lookAheadSeqOffset);
+            // a free slot awaiting a producer carries a sequence equal to its index
+            final long expectedLookAheadSeq = lookAheadIndex;
+            // if the last slot of the batch is free, claim the whole batch in one CAS
+            if (lookAheadSeq == expectedLookAheadSeq && casProducerIndex(pIndex, expectedLookAheadSeq + 1)) {
+                for (int i = 0; i < lookAheadStep; i++) {
+                    final long index = pIndex + i;
+                    final int seqOffset = calcCircularLongElementOffset(index, mask);
+                    final int offset = calcCircularRefElementOffset(index, mask);
+                    // consumers may still be releasing earlier slots of the claimed batch; spin until free
+                    while (lvLongElement(sBuffer, seqOffset) != index) {
+                    }
+                    // Ordered store ensures correct construction
+                    soRefElement(buffer, offset, s.get());
+                    // publish the slot to consumers
+                    soLongElement(sBuffer, seqOffset, index + 1);
+                }
+                produced += lookAheadStep;
+            } else {
+                if (lookAheadSeq < expectedLookAheadSeq) {
+                    // if even the first slot is not free the queue is full
+                    if (notAvailable(pIndex, mask, sBuffer, pIndex)) {
+                        return produced;
+                    }
+                }
+                // partially free, or we lost the CAS race: fill slot by slot
+                return produced + fillOneByOne(s, remaining);
+            }
+        }
+        return limit;
+    }
+
+    /**
+     * Checks whether the slot at {@code index} has not yet reached {@code expectedSeq}, i.e.
+     * the counterpart side (producer for drain, consumer for fill) has not made it available.
+     *
+     * @param index       queue index of the slot to inspect
+     * @param mask        circular buffer mask
+     * @param sBuffer     the sequence buffer
+     * @param expectedSeq sequence value the slot must carry to be considered available
+     * @return true if the slot is not yet available
+     */
+    private boolean notAvailable(long index, int mask, AtomicLongArray sBuffer, long expectedSeq) {
+        final int seqOffset = calcCircularLongElementOffset(index, mask);
+        // a sequence below the expected value means the slot has not been published yet
+        return lvLongElement(sBuffer, seqOffset) < expectedSeq;
+    }
+
+    /**
+     * Fills up to {@code limit} elements one slot at a time, competing with other producers
+     * via a CAS on the producer index for every element.
+     *
+     * @param s     element supplier
+     * @param limit maximum number of elements to insert
+     * @return number of elements actually inserted
+     */
+    private int fillOneByOne(Supplier s, int limit) {
+        final AtomicLongArray sBuffer = sequenceBuffer;
+        final int mask = this.mask;
+        final AtomicReferenceArray buffer = this.buffer;
+        long pIndex;
+        int seqOffset;
+        long seq;
+        for (int i = 0; i < limit; i++) {
+            do {
+                pIndex = lvProducerIndex();
+                seqOffset = calcCircularLongElementOffset(pIndex, mask);
+                seq = lvLongElement(sBuffer, seqOffset);
+                if (seq < pIndex) {
+                    // slot not cleared by consumer yet
+                    return i;
+                }
+            } while (// another producer has moved the sequence
+            seq > pIndex || // failed to increment
+            !casProducerIndex(pIndex, pIndex + 1));
+            // Ordered store ensures correct construction
+            soRefElement(buffer, calcCircularRefElementOffset(pIndex, mask), s.get());
+            // publish the slot to consumers
+            soLongElement(sBuffer, seqOffset, pIndex + 1);
+        }
+        return limit;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Delegates to the shared utility, draining until the queue is observed empty.
+     */
+    @Override
+    public int drain(Consumer c) {
+        return MessagePassingQueueUtil.drain(this, c);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Delegates to the shared utility, filling at most up to the bounded capacity.
+     */
+    @Override
+    public int fill(Supplier s) {
+        return MessagePassingQueueUtil.fillBounded(this, s);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Delegates to the shared utility: drains continuously, idling per the supplied
+     * {@code WaitStrategy} until the {@code ExitCondition} signals termination.
+     */
+    @Override
+    public void drain(Consumer c, WaitStrategy w, ExitCondition exit) {
+        MessagePassingQueueUtil.drain(this, c, w, exit);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Delegates to the shared utility: fills continuously, idling per the supplied
+     * {@code WaitStrategy} until the {@code ExitCondition} signals termination.
+     */
+    @Override
+    public void fill(Supplier s, WaitStrategy wait, ExitCondition exit) {
+        MessagePassingQueueUtil.fill(this, s, wait, exit);
+    }
+}
diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/unpadded/MpscAtomicUnpaddedArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/atomic/unpadded/MpscAtomicUnpaddedArrayQueue.java
new file mode 100644
index 00000000..89bb6edd
--- /dev/null
+++ b/jctools-core/src/main/java/org/jctools/queues/atomic/unpadded/MpscAtomicUnpaddedArrayQueue.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jctools.queues.atomic.unpadded;
+
+import java.util.concurrent.atomic.*;
+import org.jctools.queues.*;
+import static org.jctools.queues.atomic.unpadded.AtomicQueueUtil.*;
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedArrayQueueGenerator
+ * which can be found in the jctools-build module. The original source file is MpscArrayQueue.java.
+ */
+abstract class MpscAtomicUnpaddedArrayQueueL1Pad extends AtomicReferenceArrayQueue {
+
+    // Structural layer kept for hierarchy parity with the padded variant; this unpadded version adds no fields.
+    MpscAtomicUnpaddedArrayQueueL1Pad(int capacity) {
+        super(capacity);
+    }
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedArrayQueueGenerator
+ * which can be found in the jctools-build module. The original source file is MpscArrayQueue.java.
+ */
+abstract class MpscAtomicUnpaddedArrayQueueProducerIndexField extends MpscAtomicUnpaddedArrayQueueL1Pad {
+
+    private static final AtomicLongFieldUpdater P_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(MpscAtomicUnpaddedArrayQueueProducerIndexField.class, "producerIndex");
+
+    // Index of the next slot a producer will claim; only ever advanced via CAS.
+    private volatile long producerIndex;
+
+    MpscAtomicUnpaddedArrayQueueProducerIndexField(int capacity) {
+        super(capacity);
+    }
+
+    /**
+     * Volatile load of the producer index.
+     */
+    @Override
+    public final long lvProducerIndex() {
+        return producerIndex;
+    }
+
+    /**
+     * Atomically moves the producer index from {@code expect} to {@code newValue}.
+     *
+     * @return true if the CAS succeeded
+     */
+    final boolean casProducerIndex(long expect, long newValue) {
+        return P_INDEX_UPDATER.compareAndSet(this, expect, newValue);
+    }
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedArrayQueueGenerator
+ * which can be found in the jctools-build module. The original source file is MpscArrayQueue.java.
+ */
+abstract class MpscAtomicUnpaddedArrayQueueMidPad extends MpscAtomicUnpaddedArrayQueueProducerIndexField {
+
+    // Structural layer kept for hierarchy parity with the padded variant; this unpadded version adds no fields.
+    MpscAtomicUnpaddedArrayQueueMidPad(int capacity) {
+        super(capacity);
+    }
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedArrayQueueGenerator
+ * which can be found in the jctools-build module. The original source file is MpscArrayQueue.java.
+ */
+abstract class MpscAtomicUnpaddedArrayQueueProducerLimitField extends MpscAtomicUnpaddedArrayQueueMidPad {
+
+    private static final AtomicLongFieldUpdater P_LIMIT_UPDATER = AtomicLongFieldUpdater.newUpdater(MpscAtomicUnpaddedArrayQueueProducerLimitField.class, "producerLimit");
+
+    // First unavailable index the producer may claim up to before rereading the consumer index
+    private volatile long producerLimit;
+
+    MpscAtomicUnpaddedArrayQueueProducerLimitField(int capacity) {
+        super(capacity);
+        // an empty queue lets producers claim up to full capacity before rereading the consumer index
+        this.producerLimit = capacity;
+    }
+
+    /**
+     * Volatile load of the cached producer limit.
+     */
+    final long lvProducerLimit() {
+        return producerLimit;
+    }
+
+    /**
+     * Ordered (lazySet) store of the producer limit. Concurrent updates race, but the race is
+     * benign: the limit is a conservative cache of {@code consumerIndex + capacity}.
+     */
+    final void soProducerLimit(long newValue) {
+        P_LIMIT_UPDATER.lazySet(this, newValue);
+    }
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedArrayQueueGenerator
+ * which can be found in the jctools-build module. The original source file is MpscArrayQueue.java.
+ */
+abstract class MpscAtomicUnpaddedArrayQueueL2Pad extends MpscAtomicUnpaddedArrayQueueProducerLimitField {
+
+    // Structural layer kept for hierarchy parity with the padded variant; this unpadded version adds no fields.
+    MpscAtomicUnpaddedArrayQueueL2Pad(int capacity) {
+        super(capacity);
+    }
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedArrayQueueGenerator
+ * which can be found in the jctools-build module. The original source file is MpscArrayQueue.java.
+ */
+abstract class MpscAtomicUnpaddedArrayQueueConsumerIndexField extends MpscAtomicUnpaddedArrayQueueL2Pad {
+
+    private static final AtomicLongFieldUpdater C_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(MpscAtomicUnpaddedArrayQueueConsumerIndexField.class, "consumerIndex");
+
+    // Index of the next slot to be read; per the MPSC contract, written only by the single consumer.
+    private volatile long consumerIndex;
+
+    MpscAtomicUnpaddedArrayQueueConsumerIndexField(int capacity) {
+        super(capacity);
+    }
+
+    /**
+     * Volatile load of the consumer index.
+     */
+    @Override
+    public final long lvConsumerIndex() {
+        return consumerIndex;
+    }
+
+    /**
+     * Consumer-thread load of the consumer index. Named as a plain load for parity with the
+     * padded/Unsafe variant; here it reads the volatile field directly.
+     */
+    final long lpConsumerIndex() {
+        return consumerIndex;
+    }
+
+    /**
+     * Ordered (lazySet) store of the consumer index.
+     */
+    final void soConsumerIndex(long newValue) {
+        C_INDEX_UPDATER.lazySet(this, newValue);
+    }
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedArrayQueueGenerator
+ * which can be found in the jctools-build module. The original source file is MpscArrayQueue.java.
+ */
+abstract class MpscAtomicUnpaddedArrayQueueL3Pad extends MpscAtomicUnpaddedArrayQueueConsumerIndexField {
+
+    // Structural layer kept for hierarchy parity with the padded variant; this unpadded version adds no fields.
+    MpscAtomicUnpaddedArrayQueueL3Pad(int capacity) {
+        super(capacity);
+    }
+}
+
+/**
+ * NOTE: This class was automatically generated by org.jctools.queues.atomic.unpadded.JavaParsingAtomicUnpaddedArrayQueueGenerator
+ * which can be found in the jctools-build module. The original source file is MpscArrayQueue.java.
+ *
+ * A Multi-Producer-Single-Consumer queue based on a {@link org.jctools.queues.ConcurrentCircularArrayQueue}. This
+ * implies that any thread may call the offer method, but only a single thread may call poll/peek for correctness to
+ * be maintained.
+ * This implementation follows patterns documented on the package level for False Sharing protection.
+ * This implementation is using the Fast Flow
+ * method for polling from the queue (with minor change to correctly publish the index) and an extension of
+ * the Leslie Lamport concurrent queue algorithm (originated by Martin Thompson) on the producer side.
+ */
+public class MpscAtomicUnpaddedArrayQueue extends MpscAtomicUnpaddedArrayQueueL3Pad {
+
+    /**
+     * @param capacity the bounded capacity of the queue; validation and backing array sizing
+     *                 are handled by the parent constructor
+     */
+    public MpscAtomicUnpaddedArrayQueue(final int capacity) {
+        super(capacity);
+    }
+
+    /**
+     * {@link #offer} if {@link #size()} is less than threshold.
+     *
+     * @param e the object to offer onto the queue, not null
+     * @param threshold the maximum allowable size
+     * @return true if the offer is successful, false if queue size exceeds threshold
+     * @since 1.0.1
+     */
+    public boolean offerIfBelowThreshold(final E e, int threshold) {
+        if (null == e) {
+            throw new NullPointerException();
+        }
+        final int mask = this.mask;
+        final long capacity = mask + 1;
+        long producerLimit = lvProducerLimit();
+        long pIndex;
+        do {
+            pIndex = lvProducerIndex();
+            // estimate size from the cached limit first, to avoid reading the consumer index
+            long available = producerLimit - pIndex;
+            long size = capacity - available;
+            if (size >= threshold) {
+                final long cIndex = lvConsumerIndex();
+                size = pIndex - cIndex;
+                if (size >= threshold) {
+                    // the size exceeds threshold
+                    return false;
+                } else {
+                    // update producer limit to the next index that we must recheck the consumer index
+                    producerLimit = cIndex + capacity;
+                    // this is racy, but the race is benign
+                    soProducerLimit(producerLimit);
+                }
+            }
+        } while (!casProducerIndex(pIndex, pIndex + 1));
+        /*
+         * NOTE: the new producer index value is made visible BEFORE the element in the array. If we relied on
+         * the index visibility to poll() we would need to handle the case where the element is not visible.
+         */
+        // Won CAS, move on to storing
+        final int offset = calcCircularRefElementOffset(pIndex, mask);
+        soRefElement(buffer, offset, e);
+        // AWESOME :)
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * IMPLEMENTATION NOTES:
+     * Lock free offer using a single CAS. As class name suggests access is permitted to many threads
+     * concurrently.
+     *
+     * @see java.util.Queue#offer
+     * @see org.jctools.queues.MessagePassingQueue#offer
+     */
+    @Override
+    public boolean offer(final E e) {
+        if (null == e) {
+            throw new NullPointerException();
+        }
+        // use a cached view on consumer index (potentially updated in loop)
+        final int mask = this.mask;
+        long producerLimit = lvProducerLimit();
+        long pIndex;
+        do {
+            pIndex = lvProducerIndex();
+            if (pIndex >= producerLimit) {
+                // cached limit exhausted: reread the consumer index to refresh it
+                final long cIndex = lvConsumerIndex();
+                producerLimit = cIndex + mask + 1;
+                if (pIndex >= producerLimit) {
+                    // FULL :(
+                    return false;
+                } else {
+                    // update producer limit to the next index that we must recheck the consumer index
+                    // this is racy, but the race is benign
+                    soProducerLimit(producerLimit);
+                }
+            }
+        } while (!casProducerIndex(pIndex, pIndex + 1));
+        /*
+         * NOTE: the new producer index value is made visible BEFORE the element in the array. If we relied on
+         * the index visibility to poll() we would need to handle the case where the element is not visible.
+         */
+        // Won CAS, move on to storing
+        final int offset = calcCircularRefElementOffset(pIndex, mask);
+        soRefElement(buffer, offset, e);
+        // AWESOME :)
+        return true;
+    }
+
+    /**
+     * A wait free alternative to offer which fails on CAS failure rather than retrying.
+     *
+     * @param e new element, not null
+     * @return 1 if next element cannot be filled (queue full), -1 if CAS failed, 0 if successful
+     */
+    public final int failFastOffer(final E e) {
+        if (null == e) {
+            throw new NullPointerException();
+        }
+        final int mask = this.mask;
+        final long capacity = mask + 1;
+        final long pIndex = lvProducerIndex();
+        long producerLimit = lvProducerLimit();
+        if (pIndex >= producerLimit) {
+            // cached limit exhausted: reread the consumer index to refresh it
+            final long cIndex = lvConsumerIndex();
+            producerLimit = cIndex + capacity;
+            if (pIndex >= producerLimit) {
+                // FULL :(
+                return 1;
+            } else {
+                // update producer limit to the next index that we must recheck the consumer index
+                soProducerLimit(producerLimit);
+            }
+        }
+        // look Ma, no loop!
+        if (!casProducerIndex(pIndex, pIndex + 1)) {
+            // CAS FAIL :(
+            return -1;
+        }
+        // Won CAS, move on to storing
+        final int offset = calcCircularRefElementOffset(pIndex, mask);
+        soRefElement(buffer, offset, e);
+        // AWESOME :)
+        return 0;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * IMPLEMENTATION NOTES:
+     * Lock free poll using ordered loads/stores. As class name suggests access is limited to a single thread.
+     *
+     * @see java.util.Queue#poll
+     * @see org.jctools.queues.MessagePassingQueue#poll
+     */
+    @Override
+    public E poll() {
+        final long cIndex = lpConsumerIndex();
+        final int offset = calcCircularRefElementOffset(cIndex, mask);
+        // Copy field to avoid re-reading after volatile load
+        final AtomicReferenceArray buffer = this.buffer;
+        // If we can't see the next available element we can't poll
+        E e = lvRefElement(buffer, offset);
+        if (null == e) {
+            /*
+             * NOTE: Queue may not actually be empty in the case of a producer (P1) being interrupted after
+             * winning the CAS on offer but before storing the element in the queue. Other producers may go on
+             * to fill up the queue after this element.
+             */
+            if (cIndex != lvProducerIndex()) {
+                // an in-flight element exists: busy-wait until the producer's store becomes visible
+                do {
+                    e = lvRefElement(buffer, offset);
+                } while (e == null);
+            } else {
+                // producer index matches ours: the queue really is empty
+                return null;
+            }
+        }
+        spRefElement(buffer, offset, null);
+        soConsumerIndex(cIndex + 1);
+        return e;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * IMPLEMENTATION NOTES:
+     * Lock free peek using ordered loads. As class name suggests access is limited to a single thread.
+     *
+     * @see java.util.Queue#peek
+     * @see org.jctools.queues.MessagePassingQueue#peek
+     */
+    @Override
+    public E peek() {
+        // Copy field to avoid re-reading after volatile load
+        final AtomicReferenceArray buffer = this.buffer;
+        final long cIndex = lpConsumerIndex();
+        final int offset = calcCircularRefElementOffset(cIndex, mask);
+        E e = lvRefElement(buffer, offset);
+        if (null == e) {
+            /*
+             * NOTE: Queue may not actually be empty in the case of a producer (P1) being interrupted after
+             * winning the CAS on offer but before storing the element in the queue. Other producers may go on
+             * to fill up the queue after this element.
+             */
+            if (cIndex != lvProducerIndex()) {
+                // an in-flight element exists: busy-wait until the producer's store becomes visible
+                do {
+                    e = lvRefElement(buffer, offset);
+                } while (e == null);
+            } else {
+                // producer index matches ours: the queue really is empty
+                return null;
+            }
+        }
+        return e;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * On this MPSC queue {@link #offer} is already lock free using a single CAS, so the
+     * relaxed variant simply delegates to it.
+     */
+    @Override
+    public boolean relaxedOffer(E e) {
+        return offer(e);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Unlike {@link #poll()}, this does not busy-wait for an element whose producer won the
+     * index CAS but has not yet stored the element; it returns null in that case.
+     */
+    @Override
+    public E relaxedPoll() {
+        final AtomicReferenceArray buffer = this.buffer;
+        final long cIndex = lpConsumerIndex();
+        final int offset = calcCircularRefElementOffset(cIndex, mask);
+        // If we can't see the next available element we can't poll
+        E e = lvRefElement(buffer, offset);
+        if (null == e) {
+            return null;
+        }
+        spRefElement(buffer, offset, null);
+        soConsumerIndex(cIndex + 1);
+        return e;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * May return null even when the queue is not empty, if the producer of the next slot has
+     * not yet made its element visible.
+     */
+    @Override
+    public E relaxedPeek() {
+        final AtomicReferenceArray buffer = this.buffer;
+        final int mask = this.mask;
+        final long cIndex = lpConsumerIndex();
+        return lvRefElement(buffer, calcCircularRefElementOffset(cIndex, mask));
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Single-consumer drain: no CAS is needed; the consumer index is published with an
+     * ordered store after each removed element.
+     */
+    @Override
+    public int drain(final Consumer c, final int limit) {
+        if (null == c)
+            throw new IllegalArgumentException("c is null");
+        if (limit < 0)
+            throw new IllegalArgumentException("limit is negative: " + limit);
+        if (limit == 0)
+            return 0;
+        final AtomicReferenceArray buffer = this.buffer;
+        final int mask = this.mask;
+        final long cIndex = lpConsumerIndex();
+        for (int i = 0; i < limit; i++) {
+            final long index = cIndex + i;
+            final int offset = calcCircularRefElementOffset(index, mask);
+            final E e = lvRefElement(buffer, offset);
+            if (null == e) {
+                // next element not visible: queue is empty, or a producer is mid-offer
+                return i;
+            }
+            spRefElement(buffer, offset, null);
+            // ordered store -> atomic and ordered for size()
+            soConsumerIndex(index + 1);
+            c.accept(e);
+        }
+        return limit;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Claims up to {@code limit} slots with a single CAS (bounded by the free space observed),
+     * then fills the claimed range. As with {@link #offer}, the producer index becomes visible
+     * before the elements themselves.
+     */
+    @Override
+    public int fill(Supplier s, int limit) {
+        if (null == s)
+            throw new IllegalArgumentException("supplier is null");
+        if (limit < 0)
+            throw new IllegalArgumentException("limit is negative:" + limit);
+        if (limit == 0)
+            return 0;
+        final int mask = this.mask;
+        final long capacity = mask + 1;
+        long producerLimit = lvProducerLimit();
+        long pIndex;
+        int actualLimit;
+        do {
+            pIndex = lvProducerIndex();
+            long available = producerLimit - pIndex;
+            if (available <= 0) {
+                // cached limit exhausted: reread the consumer index to refresh it
+                final long cIndex = lvConsumerIndex();
+                producerLimit = cIndex + capacity;
+                available = producerLimit - pIndex;
+                if (available <= 0) {
+                    // FULL :(
+                    return 0;
+                } else {
+                    // update producer limit to the next index that we must recheck the consumer index
+                    soProducerLimit(producerLimit);
+                }
+            }
+            actualLimit = Math.min((int) available, limit);
+        } while (!casProducerIndex(pIndex, pIndex + actualLimit));
+        // right, now we claimed a few slots and can fill them with goodness
+        final AtomicReferenceArray buffer = this.buffer;
+        for (int i = 0; i < actualLimit; i++) {
+            // Won CAS, move on to storing
+            final int offset = calcCircularRefElementOffset(pIndex + i, mask);
+            soRefElement(buffer, offset, s.get());
+        }
+        return actualLimit;
+    }
+
+ @Override
+ public int drain(Consumer