Skip to content

Commit

Permalink
Minor touches or review
Browse files Browse the repository at this point in the history
  • Loading branch information
nitsanw committed Oct 23, 2016
1 parent 7c6943c commit 40ff381
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 41 deletions.
Expand Up @@ -20,8 +20,8 @@
import org.jctools.util.Pow2;

public class SpscChunkedArrayQueue<E> extends BaseSpscLinkedArrayQueue<E> {
private int maxQueueCapacity; // ignored by the unbounded implementation
private long producerQueueLimit;// ignored by the unbounded implementation
private int maxQueueCapacity;
private long producerQueueLimit;

public SpscChunkedArrayQueue(final int capacity) {
this(Math.max(8, Pow2.roundToPowerOfTwo(capacity / 8)), capacity);
Expand All @@ -45,7 +45,7 @@ public SpscChunkedArrayQueue(final int chunkSize, final int capacity) {

long mask = chunkCapacity - 1;
// need extra element to point at next array
E[] buffer = allocate(chunkCapacity+1);
E[] buffer = allocate(chunkCapacity + 1);
producerBuffer = buffer;
producerMask = mask;
consumerBuffer = buffer;
Expand Down Expand Up @@ -89,17 +89,13 @@ else if (null == lvElement(buffer, calcElementOffset(pIndex + 1, mask))) { // bu
}
else {
// we got one slot left to write into, and we are not full. Need to link new buffer.
linkNewBuffer(buffer, pIndex, offset, e, mask);
// allocate new buffer of same length
final E[] newBuffer = allocate((int)(mask + 2));
producerBuffer = newBuffer;

linkOldToNew(pIndex, buffer, offset, newBuffer, offset, e);
}
return true;
}

protected void linkNewBuffer(final E[] oldBuffer, final long currIndex, final long offset, final E e,
final long mask) {
// allocate new buffer of same length
final E[] newBuffer = allocate((int)(mask + 2));
producerBuffer = newBuffer;

linkOldToNew(currIndex, oldBuffer, offset, newBuffer, offset, e);
}
}
Expand Up @@ -44,7 +44,7 @@ public SpscGrowableArrayQueue(final int chunkSize, final int capacity) {

long mask = chunkCapacity - 1;
// need extra element to point at next array
E[] buffer = allocate(chunkCapacity+1);
E[] buffer = allocate(chunkCapacity + 1);
producerBuffer = buffer;
producerMask = mask;
consumerBuffer = buffer;
Expand Down Expand Up @@ -80,7 +80,14 @@ protected final boolean offerColdPath(final E[] buffer, final long mask, final E
if (null == lvElement(buffer, calcElementOffset(index + 1, mask))) { // buffer is not full
writeToQueue(buffer, e, index, offset);
} else {
linkNewBuffer(buffer, index, offset, e, mask);
// allocate new buffer of same length
final E[] newBuffer = allocate((int) (2*(mask +1) + 1));

producerBuffer = newBuffer;
producerMask = newBuffer.length - 2;

final long offsetInNew = calcElementOffset(index, producerMask);
linkOldToNew(index, buffer, offset, newBuffer, offsetInNew, e);
int newCapacity = (int) (producerMask + 1);
if (newCapacity == maxCapacity) {
long currConsumerIndex = lvConsumerIndex();
Expand Down Expand Up @@ -128,16 +135,4 @@ protected final boolean offerColdPath(final E[] buffer, final long mask, final E
private void adjustLookAheadStep(int capacity) {
lookAheadStep = Math.min(capacity / 4, SpscArrayQueue.MAX_LOOK_AHEAD_STEP);
}

private void linkNewBuffer(final E[] oldBuffer, final long currIndex, final long offset, final E e,
final long mask) {
// allocate new buffer of same length
final E[] newBuffer = allocate((int) (2*(mask+1) + 1));

producerBuffer = newBuffer;
producerMask = newBuffer.length - 2;

final long offsetInNew = calcElementOffset(currIndex, producerMask);
linkOldToNew(currIndex, oldBuffer, offset, newBuffer, offsetInNew, e);
}
}
Expand Up @@ -13,18 +13,18 @@
*/
package org.jctools.queues;

import static org.jctools.queues.CircularArrayOffsetCalculator.allocate;
import static org.jctools.queues.CircularArrayOffsetCalculator.calcElementOffset;
import static org.jctools.util.UnsafeRefArrayAccess.lvElement;

import org.jctools.util.Pow2;

public class SpscUnboundedArrayQueue<E> extends BaseSpscLinkedArrayQueue<E> {

@SuppressWarnings("unchecked")
public SpscUnboundedArrayQueue(final int chunkSize) {
int p2capacity = Math.max(Pow2.roundToPowerOfTwo(chunkSize), 16);
long mask = p2capacity - 1;
E[] buffer = (E[]) new Object[p2capacity + 1];
int chunkCapacity = Math.max(Pow2.roundToPowerOfTwo(chunkSize), 16);
long mask = chunkCapacity - 1;
E[] buffer = allocate(chunkCapacity + 1);
producerBuffer = buffer;
producerMask = mask;
consumerBuffer = buffer;
Expand All @@ -49,19 +49,14 @@ else if (null == lvElement(buffer, calcElementOffset(pIndex + 1, mask))) { // bu
}
else {
// we got one slot left to write into, and we are not full. Need to link new buffer.
linkNewBuffer(buffer, pIndex, offset, e, mask);
// allocate new buffer of same length
final E[] newBuffer = allocate((int)(mask + 2));
producerBuffer = newBuffer;
producerBufferLimit = pIndex + mask - 1;

linkOldToNew(pIndex, buffer, offset, newBuffer, offset, e);
}
return true;
}

@SuppressWarnings("unchecked")
private void linkNewBuffer(final E[] oldBuffer, final long currIndex, final long offset, final E e,
final long mask) {
// allocate new buffer of same length
final E[] newBuffer = (E[]) new Object[oldBuffer.length];
producerBuffer = newBuffer;
producerBufferLimit = currIndex + mask - 1;

linkOldToNew(currIndex, oldBuffer, offset, newBuffer, offset, e);
}
}

0 comments on commit 40ff381

Please sign in to comment.