From ffcf1ab941a7474f3f10b312a9c9226a8f16f881 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 8 Apr 2016 17:43:16 -0700 Subject: [PATCH 1/2] add special handling for first triggered punctuate --- .../processor/internals/PunctuationQueue.java | 2 +- .../processor/internals/PunctuationSchedule.java | 13 +++++++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java index d7d7eee90ef78..824e20a295f63 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java @@ -43,7 +43,7 @@ public boolean mayPunctuate(long timestamp, Punctuator punctuator) { PunctuationSchedule sched = top; pq.poll(); punctuator.punctuate(sched.node(), timestamp); - pq.add(sched.next()); + pq.add(sched.next(timestamp)); punctuated = true; top = pq.peek(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java index 758cfb001e035..98919d2716832 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java @@ -22,11 +22,11 @@ public class PunctuationSchedule extends Stamped { final long interval; public PunctuationSchedule(ProcessorNode node, long interval) { - this(node, 0, interval); + this(node, 0L, interval); } public PunctuationSchedule(ProcessorNode node, long time, long interval) { - super(node, time + interval); + super(node, time); this.interval = interval; } @@ -34,8 +34,13 @@ public ProcessorNode node() { return value; } - public PunctuationSchedule next() { - return new PunctuationSchedule(value, timestamp, interval); + public PunctuationSchedule next(long currTimestamp) { + // we need to special handle the case when it is firstly triggered (i.e. the timestamp + // is equal to the interval) by reschedule based on the currTimestamp + if (timestamp == 0L) + return new PunctuationSchedule(value, currTimestamp + interval, interval); + else + return new PunctuationSchedule(value, timestamp + interval, interval); } } From c1cfb7c62a804760b154401ba0c2094c2da91ebf Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 8 Apr 2016 17:44:18 -0700 Subject: [PATCH 2/2] modify unit test to cover this case --- .../streams/processor/internals/StreamTaskTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index dd4894744010a..6014c36ae5eb9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -226,15 +226,15 @@ public void testMaybePunctuate() throws Exception { StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null); task.addRecords(partition1, records( - new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), - new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue) + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue) )); task.addRecords(partition2, records( - new ConsumerRecord<>(partition2.topic(), partition2.partition(), 15, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), - new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue) + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue) )); assertTrue(task.maybePunctuate()); @@ -275,7 +275,7 @@ public void testMaybePunctuate() throws Exception { assertFalse(task.maybePunctuate()); - processor.supplier.checkAndClearPunctuateResult(10L, 20L, 30L); + processor.supplier.checkAndClearPunctuateResult(20L, 30L, 40L); task.close();