From 5c3f9288c065362fc23933bc629cc8adbb2e5148 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 13 Jun 2016 15:32:19 -0700 Subject: [PATCH 1/2] Wait for Elements to be fetched in KafkaIO#start This makes it more likely that the reader has elements after the call to start returns. --- .../main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index d540a8df1518..8511bde2b5df 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -759,6 +759,8 @@ private static class UnboundedKafkaReader extends UnboundedReader curBatch = Collections.emptyIterator(); private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000); + // how long to wait for new records from kafka consumer inside start() + private static final Duration START_NEW_RECORDS_POLL_TIMEOUT = Duration.standardSeconds(5); // how long to wait for new records from kafka consumer inside advance() private static final Duration NEW_RECORDS_POLL_TIMEOUT = Duration.millis(10); @@ -891,12 +893,12 @@ private void consumerPollLoop() { LOG.info("{}: Returning from consumer pool loop", this); } - private void nextBatch() { + private void nextBatch(Duration timeout) { curBatch = Collections.emptyIterator(); ConsumerRecords records; try { - records = availableRecordsQueue.poll(NEW_RECORDS_POLL_TIMEOUT.getMillis(), + records = availableRecordsQueue.poll(timeout.getMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -965,6 +967,7 @@ public void run() { } }, 0, OFFSET_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS); + nextBatch(START_NEW_RECORDS_POLL_TIMEOUT); return advance(); } @@ -1028,7 +1031,7 @@ public boolean advance() throws IOException { return true; } else { // -- (b) - nextBatch(); + nextBatch(NEW_RECORDS_POLL_TIMEOUT); if (!curBatch.hasNext()) { return false; From e704dec733c2645c2f39242a515a13f6d3f95dfb Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Tue, 14 Jun 2016 10:19:05 -0700 Subject: [PATCH 2/2] fixup! Wait for Elements to be fetched in KafkaIO#start --- .../src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 8511bde2b5df..3b64bd523ffa 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -967,6 +967,8 @@ public void run() { } }, 0, OFFSET_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS); + // Wait for longer than normal when fetching a batch to improve chances a record is available + // when start() returns. nextBatch(START_NEW_RECORDS_POLL_TIMEOUT); return advance(); }