Skip to content

Commit

Permalink
Fix #355 by adhering to size semantic for queue emptiness
Browse files Browse the repository at this point in the history
  • Loading branch information
nitsanw committed Jan 5, 2022
1 parent 466822b commit 6e2a486
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 42 deletions.
Expand Up @@ -215,7 +215,7 @@ public boolean isEmpty()
// Loading consumer before producer allows for producer increments after consumer index is read.
// This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is
// nothing we can do to make this an exact method.
return (this.lvConsumerIndex() == this.lvProducerIndex());
return ((lvConsumerIndex() - lvProducerIndex()) / 2 == 0);
}

@Override
Expand Down Expand Up @@ -298,21 +298,19 @@ public E poll()
Object e = lvRefElement(buffer, offset);
if (e == null)
{
if (cIndex != lvProducerIndex())
long pIndex = lvProducerIndex();
// isEmpty?
if ((cIndex - pIndex) / 2 == 0)
{
// poll() == null iff queue is empty, null element is not strong enough indicator, so we must
// check the producer index. If the queue is indeed not empty we spin until element is
// visible.
do
{
e = lvRefElement(buffer, offset);
}
while (e == null);
return null;
}
else
// poll() == null iff queue is empty, null element is not strong enough indicator, so we must
// spin until element is visible.
do
{
return null;
e = lvRefElement(buffer, offset);
}
while (e == null);
}

if (e == JUMP)
Expand Down Expand Up @@ -341,10 +339,16 @@ public E peek()

final long offset = modifiedCalcCircularRefElementOffset(cIndex, mask);
Object e = lvRefElement(buffer, offset);
if (e == null && cIndex != lvProducerIndex())
if (e == null)
{
long pIndex = lvProducerIndex();
// isEmpty?
if ((cIndex - pIndex) / 2 == 0)
{
return null;
}
// peek() == null iff queue is empty, null element is not strong enough indicator, so we must
// check the producer index. If the queue is indeed not empty we spin until element is visible.
// spin until element is visible.
do
{
e = lvRefElement(buffer, offset);
Expand Down
Expand Up @@ -324,7 +324,7 @@ public int size() {
@Override
public boolean isEmpty() {
// nothing we can do to make this an exact method.
return (this.lvConsumerIndex() == this.lvProducerIndex());
return ((lvConsumerIndex() - lvProducerIndex()) / 2 == 0);
}

@Override
Expand Down Expand Up @@ -386,28 +386,29 @@ public boolean offer(final E e) {
@Override
public E poll() {
final AtomicReferenceArray<E> buffer = consumerBuffer;
final long index = lpConsumerIndex();
final long cIndex = lpConsumerIndex();
final long mask = consumerMask;
final int offset = modifiedCalcCircularRefElementOffset(index, mask);
final int offset = modifiedCalcCircularRefElementOffset(cIndex, mask);
Object e = lvRefElement(buffer, offset);
if (e == null) {
if (index != lvProducerIndex()) {
// visible.
do {
e = lvRefElement(buffer, offset);
} while (e == null);
} else {
long pIndex = lvProducerIndex();
// isEmpty?
if ((cIndex - pIndex) / 2 == 0) {
return null;
}
// spin until element is visible.
do {
e = lvRefElement(buffer, offset);
} while (e == null);
}
if (e == JUMP) {
final AtomicReferenceArray<E> nextBuffer = nextBuffer(buffer, mask);
return newBufferPoll(nextBuffer, index);
return newBufferPoll(nextBuffer, cIndex);
}
// release element null
soRefElement(buffer, offset, null);
// release cIndex
soConsumerIndex(index + 2);
soConsumerIndex(cIndex + 2);
return (E) e;
}

Expand All @@ -420,18 +421,23 @@ public E poll() {
@Override
public E peek() {
final AtomicReferenceArray<E> buffer = consumerBuffer;
final long index = lpConsumerIndex();
final long cIndex = lpConsumerIndex();
final long mask = consumerMask;
final int offset = modifiedCalcCircularRefElementOffset(index, mask);
final int offset = modifiedCalcCircularRefElementOffset(cIndex, mask);
Object e = lvRefElement(buffer, offset);
if (e == null && index != lvProducerIndex()) {
// check the producer index. If the queue is indeed not empty we spin until element is visible.
if (e == null) {
long pIndex = lvProducerIndex();
// isEmpty?
if ((cIndex - pIndex) / 2 == 0) {
return null;
}
// spin until element is visible.
do {
e = lvRefElement(buffer, offset);
} while (e == null);
}
if (e == JUMP) {
return newBufferPeek(nextBuffer(buffer, mask), index);
return newBufferPeek(nextBuffer(buffer, mask), cIndex);
}
return (E) e;
}
Expand Down Expand Up @@ -483,19 +489,19 @@ private static int nextArrayOffset(long mask) {
return modifiedCalcCircularRefElementOffset(mask + 2, Long.MAX_VALUE);
}

private E newBufferPoll(AtomicReferenceArray<E> nextBuffer, long index) {
final int offset = modifiedCalcCircularRefElementOffset(index, consumerMask);
private E newBufferPoll(AtomicReferenceArray<E> nextBuffer, long cIndex) {
final int offset = modifiedCalcCircularRefElementOffset(cIndex, consumerMask);
final E n = lvRefElement(nextBuffer, offset);
if (n == null) {
throw new IllegalStateException("new buffer must have at least one element");
}
soRefElement(nextBuffer, offset, null);
soConsumerIndex(index + 2);
soConsumerIndex(cIndex + 2);
return n;
}

private E newBufferPeek(AtomicReferenceArray<E> nextBuffer, long index) {
final int offset = modifiedCalcCircularRefElementOffset(index, consumerMask);
private E newBufferPeek(AtomicReferenceArray<E> nextBuffer, long cIndex) {
final int offset = modifiedCalcCircularRefElementOffset(cIndex, consumerMask);
final E n = lvRefElement(nextBuffer, offset);
if (null == n) {
throw new IllegalStateException("new buffer must have at least one element");
Expand Down Expand Up @@ -525,32 +531,32 @@ public boolean relaxedOffer(E e) {
@Override
public E relaxedPoll() {
final AtomicReferenceArray<E> buffer = consumerBuffer;
final long index = lpConsumerIndex();
final long cIndex = lpConsumerIndex();
final long mask = consumerMask;
final int offset = modifiedCalcCircularRefElementOffset(index, mask);
final int offset = modifiedCalcCircularRefElementOffset(cIndex, mask);
Object e = lvRefElement(buffer, offset);
if (e == null) {
return null;
}
if (e == JUMP) {
final AtomicReferenceArray<E> nextBuffer = nextBuffer(buffer, mask);
return newBufferPoll(nextBuffer, index);
return newBufferPoll(nextBuffer, cIndex);
}
soRefElement(buffer, offset, null);
soConsumerIndex(index + 2);
soConsumerIndex(cIndex + 2);
return (E) e;
}

@SuppressWarnings("unchecked")
@Override
public E relaxedPeek() {
final AtomicReferenceArray<E> buffer = consumerBuffer;
final long index = lpConsumerIndex();
final long cIndex = lpConsumerIndex();
final long mask = consumerMask;
final int offset = modifiedCalcCircularRefElementOffset(index, mask);
final int offset = modifiedCalcCircularRefElementOffset(cIndex, mask);
Object e = lvRefElement(buffer, offset);
if (e == JUMP) {
return newBufferPeek(nextBuffer(buffer, mask), index);
return newBufferPeek(nextBuffer(buffer, mask), cIndex);
}
return (E) e;
}
Expand Down

0 comments on commit 6e2a486

Please sign in to comment.