KAFKA-6820: Refactor Stream Metrics #6498
Conversation
…factor-thread-metrics
```java
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
    super.init(context);
    metrics = (StreamsMetricsImpl) context.metrics();

    skippedRecordSensor = ((InternalProcessorContext) context).currentNode().nodeMetrics().skippedRecordsRateSensor();
```
We need to access the skipped-records sensor at the processor-node level now, and since a processor cannot access the processor node that wraps it, we need to get it from InternalProcessorContext (ditto elsewhere). A minor impact is on unit tests that use a ProcessorContext which does not extend InternalProcessorContext -- I've found only one unit test that needed updates because of this.
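As a minimal sketch of the downcast pattern being discussed (all type and method names below are simplified stand-ins, not the actual Kafka Streams interfaces), this shows why a test context that only implements the public interface breaks:

```java
// Simplified stand-ins for illustration only; not the real Kafka Streams types.
interface ProcessorContext { }

interface InternalProcessorContext extends ProcessorContext {
    String currentNodeName();
}

class SkipCountingProcessor {
    String nodeName;

    void init(final ProcessorContext context) {
        // Downcast to reach node-level internals; a unit test that passes a
        // plain ProcessorContext implementation fails here with a
        // ClassCastException.
        nodeName = ((InternalProcessorContext) context).currentNodeName();
    }
}
```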
👍
```diff
@@ -75,22 +74,21 @@ public void enableSendingOldValues() {
         sendOldValues = true;
     }

-    private class KStreamSessionWindowAggregateProcessor extends AbstractProcessor<K, V> {
+    public class KStreamSessionWindowAggregateProcessor extends AbstractProcessor<K, V> {
```
This class needs to be accessible in the `instanceof` condition for lazy creation of sensors at NodeMetrics; ditto elsewhere.
Is this maybe left over from a previous iteration? I couldn't find the condition in NodeMetrics.
It's in ProcessorNodeMetrics.
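As a rough sketch of the lazy-creation pattern being referenced (class and sensor names here are hypothetical, not the actual ProcessorNodeMetrics code), the `instanceof` check is what forces the processor class to be publicly visible:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: node-level metrics create certain sensors lazily,
// and use instanceof on the wrapped processor to decide which ones apply.
class NodeMetricsSketch {
    private final Map<String, String> sensors = new HashMap<>();

    String maybeSessionAggregationSensor(final Object processor) {
        // Only session-window aggregation processors get this sensor, which
        // is why the processor class must be visible for the instanceof test.
        if (processor instanceof KStreamSessionWindowAggregateProcessorStub) {
            return sensors.computeIfAbsent("session-aggregation", n -> "sensor:" + n);
        }
        return null;
    }
}

// Stand-in for the real processor class made public in this PR.
class KStreamSessionWindowAggregateProcessorStub { }
```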
```java
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_ID_TAG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;

public class Sensors {
```
We moved the sensor-creation protocol into the corresponding NodeMetrics. Ditto for another Sensors class as well.
👍
```diff
@@ -69,7 +72,8 @@ public GlobalStateUpdateTask(final ProcessorTopology topology,
         source,
         deserializationExceptionHandler,
         logContext,
-        processorContext.metrics().skippedRecordsSensor()
+        // task-id would be "-1_-1"
+        processorContext.metrics().taskLevelSensor(SKIPPED_RECORDS, processorContext.taskId().toString(), Sensor.RecordingLevel.INFO)
```
We need a task-id for the global-state-update-task for its skipped-records metrics.
```java
if (processor != null) {
    processor.init(context);
}
nodeMetrics.nodeCreationSensor.record(time.nanoseconds() - startNs);
```
This metric is removed as proposed. Ditto elsewhere.
```java
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
join.process(null, new Change<>("new", "old"));
LogCaptureAppender.unregister(appender);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
```
This change is needed to use InternalProcessorContext. Ditto below.
```java
context.setCurrentNode(new ProcessorNode("testNode"));
final ProcessorNode processorNode = new ProcessorNode(TEST_NODE);
context.setCurrentNode(processorNode);
processorNode.init(context);
```
We need to initialize the processor node in order to create the NodeMetrics, so that it can later be used for adding sensors in the processor class; ditto in a few other places.
```java
    "The average number of occurrence of " + throughputOperation + " operation per second.",
    metricTags)));

final JmxReporter reporter = new JmxReporter("kafka.streams");
```
I removed the part that tests JmxReporter since it should be covered in JmxReporterTest; removing it cleans up some overlapping coverage. Ditto elsewhere.
```java
}

@Test
public void testMutiLevelSensorRemoval() {
```
We do not have by-default parent sensors now, and the rest of the special parent sensors are already covered in StreamMetricsIntegrationTest.
```java
private final Properties props = StreamsTestUtils.getStreamsConfig();
private final StreamsConfig config = new StreamsConfig(props);
private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
    new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG))))
```
We need to enable DEBUG reporting level to test some lower-level sensors now. Ditto elsewhere.
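The recording-level gate works roughly like the sketch below (a simplified model for illustration, not the actual Kafka `Sensor.RecordingLevel` implementation): a DEBUG-level sensor only records when the configured level is DEBUG, which is why these tests must now configure DEBUG explicitly.

```java
// Simplified model of a recording-level check, for illustration only.
enum Level {
    INFO, DEBUG;

    // A sensor declared at this level records only if the configured
    // level is at least as verbose.
    boolean shouldRecord(final Level configured) {
        return this == INFO || configured == DEBUG;
    }
}
```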
Hey @guozhangwang,
Thanks for tackling this! I made a quick pass and left some thoughts. Overall, I think this is a really good approach.
```java
 * Note that you can add more metrics to this sensor after created it, which can then be updated upon {@link Sensor#record(double)} calls;
 * but additional user-customized metrics will not be managed by {@link StreamsMetrics}.
 *
 * @param scopeName name of the scope, which will be used as part of the metrics type, e.g.: "stream-[scope]-metrics".
```
Thanks for documenting how the arguments are used in the metrics
```java
public Sensor skippedRecordsRateSensor() {
    if (skippedRecordsRateSensor == null) {
        // keep the task-level parent sensor
        final Sensor taskLevelSensor = metrics.taskLevelSensor(SKIPPED_RECORDS, taskName, Sensor.RecordingLevel.INFO);
```
Do we have a responsibility to remove this sensor on clear as well? Or does it get removed elsewhere? (I haven't looked)
Yes, it will be cleared at the task level in TaskMetrics. More specifically, at this point the task sensor should always have been created already, so to be on the safe side we should have something like getButNotCreate APIs.
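A get-but-not-create lookup, as suggested, could look roughly like this (a hypothetical registry for illustration, not the actual StreamsMetricsImpl API):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sensor registry illustrating a fail-fast lookup for sensors
// that are expected to exist already.
class SensorRegistrySketch {
    private final Map<String, String> sensors = new HashMap<>();

    String getOrCreateSensor(final String name) {
        return sensors.computeIfAbsent(name, n -> "sensor:" + n);
    }

    // getButNotCreate-style API: the task-level parent sensor should have
    // been created at task initialization, so a miss here is a bug.
    String getExistingSensor(final String name) {
        final String sensor = sensors.get(name);
        if (sensor == null) {
            throw new IllegalStateException("sensor " + name + " should have been created already");
        }
        return sensor;
    }
}
```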
```java
    new CumulativeCount()
);
// we have to separate the latency-max/avg from the rate/total since the latter would be recorded at child node-level
processLatencySensor = metrics.taskLevelSensor(PROCESS_LATENCY, taskName, Sensor.RecordingLevel.DEBUG);
```
👍
```diff
@@ -361,7 +351,13 @@ public boolean process() {
     log.trace("Start processing one record [{}]", record);

     updateProcessorContext(record, currNode);
-    currNode.process(record.key(), record.value());
+
+    StreamsMetricsImpl.maybeMeasureLatency(
```
👍
```java
        commitRequested = false;
    },
    time,
    taskMetrics.commitLatencySensor);
```
This long lambda has me wondering whether it would be nicer to make the lambda the last argument, for readability.
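The readability point can be illustrated with a hypothetical signature (invented names, not the actual method): putting the functional argument last lets a multi-line lambda read like a trailing block at the call site.

```java
// Hypothetical helper illustrating parameter ordering; all names invented.
final class Timed {
    // Lambda-last: a multi-line action reads as a trailing block when it
    // is the final argument, instead of being buried mid-argument-list.
    static long measureNanos(final java.util.function.LongSupplier clock, final Runnable action) {
        final long start = clock.getAsLong();
        action.run();
        return clock.getAsLong() - start;
    }
}
```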
```java
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ID_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;

public class StoreMetrics {
```
👍
Hey @guozhangwang , I was talking about time semantics in another thread with @mjsax , and we were thinking it might make sense to piggy-back one other thing on this. Right now, we make a distinction between "dropped" and "skipped" records in the metrics. "Skipped" records are invalid records (e.g., null key, or a serialization error), whereas "dropped" records are valid records that we drop because they arrive after their window is closed, in windowed aggregations. But this distinction is subtle. What we think matters to an operator is two questions: How many records got dropped/skipped, and for what reasons? Should we pick one of the terms and standardize on it, like:
WDYT?
I had the same feeling about "dropped" vs. "skipped" when doing the refactoring effort. I agree that having consistent terminology is better, while still making it doable for those who want finer-granularity information on the causes etc. There's a not-so-subtle difference between some of those scenarios, though: for example, on a deserialization error the record is dropped at the very beginning of the topology without being processed at all, whereas a null-key or late record may have traversed half-way into the topology and hence caused some side effects on the state before being dropped. Hence, as you can see, for the current version I'm thinking maybe we can still distinguish at the task level. Part of the reason is that another similar metric is for windowed stores, where we have an
Thanks, @guozhangwang. Your reasoning sounds good to me. Of course, we should document this distinction alongside the metrics so that people can actually realize the benefit of being able to reason about drops/skips this way (and so I can remember it in 6 months, as well).
Closing this PR as it has been addressed by @cadonna |
1.a Each XXMetrics object will keep track of all the sensors it has ever created, and a `clear` function which must be called when the corresponding object (a thread, a task, etc.) is closed.

1.b From anywhere inside the code base, as long as a class can access the StreamsMetricsImpl class, it can use the util functions to get the sensor it wants to record from the `metrics` registry following the protocol. Only when it cannot access StreamsMetricsImpl does the sensor need to be pre-accessed and passed through the constructor.

1.c De-couple the original thread-level sensors as `StreamThreadMetrics`, extracted from the StreamsMetricsImpl class -- hence the latter can become a pure util-function provider and not keep track of any sensors.

2. Make metric / group / tag names constant strings, residing either in each level's metrics class if they belong to specific sensors, or in StreamsMetricsImpl if they are util strings like suffixes / prefixes.

3. Change the public StreamsMetrics interface to be more intuitive for users (KAFKA-6820), and make all internal usages leverage the provided util functions of StreamsMetricsImpl as well.

4. Remove default parent sensors from everywhere.

5. TEST: use `getMetricByName` and `getMetricByNameAndTags` from the utils class across all unit test classes.

6. MINOR: because of 1), `MockStreamsMetrics` is not needed any more and can be replaced with StreamsMetricsImpl, since the latter is now a stateless util layer (tech debt KAFKA-5676).

NOTE: one thing missing here is the compatibility path with old metrics, which will be done in a follow-up PR.