From 0e9acf0a1d037e3ee0d4916cd515d09c47b77657 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Tue, 27 Oct 2015 13:45:29 -0700 Subject: [PATCH] HOTFIX: call consumer.poll() even when no task is assigned --- .../processor/internals/StreamThread.java | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index abc5c5ddec37e..0bf51d7514f69 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -265,22 +265,29 @@ private void runLoop() { sensors.pollTimeSensor.record(endPoll - startPoll); } - // try to process one record from each task totalNumBuffered = 0; - requiresPoll = false; - for (StreamTask task : tasks.values()) { - long startProcess = time.milliseconds(); + if (!tasks.isEmpty()) { + // try to process one record from each task + requiresPoll = false; - totalNumBuffered += task.process(); - requiresPoll = requiresPoll || task.requiresPoll(); + for (StreamTask task : tasks.values()) { + long startProcess = time.milliseconds(); - sensors.processTimeSensor.record(time.milliseconds() - startProcess); + totalNumBuffered += task.process(); + requiresPoll = requiresPoll || task.requiresPoll(); + + sensors.processTimeSensor.record(time.milliseconds() - startProcess); + } + + maybePunctuate(); + maybeCommit(); + } else { + // even when no task is assigned, we must poll to get a task. + requiresPoll = true; } - maybePunctuate(); maybeClean(); - maybeCommit(); } } catch (Exception e) { throw new KafkaException(e);