Skip to content

Commit

Permalink
Linked queues: Refactoring node construction and variable names
Browse files Browse the repository at this point in the history
Construction of a new LinkedQueueNode or new LinkedQueueAtomicNode is done in the base class

Some parameter variable names homogenized
  • Loading branch information
kay committed Aug 14, 2017
1 parent 26693ce commit 8f5c9ae
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 36 deletions.
16 changes: 12 additions & 4 deletions jctools-core/src/main/java/org/jctools/queues/BaseLinkedQueue.java
Expand Up @@ -39,8 +39,8 @@ abstract class BaseLinkedQueueProducerNodeRef<E> extends BaseLinkedQueuePad0<E>


protected LinkedQueueNode<E> producerNode; protected LinkedQueueNode<E> producerNode;


protected final void spProducerNode(LinkedQueueNode<E> node) { protected final void spProducerNode(LinkedQueueNode<E> newValue) {
producerNode = node; producerNode = newValue;
} }


@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Expand Down Expand Up @@ -73,8 +73,8 @@ abstract class BaseLinkedQueueConsumerNodeRef<E> extends BaseLinkedQueuePad1<E>


protected LinkedQueueNode<E> consumerNode; protected LinkedQueueNode<E> consumerNode;


protected final void spConsumerNode(LinkedQueueNode<E> node) { protected final void spConsumerNode(LinkedQueueNode<E> newValue) {
consumerNode = node; consumerNode = newValue;
} }


@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Expand Down Expand Up @@ -112,6 +112,14 @@ public String toString() {
return this.getClass().getName(); return this.getClass().getName();
} }


protected final LinkedQueueNode<E> newNode() {
return new LinkedQueueNode<E>();
}

protected final LinkedQueueNode<E> newNode(E e) {
return new LinkedQueueNode<E>(e);
}

/** /**
* {@inheritDoc} <br> * {@inheritDoc} <br>
* <p> * <p>
Expand Down
Expand Up @@ -47,7 +47,7 @@ public static <E> MpscLinkedQueue<E> newMpscLinkedQueue() {
} }


protected MpscLinkedQueue() { protected MpscLinkedQueue() {
LinkedQueueNode<E> node = new LinkedQueueNode<E>(); LinkedQueueNode<E> node = newNode();
spConsumerNode(node); spConsumerNode(node);
xchgProducerNode(node);// this ensures correct construction: StoreLoad xchgProducerNode(node);// this ensures correct construction: StoreLoad
} }
Expand Down Expand Up @@ -75,7 +75,7 @@ public final boolean offer(final E e) {
if (null == e) { if (null == e) {
throw new NullPointerException(); throw new NullPointerException();
} }
final LinkedQueueNode<E> nextNode = new LinkedQueueNode<E>(e); final LinkedQueueNode<E> nextNode = newNode(e);
final LinkedQueueNode<E> prevProducerNode = xchgProducerNode(nextNode); final LinkedQueueNode<E> prevProducerNode = xchgProducerNode(nextNode);
// Should a producer thread get interrupted here the chain WILL be broken until that thread is resumed // Should a producer thread get interrupted here the chain WILL be broken until that thread is resumed
// and completes the store in prev.next. // and completes the store in prev.next.
Expand Down Expand Up @@ -106,8 +106,7 @@ public final E poll() {
LinkedQueueNode<E> nextNode = currConsumerNode.lvNext(); LinkedQueueNode<E> nextNode = currConsumerNode.lvNext();
if (nextNode != null) { if (nextNode != null) {
return getSingleConsumerNodeValue(currConsumerNode, nextNode); return getSingleConsumerNodeValue(currConsumerNode, nextNode);
} } else if (currConsumerNode != lvProducerNode()) {
else if (currConsumerNode != lvProducerNode()) {
// spin, we are no longer wait free // spin, we are no longer wait free
while ((nextNode = currConsumerNode.lvNext()) == null); while ((nextNode = currConsumerNode.lvNext()) == null);
// got the next node... // got the next node...
Expand All @@ -123,8 +122,7 @@ public final E peek() {
LinkedQueueNode<E> nextNode = currConsumerNode.lvNext(); LinkedQueueNode<E> nextNode = currConsumerNode.lvNext();
if (nextNode != null) { if (nextNode != null) {
return nextNode.lpValue(); return nextNode.lpValue();
} } else if (currConsumerNode != lvProducerNode()) {
else if (currConsumerNode != lvProducerNode()) {
// spin, we are no longer wait free // spin, we are no longer wait free
while ((nextNode = currConsumerNode.lvNext()) == null); while ((nextNode = currConsumerNode.lvNext()) == null);
// got the next node... // got the next node...
Expand All @@ -146,10 +144,10 @@ public int fill(Supplier<E> s) {
@Override @Override
public int fill(Supplier<E> s, int limit) { public int fill(Supplier<E> s, int limit) {
if (limit == 0) return 0; if (limit == 0) return 0;
LinkedQueueNode<E> tail = new LinkedQueueNode<E>(s.get()); LinkedQueueNode<E> tail = newNode(s.get());
final LinkedQueueNode<E> head = tail; final LinkedQueueNode<E> head = tail;
for (int i = 1; i < limit; i++) { for (int i = 1; i < limit; i++) {
final LinkedQueueNode<E> temp = new LinkedQueueNode<E>(s.get()); final LinkedQueueNode<E> temp = newNode(s.get());
tail.soNext(temp); tail.soNext(temp);
tail = temp; tail = temp;
} }
Expand Down
Expand Up @@ -32,7 +32,7 @@
public class SpscLinkedQueue<E> extends BaseLinkedQueue<E> { public class SpscLinkedQueue<E> extends BaseLinkedQueue<E> {


public SpscLinkedQueue() { public SpscLinkedQueue() {
LinkedQueueNode<E> node = new LinkedQueueNode<E>(); LinkedQueueNode<E> node = newNode();
spProducerNode(node); spProducerNode(node);
spConsumerNode(node); spConsumerNode(node);
node.soNext(null); // this ensures correct construction: StoreStore node.soNext(null); // this ensures correct construction: StoreStore
Expand All @@ -58,7 +58,7 @@ public boolean offer(final E e) {
if (null == e) { if (null == e) {
throw new NullPointerException(); throw new NullPointerException();
} }
final LinkedQueueNode<E> nextNode = new LinkedQueueNode<E>(e); final LinkedQueueNode<E> nextNode = newNode(e);
lpProducerNode().soNext(nextNode); lpProducerNode().soNext(nextNode);
spProducerNode(nextNode); spProducerNode(nextNode);
return true; return true;
Expand Down Expand Up @@ -102,10 +102,10 @@ public int fill(Supplier<E> s) {
@Override @Override
public int fill(Supplier<E> s, int limit) { public int fill(Supplier<E> s, int limit) {
if (limit == 0) return 0; if (limit == 0) return 0;
LinkedQueueNode<E> tail = new LinkedQueueNode<E>(s.get()); LinkedQueueNode<E> tail = newNode(s.get());
final LinkedQueueNode<E> head = tail; final LinkedQueueNode<E> head = tail;
for (int i = 1; i < limit; i++) { for (int i = 1; i < limit; i++) {
final LinkedQueueNode<E> temp = new LinkedQueueNode<E>(s.get()); final LinkedQueueNode<E> temp = newNode(s.get());
tail.soNext(temp); tail.soNext(temp);
tail = temp; tail = temp;
} }
Expand All @@ -115,13 +115,12 @@ public int fill(Supplier<E> s, int limit) {
return limit; return limit;
} }



@Override @Override
public void fill(Supplier<E> s, WaitStrategy wait, ExitCondition exit) { public void fill(Supplier<E> s, WaitStrategy wait, ExitCondition exit) {
LinkedQueueNode<E> chaserNode = producerNode; LinkedQueueNode<E> chaserNode = producerNode;
while (exit.keepRunning()) { while (exit.keepRunning()) {
for (int i = 0; i < 4096; i++) { for (int i = 0; i < 4096; i++) {
final LinkedQueueNode<E> nextNode = new LinkedQueueNode<E>(s.get()); final LinkedQueueNode<E> nextNode = newNode(s.get());
chaserNode.soNext(nextNode); chaserNode.soNext(nextNode);
chaserNode = nextNode; chaserNode = nextNode;
this.producerNode = chaserNode; this.producerNode = chaserNode;
Expand Down
Expand Up @@ -17,8 +17,6 @@
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;


import org.jctools.queues.MessagePassingQueue;

abstract class BaseLinkedAtomicQueuePad0<E> extends AbstractQueue<E> { abstract class BaseLinkedAtomicQueuePad0<E> extends AbstractQueue<E> {
long p00, p01, p02, p03, p04, p05, p06, p07; long p00, p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16; long p10, p11, p12, p13, p14, p15, p16;
Expand All @@ -29,8 +27,8 @@ abstract class BaseLinkedAtomicQueueProducerNodeRef<E> extends BaseLinkedAtomicQ


protected volatile LinkedQueueAtomicNode<E> producerNode; protected volatile LinkedQueueAtomicNode<E> producerNode;


protected final void spProducerNode(LinkedQueueAtomicNode<E> node) { protected final void spProducerNode(LinkedQueueAtomicNode<E> newValue) {
P_NODE_UPDATER.lazySet(this, node); P_NODE_UPDATER.lazySet(this, newValue);
} }


protected final LinkedQueueAtomicNode<E> lvProducerNode() { protected final LinkedQueueAtomicNode<E> lvProducerNode() {
Expand All @@ -41,8 +39,8 @@ protected final LinkedQueueAtomicNode<E> lpProducerNode() {
return lvProducerNode(); return lvProducerNode();
} }


protected final LinkedQueueAtomicNode<E> xchgProducerNode(LinkedQueueAtomicNode<E> node) { protected final LinkedQueueAtomicNode<E> xchgProducerNode(LinkedQueueAtomicNode<E> newValue) {
return P_NODE_UPDATER.getAndSet(this, node); return P_NODE_UPDATER.getAndSet(this, newValue);
} }
} }


Expand All @@ -56,8 +54,8 @@ abstract class BaseLinkedAtomicQueueConsumerNodeRef<E> extends BaseLinkedAtomicQ


protected volatile LinkedQueueAtomicNode<E> consumerNode; protected volatile LinkedQueueAtomicNode<E> consumerNode;


protected final void spConsumerNode(LinkedQueueAtomicNode<E> node) { protected final void spConsumerNode(LinkedQueueAtomicNode<E> newValue) {
C_NODE_UPDATER.lazySet(this, node); C_NODE_UPDATER.lazySet(this, newValue);
} }


protected final LinkedQueueAtomicNode<E> lvConsumerNode() { protected final LinkedQueueAtomicNode<E> lvConsumerNode() {
Expand Down Expand Up @@ -86,6 +84,14 @@ public String toString() {
return this.getClass().getName(); return this.getClass().getName();
} }


protected final LinkedQueueAtomicNode<E> newNode() {
return new LinkedQueueAtomicNode<E>();
}

protected final LinkedQueueAtomicNode<E> newNode(E e) {
return new LinkedQueueAtomicNode<E>(e);
}

public final int size() { public final int size() {
// Read consumer first, this is important because if the producer is node is 'older' than the consumer // Read consumer first, this is important because if the producer is node is 'older' than the consumer
// the consumer may overtake it (consume past it). This will lead to an infinite loop below. // the consumer may overtake it (consume past it). This will lead to an infinite loop below.
Expand Down Expand Up @@ -116,8 +122,6 @@ public final int size() {
* Queue is empty when producerNode is the same as consumerNode. An alternative implementation would be to * Queue is empty when producerNode is the same as consumerNode. An alternative implementation would be to
* observe the producerNode.value is null, which also means an empty queue because only the * observe the producerNode.value is null, which also means an empty queue because only the
* consumerNode.value is allowed to be null. * consumerNode.value is allowed to be null.
*
* @see MessagePassingQueue#isEmpty()
*/ */
@Override @Override
public final boolean isEmpty() { public final boolean isEmpty() {
Expand Down
Expand Up @@ -30,7 +30,7 @@
public final class MpscLinkedAtomicQueue<E> extends BaseLinkedAtomicQueue<E> { public final class MpscLinkedAtomicQueue<E> extends BaseLinkedAtomicQueue<E> {


public MpscLinkedAtomicQueue() { public MpscLinkedAtomicQueue() {
LinkedQueueAtomicNode<E> node = new LinkedQueueAtomicNode<E>(); LinkedQueueAtomicNode<E> node = newNode();
spConsumerNode(node); spConsumerNode(node);
xchgProducerNode(node);// this ensures correct construction: StoreLoad xchgProducerNode(node);// this ensures correct construction: StoreLoad
} }
Expand All @@ -55,7 +55,7 @@ public final boolean offer(final E e) {
if (null == e) { if (null == e) {
throw new NullPointerException(); throw new NullPointerException();
} }
final LinkedQueueAtomicNode<E> nextNode = new LinkedQueueAtomicNode<E>(e); final LinkedQueueAtomicNode<E> nextNode = newNode(e);
final LinkedQueueAtomicNode<E> prevProducerNode = xchgProducerNode(nextNode); final LinkedQueueAtomicNode<E> prevProducerNode = xchgProducerNode(nextNode);
// Should a producer thread get interrupted here the chain WILL be broken until that thread is resumed // Should a producer thread get interrupted here the chain WILL be broken until that thread is resumed
// and completes the store in prev.next. // and completes the store in prev.next.
Expand Down Expand Up @@ -86,8 +86,7 @@ public final E poll() {
LinkedQueueAtomicNode<E> nextNode = currConsumerNode.lvNext(); LinkedQueueAtomicNode<E> nextNode = currConsumerNode.lvNext();
if (nextNode != null) { if (nextNode != null) {
return getSingleConsumerNodeValue(currConsumerNode, nextNode); return getSingleConsumerNodeValue(currConsumerNode, nextNode);
} } else if (currConsumerNode != lvProducerNode()) {
else if (currConsumerNode != lvProducerNode()) {
// spin, we are no longer wait free // spin, we are no longer wait free
while ((nextNode = currConsumerNode.lvNext()) == null); while ((nextNode = currConsumerNode.lvNext()) == null);
// got the next node... // got the next node...
Expand All @@ -103,8 +102,7 @@ public final E peek() {
LinkedQueueAtomicNode<E> nextNode = currConsumerNode.lvNext(); LinkedQueueAtomicNode<E> nextNode = currConsumerNode.lvNext();
if (nextNode != null) { if (nextNode != null) {
return nextNode.lpValue(); return nextNode.lpValue();
} } else if (currConsumerNode != lvProducerNode()) {
else if (currConsumerNode != lvProducerNode()) {
// spin, we are no longer wait free // spin, we are no longer wait free
while ((nextNode = currConsumerNode.lvNext()) == null); while ((nextNode = currConsumerNode.lvNext()) == null);
// got the next node... // got the next node...
Expand Down
Expand Up @@ -31,7 +31,7 @@
public final class SpscLinkedAtomicQueue<E> extends BaseLinkedAtomicQueue<E> { public final class SpscLinkedAtomicQueue<E> extends BaseLinkedAtomicQueue<E> {


public SpscLinkedAtomicQueue() { public SpscLinkedAtomicQueue() {
LinkedQueueAtomicNode<E> node = new LinkedQueueAtomicNode<E>(); LinkedQueueAtomicNode<E> node = newNode();
spProducerNode(node); spProducerNode(node);
spConsumerNode(node); spConsumerNode(node);
node.soNext(null); // this ensures correct construction: StoreStore node.soNext(null); // this ensures correct construction: StoreStore
Expand All @@ -57,7 +57,7 @@ public boolean offer(final E e) {
if (null == e) { if (null == e) {
throw new NullPointerException(); throw new NullPointerException();
} }
final LinkedQueueAtomicNode<E> nextNode = new LinkedQueueAtomicNode<E>(e); final LinkedQueueAtomicNode<E> nextNode = newNode(e);
lpProducerNode().soNext(nextNode); lpProducerNode().soNext(nextNode);
spProducerNode(nextNode); spProducerNode(nextNode);
return true; return true;
Expand Down

0 comments on commit 8f5c9ae

Please sign in to comment.