
KAFKA-4393: Improve invalid/negative TS handling #2117

Closed
wants to merge 8 commits into from

Conversation

mjsax
Member

@mjsax mjsax commented Nov 9, 2016

No description provided.

@mjsax
Member Author

mjsax commented Nov 9, 2016

@guozhangwang @enothereska @dguy @miguno @hjafarpour

I am not happy with the name RobustConsumerRecordTimestampExtractor but could not come up with anything better... Suggestions are welcome.

I am also not sure if we still need the TS check in SinkNode (and the corresponding test). I doubt a negative TS can be received there. (Btw: the user errors reported with an exception in KafkaProducer are only valid for 0.10.0.0 -- in 0.10.0.1 we already introduced a TS check, i.e., a fail-fast policy.)

Contributor

@miguno miguno left a comment

Right now I think I am confused about this change.

We are logging WARN messages in the new extractor (and say we are "dropping" messages with invalid timestamps, though in fact we are not doing that in the extractor -- so how can the extractor, with its local-only knowledge, claim that this will happen?). And in other parts of the code we are now silently dropping/skipping any such messages.

We need more clarity/structure here. My understanding is as follows:

  • The default timestamp extractor and the new timestamp extractor differ only in whether or not they are logging a WARN message when a message has an invalid timestamp.
  • We are changing other parts of the code to silently skip messages with invalid timestamps.

* Retrieves built-in timestamps from Kafka messages (introduced in KIP-32: Add timestamps to Kafka message).
* <p>
* Here, "built-in" refers to the fact that compatible Kafka producer clients automatically and
* transparently embed such timestamps into messages they sent to Kafka, which can then be retrieved
Contributor

Yes, we mention the KIP above, but I think we should still say sth like "the new 0.10 message format". Why? Because users have always been able to embed timestamps into the message keys and/or message values, but when we say "embedded timestamps" here we mean the new message field for timestamps.

Member Author

I just copied this from the original ConsumerRecordTimestampExtractor -- maybe we want to update both.

* transparently embed such timestamps into messages they sent to Kafka, which can then be retrieved
* via this timestamp extractor.
* <p>
* If <i>CreateTime</i> is used to define the built-in timestamps, using this extractor effectively provide
Contributor

provide[s]

* transparently embed such timestamps into messages they sent to Kafka, which can then be retrieved
* via this timestamp extractor.
* <p>
* If <i>CreateTime</i> is used to define the built-in timestamps, using this extractor effectively provide
Contributor

"Define" and this section sound a bit weird.

What about sth like:

If the embedded timestamps of messages in 0.10+ message format represent CreateTime (cf. Kafka broker setting log.message.timestamp.type and Kafka topic setting message.timestamp.type), this extractor effectively provides ...

* Retrieves built-in timestamps from Kafka messages (introduced in KIP-32: Add timestamps to Kafka message).
* <p>
* Here, "built-in" refers to the fact that compatible Kafka producer clients automatically and
* transparently embed such timestamps into messages they sent to Kafka, which can then be retrieved
Contributor

they sen[d]

* <p>
* If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}.
* <p>
* If a record has a negative timestamp value, a log message is written and the timestamp is returned.
Contributor

Ah, I guess we need to be a bit careful here because "log" and "message" are overloaded terms in Kafka. ;-)

What about sth like:

If a record has a negative (invalid) timestamp, the timestamp is returned as-is, but in addition a WARN message is logged in your application.

private static final Logger log = LoggerFactory.getLogger(RobustConsumerRecordTimestampExtractor.class);

/**
* Extracts the embedded meta data timestamp from the given {@link ConsumerRecord}
Contributor

"meta data" -> "metadata"

* because the record will no be processed but dropped.
*
* @param record a data record
* @return the embedded meta-data timestamp of the given {@link ConsumerRecord}
Contributor

meta-data -> metadata

final long timestamp = record.timestamp();

if (timestamp < 0) {
log.warn("Dropping input record {} because it has an invalid (negative) timestamp.", record);
Contributor

This is IMHO quite confusing (and, ultimately, wrong).

We say "We are dropping this record", but in fact we are not -- we are returning the invalid timestamp as-is.

Contributor

The record gets dropped in RecordQueue if the timestamp is negative

Contributor

@dguy: I am aware that this ultimately happens. My point is that the current code structure needs improvement. RobustConsumerRecordTimestampExtractor should only make claims based on its local knowledge. The above is a symptom of the problem, but not the root cause.
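The dropping behavior under discussion can be sketched as follows. This is a minimal stand-in, not Kafka's actual RecordQueue code: records whose extracted timestamp is negative are silently skipped instead of being buffered, which is why the extractor itself cannot know whether a record is "dropped".

```java
import java.util.ArrayList;
import java.util.List;

public class RecordQueueSketch {
    // Returns the timestamps that would actually be buffered; negative
    // (invalid) timestamps are dropped here, downstream of the extractor.
    public static List<Long> addRawRecords(List<Long> extractedTimestamps) {
        List<Long> buffered = new ArrayList<>();
        for (long ts : extractedTimestamps) {
            if (ts < 0) {
                continue; // drop record with invalid timestamp
            }
            buffered.add(ts);
        }
        return buffered;
    }
}
```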

Member Author

Will change this to "Input record {} will be dropped because it has an invalid (negative) timestamp.".

@@ -107,8 +107,9 @@ public int addRawRecords(Iterable<ConsumerRecord<byte[], byte[]>> rawRecords, Ti
log.trace("Source node {} extracted timestamp {} for record {} when adding to buffered queue", source.name(), timestamp, record);

// validate that timestamp must be non-negative
Contributor

The code comment is no longer correct. We are not validating any longer, we are silently skipping.

* @see ConsumerRecordTimestampExtractor
* @see WallclockTimestampExtractor
*/
public class RobustConsumerRecordTimestampExtractor implements TimestampExtractor {
Contributor

I agree we need a better name, and we can do that once we have more clarity on how we want to improve the current code.

@miguno
Contributor

miguno commented Nov 9, 2016

I think I realize that the original idea of splitting the functionalities of the timestamp extractor and the handler (for messages with invalid timestamps) might have been the better idea: The timestamp extractor should only be responsible for extracting the timestamp, whereas the handler (which would be used e.g. inside addRawRecords) should only be responsible for determining what happens to records with invalid timestamps.

At the moment, we are in the unfortunate situation where, for example, the (new) timestamp extractor is effectively identical to the default extractor with regards to timestamp extraction. The only difference is that it also logs a WARN message -- which arguably is ok, e.g. it may say "Hey, I came across a message that doesn't include a timestamp; in my case I won't do anything about that and return the negative timestamp as-is, but at least I can let you know" -- saying "this message will be dropped". And this wording/information is wrong because, based on its local knowledge, the extractor can't determine whether or not any downstream code will or will not drop the message.

@dguy
Contributor

dguy commented Nov 9, 2016

If we are going to go with this approach, i'm wondering if we should change the signature of TimestampExtractor.extract to: long extract(ConsumerRecord<Object, Object> record, long currentStreamTime) - this at least gives the user the ability to use the stream time in the event that the time can't be extracted from the record.
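The proposed signature change can be sketched like this. The Record class below is a simplified stand-in for Kafka's ConsumerRecord<Object, Object> (an assumption for brevity); the point is that the extra parameter lets an extractor fall back to the current stream time when the record carries no valid timestamp.

```java
public class ExtractorSignatureSketch {
    // Simplified stand-in for ConsumerRecord<Object, Object>.
    static class Record {
        final long timestamp;
        Record(long timestamp) { this.timestamp = timestamp; }
    }

    // The proposed two-argument extract signature.
    interface TimestampExtractor {
        long extract(Record record, long currentStreamTime);
    }

    // Hypothetical extractor using the new parameter as a fallback
    // when the record's own timestamp is invalid (negative).
    static final TimestampExtractor FALLBACK_TO_STREAM_TIME =
        (record, currentStreamTime) ->
            record.timestamp >= 0 ? record.timestamp : currentStreamTime;
}
```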

@mjsax
Member Author

mjsax commented Nov 9, 2016

@miguno Your current understanding of the change is not completely correct.

The default timestamp extractor and the new timestamp extractor differ only in whether or not they are logging a WARN message when a message has an invalid timestamp.

The default TS extractor raises an exception, thus, killing the running instance, while the new TS extractor just logs a WARN message.

I furthermore do understand your argument about splitting extractor and handler. However, from a user point of view, assume a user wants to react to corrupted records and also provides a custom TS extractor (i.e., is not using the default extractor). First, the user must understand that if no valid TS can be extracted, -1 must be returned, and this will later on trigger a call to an additional error handler she must also provide. The handling of the corrupted record then happens in this handler. So why not just handle the corrupted record directly in the extractor... that would be way more straightforward.

About local knowledge etc.: if the Streams policy is to silently drop records with invalid/negative timestamps, the extractor will know what is going to happen. I don't see a big gap here.

 - added new parameter "Streams time" to extract()
 - added third timestamp extractor
@mjsax
Member Author

mjsax commented Nov 9, 2016

Updated this.

Added InferringConsumerRecordTimestampExtractor (again, the name is subject to discussion).

@@ -106,7 +106,7 @@ public int addRawRecords(Iterable<ConsumerRecord<byte[], byte[]>> rawRecords, Ti

log.trace("Source node {} extracted timestamp {} for record {} when adding to buffered queue", source.name(), timestamp, record);

// validate that timestamp must be non-negative
// drop message if TS is invalid, ie, negative
Contributor

Nit:"if timestamp is invalid, i.e., negative"

Member Author

It's not even a JavaDoc comment...

@@ -37,8 +37,9 @@
* excessive log rolling and therefore broker performance degradation.
*
*
* @param record a data record
* @param record a data record
* @param currentStreamsTime the current value of the internally tracked Streams time (could be -1 if unknown)
Contributor

"Streams time" -> "stream-time" ?

import org.slf4j.LoggerFactory;

/**
* Retrieves embedded metadata timestamps from Kafka messages.
Contributor

We need better 1-liners for each extractor. (they are all the same right now)

*
* @see ConsumerRecordTimestampExtractor
* @see InferringConsumerRecordTimestampExtractor
* @see WallclockTimestampExtractor
*/
public class RobustConsumerRecordTimestampExtractor implements TimestampExtractor {
Contributor

Name: Perhaps "SkipInvalidTimestampExtractor"?

@miguno
Contributor

miguno commented Nov 10, 2016

Question: The three embedded-timestamp extractors are basically all the same with the exception of the if (timestamp < 0) conditional logic. Should we actually give them e.g. a common parent class so that users see a proper type-relationship between them? Or have the default extractor be the base class, and the two new ones override the conditional logic?
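The common-parent idea above can be sketched as a template method: the abstract base extracts the embedded metadata timestamp, and subclasses override only the invalid-timestamp branch. The names loosely follow the PR's working names, but the types are simplified stand-ins, not Kafka's real API.

```java
public class ExtractorHierarchySketch {
    abstract static class AbstractMetadataExtractor {
        // Shared logic: valid timestamps pass through unchanged; only the
        // handling of invalid (negative) timestamps differs per subclass.
        final long extract(long metadataTimestamp) {
            if (metadataTimestamp < 0) {
                return onInvalidTimestamp(metadataTimestamp);
            }
            return metadataTimestamp;
        }
        abstract long onInvalidTimestamp(long invalidTimestamp);
    }

    // Fail-fast variant: kills processing on an invalid timestamp.
    static class FailOnInvalid extends AbstractMetadataExtractor {
        long onInvalidTimestamp(long ts) {
            throw new IllegalStateException("Invalid (negative) timestamp: " + ts);
        }
    }

    // Lenient variant: warns and returns the invalid timestamp as-is,
    // which signals downstream code to skip the record.
    static class LogAndSkipOnInvalid extends AbstractMetadataExtractor {
        long onInvalidTimestamp(long ts) {
            System.err.println("WARN: record has invalid timestamp " + ts
                + " and will be dropped downstream");
            return ts;
        }
    }
}
```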

@mjsax
Member Author

mjsax commented Nov 11, 2016

@miguno Added AbstractConsumerRecordTimestampExtractor as base class for the new extractors.

Contributor

@miguno miguno left a comment

We should also add unit tests for the various new extractors. If, for example, we say that one extractor will always log a WARN message (on invalids), then we should also validate this behavior with a unit test. Same for e.g. the extractor that always throws an exception (on invalids).

@@ -172,7 +172,7 @@
REPLICATION_FACTOR_DOC)
.define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
Type.CLASS,
ConsumerRecordTimestampExtractor.class.getName(),
FailingConsumerRecordTimestampExtractor.class.getName(),
Contributor

We should consider what renaming ConsumerRecordTimestampExtractor to FailingConsumerRecordTimestampExtractor means for backwards compatibility.

Member Author

Because it is the default extractor, users will not explicitly set it. Thus it should not be a big issue.
Of course, strictly speaking it is a breaking change and we must mention it in the KIP (but we need a KIP anyway).

Member Author

We might even keep the name as-is. The reason is that, right now, if ConsumerRecordTimestampExtractor returns -1, a failure occurs too -- just not in the extractor but in RecordQueue.

*
* @param record a data record
* @param recordTimestamp the timestamp extractor from the record
* @param currentStreamsTime the current value of the internally tracked Streams time (could be -1 if unknown)
Contributor

Should we be more strict and say: will be -1 if unknown?


/**
* Retrieves embedded metadata timestamps from Kafka messages.
* If a record has a negative (invalid) timestamp value a new timestamp will be inferred as the current stream-time.
Contributor

If a record has a negative (invalid) timestamp[,] a new timestamp will be inferred [from] the current stream-time.
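The inferring behavior described in this javadoc can be sketched as follows. This is a simplified stand-in, not the PR's actual class: an invalid (negative) embedded timestamp is replaced by the previously observed timestamp of the partition, and extraction fails if no previous timestamp is known yet.

```java
public class InferTimestampSketch {
    public static long inferTimestamp(long recordTimestamp, long previousTimestamp) {
        if (recordTimestamp >= 0) {
            return recordTimestamp; // valid embedded timestamp, use as-is
        }
        if (previousTimestamp < 0) {
            // no valid timestamp seen yet on this partition: cannot infer
            throw new IllegalStateException(
                "Cannot infer timestamp: previous partition timestamp is unknown");
        }
        return previousTimestamp; // substitute the last known valid timestamp
    }
}
```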

* @see InferringConsumerRecordTimestampExtractor
* @see WallclockTimestampExtractor
*/
public class RobustConsumerRecordTimestampExtractor implements TimestampExtractor {
private static final Logger log = LoggerFactory.getLogger(RobustConsumerRecordTimestampExtractor.class);
public class SkipInvalidConsumerRecordTimestampExtractor extends AbstractConsumerRecordTimestampExtractor {
Contributor

What about a shorter name like SkipRecordsWithInvalidTimestamps?

Also, I don't think "TimestampExtractor" must be in the name of the class. The term "ConsumerRecord" is also pretty confusing IMHO to the average user because it's not a record OF a consumer etc.

* @see InferringConsumerRecordTimestampExtractor
* @see WallclockTimestampExtractor
*/
public class FailingConsumerRecordTimestampExtractor extends AbstractConsumerRecordTimestampExtractor {
Contributor

@miguno miguno Nov 11, 2016

If we go with a new name, what about sth like FailOnRecordsWithInvalidTimestamps?

My issues with "FailingConsumerRecordTimestampExtractor":

  • It sounds as if the extractor itself is failing ("all the time"). That is, the name does not tell you when (and when not) it fails.
  • "ConsumerRecord" is IMHO very confusing (see my other comment).
  • It is a really, really long name. ;-)
  • As I said down below, I don't think "TimestampExtractor" must necessarily be in the name of the implementing classes.

public class FailingConsumerRecordTimestampExtractor extends AbstractConsumerRecordTimestampExtractor {

/**
* Raised an exception.
Contributor

"Raised an exception"?

I guess: "Raises an exception whenever it is invoked"?

*
* @param record a data record
* @param recordTimestamp the timestamp extractor from the record
* @param currentStreamsTime the current value of the internally tracked Streams time (could be -1 if unknown)
Contributor

"Streams time" -> "stream-time"

* @see WallclockTimestampExtractor
*/
public class InferringConsumerRecordTimestampExtractor implements TimestampExtractor {
public class InferringConsumerRecordTimestampExtractor extends AbstractConsumerRecordTimestampExtractor {
Contributor

What about a name like "ReplaceInvalidTimestampWithCurrentStreamTime"? This suggestion is still a bit too long IMHO, but it is much more descriptive (I think) than "InferringConsumerRecordTimestampExtractor".

final long recordTimestamp,
final long currentStreamsTime)
throws StreamsException {
if (currentStreamsTime == -1) {
Contributor

Should we say == -1 or < 0?

* Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}
* and writes a log warn message if the extracted timestamp is negative,
* because the record will no be processed but dropped.
* Writes a log warn message as the extracted timestamp is negative and the record will no be processed but dropped.
Contributor

Typos:

Writes a log WARN message when the extracted timestamp is invalid (negative) but returns the invalid timestamp as-is, which ultimately causes the record to be skipped and not to be processed.

@@ -27,17 +27,19 @@
public interface TimestampExtractor {

/**
* Extracts a timestamp from a record.
* Extracts a timestamp from a record. The timestamp must be positive. If a negative timestamp is returned,
Contributor

@miguno miguno Nov 11, 2016

I'd suggest:

Extracts a timestamp from a record. The timestamp must be positive to be considered a valid timestamp. Returning a negative timestamp will cause the record not to be processed but rather silently skipped.

Question: What happens when timestamp == 0? Is this a valid or an invalid timestamp? The answer should get included in the javadoc.

Member Author

Agreed. I would like to write The timestamp must be non-negative, which includes zero, but this is hard to parse for the user... But The timestamp must be zero or positive also sounds clumsy... Not sure how to phrase it nicely... Any suggestion?

* @param record a data record
* @return the current wall clock time, expressed in milliseconds since midnight, January 1, 1970 UTC
* @param record a data record
* @param currentStreamsTime the current value of the internally tracked Streams time (could be -1 if unknown)
Contributor

"Streams time" -> "stream-time"

@miguno
Contributor

miguno commented Nov 11, 2016

Further comments. Sorry for the back and forth, but I think we really need to come up with a good setup and descriptive names in order to not confuse users. :-)

@guozhangwang
Contributor

One meta-comment: as for the name currentStreamTime, it is not actually the stream time of the task we talked about before, but rather the partition time; stream time is the min over all of a task's owned partitions' times. Currently we are not exposing this partition time anywhere to the users, but by adding this parameter we effectively would be, and hence users need to think about partition/stream time in addition to the record timestamp.

@mjsax
Member Author

mjsax commented Nov 11, 2016

@miguno Thanks. I completely agree that the naming must be chosen wisely :) I was never happy with the naming from the beginning. Thanks for your input.

@guozhangwang Good point. Will think this through and include a discussion in the KIP.

* @see InferringConsumerRecordTimestampExtractor
* @see WallclockTimestampExtractor
*/
public abstract class AbstractConsumerRecordTimestampExtractor implements TimestampExtractor {
Member Author

Make package private
Change Name

added test for new TimestampExtractors
added new metrics
@mjsax
Member Author

mjsax commented Nov 18, 2016

@guozhangwang @enothereska @dguy @miguno @hjafarpour
Updated this according to our KIP discussions.

Contributor

@dguy dguy left a comment

A couple of minor comments, but LGTM

* @param record a data record
* @param recordTimestamp the timestamp extractor from the record
* @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
* @return the current stream-time as new timestamp for the record
Contributor

I think @guozhangwang mentioned this already, but i believe this is actually partition time rather than stream-time?

Member Author

Forgot to update the text of the return statement... Thanks!

throws StreamsException {
if (previousTimestamp < 0) {
throw new StreamsException("Could not infer new timestamp for input record " + record
+ " because current internal Streams time in unknown");
Contributor

Same here

public void describeMismatch(Object item, Description mismatchDescription) {}

@Override
public void _dont_implement_Matcher___instead_extend_BaseMatcher_() {}
Contributor

Any reason why you didn't take the advice of this method?

Member Author

Did not pay attention... Will fix it.

final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));

queue.addRawRecords(records, new LogAndSkipOnInvalidTimestamp());
Contributor

Not part of this JIRA, but i'm wondering why the TimestampExtractor is a method param rather than a field?

Member Author

I think it is to allow for different extractors for different topics -- even if the user API does not allow specifying different extractors right now, this should be useful if we allow for this at some point. But @guozhangwang should know better.

Contributor

We have thought about different extractors per topic, yes, but even with that I think we can keep a field per RecordQueue, so that is not a big deal really and we can consider changing it accordingly if necessary.

@mjsax mjsax force-pushed the kafka-4393-improveInvalidTsHandling branch from 3b53b11 to 90f652c Compare November 20, 2016 06:58
@mjsax mjsax force-pushed the kafka-4393-improveInvalidTsHandling branch from 90f652c to 6508e7f Compare November 20, 2016 07:00
@mjsax
Member Author

mjsax commented Dec 5, 2016

KIP-93 was accepted.

Call for final review @guozhangwang @enothereska @dguy @miguno

@@ -27,6 +27,7 @@
<allow pkg="javax.management" />
<allow pkg="org.slf4j" />
<allow pkg="org.junit" />
<allow pkg="org.hamcrest" />
Contributor

Is this absolutely needed? cc @ijuma

Contributor

IMO - it is useful and i don't see why it should not be allowed.

Contributor

I'm OK with it although @junrao seemed less convinced last time the subject came up.

Member Author

I did talk to @ijuma and @guozhangwang and both told me they are ok with the change. Why not allow hamcrest for tests?

Contributor

As long as the usage is valid, this is fine for me.

Contributor

@dguy dguy left a comment

Couple of minor comments, but overall LGTM

* @param record a data record
* @param recordTimestamp the timestamp extractor from the record
* @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
* @return a new timestamp for the record
Contributor

Should it be mentioned here that returning a negative timestamp will result in the record being dropped?

Member Author

It's repetitive as this behavior is explained in detail in the interface, but I'll add it. Can't hurt.

final long metadataTimestamp = 42;

final TimestampExtractor[] extractors = new TimestampExtractor[]{
new FailOnInvalidTimestamp(),
Contributor

This is a question of taste, but IMO each of these TimestampExtractor implementations should have their own Test class rather than there being a single class covering all of them.

Member Author

FailOnInvalidTimestamp, LogAndSkipOnInvalidTimestamp, and UsePreviousTimeOnInvalidTimestamp all extract the record metadata timestamp and are thus similar -- I think it's easier to have a single test class to avoid code duplication (cf. #extractMetadataTimestamp()).

Contributor

I would second @dguy for this comment: having different classes' test as test functions within a class has a potential risk that we may miss some test case for some classes when adding more later. It is easier to check that for each XX.class we have an XXTest.class.

@guozhangwang
Contributor

I have a question about the added sensors, otherwise LGTM.

Could you add the corresponding upgrade section in docs as well? We'd encourage doing code change and doc change in a single PR so that people won't forget about them.

Added upgrade section to docs
@@ -951,6 +956,12 @@ public StreamsMetricsImpl(Metrics metrics) {

this.taskDestructionSensor = metrics.sensor(sensorNamePrefix + ".task-destruction");
this.taskDestructionSensor.add(metrics.metricName("task-destruction-rate", metricGrpName, "The average per-second number of destructed tasks", metricTags), new Rate(new Count()));

this.processedRecordsSensor = metrics.sensor(sensorNamePrefix + ".processed-records");
Contributor

@guozhangwang guozhangwang Dec 8, 2016

nit: I'd suggest renaming to fetchedRecordsSensor or bufferedRecordsSensor since it is not really processed yet, and similarly for its sensor string, .buffered-records);

Also what metrics are you trying to record really? Note that Count() will increment the value forever within a single sampling window, or are you trying to use Rate?

Contributor

Personally I'd suggest we only keep a skip sensor for users to monitor correctness, since the "processed record sensor" is well covered by the throughput sensor already. And for that skip sensor we can use Rate(), for example as in record-error-rate in Sender.class.

Member Author

I see your point about the naming. However, reporting buffered records is not too helpful for the user IMHO. So not adding this sensor makes sense to me.

Count() is used internally. Because we assume that only a few records will be dropped, to me it seems most valuable to report the absolute number of dropped records. Or do you have a strong objection against Count()?

Contributor

Count is a {@link SampledStat} that maintains a simple count of what it has seen. With this stat, the value rises during the window period, then suddenly drops to zero, then starts rising again, so it's hard to alert on such a metric. On the other hand, Rate(Count()) records the average rate per time unit (here: second), so users in practice can easily set a threshold for alerting; even if they want "zero tolerance", setting the threshold to 0 still satisfies their needs.
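The difference can be illustrated with a toy model (this is not Kafka's metrics implementation, just a sketch of the semantics): a sampled count resets at each window boundary, while a rate divides by the window length and yields a steadier, alertable signal.

```java
public class MetricSketch {
    // Count of events that fall in the current sampling window; the value
    // "drops to zero" whenever a new window starts, as described above.
    public static int sampledCount(long[] eventTimesMs, long nowMs, long windowMs) {
        long windowStart = (nowMs / windowMs) * windowMs; // window boundary
        int count = 0;
        for (long t : eventTimesMs) {
            if (t >= windowStart && t <= nowMs) {
                count++;
            }
        }
        return count;
    }

    // Events per second over the window: a steadier signal for alerting.
    public static double rate(long[] eventTimesMs, long nowMs, long windowMs) {
        return sampledCount(eventTimesMs, nowMs, windowMs) / (windowMs / 1000.0);
    }
}
```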


split test into multiple classes
@mjsax
Member Author

mjsax commented Dec 8, 2016

Updated. @guozhangwang @dguy

@asfbot

asfbot commented Dec 8, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/19/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Dec 9, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/21/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Dec 9, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/20/
Test FAILed (JDK 8 and Scala 2.12).

Contributor

@dguy dguy left a comment

Thanks @mjsax LGTM

@mjsax
Member Author

mjsax commented Dec 9, 2016

@guozhangwang Updated according to your comment.

@asfbot

asfbot commented Dec 9, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/47/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Dec 9, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/45/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Dec 9, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/46/
Test FAILed (JDK 8 and Scala 2.12).

@guozhangwang
Contributor

Unit test passed locally.

@guozhangwang
Contributor

Merged to trunk.

@asfgit asfgit closed this in 9bed8fb Dec 10, 2016
@mjsax mjsax deleted the kafka-4393-improveInvalidTsHandling branch December 10, 2016 00:26
soenkeliebau pushed a commit to soenkeliebau/kafka that referenced this pull request Feb 7, 2017
Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Michael G. Noll, Eno Thereska, Damian Guy, Guozhang Wang

Closes apache#2117 from mjsax/kafka-4393-improveInvalidTsHandling