Skip to content

Commit

Permalink
Padded lookAheadStep, producerIndex/producerLimit, consumerIndex from
Browse files Browse the repository at this point in the history
one another
  • Loading branch information
kay committed Aug 8, 2017
1 parent bdc44ee commit aafc977
Showing 1 changed file with 66 additions and 14 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -20,6 +20,53 @@
import org.jctools.queues.IndexedQueueSizeUtil.IndexedQueue; import org.jctools.queues.IndexedQueueSizeUtil.IndexedQueue;
import org.jctools.queues.QueueProgressIndicators; import org.jctools.queues.QueueProgressIndicators;


abstract class SpscAtomicArrayQueueColdField<E> extends AtomicReferenceArrayQueue<E> {
public static final int MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096);
protected final int lookAheadStep;
public SpscAtomicArrayQueueColdField(int capacity) {
super(capacity);
lookAheadStep = Math.min(capacity() / 4, MAX_LOOK_AHEAD_STEP);
}
}
abstract class SpscAtomicArrayQueueL1Pad<E> extends SpscAtomicArrayQueueColdField<E> {
long p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16, p17;

public SpscAtomicArrayQueueL1Pad(int capacity) {
super(capacity);
}
}

abstract class SpscAtomicArrayQueueProducerFields<E> extends SpscAtomicArrayQueueL1Pad<E> {
protected static final AtomicLongFieldUpdater<SpscAtomicArrayQueueProducerFields> P_INDEX_UPDATER = AtomicLongFieldUpdater
.newUpdater(SpscAtomicArrayQueueProducerFields.class, "producerIndex");
protected volatile long producerIndex;
protected long producerLimit;

public SpscAtomicArrayQueueProducerFields(int capacity) {
super(capacity);
}
}

abstract class SpscAtomicArrayQueueL2Pad<E> extends SpscAtomicArrayQueueProducerFields<E> {
long p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16, p17;

public SpscAtomicArrayQueueL2Pad(int capacity) {
super(capacity);
}
}

abstract class SpscAtomicArrayQueueConsumerField<E> extends SpscAtomicArrayQueueL2Pad<E> {
protected static final AtomicLongFieldUpdater<SpscAtomicArrayQueueConsumerField> C_INDEX_UPDATER = AtomicLongFieldUpdater
.newUpdater(SpscAtomicArrayQueueConsumerField.class, "consumerIndex");
protected volatile long consumerIndex;

public SpscAtomicArrayQueueConsumerField(int capacity) {
super(capacity);
}
}

/** /**
* A Single-Producer-Single-Consumer queue backed by a pre-allocated buffer. * A Single-Producer-Single-Consumer queue backed by a pre-allocated buffer.
* <p> * <p>
Expand All @@ -36,24 +83,19 @@
* *
* @param <E> * @param <E>
*/ */
public final class SpscAtomicArrayQueue<E> extends AtomicReferenceArrayQueue<E> implements IndexedQueue, QueueProgressIndicators { public final class SpscAtomicArrayQueue<E> extends SpscAtomicArrayQueueConsumerField<E> implements IndexedQueue, QueueProgressIndicators {
private static final AtomicLongFieldUpdater<SpscAtomicArrayQueue> P_INDEX_UPDATER = AtomicLongFieldUpdater long p01, p02, p03, p04, p05, p06, p07;
.newUpdater(SpscAtomicArrayQueue.class, "producerIndex"); long p10, p11, p12, p13, p14, p15, p16, p17;
private static final AtomicLongFieldUpdater<SpscAtomicArrayQueue> C_INDEX_UPDATER = AtomicLongFieldUpdater
.newUpdater(SpscAtomicArrayQueue.class, "consumerIndex");

private static final Integer MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096);
volatile long producerIndex;
protected long producerLimit;
volatile long consumerIndex;
final int lookAheadStep;



public SpscAtomicArrayQueue(int capacity) { public SpscAtomicArrayQueue(int capacity) {
super(Math.max(capacity, 4)); super(Math.max(capacity, 4));
lookAheadStep = Math.min(capacity() / 4, MAX_LOOK_AHEAD_STEP);
} }


/**
* {@inheritDoc}
* <p>
* This implementation is correct for single producer thread use only.
*/
@Override @Override
public boolean offer(final E e) { public boolean offer(final E e) {
if (null == e) { if (null == e) {
Expand Down Expand Up @@ -89,6 +131,11 @@ private boolean offerSlowPath(final AtomicReferenceArray<E> buffer, final int ma
return true; return true;
} }


/**
* {@inheritDoc}
* <p>
* This implementation is correct for single consumer thread use only.
*/
@Override @Override
public E poll() { public E poll() {
final long consumerIndex = this.consumerIndex; final long consumerIndex = this.consumerIndex;
Expand All @@ -104,6 +151,11 @@ public E poll() {
return e; return e;
} }


/**
* {@inheritDoc}
* <p>
* This implementation is correct for single consumer thread use only.
*/
@Override @Override
public E peek() { public E peek() {
return lvElement(buffer, calcElementOffset(consumerIndex)); return lvElement(buffer, calcElementOffset(consumerIndex));
Expand All @@ -116,7 +168,7 @@ private void soProducerIndex(long newIndex) {
private void soConsumerIndex(long newIndex) { private void soConsumerIndex(long newIndex) {
C_INDEX_UPDATER.lazySet(this, newIndex); C_INDEX_UPDATER.lazySet(this, newIndex);
} }

@Override @Override
public long lvProducerIndex() { public long lvProducerIndex() {
return producerIndex; return producerIndex;
Expand Down

0 comments on commit aafc977

Please sign in to comment.