From 0ac3cd99fd57d7eb46765c0dcc53d01b7bdd335f Mon Sep 17 00:00:00 2001
From: ableegoldman
Date: Mon, 8 Jul 2019 17:45:40 -0700
Subject: [PATCH 01/13] move head = null

---
 .../apache/kafka/streams/processor/internals/RecordQueue.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 6f3e70b9922f..26c4216787e1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -112,10 +112,11 @@ int addRawRecords(final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
      */
     public StampedRecord poll() {
         final StampedRecord recordToReturn = headRecord;
-        headRecord = null;
 
         updateHead();
 
+        headRecord = null;
+
         return recordToReturn;
     }
 

From 5c39664976608c61b59739504c399d5770f45519 Mon Sep 17 00:00:00 2001
From: ableegoldman
Date: Mon, 8 Jul 2019 18:28:36 -0700
Subject: [PATCH 02/13] track partition time

---
 .../kafka/streams/processor/internals/RecordQueue.java        | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 26c4216787e1..07471b7b6be5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -47,6 +47,7 @@ public class RecordQueue {
     private final ArrayDeque<ConsumerRecord<byte[], byte[]>> fifoQueue;
 
     private StampedRecord headRecord = null;
+    private long partitionTime = RecordQueue.UNKNOWN;
 
     private Sensor skipRecordsSensor;
 
@@ -154,6 +155,7 @@ public long timestamp() {
     public void clear() {
         fifoQueue.clear();
         headRecord = null;
+        partitionTime = RecordQueue.UNKNOWN;
     }
 
     private void updateHead() {
@@ -168,7 +170,7 @@ private void updateHead() {
 
             final long timestamp;
             try {
-                timestamp = timestampExtractor.extract(deserialized, timestamp());
+                timestamp = timestampExtractor.extract(deserialized, partitionTime);
             } catch (final StreamsException internalFatalExtractorException) {
                 throw internalFatalExtractorException;
             } catch (final Exception fatalUserException) {
@@ -190,6 +192,11 @@ private void updateHead() {
             }
 
             headRecord = new StampedRecord(deserialized, timestamp);
+
+            // update the partition timestamp if the current head record's timestamp has exceeded its value
+            if (timestamp > partitionTime) {
+                partitionTime = timestamp;
+            }
         }
     }
 }
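
The partitionTime tracked above is the value handed to TimestampExtractor#extract as its second argument (renamed accordingly in the next patch). A minimal standalone sketch of a custom extractor written against that contract — the class name PartitionTimeFallbackExtractor is hypothetical; ConsumerRecord and TimestampExtractor are the real Kafka APIs:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Hypothetical example: fall back to the highest timestamp seen so far on
    // this partition whenever the record itself carries no usable timestamp.
    public class PartitionTimeFallbackExtractor implements TimestampExtractor {
        @Override
        public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
            final long timestamp = record.timestamp();
            // partitionTime is -1 (RecordQueue.UNKNOWN) until a valid timestamp
            // has been extracted from this partition
            return timestamp >= 0 ? timestamp : partitionTime;
        }
    }
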
From b9f3c9966a74ab994b65ed563903d4ac01a7a483 Mon Sep 17 00:00:00 2001
From: ableegoldman
Date: Mon, 8 Jul 2019 18:31:37 -0700
Subject: [PATCH 03/13] rename previous timestamp

---
 .../streams/examples/pageview/JsonTimestampExtractor.java  | 2 +-
 .../streams/processor/ExtractRecordMetadataTimestamp.java  | 6 +++---
 .../apache/kafka/streams/processor/TimestampExtractor.java | 4 ++--
 .../streams/processor/WallclockTimestampExtractor.java     | 4 ++--
 .../java/org/apache/kafka/streams/StreamsConfigTest.java   | 2 +-
 .../streams/processor/internals/ProcessorTopologyTest.java | 2 +-
 .../java/org/apache/kafka/test/MockTimestampExtractor.java | 2 +-
 7 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
index 4f6257ac74a5..d760183a8a52 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
@@ -27,7 +27,7 @@ public class JsonTimestampExtractor implements TimestampExtractor {
 
     @Override
-    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
         if (record.value() instanceof PageViewTypedDemo.PageView) {
             return ((PageViewTypedDemo.PageView) record.value()).timestamp;
         }

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java
index 79c8dd34e542..a958b8b0dc0c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java
@@ -50,15 +50,15 @@ abstract class ExtractRecordMetadataTimestamp implements TimestampExtractor {
      * Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}.
      *
      * @param record a data record
-     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+     * @param partitionTime the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return the embedded metadata timestamp of the given {@link ConsumerRecord}
      */
     @Override
-    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
         final long timestamp = record.timestamp();
 
         if (timestamp < 0) {
-            return onInvalidTimestamp(record, timestamp, previousTimestamp);
+            return onInvalidTimestamp(record, timestamp, partitionTime);
         }
 
         return timestamp;

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
index 0780dc09e591..d2e7373a6af5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
@@ -46,8 +46,8 @@ public interface TimestampExtractor {
      *
      *
      * @param record a data record
-     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+     * @param partitionTime the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return the timestamp of the record
      */
-    long extract(ConsumerRecord<Object, Object> record, long previousTimestamp);
+    long extract(ConsumerRecord<Object, Object> record, long partitionTime);
 }

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
index ad3b3bc75de1..724a0da8648e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
@@ -38,11 +38,11 @@ public class WallclockTimestampExtractor implements TimestampExtractor {
      * Return the current wall clock time as timestamp.
      *
      * @param record a data record
-     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+     * @param partitionTime the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return the current wall clock time, expressed in milliseconds since midnight, January 1, 1970 UTC
      */
     @Override
-    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
         return System.currentTimeMillis();
     }
 }

diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index aa3860e8bc25..27e225597b2f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -662,7 +662,7 @@ public Deserializer deserializer() {
     public static class MockTimestampExtractor implements TimestampExtractor {
 
         @Override
-        public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+        public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
             return 0;
         }
     }

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 76252c1581e1..2669039ed61d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -731,7 +731,7 @@ public static class CustomTimestampExtractor implements TimestampExtractor {
         private static final long DEFAULT_TIMESTAMP = 1000L;
 
         @Override
-        public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+        public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
             if (record.value().toString().matches(".*@[0-9]+")) {
                 return Long.parseLong(record.value().toString().split("@")[1]);
             }

diff --git a/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java b/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java
index 17011643797e..f437772cf372 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java
@@ -23,7 +23,7 @@ public class MockTimestampExtractor implements TimestampExtractor {
 
     @Override
-    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
         return record.offset();
     }
 }

From ac606c5cdecb873efeef002501b150d686b039fc Mon Sep 17 00:00:00 2001
From: ableegoldman
Date: Mon, 8 Jul 2019 18:50:16 -0700
Subject: [PATCH 04/13] move method

---
 .../apache/kafka/streams/processor/internals/RecordQueue.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 07471b7b6be5..afd5012a099f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -113,11 +113,10 @@ int addRawRecords(final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
      */
     public StampedRecord poll() {
         final StampedRecord recordToReturn = headRecord;
+        headRecord = null;
 
         updateHead();
 
-        headRecord = null;
-
         return recordToReturn;
     }
 

From bcd654a33ef3481a17c313894b4f4e1b78df0bb9 Mon Sep 17 00:00:00 2001
From: ableegoldman
Date: Mon, 8 Jul 2019 19:22:02 -0700
Subject: [PATCH 05/13] Added test

---
 .../processor/internals/RecordQueueTest.java  | 42 +++++++++++++++++--
 1 file changed, 39 insertions(+), 3 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index c16cb2a71376..df7b2a56d786 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import java.util.ArrayList;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
@@ -38,7 +39,6 @@
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockSourceNode;
-import org.apache.kafka.test.MockTimestampExtractor;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -53,7 +53,7 @@ public class RecordQueueTest {
     private final Serializer<Integer> intSerializer = new IntegerSerializer();
     private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
-    private final TimestampExtractor timestampExtractor = new RecordQueueTestTimestampExtractor();
+    private final TimestampExtractor timestampExtractor = new RecordQueueTestTimestampExtractor();
     private final String[] topics = {"topic"};
 
     private final Sensor skippedRecordsSensor = new Metrics().sensor("skipped-records");
@@ -182,7 +182,30 @@ public void testTimeTracking() {
         assertEquals(4L, queue.timestamp());
     }
 
-    @Test(expected = StreamsException.class)
+    @Test
+    public void testTimestampExtractorPartitionTime() {
+
+        RecordQueueTestTimestampExtractor testTimestampExtractor = (RecordQueueTestTimestampExtractor) timestampExtractor;
+
+        assertTrue(queue.isEmpty());
+        assertEquals(0, queue.size());
+        assertEquals(RecordQueue.UNKNOWN, queue.timestamp());
+
+        // add four out-of-order records with timestamps 2, 1, 3, 4
+        final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
+            new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+
+        queue.addRawRecords(list1);
+        while (queue.poll() != null) {}
+
+        assertEquals(testTimestampExtractor.observedPartitionTimes, new ArrayList<>(Arrays.asList(RecordQueue.UNKNOWN, 2L, 2L, 3L)));
+
+    }
+
+	@Test(expected = StreamsException.class)
     public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() {
         final byte[] key = Serdes.Long().serializer().serialize("foo", 1L);
         final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
@@ -253,4 +276,17 @@ public void shouldDropOnNegativeTimestamp() {
 
         assertEquals(0, queue.size());
     }
+
+    static class RecordQueueTestTimestampExtractor implements TimestampExtractor {
+        private List<Long> observedPartitionTimes = new ArrayList<>();
+
+        public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
+            observedPartitionTimes.add(partitionTime);
+            return record.offset();
+        }
+
+        public List<Long> observedPartitionTimes() {
+            return observedPartitionTimes;
+        }
+    }
 }
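
Why the test above expects the sequence [UNKNOWN, 2L, 2L, 3L]: extract() runs once each time a record becomes the head of the queue, and the partition time is only advanced to the head's timestamp after extraction. A standalone trace of that bookkeeping — plain Java with no Kafka dependencies, the class name is hypothetical:

    public class PartitionTimeTrace {
        public static void main(final String[] args) {
            final long[] timestamps = {2L, 1L, 3L, 4L};
            long partitionTime = -1L; // RecordQueue.UNKNOWN
            for (final long ts : timestamps) {
                // the extractor observes the partition time *before* this record is applied
                System.out.println("extract() sees partitionTime = " + partitionTime);
                partitionTime = Math.max(partitionTime, ts);
            }
            // prints -1, 2, 2, 3 -- the sequence asserted in the test above
        }
    }
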
From 51203155c95a899bf9afaf8fbd7739d4a83328eb Mon Sep 17 00:00:00 2001
From: ableegoldman
Date: Mon, 8 Jul 2019 19:46:17 -0700
Subject: [PATCH 06/13] fix spacing

---
 .../streams/processor/internals/RecordQueueTest.java | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index df7b2a56d786..6ec9c9a286ea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -185,7 +185,7 @@ public void testTimeTracking() {
     @Test
     public void testTimestampExtractorPartitionTime() {
 
-        RecordQueueTestTimestampExtractor testTimestampExtractor = (RecordQueueTestTimestampExtractor) timestampExtractor;
+        final RecordQueueTestTimestampExtractor testTimestampExtractor = (RecordQueueTestTimestampExtractor) timestampExtractor;
 
         assertTrue(queue.isEmpty());
         assertEquals(0, queue.size());
@@ -199,13 +199,13 @@ public void testTimestampExtractorPartitionTime() {
             new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
 
         queue.addRawRecords(list1);
-        while (queue.poll() != null) {}
+        while (queue.poll() != null) {
+        }
 
         assertEquals(testTimestampExtractor.observedPartitionTimes, new ArrayList<>(Arrays.asList(RecordQueue.UNKNOWN, 2L, 2L, 3L)));
-
     }
 
-	@Test(expected = StreamsException.class)
+    @Test(expected = StreamsException.class)
     public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() {
         final byte[] key = Serdes.Long().serializer().serialize("foo", 1L);
         final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
@@ -278,9 +278,9 @@ public void shouldDropOnNegativeTimestamp() {
     }
 
     static class RecordQueueTestTimestampExtractor implements TimestampExtractor {
-        private List<Long> observedPartitionTimes = new ArrayList<>();
+        private final List<Long> observedPartitionTimes = new ArrayList<>();
 
-        public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
+        public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
             observedPartitionTimes.add(partitionTime);
             return record.offset();
         }

From 682ef4c1f3cabbce5030becbc6cbf4c9170e33c4 Mon Sep 17 00:00:00 2001
From: ableegoldman
Date: Wed, 10 Jul 2019 13:59:54 -0700
Subject: [PATCH 07/13] Update javadoc explanations

---
 .../streams/processor/ExtractRecordMetadataTimestamp.java | 6 +++---
 .../kafka/streams/processor/FailOnInvalidTimestamp.java   | 4 ++--
 .../streams/processor/LogAndSkipOnInvalidTimestamp.java   | 4 ++--
 .../kafka/streams/processor/TimestampExtractor.java       | 2 +-
 .../processor/UsePreviousTimeOnInvalidTimestamp.java      | 8 ++++----
 .../streams/processor/WallclockTimestampExtractor.java    | 2 +-
 .../kafka/streams/processor/internals/RecordQueue.java    | 1 -
 .../streams/processor/internals/RecordQueueTest.java      | 4 ----
 8 files changed, 13 insertions(+), 18 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java
index a958b8b0dc0c..3c7428a8c235 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java
@@ -50,7 +50,7 @@ abstract class ExtractRecordMetadataTimestamp implements TimestampExtractor {
      * Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}.
      *
      * @param record a data record
-     * @param partitionTime the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+     * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return the embedded metadata timestamp of the given {@link ConsumerRecord}
      */
     @Override
@@ -69,10 +69,10 @@ public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
      *
      * @param record a data record
      * @param recordTimestamp the timestamp extracted from the record
-     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+     * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return a new timestamp for the record (if negative, record will not be processed but dropped silently)
      */
     public abstract long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
                                             final long recordTimestamp,
-                                            final long previousTimestamp);
+                                            final long partitionTime);
 }

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java
index 87cb0dec0e8c..1536e4a97a6d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java
@@ -54,14 +54,14 @@ public class FailOnInvalidTimestamp extends ExtractRecordMetadataTimestamp {
      *
      * @param record a data record
      * @param recordTimestamp the timestamp extracted from the record
-     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+     * @param partitionTime the largest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return nothing; always raises an exception
      * @throws StreamsException on every invocation
      */
     @Override
     public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
                                    final long recordTimestamp,
-                                   final long previousTimestamp)
+                                   final long partitionTime)
             throws StreamsException {
         final String message = "Input record " + record + " has invalid (negative) timestamp. " +

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java
index 0561e61011d8..b759e5bd4249 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java
@@ -56,13 +56,13 @@ public class LogAndSkipOnInvalidTimestamp extends ExtractRecordMetadataTimestamp
      *
      * @param record a data record
      * @param recordTimestamp the timestamp extracted from the record
-     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+     * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return the originally extracted timestamp of the record
      */
     @Override
     public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
                                    final long recordTimestamp,
-                                   final long previousTimestamp) {
+                                   final long partitionTime) {
         log.warn("Input record {} will be dropped because it has an invalid (negative) timestamp.", record);
         return recordTimestamp;
     }

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
index d2e7373a6af5..1e6d6cd65c17 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
@@ -46,7 +46,7 @@ public interface TimestampExtractor {
      *
      *
      * @param record a data record
-     * @param partitionTime the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+     * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return the timestamp of the record
      */
     long extract(ConsumerRecord<Object, Object> record, long partitionTime);

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java
index dd952ccf1201..6f4fb4d91c4f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java
@@ -51,20 +51,20 @@ public class UsePreviousTimeOnInvalidTimestamp extends ExtractRecordMetadataTimestamp {
      *
      * @param record a data record
      * @param recordTimestamp the timestamp extracted from the record
-     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+     * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return the provided latest extracted valid timestamp as new timestamp for the record
      * @throws StreamsException if latest extracted valid timestamp is unknown
      */
     @Override
     public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
                                    final long recordTimestamp,
-                                   final long previousTimestamp)
+                                   final long partitionTime)
             throws StreamsException {
-        if (previousTimestamp < 0) {
+        if (partitionTime < 0) {
             throw new StreamsException("Could not infer new timestamp for input record " + record
                 + " because latest extracted valid timestamp is unknown.");
         }
-        return previousTimestamp;
+        return partitionTime;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
index 724a0da8648e..baa1cb6dbea2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
@@ -38,7 +38,7 @@ public class WallclockTimestampExtractor implements TimestampExtractor {
      * Return the current wall clock time as timestamp.
      *
      * @param record a data record
-     * @param partitionTime the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+     * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return the current wall clock time, expressed in milliseconds since midnight, January 1, 1970 UTC
      */
     @Override

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index afd5012a099f..033c652b5fef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -192,7 +192,6 @@ private void updateHead() {
 
             headRecord = new StampedRecord(deserialized, timestamp);
 
-            // update the partition timestamp if the current head record's timestamp has exceeded its value
             if (timestamp > partitionTime) {
                 partitionTime = timestamp;
             }

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 6ec9c9a286ea..b40b38d99f00 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -284,9 +284,5 @@ public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
             observedPartitionTimes.add(partitionTime);
             return record.offset();
         }
-
-        public List<Long> observedPartitionTimes() {
-            return observedPartitionTimes;
-        }
     }
 }

From d134740f201779ed68cb8749e90fdb40680787ef Mon Sep 17 00:00:00 2001
From: ableegoldman
Date: Wed, 10 Jul 2019 19:09:13 -0700
Subject: [PATCH 08/13] updated test

---
 .../processor/internals/RecordQueue.java      | 11 ++-
 .../processor/internals/RecordQueueTest.java  | 69 ++++++++++++++++---
 2 files changed, 69 insertions(+), 11 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 033c652b5fef..4fccf05ceffc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -140,7 +140,7 @@ public boolean isEmpty() {
     }
 
     /**
-     * Returns the tracked partition timestamp
+     * Returns the head record's timestamp
      *
      * @return timestamp
      */
@@ -148,6 +148,15 @@ public long timestamp() {
         return headRecord == null ? UNKNOWN : headRecord.timestamp;
     }
 
+    /**
+     * Returns the tracked partition time
+     *
+     * @return partition time
+     */
+    long partitionTime() {
+        return partitionTime;
+    }
+
     /**
      * Clear the fifo queue of its elements, also clear the time tracker's kept stamped elements
      */

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index b40b38d99f00..6a7166e6a0d3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -39,6 +39,7 @@
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockSourceNode;
+import org.apache.kafka.test.MockTimestampExtractor;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -53,7 +54,7 @@ public class RecordQueueTest {
     private final Serializer<Integer> intSerializer = new IntegerSerializer();
     private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
-    private final TimestampExtractor timestampExtractor = new RecordQueueTestTimestampExtractor();
+    private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
     private final String[] topics = {"topic"};
 
     private final Sensor skippedRecordsSensor = new Metrics().sensor("skipped-records");
@@ -185,8 +186,6 @@ public void testTimeTracking() {
     @Test
     public void testTimestampExtractorPartitionTime() {
 
-        final RecordQueueTestTimestampExtractor testTimestampExtractor = (RecordQueueTestTimestampExtractor) timestampExtractor;
-
         assertTrue(queue.isEmpty());
         assertEquals(0, queue.size());
         assertEquals(RecordQueue.UNKNOWN, queue.timestamp());
@@ -199,11 +197,17 @@ public void testTimestampExtractorPartitionTime() {
             new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
             new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
 
+        assertEquals(queue.partitionTime(), RecordQueue.UNKNOWN);
+
         queue.addRawRecords(list1);
-        while (queue.poll() != null) {
-        }
 
-        assertEquals(testTimestampExtractor.observedPartitionTimes, new ArrayList<>(Arrays.asList(RecordQueue.UNKNOWN, 2L, 2L, 3L)));
+        assertEquals(queue.partitionTime(), 2L);
+
+        queue.poll();
+        assertEquals(queue.partitionTime(), 2L);
+
+        queue.poll();
+        assertEquals(queue.partitionTime(), 3L);
     }
 
     @Test(expected = StreamsException.class)
@@ -276,4 +280,48 @@ public void shouldDropOnNegativeTimestamp() {
 
         assertEquals(0, queue.size());
     }
+
+    @Test
+    public void shouldPassPartitionTimeToTimestampExtractor() {
+
+        final PartitionTimeTrackingTimestampExtractor timestampExtractor = new PartitionTimeTrackingTimestampExtractor();
+        final RecordQueue queue = new RecordQueue(
+            new TopicPartition(topics[0], 1),
+            mockSourceNodeWithMetrics,
+            timestampExtractor,
+            new LogAndFailExceptionHandler(),
+            context,
+            new LogContext());
+
+        assertTrue(queue.isEmpty());
+        assertEquals(0, queue.size());
+        assertEquals(RecordQueue.UNKNOWN, queue.timestamp());
+
+        // add four out-of-order records with timestamps 2, 1, 3, 4
+        final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
+            new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+
+        assertEquals(RecordQueue.UNKNOWN, timestampExtractor.partitionTime);
+
+        queue.addRawRecords(list1);
+
+        // no (known) timestamp has yet been passed to the timestamp extractor
+        assertEquals(RecordQueue.UNKNOWN, timestampExtractor.partitionTime);
+
+        queue.poll();
+        assertEquals(2L, timestampExtractor.partitionTime);
+
+        queue.poll();
+        assertEquals(2L, timestampExtractor.partitionTime);
+
+        queue.poll();
+        assertEquals(3L, timestampExtractor.partitionTime);
+
+    }
+
+    class PartitionTimeTrackingTimestampExtractor implements TimestampExtractor {
+        private long partitionTime = RecordQueue.UNKNOWN;
+
+        public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
+            if (partitionTime < this.partitionTime) {
+                throw new IllegalStateException("Partition time should not decrease");
+            }
+            this.partitionTime = partitionTime;
+            return record.offset();
+        }
+    }
 }

From 91d9d2f8b5f095b930fb0fb41f5448a89ae123f5 Mon Sep 17 00:00:00 2001
From: ableegoldman
Date: Wed, 10 Jul 2019 19:18:24 -0700
Subject: [PATCH 09/13] RecordQueue cleanup

---
 .../processor/internals/PartitionGroup.java   |  2 +-
 .../processor/internals/RecordQueue.java      | 10 +++---
 .../processor/internals/RecordQueueTest.java  | 31 +++++++++----------
 3 files changed, 20 insertions(+), 23 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index fbafa73aca2f..e44449bb4aae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -76,7 +76,7 @@ RecordQueue queue() {
     }
 
     PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues, final Sensor recordLatenessSensor) {
-        nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::timestamp));
+        nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp));
         this.partitionQueues = partitionQueues;
         this.recordLatenessSensor = recordLatenessSensor;
         totalBuffered = 0;

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 4fccf05ceffc..fff7cb8b83a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -31,8 +31,8 @@
 
 /**
  * RecordQueue is a FIFO queue of {@link StampedRecord} (ConsumerRecord + timestamp). It also keeps track of the
- * partition timestamp defined as the minimum timestamp of records in its queue; in addition, its partition
- * timestamp is monotonically increasing such that once it is advanced, it will not be decremented.
+ * partition timestamp defined as the largest timestamp seen on the partition so far; this is passed to the
+ * timestamp extractor but not otherwise used.
  */
 public class RecordQueue {
 
@@ -144,7 +144,7 @@ public boolean isEmpty() {
      *
      * @return timestamp
      */
-    public long timestamp() {
+    public long headRecordTimestamp() {
         return headRecord == null ? UNKNOWN : headRecord.timestamp;
     }
 
@@ -201,9 +201,7 @@ private void updateHead() {
 
             headRecord = new StampedRecord(deserialized, timestamp);
 
-            if (timestamp > partitionTime) {
-                partitionTime = timestamp;
-            }
+            partitionTime = Math.max(partitionTime, timestamp);
         }
     }
 }

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 6a7166e6a0d3..0622bfcf7a73 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.ArrayList;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
@@ -102,7 +101,7 @@ public void testTimeTracking() {
 
         assertTrue(queue.isEmpty());
         assertEquals(0, queue.size());
-        assertEquals(RecordQueue.UNKNOWN, queue.timestamp());
+        assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp());
 
         // add three out-of-order records with timestamps 2, 1, 3
         final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
@@ -112,46 +111,46 @@ public void testTimeTracking() {
 
         queue.addRawRecords(list1);
 
         assertEquals(3, queue.size());
-        assertEquals(2L, queue.timestamp());
+        assertEquals(2L, queue.headRecordTimestamp());
 
         // poll the first record, now with 1, 3
         assertEquals(2L, queue.poll().timestamp);
         assertEquals(2, queue.size());
-        assertEquals(1L, queue.timestamp());
+        assertEquals(1L, queue.headRecordTimestamp());
 
         // poll the second record, now with 3
         assertEquals(1L, queue.poll().timestamp);
         assertEquals(1, queue.size());
-        assertEquals(3L, queue.timestamp());
+        assertEquals(3L, queue.headRecordTimestamp());
 
         // add three out-of-order records with timestamps 4, 1, 2
         // now with 3, 4, 1, 2
         final List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
             new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
             new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
             new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
 
         queue.addRawRecords(list2);
 
         assertEquals(4, queue.size());
-        assertEquals(3L, queue.timestamp());
+        assertEquals(3L, queue.headRecordTimestamp());
 
         // poll the third record, now with 4, 1, 2
         assertEquals(3L, queue.poll().timestamp);
         assertEquals(3, queue.size());
-        assertEquals(4L, queue.timestamp());
+        assertEquals(4L, queue.headRecordTimestamp());
 
         // poll the rest records
         assertEquals(4L, queue.poll().timestamp);
-        assertEquals(1L, queue.timestamp());
+        assertEquals(1L, queue.headRecordTimestamp());
 
         assertEquals(1L, queue.poll().timestamp);
-        assertEquals(2L, queue.timestamp());
+        assertEquals(2L, queue.headRecordTimestamp());
 
         assertEquals(2L, queue.poll().timestamp);
         assertTrue(queue.isEmpty());
         assertEquals(0, queue.size());
-        assertEquals(RecordQueue.UNKNOWN, queue.timestamp());
+        assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp());
 
         // add three more records with 4, 5, 6
         final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
             new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
             new ConsumerRecord<>("topic", 1, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
             new ConsumerRecord<>("topic", 1, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
 
         queue.addRawRecords(list3);
 
         assertEquals(3, queue.size());
-        assertEquals(4L, queue.timestamp());
+        assertEquals(4L, queue.headRecordTimestamp());
 
         // poll one record again, the timestamp should advance now
         assertEquals(4L, queue.poll().timestamp);
         assertEquals(2, queue.size());
-        assertEquals(5L, queue.timestamp());
+        assertEquals(5L, queue.headRecordTimestamp());
 
         // clear the queue
         queue.clear();
         assertTrue(queue.isEmpty());
         assertEquals(0, queue.size());
-        assertEquals(RecordQueue.UNKNOWN, queue.timestamp());
+        assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp());
 
         // re-insert the three records with 4, 5, 6
         queue.addRawRecords(list3);
 
         assertEquals(3, queue.size());
-        assertEquals(4L, queue.timestamp());
+        assertEquals(4L, queue.headRecordTimestamp());
     }
 
@@ -186,7 +185,7 @@ public void testTimeTracking() {
 
         assertTrue(queue.isEmpty());
         assertEquals(0, queue.size());
-        assertEquals(RecordQueue.UNKNOWN, queue.timestamp());
+        assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp());
 
         // add four out-of-order records with timestamps 2, 1, 3, 4
         final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
@@ -295,7 +294,7 @@ public void shouldPassPartitionTimeToTimestampExtractor() {
 
         assertTrue(queue.isEmpty());
         assertEquals(0, queue.size());
-        assertEquals(RecordQueue.UNKNOWN, queue.timestamp());
+        assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp());
 
         // add four out-of-order records with timestamps 2, 1, 3, 4
         final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
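
After this rename, RecordQueue exposes two distinct notions of time: headRecordTimestamp() follows the (possibly out-of-order) head record, while partitionTime() only ever moves forward. A toy model of that difference — plain Java, the class name is hypothetical:

    import java.util.ArrayDeque;
    import java.util.Arrays;
    import java.util.Deque;

    public class TwoClocksSketch {
        public static void main(final String[] args) {
            // timestamps arrive out of order, as in the tests above
            final Deque<Long> queue = new ArrayDeque<>(Arrays.asList(2L, 1L, 3L));
            long partitionTime = -1L; // RecordQueue.UNKNOWN
            while (!queue.isEmpty()) {
                final long headRecordTimestamp = queue.poll();
                partitionTime = Math.max(partitionTime, headRecordTimestamp);
                System.out.printf("head=%d partitionTime=%d%n", headRecordTimestamp, partitionTime);
            }
            // prints: head=2 partitionTime=2, head=1 partitionTime=2, head=3 partitionTime=3
        }
    }
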
From bfa788f6a87ab39ecc2a247a75a3951389bc385f Mon Sep 17 00:00:00 2001
From: ableegoldman
Date: Wed, 10 Jul 2019 20:29:39 -0700
Subject: [PATCH 10/13] PartitionGroup cleanup

---
 .../streams/processor/internals/PartitionGroup.java  | 10 +++++-----
 .../kafka/streams/processor/internals/StreamTask.java |  2 +-
 .../processor/internals/PartitionGroupTest.java       |  8 ++++----
 3 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index e44449bb4aae..537d39d33782 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -41,8 +41,8 @@
  *
  * PartitionGroup also maintains a stream-time for the group as a whole.
  * This is defined as the highest timestamp of any record yet polled from the PartitionGroup.
- * The PartitionGroup's stream-time is also the stream-time of its task and is used as the
- * stream-time for any computations that require it.
+ * Note however that any computation that depends on stream time tracks it on a per-operator basis to obtain an
+ * accurate view of the local stream time as seen by that node.
  *
 * The PartitionGroup's stream-time is initially UNKNOWN (-1), and is set to a known value upon first poll.
 * As a consequence of the definition, the PartitionGroup's stream-time is non-decreasing
@@ -157,10 +157,9 @@ public Set<TopicPartition> partitions() {
     }
 
     /**
-     * Return the timestamp of this partition group as the smallest
-     * partition timestamp among all its partitions
+     * Return the stream time of this partition group defined as the largest timestamp seen across all partitions
      */
-    public long timestamp() {
+    public long streamTime() {
         return streamTime;
     }
 
@@ -192,6 +191,7 @@ public void close() {
 
     public void clear() {
         nonEmptyQueuesByTime.clear();
+        streamTime = RecordQueue.UNKNOWN;
         for (final RecordQueue queue : partitionQueues.values()) {
             queue.clear();
         }

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 4fd57ba3db89..c9af28f22030 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -801,7 +801,7 @@ int numBuffered() {
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     public boolean maybePunctuateStreamTime() {
-        final long timestamp = partitionGroup.timestamp();
+        final long timestamp = partitionGroup.streamTime();
 
         // if the timestamp is not known yet, meaning there is not enough data accumulated
         // to reason stream partition time, then skip.

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 6b95bdf9f0a2..cfc814f2e42c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -107,7 +107,7 @@ public void testTimeTracking() {
 
         // st: -1 since no records were being processed yet
         verifyBuffered(6, 3, 3);
-        assertEquals(-1L, group.timestamp());
+        assertEquals(-1L, group.streamTime());
         assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
 
         StampedRecord record;
@@ -143,7 +143,7 @@ record = group.nextRecord(info);
         // 2:[4, 6]
         // st: 2 (just adding records shouldn't change it)
         verifyBuffered(6, 4, 2);
-        assertEquals(2L, group.timestamp());
+        assertEquals(2L, group.streamTime());
         assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
 
         // get one record, time should be advanced
@@ -221,7 +221,7 @@ public void shouldChooseNextRecordBasedOnHeadTimestamp() {
         group.addRawRecords(partition1, list1);
 
         verifyBuffered(3, 3, 0);
-        assertEquals(-1L, group.timestamp());
+        assertEquals(-1L, group.streamTime());
         assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
 
         StampedRecord record;
@@ -258,7 +258,7 @@ record = group.nextRecord(info);
 
     private void verifyTimes(final StampedRecord record, final long recordTime, final long streamTime) {
         assertEquals(recordTime, record.timestamp);
-        assertEquals(streamTime, group.timestamp());
+        assertEquals(streamTime, group.streamTime());
     }
 
     private void verifyBuffered(final int totalBuffered, final int partitionOneBuffered, final int partitionTwoBuffered) {

From 5cd7f22ebe715cabf506605aa9e39a0f5c7d7050 Mon Sep 17 00:00:00 2001
From: ableegoldman
Date: Thu, 11 Jul 2019 11:48:37 -0700
Subject: [PATCH 11/13] github comments

---
 .../kafka/streams/processor/FailOnInvalidTimestamp.java      | 2 +-
 .../streams/processor/UsePreviousTimeOnInvalidTimestamp.java | 2 +-
 .../kafka/streams/processor/internals/PartitionGroup.java    | 4 ++--
 .../kafka/streams/processor/internals/RecordQueue.java       | 2 +-
 .../kafka/streams/processor/internals/RecordQueueTest.java   | 4 ++--
 5 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java
index 1536e4a97a6d..40d3e0ea5955 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java
@@ -54,7 +54,7 @@ public class FailOnInvalidTimestamp extends ExtractRecordMetadataTimestamp {
      *
      * @param record a data record
      * @param recordTimestamp the timestamp extracted from the record
-     * @param partitionTime the largest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+     * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return nothing; always raises an exception
      * @throws StreamsException on every invocation
      */

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java
index 6f4fb4d91c4f..89e2fd3729bd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java
@@ -62,7 +62,7 @@ public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
             throws StreamsException {
         if (partitionTime < 0) {
             throw new StreamsException("Could not infer new timestamp for input record " + record
-                + " because latest extracted valid timestamp is unknown.");
+                + " because partition time is unknown.");
         }
         return partitionTime;
     }

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 537d39d33782..2844af0106a0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -41,8 +41,8 @@
  *
  * PartitionGroup also maintains a stream-time for the group as a whole.
  * This is defined as the highest timestamp of any record yet polled from the PartitionGroup.
- * Note however that any computation that depends on stream time tracks it on a per-operator basis to obtain an
- * accurate view of the local stream time as seen by that node.
+ * Note however that any computation that depends on stream time should track it on a per-operator basis to obtain an
+ * accurate view of the local time as seen by that processor.
  *
 * The PartitionGroup's stream-time is initially UNKNOWN (-1), and is set to a known value upon first poll.
 * As a consequence of the definition, the PartitionGroup's stream-time is non-decreasing

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index fff7cb8b83a3..de1d9a26bb35 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -32,7 +32,7 @@
 /**
  * RecordQueue is a FIFO queue of {@link StampedRecord} (ConsumerRecord + timestamp). It also keeps track of the
  * partition timestamp defined as the largest timestamp seen on the partition so far; this is passed to the
- * timestamp extractor but not otherwise used.
+ * timestamp extractor.
  */
 public class RecordQueue {

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 0622bfcf7a73..6dadb49b8ab8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -183,7 +183,7 @@ public void testTimeTracking() {
     }
 
     @Test
-    public void testTimestampExtractorPartitionTime() {
+    public void shouldTrackPartitionTimeAsMaxSeenTimestamp() {
 
         assertTrue(queue.isEmpty());
         assertEquals(0, queue.size());
@@ -325,7 +325,7 @@ public void shouldPassPartitionTimeToTimestampExtractor() {
     class PartitionTimeTrackingTimestampExtractor implements TimestampExtractor {
         private long partitionTime = RecordQueue.UNKNOWN;
 
-        public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
+        public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
             if (partitionTime < this.partitionTime) {
                 throw new IllegalStateException("Partition time should not decrease");
             }

From 7c66617368e7e3e10571d7e32b00a83b1ef1750a Mon Sep 17 00:00:00 2001
From: ableegoldman
Date: Thu, 11 Jul 2019 12:08:42 -0700
Subject: [PATCH 12/13] stream time -> stream-time

---
 .../streams/processor/internals/PartitionGroup.java | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 2844af0106a0..83b367346471 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -41,7 +41,7 @@
  *
  * PartitionGroup also maintains a stream-time for the group as a whole.
  * This is defined as the highest timestamp of any record yet polled from the PartitionGroup.
- * Note however that any computation that depends on stream time should track it on a per-operator basis to obtain an
+ * Note however that any computation that depends on stream-time should track it on a per-operator basis to obtain an
  * accurate view of the local time as seen by that processor.
  *
 * The PartitionGroup's stream-time is initially UNKNOWN (-1), and is set to a known value upon first poll.
@@ -109,7 +109,7 @@ record = queue.poll();
                 nonEmptyQueuesByTime.offer(queue);
             }
 
-            // always update the stream time to the record's timestamp yet to be processed if it is larger
+            // always update the stream-time to the record's timestamp yet to be processed if it is larger
             if (record.timestamp > streamTime) {
                 streamTime = record.timestamp;
                 recordLatenessSensor.record(0);
@@ -140,8 +140,8 @@ int addRawRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
     public Set<TopicPartition> partitions() {
         return Collections.unmodifiableSet(partitionQueues.keySet());
     }
 
     /**
-     * Return the stream time of this partition group defined as the largest timestamp seen across all partitions
+     * Return the stream-time of this partition group defined as the largest timestamp seen across all partitions
      */
     public long streamTime() {

From 22f6f09267c53eef83b87ffb379358ff1d866de5 Mon Sep 17 00:00:00 2001
From: ableegoldman
Date: Tue, 16 Jul 2019 14:18:54 -0700
Subject: [PATCH 13/13] Change names in maybePunctuateXXTime

---
 .../kafka/streams/processor/internals/StreamTask.java | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index c9af28f22030..1d60fe4780de 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -801,14 +801,14 @@ int numBuffered() {
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     public boolean maybePunctuateStreamTime() {
-        final long timestamp = partitionGroup.streamTime();
+        final long streamTime = partitionGroup.streamTime();
 
         // if the timestamp is not known yet, meaning there is not enough data accumulated
         // to reason stream partition time, then skip.
-        if (timestamp == RecordQueue.UNKNOWN) {
+        if (streamTime == RecordQueue.UNKNOWN) {
             return false;
         } else {
-            final boolean punctuated = streamTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.STREAM_TIME, this);
+            final boolean punctuated = streamTimePunctuationQueue.mayPunctuate(streamTime, PunctuationType.STREAM_TIME, this);
 
             if (punctuated) {
                 commitNeeded = true;
@@ -826,9 +826,9 @@ public boolean maybePunctuateStreamTime() {
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     public boolean maybePunctuateSystemTime() {
-        final long timestamp = time.milliseconds();
+        final long systemTime = time.milliseconds();
 
-        final boolean punctuated = systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.WALL_CLOCK_TIME, this);
+        final boolean punctuated = systemTimePunctuationQueue.mayPunctuate(systemTime, PunctuationType.WALL_CLOCK_TIME, this);
 
         if (punctuated) {
             commitNeeded = true;