Skip to content

Commit

Permalink
Improve mpmc xadd q poll and peek
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 committed Oct 27, 2019
1 parent 7b4246b commit 7aa5fe9
Showing 1 changed file with 85 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import static org.jctools.util.UnsafeAccess.UNSAFE;
import static org.jctools.util.UnsafeAccess.fieldOffset;
import static org.jctools.util.UnsafeRefArrayAccess.lpElement;


abstract class MpmcProgressiveChunkedQueuePad1<E> extends AbstractQueue<E> implements IndexedQueue
Expand Down Expand Up @@ -437,20 +438,38 @@ private static <E> E spinForElement(AtomicChunk<E> chunk, int offset)
return e;
}

private void rotateConsumerBuffer(AtomicChunk<E> consumerBuffer, AtomicChunk<E> next)
private E rotateConsumerBuffer(AtomicChunk<E> consumerBuffer, AtomicChunk<E> next, int consumerOffset, long expectedChunkIndex)
{
while (next == null)
{
next = consumerBuffer.lvNext();
}
//prevent other consumers to use it, but need to await next != null
//or the producer won't be able to append next on a NIL_CHUNK_INDEX!
consumerBuffer.soIndex(AtomicChunk.NIL_CHUNK_INDEX);
//we can freely spin awaiting producer, because we are the only one in charge to
//rotate the consumer buffer and use next
final E e = spinForElement(next, consumerOffset);
final boolean pooled = next.isPooled();
if (pooled)
{
while (next.lvSequence(consumerOffset) != expectedChunkIndex)
{

}
}
next.soElement(consumerOffset, null);
next.spPrev(null);
//save from nepotism
consumerBuffer.spNext(null);
//prevent other consumers to use it
consumerBuffer.soIndex(AtomicChunk.NIL_CHUNK_INDEX);
if (consumerBuffer.isPooled())
{
final boolean pooled = freeBuffer.offer(consumerBuffer);
assert pooled;
final boolean offered = freeBuffer.offer(consumerBuffer);
assert offered;
}
//expose next to the other consumers
soConsumerBuffer(next);
return e;
}

@Override
Expand All @@ -465,6 +484,7 @@ public E poll()
boolean firstElementOfNewChunk;
E e = null;
AtomicChunk<E> next = null;
long pIndex = -1; // start with bogus value, hope we don't need it
long chunkIndex;
do
{
Expand All @@ -477,63 +497,74 @@ public E poll()
{
next = consumerBuffer.lvNext();
final long expectedChunkIndex = chunkIndex - 1;
//we don't care about < or >, because if:
//- consumerBuffer::index < expectedChunkIndex: another consumer has rotated consumerBuffer,
// but not reused (yet, if possible)
//- consumerBuffer::index > expectedChunkIndex: another consumer has rotated consumerBuffer,
// that has been pooled and reused again or
//In both cases we have a stale view of the world with a not reliable next value.
if (expectedChunkIndex != consumerBuffer.lvIndex())
{
//another consumer has already rotated the consumer buffer or is yet to rotating it
continue;
}
if (next == null)
{
if (lvProducerIndex() == consumerIndex)
{
if (consumerIndex >= pIndex && // test against cached pIndex
consumerIndex == (pIndex = lvProducerIndex()))
{ // update pIndex if we must
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
return null;
}
//if another consumer rotate consumerBuffer, its chunkIndex will change
while ((next = consumerBuffer.lvNext()) == null)
{
if (expectedChunkIndex != consumerBuffer.lvIndex())
{
//another consumer has already rotated the consumer buffer
break;
}
}
if (next == null)
{
continue;
}
//not empty: can attempt the cas
}
}
else
{
if (consumerBuffer.isPooled())
final boolean pooled = consumerBuffer.isPooled();
e = pooled ? null : consumerBuffer.lvElement(consumerOffset);
final long index = pooled ? consumerBuffer.lvSequence(consumerOffset) : consumerBuffer.lvIndex();
if (index != chunkIndex)
{
if (consumerBuffer.lvSequence(consumerOffset) != chunkIndex)
if (index < chunkIndex)
{
if (lvProducerIndex() == consumerIndex)
{
// if pooled:
// a) consumerBuffer::index > chunkIndex: a chunk used in the past or at its first use
// b) consumerBuffer::index < chunkIndex: a rotation is in progress
// c) consumerBuffer::index == chunkIndex: rotation is happened, not yet an element in
// For a) there is no need to check q emptiness, but doing it isn't wrong,
// because the check will fail (ie q isn't empty re consumerIndex):
// consumerBuffer::index > chunkIndex means that others have proceeded consuming new elements.
// For b) and c) the emptiness check is necessary, because if there are no other elements
// poll *must* return null.
//
// if !pooled:
// - the rotation isn't happened yet
if (consumerIndex >= pIndex && // test against cached pIndex
consumerIndex == (pIndex = lvProducerIndex()))
{ // update pIndex if we must
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
return null;
}
continue;
}
}
else
{
e = consumerBuffer.lvElement(consumerOffset);
if (chunkIndex != consumerBuffer.lvIndex())
else
{
//another consumer has already rotated the consumer buffer or is yet to rotating it
//Stale view of the world: retry!
continue;
}
if (e == null)
{
if (lvProducerIndex() == consumerIndex)
{
return null;
}
//if the buffer is not empty, another consumer could have already
//stolen it, incrementing consumerIndex too: better to check if it has happened
continue;
}
assert index == chunkIndex;
if (!pooled && e == null)
{
if (consumerIndex >= pIndex && // test against cached pIndex
consumerIndex == (pIndex = lvProducerIndex()))
{ // update pIndex if we must
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
return null;
}
//we are awaiting the producer here
continue;
}
}
if (casConsumerIndex(consumerIndex, consumerIndex + 1))
Expand All @@ -545,19 +576,7 @@ public E poll()
//if we are the firstElementOfNewChunk we need to rotate the consumer buffer
if (firstElementOfNewChunk)
{
//we can freely spin awaiting producer, because we are the only one in charge to
//rotate the consumer buffer and using next
e = spinForElement(next, consumerOffset);
final boolean pooled = next.isPooled();
if (pooled)
{
while (next.lvSequence(consumerOffset) != chunkIndex)
{

}
}
next.soElement(consumerOffset, null);
rotateConsumerBuffer(consumerBuffer, next);
e = rotateConsumerBuffer(consumerBuffer, next, consumerOffset, chunkIndex);
}
else
{
Expand All @@ -577,74 +596,47 @@ public E peek()
{
final int chunkMask = this.chunkMask;
final int chunkShift = this.chunkShift;
final int chunkSize = chunkMask + 1;
long consumerIndex;
AtomicChunk<E> consumerBuffer;
int consumerOffset;
boolean firstElementOfNewChunk;
E e;
AtomicChunk<E> next;
do
{
e = null;
consumerIndex = this.lvConsumerIndex();
consumerBuffer = this.lvConsumerBuffer();
AtomicChunk<E> consumerBuffer = this.lvConsumerBuffer();
final int consumerOffset = (int) (consumerIndex & chunkMask);
final long chunkIndex = consumerIndex >> chunkShift;
consumerOffset = (int) (consumerIndex & chunkMask);
final int chunkSize = chunkMask + 1;
firstElementOfNewChunk = consumerOffset == 0 && consumerIndex >= chunkSize;
final boolean firstElementOfNewChunk = consumerOffset == 0 && consumerIndex >= chunkSize;
if (firstElementOfNewChunk)
{
AtomicChunk<E> next = consumerBuffer.lvNext();
final long expectedChunkIndex = chunkIndex - 1;
next = consumerBuffer.lvNext();
if (expectedChunkIndex != consumerBuffer.lvIndex())
{
//another consumer has already rotated the consumer buffer or is yet to rotating it
continue;
}
if (next == null)
{
if (lvProducerIndex() == consumerIndex)
{
return null;
}
//if another consumer rotate consumerBuffer, its chunkIndex will change
while ((next = consumerBuffer.lvNext()) == null)
{
if (expectedChunkIndex != consumerBuffer.lvIndex())
{
//another consumer has already rotated the consumer buffer
break;
}
}
if (next == null)
{
continue;
}
continue;
}
consumerBuffer = next;
}
e = consumerBuffer.lvElement(consumerOffset);
if (e != null)
if (consumerBuffer.isPooled())
{
//validate the element read
if (chunkIndex != consumerBuffer.lvIndex())
if (consumerBuffer.lvSequence(consumerOffset) != chunkIndex)
{
//another consumer has already rotated the consumer buffer or is yet to rotating it
continue;
}
return e;
}
else
e = consumerBuffer.lvElement(consumerOffset);
if (consumerBuffer.lvIndex() != chunkIndex)
{
if (lvProducerIndex() == consumerIndex)
{
return null;
}
//if the q is not empty, another consumer could have already
//stolen it, incrementing consumerIndex too: better to check if it has happened
e = null;
continue;
}
}
while (true);
while (e == null && consumerIndex != lvProducerIndex());
return e;
}

@Override
Expand Down

0 comments on commit 7aa5fe9

Please sign in to comment.