Skip to content

Commit

Permalink
fix(backPressure): fix a race condition that may block enqueue.
Browse files Browse the repository at this point in the history
  • Loading branch information
PhantomThief committed Oct 12, 2020
1 parent a4d4dee commit dd54b63
Showing 1 changed file with 22 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,23 @@ public void enqueue(E element) {
long currentCount = counter.get();
long thisMaxBufferCount = maxBufferCount.getAsLong();
if (thisMaxBufferCount > 0 && currentCount >= thisMaxBufferCount) {
boolean pass = fireRejectHandler(element);
boolean pass = false;
if (rejectHandler != null) {
if (writeLock != null && writeCondition != null) {
writeLock.lock();
}
try {
currentCount = counter.get();
thisMaxBufferCount = maxBufferCount.getAsLong();
if (thisMaxBufferCount > 0 && currentCount >= thisMaxBufferCount) {
pass = fireRejectHandler(element);
}
} finally {
if (writeLock != null && writeCondition != null) {
writeLock.unlock();
}
}
}
if (!pass) {
return;
}
Expand Down Expand Up @@ -167,23 +183,12 @@ public void enqueue(E element) {
}

private boolean fireRejectHandler(E element) {
boolean pass = false;
if (rejectHandler != null) {
if (writeLock != null && writeCondition != null) {
writeLock.lock();
}
try {
pass = rejectHandler.onReject(element, writeCondition);
} catch (Throwable e) {
throwIfUnchecked(e);
throw new RuntimeException(e);
} finally {
if (writeLock != null && writeCondition != null) {
writeLock.unlock();
}
}
try {
return rejectHandler.onReject(element, writeCondition);
} catch (Throwable e) {
throwIfUnchecked(e);
throw new RuntimeException(e);
}
return pass;
}

@Override
Expand Down

0 comments on commit dd54b63

Please sign in to comment.