From d60366d5cdef9091f7dfacb6713fb7fc2b153516 Mon Sep 17 00:00:00 2001 From: nitsanw Date: Sat, 27 Feb 2016 16:13:14 +0200 Subject: [PATCH] MPMC drain fill need soElement, switching to ordered element store everywhere --- .../org/jctools/queues/MpmcArrayQueue.java | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/jctools-core/src/main/java/org/jctools/queues/MpmcArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/MpmcArrayQueue.java index a587e1e9..a900ca1c 100755 --- a/jctools-core/src/main/java/org/jctools/queues/MpmcArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/MpmcArrayQueue.java @@ -16,7 +16,7 @@ import static org.jctools.util.JvmInfo.CPUs; import static org.jctools.util.UnsafeAccess.UNSAFE; import static org.jctools.util.UnsafeRefArrayAccess.lpElement; -import static org.jctools.util.UnsafeRefArrayAccess.spElement; +import static org.jctools.util.UnsafeRefArrayAccess.soElement; abstract class MpmcArrayQueueL1Pad extends ConcurrentSequencedCircularArrayQueue { long p00, p01, p02, p03, p04, p05, p06, p07; @@ -95,7 +95,7 @@ protected final boolean casConsumerIndex(long expect, long newValue) { * field of the struct. There is a further alternative in the experimental project which uses iteration phase * markers to achieve the same algo and is closer structurally to the original, but sadly does not perform as * well as this implementation.
- * + * * Tradeoffs to keep in mind: *
    *
  1. Padding for false sharing: counter fields and queue fields are all padded as well as either side of @@ -105,7 +105,7 @@ protected final boolean casConsumerIndex(long expect, long newValue) { *
  2. Power of 2 capacity: Actual elements buffer (and sequence buffer) is the closest power of 2 larger or * equal to the requested capacity. *
- * + * * @param * type of the element stored in the {@link java.util.Queue} */ @@ -129,7 +129,7 @@ public boolean offer(final E e) { final long mask = this.mask; final long capacity = mask + 1; final long[] sBuffer = sequenceBuffer; - + long pIndex; long seqOffset; long seq; @@ -151,7 +151,7 @@ public boolean offer(final E e) { !casProducerIndex(pIndex, pIndex + 1)); // failed to increment assert null == lpElement(buffer, calcElementOffset(pIndex, mask)); - spElement(buffer, calcElementOffset(pIndex, mask), e); + soElement(buffer, calcElementOffset(pIndex, mask), e); soSequence(sBuffer, seqOffset, pIndex + 1); // seq++; return true; @@ -168,7 +168,7 @@ public E poll() { // local load of field to avoid repeated loads after volatile reads final long[] sBuffer = sequenceBuffer; final long mask = this.mask; - + long cIndex; long seq; long seqOffset; @@ -190,11 +190,11 @@ public E poll() { } } while (seq > expectedSeq || // another consumer beat us to it !casConsumerIndex(cIndex, cIndex + 1)); // failed the CAS - + final long offset = calcElementOffset(cIndex, mask); final E e = lpElement(buffer, offset); assert e != null; - spElement(buffer, offset, null); + soElement(buffer, offset, null); soSequence(sBuffer, seqOffset, cIndex + mask + 1);// i.e. seq += capacity return e; } @@ -239,17 +239,17 @@ public boolean isEmpty() { // nothing we can do to make this an exact method. return (lvConsumerIndex() == lvProducerIndex()); } - + @Override public long currentProducerIndex() { return lvProducerIndex(); } - + @Override public long currentConsumerIndex() { return lvConsumerIndex(); } - + @Override public boolean relaxedOffer(E e) { if (null == e) { @@ -257,7 +257,7 @@ public boolean relaxedOffer(E e) { } final long mask = this.mask; final long[] sBuffer = sequenceBuffer; - + long pIndex; long seqOffset; long seq; @@ -271,7 +271,7 @@ public boolean relaxedOffer(E e) { } while (seq > pIndex || // another producer has moved the sequence !casProducerIndex(pIndex, pIndex + 1)); // failed to increment - spElement(buffer, calcElementOffset(pIndex, mask), e); + soElement(buffer, calcElementOffset(pIndex, mask), e); soSequence(sBuffer, seqOffset, pIndex + 1); return true; } @@ -280,7 +280,7 @@ public boolean relaxedOffer(E e) { public E relaxedPoll() { final long[] sBuffer = sequenceBuffer; final long mask = this.mask; - + long cIndex; long seqOffset; long seq; @@ -295,10 +295,10 @@ public E relaxedPoll() { } } while (seq > expectedSeq || // another consumer beat us to it !casConsumerIndex(cIndex, cIndex + 1)); // failed the CAS - + final long offset = calcElementOffset(cIndex, mask); final E e = lpElement(buffer, offset); - spElement(buffer, offset, null); + soElement(buffer, offset, null); soSequence(sBuffer, seqOffset, cIndex + mask + 1); return e; } @@ -361,7 +361,7 @@ public int drain(Consumer c, int limit) { final long offset = calcElementOffset(cIndex, mask); final E e = lpElement(buffer, offset); - spElement(buffer, offset, null); + soElement(buffer, offset, null); soSequence(sBuffer, seqOffset, cIndex + mask + 1); c.accept(e); } @@ -388,7 +388,7 @@ public int fill(Supplier s, int limit) { } while (seq > pIndex || // another producer has moved the sequence !casProducerIndex(pIndex, pIndex + 1)); // failed to increment - spElement(buffer, calcElementOffset(pIndex, mask), s.get()); + soElement(buffer, calcElementOffset(pIndex, mask), s.get()); soSequence(sBuffer, seqOffset, pIndex + 1); } return limit; @@ -418,7 +418,7 @@ public void fill(Supplier s, idleCounter = w.idle(idleCounter); continue; } - idleCounter = 0; + idleCounter = 0; } } }