Skip to content

Commit

Permalink
[FLINK-30056][kafka] Use new Consumer#poll(Duration)
Browse files Browse the repository at this point in the history
This changes the blocking behavior, as the previous method essentially ignored the timeout.
  • Loading branch information
snuyanzin authored and zentol committed Nov 25, 2022
1 parent 9d87d33 commit 5e0e0fd
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import javax.annotation.Nonnull;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -258,7 +259,7 @@ public void run() {
// over
if (records == null) {
try {
records = consumer.poll(pollTimeout);
records = consumer.poll(Duration.ofMillis(pollTimeout));
} catch (WakeupException we) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ private void assertRecord(String topicName, String expectedKey, String expectedV
kafkaConsumer.subscribe(Collections.singletonList(topicName));
ConsumerRecords<String, String> records = ConsumerRecords.empty();
while (records.isEmpty()) {
records = kafkaConsumer.poll(10000);
records = kafkaConsumer.poll(Duration.ofMillis(10000));
}

ConsumerRecord<String, String> record = Iterables.getOnlyElement(records);
Expand Down

0 comments on commit 5e0e0fd

Please sign in to comment.