Skip to content

Commit

Permalink
Replaced the existing SpscUnboundedAtomicArrayQueue with a newer version after converting the existing SpscUnboundedArrayQueue into an Atomic version
Browse files Browse the repository at this point in the history
  • Loading branch information
neomatrix369 committed May 11, 2017
1 parent 7d7d68f commit c0d44b8
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 230 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -13,245 +13,48 @@
*/ */
package org.jctools.queues.atomic; package org.jctools.queues.atomic;


import java.util.AbstractQueue;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

import org.jctools.queues.QueueProgressIndicators;
import org.jctools.util.Pow2; import org.jctools.util.Pow2;


public class SpscUnboundedAtomicArrayQueue<E> extends AbstractQueue<E> implements QueueProgressIndicators{ import java.util.concurrent.atomic.AtomicReferenceArray;
static final int MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096);
protected final AtomicLong producerIndex;
protected int producerLookAheadStep;
protected long producerLookAhead;
protected int producerMask;
protected AtomicReferenceArray<Object> producerBuffer;
protected int consumerMask;
protected AtomicReferenceArray<Object> consumerBuffer;
protected final AtomicLong consumerIndex;
private static final Object HAS_NEXT = new Object();


public SpscUnboundedAtomicArrayQueue(final int chunkSize) { public class SpscUnboundedAtomicArrayQueue<E> extends BaseSpscLinkedAtomicArrayQueue<E> {
int p2ChunkSize = Math.max(Pow2.roundToPowerOfTwo(chunkSize), 16);


int mask = p2ChunkSize - 1; public SpscUnboundedAtomicArrayQueue(final int chunkSize) {
AtomicReferenceArray<Object> buffer = new AtomicReferenceArray<Object>(p2ChunkSize + 1); int chunkCapacity = Math.max(Pow2.roundToPowerOfTwo(chunkSize), 16);
long mask = chunkCapacity - 1;
AtomicReferenceArray<E> buffer = allocate(chunkCapacity + 1);
producerBuffer = buffer; producerBuffer = buffer;
producerMask = mask; producerMask = mask;
adjustLookAheadStep(p2ChunkSize);
consumerBuffer = buffer; consumerBuffer = buffer;
consumerMask = mask; consumerMask = mask;
producerLookAhead = mask - 1; // we know it's all empty to start with producerBufferLimit = mask - 1; // we know it's all empty to start with
producerIndex = new AtomicLong();
consumerIndex = new AtomicLong();
soProducerIndex(0L); soProducerIndex(0L);
} }


@Override @Override
public final Iterator<E> iterator() { protected boolean offerColdPath(AtomicReferenceArray<E> buffer, long mask, E e, long pIndex, int offset) {
throw new UnsupportedOperationException(); // use a fixed lookahead step based on buffer capacity
} final long lookAheadStep = (mask + 1) / 4;

long pBufferLimit = pIndex + lookAheadStep;

@Override
public String toString() {
return this.getClass().getName();
}


/** // go around the buffer or add a new buffer
* {@inheritDoc} if (null == lvElement(buffer, calcElementOffset(pBufferLimit, mask))) {
* <p> producerBufferLimit = pBufferLimit - 1; // joy, there's plenty of room
* This implementation is correct for single producer thread use only. writeToQueue(buffer, e, pIndex, offset);
*/
@Override
public final boolean offer(final E e) {
if (null == e) {
throw new NullPointerException();
} }
// local load of field to avoid repeated loads after volatile reads else if (null == lvElement(buffer, calcElementOffset(pIndex + 1, mask))) { // buffer is not full
final AtomicReferenceArray<Object> buffer = producerBuffer; writeToQueue(buffer, e, pIndex, offset);
final long index = lpProducerIndex();
final int mask = producerMask;
final int offset = calcWrappedOffset(index, mask);
if (index < producerLookAhead) {
return writeToQueue(buffer, e, index, offset);
} else {
final int lookAheadStep = producerLookAheadStep;
// go around the buffer or resize if full (unless we hit max capacity)
int lookAheadElementOffset = calcWrappedOffset(index + lookAheadStep, mask);
if (null == lvElement(buffer, lookAheadElementOffset)) {// LoadLoad
producerLookAhead = index + lookAheadStep - 1; // joy, there's plenty of room
return writeToQueue(buffer, e, index, offset);
} else if (null == lvElement(buffer, calcWrappedOffset(index + 1, mask))) { // buffer is not full
return writeToQueue(buffer, e, index, offset);
} else {
resize(buffer, index, offset, e, mask); // add a buffer and link old to new
return true;
}
} }
} else {

// we got one slot left to write into, and we are not full. Need to link new buffer.
private boolean writeToQueue(final AtomicReferenceArray<Object> buffer, final E e, final long index, final int offset) { // allocate new buffer of same length
soElement(buffer, offset, e);// StoreStore final AtomicReferenceArray<E> newBuffer = allocate((int)(mask + 2));
soProducerIndex(index + 1);// this ensures atomic write of long on 32bit platforms producerBuffer = newBuffer;
return true; producerBufferLimit = pIndex + mask - 1;
}

linkOldToNew(pIndex, buffer, offset, newBuffer, offset, e);
private void resize(final AtomicReferenceArray<Object> oldBuffer, final long currIndex, final int offset, final E e,
final long mask) {
final int capacity = oldBuffer.length();
final AtomicReferenceArray<Object> newBuffer = new AtomicReferenceArray<Object>(capacity);
producerBuffer = newBuffer;
producerLookAhead = currIndex + mask - 1;
soElement(newBuffer, offset, e);// StoreStore
soNext(oldBuffer, newBuffer);
soElement(oldBuffer, offset, HAS_NEXT); // new buffer is visible after element is inserted
soProducerIndex(currIndex + 1);// this ensures correctness on 32bit platforms
}

private void soNext(AtomicReferenceArray<Object> curr, AtomicReferenceArray<Object> next) {
soElement(curr, calcDirectOffset(curr.length() - 1), next);
}
@SuppressWarnings("unchecked")
private AtomicReferenceArray<Object> lvNext(AtomicReferenceArray<Object> curr) {
return (AtomicReferenceArray<Object>)lvElement(curr, calcDirectOffset(curr.length() - 1));
}
/**
* {@inheritDoc}
* <p>
* This implementation is correct for single consumer thread use only.
*/
@SuppressWarnings("unchecked")
@Override
public final E poll() {
// local load of field to avoid repeated loads after volatile reads
final AtomicReferenceArray<Object> buffer = consumerBuffer;
final long index = lpConsumerIndex();
final int mask = consumerMask;
final int offset = calcWrappedOffset(index, mask);
final Object e = lvElement(buffer, offset);// LoadLoad
boolean isNextBuffer = e == HAS_NEXT;
if (null != e && !isNextBuffer) {
soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
soElement(buffer, offset, null);// StoreStore
return (E) e;
} else if (isNextBuffer) {
return newBufferPoll(buffer, index, mask);
} }

return true;
return null;
}

@SuppressWarnings("unchecked")
private E newBufferPoll(AtomicReferenceArray<Object> buffer, final long index, final int mask) {
AtomicReferenceArray<Object> nextBuffer = lvNext(buffer);
consumerBuffer = nextBuffer;
final int offsetInNew = calcWrappedOffset(index, mask);
final E n = (E) lvElement(nextBuffer, offsetInNew);// LoadLoad
soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
soElement(nextBuffer, offsetInNew, null);// StoreStore
// prevent extended retention if the buffer is in old gen and the nextBuffer is in young gen
soNext(buffer, null);
return n;
}

/**
* {@inheritDoc}
* <p>
* This implementation is correct for single consumer thread use only.
*/
@SuppressWarnings("unchecked")
@Override
public final E peek() {
final AtomicReferenceArray<Object> buffer = consumerBuffer;
final long index = lpConsumerIndex();
final int mask = consumerMask;
final int offset = calcWrappedOffset(index, mask);
final Object e = lvElement(buffer, offset);// LoadLoad
if (e == HAS_NEXT) {
return newBufferPeek(lvNext(buffer), index, mask);
}

return (E) e;
}

@SuppressWarnings("unchecked")
private E newBufferPeek(AtomicReferenceArray<Object> nextBuffer, final long index, final int mask) {
consumerBuffer = nextBuffer;
final int offsetInNew = calcWrappedOffset(index, mask);
return (E) lvElement(nextBuffer, offsetInNew);// LoadLoad
}

@Override
public final int size() {
/*
* It is possible for a thread to be interrupted or reschedule between the read of the producer and
* consumer indices, therefore protection is required to ensure size is within valid range. In the
* event of concurrent polls/offers to this method the size is OVER estimated as we read consumer
* index BEFORE the producer index.
*/
long after = lvConsumerIndex();
while (true) {
final long before = after;
final long currentProducerIndex = lvProducerIndex();
after = lvConsumerIndex();
if (before == after) {
return (int) (currentProducerIndex - after);
}
}
}

private void adjustLookAheadStep(int capacity) {
producerLookAheadStep = Math.min(capacity / 4, MAX_LOOK_AHEAD_STEP);
}

private long lvProducerIndex() {
return producerIndex.get();
}

private long lvConsumerIndex() {
return consumerIndex.get();
}

private long lpProducerIndex() {
return producerIndex.get();
}

private long lpConsumerIndex() {
return consumerIndex.get();
}

private void soProducerIndex(long v) {
producerIndex.lazySet(v);
}

private void soConsumerIndex(long v) {
consumerIndex.lazySet(v);
}

private static int calcWrappedOffset(long index, int mask) {
return calcDirectOffset((int)index & mask);
}
private static int calcDirectOffset(int index) {
return index;
}
private static void soElement(AtomicReferenceArray<Object> buffer, int offset, Object e) {
buffer.lazySet(offset, e);
}

private static <E> Object lvElement(AtomicReferenceArray<Object> buffer, int offset) {
return buffer.get(offset);
}

@Override
public long currentProducerIndex() {
return lvProducerIndex();
} }


@Override
public long currentConsumerIndex() {
return lvConsumerIndex();
}
} }
Original file line number Original file line Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.jctools.queues; package org.jctools.queues;


import org.jctools.queues.atomic.AtomicQueueFactory; import org.jctools.queues.atomic.AtomicQueueFactory;
import org.jctools.queues.atomic.SpscUnboundedAtomicArrayQueue;
import org.jctools.queues.spec.ConcurrentQueueSpec; import org.jctools.queues.spec.ConcurrentQueueSpec;
import org.jctools.queues.spec.Ordering; import org.jctools.queues.spec.Ordering;
import org.jctools.queues.spec.Preference; import org.jctools.queues.spec.Preference;
Expand Down
Original file line number Original file line Diff line number Diff line change
@@ -1,17 +1,17 @@
package org.jctools.queues.atomic; package org.jctools.queues.atomic;


import static org.jctools.util.JvmInfo.CPUs;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Queue;

import org.jctools.queues.QueueSanityTest; import org.jctools.queues.QueueSanityTest;
import org.jctools.queues.spec.ConcurrentQueueSpec; import org.jctools.queues.spec.ConcurrentQueueSpec;
import org.jctools.queues.spec.Ordering; import org.jctools.queues.spec.Ordering;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;


import java.util.ArrayList;
import java.util.Collection;
import java.util.Queue;

import static org.jctools.util.JvmInfo.CPUs;

@RunWith(Parameterized.class) @RunWith(Parameterized.class)


public class AtomicQueueSanityTest extends QueueSanityTest { public class AtomicQueueSanityTest extends QueueSanityTest {
Expand Down

0 comments on commit c0d44b8

Please sign in to comment.