Skip to content

Commit

Permalink
Merge pull request apache#14728 from ihji/cherry-pick-12118
Browse files Browse the repository at this point in the history
[BEAM-12262] Cherry-pick BEAM-12118
  • Loading branch information
ihji committed May 5, 2021
2 parents fbd09de + efa150f commit ae3525b
Showing 1 changed file with 12 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,22 @@ boolean offer(ConsumerAndData<?> e, long l, TimeUnit t) throws InterruptedExcept
// empty in which case it returns null.
@Nullable
ConsumerAndData<?> take() throws InterruptedException {
// We first poll without blocking to optimize for the case there is data.
// If there is no data we end up blocking on take() and thus the extra
// poll doesn't matter.
@Nullable ConsumerAndData<?> result = queue.poll();
if (result == null) {
if (closed.get()) {
return null;
// Poll again to ensure that there is nothing in the queue. Once we observe closed as true
// we are guaranteed no additional elements other than the POISON will be added. However
// we can't rely on the previous poll result as it could race with additional offers and
// close.
result = queue.poll();
} else {
// We are not closed so we perform a blocking take. We are guaranteed that additional
// elements will be offered or the POISON will be added by close to unblock this thread.
result = queue.take();
}
result = queue.take();
}
if (result == POISON) {
return null;
Expand Down

0 comments on commit ae3525b

Please sign in to comment.