
KAFKA-6376: streams skip metrics #4812

Closed
wants to merge 4 commits into from

Conversation

@vvcephei (Contributor) commented Apr 2, 2018

  • unify skipped records metering
  • log warnings when things get skipped
  • tighten up metrics usage a bit

Testing strategy:

Unit testing of the metrics and the logs should be sufficient.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@vvcephei (Contributor Author) commented Apr 2, 2018

Just starting on this; still need to add the missing measurements and actually run the tests.

@guozhangwang (Contributor) left a comment:

I have not seen skippedRecordsSensor being recorded for other reasons yet, but I think from the processorContext we should be able to include the record metadata on all such occurrences.

@@ -77,10 +77,15 @@
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
deserializationException);
} else {
sourceNode.nodeMetrics.sourceNodeSkippedDueToDeserializationError.record();
log.warn(
Contributor:

I think you can still use the log4j format here, e.g.

"Skipping record due to deserialization error. topic={} partition={} offset={}", rawRecord.topic(), rawRecord.partition(), rawRecord.offset(), deserializationException

With four parameters, the last one is automatically interpreted as the exception; maybe we can validate that this is the case.

Contributor Author:

Confirmed, switching to the variant you mentioned still prints:

[2018-04-04 18:23:20,310] WARN task [0_1] Skipping record due to deserialization error. topic=[topic1] partition=[1] offset=[1] (org.apache.kafka.streams.processor.internals.RecordDeserializer:80)
org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
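For illustration, the SLF4J convention confirmed here can be mimicked in a self-contained sketch (the class Slf4jStyleFormat and its format helper are hypothetical, not part of Kafka or SLF4J): arguments fill the {} slots in order, and a trailing Throwable left over after all placeholders are filled is treated as the logged exception rather than being substituted into the message.

```java
// Hypothetical sketch of SLF4J's placeholder convention. Real code would
// call org.slf4j.Logger#warn directly; this only illustrates the rule that
// a leftover trailing Throwable is treated as the exception.
public class Slf4jStyleFormat {
    public static String format(final String template, final Object... args) {
        final StringBuilder sb = new StringBuilder();
        int argIdx = 0;
        int from = 0;
        int at;
        // Substitute arguments into "{}" slots left to right.
        while ((at = template.indexOf("{}", from)) >= 0 && argIdx < args.length) {
            sb.append(template, from, at).append(args[argIdx++]);
            from = at + 2;
        }
        sb.append(template.substring(from));
        // A final Throwable beyond the placeholders plays the role of the
        // logged exception (SLF4J prints its stack trace separately).
        if (argIdx < args.length && args[argIdx] instanceof Throwable) {
            sb.append(" [exception: ").append(((Throwable) args[argIdx]).getMessage()).append(']');
        }
        return sb.toString();
    }
}
```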


private Sensor skippedRecordsSensor() {
Contributor:

If it is a per-thread metric, I'd suggest we pre-register them at the beginning of the application. This way some other tools like JMXTool do not need to wait for the object name to show up. WDYT?

Contributor Author:

Sounds good.

I meant to make a comment before you read this to say that there had been a concern in the discussion about having metrics reported from processor nodes (when the proposal was at the node level) that would never actually skip records, thereby polluting the metrics. I thought I'd throw the lazy registration pattern in just to see what you all thought.

I'll switch it back to pre-registration.
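The eager-versus-lazy trade-off can be sketched with a hypothetical map-backed registry (SensorRegistry is an illustration only, not Kafka's Metrics API): with pre-registration, the metric name is visible to tools like JMXTool from startup; with lazy registration, the name only appears after the first skip is recorded.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry contrasting eager and lazy sensor registration.
public class SensorRegistry {
    private final Map<String, Long> sensors = new ConcurrentHashMap<>();

    // Eager: called once at startup, so the name is immediately visible
    // to monitoring tools, even before any record is skipped.
    public void preRegister(final String name) {
        sensors.putIfAbsent(name, 0L);
    }

    // Lazy: the name only shows up after the first record() call.
    public void record(final String name) {
        sensors.merge(name, 1L, Long::sum);
    }

    public boolean isVisible(final String name) {
        return sensors.containsKey(name);
    }
}
```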

@guozhangwang (Contributor):

The Jenkins failures are related to Unused import - ...

@vvcephei (Contributor Author) commented Apr 4, 2018

Ah, thanks, by the time I went to look at the last failure, the logs were already gone.

@vvcephei (Contributor Author) commented Apr 5, 2018

I took the liberty of resolving all my IDE warnings for the test files. Let me know if I went too far, and I can revert them.

@vvcephei (Contributor Author) commented Apr 5, 2018

Checkstyle complained because I explicitly imported log4j, which is appropriate in this case. I isolated the usage to a "testutils" package, so I could allow the usage there without allowing it for all of "streams.processor.internals".

@guozhangwang (Contributor) left a comment:

A few meta comments:

  1. We should update the docs/upgrade-guide section, emphasizing that users who monitor based on the old metrics will be impacted (it is not compatible), and what they should start monitoring instead in the new version. We also need to mention the new APIs added to TopologyTestDriver.

  2. We did not add any other places that record skippedRecordsSensor yet. However, the added unit test still passed; I'm not sure why that is the case (skippedRecordsSensor should not have been recorded when the skip happened)?

@@ -107,6 +108,10 @@ int addRawRecords(final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {

// drop message if TS is invalid, i.e., negative
if (timestamp < 0) {
log.warn(String.format(
Contributor:

Same here, we can get rid of String.format.


skippedRecordsSensor = metrics.sensor("skipped-records");
Contributor:

We are stripping the prefix for this sensor: is it intentional? Note that for JMX reporter, the sensor name would not be included in any fields.

Contributor Author:

fixed.

@@ -546,8 +546,9 @@ void removeAllSensors() {
removeSensor(punctuateTimeSensor);
removeSensor(taskCreatedSensor);
removeSensor(tasksClosedSensor);
removeSensor(skippedRecordsSensor);

if (skippedRecordsSensor != null) {
Contributor:

skippedRecordsSensor should not be null, right?

Contributor Author:

it was when I made it lazy. I've fixed it.

new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
));
task.addRecords(partition1, Arrays.asList(new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
Contributor:

nit: this was flattened into a very long single line; is that intentional?

Contributor:

Ditto below and in other tests like testPauseResume

Contributor Author:

it was not. I've fixed it.

put(repartition.topic(), source2);
}
},
sourcesByTopic,
Contributor:

nit: alignment.

put(topic2, source2);
}
}
sourcesByTopic
Contributor:

Ditto here.

put(topic2, source3);
}
}
sourcesByTopic
Contributor:

Here.

@@ -243,7 +246,6 @@ public void testMetrics() {
assertNotNull(metrics.getSensor(defaultPrefix + ".punctuate-latency"));
assertNotNull(metrics.getSensor(defaultPrefix + ".task-created"));
assertNotNull(metrics.getSensor(defaultPrefix + ".task-closed"));
assertNotNull(metrics.getSensor(defaultPrefix + ".skipped-records"));
Contributor:

Why do we remove this sensor?

rawRecord.offset(),
deserializationException
);
return null;
Contributor:

Should we record the thread-level skipped record sensor here?

Contributor Author:

heh, what a coincidence! I think so, and that's actually part of the motivation for this change I'm proposing to the metrics.

mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], "asdfasdfasdfasdfa".getBytes()));
mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], "asdfasdfasdfasdfa".getBytes()));
thread.runOnce(-1);
assertEquals(2.0, metrics.metric(skippedTotalMetric).metricValue());
Contributor:

Hmm... I did not see where we record the sensor for deserialization errors here, so why did this test pass?

Contributor Author:

During thread.runOnce(-1), it'll hit an exception trying to deserialize "asdfasdfasdfasdfa" as an integer and increment the metric.

// streamsMetrics.removeSensor(sensor1);
// metrics = streamsMetrics.metrics();
// assertEquals(metrics.size(), 1);
// }
Contributor Author:

I disabled these tests because part of their function is to verify the number of metrics we register. This currently fails because we're registering a lot more metrics. If we decide to go with this overall strategy, I'll rethink these tests.

@guozhangwang (Contributor) left a comment:

Made a pass over this commit only. I think it is a good idea to encapsulate the sensors into the metrics so we do not pass individual sensors around.

One concern I had, though, is that by moving StreamsMetricsThreadImpl to StreamsMetricsImpl, we are effectively wrapping the thread-level metrics into InternalStreamsMetrics, since the exposed APIs like taskCreatedSensor should be thread-level only (which makes me think: should InternalStreamsMetrics just be ThreadMetrics?).

But for other layers the awkward thing is that, for example, TaskMetrics holds a reference to the whole thread-level metrics object, which it does not really need: all it needs is the ability to register more sensors in the underlying metrics registry.

So instead of making InternalStreamsMetrics extend StreamsMetrics, I'm wondering: could we

  1. make InternalStreamsMetrics an independent interface with those thread-level metrics (we can rename it to ThreadMetrics to clarify);

  2. let StreamsMetricsThreadImpl implement ThreadMetrics, while holding a reference to the StreamsMetrics to register the sensors;

  3. similarly for TaskMetrics, CacheMetrics, and ProcessorNodeMetrics, do the same: each exposes its sensors as an API, while the corresponding impl class gets a reference to the StreamsMetrics and calls its addXXSensor etc. functions.

WDYT?


public class StreamsMetricsImpl implements StreamsMetrics {
public class StreamsMetricsImpl implements StreamsMetrics, InternalStreamsMetrics {
Contributor:

Just implements InternalStreamsMetrics should be sufficient, since InternalStreamsMetrics extends StreamsMetrics?

Contributor Author:

Ah, yeah, in an earlier pass they were independent interfaces.

@vvcephei (Contributor Author) commented Apr 6, 2018

@guozhangwang I think you're right. For Streams, the thread level is the most global scope we have atm. I think what you're pointing out is that I've conflated the global scope with the thread scope. Ideally, these would be two separate scopes.

Let me refactor a bit more, and see what you think.

@bbejeck (Contributor) left a comment:

I've left just a couple of minor comments.

Overall I'm in favor of the change since this avoids the need to pass sensors around and returning values for metrics that can be captured where the event occurs.

But I agree with @guozhangwang on naming and structure suggested for the metrics refactoring proposed here.

metrics = streamsMetrics.metrics();
assertEquals(metrics.size(), 1);
}
// @Test
Contributor:

For testLatencyMetrics and testThroughputMetrics maybe use @Ignore instead ? Not a big deal but by getting an ignored test count there's a better chance these two tests won't fall through the cracks.

Contributor Author:

Ah, that's how you do it. I tried @Test(ignore=true) like TestNG, but that obviously doesn't work...

new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
));
task.addRecords(partition2, Arrays.asList(new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
Contributor:

For the task.addRecords with a long list of ConsumerRecord<> seems like the only difference with each record is the offset. Maybe create a method that takes an int[] with offsets and returns a List<ConsumerRecord>?
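A minimal sketch of the suggested helper, using a hypothetical TestRecord stand-in instead of Kafka's ConsumerRecord so the example stays self-contained: every field is fixed except the offset, so the test only spells out what actually varies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the suggested test helper: build a list of
// otherwise-identical records that differ only in their offsets.
// TestRecord stands in for org.apache.kafka.clients.consumer.ConsumerRecord.
public class RecordFactory {
    static final class TestRecord {
        final String topic;
        final int partition;
        final long offset;

        TestRecord(final String topic, final int partition, final long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    static List<TestRecord> recordsWithOffsets(final String topic,
                                               final int partition,
                                               final long... offsets) {
        final List<TestRecord> records = new ArrayList<>();
        for (final long offset : offsets) {
            records.add(new TestRecord(topic, partition, offset));
        }
        return records;
    }
}
```

A call like recordsWithOffsets("topic1", 1, 10, 20, 30) would then replace three near-identical constructor invocations in the test.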

@guozhangwang (Contributor):

I think what you're pointing out is that I've conflated the global scope with the thread scope. Ideally, these would be two separate scopes.

Actually I'm arguing that StreamsMetrics should only be a sort of util class that 1) wraps the actual Metrics object as the metrics registry, and 2) provides util functions like addXXSensor, addSensor, etc. Then ThreadMetrics, TaskMetrics, etc. expose the API of their defined sensors, and their corresponding impl classes keep a reference to the StreamsMetrics to register new metrics. We still need to note that, since we allow users to register their custom metrics via StreamsMetrics, we need to expose StreamsMetrics in user-facing APIs, while the other built-in XXMetrics are only used internally, though we can still expose their xxSensor functions for other internal classes to use.

@mjsax (Member) left a comment:

Overall looks good. Couple of nits and questions.

config.getLong(StreamsConfig.POLL_MS_CONFIG),
config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
final StateConsumer stateConsumer = new StateConsumer(
this.logContext,
Member:

nit: remove this

final StateConsumer stateConsumer = new StateConsumer(
this.logContext,
globalConsumer,
new GlobalStateUpdateTask(topology,
Member:

nit: move topology to next line

private final Sensor tasksClosedSensor;
private final Sensor skippedRecordsSensor;


public StreamsMetricsImpl(Metrics metrics, String groupName, Map<String, String> tags) {
Member:

nit: add final to the parameters to cleanup code "on the side"

this(metrics, groupName, "", tags);
}

public StreamsMetricsImpl(Metrics metrics, String groupName, final String rawPrefix, Map<String, String> tags) {
Member:

nit: add final

removeSensor(punctuateTimeSensor);
removeSensor(taskCreatedSensor);
removeSensor(tasksClosedSensor);
if (skippedRecordsSensor != null) {
Member:

When would skippedRecordsSensor be null? (I know this is just moved code, but I'm still wondering why we need this.)

Contributor Author:

It won't. That's an artifact that I need to fix.

streamsMetrics, stateDirectory, null, time, producer);
}

@SuppressWarnings("SameParameterValue")
Member:

as above.

@@ -199,8 +202,8 @@ public boolean conditionMet() {
assertEquals(thread.state(), StreamThread.State.DEAD);
}

private Cluster createCluster(final int numNodes) {
HashMap<Integer, Node> nodes = new HashMap<>();
private Cluster createCluster(@SuppressWarnings("SameParameterValue") final int numNodes) {
Member:

as above.

Member:

Did this slip? Or did you leave it intentionally?

new Node[0]));
}
});
final ArrayList<PartitionInfo> topic = new ArrayList<>();
Member:

nit: topic -> partitionsForTopic

Member:

Did this slip?

new Node[0]));
}
});
final ArrayList<PartitionInfo> partitions = new ArrayList<>();
Member:

nit: partitions -> partitionsForChangelog


import java.util.LinkedList;

public class LogCaptureAppender extends AppenderSkeleton {
Member:

Very nice!

Contributor Author:

Thanks!
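The same log-capture idea can be sketched with the JDK's own java.util.logging (the PR's LogCaptureAppender extends log4j's AppenderSkeleton instead; this LogCapture class is an illustrative analogue, not the PR's code): attach a handler that stores every LogRecord so a test can assert on the warnings a component emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Illustrative analogue of a log-capturing appender, built on the JDK's
// java.util.logging instead of log4j.
public class LogCapture extends Handler {
    private final List<LogRecord> records = new ArrayList<>();

    @Override
    public synchronized void publish(final LogRecord record) {
        records.add(record);
    }

    @Override
    public void flush() { }

    @Override
    public void close() { }

    // Expose the captured messages for test assertions.
    public synchronized List<String> messages() {
        final List<String> out = new ArrayList<>();
        for (final LogRecord record : records) {
            out.add(record.getMessage());
        }
        return out;
    }

    public static LogCapture attachTo(final String loggerName) {
        final LogCapture capture = new LogCapture();
        Logger.getLogger(loggerName).addHandler(capture);
        return capture;
    }
}
```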

@vvcephei (Contributor Author) commented Apr 6, 2018

@guozhangwang and @bbejeck about that experimental commit, I've decided to ditch it and implement the KIP with minimal changes to the structure of the metrics.

I think I'd like to submit a KIP to alter the metric registration strategy we're employing later on, but I don't want to pollute KIP-274.

@vvcephei (Contributor Author) commented Apr 6, 2018

I don't understand why these tests are failing. The message says:

00:03:46.733 /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.12@2/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:804: error: incompatible types: boolean cannot be converted to StreamsConfig
00:03:46.733         task = createStatelessTask(true);

but line 804 in StreamTaskTest is:

task = createStatelessTask(createConfig());

Retest this, please.

@mjsax (Member) commented Apr 6, 2018

I see

task = createStatelessTask(true);

(line 804) -- this makes sense, it should be task = createStatelessTask(createConfig(true));

@vvcephei (Contributor Author) commented Apr 6, 2018

Ah, it was because Jenkins (surprisingly) merges with trunk before testing.

Also, there was an undetected merge conflict, resulting in the broken code.

I've rebased and corrected it. Once the tests pass for me, I'll push again.

@mjsax (Member) left a comment:

Some more comments.

@@ -138,18 +143,18 @@ public State state() {
* Sets the state
* @param newState New state
*/
private boolean setState(final State newState) {
private void setState(final State newState) {
Member:

why this change?

Contributor Author:

It's a private method with an unused return value. Making it void helps the reader to understand the code without having to trace through usages.

Contributor:

@vvcephei @bbejeck @mjsax I'd suggest that in the future we only piggy-back different changes into the same PR if we think they are either correlated or really trivial. Having a single PR mingled with multiple changes has several drawbacks:

  1. It makes git history harder to trace: think how much trickier "git blame" becomes to reason about.
  2. It tends to generate bigger PRs than necessary, making reviewers less willing to start working on them :P
  3. If multiple rounds of reviews are needed, even requiring major code refactoring, regressions can sneak in during those iterations as by-products of the multiple changes.

} else {
if (productionExceptionIsFatal(exception)) {
recordSendError(key, value, timestamp, topic, exception);
} else if (productionExceptionHandler.handle(serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) {
recordSendError(key, value, timestamp, topic, exception);
} else {
log.debug(HANDLER_CONTINUED_MESSAGE, key, value, timestamp, topic, exception);
log.warn(
"Error sending records (key=[{}] value=[{}] timestamp=[{}]) to topic=[{}] and partition=[{}]; " +
Member:

Nit: do we need the [] around each value? [] is used for collections or lists; it might be confusing to add them.

Contributor Author:

Good question. I have developed the habit of delimiting variables in log messages, as it disambiguates the structure of the message for the reader. Without delimiters, there are several edge cases that would make the log message difficult to read.

For example, if the key were "value" and the value were "" with the old format, you get:

Error sending records (key value value  timestamp 1234)

Whereas, if the key were "" and the value were "value", you get

Error sending records (key  value value timestamp 1234)

The only difference between these strings is where the extra space is. With delimiters, you have:

Error sending records (key=[value] value=[] timestamp=[1234])
Error sending records (key=[] value=[value] timestamp=[1234])

It's the kind of thing that saves people from #1 making a bad assumption about the nature of the problem and burning hours before they realize their mistake, or #2 being unable to clearly understand the error message and having to load it in a debugger just to understand what the values of the arguments actually are.

It sounds like your concern is about the ambiguity of [] as delimiters, since they already indicate a list. Can we keep delimiters but pick a different character? Other paired delimiters are <> and {}, and "" and '' also come to mind. WDYT?

Member:

I personally think that having = (without []) is good enough, as the = makes it clear:

Error sending records (key=value value= timestamp=1234)

Thus, it's not ambiguous to me (I agree that having no delimiter at all would be bad).

It's just that I like uniform formatting, and this would introduce a new style -- I am fine with changing to this style, but we should agree on one style and rewrite code (on the side) if it does not fit the 'style guide'.

\cc @bbejeck @dguy @guozhangwang

Contributor:

I tend to prefer key=[value], but I do not have a scientific reason for that: I just feel it is more "vivid" :P

Contributor Author:

@mjsax I can dig the desire to have uniform style on log messages. I'll also point out that the logs are part of the public API, so we can't just go terraform them willy-nilly, but instead we'd have to change them only in scope of the relevant KIPs, which makes it difficult to change, or even establish, a log style.

Nevertheless, if we don't already have a clear style for streams logs, I'll advocate for some kind of enclosing delimiter on substitutions. I continue to agree that square brackets are confusing w.r.t. common List#toString() formats, so I think we should agree on a different enclosing delimiter.

I agree that = is better than nothing, but it's still ambiguous when the substitution is 0 or more whitespace characters, while [] vs [ ] gives you more of a clue. No choice here is going to be perfect, but my experience is that this format saves enough debugging time to be worth the visual noise.

Member:

if we don't already have a clear style for streams logs

We never discussed this explicitly; it's just a matter of fact that we have used key=value so far, from what I can remember. The question is how much we gain if we start rewriting to a different format, and how much work it is.

With regard to ambiguity: you can always construct an (academic?) example for which any formatting strategy "breaks" and becomes ambiguous... If we agree on key=[value] I am fine with it. Still not sure if we gain much (but if you think we do, it's fine with me to change).

Contributor:

I also prefer key=[value], but can't say it's for any specific reason other than personal preference.
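The ambiguity argument can be made concrete with a small sketch (DelimiterDemo and its message template are hypothetical, chosen only to illustrate the point): with bare key=value formatting, two different key/value pairs can render to the identical log message, while enclosing delimiters keep them distinct.

```java
// Hypothetical demonstration of the delimiter discussion: a value that
// itself contains "value=" makes the undelimited format ambiguous.
public class DelimiterDemo {
    static String bare(final String key, final String value) {
        return "Error sending record (key=" + key + " value=" + value + ")";
    }

    static String delimited(final String key, final String value) {
        return "Error sending record (key=[" + key + "] value=[" + value + "])";
    }
}
```

For example, key "x value=y" with value "z" and key "x" with value "y value=z" produce the same bare message, but different delimited messages.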

@@ -77,10 +81,17 @@
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
deserializationException);
} else {
sourceNode.nodeMetrics.sourceNodeSkippedDueToDeserializationError.record();
log.warn(
"Skipping record due to deserialization error. topic=[{}] partition=[{}] offset=[{}]",
Member:

as above.

@@ -107,6 +112,11 @@ int addRawRecords(final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {

// drop message if TS is invalid, i.e., negative
if (timestamp < 0) {
log.warn(
"Skipping record due to negative extracted timestamp. topic=[{}] partition=[{}] offset=[{}] extractedTimestamp=[{}] extractor=[{}]",
Member:

as above.

* @param cache the {@link ThreadCache} created by the thread
* @param time the system {@link Time} of the thread
* @param producer the instance of {@link Producer} used to produce records
* @param id the ID of this task
Member:

nit: remove space

@@ -695,14 +712,14 @@ public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() {
task.initializeTopology();
task.commit();
final File checkpointFile = new File(stateDirectory.directoryForTask(taskId00),
ProcessorStateManager.CHECKPOINT_FILE_NAME);
Member:

as above.

@@ -199,8 +202,8 @@ public boolean conditionMet() {
assertEquals(thread.state(), StreamThread.State.DEAD);
}

private Cluster createCluster(final int numNodes) {
HashMap<Integer, Node> nodes = new HashMap<>();
private Cluster createCluster(@SuppressWarnings("SameParameterValue") final int numNodes) {
Member:

Did this slip? Or did you leave it intentionally?

new Node[0]));
}
});
final ArrayList<PartitionInfo> topic = new ArrayList<>();
Member:

Did this slip?

Collections.singletonMap("client-id", "topology-test-driver")
);

skippedRecordsSensor.add(new Meter(new Sum(), rateMetricName, totalMetricName));
Member:

Adding this implies that we have to maintain the same code twice. Should we extract it into some internal method that we can call here to avoid the duplication?

What about other thread-level metrics?

Contributor Author:

I've been mulling over the same thing, and that was part of what I was trying to achieve with my experiment before. I think I have a better solution now, so maybe you can take another look after my next update and see what you think.

Member

Will do, after you pushed an update :)

@@ -96,6 +96,7 @@ public Punctuator getPunctuator() {
return punctuator;
}

@SuppressWarnings("WeakerAccess")
Member

For my own education: what is this? (btw: can we remove this below?)

Contributor Author

we can remove this below.

The WeakerAccess inspection tells you that it's possible to restrict the access scope of cancel(). I think this particular case was warning me that cancel() could be package-private instead of public. But the static analyzer can only look at the code in the project. We know that we do want the method to be public, so I added a suppression for this inspection. An alternative would be to write black-box tests in a different package (just like real user tests would be), and the static analyzer wouldn't warn us anymore, since it would have an example of a usage requiring public access.

@bbejeck (Contributor) left a comment

Overall looks good, just a couple of additional minor comments.

@@ -144,24 +143,24 @@ public Sensor addLatencyAndThroughputSensor(String scopeName, String entityName,
* @throws IllegalArgumentException if tags is not constructed in key-value pairs
*/
@Override
public Sensor addThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String... tags) {
public Sensor addThroughputSensor(final String scopeName, final String entityName, final String operationName, final Sensor.RecordingLevel recordingLevel, final String... tags) {
Contributor

nit: I realize this was pre-existing in a single line, but since there are several parameters, maybe put each param on its own line.

mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], "I am not an integer.".getBytes()));
thread.runOnce(-1);
assertEquals(2.0, metrics.metric(skippedTotalMetric).metricValue());
assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
Contributor

super nit: what about assertTrue((Double)metrics.metric(skippedRateMetric).metricValue() > 0.0); however, I don't have a strong opinion on this one.

Contributor Author

That also works, but assertNotEquals is a little nicer in that it'll print the actual value on failure, whereas assertTrue only tells you that it was false on failure. I suppose I could add a utility method assertGreater that prints the values on failure, but in this case, I'm really just making sure that the metric got moved. I don't care that much to assert what it got moved to, or I would override the time implementation and assert the exact expected value.
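A minimal sketch of such a hypothetical assertGreater helper (not part of the PR; the name and message format are assumptions) that reports both values on failure, the way assertEquals does:

```java
public class AssertGreaterDemo {
    // Hypothetical test helper: like assertTrue(actual > bound), but the
    // failure message includes both values instead of just "false".
    static void assertGreater(final double actual, final double bound) {
        if (!(actual > bound)) {
            throw new AssertionError("expected a value greater than " + bound + " but got " + actual);
        }
    }

    public static void main(String[] args) {
        assertGreater(1.5, 0.0); // passes silently
        try {
            assertGreater(0.0, 0.0);
        } catch (final AssertionError e) {
            System.out.println(e.getMessage()); // failure message carries both values
        }
    }
}
```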

@guozhangwang (Contributor) left a comment

Thanks for the PR. Overall it looks reasonable to me. I have only minor detailed comments, plus the meta comment about follow-up refactoring and the docs changes: 1) we should update upgrade-guide to warn users their monitoring needs to be updated, 2) in monitoring we'd better update the skipped records to list possible reasons it will be recorded.

@@ -195,6 +199,20 @@ public GlobalStreamThread(final ProcessorTopology topology,
this.cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
this.stateRestoreListener = stateRestoreListener;

skippedRecordsSensor = metrics.sensor("thread." + threadClientId + ".skipped-records");
Contributor

I do not feel very comfortable defining the metrics name in scattered places, because whenever we update the name, we have to remember to update all the places (for this sensor, the other place we declared it is

skippedRecordsSensor = metrics.sensor(prefix + ".skipped-records", Sensor.RecordingLevel.INFO);
skippedRecordsSensor.add(createMeter(metrics, new Sum(), "skipped-records", "skipped records"));

So whichever line gets called first will create the sensor, leaving the other as a no-op.

), and that's why I liked @vvcephei's proposal for wrapping the sensor names in the leveled metrics, and passing those metrics across different modules rather than re-declaring the sensors in different places.

This makes me feel more urgent to do the refactoring of the metrics hierarchy.

Contributor Author

I'm about to push a commit to put the skipped-records sensor in particular in a common place, since it winds up getting accessed from so many different places in the code. I'm hoping that will be good enough for now, and we can seek an elegant enclosing-scope metrics implementation in the future.

} else {
if (productionExceptionIsFatal(exception)) {
recordSendError(key, value, timestamp, topic, exception);
} else if (productionExceptionHandler.handle(serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) {
recordSendError(key, value, timestamp, topic, exception);
} else {
log.debug(HANDLER_CONTINUED_MESSAGE, key, value, timestamp, topic, exception);
log.warn(
"Error sending records (key=[{}] value=[{}] timestamp=[{}]) to topic=[{}] and partition=[{}]; " +
Contributor

I tend to prefer key=[value], but I do not have a scientific reason for that: I just feel it is more "vivid" :P

@@ -111,12 +112,14 @@ public StreamTask(final TaskId id,
final StateDirectory stateDirectory,
final ThreadCache cache,
final Time time,
final Producer<byte[], byte[]> producer) {
final Producer<byte[], byte[]> producer,
final Sensor skippedRecordsSensor) {
Contributor

Again, if we could pass around the threadMetrics here, it would make the code more readable: we can make it very clear at which places we record some task metrics like taskMetrics.sensorA.record() and where we record thread-level metrics like threadMetrics.skippedRecordsSensor.record().

But I think it is better to be left as a follow-up PR as this one is already pretty big.

Contributor Author

Yeah, I keep getting wrapped around the axle thinking about stuff like this. Hopefully, I'll be able to deliver a reasonable implementation for this PR, and I'll continue to mull about a way to pass the right metric context around the code base.

updatedTagList.add(scopeName + "-id");
updatedTagList.add(entityName);
return tagMap(updatedTagList.toArray(new String[updatedTagList.size()]));
final String[] updatedTags = Arrays.copyOf(tags, tags.length + 2);
Contributor

Nice improvement

@guozhangwang (Contributor) left a comment

Another meta comment: in KStreamAggregate and KStreamReduce, when either the key or value is null, we also skip the record; should we record this as well, since in many places it is not an expected event?

vvcephei (Contributor Author) commented Apr 9, 2018

@guozhangwang I'm currently working on adding metrics there. I'm also adding warning logs, as it's totally silent right now.

vvcephei (Contributor Author) commented Apr 9, 2018

@guozhangwang and regarding this:

Thanks for the PR. Overall it looks reasonable to me. I have only minor detailed comments, plus the meta comment about follow-up refactoring and the docs changes: 1) we should update upgrade-guide to warn users their monitoring needs to be updated, 2) in monitoring we'd better update the skipped records to list possible reasons it will be recorded.

Ack. I might do that in a follow-up PR (under the same Jira/KIP) to keep the LOC in this PR lower.

vvcephei (Contributor Author) commented Apr 9, 2018

@mjsax @bbejeck @guozhangwang

I've rebased and pushed the latest changes. I still need to add tests for the processors' metrics, but this change is otherwise pretty much where I want this PR to be.

Note that I rebased and put the change to Sensor at the beginning so that I can send that one change as a separate PR, if needed, for review by the client library people. This change is necessary, since the existing maybeAddMetric implementation in Streams is unsafe. To be done properly, it has to be synchronized, which my change does. My change also allows us to differentiate between registering a metric twice for a particular sensor (ok) and registering the same metric name on two different sensors (not ok).

Also, reminder that we should agree on a log style. I left [] in the PR as a placeholder. The discussion for this is above.

vvcephei (Contributor Author) commented:

Ok, @bbejeck @mjsax @guozhangwang ,

This PR is ready for another pass.

I have completed all code and tests. Docs will follow in another PR.

Please comment on:

  • my strategy for sharing the skipped metric around the code base
  • my changes to Sensor (and whether I need to send a separate PR for that change)
  • the aforementioned log enclosing delimiter discussion

Thanks, all.

guozhangwang (Contributor) commented:

retest this please

@guozhangwang (Contributor) left a comment

I left some comments on the high-level approach. Will dig into details after we have agreed on the proposal itself.

KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), config == null ? this.config : config, time);
this.registry.registerMetric(metric);
this.metrics.add(metric);
final KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), config == null ? this.config : config, time);
Contributor

What's the purpose of keeping track of the metric names? If it is for preventing double-registering, I think relying on maintaining the metric names inside the sensor would not always work, since multiple sensors would be added into the metrics registry, and we still cannot prevent different sensors from trying to register the same metrics.

Contributor Author

I have pulled this change into a separate PR: #4853

The intent is to make it a no-op if you add the same metric to the same sensor twice, as opposed to the current behavior, in which the registry.registerMetric(metric) throws an exception if the metric is already registered.

With this change, you'll still get an exception if the metric is already registered in another sensor, but if it's already in the same sensor, you just get a no-op success.
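The described semantics can be modeled with a tiny registry (a simplified stand-in, not the real Sensor/Metrics change from #4853; class and method names here are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class IdempotentSensorAddDemo {
    // Model of the intended add() semantics: re-adding a metric already on
    // *this* sensor is a no-op success, while a metric name already owned by
    // a *different* sensor still throws.
    static final class Registry {
        final Map<String, String> owners = new HashMap<>(); // metric name -> owning sensor

        synchronized boolean add(final String sensorName, final String metricName) {
            final String owner = owners.get(metricName);
            if (owner == null) {
                owners.put(metricName, sensorName);
                return true;               // newly registered
            }
            if (owner.equals(sensorName)) {
                return false;              // same sensor: no-op success
            }
            throw new IllegalArgumentException(
                "metric '" + metricName + "' already owned by sensor '" + owner + "'");
        }
    }

    public static void main(String[] args) {
        final Registry registry = new Registry();
        System.out.println(registry.add("s1", "skipped-records-total")); // true: first registration
        System.out.println(registry.add("s1", "skipped-records-total")); // false: idempotent no-op
        try {
            registry.add("s2", "skipped-records-total");
        } catch (final IllegalArgumentException e) {
            System.out.println("rejected"); // different sensor: still an error
        }
    }
}
```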

import java.util.Map;

public class StreamsMetricsConventions {
private StreamsMetricsConventions() {
Contributor

We do not need the no-arg constructor, since it is defined by default.

Contributor Author

This privatizes the constructor, guaranteeing that the class cannot be instantiated. It's a way of enforcing that the class be used only for its static members.

Contributor Author

To do this properly, though, the class should also be final. I'll make that change.
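The pattern being described can be sketched in a few lines (the class and method shown follow the PR's naming, but this is an illustrative sketch, not the actual source):

```java
public class StreamsMetricsConventionsDemo {
    // A final class with a private constructor can never be instantiated or
    // subclassed, so it is usable only through its static members.
    static final class StreamsMetricsConventions {
        private StreamsMetricsConventions() {
            // present only to suppress the default public constructor
        }

        static String threadLevelSensorName(final String threadName, final String sensorName) {
            return "thread." + threadName + "." + sensorName; // naming scheme from the PR
        }
    }

    public static void main(String[] args) {
        System.out.println(StreamsMetricsConventions.threadLevelSensorName("t1", "skipped-records"));
        // prints thread.t1.skipped-records
    }
}
```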

return "thread." + threadName + "." + sensorName;
}

public static Map<String, String> threadLevelTags(final String threadName, final Map<String, String> tags) {
Contributor

No callers seem to provide any non-empty tags?

Contributor Author

It's laying the groundwork for a future change in which callers can compose thread-level tags, task-level tags, etc.
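A sketch of what such a composition helper might look like (the implementation body and the exact tag key are assumptions for illustration, not taken from the PR):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class ThreadLevelTagsDemo {
    // Hypothetical shape: copy the caller's tags and stamp in the thread-level
    // tag, so task-level callers could later layer their own tags on top.
    static Map<String, String> threadLevelTags(final String threadName, final Map<String, String> tags) {
        final Map<String, String> result = new LinkedHashMap<>(tags);
        result.put("client-id", threadName); // tag key assumed for this sketch
        return result;
    }

    public static void main(String[] args) {
        System.out.println(threadLevelTags("t1", new HashMap<>()));
    }
}
```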

import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsConventions.threadLevelTags;

public class CommonStreamsMetrics {
private CommonStreamsMetrics() {
Contributor

Ditto here.

Contributor Author

same rationale.

}

public static Sensor skippedRecordsMeter(final Metrics metrics, final String threadName) {
final String sensorName = threadLevelSensorName(threadName, "skipped-records");
Contributor

If we always create the skipped-records sensor upon creating the thread, then we should always get the sensor, right? In that case, should we simply throw if getSensor returns null?

Contributor Author

This method is used to idempotently create or retrieve the sensor. In other words, the mechanism by which we create the sensor when we create the thread is that it calls this method, and the sensor is null, so it creates it.

You're correct in that if we do that, then all other usages will just return the existing sensor.

I'm not sure I see the value in separating creation from retrieval so that we can throw an exception if you retrieve it without creating it first.

Contributor

Thanks for the explanation, that makes sense.
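The create-or-retrieve idiom under discussion can be sketched with simplified stand-ins for Metrics and Sensor (the real skippedRecordsMeter works against org.apache.kafka.common.metrics; the shapes here are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class GetOrCreateSensorDemo {
    // Minimal stand-ins, just to show the idiom.
    static final class Sensor { final String name; Sensor(final String name) { this.name = name; } }
    static final class Metrics {
        final Map<String, Sensor> sensors = new HashMap<>();
        Sensor getSensor(final String name) { return sensors.get(name); }          // null if absent
        Sensor sensor(final String name) { return sensors.computeIfAbsent(name, Sensor::new); }
    }

    // Idempotent create-or-retrieve: the first caller creates the sensor (and
    // would attach its metrics here); later callers get it back unchanged.
    static Sensor skippedRecordsSensor(final Metrics metrics, final String threadName) {
        final String name = "thread." + threadName + ".skipped-records";
        Sensor sensor = metrics.getSensor(name);
        if (sensor == null) {
            sensor = metrics.sensor(name);
            // ... add the "skipped-records-total" / "-rate" metrics exactly once ...
        }
        return sensor;
    }

    public static void main(String[] args) {
        final Metrics metrics = new Metrics();
        System.out.println(skippedRecordsSensor(metrics, "t1") == skippedRecordsSensor(metrics, "t1"));
        // true: the second call retrieves rather than re-creates
    }
}
```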

import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsConventions.threadLevelSensorName;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsConventions.threadLevelTags;

public class CommonStreamsMetrics {
Contributor

This is a meta comment:

I think we have seen two approaches here:

  1. Pass along the metrics objects across different modules (in some classes, we will pass multiple metrics objects for different levels, like threadMetrics and taskMetrics) in order to record their sensors.

  2. In the current PR: only pass alone the metrics registry (i.e. the Metrics object) along different modules, but standardize the sensor name construction, and get the sensor by its raw name directly whenever necessary to record the sensor.

I am slightly in favor of the second one, since we could pass along fewer parameters, i.e. only a single Metrics object, which can be accessed 1) from ProcessorContext, 2) in multiple internal classes.

Contributor

This is a detailed comment: what's the rationale of naming it CommonStreamsMetrics? Is it for thread-level metrics only? I.e., should we just move this static function into ThreadMetrics?

Contributor Author

Yeah, I'm still undecided on whether approach 1 or 2 is better. But I did decide that I don't want to make a call on it in this PR. If you'd like me to resolve this sooner rather than later, I can follow up immediately with another PR to reorganize the metrics.

The reason I pulled skipped-records out into a CommonStreamsMetrics class is that that metric is common across all components in Streams. It's accessed by both our framework-level code and also by the user-space DSL-provided processors. Thus, it needs to live in a spot where all those components have visibility on it.

There's a distinction between metrics that are aggregated at the thread level and metrics that belong to StreamThread. It'll be difficult to really do a good job in this PR with that distinction, though, without refactoring the whole metrics hierarchy. Since we're already over 2,000 LOC, I'd really like to move such a refactoring to another PR.

What I have done in this PR is as minimal as I can manage to expose the skipped-records sensor to our Processors.

@bbejeck (Contributor) left a comment

overall LGTM, just left some additional comments.

} else {
if (productionExceptionIsFatal(exception)) {
recordSendError(key, value, timestamp, topic, exception);
} else if (productionExceptionHandler.handle(serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) {
recordSendError(key, value, timestamp, topic, exception);
} else {
log.debug(HANDLER_CONTINUED_MESSAGE, key, value, timestamp, topic, exception);
log.warn(
"Error sending records (key=[{}] value=[{}] timestamp=[{}]) to topic=[{}] and partition=[{}]; " +
Contributor

I also prefer key=[value], but can't say it's for any specific reason other than personal preference.

assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[A] value=[null] topic=[left] partition=[-1] offset=[-1]"));

assertEquals(1.0, MetricTestUtil.getMetricByName(driver.context().metrics().metrics(), "skipped-records-total").metricValue());
System.out.println("");
Contributor

is this line intentional?

Contributor Author

oops. I put it there when I needed a place for a breakpoint. Sorry!

LogCaptureAppender.unregister(appender);

assertEquals(1.0, MetricTestUtil.getMetricByName(driver.context().metrics().metrics(), "skipped-records-total").metricValue());
assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[1] value=[null] topic=[streamTopic] partition=[-1] offset=[-1]"));
Contributor

The hasItem matcher is new to me, nice one!


import java.util.Map;

public class MetricTestUtil {
Contributor

Do we need a separate class for this? We could add the getMetricByName method to StreamsTestUtils instead. Additionally, it doesn't access anything package-private, so it should be fine to make the method public.

Contributor Author

I can move it to StreamTestUtils if you like.

I made getMetricByName package-private to prevent other code from calling it, since it's intended for these tests only. Making it public would open this method up to be called beyond the intended scope. I'd rather defer that until we have a use case we think calls for broadening the scope.

import java.util.LinkedList;
import java.util.List;

public class LogCaptureAppender extends AppenderSkeleton {
Contributor

meant to say this before, nice addition!

Contributor Author

Thanks! It's predicated on us using log4j as the implementation for slf4j in the unit tests, so if that changes, we'll have to put in a different log appender. But that seems unlikely, and it's handy to have this in the mean time.
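For illustration, the same capture-and-assert pattern can be sketched against JDK logging, which is always available; the real LogCaptureAppender extends log4j's AppenderSkeleton instead, and the class and method names here are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

public class LogCaptureDemo {
    // Buffers raw log messages so a test can assert on what was logged,
    // analogous to LogCaptureAppender.getMessages().
    static final class CapturingHandler extends Handler {
        final List<String> messages = new ArrayList<>();
        @Override public void publish(final LogRecord record) { messages.add(record.getMessage()); }
        @Override public void flush() { }
        @Override public void close() { }
    }

    static List<String> captureWarning(final String message) {
        final Logger logger = Logger.getLogger("demo");
        logger.setUseParentHandlers(false);   // keep the console quiet
        final CapturingHandler handler = new CapturingHandler();
        logger.addHandler(handler);           // register, like LogCaptureAppender.createAndRegister()
        logger.log(Level.WARNING, message);   // the code under test logs a skip warning
        logger.removeHandler(handler);        // unregister, like LogCaptureAppender.unregister()
        return handler.messages;
    }

    public static void main(String[] args) {
        System.out.println(captureWarning("Skipping record due to null key or value."));
    }
}
```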

vvcephei (Contributor Author) commented:

Huh, that's a new one. It looks like (aside from the KafkaAdminTest continuing to flake out), the tests failed because the Jenkins worker ran out of disk! I'll wait until the last job completes before starting them again.

I've rebased this PR on trunk now that #4853 is merged. I still have a few nits to clean up. I'll notify again when I'm ready for final reviews.

mjsax (Member) commented Apr 12, 2018

Meta comment: please update the PR title with the JIRA number

@@ -26,6 +27,9 @@
*/
public interface InternalProcessorContext extends ProcessorContext {

@Override
StreamsMetricsImpl metrics();
Contributor Author

Override to refine the type from StreamMetrics to StreamMetricsImpl in support of internal usages.

* @param cache the {@link ThreadCache} created by the thread
* @param time the system {@link Time} of the thread
* @param producer the instance of {@link Producer} used to produce records
*/
Contributor Author

This is an internal class, and this javadoc doesn't say anything that the method signature doesn't say. I added a new constructor and changed the existing one, so I just removed the doc rather than updating it.

@@ -692,10 +697,4 @@ boolean commitNeeded() {
RecordCollector recordCollector() {
return recordCollector;
}

// visible for testing only
RecordCollector createRecordCollector(final LogContext logContext,
Contributor Author

This existed only so a test could override it. Instead, I added a constructor arg for the test to pass and removed this method.

Contributor

Sounds good!

commitTimeSensor.add(metrics.metricName("commit-latency-avg", groupName, "The average commit time in ms", tags()), new Avg());
commitTimeSensor.add(metrics.metricName("commit-latency-max", groupName, "The maximum commit time in ms", tags()), new Max());
commitTimeSensor.add(metrics.metricName("commit-rate", groupName, "The average per-second number of commit calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
commitTimeSensor.add(metrics.metricName("commit-total", groupName, "The total number of commit calls", tags()), new Count());
Contributor Author

I refactored these to flatten the metric definition, since it was super hard to figure out what metrics were actually being created.

Maybe you can forgive me for this, because I actually found a bug: the description of the total metric says that it counts the number of calls, but it previously summed the recorded values (via createMeter -> new Meter -> new Total).

Contributor

Good call!


public final Sensor skippedRecordsSensor() {
return skippedRecordsSensor;
}
Contributor Author

defining and providing skippedRecordsSensor here now, since it's now needed in contexts where the implementation is not a StreamThreadMetricsImpl.

"The total number of occurrence of " + opName + " operations.",
tags
),
new Count()
Contributor Author

Similar to the metrics in StreamThread, by getting rid of the Meter and flattening these metrics, I realized that the total computation was incorrectly a Total rather than a Count.

Contributor

Not sure I understand this comment: it was indeed defined as a Count() before as well right?

Contributor Author

No, it was previously:

sensor.add(new Meter(new Count(), rateMetricName, totalMetricName));

But the Count there is only used for updating the rate. Here's the Metric constructor:

public Meter(SampledStat rateStat, MetricName rateMetricName, MetricName totalMetricName) {
    this(TimeUnit.SECONDS, rateStat, rateMetricName, totalMetricName);
}

public Meter(TimeUnit unit, SampledStat rateStat, MetricName rateMetricName, MetricName totalMetricName) {
    this.total = new Total();
    this.rate = new Rate(unit, rateStat);
    this.rateMetricName = rateMetricName;
    this.totalMetricName = totalMetricName;
}

You can see that it's using Total for the "total" metric, which I guess makes sense given the parameter name. But according to the description of our "total" metric, what we really wanted to do was keep a count of occurrences, which should be a Count stat.
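The distinction is easy to see by modeling the two stats in plain Java (these are simplified stand-ins for org.apache.kafka.common.metrics.stats.Total and Count, just to show the record semantics, not the real implementations):

```java
public class CountVsTotalDemo {
    // Minimal models of the two stats' record() semantics.
    static final class Total { double v; void record(final double x) { v += x; } }
    static final class Count { double v; void record(final double x) { v += 1; } }

    public static void main(String[] args) {
        final Total total = new Total();
        final Count count = new Count();
        for (final double latencyMs : new double[] {5.0, 5.0, 5.0}) {
            total.record(latencyMs); // sums recorded values
            count.record(latencyMs); // counts invocations
        }
        // A "-total" metric described as "the total number of ... calls"
        // should therefore be a Count, not a Total. They only coincide when
        // every call is record(1) -- i.e. the no-arg record().
        System.out.println(count.v + " " + total.v); // 3.0 15.0
    }
}
```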

Contributor

I see. That makes sense.

I think it was not introducing any issue only because we only call sensor.record(), not sensor.record(n), so Count and Total are actually the same: record() is the same as record(1). But I agree that it should really be Count, to avoid any potential bugs.

hitRatioSensor.add(this.metrics.registry().metricName(opName + "-min", groupName,
"The minimum cache hit ratio.", metricTags), new Min());
hitRatioSensor.add(this.metrics.registry().metricName(opName + "-max", groupName,
"The maximum cache hit ratio.", metricTags), new Max());
Contributor Author

These metrics never get removed. Is that ok?

Contributor

Good point, let's add a TODO marker and remove them in a follow-up PR: so we do not drag too long on this one.

Contributor Author

Ok, when I took another look, I found that hitRatioSensor does get removed, but its parent doesn't. Also, neither sensor has a scoped name, so every record() will actually update the hit-ratio metrics for all caches.

That seems like a bigger deal, so I've already prepared a follow-up PR. I'll send it next once this one is merged.

@@ -125,6 +119,7 @@ public void testMetrics() {
final Map<String, String> metricTags = new LinkedHashMap<>();
metricTags.put("processor-node-id", node.name());
metricTags.put("task-id", context.taskId().toString());
metricTags.put("client-id", "mock");
Contributor Author

This is required per the docs, but we previously only added it in production code paths. Now we add it in all code paths.

@@ -246,7 +247,7 @@ private KafkaMetric getMetric(final String nameFormat, final String descriptionF
String.format(nameFormat, "commit"),
"stream-task-metrics",
String.format(descriptionFormat, "commit"),
mkMap(mkEntry("task-id", taskId))
mkMap(mkEntry("task-id", taskId), mkEntry("client-id", "test"))
Contributor Author

This is required per the docs, but we previously only added it in production code paths. Now we add it in all code paths.

@@ -70,44 +60,40 @@ public void testRemoveSensor() {

@Test
public void testLatencyMetrics() {
final String groupName = "doesNotMatter";
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "");
final int defaultMetrics = streamsMetrics.metrics().size();
Contributor Author

The skipped records metrics are now always present. Rather than updating the hard-coded value, I did this to make the test less brittle.

};
public void setUp() {
final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
Contributor Author

I replaced the override-with-capture strategy in this test with just a regular StreamsMetrics and verifying the invocation by checking that the total metric is == 1.

"topology-test-driver-stream-metrics",
Collections.<String, String>emptyMap());
metrics = new Metrics();
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "topology-test-driver-virtual-thread");
Contributor Author

"virtual" just in case people go looking for the actual thread in the thread dump. I also thought about using Thread.currentThread(), but it wouldn't necessarily be the same thread when the tests run.

@@ -194,7 +193,7 @@ public MockProcessorContext(final Properties config, final TaskId taskId, final
this.taskId = taskId;
this.config = streamsConfig;
this.stateDir = stateDir;
this.metrics = new StreamsMetricsImpl(new Metrics(), "mock-processor-context", Collections.<String, String>emptyMap());
this.metrics = new StreamsMetricsImpl(new Metrics(), "mock-processor-context-virtual-thread");
vvcephei (PR author) commented:

same thinking regarding "virtual"

vvcephei (PR author) commented:

ok, @guozhangwang @mjsax @bbejeck ,

I believe this is ready for final review. I've made a pass over it to make sure the diff is clean and to comment on the rationale of some of the choices.

The diff is still quite long, but it's mostly because of all the processors that now record skipped metrics and the corresponding tests.

guozhangwang (Contributor) left a comment:

LGTM except a comment regarding:

Similar to the metrics in StreamThread: by getting rid of the Meter and flattening these metrics, I noticed that the total was incorrectly computed as a Total rather than a Count.
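The distinction matters because a Total sums the recorded values, while a Count counts the invocations. A tiny sketch of the difference in plain Java (no Kafka types; the latency values are made up):

```java
public class TotalVsCountSketch {
    public static void main(final String[] args) {
        // Each commit records its latency, e.g. 5 ms, 7 ms, 3 ms.
        final double[] recordedLatenciesMs = {5.0, 7.0, 3.0};

        // Total: sums the recorded VALUES -> 15.0 (wrong for "commit-total",
        // which should count commit calls, not add up their latencies).
        double total = 0;
        for (final double v : recordedLatenciesMs) {
            total += v;
        }

        // Count: counts the INVOCATIONS -> 3 (the number of commit calls).
        final long count = recordedLatenciesMs.length;

        System.out.println("total=" + total + " count=" + count);
    }
}
```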

@@ -216,7 +230,7 @@ public void maybeAddMetric(final Sensor sensor, final MetricName name, final Mea
* @param action Action to run.
* @param sensor Sensor to record value.
*/
void measureLatencyNs(final Time time, final Runnable action, final Sensor sensor) {
public void measureLatencyNs(final Time time, final Runnable action, final Sensor sensor) {
Contributor commented:

Why does this need to be public now?

vvcephei (PR author) replied:

Because I moved StreamsMetricsImpl to the ...internal.metrics package, but I took a look, and it was only used (properly) by ProcessorNode.

StreamTask also used it, but only by specifically wrapping the operation in a Runnable and passing it to the metric to immediately be run.

I've added 18358f3 to simplify StreamTask's usage to not need this method, and move the method to ProcessorNode.
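For reference, the measure-latency-around-an-action pattern looks roughly like this. This is a sketch only: the real method takes Kafka's Time and Sensor types, and its exact behavior may differ from this stand-in.

```java
public class MeasureLatencySketch {
    // Stand-in for a latency sensor; the real code records into a Sensor.
    static long lastRecordedNs = -1;

    // Minimal sketch: time the action with a monotonic clock and record
    // the elapsed nanoseconds.
    static void measureLatencyNs(final Runnable action) {
        final long startNs = System.nanoTime();
        action.run();
        lastRecordedNs = System.nanoTime() - startNs;
    }

    public static void main(final String[] args) {
        measureLatencyNs(() -> {
            long sum = 0;
            for (int i = 0; i < 1_000; i++) {
                sum += i;
            }
            if (sum < 0) {
                throw new IllegalStateException("unreachable");
            }
        });
        System.out.println("recorded=" + (lastRecordedNs >= 0));
    }
}
```

Inlining the timing at the one real call site (ProcessorNode), as this PR does, avoids wrapping operations in throwaway Runnables just to reuse the helper.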

hitRatioSensor.add(this.metrics.registry().metricName(opName + "-min", groupName,
"The minimum cache hit ratio.", metricTags), new Min());
hitRatioSensor.add(this.metrics.registry().metricName(opName + "-max", groupName,
"The maximum cache hit ratio.", metricTags), new Max());
Contributor commented:

Good point, let's add a TODO marker and remove them in a follow-up PR: so we do not drag too long on this one.

@@ -36,7 +36,7 @@
private final TaskId taskId;
private final String applicationId;
private final StreamsConfig config;
private final StreamsMetrics metrics;
private final StreamsMetricsImpl metrics;
Contributor commented:

Sounds good!

this.producer = producer;
this.offsets = new HashMap<>();
this.logPrefix = String.format("task [%s] ", streamTaskId);
this.log = logContext.logger(getClass());
this.productionExceptionHandler = productionExceptionHandler;
this.skippedRecordsSensor = skippedRecordsSensor;
Contributor commented:

I guess my previous comment was a bit misleading :P Actually I'm not against the CommonStreamsMetrics and ThreadMetricsConventions classes, but I think we could have one such class for each different layer rather than having a common class, for the reason I mentioned before. But since you have removed it, I'm also fine with passing the sensors along.

We can consider which one is better in the near future and if we can do another code refactoring, but let's not block on this PR for too long.

@@ -692,10 +697,4 @@ boolean commitNeeded() {
RecordCollector recordCollector() {
return recordCollector;
}

// visible for testing only
RecordCollector createRecordCollector(final LogContext logContext,
Contributor commented:

Sounds good!

commitTimeSensor.add(metrics.metricName("commit-latency-avg", groupName, "The average commit time in ms", tags()), new Avg());
commitTimeSensor.add(metrics.metricName("commit-latency-max", groupName, "The maximum commit time in ms", tags()), new Max());
commitTimeSensor.add(metrics.metricName("commit-rate", groupName, "The average per-second number of commit calls", tags()), new Rate(TimeUnit.SECONDS, new Count()));
commitTimeSensor.add(metrics.metricName("commit-total", groupName, "The total number of commit calls", tags()), new Count());
Contributor commented:

Good call!

private StreamsMetricsConventions() {
}

public static String threadLevelSensorName(final String threadName, final String sensorName) {
Contributor commented:

Cool.
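As a rough illustration of such a naming convention, a thread-level sensor name could be derived from the thread name plus the logical sensor name. The exact format below is hypothetical, not necessarily what the PR's StreamsMetricsConventions uses:

```java
public class SensorNamingSketch {
    // Hypothetical scheme: prefix the sensor name with its owning thread
    // so sensors from different threads never collide in the registry.
    public static String threadLevelSensorName(final String threadName, final String sensorName) {
        return "thread." + threadName + "." + sensorName;
    }

    public static void main(final String[] args) {
        System.out.println(threadLevelSensorName("stream-thread-1", "skipped-records"));
    }
}
```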

@@ -577,8 +555,7 @@ void removeAllSensors() {
removeSensor(punctuateTimeSensor);
removeSensor(taskCreatedSensor);
removeSensor(tasksClosedSensor);
removeSensor(skippedRecordsSensor);

removeSensor(skippedRecordsSensor());
Contributor commented:

Hmm, why do we create the sensor in StreamsMetricsImpl while removing it in StreamsMetricsThreadImpl? It seems a bit inconsistent. Could we still create it in StreamsMetricsThreadImpl, and let StreamsMetricsImpl get hold of the sensor assuming it is already created (i.e. set sensor == null in the constructor, and in skippedRecordsSensor(): if (sensor == null), try to get it from the metrics)?

vvcephei (PR author) replied:

I tried that, but it wound up making things messier than I expected (because there are other callers to StreamsMetricsImpl, and because it makes the ownership of this sensor ambiguous).

Instead, I added 36c065f, which gives SMI the ability to remove its own sensors, and then I called to it from the two places (StreamThread and GlobalStreamThread) that actually need to unload metrics when they shut down.

WDYT?

Contributor replied:

I did not completely get your explanation re: because there are other callers to StreamsMetricsImpl, and because it makes the ownership of this sensor ambiguous. Could you elaborate a bit more?

vvcephei (PR author) replied:

Sorry; I misread your suggestion. I thought you wanted the StreamsMetricsImpl to take the sensor as a constructor argument.

Aside from the StreamThread (via StreamThreadMetricsImpl), several other classes directly invoke the StreamsMetricsImpl constructor and thus obtain a StreamsMetricsImpl that is not a StreamThreadMetricsImpl. Namely, GlobalStreamThread, MockProcessorContext, and TopologyTestDriver.

When the code paths downstream of these points need to record a skipped record, they will get a null sensor back. It wouldn't be possible to get it from the Metrics registry at that point, though, because the skipped records sensor is scoped by thread (or "virtual" thread for the test-utils), and the sensor would never have been created for GlobalStreamThread, MockProcessorContext, or TopologyTestDriver. So the only way to get it at that point is to have either the caller of the SMI constructor or the SMI itself create the sensor (either at construction or at call-time).

The previous implementation with the public static getter was effectively saying that the one who wants it in a particular context first creates it, but it's problematic because no-one owns it. And indeed, in my implementation, the sensor never got destroyed. In practice I think it's not a huge deal because I'm sure it's rare for a streams app to shut down and start up again with the same Metrics registry, and I think the threads live as long as the app. But still, we have this model of unloading metrics when they're out of scope, and I think it's a good one.

So that brings us to the current implementation. In the current implementation, the skipped records metric is clearly owned by the StreamsMetricsImpl, which it may as well be, since that is the smallest scope in which it's needed. It's the first metric to be owned by SMI, so I had to create a removeAll method, and make sure it's invoked in the right places. But that seems appropriate; every other scope that owns metrics has such a method.
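The ownership model described above can be sketched as follows. The names are hypothetical: ScopedMetrics is a minimal stand-in for StreamsMetricsImpl, which creates its own sensors and unloads all of them via a removeAll-style method when its scope (e.g. a thread) shuts down.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

public class OwnedSensorsSketch {
    // Stand-in for a global metrics registry, keyed by sensor name.
    static final Set<String> registry = new HashSet<>();

    // Hypothetical stand-in for StreamsMetricsImpl: the scope that creates
    // a sensor owns it, and is responsible for removing it on shutdown.
    static final class ScopedMetrics {
        private final Deque<String> ownedSensors = new ArrayDeque<>();

        void registerSensor(final String name) {
            registry.add(name);
            ownedSensors.push(name);  // remember what we own
        }

        void removeAllSensors() {
            while (!ownedSensors.isEmpty()) {
                registry.remove(ownedSensors.pop());
            }
        }
    }

    public static void main(final String[] args) {
        final ScopedMetrics threadScope = new ScopedMetrics();
        threadScope.registerSensor("skipped-records");
        threadScope.registerSensor("commit-latency");
        threadScope.removeAllSensors();  // e.g. when the thread shuts down
        System.out.println("remaining=" + registry.size());
    }
}
```

With this shape, every scope that creates sensors also has a matching removal hook, so nothing leaks into the registry after shutdown.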

"The total number of occurrence of " + opName + " operations.",
tags
),
new Count()
Contributor commented:

Not sure I understand this comment: it was indeed defined as a Count() before as well, right?

vvcephei (PR author) commented:

@guozhangwang Thanks for the review!

I added 18358f3 and 36c065f in response to your comments. I also have a next PR queued up for after this one is merged (in response to our concerns about NamedCacheMetrics).

Please let me know what you think!
-John

bbejeck (Contributor) left a comment:

overall looks good, just have a few comments.

@@ -106,19 +106,19 @@ public void addChild(final ProcessorNode<?, ?> child) {
childByName.put(child.name, child);
}

public void init(final ProcessorContext context) {
public void init(final InternalProcessorContext context) {
Contributor commented:

nit: Just wondering whether this change is necessary. NodeMetrics is an internal class, so a cast here from ProcessorContext to InternalProcessorContext should not be a big deal, and it would keep ProcessorNode more generic. EDIT: Never mind; I read a comment below about using the type system, and I agree.


private final static String LOG_MESSAGE = "Error sending record (key {} value {} timestamp {}) to topic {} due to {}; " +
"No more records will be sent and no more offsets will be recorded for this task.";
private final static String EXCEPTION_MESSAGE = "%sAbort sending since %s with a previous record (key %s value %s timestamp %d) to topic %s due to %s";
private final static String PARAMETER_HINT = "\nYou can increase producer parameter `retries` and `retry.backoff.ms` to avoid this error.";
private final static String HANDLER_CONTINUED_MESSAGE = "Error sending records (key {} value {} timestamp {}) to topic {} due to {}; " +
Contributor commented:

why remove this?

vvcephei (PR author) replied:

This is just used in one spot. It's easier to read the code if the log message is located at the spot where it gets logged rather than at the top of the file.

In earlier versions of Java, code like this was beneficial for performance, since the string is an object that can be allocated and instantiated once statically, rather than dynamically on every invocation. But nowadays, the compiler and JIT compiler are smarter than that, so there's really no benefit to coding this way, and you still pay the comprehensibility cost of following the indirection.

I didn't want to make it a "thing", though, so I only inlined the message I needed to change.
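For illustration, here is the inline-at-the-call-site style with slf4j-style {} placeholders. The format helper below is a hypothetical stand-in for a real logger so the snippet stays self-contained; with slf4j you would call log.warn(...) with the same template and arguments.

```java
public class InlineLogMessageSketch {
    // Stand-in for an slf4j-style logger: substitutes "{}" placeholders in order.
    static String format(final String template, final Object... args) {
        final StringBuilder sb = new StringBuilder();
        int argIdx = 0;
        int from = 0;
        int at;
        while ((at = template.indexOf("{}", from)) >= 0 && argIdx < args.length) {
            sb.append(template, from, at).append(args[argIdx++]);
            from = at + 2;
        }
        return sb.append(template.substring(from)).toString();
    }

    public static void main(final String[] args) {
        // The one-use message lives right at the call site instead of in a
        // static field at the top of the file.
        System.out.println(format(
            "Skipping record due to deserialization error. topic=[{}] partition=[{}] offset=[{}]",
            "input", 0, 42));
    }
}
```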

@@ -60,6 +65,35 @@ public void setUp() {
stateDir = TestUtils.tempDirectory("kafka-test");
}

@Test
public void shouldLogAndMeterOnSkippedRecords() {
Contributor commented:

nit: shouldLogAndMeterOnSkippedRecords -> shouldLogAndMeterOnSkippedRecordsWithNullValue ?


private final Sensor commitTimeSensor;
Contributor commented:

At line 503, the Javadoc should change, since the constructor for StreamsMetricsImpl now only takes a Metrics and a String parameter.

vvcephei (PR author) replied:

Ah, good eye. I might just ditch the Javadoc, since it's an internal class and its function is pretty obvious.

private final Map<Sensor, Sensor> parentSensors;
private final Stack<String> ownedSensors = new Stack<>();
Contributor commented:

Maybe consider replacing Stack with Deque, since Stack is synchronized, and ownedSensors only adds values in the constructor and removes them inside an already-synchronized block.

vvcephei (PR author) replied:

Ooh, good call. I will do that and probably avoid using Stack again.
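The swap is mechanical: ArrayDeque offers the same push/pop LIFO semantics as Stack but without per-call synchronization, which is why the JDK docs recommend it as the stack replacement. A minimal sketch:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class DequeAsStackSketch {
    public static void main(final String[] args) {
        // java.util.Stack synchronizes every call; for single-threaded (or
        // externally synchronized) use, ArrayDeque is the recommended stack.
        final Deque<String> sensors = new ArrayDeque<>();
        sensors.push("a");
        sensors.push("b");
        // LIFO order, same as Stack#push/pop.
        System.out.println(sensors.pop() + sensors.pop());
    }
}
```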


bbejeck commented Apr 20, 2018

retest this please

vvcephei (PR author) commented:

Addressed Bill's comments and rebased.

@vvcephei vvcephei deleted the kip-274-streams-skip-metrics branch April 23, 2018 19:46
jeqo pushed a commit to jeqo/kafka that referenced this pull request May 2, 2018
* unify skipped records metering
* log warnings when things get skipped
* tighten up metrics usage a bit

### Testing strategy:
Unit testing of the metrics and the logs should be sufficient.

Author: John Roesler <john@confluent.io>

Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes apache#4812 from vvcephei/kip-274-streams-skip-metrics
ying-zheng pushed a commit to ying-zheng/kafka that referenced this pull request Jul 6, 2018