Skip to content

Commit

Permalink
SPSC variant: Refactored BaseSpscLinkedArrayQueue to be similiar to t…
Browse files Browse the repository at this point in the history
…he unsafe variant
  • Loading branch information
kay committed Aug 14, 2017
1 parent eed855a commit 7ddcf61
Show file tree
Hide file tree
Showing 8 changed files with 342 additions and 249 deletions.
Expand Up @@ -13,6 +13,7 @@
*/
package org.jctools.queues;

import org.jctools.queues.IndexedQueueSizeUtil.IndexedQueue;
import org.jctools.util.Pow2;
import org.jctools.util.RangeUtil;

Expand All @@ -24,15 +25,44 @@
import static org.jctools.util.UnsafeAccess.UNSAFE;
import static org.jctools.util.UnsafeRefArrayAccess.*;

abstract class BaseMpscLinkedArrayQueuePad1<E> extends AbstractQueue<E>
abstract class BaseMpscLinkedArrayQueuePad1<E> extends AbstractQueue<E> implements IndexedQueue
{
long p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16, p17;
}

abstract class BaseMpscLinkedArrayQueueProducerFields<E> extends BaseMpscLinkedArrayQueuePad1<E>
{
private final static long P_INDEX_OFFSET;

static {
try
{
Field iField = BaseMpscLinkedArrayQueueProducerFields.class.getDeclaredField("producerIndex");
P_INDEX_OFFSET = UNSAFE.objectFieldOffset(iField);
}
catch (NoSuchFieldException e)
{
throw new RuntimeException(e);
}
}
protected long producerIndex;

@Override
public final long lvProducerIndex()
{
return UNSAFE.getLongVolatile(this, P_INDEX_OFFSET);
}

final void soProducerIndex(long newValue)
{
UNSAFE.putOrderedLong(this, P_INDEX_OFFSET, newValue);
}

final boolean casProducerIndex(long expect, long newValue)
{
return UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue);
}
}

abstract class BaseMpscLinkedArrayQueuePad2<E> extends BaseMpscLinkedArrayQueueProducerFields<E>
Expand All @@ -43,9 +73,32 @@ abstract class BaseMpscLinkedArrayQueuePad2<E> extends BaseMpscLinkedArrayQueueP

abstract class BaseMpscLinkedArrayQueueConsumerFields<E> extends BaseMpscLinkedArrayQueuePad2<E>
{
private final static long C_INDEX_OFFSET;
static {
try
{
Field iField = BaseMpscLinkedArrayQueueConsumerFields.class.getDeclaredField("consumerIndex");
C_INDEX_OFFSET = UNSAFE.objectFieldOffset(iField);
}
catch (NoSuchFieldException e)
{
throw new RuntimeException(e);
}
}
protected long consumerMask;
protected E[] consumerBuffer;
protected long consumerIndex;

@Override
public final long lvConsumerIndex()
{
return UNSAFE.getLongVolatile(this, C_INDEX_OFFSET);
}

final void soConsumerIndex(long newValue)
{
UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, newValue);
}
}

abstract class BaseMpscLinkedArrayQueuePad3<E> extends BaseMpscLinkedArrayQueueConsumerFields<E>
Expand All @@ -56,9 +109,38 @@ abstract class BaseMpscLinkedArrayQueuePad3<E> extends BaseMpscLinkedArrayQueueC

abstract class BaseMpscLinkedArrayQueueColdProducerFields<E> extends BaseMpscLinkedArrayQueuePad3<E>
{
private final static long P_LIMIT_OFFSET;
static
{
try
{
Field iField = BaseMpscLinkedArrayQueueColdProducerFields.class.getDeclaredField("producerLimit");
P_LIMIT_OFFSET = UNSAFE.objectFieldOffset(iField);
}
catch (NoSuchFieldException e)
{
throw new RuntimeException(e);
}
}

protected volatile long producerLimit;
protected long producerMask;
protected E[] producerBuffer;

final long lvProducerLimit()
{
return producerLimit;
}

final boolean casProducerLimit(long expect, long newValue)
{
return UNSAFE.compareAndSwapLong(this, P_LIMIT_OFFSET, expect, newValue);
}

final void soProducerLimit(long newValue)
{
UNSAFE.putOrderedLong(this, P_LIMIT_OFFSET, newValue);
}
}


Expand All @@ -74,42 +156,8 @@ public abstract class BaseMpscLinkedArrayQueue<E> extends BaseMpscLinkedArrayQue
{
// No post padding here, subclasses must add

private final static long P_INDEX_OFFSET;
private final static long C_INDEX_OFFSET;
private final static long P_LIMIT_OFFSET;
private final static Object JUMP = new Object();

static
{
try
{
Field iField = BaseMpscLinkedArrayQueueProducerFields.class.getDeclaredField("producerIndex");
P_INDEX_OFFSET = UNSAFE.objectFieldOffset(iField);
}
catch (NoSuchFieldException e)
{
throw new RuntimeException(e);
}
try
{
Field iField = BaseMpscLinkedArrayQueueConsumerFields.class.getDeclaredField("consumerIndex");
C_INDEX_OFFSET = UNSAFE.objectFieldOffset(iField);
}
catch (NoSuchFieldException e)
{
throw new RuntimeException(e);
}
try
{
Field iField = BaseMpscLinkedArrayQueueColdProducerFields.class.getDeclaredField("producerLimit");
P_LIMIT_OFFSET = UNSAFE.objectFieldOffset(iField);
}
catch (NoSuchFieldException e)
{
throw new RuntimeException(e);
}
}


/**
* @param initialCapacity the queue initial capacity. If chunk size is fixed this will be the chunk size.
Expand Down Expand Up @@ -412,46 +460,6 @@ private long newBufferAndOffset(E[] nextBuffer, long index)
return offsetInNew;
}

private long lvProducerIndex()
{
return UNSAFE.getLongVolatile(this, P_INDEX_OFFSET);
}

private long lvConsumerIndex()
{
return UNSAFE.getLongVolatile(this, C_INDEX_OFFSET);
}

private void soProducerIndex(long v)
{
UNSAFE.putOrderedLong(this, P_INDEX_OFFSET, v);
}

private boolean casProducerIndex(long expect, long newValue)
{
return UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue);
}

private void soConsumerIndex(long v)
{
UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, v);
}

private long lvProducerLimit()
{
return producerLimit;
}

private boolean casProducerLimit(long expect, long newValue)
{
return UNSAFE.compareAndSwapLong(this, P_LIMIT_OFFSET, expect, newValue);
}

private void soProducerLimit(long v)
{
UNSAFE.putOrderedLong(this, P_LIMIT_OFFSET, v);
}

@Override
public long currentProducerIndex()
{
Expand Down

0 comments on commit 7ddcf61

Please sign in to comment.