Skip to content

Commit

Permalink
Fixes #337 Provide blocking peeks for MpscBlockingConsumerArrayQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 committed Mar 3, 2021
1 parent 3539dfa commit fb1bbdf
Showing 1 changed file with 39 additions and 4 deletions.
Expand Up @@ -468,6 +468,20 @@ private void wakeupConsumer()
LockSupport.unpark(consumerThread);
}

public E blockedPeek() throws InterruptedException
{
final E[] buffer = consumerBuffer;
final long mask = consumerMask;

final long cIndex = lpConsumerIndex();
final long offset = modifiedCalcCircularRefElementOffset(cIndex, mask);
final E e = lvRefElement(buffer, offset);
if (e != null) {
return e;
}
return parkUntilNext(buffer, cIndex, offset, Long.MAX_VALUE, true);
}

/**
* {@inheritDoc}
* <p>
Expand All @@ -483,7 +497,7 @@ public E take() throws InterruptedException
E e = lvRefElement(buffer, offset);
if (e == null)
{
return parkUntilNext(buffer, cIndex, offset, Long.MAX_VALUE);
return parkUntilNext(buffer, cIndex, offset, Long.MAX_VALUE, false);
}

soRefElement(buffer, offset, null); // release element null
Expand All @@ -492,6 +506,25 @@ public E take() throws InterruptedException
return e;
}

public E blockedPeek(long timeout, TimeUnit unit) throws InterruptedException
{
final E[] buffer = consumerBuffer;
final long mask = consumerMask;

final long cIndex = lpConsumerIndex();
final long offset = modifiedCalcCircularRefElementOffset(cIndex, mask);
final E e = lvRefElement(buffer, offset);
if (e != null) {
return e;
}
long timeoutNs = unit.toNanos(timeout);
if (timeoutNs <= 0)
{
return null;
}
return parkUntilNext(buffer, cIndex, offset, timeoutNs, true);
}

/**
* {@inheritDoc}
* <p>
Expand All @@ -512,7 +545,7 @@ public E poll(long timeout, TimeUnit unit) throws InterruptedException
{
return null;
}
return parkUntilNext(buffer, cIndex, offset, timeoutNs);
return parkUntilNext(buffer, cIndex, offset, timeoutNs, false);
}

soRefElement(buffer, offset, null); // release element null
Expand All @@ -521,7 +554,7 @@ public E poll(long timeout, TimeUnit unit) throws InterruptedException
return e;
}

private E parkUntilNext(E[] buffer, long cIndex, long offset, long timeoutNs) throws InterruptedException {
private E parkUntilNext(E[] buffer, long cIndex, long offset, long timeoutNs, boolean peek) throws InterruptedException {
E e;
final long pIndex = lvProducerIndex();
if (cIndex == pIndex && // queue is empty
Expand Down Expand Up @@ -567,7 +600,9 @@ private E parkUntilNext(E[] buffer, long cIndex, long offset, long timeoutNs) th
// producer index is visible before element, so if we wake up between the index moving and the element
// store we could see a null.
e = spinWaitForElement(buffer, offset);

if (peek) {
return e;
}
soRefElement(buffer, offset, null); // release element null
soConsumerIndex(cIndex + 2); // release cIndex

Expand Down

0 comments on commit fb1bbdf

Please sign in to comment.