Skip to content

Commit

Permalink
Merge 3322e9f into 671c0a8
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 committed Nov 5, 2019
2 parents 671c0a8 + 3322e9f commit d30b4d1
Showing 1 changed file with 29 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ private AtomicChunk<E> appendNextChunks(AtomicChunk<E> producerBuffer, long chun
if (newChunk != null)
{
//single-writer: producerBuffer::index == nextChunkIndex is protecting it
assert newChunk.lvIndex() == AtomicChunk.NIL_CHUNK_INDEX;
assert newChunk.lvIndex() < producerBuffer.lvIndex();
newChunk.spPrev(producerBuffer);
//index set is releasing prev, allowing other pending offers to continue
newChunk.soIndex(nextChunkIndex);
Expand Down Expand Up @@ -446,9 +446,6 @@ private E rotateConsumerBuffer(AtomicChunk<E> consumerBuffer, AtomicChunk<E> nex
{
next = consumerBuffer.lvNext();
}
//prevent other consumers to use it, but need to await next != null
//or the producer won't be able to append next on a NIL_CHUNK_INDEX!
consumerBuffer.soIndex(AtomicChunk.NIL_CHUNK_INDEX);
//we can freely spin awaiting producer, because we are the only one in charge to
//rotate the consumer buffer and use next
final E e = spinForElement(next, consumerOffset);
Expand Down Expand Up @@ -523,24 +520,19 @@ public E poll()
else
{
final boolean pooled = consumerBuffer.isPooled();
e = pooled ? null : consumerBuffer.lvElement(consumerOffset);
final long index = pooled ? consumerBuffer.lvSequence(consumerOffset) : consumerBuffer.lvIndex();
if (index != chunkIndex)
{
if (index < chunkIndex)
{
// if pooled:
// a) consumerBuffer::index > chunkIndex: a chunk used in the past or at its first use
// b) consumerBuffer::index < chunkIndex: a rotation is in progress
// c) consumerBuffer::index == chunkIndex: rotation is happened, not yet an element in
// For a) there is no need to check q emptiness, but doing it isn't wrong,
// because the check will fail (ie q isn't empty re consumerIndex):
// consumerBuffer::index > chunkIndex means that others have proceeded consuming new elements.
// For b) and c) the emptiness check is necessary, because if there are no other elements
// poll *must* return null.
//
// if !pooled:
// - the rotation isn't happened yet
if (pooled) {
final long sequence = consumerBuffer.lvSequence(consumerOffset);
if (sequence != chunkIndex) {
if (sequence > chunkIndex) {
//stale view of the world
continue;
}
final long index = consumerBuffer.lvIndex();
if (index > chunkIndex)
{
//stale view of the world
continue;
}
if (consumerIndex >= pIndex && // test against cached pIndex
consumerIndex == (pIndex = lvProducerIndex()))
{ // update pIndex if we must
Expand All @@ -549,24 +541,25 @@ public E poll()
}
continue;
}
else
} else {
e = consumerBuffer.lvElement(consumerOffset);
final long index = consumerBuffer.lvIndex();
if (index != chunkIndex || e == null)
{
//Stale view of the world: retry!
if (index > chunkIndex)
{
//stale view of the world
continue;
}
if (consumerIndex >= pIndex && // test against cached pIndex
consumerIndex == (pIndex = lvProducerIndex()))
{ // update pIndex if we must
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
return null;
}
continue;
}
}
assert index == chunkIndex;
if (!pooled && e == null)
{
if (consumerIndex >= pIndex && // test against cached pIndex
consumerIndex == (pIndex = lvProducerIndex()))
{ // update pIndex if we must
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
return null;
}
//we are awaiting the producer here
continue;
}
}
if (casConsumerIndex(consumerIndex, consumerIndex + 1))
{
Expand Down Expand Up @@ -633,7 +626,6 @@ public E peek()
if (consumerBuffer.lvIndex() != chunkIndex)
{
e = null;
continue;
}
}
while (e == null && consumerIndex != lvProducerIndex());
Expand Down

0 comments on commit d30b4d1

Please sign in to comment.