Skip to content

Commit

Permalink
Protect the producer index in case of OutOfMemoryError (atomic q)
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 committed Apr 7, 2019
1 parent 9f745a0 commit b617919
Showing 1 changed file with 13 additions and 5 deletions.
Expand Up @@ -257,7 +257,7 @@ public boolean offer(final E e) {
case QUEUE_FULL: case QUEUE_FULL:
return false; return false;
case QUEUE_RESIZE: case QUEUE_RESIZE:
resize(mask, buffer, pIndex, e); resize(mask, buffer, pIndex, e, null);
return true; return true;
} }
} }
Expand Down Expand Up @@ -504,7 +504,7 @@ public int fill(Supplier<E> s, int batchSize) {
case QUEUE_FULL: case QUEUE_FULL:
return 0; return 0;
case QUEUE_RESIZE: case QUEUE_RESIZE:
resize(mask, buffer, pIndex, s.get()); resize(mask, buffer, pIndex, null, s);
return 1; return 1;
} }
} }
Expand Down Expand Up @@ -639,16 +639,24 @@ private E getNext() {
} }
} }


private void resize(long oldMask, AtomicReferenceArray<E> oldBuffer, long pIndex, E e) { private void resize(long oldMask, AtomicReferenceArray<E> oldBuffer, long pIndex, E e, Supplier<E> s) {
assert (e != null && s == null) || (e == null || s != null);
int newBufferLength = getNextBufferSize(oldBuffer); int newBufferLength = getNextBufferSize(oldBuffer);
final AtomicReferenceArray<E> newBuffer = allocate(newBufferLength); final AtomicReferenceArray<E> newBuffer;
try {
newBuffer = allocate(newBufferLength);
} catch (OutOfMemoryError oom) {
assert lvProducerIndex() == pIndex + 1;
soProducerIndex(pIndex);
throw oom;
}
producerBuffer = newBuffer; producerBuffer = newBuffer;
final int newMask = (newBufferLength - 2) << 1; final int newMask = (newBufferLength - 2) << 1;
producerMask = newMask; producerMask = newMask;
final int offsetInOld = modifiedCalcElementOffset(pIndex, oldMask); final int offsetInOld = modifiedCalcElementOffset(pIndex, oldMask);
final int offsetInNew = modifiedCalcElementOffset(pIndex, newMask); final int offsetInNew = modifiedCalcElementOffset(pIndex, newMask);
// element in new array // element in new array
soElement(newBuffer, offsetInNew, e); soElement(newBuffer, offsetInNew, e == null ? s.get() : e);
// buffer linked // buffer linked
soElement(oldBuffer, nextArrayOffset(oldMask), newBuffer); soElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);
// ASSERT code // ASSERT code
Expand Down

0 comments on commit b617919

Please sign in to comment.