Skip to content

Commit

Permalink
Rework size utility to reduce code duplication
Browse files Browse the repository at this point in the history
  • Loading branch information
nitsanw committed May 20, 2021
1 parent bcd275f commit 58a2faf
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,37 +205,7 @@ public BaseMpscLinkedArrayQueue(final int initialCapacity)
@Override
public int size()
{
// NOTE: because indices are on even numbers we cannot use the size util.

/*
* It is possible for a thread to be interrupted or reschedule between the read of the producer and
* consumer indices, therefore protection is required to ensure size is within valid range. In the
* event of concurrent polls/offers to this method the size is OVER estimated as we read consumer
* index BEFORE the producer index.
*/
long after = lvConsumerIndex();
long size;
while (true)
{
final long before = after;
final long currentProducerIndex = lvProducerIndex();
after = lvConsumerIndex();
if (before == after)
{
size = ((currentProducerIndex - after) >> 1);
break;
}
}
// Long overflow is impossible, so size is always positive. Integer overflow is possible for the unbounded
// indexed queues.
if (size > Integer.MAX_VALUE)
{
return Integer.MAX_VALUE;
}
else
{
return (int) size;
}
return IndexedQueueSizeUtil.size(this, IndexedQueueSizeUtil.IGNORE_PARITY_DIVISOR);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public final Iterator<E> iterator()
@Override
public final int size()
{
return IndexedQueueSizeUtil.size(this);
return IndexedQueueSizeUtil.size(this, IndexedQueueSizeUtil.PLAIN_DIVISOR);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircularArrayQu
@Override
public int size()
{
return IndexedQueueSizeUtil.size(this);
return IndexedQueueSizeUtil.size(this, IndexedQueueSizeUtil.PLAIN_DIVISOR);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@
@InternalAPI
public final class IndexedQueueSizeUtil
{
public static int size(IndexedQueue iq)
{

public static final int PLAIN_DIVISOR = 1;
public static final int IGNORE_PARITY_DIVISOR = 2;

public static int size(IndexedQueue iq, int divisor) {
/*
* It is possible for a thread to be interrupted or reschedule between the reads of the producer and
* consumer indices. It is also for the indices to be updated in a `weakly` visible way. It follows that
Expand All @@ -51,10 +54,14 @@ public static int size(IndexedQueue iq)
after = iq.lvConsumerIndex();
if (before == after)
{
size = (currentProducerIndex - after);
size = (currentProducerIndex - after) / divisor;
break;
}
}
return sanitizedSize(iq.capacity(), size);
}

public static int sanitizedSize(int capacity, long size) {
// Long overflow is impossible here, so size is always positive. Integer overflow is possible for the unbounded
// indexed queues.
if (size > Integer.MAX_VALUE)
Expand All @@ -67,13 +74,15 @@ else if (size < 0)
{
return 0;
}
else if (iq.capacity() != MessagePassingQueue.UNBOUNDED_CAPACITY && size > iq.capacity())
{
return iq.capacity();
}
else
{
return (int) size;
else {
if (capacity != MessagePassingQueue.UNBOUNDED_CAPACITY && size > capacity)
{
return capacity;
}
else
{
return (int) size;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ public Iterator<E> iterator()
@Override
public int size()
{
return IndexedQueueSizeUtil.size(this);
return IndexedQueueSizeUtil.size(this, IndexedQueueSizeUtil.PLAIN_DIVISOR);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,37 +250,7 @@ public final Iterator<E> iterator()
@Override
public final int size()
{
// NOTE: because indices are on even numbers we cannot use the size util.

/*
* It is possible for a thread to be interrupted or reschedule between the read of the producer and
* consumer indices, therefore protection is required to ensure size is within valid range. In the
* event of concurrent polls/offers to this method the size is OVER estimated as we read consumer
* index BEFORE the producer index.
*/
long after = lvConsumerIndex();
long size;
while (true)
{
final long before = after;
final long currentProducerIndex = lvProducerIndex();
after = lvConsumerIndex();
if (before == after)
{
size = ((currentProducerIndex - after) >> 1);
break;
}
}
// Long overflow is impossible, so size is always positive. Integer overflow is possible for the unbounded
// indexed queues.
if (size > Integer.MAX_VALUE)
{
return Integer.MAX_VALUE;
}
else
{
return (int) size;
}
return IndexedQueueSizeUtil.size(this, IndexedQueueSizeUtil.IGNORE_PARITY_DIVISOR);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public final int capacity()
@Override
public final int size()
{
return IndexedQueueSizeUtil.size(this);
return IndexedQueueSizeUtil.size(this, IndexedQueueSizeUtil.PLAIN_DIVISOR);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,30 +318,7 @@ public BaseMpscLinkedAtomicArrayQueue(final int initialCapacity) {

@Override
public int size() {
// NOTE: because indices are on even numbers we cannot use the size util.
/*
* It is possible for a thread to be interrupted or reschedule between the read of the producer and
* consumer indices, therefore protection is required to ensure size is within valid range. In the
* event of concurrent polls/offers to this method the size is OVER estimated as we read consumer
* index BEFORE the producer index.
*/
long after = lvConsumerIndex();
long size;
while (true) {
final long before = after;
final long currentProducerIndex = lvProducerIndex();
after = lvConsumerIndex();
if (before == after) {
size = ((currentProducerIndex - after) >> 1);
break;
}
}
// indexed queues.
if (size > Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
} else {
return (int) size;
}
return IndexedQueueSizeUtil.size(this, IndexedQueueSizeUtil.IGNORE_PARITY_DIVISOR);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ public final Iterator<E> iterator() {

@Override
public final int size() {
return IndexedQueueSizeUtil.size(this);
return IndexedQueueSizeUtil.size(this, IndexedQueueSizeUtil.PLAIN_DIVISOR);
}

@Override
Expand Down

0 comments on commit 58a2faf

Please sign in to comment.