Skip to content

Commit

Permalink
MPMC drain fill need soElement, switching to ordered element store
Browse files Browse the repository at this point in the history
everywhere
  • Loading branch information
nitsanw committed Feb 27, 2016
1 parent 51b9c52 commit d60366d
Showing 1 changed file with 19 additions and 19 deletions.
38 changes: 19 additions & 19 deletions jctools-core/src/main/java/org/jctools/queues/MpmcArrayQueue.java
Expand Up @@ -16,7 +16,7 @@
import static org.jctools.util.JvmInfo.CPUs;
import static org.jctools.util.UnsafeAccess.UNSAFE;
import static org.jctools.util.UnsafeRefArrayAccess.lpElement;
import static org.jctools.util.UnsafeRefArrayAccess.spElement;
import static org.jctools.util.UnsafeRefArrayAccess.soElement;

abstract class MpmcArrayQueueL1Pad<E> extends ConcurrentSequencedCircularArrayQueue<E> {
long p00, p01, p02, p03, p04, p05, p06, p07;
Expand Down Expand Up @@ -95,7 +95,7 @@ protected final boolean casConsumerIndex(long expect, long newValue) {
* field of the struct. There is a further alternative in the experimental project which uses iteration phase
* markers to achieve the same algo and is closer structurally to the original, but sadly does not perform as
* well as this implementation.<br>
*
*
* Tradeoffs to keep in mind:
* <ol>
* <li>Padding for false sharing: counter fields and queue fields are all padded as well as either side of
Expand All @@ -105,7 +105,7 @@ protected final boolean casConsumerIndex(long expect, long newValue) {
* <li>Power of 2 capacity: Actual elements buffer (and sequence buffer) is the closest power of 2 larger or
* equal to the requested capacity.
* </ol>
*
*
* @param <E>
* type of the element stored in the {@link java.util.Queue}
*/
Expand All @@ -129,7 +129,7 @@ public boolean offer(final E e) {
final long mask = this.mask;
final long capacity = mask + 1;
final long[] sBuffer = sequenceBuffer;

long pIndex;
long seqOffset;
long seq;
Expand All @@ -151,7 +151,7 @@ public boolean offer(final E e) {
!casProducerIndex(pIndex, pIndex + 1)); // failed to increment

assert null == lpElement(buffer, calcElementOffset(pIndex, mask));
spElement(buffer, calcElementOffset(pIndex, mask), e);
soElement(buffer, calcElementOffset(pIndex, mask), e);
soSequence(sBuffer, seqOffset, pIndex + 1); // seq++;
return true;

Expand All @@ -168,7 +168,7 @@ public E poll() {
// local load of field to avoid repeated loads after volatile reads
final long[] sBuffer = sequenceBuffer;
final long mask = this.mask;

long cIndex;
long seq;
long seqOffset;
Expand All @@ -190,11 +190,11 @@ public E poll() {
}
} while (seq > expectedSeq || // another consumer beat us to it
!casConsumerIndex(cIndex, cIndex + 1)); // failed the CAS

final long offset = calcElementOffset(cIndex, mask);
final E e = lpElement(buffer, offset);
assert e != null;
spElement(buffer, offset, null);
soElement(buffer, offset, null);
soSequence(sBuffer, seqOffset, cIndex + mask + 1);// i.e. seq += capacity
return e;
}
Expand Down Expand Up @@ -239,25 +239,25 @@ public boolean isEmpty() {
// nothing we can do to make this an exact method.
return (lvConsumerIndex() == lvProducerIndex());
}

@Override
public long currentProducerIndex() {
return lvProducerIndex();
}

@Override
public long currentConsumerIndex() {
return lvConsumerIndex();
}

@Override
public boolean relaxedOffer(E e) {
if (null == e) {
throw new NullPointerException("Null is not a valid element");
}
final long mask = this.mask;
final long[] sBuffer = sequenceBuffer;

long pIndex;
long seqOffset;
long seq;
Expand All @@ -271,7 +271,7 @@ public boolean relaxedOffer(E e) {
} while (seq > pIndex || // another producer has moved the sequence
!casProducerIndex(pIndex, pIndex + 1)); // failed to increment

spElement(buffer, calcElementOffset(pIndex, mask), e);
soElement(buffer, calcElementOffset(pIndex, mask), e);
soSequence(sBuffer, seqOffset, pIndex + 1);
return true;
}
Expand All @@ -280,7 +280,7 @@ public boolean relaxedOffer(E e) {
public E relaxedPoll() {
final long[] sBuffer = sequenceBuffer;
final long mask = this.mask;

long cIndex;
long seqOffset;
long seq;
Expand All @@ -295,10 +295,10 @@ public E relaxedPoll() {
}
} while (seq > expectedSeq || // another consumer beat us to it
!casConsumerIndex(cIndex, cIndex + 1)); // failed the CAS

final long offset = calcElementOffset(cIndex, mask);
final E e = lpElement(buffer, offset);
spElement(buffer, offset, null);
soElement(buffer, offset, null);
soSequence(sBuffer, seqOffset, cIndex + mask + 1);
return e;
}
Expand Down Expand Up @@ -361,7 +361,7 @@ public int drain(Consumer<E> c, int limit) {

final long offset = calcElementOffset(cIndex, mask);
final E e = lpElement(buffer, offset);
spElement(buffer, offset, null);
soElement(buffer, offset, null);
soSequence(sBuffer, seqOffset, cIndex + mask + 1);
c.accept(e);
}
Expand All @@ -388,7 +388,7 @@ public int fill(Supplier<E> s, int limit) {
} while (seq > pIndex || // another producer has moved the sequence
!casProducerIndex(pIndex, pIndex + 1)); // failed to increment

spElement(buffer, calcElementOffset(pIndex, mask), s.get());
soElement(buffer, calcElementOffset(pIndex, mask), s.get());
soSequence(sBuffer, seqOffset, pIndex + 1);
}
return limit;
Expand Down Expand Up @@ -418,7 +418,7 @@ public void fill(Supplier<E> s,
idleCounter = w.idle(idleCounter);
continue;
}
idleCounter = 0;
idleCounter = 0;
}
}
}

0 comments on commit d60366d

Please sign in to comment.