
KAFKA-8580: Compute RocksDB metrics #7263

Merged
merged 7 commits into from Sep 24, 2019

Conversation

cadonna
Contributor

@cadonna cadonna commented Aug 28, 2019

A metric recorder runs in its own thread and regularly records RocksDB metrics from
RocksDB's statistics. For segmented state stores the metrics are aggregated over the
segments.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@@ -209,7 +210,8 @@ private void setUpMetrics(final ProcessorContext context, final Map<String, Obje
(StreamsMetricsImpl) context.metrics(),
context.taskId()
);
removeStatisticsFromMetricsRecorder = true;
metricsRecorder.startRecording(Duration.ofMinutes(10));
Contributor Author

Here, I set RocksDB's standard interval for its own regular statistics dump. If anybody has a better idea, please speak up. In the future, we could provide a user configuration to set this.

@@ -461,19 +461,13 @@ private void checkRocksDBMetricsByTag(final String tag) {
checkMetricByName(listMetricStore, MEMTABLE_BYTES_FLUSHED_RATE, 1);
checkMetricByName(listMetricStore, MEMTABLE_BYTES_FLUSHED_TOTAL, 1);
checkMetricByName(listMetricStore, MEMTABLE_HIT_RATIO, 1);
checkMetricByName(listMetricStore, MEMTABLE_FLUSH_TIME_AVG, 1);
checkMetricByName(listMetricStore, MEMTABLE_FLUSH_TIME_MIN, 1);
checkMetricByName(listMetricStore, MEMTABLE_FLUSH_TIME_MAX, 1);
Contributor Author

Unfortunately, I could not compute all metrics proposed in KIP-471 because some require functionality of RocksDB 6+ and Streams currently uses 5.18.

checkMetricByName(listMetricStore, WRITE_STALL_DURATION_AVG, 1);
checkMetricByName(listMetricStore, WRITE_STALL_DURATION_TOTAL, 1);
checkMetricByName(listMetricStore, BLOCK_CACHE_DATA_HIT_RATIO, 1);
checkMetricByName(listMetricStore, BLOCK_CACHE_INDEX_HIT_RATIO, 1);
checkMetricByName(listMetricStore, BLOCK_CACHE_FILTER_HIT_RATIO, 1);
checkMetricByName(listMetricStore, BYTES_READ_DURING_COMPACTION_RATE, 1);
checkMetricByName(listMetricStore, BYTES_WRITTEN_DURING_COMPACTION_RATE, 1);
checkMetricByName(listMetricStore, COMPACTION_TIME_AVG, 1);
checkMetricByName(listMetricStore, COMPACTION_TIME_MIN, 1);
checkMetricByName(listMetricStore, COMPACTION_TIME_MAX, 1);
Contributor Author

Same as above

@cadonna
Contributor Author

cadonna commented Aug 28, 2019

@cadonna
Contributor Author

cadonna commented Aug 29, 2019

For JDK 11/Scala 2.12 the following tests failed:

kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition
kafka.api.SaslGssapiSslEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl

The other two builds exceeded the timeout.

Retest this, please

@cadonna
Contributor Author

cadonna commented Aug 30, 2019

In JDK8/Scala 2.11 the following tests failed:

kafka.network.DynamicConnectionQuotaTest.testDynamicConnectionQuota

@cadonna
Contributor Author

cadonna commented Aug 30, 2019

Retest this, please

Contributor

@guozhangwang guozhangwang left a comment

@cadonna thanks for your PR, I have a meta comment plus some minor ones on the non-testing code.

ping @mjsax @vvcephei for another look.

this.recordingInterval = recordingInterval;
if (!recordingAlreadyStarted) {
final Thread thread = new Thread(this::recordLoop);
thread.setName(storeName.replace(" ", "-") + "-RocksDB-metrics-recorder");
Contributor

Why do we want to replace spaces in the thread name?

Contributor Author

Overthought it. Will remove the replacement.

@@ -471,7 +473,8 @@ private void closeOpenIterators() {
private void closeOrUpdateMetricsRecorder() {
if (closeMetricsRecorder) {
metricsRecorder.close();
} else if (removeStatisticsFromMetricsRecorder) {
metricsRecorderIsRunning = false;
Contributor

The logic of closeMetricsRecorder and metricsRecorderIsRunning is a bit awkward here, since the logic of RocksDBMetricsRecorder#removeStatistics and #close is actually the same except for stopRecording. How about simplifying it to the following:

  1. Just have a hard-coded recording interval inside RocksDBMetricsRecorder instead of letting the caller pass in the value. Also maintain a reference to the background thread, but do not start it at construction time.

  2. In addStatistics, if the map is no longer empty afterwards (in practice it should be), start the thread if it has not been started yet.

  3. In removeStatistics, if the map becomes empty afterwards, stop the thread.

Then in the caller, we only need addStatistics / removeStatistics and do not need the flags for closing any more, and the logic of this function can also be simplified.

For segmented stores, we need to make sure that if the last segment was indeed removed, the store would not try to create a new one, which means that at least two segments are maintained at the same time, which I think can be guaranteed programmatically. To be safe, we can also maintain a flag or an enum state for the recorder, created -> (addStatistics) -> started -> (removeStatistics) -> stopped, and throw an exception if addStatistics is called after the recorder has transitioned to stopped.
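For illustration, a minimal, self-contained sketch of the suggested lifecycle (class, field, and method names here are hypothetical and simplified; this is not the actual PR code):

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.rocksdb.Statistics;

// Hypothetical sketch: start the recording thread when the first Statistics is
// added, stop it when the last one is removed, and fail fast if statistics are
// added after the recorder has been stopped.
public class MetricsRecorderLifecycleSketch {

    private enum State { CREATED, STARTED, STOPPED }

    // hard-coded recording interval, as suggested above (the value is illustrative)
    private static final Duration RECORDING_INTERVAL = Duration.ofMinutes(10);

    private final Map<String, Statistics> statisticsToRecord = new HashMap<>();
    private State state = State.CREATED;
    private Thread recordingThread;

    public synchronized void addStatistics(final String segmentName, final Statistics statistics) {
        if (state == State.STOPPED) {
            throw new IllegalStateException("Statistics added after the recorder was stopped");
        }
        statisticsToRecord.put(segmentName, statistics);
        if (state == State.CREATED) {
            recordingThread = new Thread(this::recordLoop);
            recordingThread.start();
            state = State.STARTED;
        }
    }

    public synchronized void removeStatistics(final String segmentName) {
        statisticsToRecord.remove(segmentName);
        if (state == State.STARTED && statisticsToRecord.isEmpty()) {
            recordingThread.interrupt();
            state = State.STOPPED;
        }
    }

    private void recordLoop() {
        // sleep for RECORDING_INTERVAL between recordings until interrupted;
        // the actual reading of the statistics is omitted in this sketch
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(RECORDING_INTERVAL.toMillis());
            } catch (final InterruptedException e) {
                return;
            }
        }
    }
}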

Contributor Author

That is really a good point. I changed the design of the recorder. Now it starts a thread when the first Statistics object is added, stops the thread when the last Statistics object is removed, and starts a new thread when a Statistics object is re-added. This was necessary since a RocksDB instance may be opened and closed a couple of times during its life cycle due to restoration.

log.debug("Removed Statistics for store segment {}", storeName);
}

public void startRecording(final Duration recordingInterval) {
Contributor

If startRecording is called multiple times with different recording intervals, we would change the sleeping interval. Is that intentional? Or should we just stick with the first passed-in value?

compactionTimeAvgSensor = RocksDBMetrics.compactionTimeAvgSensor(streamsMetrics, metricContext);
compactionTimeMinSensor = RocksDBMetrics.compactionTimeMinSensor(streamsMetrics, metricContext);
compactionTimeMaxSensor = RocksDBMetrics.compactionTimeMaxSensor(streamsMetrics, metricContext);
bytesReadDuringCompactionSensor = RocksDBMetrics.bytesReadDuringCompactionSensor(streamsMetrics, metricContext);
Contributor

Some metrics are removed here, is that intentional?

Contributor Author

Yes, it was intentional. Unfortunately, some metrics require functionality of RocksDB 6+ and Streams currently uses 5.18.

Contributor

So there are metrics we would like to add but can't until we upgrade RocksDB? Can we create a 3.0 blocker ticket to add them back in when we bump rocks (and/or maybe a separate ticket to consider a major version bump of rocks with the next major version bump of Kafka)?

Contributor Author

@ableegoldman I created ticket https://issues.apache.org/jira/browse/KAFKA-8897
I did not specify it as a blocker for 3.0 because a major version increase of Streams is not required for a major version increase of RocksDB. If I am missing something here, please feel free to comment on the ticket and change it to a blocker.

for (final Statistics statistics : statisticsToRecord.values()) {
statistics.close();
}
statisticsToRecord.clear();
log.debug("Closed", storeName);
Contributor

Should we check that the thread has eventually joined, say when the Streams instance is closed? With a 10-minute interval we should not block on waiting, but we should also make sure we are not accidentally leaking threads.

Contributor Author

When the last Statistics object is removed, the recording thread is interrupted and stopped, and the removing thread waits for it to die. That should ensure no leak. Please review that code with special care.

@bbejeck bbejeck added the streams label Sep 3, 2019
@cadonna cadonna force-pushed the AK8580-Compute_RocksDB_metrics branch from a024d01 to 62c57ee Compare September 5, 2019 21:24

bytesReadFromDatabase += statistics.getTickerCount(TickerType.BYTES_READ);
memtableBytesFlushed += statistics.getTickerCount(TickerType.FLUSH_WRITE_BYTES);
memtableHits += statistics.getAndResetTickerCount(TickerType.MEMTABLE_HIT);
memtableMisses += statistics.getAndResetTickerCount(TickerType.MEMTABLE_MISS);
Contributor Author

At the moment, hit ratios consist only of the current value measured in the recording interval. Should we change this to AVG, MIN, and MAX?
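For context, a sketch of how such a ratio could be derived from the accumulated ticker counts of one recording interval (the sensor name memtableHitRatioSensor and the lookup variable are assumed here for illustration):

// Sketch: memtable hit ratio over one recording interval, aggregated across all
// segments of the store; names other than memtableHits/memtableMisses are assumed.
final long memtableLookups = memtableHits + memtableMisses;
final double memtableHitRatio = memtableLookups == 0 ? 0d : (double) memtableHits / memtableLookups;
memtableHitRatioSensor.record(memtableHitRatio);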

Contributor

@guozhangwang guozhangwang left a comment

Made another pass, all are nit comments. @vvcephei could you take a look at this PR also?

private final String metricsScope;
private final String storeName;

private enum State { NEW, RUNNING, NOT_RUNNING, ERROR, MANUAL }
Contributor

nit: maybe we can just merge NEW into NOT_RUNNING? I.e. the initialized state is just NOT_RUNNING.

return state == State.RUNNING;
}

public boolean error() {
Contributor

Is this function only used for testing?

}

// visible for testing
RocksDBMetricsRecorder(final String metricsScope, final String storeName, final boolean startRecordingThread) {
Contributor

nit: the last parameter seems to always be true in non-testing code. I'm wondering if we can add a couple of test-only functions (e.g. the following error() seems test-only as well), like a setManual which would force the state to be MANUAL instead of setting it in the constructor.

Then we can remove the startRecordingThread flag, and in tests, after we've created the recorder object, call setManual so that later add/removeStatistics calls would not start or stop recording.

if (state == State.RUNNING && statisticsToRecord.isEmpty()) {
state = State.NOT_RUNNING;
thread.interrupt();
waitForThreadToDie();
Contributor

One caveat is that when we are closing the Kafka Streams instance with a specified timeout value, this function may violate that timeout and wait for a longer time, since we call thread.join() without a timeout value.

Contributor

Just to clarify: add/removeStatistics do not need to be thread-safe since they would only be called from a single thread at a given time, right?

Contributor

@vvcephei vvcephei left a comment

Hey @cadonna ,

Thanks for the PR!

I'm curious about the threading model. Is there an advantage to having one thread per store? It seems like we could also just have one thread per Streams instance and have it iterate over all the RocksDBs. I'm not sure, but it seems like the management might be a little simpler, not to mention avoiding all those extra threads for an amount of work that is really quite small.

WDYT?
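For comparison, a rough, self-contained sketch of that alternative: one scheduled task per Streams instance that iterates over all registered per-store recorders. All names here are illustrative and do not claim to match the PR's final API:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of a single per-instance recording trigger.
public class RecordingTriggerSketch implements Runnable {

    // minimal stand-in for the per-store metrics recorder
    public interface Recorder {
        String name();
        void record();  // read the RocksDB statistics and update the sensors
    }

    private final Map<String, Recorder> recorders = new ConcurrentHashMap<>();

    public void addRecorder(final Recorder recorder) {
        recorders.put(recorder.name(), recorder);
    }

    public void removeRecorder(final Recorder recorder) {
        recorders.remove(recorder.name());
    }

    @Override
    public void run() {
        // one pass over all registered stores, executed on the single scheduled thread
        recorders.values().forEach(Recorder::record);
    }

    // schedule the trigger once per Streams instance
    public static ScheduledExecutorService schedule(final RecordingTriggerSketch trigger) {
        final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(trigger, 0, 1, TimeUnit.MINUTES);
        return executor;
    }
}

The PR eventually moved to this model (a RocksDBMetricsRecordingTrigger scheduled by the Streams instance; see the scheduling code further down and the commit message near the end), although the details of the final implementation differ from this sketch.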

}
boolean wait = true;
log.debug("Wait for recording thread to die");
while (wait) {
Contributor

This seems suspicious...

  • Shouldn't all blocking operations have timeouts?
  • Should we be swallowing and ignoring InterruptedExceptions?

It seems like a recipe for Streams to hang forever un-killably. But I feel like I'm missing something.
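For comparison, a common pattern that addresses both points by bounding the wait and restoring the interrupt status instead of swallowing it. This is a method-level sketch assuming an slf4j-style log field; it is not the code in this PR:

// Sketch: bounded wait for the recording thread; not the code in this PR.
private void waitForRecordingThread(final Thread recordingThread, final Duration timeout) {
    try {
        recordingThread.join(timeout.toMillis());
        if (recordingThread.isAlive()) {
            log.warn("Recording thread did not terminate within {}", timeout);
        }
    } catch (final InterruptedException e) {
        // restore the interrupt status instead of swallowing the exception
        Thread.currentThread().interrupt();
    }
}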

for (final Statistics statistics : statisticsToRecord.values()) {
statistics.close();
bytesWrittenToDatabase += statistics.getTickerCount(TickerType.BYTES_WRITTEN);
Contributor

some of these don't reset the ticker, but they all use +=. Is this right?

Contributor Author

Oh, thanks! I totally overlooked that. Now all ticker counts are reset after each recording except for open files and file errors. Open files and file errors are recorded as values, i.e., no aggregation is applied to them on Kafka's metrics side. All other metrics apply some aggregation, so we have to record only the delta at each recording.
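Condensed, the two patterns described above look roughly like this (simplified from the description; the variable names follow the snippets shown elsewhere in this PR):

// aggregated metrics: read and reset the ticker, so only the delta since the
// last recording is added
bytesWrittenToDatabase += statistics.getAndResetTickerCount(TickerType.BYTES_WRITTEN);
memtableBytesFlushed += statistics.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES);

// value-style metrics (open files, file errors): read without resetting and
// record the plain number
numberOfOpenFiles += statistics.getTickerCount(TickerType.NO_FILE_OPENS)
    - statistics.getTickerCount(TickerType.NO_FILE_CLOSES);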

@cadonna cadonna force-pushed the AK8580-Compute_RocksDB_metrics branch from 62c57ee to 4987afa Compare September 23, 2019 13:33
Contributor

@vvcephei vvcephei left a comment

Hey @cadonna , this LGTM, mod a few minor thoughts you can take or leave.

Thanks!
-John

rocksDBMetricsRecordingTriggerThread.scheduleAtFixedRate(
rocksDBMetricsRecordingTrigger,
Duration.ZERO.toMinutes(),
Duration.ofMinutes(1).toMinutes(),
Contributor

(nit): not sure if this is any clearer than 1 (and 0) above.

Contributor Author

I stored the values for delay and interval in variables named recordingDelay and recordingInterval, respectively.
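So the scheduling call reads roughly as follows (a sketch; the time unit argument and the surrounding class are assumed and may differ from the PR):

// sketch with named scheduling values
final long recordingDelay = Duration.ZERO.toMinutes();
final long recordingInterval = Duration.ofMinutes(1).toMinutes();
rocksDBMetricsRecordingTriggerThread.scheduleAtFixedRate(
    rocksDBMetricsRecordingTrigger,
    recordingDelay,
    recordingInterval,
    TimeUnit.MINUTES
);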

private void maybeRemoveStatisticsFromMetricsRecorder() {
if (isRecordingLevelDebug) {
metricsRecorder.removeStatistics(name);
isRecordingLevelDebug = false;
Contributor

This seems a little mis-named, since the recording level doesn't actually change when we remove the stats. Maybe something like isStatsRegistered?

On the other hand, would it be possible to just idempotently "remove" the stats even if they weren't registered? I.e., just no-op in that case?

Contributor Author

It would be possible to idempotently remove the stats, and I also thought about it. However, I have IllegalStateExceptions in place in RocksDBMetricsRecorder and RocksDBMetricsRecordingTrigger that ensure that the contract is satisfied to only remove stats that were added before. I thought that would improve the correctness of the code.

Contributor

I think it should be safe to call remove here multiple times, the important thing is just to make sure it's actually at least once

Contributor

But @cadonna , will this remove the metrics whenever any store on the instance is closed? That seems to make sense with the current eager rebalancing, but with KIP-429 we will only close the ones that are migrated to another consumer.

Contributor

We need to be able to remove just a few specific metrics without disrupting the others, while also making sure to actually close/cleanup during an actual shutdown

Contributor

After closer look, I don't think there's a conflict between 429 and how the metrics are currently closed 😄

Contributor Author

Metrics of a store within a task are removed from the streams metrics whenever the store of this task is closed. The metrics of the stores that are still open remain. That should be fine with KIP-429.

this.streamsMetrics = streamsMetrics;
isInitialized = true;
}
if (this.taskId != taskId) {
Contributor

I like this parity check. 👍

Contributor Author

Thank you!

bytesWrittenDuringCompaction += statistics.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES);
bytesReadDuringCompaction += statistics.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES);
numberOfOpenFiles += statistics.getTickerCount(TickerType.NO_FILE_OPENS)
- statistics.getTickerCount(TickerType.NO_FILE_CLOSES);
Contributor

Hopefully, there's no danger of these counters overflowing, but regardless, I think the math would still work out if we did reset them here.

Contributor Author

What if RocksDB opened 10 files and closed 0 files during a recording interval? The metric would record 10. Then the counters are reset to 0. In the next recording interval, 0 files are opened and 10 are closed. The metric would record -10. However, the metric is defined as a Value(), i.e., the value is recorded as a plain number. Your proposal would work if the metric were defined as a CumulativeSum(). I will try to do that.

Contributor

Ah, you're right. I was thinking that the numberOfOpenFiles variable was a field (i.e., persistent).

Contributor Author

Metrics are now reset after each read and accumulated with CumulativeSum().

Contributor

Thanks!

Contributor

@guozhangwang guozhangwang left a comment

Made another pass; only nits are left, and I think after that it's good to merge.

@@ -380,7 +379,7 @@ public static Sensor numberOfOpenFilesSensor(final StreamsMetricsImpl streamsMet
public static Sensor numberOfFileErrorsSensor(final StreamsMetricsImpl streamsMetrics,
final RocksDBMetricContext metricContext) {
final Sensor sensor = createSensor(streamsMetrics, metricContext, NUMBER_OF_FILE_ERRORS);
addSumMetricToSensor(
addValueMetricToSensor(
Contributor

Ah good catch.

Contributor

Actually, should the previous sensors like bytesReadDuringCompactionSensor not be an addRateOfSumMetricToSensor metric rather than a rate? My understanding is that the values read from RocksDB are already sums, so we probably only care about avg / max?

Contributor Author

The statistics ticker that we read from RocksDB is a monotonically increasing sum of bytes that we reset after each read. I chose a rate because it should give an impression about the IO that is produced by compactions.


public RocksDBMetricsRecorder(final String metricsScope, final String storeName) {
this.metricsScope = metricsScope;
this.storeName = storeName;
final LogContext logContext = new LogContext(String.format("[RocksDB Metrics Recorder for %s] ", storeName));
Contributor

It's a bit confusing to have two storeName variables: the first one is actually the logical storeName (which would be the same as the physical store name if it is a kv-store) and the second is the physical storeName (e.g. for a segmented store, its name is storeName-segmentID).

Maybe rename the second to rocksDBName?

Contributor Author

Renaming is a good idea. I renamed the physical storeName to segmentName.

@Test
public void shouldSetStatsLevelToExceptDetailedTimers() {
public void shouldSetStatsLevelToExceptDetailedTimersWhenStatisticsIsAdded() {
Contributor

Just wanted to say I really like the way this unit test is written! With the right usage of mocks we would avoid having any time-dependent flakiness.

Contributor Author

Thank you

statisticsToRecord.put(storeName, statistics);
}


Contributor

nit: extra linebreak?

Contributor Author

Ack

@@ -665,6 +671,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
reporters.add(new JmxReporter(JMX_PREFIX));
metrics = new Metrics(metricConfig, reporters, time);


Contributor

nit: extra linebreak?

Contributor Author

Ack

A metric recorder runs in its own thread and regularly records RocksDB metrics from
RocksDB's statistics. For segmented state stores the metrics are aggregated over the
segments.
Changes the threading model from one recording thread per logical
RocksDB instance to one recording thread per Kafka Streams instance.
@cadonna cadonna force-pushed the AK8580-Compute_RocksDB_metrics branch from 4987afa to 262b663 Compare September 23, 2019 22:15
Contributor

@vvcephei vvcephei left a comment

Thanks again, @cadonna !

@guozhangwang guozhangwang merged commit ad3b843 into apache:trunk Sep 24, 2019
@guozhangwang
Contributor

Merged to trunk, thanks @cadonna ! Please make sure we have another PR for docs change as well :)

ijuma added a commit to confluentinc/kafka that referenced this pull request Sep 29, 2019
Conflicts:
* .gitignore: addition of clients/src/generated-test was near
local additions for support-metrics.
* checkstyle/suppressions.xml: upstream refactoring of exclusions
for generator were near the local changes for support-metrics.
* gradle.properties: scala version bump caused a minor conflict
due to the kafka version change locally.
* gradle/dependencies.gradle: bcpkix version bump was near avro
additions in the local version.

* apache-github/trunk: (49 commits)
  KAFKA-8471: Replace control requests/responses with automated protocol (apache#7353)
  MINOR: Don't generate unnecessary strings for debug logging in FetchSessionHandler (apache#7394)
  MINOR:fixed typo and removed outdated varilable name (apache#7402)
  KAFKA-8934: Create version file during build for Streams (apache#7397)
  KAFKA-8319: Make KafkaStreamsTest a non-integration test class (apache#7382)
  KAFKA-6883: Add toUpperCase support to sasl.kerberos.principal.to.local rule (KIP-309)
  KAFKA-8907; Return topic configs in CreateTopics response (KIP-525) (apache#7380)
  MINOR: Address review comments for KIP-504 authorizer changes (apache#7379)
  MINOR: add versioning to request and response headers (apache#7372)
  KAFKA-7273: Extend Connect Converter to support headers (apache#6362)
  MINOR: improve the Kafka RPC code generator (apache#7340)
  MINOR: Improve the org.apache.kafka.common.protocol code (apache#7344)
  KAFKA-8880: Docs on upgrade-guide (apache#7385)
  KAFKA-8179: do not suspend standby tasks during rebalance (apache#7321)
  KAFKA-8580: Compute RocksDB metrics (apache#7263)
  KAFKA-8880: Add overloaded function of Consumer.committed (apache#7304)
  HOTFIX: fix Kafka Streams upgrade note for broker backward compatibility (apache#7363)
  KAFKA-8848; Update system tests to use new AclAuthorizer (apache#7374)
  MINOR: remove unnecessary null check (apache#7299)
  KAFKA-6958: Overload methods for group and windowed stream to allow to name operation name using the new Named class (apache#6413)
  ...
@mjsax mjsax added the kip Requires or implements a KIP label Jun 12, 2020