From 23e08d18585febdf34917e1d33f4c142ae36e748 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 8 Nov 2016 18:01:56 -0800 Subject: [PATCH 1/8] KAFKA-4393: Improve invalid/negative TS handling --- .../ConsumerRecordTimestampExtractor.java | 31 +++++++-- ...obustConsumerRecordTimestampExtractor.java | 65 +++++++++++++++++++ .../streams/processor/TimestampExtractor.java | 5 +- .../WallclockTimestampExtractor.java | 11 ++-- .../processor/internals/RecordQueue.java | 5 +- .../streams/processor/internals/SinkNode.java | 5 +- .../processor/internals/RecordQueueTest.java | 22 +++++++ 7 files changed, 128 insertions(+), 16 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/RobustConsumerRecordTimestampExtractor.java diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java index 0d3424e6ff23..e08ded666cea 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java @@ -18,23 +18,46 @@ package org.apache.kafka.streams.processor; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.errors.StreamsException; /** * Retrieves built-in timestamps from Kafka messages (introduced in KIP-32: Add timestamps to Kafka message). - * + *

* Here, "built-in" refers to the fact that compatible Kafka producer clients automatically and * transparently embed such timestamps into messages they sent to Kafka, which can then be retrieved * via this timestamp extractor. - * + *

* If CreateTime is used to define the built-in timestamps, using this extractor effectively provide * event-time semantics. If LogAppendTime is used to define the built-in timestamps, using * this extractor effectively provides ingestion-time semantics. - * + *

* If you need processing-time semantics, use {@link WallclockTimestampExtractor}. + *

+ * If a record has a negative timestamp value, this extractor raises an exception. + * + * @see RobustConsumerRecordTimestampExtractor + * @see WallclockTimestampExtractor */ public class ConsumerRecordTimestampExtractor implements TimestampExtractor { + + /** + * Extracts the embedded meta data timestamp from the given {@link ConsumerRecord}. + * + * @param record a data record + * @return the embedded meta-data timestamp of the given {@link ConsumerRecord} + * @throws StreamsException if the embedded meta-data timestamp is negative + */ @Override public long extract(ConsumerRecord record) { - return record.timestamp(); + final long timestamp = record.timestamp(); + + if (timestamp < 0) { + throw new StreamsException("Input record " + record + " has invalid (negative) timestamp. " + + "Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " + + "or because the input topic was created before upgrading the Kafka cluster to 0.10+. " + + "Use a different TimestampExtractor to process this data."); + } + + return timestamp; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/RobustConsumerRecordTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/RobustConsumerRecordTimestampExtractor.java new file mode 100644 index 000000000000..2f6d1876afc3 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/RobustConsumerRecordTimestampExtractor.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.errors.StreamsException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Retrieves built-in timestamps from Kafka messages (introduced in KIP-32: Add timestamps to Kafka message). + *

+ * Here, "built-in" refers to the fact that compatible Kafka producer clients automatically and + * transparently embed such timestamps into messages they sent to Kafka, which can then be retrieved + * via this timestamp extractor. + *

+ * If CreateTime is used to define the built-in timestamps, using this extractor effectively provide + * event-time semantics. If LogAppendTime is used to define the built-in timestamps, using + * this extractor effectively provides ingestion-time semantics. + *

+ * If you need processing-time semantics, use {@link WallclockTimestampExtractor}. + *

+ * If a record has a negative timestamp value, a log message is written and the timestamp is returned. + * This results in dropping the record, i.e., the record will not be processed. + * + * @see ConsumerRecordTimestampExtractor + * @see WallclockTimestampExtractor + */ +public class RobustConsumerRecordTimestampExtractor implements TimestampExtractor { + private static final Logger log = LoggerFactory.getLogger(RobustConsumerRecordTimestampExtractor.class); + + /** + * Extracts the embedded meta data timestamp from the given {@link ConsumerRecord} + * and writes a log warn message if the extracted timestamp is negative, + * because the record will no be processed but dropped. + * + * @param record a data record + * @return the embedded meta-data timestamp of the given {@link ConsumerRecord} + */ + @Override + public long extract(ConsumerRecord record) { + final long timestamp = record.timestamp(); + + if (timestamp < 0) { + log.warn("Dropping input record {} because it has an invalid (negative) timestamp.", record); + } + + return timestamp; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java index c55518b1b918..ffa08fb7bc7c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java @@ -27,10 +27,11 @@ public interface TimestampExtractor { /** - * Extracts a timestamp from a record. + * Extracts a timestamp from a record. The timestamp must be positive. If a negative timestamp is returned, + * the record will not be processed but dropped silently. *

* The extracted timestamp MUST represent the milliseconds since midnight, January 1, 1970 UTC. - * + *

* It is important to note that this timestamp may become the message timestamp for any messages sent to changelogs updated by {@link KTable}s * and joins. The message timestamp is used for log retention and log rolling, so using nonsensical values may result in * excessive log rolling and therefore broker performance degradation. diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java index 305573b1746b..3b26f6582e2f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java @@ -21,19 +21,22 @@ /** * Retrieves current wall clock timestamps as {@link System#currentTimeMillis()}. - * + *

* Using this extractor effectively provides processing-time semantics. - * + *

* If you need event-time semantics, use {@link ConsumerRecordTimestampExtractor} with * built-in CreateTime or LogAppendTime timestamp (see KIP-32: Add timestamps to Kafka message for details). + * + * @see ConsumerRecordTimestampExtractor + * @see RobustConsumerRecordTimestampExtractor */ public class WallclockTimestampExtractor implements TimestampExtractor { /** * Return the current wall clock time as timestamp. * - * @param record a data record - * @return the current wall clock time, expressed in milliseconds since midnight, January 1, 1970 UTC + * @param record a data record + * @return the current wall clock time, expressed in milliseconds since midnight, January 1, 1970 UTC */ @Override public long extract(ConsumerRecord record) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index 44ef1460784c..1babd15daf0f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -107,8 +107,9 @@ public int addRawRecords(Iterable> rawRecords, Ti log.trace("Source node {} extracted timestamp {} for record {} when adding to buffered queue", source.name(), timestamp, record); // validate that timestamp must be non-negative - if (timestamp < 0) - throw new StreamsException("Extracted timestamp value is negative, which is not allowed."); + if (timestamp < 0) { + continue; + } StampedRecord stampedRecord = new StampedRecord(record, timestamp); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index 8ac373ba2917..e71cc7aea2b0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -73,10 +73,7 @@ public void process(final K key, final V value) { final long timestamp = context.timestamp(); if (timestamp < 0) { - throw new StreamsException("A record consumed from an input topic has invalid (negative) timestamp, " + - "possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " + - "or because the input topic was created before upgrading the Kafka cluster to 0.10+. " + - "Use a different TimestampExtractor to process this data."); + throw new StreamsException("Invalid (negative) timestamp of " + timestamp + " for output record <" + key + ":" + value + ">."); } collector.send(new ProducerRecord(topic, null, timestamp, key, value), keySerializer, valSerializer, partitioner); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java index f30e0e693c03..61b6b76d213e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java @@ -29,6 +29,8 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor; +import org.apache.kafka.streams.processor.RobustConsumerRecordTimestampExtractor; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.test.MockSourceNode; import org.apache.kafka.test.MockTimestampExtractor; @@ -136,4 +138,24 @@ public void shouldThrowStreamsExceptionWhenValueDeserializationFails() throws Ex queue.addRawRecords(records, timestampExtractor); } + + @Test(expected = StreamsException.class) + public void shouldThrowOnNegativeTimestamp() { + final byte[] value = Serdes.Long().serializer().serialize("foo", 1L); + final List> records = Collections.singletonList( + new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)); + + queue.addRawRecords(records, new ConsumerRecordTimestampExtractor()); + } + + @Test + public void shouldDropOnNegativeTimestamp() { + final byte[] value = Serdes.Long().serializer().serialize("foo", 1L); + final List> records = Collections.singletonList( + new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)); + + queue.addRawRecords(records, new RobustConsumerRecordTimestampExtractor()); + + assertEquals(0, queue.size()); + } } From 9a732d5510337658ab756d261727197ba6452c8c Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Wed, 9 Nov 2016 11:38:55 -0800 Subject: [PATCH 2/8] addressed github comments - added new parameter "Streams time" to extract() - added third timestamp extractor --- .../ConsumerRecordTimestampExtractor.java | 29 +++++--- ...rringConsumerRecordTimestampExtractor.java | 74 +++++++++++++++++++ ...obustConsumerRecordTimestampExtractor.java | 32 +++++--- .../streams/processor/TimestampExtractor.java | 5 +- .../processor/internals/RecordQueue.java | 2 +- 5 files changed, 116 insertions(+), 26 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/InferringConsumerRecordTimestampExtractor.java diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java index e08ded666cea..57a45f3bdb79 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java @@ -21,34 +21,40 @@ import org.apache.kafka.streams.errors.StreamsException; /** - * Retrieves built-in timestamps from Kafka messages (introduced in KIP-32: Add timestamps to Kafka message). + * Retrieves embedded metadata timestamps from Kafka messages. + * Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new + * 0.10+ Kafka message format. *

- * Here, "built-in" refers to the fact that compatible Kafka producer clients automatically and - * transparently embed such timestamps into messages they sent to Kafka, which can then be retrieved + * Here, "embedded metadata" refers to the fact that compatible Kafka producer clients automatically and + * transparently embed such timestamps into message metadata they send to Kafka, which can then be retrieved * via this timestamp extractor. *

- * If CreateTime is used to define the built-in timestamps, using this extractor effectively provide - * event-time semantics. If LogAppendTime is used to define the built-in timestamps, using - * this extractor effectively provides ingestion-time semantics. + * If the embedded metadata timestamp represents CreateTime (cf. Kafka broker setting + * {@code message.timestamp.type} and Kafka topic setting {@code log.message.timestamp.type}), + * this extractor effectively provides event-time semantics. + * If LogAppendTime is used as broker/topic setting to define the embedded metadata timestamps, + * using this extractor effectively provides ingestion-time semantics. *

* If you need processing-time semantics, use {@link WallclockTimestampExtractor}. *

- * If a record has a negative timestamp value, this extractor raises an exception. + * If a record has a negative (invalid) timestamp value, this extractor raises an exception. * * @see RobustConsumerRecordTimestampExtractor + * @see InferringConsumerRecordTimestampExtractor * @see WallclockTimestampExtractor */ public class ConsumerRecordTimestampExtractor implements TimestampExtractor { /** - * Extracts the embedded meta data timestamp from the given {@link ConsumerRecord}. + * Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}. * * @param record a data record - * @return the embedded meta-data timestamp of the given {@link ConsumerRecord} - * @throws StreamsException if the embedded meta-data timestamp is negative + * @param currentStreamsTime the current value of the internally tracked Streams time (could be -1 if unknown) + * @return the embedded metadata timestamp of the given {@link ConsumerRecord} + * @throws StreamsException if the embedded metadata timestamp is negative */ @Override - public long extract(ConsumerRecord record) { + public long extract(final ConsumerRecord record, final long currentStreamsTime) { final long timestamp = record.timestamp(); if (timestamp < 0) { @@ -60,4 +66,5 @@ public long extract(ConsumerRecord record) { return timestamp; } + } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/InferringConsumerRecordTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/InferringConsumerRecordTimestampExtractor.java new file mode 100644 index 000000000000..cfaaef23ef5a --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/InferringConsumerRecordTimestampExtractor.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.errors.StreamsException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Retrieves embedded metadata timestamps from Kafka messages. + * Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new + * 0.10+ Kafka message format. + *

+ * Here, "embedded metadata" refers to the fact that compatible Kafka producer clients automatically and + * transparently embed such timestamps into message metadata they send to Kafka, which can then be retrieved + * via this timestamp extractor. + *

+ * If the embedded metadata timestamp represents CreateTime (cf. Kafka broker setting + * {@code message.timestamp.type} and Kafka topic setting {@code log.message.timestamp.type}), + * this extractor effectively provides event-time semantics. + * If LogAppendTime is used as broker/topic setting to define the embedded metadata timestamps, + * using this extractor effectively provides ingestion-time semantics. + *

+ * If you need processing-time semantics, use {@link WallclockTimestampExtractor}. + *

+ * If a record has a negative (invalid) timestamp value a new timestamp will be inferred as the current + * Streams time. + * + * @see ConsumerRecordTimestampExtractor + * @see RobustConsumerRecordTimestampExtractor + * @see WallclockTimestampExtractor + */ +public class InferringConsumerRecordTimestampExtractor implements TimestampExtractor { + + /** + * Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}. + * If no valid timestamp can be extracted, a new timestamp will be inferred as the current Streams time. + * + * @param record a data record + * @param currentStreamsTime the current value of the internally tracked Streams time (could be -1 if unknown) + * @return the embedded metadata timestamp of the given {@link ConsumerRecord} or current Streams time + */ + @Override + public long extract(final ConsumerRecord record, final long currentStreamsTime) { + final long timestamp = record.timestamp(); + + if (timestamp < 0) { + if (currentStreamsTime == -1) { + throw new StreamsException("Could not infer new timestamp for input record " + record + + " because current internal Streams time in unknown"); + } + return currentStreamsTime; + } + + return timestamp; + } + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/RobustConsumerRecordTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/RobustConsumerRecordTimestampExtractor.java index 2f6d1876afc3..670addf537be 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/RobustConsumerRecordTimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/RobustConsumerRecordTimestampExtractor.java @@ -23,43 +23,51 @@ import org.slf4j.LoggerFactory; /** - * Retrieves built-in timestamps from Kafka messages (introduced in KIP-32: Add timestamps to Kafka message). + * Retrieves embedded metadata timestamps from Kafka messages. + * Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new + * 0.10+ Kafka message format. *

- * Here, "built-in" refers to the fact that compatible Kafka producer clients automatically and - * transparently embed such timestamps into messages they sent to Kafka, which can then be retrieved + * Here, "embedded metadata" refers to the fact that compatible Kafka producer clients automatically and + * transparently embed such timestamps into message metadata they send to Kafka, which can then be retrieved * via this timestamp extractor. *

- * If CreateTime is used to define the built-in timestamps, using this extractor effectively provide - * event-time semantics. If LogAppendTime is used to define the built-in timestamps, using - * this extractor effectively provides ingestion-time semantics. + * If the embedded metadata timestamp represents CreateTime (cf. Kafka broker setting + * {@code message.timestamp.type} and Kafka topic setting {@code log.message.timestamp.type}), + * this extractor effectively provides event-time semantics. + * If LogAppendTime is used as broker/topic setting to define the embedded metadata timestamps, + * using this extractor effectively provides ingestion-time semantics. *

* If you need processing-time semantics, use {@link WallclockTimestampExtractor}. *

- * If a record has a negative timestamp value, a log message is written and the timestamp is returned. - * This results in dropping the record, i.e., the record will not be processed. + * If a record has a negative (invalid) timestamp value the timestamp is returned as-is; + * in addition, a WARN message is logged in your application. + * Returning the timestamp as-is results in dropping the record, i.e., the record will not be processed. * * @see ConsumerRecordTimestampExtractor + * @see InferringConsumerRecordTimestampExtractor * @see WallclockTimestampExtractor */ public class RobustConsumerRecordTimestampExtractor implements TimestampExtractor { private static final Logger log = LoggerFactory.getLogger(RobustConsumerRecordTimestampExtractor.class); /** - * Extracts the embedded meta data timestamp from the given {@link ConsumerRecord} + * Extracts the embedded metadata timestamp from the given {@link ConsumerRecord} * and writes a log warn message if the extracted timestamp is negative, * because the record will no be processed but dropped. * * @param record a data record - * @return the embedded meta-data timestamp of the given {@link ConsumerRecord} + * @param currentStreamsTime the current value of the internally tracked Streams time (could be -1 if unknown) + * @return the embedded metadata timestamp of the given {@link ConsumerRecord} */ @Override - public long extract(ConsumerRecord record) { + public long extract(final ConsumerRecord record, final long currentStreamsTime) { final long timestamp = record.timestamp(); if (timestamp < 0) { - log.warn("Dropping input record {} because it has an invalid (negative) timestamp.", record); + log.warn("Input record {} will be dropped because it has an invalid (negative) timestamp.", record); } return timestamp; } + } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java index ffa08fb7bc7c..4f333f93c9fd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java @@ -37,8 +37,9 @@ public interface TimestampExtractor { * excessive log rolling and therefore broker performance degradation. * * - * @param record a data record + * @param record a data record + * @param currentStreamsTime the current value of the internally tracked Streams time (could be -1 if unknown) * @return the timestamp of the record */ - long extract(ConsumerRecord record); + long extract(ConsumerRecord record, long currentStreamsTime); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index 1babd15daf0f..e05011b998d4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -106,7 +106,7 @@ public int addRawRecords(Iterable> rawRecords, Ti log.trace("Source node {} extracted timestamp {} for record {} when adding to buffered queue", source.name(), timestamp, record); - // validate that timestamp must be non-negative + // drop message if TS is invalid, ie, negative if (timestamp < 0) { continue; } From 0090d56b7c3fa861ab32ff4dba2448b8297d5dbe Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Thu, 10 Nov 2016 15:59:41 -0800 Subject: [PATCH 3/8] addressed Github comments --- .../pageview/JsonTimestampExtractor.java | 2 +- .../apache/kafka/streams/StreamsConfig.java | 4 +- ...ractConsumerRecordTimestampExtractor.java} | 27 +++++--- ...ilingConsumerRecordTimestampExtractor.java | 68 +++++++++++++++++++ ...rringConsumerRecordTimestampExtractor.java | 40 +++++------ ...alidConsumerRecordTimestampExtractor.java} | 35 ++++------ .../streams/processor/TimestampExtractor.java | 2 +- .../WallclockTimestampExtractor.java | 9 +-- .../processor/internals/RecordQueue.java | 4 +- .../internals/ProcessorTopologyTest.java | 10 +-- .../processor/internals/RecordQueueTest.java | 8 +-- .../smoketest/TestTimestampExtractor.java | 2 +- .../kafka/test/MockTimestampExtractor.java | 2 +- 13 files changed, 140 insertions(+), 73 deletions(-) rename streams/src/main/java/org/apache/kafka/streams/processor/{ConsumerRecordTimestampExtractor.java => AbstractConsumerRecordTimestampExtractor.java} (72%) create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/FailingConsumerRecordTimestampExtractor.java rename streams/src/main/java/org/apache/kafka/streams/processor/{RobustConsumerRecordTimestampExtractor.java => SkipInvalidConsumerRecordTimestampExtractor.java} (72%) diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java index 63e8377d6520..b31b0fd004e1 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java @@ -28,7 +28,7 @@ public class JsonTimestampExtractor implements TimestampExtractor { @Override - public long extract(ConsumerRecord record) { + public long extract(final ConsumerRecord record, final long currentStreamsTime) { if (record.value() instanceof PageViewTypedDemo.PageView) { return ((PageViewTypedDemo.PageView) record.value()).timestamp; } diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 5ba438376f62..82e61baa33e9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -28,7 +28,7 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor; +import org.apache.kafka.streams.processor.FailingConsumerRecordTimestampExtractor; import org.apache.kafka.streams.processor.DefaultPartitionGrouper; import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor; import org.apache.kafka.streams.processor.internals.StreamThread; @@ -172,7 +172,7 @@ public class StreamsConfig extends AbstractConfig { REPLICATION_FACTOR_DOC) .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG, Type.CLASS, - ConsumerRecordTimestampExtractor.class.getName(), + FailingConsumerRecordTimestampExtractor.class.getName(), Importance.MEDIUM, TIMESTAMP_EXTRACTOR_CLASS_DOC) .define(PARTITION_GROUPER_CLASS_CONFIG, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractConsumerRecordTimestampExtractor.java similarity index 72% rename from streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java rename to streams/src/main/java/org/apache/kafka/streams/processor/AbstractConsumerRecordTimestampExtractor.java index 57a45f3bdb79..2ff363b8e20f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractConsumerRecordTimestampExtractor.java @@ -18,10 +18,11 @@ package org.apache.kafka.streams.processor; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.streams.errors.StreamsException; /** * Retrieves embedded metadata timestamps from Kafka messages. + * If a record has a negative (invalid) timestamp value, an error handler method is called. + *

* Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new * 0.10+ Kafka message format. *

@@ -36,14 +37,13 @@ * using this extractor effectively provides ingestion-time semantics. *

* If you need processing-time semantics, use {@link WallclockTimestampExtractor}. - *

- * If a record has a negative (invalid) timestamp value, this extractor raises an exception. * - * @see RobustConsumerRecordTimestampExtractor + * @see FailingConsumerRecordTimestampExtractor + * @see SkipInvalidConsumerRecordTimestampExtractor * @see InferringConsumerRecordTimestampExtractor * @see WallclockTimestampExtractor */ -public class ConsumerRecordTimestampExtractor implements TimestampExtractor { +public abstract class AbstractConsumerRecordTimestampExtractor implements TimestampExtractor { /** * Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}. @@ -51,20 +51,27 @@ public class ConsumerRecordTimestampExtractor implements TimestampExtractor { * @param record a data record * @param currentStreamsTime the current value of the internally tracked Streams time (could be -1 if unknown) * @return the embedded metadata timestamp of the given {@link ConsumerRecord} - * @throws StreamsException if the embedded metadata timestamp is negative */ @Override public long extract(final ConsumerRecord record, final long currentStreamsTime) { final long timestamp = record.timestamp(); if (timestamp < 0) { - throw new StreamsException("Input record " + record + " has invalid (negative) timestamp. " + - "Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " + - "or because the input topic was created before upgrading the Kafka cluster to 0.10+. " + - "Use a different TimestampExtractor to process this data."); + return onInvalidTimestamp(record, timestamp, currentStreamsTime); } return timestamp; } + /** + * Called if no valid timestamp is embedded in the record meta data. + * + * @param record a data record + * @param recordTimestamp the timestamp extractor from the record + * @param currentStreamsTime the current value of the internally tracked Streams time (could be -1 if unknown) + * @return a new timestamp for the record + */ + public abstract long onInvalidTimestamp(final ConsumerRecord record, + final long recordTimestamp, + final long currentStreamsTime); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/FailingConsumerRecordTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/FailingConsumerRecordTimestampExtractor.java new file mode 100644 index 000000000000..22e0abf73a4a --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/FailingConsumerRecordTimestampExtractor.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.errors.StreamsException; + +/** + * Retrieves embedded metadata timestamps from Kafka messages. + * If a record has a negative (invalid) timestamp value, this extractor raises an exception. + *

+ * Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new + * 0.10+ Kafka message format. + *

+ * Here, "embedded metadata" refers to the fact that compatible Kafka producer clients automatically and + * transparently embed such timestamps into message metadata they send to Kafka, which can then be retrieved + * via this timestamp extractor. + *

+ * If the embedded metadata timestamp represents CreateTime (cf. Kafka broker setting + * {@code message.timestamp.type} and Kafka topic setting {@code log.message.timestamp.type}), + * this extractor effectively provides event-time semantics. + * If LogAppendTime is used as broker/topic setting to define the embedded metadata timestamps, + * using this extractor effectively provides ingestion-time semantics. + *

+ * If you need processing-time semantics, use {@link WallclockTimestampExtractor}. + * + * @see SkipInvalidConsumerRecordTimestampExtractor + * @see InferringConsumerRecordTimestampExtractor + * @see WallclockTimestampExtractor + */ +public class FailingConsumerRecordTimestampExtractor extends AbstractConsumerRecordTimestampExtractor { + + /** + * Raised an exception. + * + * @param record a data record + * @param recordTimestamp the timestamp extractor from the record + * @param currentStreamsTime the current value of the internally tracked Streams time (could be -1 if unknown) + * @return nothing; always raises an exception + * @throws StreamsException on every invocation + */ + @Override + public long onInvalidTimestamp(final ConsumerRecord record, + final long recordTimestamp, + final long currentStreamsTime) + throws StreamsException { + throw new StreamsException("Input record " + record + " has invalid (negative) timestamp. " + + "Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " + + "or because the input topic was created before upgrading the Kafka cluster to 0.10+. " + + "Use a different TimestampExtractor to process this data."); + } + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/InferringConsumerRecordTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/InferringConsumerRecordTimestampExtractor.java index cfaaef23ef5a..32006cdd59d8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/InferringConsumerRecordTimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/InferringConsumerRecordTimestampExtractor.java @@ -19,11 +19,11 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.streams.errors.StreamsException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Retrieves embedded metadata timestamps from Kafka messages. + * If a record has a negative (invalid) timestamp value a new timestamp will be inferred as the current stream-time. + *

* Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new * 0.10+ Kafka message format. *

@@ -38,37 +38,33 @@ * using this extractor effectively provides ingestion-time semantics. *

* If you need processing-time semantics, use {@link WallclockTimestampExtractor}. - *

- * If a record has a negative (invalid) timestamp value a new timestamp will be inferred as the current - * Streams time. * - * @see ConsumerRecordTimestampExtractor - * @see RobustConsumerRecordTimestampExtractor + * @see FailingConsumerRecordTimestampExtractor + * @see SkipInvalidConsumerRecordTimestampExtractor * @see WallclockTimestampExtractor */ -public class InferringConsumerRecordTimestampExtractor implements TimestampExtractor { +public class InferringConsumerRecordTimestampExtractor extends AbstractConsumerRecordTimestampExtractor { /** - * Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}. - * If no valid timestamp can be extracted, a new timestamp will be inferred as the current Streams time. + * Returns the current stream-time as new timestamp for the record. * * @param record a data record + * @param recordTimestamp the timestamp extractor from the record * @param currentStreamsTime the current value of the internally tracked Streams time (could be -1 if unknown) - * @return the embedded metadata timestamp of the given {@link ConsumerRecord} or current Streams time + * @return the current stream-time as new timestamp for the record + * @throws StreamsException if current streams-time is unknown */ @Override - public long extract(final ConsumerRecord record, final long currentStreamsTime) { - final long timestamp = record.timestamp(); - - if (timestamp < 0) { - if (currentStreamsTime == -1) { - throw new StreamsException("Could not infer new timestamp for input record " + record - + " because current internal Streams time in unknown"); - } - return currentStreamsTime; + public long onInvalidTimestamp(final ConsumerRecord record, + final long recordTimestamp, + final long currentStreamsTime) + throws StreamsException { + if (currentStreamsTime == -1) { + throw new StreamsException("Could not infer new timestamp for input record " + record + + " because current internal Streams time in unknown"); } - - return timestamp; + return currentStreamsTime; } + } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/RobustConsumerRecordTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/SkipInvalidConsumerRecordTimestampExtractor.java similarity index 72% rename from streams/src/main/java/org/apache/kafka/streams/processor/RobustConsumerRecordTimestampExtractor.java rename to streams/src/main/java/org/apache/kafka/streams/processor/SkipInvalidConsumerRecordTimestampExtractor.java index 670addf537be..6a0e6f14f232 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/RobustConsumerRecordTimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/SkipInvalidConsumerRecordTimestampExtractor.java @@ -18,12 +18,15 @@ package org.apache.kafka.streams.processor; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.streams.errors.StreamsException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Retrieves embedded metadata timestamps from Kafka messages. + * If a record has a negative (invalid) timestamp value the timestamp is returned as-is; + * in addition, a WARN message is logged in your application. + * Returning the timestamp as-is results in dropping the record, i.e., the record will not be processed. + *

* Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new * 0.10+ Kafka message format. *

@@ -38,36 +41,28 @@ * using this extractor effectively provides ingestion-time semantics. *

* If you need processing-time semantics, use {@link WallclockTimestampExtractor}. - *

- * If a record has a negative (invalid) timestamp value the timestamp is returned as-is; - * in addition, a WARN message is logged in your application. - * Returning the timestamp as-is results in dropping the record, i.e., the record will not be processed. * - * @see ConsumerRecordTimestampExtractor + * @see FailingConsumerRecordTimestampExtractor * @see InferringConsumerRecordTimestampExtractor * @see WallclockTimestampExtractor */ -public class RobustConsumerRecordTimestampExtractor implements TimestampExtractor { - private static final Logger log = LoggerFactory.getLogger(RobustConsumerRecordTimestampExtractor.class); +public class SkipInvalidConsumerRecordTimestampExtractor extends AbstractConsumerRecordTimestampExtractor { + private static final Logger log = LoggerFactory.getLogger(SkipInvalidConsumerRecordTimestampExtractor.class); /** - * Extracts the embedded metadata timestamp from the given {@link ConsumerRecord} - * and writes a log warn message if the extracted timestamp is negative, - * because the record will no be processed but dropped. + * Writes a log warn message as the extracted timestamp is negative and the record will no be processed but dropped. * * @param record a data record + * @param recordTimestamp the timestamp extractor from the record * @param currentStreamsTime the current value of the internally tracked Streams time (could be -1 if unknown) - * @return the embedded metadata timestamp of the given {@link ConsumerRecord} + * @return the originally extracted timestamp of the record */ @Override - public long extract(final ConsumerRecord record, final long currentStreamsTime) { - final long timestamp = record.timestamp(); - - if (timestamp < 0) { - log.warn("Input record {} will be dropped because it has an invalid (negative) timestamp.", record); - } - - return timestamp; + public long onInvalidTimestamp(final ConsumerRecord record, + final long recordTimestamp, + final long currentStreamsTime) { + log.warn("Input record {} will be dropped because it has an invalid (negative) timestamp.", record); + return recordTimestamp; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java index 4f333f93c9fd..2c3fcae3fcff 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java @@ -38,7 +38,7 @@ public interface TimestampExtractor { * * * @param record a data record - * @param currentStreamsTime the current value of the internally tracked Streams time (could be -1 if unknown) + * @param currentStreamsTime the current value of the internally tracked stream-time (could be -1 if unknown) * @return the timestamp of the record */ long extract(ConsumerRecord record, long currentStreamsTime); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java index 3b26f6582e2f..7c66c81eb2e8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java @@ -24,11 +24,11 @@ *

* Using this extractor effectively provides processing-time semantics. *

- * If you need event-time semantics, use {@link ConsumerRecordTimestampExtractor} with + * If you need event-time semantics, use {@link FailingConsumerRecordTimestampExtractor} with * built-in CreateTime or LogAppendTime timestamp (see KIP-32: Add timestamps to Kafka message for details). * - * @see ConsumerRecordTimestampExtractor - * @see RobustConsumerRecordTimestampExtractor + * @see FailingConsumerRecordTimestampExtractor + * @see SkipInvalidConsumerRecordTimestampExtractor */ public class WallclockTimestampExtractor implements TimestampExtractor { @@ -36,10 +36,11 @@ public class WallclockTimestampExtractor implements TimestampExtractor { * Return the current wall clock time as timestamp. * * @param record a data record + * @param currentStreamsTime the current value of the internally tracked Streams time (could be -1 if unknown) * @return the current wall clock time, expressed in milliseconds since midnight, January 1, 1970 UTC */ @Override - public long extract(ConsumerRecord record) { + public long extract(final ConsumerRecord record, final long currentStreamsTime) { return System.currentTimeMillis(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index e05011b998d4..a40b9ff91283 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -102,11 +102,11 @@ public int addRawRecords(Iterable> rawRecords, Ti rawRecord.checksum(), rawRecord.serializedKeySize(), rawRecord.serializedValueSize(), key, value); - long timestamp = timestampExtractor.extract(record); + long timestamp = timestampExtractor.extract(record, timeTracker.get()); log.trace("Source node {} extracted timestamp {} for record {} when adding to buffered queue", source.name(), timestamp, record); - // drop message if TS is invalid, ie, negative + // drop message if TS is invalid, i.e., negative if (timestamp < 0) { continue; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index 54ee43cd5436..32456c1b9e3c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -17,10 +17,6 @@ package org.apache.kafka.streams.processor.internals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; - import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Deserializer; @@ -49,6 +45,10 @@ import java.io.File; import java.util.Properties; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + public class ProcessorTopologyTest { private static final Serializer STRING_SERIALIZER = new StringSerializer(); @@ -403,7 +403,7 @@ public Processor get() { public static class CustomTimestampExtractor implements TimestampExtractor { @Override - public long extract(ConsumerRecord record) { + public long extract(final ConsumerRecord record, final long currentStreamsTime) { return timestamp; } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java index 61b6b76d213e..efd319a2002e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java @@ -29,8 +29,8 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor; -import org.apache.kafka.streams.processor.RobustConsumerRecordTimestampExtractor; +import org.apache.kafka.streams.processor.FailingConsumerRecordTimestampExtractor; +import org.apache.kafka.streams.processor.SkipInvalidConsumerRecordTimestampExtractor; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.test.MockSourceNode; import org.apache.kafka.test.MockTimestampExtractor; @@ -145,7 +145,7 @@ public void shouldThrowOnNegativeTimestamp() { final List> records = Collections.singletonList( new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)); - queue.addRawRecords(records, new ConsumerRecordTimestampExtractor()); + queue.addRawRecords(records, new FailingConsumerRecordTimestampExtractor()); } @Test @@ -154,7 +154,7 @@ public void shouldDropOnNegativeTimestamp() { final List> records = Collections.singletonList( new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)); - queue.addRawRecords(records, new RobustConsumerRecordTimestampExtractor()); + queue.addRawRecords(records, new SkipInvalidConsumerRecordTimestampExtractor()); assertEquals(0, queue.size()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java index 04e264c22876..00570865688d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java +++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java @@ -25,7 +25,7 @@ public class TestTimestampExtractor implements TimestampExtractor { private final long base = SmokeTestUtil.START_TIME; @Override - public long extract(ConsumerRecord record) { + public long extract(final ConsumerRecord record, final long currentStreamsTime) { switch (record.topic()) { case "data": return base + (Integer) record.value(); diff --git a/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java b/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java index 274e7b5bd7d4..3f8b4fe07329 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java +++ b/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java @@ -24,7 +24,7 @@ public class MockTimestampExtractor implements TimestampExtractor { @Override - public long extract(ConsumerRecord record) { + public long extract(final ConsumerRecord record, final long currentStreamsTime) { return record.offset(); } } From 9d7813b05ea945b8a9592fa7dc621b1dad98b0dc Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Mon, 14 Nov 2016 14:34:42 -0800 Subject: [PATCH 4/8] updated parameter and class names added test for new TimestampExtractors added new metrics --- checkstyle/import-control.xml | 2 +- .../pageview/JsonTimestampExtractor.java | 2 +- .../apache/kafka/streams/StreamsConfig.java | 4 +- ...va => ExtractRecordMetadataTimestamp.java} | 18 +-- ...actor.java => FailOnInvalidTimestamp.java} | 12 +- ...java => LogAndSkipOnInvalidTimestamp.java} | 15 +- .../streams/processor/TimestampExtractor.java | 8 +- ...=> UsePreviousTimeOnInvalidTimestamp.java} | 16 +-- .../WallclockTimestampExtractor.java | 10 +- .../processor/internals/StreamTask.java | 15 +- .../processor/internals/StreamThread.java | 13 +- .../processor/TimestampExtractorTest.java | 128 ++++++++++++++++++ .../internals/ProcessorTopologyTest.java | 2 +- .../processor/internals/RecordQueueTest.java | 8 +- .../smoketest/TestTimestampExtractor.java | 2 +- .../kafka/test/MockTimestampExtractor.java | 2 +- 16 files changed, 201 insertions(+), 56 deletions(-) rename streams/src/main/java/org/apache/kafka/streams/processor/{AbstractConsumerRecordTimestampExtractor.java => ExtractRecordMetadataTimestamp.java} (81%) rename streams/src/main/java/org/apache/kafka/streams/processor/{FailingConsumerRecordTimestampExtractor.java => FailOnInvalidTimestamp.java} (87%) rename streams/src/main/java/org/apache/kafka/streams/processor/{SkipInvalidConsumerRecordTimestampExtractor.java => LogAndSkipOnInvalidTimestamp.java} (80%) rename streams/src/main/java/org/apache/kafka/streams/processor/{InferringConsumerRecordTimestampExtractor.java => UsePreviousTimeOnInvalidTimestamp.java} (82%) create mode 100644 streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 7716f4349708..4103a2385c56 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -27,6 +27,7 @@ + @@ -149,7 +150,6 @@ - diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java index b31b0fd004e1..918cd652805f 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java @@ -28,7 +28,7 @@ public class JsonTimestampExtractor implements TimestampExtractor { @Override - public long extract(final ConsumerRecord record, final long currentStreamsTime) { + public long extract(final ConsumerRecord record, final long previousTimestamp) { if (record.value() instanceof PageViewTypedDemo.PageView) { return ((PageViewTypedDemo.PageView) record.value()).timestamp; } diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 82e61baa33e9..53f49ec075d2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -28,7 +28,7 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.processor.FailingConsumerRecordTimestampExtractor; +import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.DefaultPartitionGrouper; import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor; import org.apache.kafka.streams.processor.internals.StreamThread; @@ -172,7 +172,7 @@ public class StreamsConfig extends AbstractConfig { REPLICATION_FACTOR_DOC) .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG, Type.CLASS, - FailingConsumerRecordTimestampExtractor.class.getName(), + FailOnInvalidTimestamp.class.getName(), Importance.MEDIUM, TIMESTAMP_EXTRACTOR_CLASS_DOC) .define(PARTITION_GROUPER_CLASS_CONFIG, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractConsumerRecordTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java similarity index 81% rename from streams/src/main/java/org/apache/kafka/streams/processor/AbstractConsumerRecordTimestampExtractor.java rename to streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java index 2ff363b8e20f..a01ef2f7d1e5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractConsumerRecordTimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java @@ -38,26 +38,26 @@ *

* If you need processing-time semantics, use {@link WallclockTimestampExtractor}. * - * @see FailingConsumerRecordTimestampExtractor - * @see SkipInvalidConsumerRecordTimestampExtractor - * @see InferringConsumerRecordTimestampExtractor + * @see FailOnInvalidTimestamp + * @see LogAndSkipOnInvalidTimestamp + * @see UsePreviousTimeOnInvalidTimestamp * @see WallclockTimestampExtractor */ -public abstract class AbstractConsumerRecordTimestampExtractor implements TimestampExtractor { +abstract class ExtractRecordMetadataTimestamp implements TimestampExtractor { /** * Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}. * * @param record a data record - * @param currentStreamsTime the current value of the internally tracked Streams time (could be -1 if unknown) + * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) * @return the embedded metadata timestamp of the given {@link ConsumerRecord} */ @Override - public long extract(final ConsumerRecord record, final long currentStreamsTime) { + public long extract(final ConsumerRecord record, final long previousTimestamp) { final long timestamp = record.timestamp(); if (timestamp < 0) { - return onInvalidTimestamp(record, timestamp, currentStreamsTime); + return onInvalidTimestamp(record, timestamp, previousTimestamp); } return timestamp; @@ -68,10 +68,10 @@ public long extract(final ConsumerRecord record, final long curr * * @param record a data record * @param recordTimestamp the timestamp extractor from the record - * @param currentStreamsTime the current value of the internally tracked Streams time (could be -1 if unknown) + * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) * @return a new timestamp for the record */ public abstract long onInvalidTimestamp(final ConsumerRecord record, final long recordTimestamp, - final long currentStreamsTime); + final long previousTimestamp); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/FailingConsumerRecordTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java similarity index 87% rename from streams/src/main/java/org/apache/kafka/streams/processor/FailingConsumerRecordTimestampExtractor.java rename to streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java index 22e0abf73a4a..d7f64a297d93 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/FailingConsumerRecordTimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java @@ -39,25 +39,25 @@ *

* If you need processing-time semantics, use {@link WallclockTimestampExtractor}. * - * @see SkipInvalidConsumerRecordTimestampExtractor - * @see InferringConsumerRecordTimestampExtractor + * @see LogAndSkipOnInvalidTimestamp + * @see UsePreviousTimeOnInvalidTimestamp * @see WallclockTimestampExtractor */ -public class FailingConsumerRecordTimestampExtractor extends AbstractConsumerRecordTimestampExtractor { +public class FailOnInvalidTimestamp extends ExtractRecordMetadataTimestamp { /** - * Raised an exception. + * Raises an exception on every call. * * @param record a data record * @param recordTimestamp the timestamp extractor from the record - * @param currentStreamsTime the current value of the internally tracked Streams time (could be -1 if unknown) + * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) * @return nothing; always raises an exception * @throws StreamsException on every invocation */ @Override public long onInvalidTimestamp(final ConsumerRecord record, final long recordTimestamp, - final long currentStreamsTime) + final long previousTimestamp) throws StreamsException { throw new StreamsException("Input record " + record + " has invalid (negative) timestamp. " + "Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " + diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/SkipInvalidConsumerRecordTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java similarity index 80% rename from streams/src/main/java/org/apache/kafka/streams/processor/SkipInvalidConsumerRecordTimestampExtractor.java rename to streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java index 6a0e6f14f232..f24fd158c9be 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/SkipInvalidConsumerRecordTimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java @@ -42,25 +42,26 @@ *

* If you need processing-time semantics, use {@link WallclockTimestampExtractor}. * - * @see FailingConsumerRecordTimestampExtractor - * @see InferringConsumerRecordTimestampExtractor + * @see FailOnInvalidTimestamp + * @see UsePreviousTimeOnInvalidTimestamp * @see WallclockTimestampExtractor */ -public class SkipInvalidConsumerRecordTimestampExtractor extends AbstractConsumerRecordTimestampExtractor { - private static final Logger log = LoggerFactory.getLogger(SkipInvalidConsumerRecordTimestampExtractor.class); +public class LogAndSkipOnInvalidTimestamp extends ExtractRecordMetadataTimestamp { + private static final Logger log = LoggerFactory.getLogger(LogAndSkipOnInvalidTimestamp.class); /** - * Writes a log warn message as the extracted timestamp is negative and the record will no be processed but dropped. + * Writes a log WARN message when the extracted timestamp is invalid (negative) but returns the invalid timestamp as-is, + * which ultimately causes the record to be skipped and not to be processed. * * @param record a data record * @param recordTimestamp the timestamp extractor from the record - * @param currentStreamsTime the current value of the internally tracked Streams time (could be -1 if unknown) + * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) * @return the originally extracted timestamp of the record */ @Override public long onInvalidTimestamp(final ConsumerRecord record, final long recordTimestamp, - final long currentStreamsTime) { + final long previousTimestamp) { log.warn("Input record {} will be dropped because it has an invalid (negative) timestamp.", record); return recordTimestamp; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java index 2c3fcae3fcff..0c666248411f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java @@ -27,8 +27,8 @@ public interface TimestampExtractor { /** - * Extracts a timestamp from a record. The timestamp must be positive. If a negative timestamp is returned, - * the record will not be processed but dropped silently. + * Extracts a timestamp from a record. The timestamp must be positive to be considered a valid timestamp. + * Returning a negative timestamp will cause the record not to be processed but rather silently skipped. *

* The extracted timestamp MUST represent the milliseconds since midnight, January 1, 1970 UTC. *

@@ -38,8 +38,8 @@ public interface TimestampExtractor { * * * @param record a data record - * @param currentStreamsTime the current value of the internally tracked stream-time (could be -1 if unknown) + * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) * @return the timestamp of the record */ - long extract(ConsumerRecord record, long currentStreamsTime); + long extract(ConsumerRecord record, long previousTimestamp); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/InferringConsumerRecordTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java similarity index 82% rename from streams/src/main/java/org/apache/kafka/streams/processor/InferringConsumerRecordTimestampExtractor.java rename to streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java index 32006cdd59d8..27eaf4efbbba 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/InferringConsumerRecordTimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java @@ -22,7 +22,7 @@ /** * Retrieves embedded metadata timestamps from Kafka messages. - * If a record has a negative (invalid) timestamp value a new timestamp will be inferred as the current stream-time. + * If a record has a negative (invalid) timestamp, a new timestamp will be inferred from the current stream-time. *

* Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new * 0.10+ Kafka message format. @@ -39,31 +39,31 @@ *

* If you need processing-time semantics, use {@link WallclockTimestampExtractor}. * - * @see FailingConsumerRecordTimestampExtractor - * @see SkipInvalidConsumerRecordTimestampExtractor + * @see FailOnInvalidTimestamp + * @see LogAndSkipOnInvalidTimestamp * @see WallclockTimestampExtractor */ -public class InferringConsumerRecordTimestampExtractor extends AbstractConsumerRecordTimestampExtractor { +public class UsePreviousTimeOnInvalidTimestamp extends ExtractRecordMetadataTimestamp { /** * Returns the current stream-time as new timestamp for the record. * * @param record a data record * @param recordTimestamp the timestamp extractor from the record - * @param currentStreamsTime the current value of the internally tracked Streams time (could be -1 if unknown) + * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) * @return the current stream-time as new timestamp for the record * @throws StreamsException if current streams-time is unknown */ @Override public long onInvalidTimestamp(final ConsumerRecord record, final long recordTimestamp, - final long currentStreamsTime) + final long previousTimestamp) throws StreamsException { - if (currentStreamsTime == -1) { + if (previousTimestamp < 0) { throw new StreamsException("Could not infer new timestamp for input record " + record + " because current internal Streams time in unknown"); } - return currentStreamsTime; + return previousTimestamp; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java index 7c66c81eb2e8..6df948133153 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java @@ -24,11 +24,11 @@ *

* Using this extractor effectively provides processing-time semantics. *

- * If you need event-time semantics, use {@link FailingConsumerRecordTimestampExtractor} with + * If you need event-time semantics, use {@link FailOnInvalidTimestamp} with * built-in CreateTime or LogAppendTime timestamp (see KIP-32: Add timestamps to Kafka message for details). * - * @see FailingConsumerRecordTimestampExtractor - * @see SkipInvalidConsumerRecordTimestampExtractor + * @see FailOnInvalidTimestamp + * @see LogAndSkipOnInvalidTimestamp */ public class WallclockTimestampExtractor implements TimestampExtractor { @@ -36,11 +36,11 @@ public class WallclockTimestampExtractor implements TimestampExtractor { * Return the current wall clock time as timestamp. * * @param record a data record - * @param currentStreamsTime the current value of the internally tracked Streams time (could be -1 if unknown) + * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) * @return the current wall clock time, expressed in milliseconds since midnight, January 1, 1970 UTC */ @Override - public long extract(final ConsumerRecord record, final long currentStreamsTime) { + public long extract(final ConsumerRecord record, final long previousTimestamp) { return System.currentTimeMillis(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 72b0b992698c..aa6aafe29fbc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -126,22 +126,27 @@ public StreamTask(TaskId id, } /** - * Adds records to queues + * Adds records to queues. If a record has an invalid (i.e., negative) timestamp, the record is skipped + * and not added to the queue for processing * * @param partition the partition * @param records the records + * @returns the number of added records */ @SuppressWarnings("unchecked") - public void addRecords(TopicPartition partition, Iterable> records) { - int queueSize = partitionGroup.addRawRecords(partition, records); + public int addRecords(TopicPartition partition, Iterable> records) { + final int oldQueueSize = partitionGroup.numBuffered(); + final int newQueueSize = partitionGroup.addRawRecords(partition, records); - log.trace("{} Added records into the buffered queue of partition {}, new queue size is {}", logPrefix, partition, queueSize); + log.trace("{} Added records into the buffered queue of partition {}, new queue size is {}", logPrefix, partition, newQueueSize); // if after adding these records, its partition queue's buffered size has been // increased beyond the threshold, we can then pause the consumption for this partition - if (queueSize > this.maxBufferedSize) { + if (newQueueSize > this.maxBufferedSize) { consumer.pause(singleton(partition)); } + + return newQueueSize - oldQueueSize; } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index fa5b5f11a540..21c525d77371 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -444,10 +444,13 @@ private void runLoop() { throw new StreamsException(logPrefix + " Failed to rebalance", rebalanceException); if (!records.isEmpty()) { + int numAddedRecords = 0; for (TopicPartition partition : records.partitions()) { StreamTask task = activeTasksByPartition.get(partition); - task.addRecords(partition, records.records(partition)); + numAddedRecords += task.addRecords(partition, records.records(partition)); } + sensors.processedRecordsSensor.record(numAddedRecords, timerStartedMs); + sensors.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs); polledRecords = true; } else { polledRecords = false; @@ -919,6 +922,8 @@ private class StreamsMetricsImpl implements StreamsMetrics, ThreadCacheMetrics { final Sensor punctuateTimeSensor; final Sensor taskCreationSensor; final Sensor taskDestructionSensor; + final Sensor processedRecordsSensor; + final Sensor skippedRecordsSensor; public StreamsMetricsImpl(Metrics metrics) { this.metrics = metrics; @@ -951,6 +956,12 @@ public StreamsMetricsImpl(Metrics metrics) { this.taskDestructionSensor = metrics.sensor(sensorNamePrefix + ".task-destruction"); this.taskDestructionSensor.add(metrics.metricName("task-destruction-rate", metricGrpName, "The average per-second number of destructed tasks", metricTags), new Rate(new Count())); + + this.processedRecordsSensor = metrics.sensor(sensorNamePrefix + ".processed-records"); + this.processedRecordsSensor.add(metrics.metricName("processed-records-count", metricGrpName, "The number of processed records.", metricTags), new Count()); + + this.skippedRecordsSensor = metrics.sensor(sensorNamePrefix + ".skipped-records"); + this.skippedRecordsSensor.add(metrics.metricName("skipped-records-count", metricGrpName, "The number of skipped records.", metricTags), new Count()); } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java new file mode 100644 index 000000000000..09e13a193059 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.streams.errors.StreamsException; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +public class TimestampExtractorTest { + + @Test + public void extractMetadataTimestamp() { + final long metadataTimestamp = 42; + + final TimestampExtractor[] extractors = new TimestampExtractor[]{ + new FailOnInvalidTimestamp(), + new LogAndSkipOnInvalidTimestamp(), + new UsePreviousTimeOnInvalidTimestamp() + }; + + for (final TimestampExtractor extractor : extractors) { + final long timestamp = extractor.extract( + new ConsumerRecord( + "anyTopic", + 0, + 0, + metadataTimestamp, + TimestampType.NO_TIMESTAMP_TYPE, + 0, + 0, + 0, + null, + null), + 0 + ); + + assertThat(timestamp, is(metadataTimestamp)); + } + } + + @Test + public void extractSystemTimestamp() { + final TimestampExtractor extractor = new WallclockTimestampExtractor(); + + final long before = System.currentTimeMillis(); + final long timestamp = extractor.extract(new ConsumerRecord("anyTopic", 0, 0, null, null), 42); + final long after = System.currentTimeMillis(); + + assertThat(timestamp, is(new Matcher() { + @Override + public boolean matches(Object item) { + return before <= timestamp && timestamp <= after; + } + + @Override + public void describeMismatch(Object item, Description mismatchDescription) {} + + @Override + public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {} + + @Override + public void describeTo(Description description) {} + })); + } + + @Test(expected = StreamsException.class) + public void failOnInvalidTimestamp() { + final TimestampExtractor extractor = new FailOnInvalidTimestamp(); + extractor.extract(new ConsumerRecord("anyTopic", 0, 0, null, null), 42); + } + + @Test + public void logAndSkipOnInvalidTimestamp() { + final long invalidMetadataTimestamp = -42; + + final TimestampExtractor extractor = new LogAndSkipOnInvalidTimestamp(); + final long timestamp = extractor.extract( + new ConsumerRecord( + "anyTopic", + 0, + 0, + invalidMetadataTimestamp, + TimestampType.NO_TIMESTAMP_TYPE, + 0, + 0, + 0, + null, + null), + 0 + ); + + assertThat(timestamp, is(invalidMetadataTimestamp)); + } + + @Test + public void usePreviousTimeOnInvalidTimestamp() { + final long previousTime = 42; + + final TimestampExtractor extractor = new UsePreviousTimeOnInvalidTimestamp(); + final long timestamp = extractor.extract( + new ConsumerRecord("anyTopic", 0, 0, null, null), + previousTime + ); + + assertThat(timestamp, is(previousTime)); + } + +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index 32456c1b9e3c..ccd6b41553fa 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -403,7 +403,7 @@ public Processor get() { public static class CustomTimestampExtractor implements TimestampExtractor { @Override - public long extract(final ConsumerRecord record, final long currentStreamsTime) { + public long extract(final ConsumerRecord record, final long previousTimestamp) { return timestamp; } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java index efd319a2002e..e0ee3cec5011 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java @@ -29,8 +29,8 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.processor.FailingConsumerRecordTimestampExtractor; -import org.apache.kafka.streams.processor.SkipInvalidConsumerRecordTimestampExtractor; +import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; +import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.test.MockSourceNode; import org.apache.kafka.test.MockTimestampExtractor; @@ -145,7 +145,7 @@ public void shouldThrowOnNegativeTimestamp() { final List> records = Collections.singletonList( new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)); - queue.addRawRecords(records, new FailingConsumerRecordTimestampExtractor()); + queue.addRawRecords(records, new FailOnInvalidTimestamp()); } @Test @@ -154,7 +154,7 @@ public void shouldDropOnNegativeTimestamp() { final List> records = Collections.singletonList( new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)); - queue.addRawRecords(records, new SkipInvalidConsumerRecordTimestampExtractor()); + queue.addRawRecords(records, new LogAndSkipOnInvalidTimestamp()); assertEquals(0, queue.size()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java index 00570865688d..0cab7f51bda6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java +++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java @@ -25,7 +25,7 @@ public class TestTimestampExtractor implements TimestampExtractor { private final long base = SmokeTestUtil.START_TIME; @Override - public long extract(final ConsumerRecord record, final long currentStreamsTime) { + public long extract(final ConsumerRecord record, final long previousTimestamp) { switch (record.topic()) { case "data": return base + (Integer) record.value(); diff --git a/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java b/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java index 3f8b4fe07329..2b245784f9cb 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java +++ b/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java @@ -24,7 +24,7 @@ public class MockTimestampExtractor implements TimestampExtractor { @Override - public long extract(final ConsumerRecord record, final long currentStreamsTime) { + public long extract(final ConsumerRecord record, final long previousTimestamp) { return record.offset(); } } From 6508e7f604abd88b7d592344a62e4756f83c6a09 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 18 Nov 2016 16:53:16 -0800 Subject: [PATCH 5/8] Addressed Damian's comments --- .../streams/processor/TimestampExtractor.java | 2 +- .../UsePreviousTimeOnInvalidTimestamp.java | 6 +-- .../processor/TimestampExtractorTest.java | 50 +++++++++++-------- 3 files changed, 33 insertions(+), 25 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java index 0c666248411f..0de96ba287e1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java @@ -39,7 +39,7 @@ public interface TimestampExtractor { * * @param record a data record * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) - * @return the timestamp of the record + * @return the timestamp of the record */ long extract(ConsumerRecord record, long previousTimestamp); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java index 27eaf4efbbba..7718b5cea7f1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java @@ -51,8 +51,8 @@ public class UsePreviousTimeOnInvalidTimestamp extends ExtractRecordMetadataTime * @param record a data record * @param recordTimestamp the timestamp extractor from the record * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) - * @return the current stream-time as new timestamp for the record - * @throws StreamsException if current streams-time is unknown + * @return the provided latest extracted valid timestamp as new timestamp for the record + * @throws StreamsException if latest extracted valid timestamp is unknown */ @Override public long onInvalidTimestamp(final ConsumerRecord record, @@ -61,7 +61,7 @@ public long onInvalidTimestamp(final ConsumerRecord record, throws StreamsException { if (previousTimestamp < 0) { throw new StreamsException("Could not infer new timestamp for input record " + record - + " because current internal Streams time in unknown"); + + " because latest extracted valid timestamp is unknown."); } return previousTimestamp; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java index 09e13a193059..289fa51b116c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java @@ -19,8 +19,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.streams.errors.StreamsException; +import org.hamcrest.BaseMatcher; import org.hamcrest.Description; -import org.hamcrest.Matcher; import org.junit.Test; import static org.hamcrest.CoreMatchers.is; @@ -40,7 +40,7 @@ public void extractMetadataTimestamp() { for (final TimestampExtractor extractor : extractors) { final long timestamp = extractor.extract( - new ConsumerRecord( + new ConsumerRecord<>( "anyTopic", 0, 0, @@ -63,30 +63,16 @@ public void extractSystemTimestamp() { final TimestampExtractor extractor = new WallclockTimestampExtractor(); final long before = System.currentTimeMillis(); - final long timestamp = extractor.extract(new ConsumerRecord("anyTopic", 0, 0, null, null), 42); + final long timestamp = extractor.extract(new ConsumerRecord<>("anyTopic", 0, 0, null, null), 42); final long after = System.currentTimeMillis(); - assertThat(timestamp, is(new Matcher() { - @Override - public boolean matches(Object item) { - return before <= timestamp && timestamp <= after; - } - - @Override - public void describeMismatch(Object item, Description mismatchDescription) {} - - @Override - public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {} - - @Override - public void describeTo(Description description) {} - })); + assertThat(timestamp, is(new InBetween(before, after))); } @Test(expected = StreamsException.class) public void failOnInvalidTimestamp() { final TimestampExtractor extractor = new FailOnInvalidTimestamp(); - extractor.extract(new ConsumerRecord("anyTopic", 0, 0, null, null), 42); + extractor.extract(new ConsumerRecord<>("anyTopic", 0, 0, null, null), 42); } @Test @@ -95,7 +81,7 @@ public void logAndSkipOnInvalidTimestamp() { final TimestampExtractor extractor = new LogAndSkipOnInvalidTimestamp(); final long timestamp = extractor.extract( - new ConsumerRecord( + new ConsumerRecord<>( "anyTopic", 0, 0, @@ -118,11 +104,33 @@ public void usePreviousTimeOnInvalidTimestamp() { final TimestampExtractor extractor = new UsePreviousTimeOnInvalidTimestamp(); final long timestamp = extractor.extract( - new ConsumerRecord("anyTopic", 0, 0, null, null), + new ConsumerRecord<>("anyTopic", 0, 0, null, null), previousTime ); assertThat(timestamp, is(previousTime)); } + private static class InBetween extends BaseMatcher { + private final long before; + private final long after; + + public InBetween(long before, long after) { + this.before = before; + this.after = after; + } + + @Override + public boolean matches(Object item) { + final long timestamp = (Long) item; + return before <= timestamp && timestamp <= after; + } + + @Override + public void describeMismatch(Object item, Description mismatchDescription) {} + + @Override + public void describeTo(Description description) {} + } + } From 67a4eb843d8b5019f8a7aefa312356a7a45be09d Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Wed, 7 Dec 2016 23:45:34 -0800 Subject: [PATCH 6/8] Damian's and Guozhang's comments Added upgrade section to docs --- docs/upgrade.html | 13 +++++++++++++ .../processor/ExtractRecordMetadataTimestamp.java | 2 +- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/docs/upgrade.html b/docs/upgrade.html index e9fef1fe1930..52d08a84a6e0 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -15,6 +15,19 @@ limitations under the License. --> +

Upgrading from 0.8.x, 0.9.x, 0.10.0.X, or 0.10.1.X to 0.10.2.0

+ +

For a rolling upgrade:

+ +
    +
  1. Upgrading a Kafka Streams Applications: +
      +
    • You need to recompile your code. Just swapping the jar file will not work and will break your appliation.
    • +
    • If you use a custom timestamp extractor, you will need to update this code, because the TimestampExtractor interface got changed.
    • +
    +
  2. +
+

Upgrading from 0.8.x, 0.9.x or 0.10.0.X to 0.10.1.0

0.10.1.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade. However, please notice the Potential breaking changes in 0.10.1.0 before upgrade. diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java index a01ef2f7d1e5..cbe024ee6cf4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java @@ -69,7 +69,7 @@ public long extract(final ConsumerRecord record, final long prev * @param record a data record * @param recordTimestamp the timestamp extractor from the record * @param previousTimestamp the latest extracted valid timestamp of the current record's partition˙ (could be -1 if unknown) - * @return a new timestamp for the record + * @return a new timestamp for the record (if negative, record will not be processed but dropped silently) */ public abstract long onInvalidTimestamp(final ConsumerRecord record, final long recordTimestamp, From 53b3996c0881b72332b6b72d821d77503f765a37 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Thu, 8 Dec 2016 14:14:57 -0800 Subject: [PATCH 7/8] removed 'processed' sensor split test into multiple classes --- .../processor/internals/StreamThread.java | 5 - .../processor/FailOnInvalidTimestampTest.java | 36 +++++++ .../LogAndSkipOnInvalidTimestampTest.java | 56 +++++++++++ .../processor/TimestampExtractorTest.java | 96 +------------------ ...UsePreviousTimeOnInvalidTimestampTest.java | 45 +++++++++ .../WallclockTimestampExtractorTest.java | 62 ++++++++++++ 6 files changed, 203 insertions(+), 97 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/processor/FailOnInvalidTimestampTest.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestampTest.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestampTest.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/processor/WallclockTimestampExtractorTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 21c525d77371..c9f07bf06398 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -449,7 +449,6 @@ private void runLoop() { StreamTask task = activeTasksByPartition.get(partition); numAddedRecords += task.addRecords(partition, records.records(partition)); } - sensors.processedRecordsSensor.record(numAddedRecords, timerStartedMs); sensors.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs); polledRecords = true; } else { @@ -922,7 +921,6 @@ private class StreamsMetricsImpl implements StreamsMetrics, ThreadCacheMetrics { final Sensor punctuateTimeSensor; final Sensor taskCreationSensor; final Sensor taskDestructionSensor; - final Sensor processedRecordsSensor; final Sensor skippedRecordsSensor; public StreamsMetricsImpl(Metrics metrics) { @@ -957,9 +955,6 @@ public StreamsMetricsImpl(Metrics metrics) { this.taskDestructionSensor = metrics.sensor(sensorNamePrefix + ".task-destruction"); this.taskDestructionSensor.add(metrics.metricName("task-destruction-rate", metricGrpName, "The average per-second number of destructed tasks", metricTags), new Rate(new Count())); - this.processedRecordsSensor = metrics.sensor(sensorNamePrefix + ".processed-records"); - this.processedRecordsSensor.add(metrics.metricName("processed-records-count", metricGrpName, "The number of processed records.", metricTags), new Count()); - this.skippedRecordsSensor = metrics.sensor(sensorNamePrefix + ".skipped-records"); this.skippedRecordsSensor.add(metrics.metricName("skipped-records-count", metricGrpName, "The number of skipped records.", metricTags), new Count()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/FailOnInvalidTimestampTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/FailOnInvalidTimestampTest.java new file mode 100644 index 000000000000..738e956cddc1 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/FailOnInvalidTimestampTest.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.errors.StreamsException; +import org.junit.Test; + +public class FailOnInvalidTimestampTest extends TimestampExtractorTest { + + @Test + public void extractMetadataTimestamp() { + testExtractMetadataTimestamp(new FailOnInvalidTimestamp()); + } + + @Test(expected = StreamsException.class) + public void failOnInvalidTimestamp() { + final TimestampExtractor extractor = new FailOnInvalidTimestamp(); + extractor.extract(new ConsumerRecord<>("anyTopic", 0, 0, null, null), 42); + } + +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestampTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestampTest.java new file mode 100644 index 000000000000..92d87094522b --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestampTest.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.record.TimestampType; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +public class LogAndSkipOnInvalidTimestampTest extends TimestampExtractorTest { + + @Test + public void extractMetadataTimestamp() { + testExtractMetadataTimestamp(new LogAndSkipOnInvalidTimestamp()); + } + + @Test + public void logAndSkipOnInvalidTimestamp() { + final long invalidMetadataTimestamp = -42; + + final TimestampExtractor extractor = new LogAndSkipOnInvalidTimestamp(); + final long timestamp = extractor.extract( + new ConsumerRecord<>( + "anyTopic", + 0, + 0, + invalidMetadataTimestamp, + TimestampType.NO_TIMESTAMP_TYPE, + 0, + 0, + 0, + null, + null), + 0 + ); + + assertThat(timestamp, is(invalidMetadataTimestamp)); + } + +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java index 289fa51b116c..93e0b5bffa57 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java @@ -18,74 +18,21 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.record.TimestampType; -import org.apache.kafka.streams.errors.StreamsException; -import org.hamcrest.BaseMatcher; -import org.hamcrest.Description; -import org.junit.Test; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -public class TimestampExtractorTest { +class TimestampExtractorTest { - @Test - public void extractMetadataTimestamp() { + void testExtractMetadataTimestamp(TimestampExtractor extractor) { final long metadataTimestamp = 42; - final TimestampExtractor[] extractors = new TimestampExtractor[]{ - new FailOnInvalidTimestamp(), - new LogAndSkipOnInvalidTimestamp(), - new UsePreviousTimeOnInvalidTimestamp() - }; - - for (final TimestampExtractor extractor : extractors) { - final long timestamp = extractor.extract( - new ConsumerRecord<>( - "anyTopic", - 0, - 0, - metadataTimestamp, - TimestampType.NO_TIMESTAMP_TYPE, - 0, - 0, - 0, - null, - null), - 0 - ); - - assertThat(timestamp, is(metadataTimestamp)); - } - } - - @Test - public void extractSystemTimestamp() { - final TimestampExtractor extractor = new WallclockTimestampExtractor(); - - final long before = System.currentTimeMillis(); - final long timestamp = extractor.extract(new ConsumerRecord<>("anyTopic", 0, 0, null, null), 42); - final long after = System.currentTimeMillis(); - - assertThat(timestamp, is(new InBetween(before, after))); - } - - @Test(expected = StreamsException.class) - public void failOnInvalidTimestamp() { - final TimestampExtractor extractor = new FailOnInvalidTimestamp(); - extractor.extract(new ConsumerRecord<>("anyTopic", 0, 0, null, null), 42); - } - - @Test - public void logAndSkipOnInvalidTimestamp() { - final long invalidMetadataTimestamp = -42; - - final TimestampExtractor extractor = new LogAndSkipOnInvalidTimestamp(); final long timestamp = extractor.extract( new ConsumerRecord<>( "anyTopic", 0, 0, - invalidMetadataTimestamp, + metadataTimestamp, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, @@ -95,42 +42,7 @@ public void logAndSkipOnInvalidTimestamp() { 0 ); - assertThat(timestamp, is(invalidMetadataTimestamp)); - } - - @Test - public void usePreviousTimeOnInvalidTimestamp() { - final long previousTime = 42; - - final TimestampExtractor extractor = new UsePreviousTimeOnInvalidTimestamp(); - final long timestamp = extractor.extract( - new ConsumerRecord<>("anyTopic", 0, 0, null, null), - previousTime - ); - - assertThat(timestamp, is(previousTime)); - } - - private static class InBetween extends BaseMatcher { - private final long before; - private final long after; - - public InBetween(long before, long after) { - this.before = before; - this.after = after; - } - - @Override - public boolean matches(Object item) { - final long timestamp = (Long) item; - return before <= timestamp && timestamp <= after; - } - - @Override - public void describeMismatch(Object item, Description mismatchDescription) {} - - @Override - public void describeTo(Description description) {} + assertThat(timestamp, is(metadataTimestamp)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestampTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestampTest.java new file mode 100644 index 000000000000..09617fa99699 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestampTest.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +public class UsePreviousTimeOnInvalidTimestampTest extends TimestampExtractorTest { + + @Test + public void extractMetadataTimestamp() { + testExtractMetadataTimestamp(new UsePreviousTimeOnInvalidTimestamp()); + } + + @Test + public void usePreviousTimeOnInvalidTimestamp() { + final long previousTime = 42; + + final TimestampExtractor extractor = new UsePreviousTimeOnInvalidTimestamp(); + final long timestamp = extractor.extract( + new ConsumerRecord<>("anyTopic", 0, 0, null, null), + previousTime + ); + + assertThat(timestamp, is(previousTime)); + } + +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/WallclockTimestampExtractorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/WallclockTimestampExtractorTest.java new file mode 100644 index 000000000000..b7b49bbf5374 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/WallclockTimestampExtractorTest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +public class WallclockTimestampExtractorTest { + + @Test + public void extractSystemTimestamp() { + final TimestampExtractor extractor = new WallclockTimestampExtractor(); + + final long before = System.currentTimeMillis(); + final long timestamp = extractor.extract(new ConsumerRecord<>("anyTopic", 0, 0, null, null), 42); + final long after = System.currentTimeMillis(); + + assertThat(timestamp, is(new InBetween(before, after))); + } + + private static class InBetween extends BaseMatcher { + private final long before; + private final long after; + + public InBetween(long before, long after) { + this.before = before; + this.after = after; + } + + @Override + public boolean matches(Object item) { + final long timestamp = (Long) item; + return before <= timestamp && timestamp <= after; + } + + @Override + public void describeMismatch(Object item, Description mismatchDescription) {} + + @Override + public void describeTo(Description description) {} + } + +} From 477bdb817178d1db46ae14c9019d22be4d44a320 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 9 Dec 2016 11:42:00 -0800 Subject: [PATCH 8/8] Change metric to use Rate() --- .../apache/kafka/streams/processor/internals/StreamThread.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index c9f07bf06398..36e2dc6a026a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -956,7 +956,7 @@ public StreamsMetricsImpl(Metrics metrics) { this.taskDestructionSensor.add(metrics.metricName("task-destruction-rate", metricGrpName, "The average per-second number of destructed tasks", metricTags), new Rate(new Count())); this.skippedRecordsSensor = metrics.sensor(sensorNamePrefix + ".skipped-records"); - this.skippedRecordsSensor.add(metrics.metricName("skipped-records-count", metricGrpName, "The number of skipped records.", metricTags), new Count()); + this.skippedRecordsSensor.add(metrics.metricName("skipped-records-count", metricGrpName, "The average per-second number of skipped records.", metricTags), new Rate(new Count())); } @Override