Skip to content

Commit

Permalink
Include feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
cadonna committed Sep 23, 2019
1 parent 85977e6 commit 262b663
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,6 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
reporters.add(new JmxReporter(JMX_PREFIX));
metrics = new Metrics(metricConfig, reporters, time);


// re-write the physical topology according to the config
internalTopologyBuilder.rewriteTopology(config);

Expand Down Expand Up @@ -822,11 +821,13 @@ public synchronized void start() throws IllegalStateException, StreamsException
}
}, cleanupDelay, cleanupDelay, TimeUnit.MILLISECONDS);

final long recordingDelay = 0;
final long recordingInterval = 1;
if (RecordingLevel.forName(config.getString(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
rocksDBMetricsRecordingTriggerThread.scheduleAtFixedRate(
rocksDBMetricsRecordingTrigger,
Duration.ZERO.toMinutes(),
Duration.ofMinutes(1).toMinutes(),
recordingDelay,
recordingInterval,
TimeUnit.MINUTES
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,24 @@ public static void addSumMetricToSensor(final Sensor sensor,
final Map<String, String> tags,
final String operation,
final String description) {
sensor.add(new MetricName(operation + TOTAL_SUFFIX, group, description, tags), new CumulativeSum());
addSumMetricToSensor(sensor, group, tags, operation, true, description);
}

public static void addSumMetricToSensor(final Sensor sensor,
final String group,
final Map<String, String> tags,
final String operation,
final boolean withSuffix,
final String description) {
sensor.add(
new MetricName(
withSuffix ? operation + TOTAL_SUFFIX : operation,
group,
description,
tags
),
new CumulativeSum()
);
}

public static void addValueMetricToSensor(final Sensor sensor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
private RocksDBConfigSetter configSetter;

private final RocksDBMetricsRecorder metricsRecorder;
private boolean isRecordingLevelDebug = false;
private boolean isStatisticsRegistered = false;

private volatile boolean prepareForBulkload = false;
ProcessorContext internalProcessorContext;
Expand Down Expand Up @@ -198,7 +198,7 @@ private void maybeSetUpMetricsRecorder(final ProcessorContext context, final Map
if (userSpecifiedOptions.statistics() == null &&
RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {

isRecordingLevelDebug = true;
isStatisticsRegistered = true;
// metrics recorder will clean up statistics object
final Statistics statistics = new Statistics();
userSpecifiedOptions.setStatistics(statistics);
Expand Down Expand Up @@ -449,9 +449,9 @@ public synchronized void close() {
}

private void maybeRemoveStatisticsFromMetricsRecorder() {
if (isRecordingLevelDebug) {
if (isStatisticsRegistered) {
metricsRecorder.removeStatistics(name);
isRecordingLevelDebug = false;
isStatisticsRegistered = false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addRateOfSumMetricToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndSumMetricsToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addSumMetricToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addValueMetricToSensor;

public class RocksDBMetrics {
Expand Down Expand Up @@ -365,12 +366,13 @@ public static Sensor compactionTimeMaxSensor(final StreamsMetricsImpl streamsMet
public static Sensor numberOfOpenFilesSensor(final StreamsMetricsImpl streamsMetrics,
final RocksDBMetricContext metricContext) {
final Sensor sensor = createSensor(streamsMetrics, metricContext, NUMBER_OF_OPEN_FILES);
addValueMetricToSensor(
addSumMetricToSensor(
sensor,
STATE_LEVEL_GROUP,
streamsMetrics
.storeLevelTagMap(metricContext.taskName(), metricContext.metricsScope(), metricContext.storeName()),
NUMBER_OF_OPEN_FILES,
false,
NUMBER_OF_OPEN_FILES_DESCRIPTION
);
return sensor;
Expand All @@ -379,7 +381,7 @@ public static Sensor numberOfOpenFilesSensor(final StreamsMetricsImpl streamsMet
public static Sensor numberOfFileErrorsSensor(final StreamsMetricsImpl streamsMetrics,
final RocksDBMetricContext metricContext) {
final Sensor sensor = createSensor(streamsMetrics, metricContext, NUMBER_OF_FILE_ERRORS);
addValueMetricToSensor(
addSumMetricToSensor(
sensor,
STATE_LEVEL_GROUP,
streamsMetrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public TaskId taskId() {
return taskId;
}

public void addStatistics(final String storeName,
public void addStatistics(final String segmentName,
final Statistics statistics,
final StreamsMetricsImpl streamsMetrics,
final TaskId taskId) {
Expand All @@ -78,7 +78,7 @@ public void addStatistics(final String storeName,
isInitialized = true;
}
if (this.taskId != taskId) {
throw new IllegalStateException("Statistics of store \"" + storeName + "\" for task " + taskId
throw new IllegalStateException("Statistics of store \"" + segmentName + "\" for task " + taskId
+ " cannot be added to metrics recorder for task " + this.taskId + ". This is a bug in Kafka Streams.");
}
if (statisticsToRecord.isEmpty()) {
Expand All @@ -87,16 +87,15 @@ public void addStatistics(final String storeName,
taskId
);
streamsMetrics.rocksDBMetricsRecordingTrigger().addMetricsRecorder(this);
} else if (statisticsToRecord.containsKey(storeName)) {
throw new IllegalStateException("Statistics for store \"" + storeName + "\" of task " + taskId +
} else if (statisticsToRecord.containsKey(segmentName)) {
throw new IllegalStateException("Statistics for store \"" + segmentName + "\" of task " + taskId +
" has been already added. This is a bug in Kafka Streams.");
}
statistics.setStatsLevel(StatsLevel.EXCEPT_DETAILED_TIMERS);
logger.debug("Adding statistics for store {} of task {}", storeName, taskId);
statisticsToRecord.put(storeName, statistics);
logger.debug("Adding statistics for store {} of task {}", segmentName, taskId);
statisticsToRecord.put(segmentName, statistics);
}


private void initSensors(final StreamsMetricsImpl streamsMetrics, final TaskId taskId) {
final RocksDBMetricContext metricContext = new RocksDBMetricContext(taskId.toString(), metricsScope, storeName);
bytesWrittenToDatabaseSensor = RocksDBMetrics.bytesWrittenToDatabaseSensor(streamsMetrics, metricContext);
Expand All @@ -114,18 +113,18 @@ private void initSensors(final StreamsMetricsImpl streamsMetrics, final TaskId t
numberOfFileErrorsSensor = RocksDBMetrics.numberOfFileErrorsSensor(streamsMetrics, metricContext);
}

public void removeStatistics(final String storeName) {
logger.debug("Removing statistics for store {} of task {}", storeName, taskId);
final Statistics removedStatistics = statisticsToRecord.remove(storeName);
public void removeStatistics(final String segmentName) {
logger.debug("Removing statistics for store {} of task {}", segmentName, taskId);
final Statistics removedStatistics = statisticsToRecord.remove(segmentName);
if (removedStatistics == null) {
throw new IllegalStateException("No statistics for store \"" + storeName + "\" of task " + taskId
throw new IllegalStateException("No statistics for store \"" + segmentName + "\" of task " + taskId
+ " could be found. This is a bug in Kafka Streams.");
}
removedStatistics.close();
if (statisticsToRecord.isEmpty()) {
logger.debug(
"Removing metrics recorder for store {} of task {} from metrics recording trigger",
this.storeName,
storeName,
taskId
);
streamsMetrics.rocksDBMetricsRecordingTrigger().removeMetricsRecorder(this);
Expand Down Expand Up @@ -165,9 +164,9 @@ public void record() {
writeStallDuration += statistics.getAndResetTickerCount(TickerType.STALL_MICROS);
bytesWrittenDuringCompaction += statistics.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES);
bytesReadDuringCompaction += statistics.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES);
numberOfOpenFiles += statistics.getTickerCount(TickerType.NO_FILE_OPENS)
- statistics.getTickerCount(TickerType.NO_FILE_CLOSES);
numberOfFileErrors += statistics.getTickerCount(TickerType.NO_FILE_ERRORS);
numberOfOpenFiles += statistics.getAndResetTickerCount(TickerType.NO_FILE_OPENS)
- statistics.getAndResetTickerCount(TickerType.NO_FILE_CLOSES);
numberOfFileErrors += statistics.getAndResetTickerCount(TickerType.NO_FILE_ERRORS);
}
bytesWrittenToDatabaseSensor.record(bytesWrittenToDatabase);
bytesReadFromDatabaseSensor.record(bytesReadFromDatabase);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public class MetricsIntegrationTest {
private static final String COMPACTION_TIME_MIN = "compaction-time-min";
private static final String COMPACTION_TIME_MAX = "compaction-time-max";
private static final String NUMBER_OF_OPEN_FILES = "number-open-files";
private static final String NUMBER_OF_FILE_ERRORS = "number-file-errors";
private static final String NUMBER_OF_FILE_ERRORS = "number-file-errors-total";

// stores name
private static final String TIME_WINDOWED_AGGREGATED_STREAM_STORE = "time-windowed-aggregated-stream-store";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,15 +278,15 @@ public void shouldRecordMetrics() {
bytesReadDuringCompactionSensor.record(5 + 6);
replay(bytesReadDuringCompactionSensor);

expect(statisticsToAdd1.getTickerCount(TickerType.NO_FILE_OPENS)).andReturn(5L);
expect(statisticsToAdd1.getTickerCount(TickerType.NO_FILE_CLOSES)).andReturn(3L);
expect(statisticsToAdd2.getTickerCount(TickerType.NO_FILE_OPENS)).andReturn(7L);
expect(statisticsToAdd2.getTickerCount(TickerType.NO_FILE_CLOSES)).andReturn(4L);
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).andReturn(5L);
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).andReturn(3L);
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).andReturn(7L);
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).andReturn(4L);
numberOfOpenFilesSensor.record((5 + 7) - (3 + 4));
replay(numberOfOpenFilesSensor);

expect(statisticsToAdd1.getTickerCount(TickerType.NO_FILE_ERRORS)).andReturn(34L);
expect(statisticsToAdd2.getTickerCount(TickerType.NO_FILE_ERRORS)).andReturn(11L);
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).andReturn(34L);
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).andReturn(11L);
numberOfFileErrorsSensor.record(11 + 34);
replay(numberOfFileErrorsSensor);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,17 +204,14 @@ public void shouldGetCompactionTimeMaxSensor() {
public void shouldGetNumberOfOpenFilesSensor() {
final String metricNamePrefix = "number-open-files";
final String description = "Number of currently open files";
verifyValueSensor(metricNamePrefix, description, RocksDBMetrics::numberOfOpenFilesSensor);
verifySumSensor(metricNamePrefix, false, description, RocksDBMetrics::numberOfOpenFilesSensor);
}

@Test
public void shouldGetNumberOfFilesErrors() {
final String metricNamePrefix = "number-file-errors";
final String description = "Total number of file errors occurred";
setupStreamsMetricsMock(metricNamePrefix);
StreamsMetricsImpl.addValueMetricToSensor(sensor, STATE_LEVEL_GROUP, tags, metricNamePrefix, description);

replayCallAndVerify(RocksDBMetrics::numberOfFileErrorsSensor);
verifySumSensor(metricNamePrefix, true, description, RocksDBMetrics::numberOfFileErrorsSensor);
}

private void verifyRateAndTotalSensor(final String metricNamePrefix,
Expand Down Expand Up @@ -252,6 +249,21 @@ private void verifyValueSensor(final String metricNamePrefix,
replayCallAndVerify(sensorCreator);
}

private void verifySumSensor(final String metricNamePrefix,
final boolean withSuffix,
final String description,
final SensorCreator sensorCreator) {
setupStreamsMetricsMock(metricNamePrefix);
if (withSuffix) {
StreamsMetricsImpl.addSumMetricToSensor(sensor, STATE_LEVEL_GROUP, tags, metricNamePrefix, description);
} else {
StreamsMetricsImpl
.addSumMetricToSensor(sensor, STATE_LEVEL_GROUP, tags, metricNamePrefix, withSuffix, description);
}

replayCallAndVerify(sensorCreator);
}

private void setupStreamsMetricsMock(final String metricNamePrefix) {
mockStatic(StreamsMetricsImpl.class);
expect(streamsMetrics.storeLevelSensor(
Expand Down

0 comments on commit 262b663

Please sign in to comment.