Skip to content

Commit

Permalink
RM-39: Backport #766 to 5.4.x.
Browse files Browse the repository at this point in the history
Backport #766 to 5.4.x. Changes can't be cleanly cherrypicked because many things changed in 5.4.0 and 5.5.0 (files moved around and stuff).
  • Loading branch information
rigelbm committed Nov 10, 2020
1 parent ce41560 commit 8bafcb7
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 230 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class KafkaConsumerReadTask<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> {

private static final Logger log = LoggerFactory.getLogger(KafkaConsumerReadTask.class);

private KafkaConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> parent;
private final KafkaConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> parent;
private final long requestTimeoutMs;
// the minimum bytes the task should accumulate
// before returning a response (or hitting the timeout)
Expand Down Expand Up @@ -138,11 +138,18 @@ public boolean isDone() {
*/
private void addRecords() {
while (!exceededMinResponseBytes && !exceededMaxResponseBytes && parent.hasNext()) {
maybeAddRecord();
synchronized (parent) {
if (parent.hasNext()) {
maybeAddRecord();
}
}
}
while (!exceededMaxResponseBytes && parent.hasNextCached()) {
// will not call poll() anymore. Continue draining loaded records
maybeAddRecord();
synchronized (parent) {
if (parent.hasNextCached()) {
maybeAddRecord();
}
}
}
}

Expand Down

0 comments on commit 8bafcb7

Please sign in to comment.