Skip to content

Commit

Permalink
Refactored methods to look like those in MpmcArrayQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
kay committed Aug 8, 2017
1 parent f537eda commit 13175e5
Showing 1 changed file with 46 additions and 66 deletions.
Expand Up @@ -35,45 +35,32 @@ public boolean offer(final E e) {
if (null == e) {
throw new NullPointerException();
}

// local load of field to avoid repeated loads after volatile reads
final int mask = this.mask;
final int capacity = mask + 1;
final AtomicLongArray sBuffer = sequenceBuffer;
long currentProducerIndex;

long pIndex;
int seqOffset;
long seq;
long cIndex = Long.MIN_VALUE;// start with bogus value, hope we don't need it
while (true) {
currentProducerIndex = lvProducerIndex(); // LoadLoad
seqOffset = calcSequenceOffset(currentProducerIndex, mask);
final long seq = lvSequence(sBuffer, seqOffset); // LoadLoad
final long delta = seq - currentProducerIndex;

if (delta == 0) {
// this is expected if we see this first time around
if (casProducerIndex(currentProducerIndex, currentProducerIndex + 1)) {
// Successful CAS: full barrier
break;
}
// failed cas, retry 1
} else if (delta < 0 && // poll has not moved this value forward
currentProducerIndex - capacity >= cIndex && // test against cached cIndex
currentProducerIndex - capacity >= (cIndex = lvConsumerIndex())) { // test against latest cIndex
do {
pIndex = lvProducerIndex();
seqOffset = calcSequenceOffset(pIndex, mask);
seq = lvSequence(sBuffer, seqOffset);
if (seq < pIndex) { // consumer has not moved this seq forward, it's as last producer left
// Extra check required to ensure [Queue.offer == false iff queue is full]
return false;
if (pIndex - capacity >= cIndex && // test against cached cIndex
pIndex - capacity >= (cIndex = lvConsumerIndex())) { // test against latest cIndex
return false;
} else {
seq = pIndex + 1; // (+) hack to make it go around again without CAS
}
}
} while (seq > pIndex || // another producer has moved the sequence(or +)
!casProducerIndex(pIndex, pIndex + 1)); // failed to increment

// another producer has moved the sequence by one, retry 2
}

// on 64bit(no compressed oops) JVM this is the same as seqOffset
final int elementOffset = calcElementOffset(currentProducerIndex, mask);
spElement(elementOffset, e);

// increment sequence by 1, the value expected by consumer
// (seeing this value from a producer will lead to retry 2)
soSequence(sBuffer, seqOffset, currentProducerIndex + 1); // StoreStore

soElement(buffer, calcElementOffset(pIndex, mask), e);
soSequence(sBuffer, seqOffset, pIndex + 1); // seq++;
return true;
}

Expand All @@ -86,55 +73,48 @@ public boolean offer(final E e) {
@Override
public E poll() {
// local load of field to avoid repeated loads after volatile reads
final AtomicLongArray lSequenceBuffer = sequenceBuffer;
final AtomicLongArray sBuffer = sequenceBuffer;
final int mask = this.mask;
long currentConsumerIndex;

long cIndex;
long seq;
int seqOffset;
long expectedSeq;
long pIndex = -1; // start with bogus value, hope we don't need it
while (true) {
currentConsumerIndex = lvConsumerIndex();// LoadLoad
seqOffset = calcSequenceOffset(currentConsumerIndex, mask);
final long seq = lvSequence(lSequenceBuffer, seqOffset);// LoadLoad
final long delta = seq - (currentConsumerIndex + 1);

if (delta == 0) {
if (casConsumerIndex(currentConsumerIndex, currentConsumerIndex + 1)) {
// Successful CAS: full barrier
break;
do {
cIndex = lvConsumerIndex();
seqOffset = calcSequenceOffset(cIndex, mask);
seq = lvSequence(sBuffer, seqOffset);
expectedSeq = cIndex + 1;
if (seq < expectedSeq) { // slot has not been moved by producer
if (cIndex >= pIndex && // test against cached pIndex
cIndex == (pIndex = lvProducerIndex())) { // update pIndex if we must
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
return null;
} else {
seq = expectedSeq + 1; // trip another go around
}
// failed cas, retry 1
} else if (delta < 0 && // slot has not been moved by producer
currentConsumerIndex >= pIndex && // test against cached pIndex
currentConsumerIndex == (pIndex = lvProducerIndex())) { // update pIndex if we must
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
return null;
}
} while (seq > expectedSeq || // another consumer beat us to it
!casConsumerIndex(cIndex, cIndex + 1)); // failed the CAS

// another consumer beat us and moved sequence ahead, retry 2
}

// on 64bit(no compressed oops) JVM this is the same as seqOffset
final int offset = calcElementOffset(currentConsumerIndex, mask);
final E e = lpElement(offset);
spElement(offset, null);

// Move sequence ahead by capacity, preparing it for next offer
// (seeing this value from a consumer will lead to retry 2)
soSequence(lSequenceBuffer, seqOffset, currentConsumerIndex + mask + 1);// StoreStore

final int offset = calcElementOffset(cIndex, mask);
final E e = lpElement(buffer, offset);
soElement(buffer, offset, null);
soSequence(sBuffer, seqOffset, cIndex + mask + 1);// i.e. seq += capacity
return e;
}

@Override
public E peek() {
long currConsumerIndex;
long cIndex;
E e;
do {
currConsumerIndex = lvConsumerIndex();
cIndex = lvConsumerIndex();
// other consumers may have grabbed the element, or queue might be empty
e = lpElement(calcElementOffset(currConsumerIndex));
e = lpElement(calcElementOffset(cIndex));
// only return null if queue is empty
} while (e == null && currConsumerIndex != lvProducerIndex());
} while (e == null && cIndex != lvProducerIndex());
return e;
}

Expand Down

0 comments on commit 13175e5

Please sign in to comment.