KAFKA-8580: Compute RocksDB metrics (#7263)
A metric recorder runs in its own thread and regularly records RocksDB metrics from
RocksDB's statistics. For segmented state stores, the metrics are aggregated over the
segments.

Reviewers: John Roesler <vvcephei@users.noreply.github.com>, A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
cadonna authored and guozhangwang committed Sep 24, 2019
1 parent bcc0237 commit ad3b843
Showing 20 changed files with 782 additions and 249 deletions.
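The design described in the commit message can be sketched as follows: a trigger keeps a registry of per-store metrics recorders and, when fired from a dedicated daemon thread, asks each recorder to read out its RocksDB statistics. This is a minimal, hypothetical stand-in for the real RocksDBMetricsRecordingTrigger — the use of plain Runnable recorders and the method names are illustrative, not the actual API:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified trigger; the real class lives in
// org.apache.kafka.streams.state.internals.metrics.
public class MetricsRecordingTriggerSketch implements Runnable {

    // one recorder per state store, keyed by store name (assumed keying scheme)
    private final Map<String, Runnable> recordersToTrigger = new ConcurrentHashMap<>();

    public void addMetricsRecorder(final String storeName, final Runnable recorder) {
        recordersToTrigger.put(storeName, recorder);
    }

    public void removeMetricsRecorder(final String storeName) {
        recordersToTrigger.remove(storeName);
    }

    @Override
    public void run() {
        // invoked periodically from the scheduler thread below
        for (final Runnable recorder : recordersToTrigger.values()) {
            recorder.run();
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        final MetricsRecordingTriggerSketch trigger = new MetricsRecordingTriggerSketch();
        trigger.addMetricsRecorder("my-store", () -> System.out.println("recording RocksDB metrics"));
        // mirrors KafkaStreams: a dedicated daemon thread fires the trigger at a fixed rate
        final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            final Thread thread = new Thread(r, "clientId-RocksDBMetricsRecordingTrigger");
            thread.setDaemon(true);
            return thread;
        });
        scheduler.scheduleAtFixedRate(trigger, 0, 1, TimeUnit.MINUTES);
        Thread.sleep(1000); // keep the demo process alive long enough for the first run
    }
}
```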
72 changes: 50 additions & 22 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -29,6 +29,7 @@
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@@ -59,6 +60,7 @@
import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
import org.apache.kafka.streams.state.internals.StateStoreProvider;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
import org.slf4j.Logger;

import java.time.Duration;
@@ -80,6 +82,7 @@

import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;

/**
@@ -138,13 +141,16 @@ public class KafkaStreams implements AutoCloseable {
private final StateDirectory stateDirectory;
private final StreamsMetadataState streamsMetadataState;
private final ScheduledExecutorService stateDirCleaner;
private final ScheduledExecutorService rocksDBMetricsRecordingTriggerThread;
private final QueryableStoreProvider queryableStoreProvider;
private final Admin adminClient;

private GlobalStreamThread globalStreamThread;
private KafkaStreams.StateListener stateListener;
private StateRestoreListener globalStateRestoreListener;

private final RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger = new RocksDBMetricsRecordingTrigger();

// container states
/**
* Kafka Streams states are the possible state that a Kafka Streams instance can be in.
@@ -698,15 +704,18 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
GlobalStreamThread.State globalThreadState = null;
if (globalTaskTopology != null) {
final String globalThreadId = clientId + "-GlobalStreamThread";
globalStreamThread = new GlobalStreamThread(globalTaskTopology,
config,
clientSupplier.getGlobalConsumer(config.getGlobalConsumerConfigs(clientId)),
stateDirectory,
cacheSizePerThread,
metrics,
time,
globalThreadId,
delegatingStateRestoreListener);
globalStreamThread = new GlobalStreamThread(
globalTaskTopology,
config,
clientSupplier.getGlobalConsumer(config.getGlobalConsumerConfigs(clientId)),
stateDirectory,
cacheSizePerThread,
metrics,
time,
globalThreadId,
delegatingStateRestoreListener,
rocksDBMetricsRecordingTrigger
);
globalThreadState = globalStreamThread.state();
}

@@ -716,19 +725,21 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
final Map<Long, StreamThread.State> threadState = new HashMap<>(threads.length);
final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
for (int i = 0; i < threads.length; i++) {
threads[i] = StreamThread.create(internalTopologyBuilder,
config,
clientSupplier,
adminClient,
processId,
clientId,
metrics,
time,
streamsMetadataState,
cacheSizePerThread,
stateDirectory,
delegatingStateRestoreListener,
i + 1);
threads[i] = StreamThread.create(
internalTopologyBuilder,
config,
clientSupplier,
adminClient,
processId,
clientId,
metrics,
time,
streamsMetadataState,
cacheSizePerThread,
stateDirectory,
delegatingStateRestoreListener,
i + 1);
threads[i].setRocksDBMetricsRecordingTrigger(rocksDBMetricsRecordingTrigger);
threadState.put(threads[i].getId(), threads[i].state());
storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
}
@@ -749,6 +760,12 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
thread.setDaemon(true);
return thread;
});

rocksDBMetricsRecordingTriggerThread = Executors.newSingleThreadScheduledExecutor(r -> {
final Thread thread = new Thread(r, clientId + "-RocksDBMetricsRecordingTrigger");
thread.setDaemon(true);
return thread;
});
}

private static HostInfo parseHostInfo(final String endPoint) {
@@ -803,6 +820,17 @@ public synchronized void start() throws IllegalStateException, StreamsException
stateDirectory.cleanRemovedTasks(cleanupDelay);
}
}, cleanupDelay, cleanupDelay, TimeUnit.MILLISECONDS);

final long recordingDelay = 0;
final long recordingInterval = 1;
if (RecordingLevel.forName(config.getString(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
rocksDBMetricsRecordingTriggerThread.scheduleAtFixedRate(
rocksDBMetricsRecordingTrigger,
recordingDelay,
recordingInterval,
TimeUnit.MINUTES
);
}
} else {
throw new IllegalStateException("The client is either already started or already stopped, cannot re-start");
}
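Recording is gated on the metrics recording level: the trigger thread is always created, but the hunk above only schedules it when metrics.recording.level resolves to DEBUG. A minimal configuration sketch — the application id and bootstrap servers below are placeholders:

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class EnableRocksDBMetrics {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // required for the RocksDB metrics recording trigger to be scheduled
        props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
        // pass props to new KafkaStreams(topology, props) as usual
    }
}
```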
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -33,6 +33,7 @@
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
import org.slf4j.Logger;

import java.io.IOException;
@@ -182,18 +183,20 @@ public GlobalStreamThread(final ProcessorTopology topology,
final Metrics metrics,
final Time time,
final String threadClientId,
final StateRestoreListener stateRestoreListener) {
final StateRestoreListener stateRestoreListener,
final RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger) {
super(threadClientId);
this.time = time;
this.config = config;
this.topology = topology;
this.globalConsumer = globalConsumer;
this.stateDirectory = stateDirectory;
this.streamsMetrics = new StreamsMetricsImpl(
streamsMetrics = new StreamsMetricsImpl(
metrics,
threadClientId,
config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG)
);
streamsMetrics.setRocksDBMetricsRecordingTrigger(rocksDBMetricsRecordingTrigger);
this.logPrefix = String.format("global-stream-thread [%s] ", threadClientId);
this.logContext = new LogContext(logPrefix);
this.log = logContext.logger(getClass());
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -46,6 +46,7 @@
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
import org.slf4j.Logger;

import java.time.Duration;
@@ -748,6 +749,10 @@ public static String getSharedAdminClientId(final String clientId) {
return clientId + "-admin";
}

public void setRocksDBMetricsRecordingTrigger(final RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger) {
streamsMetrics.setRocksDBMetricsRecordingTrigger(rocksDBMetricsRecordingTrigger);
}

/**
* Execute the stream processors
*
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -32,6 +32,7 @@
import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;

import java.util.Arrays;
import java.util.Collections;
@@ -61,6 +62,8 @@ public enum Version {
private final Map<String, Deque<String>> cacheLevelSensors = new HashMap<>();
private final Map<String, Deque<String>> storeLevelSensors = new HashMap<>();

private RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger;

private static final String SENSOR_PREFIX_DELIMITER = ".";
private static final String SENSOR_NAME_DELIMITER = ".s.";

@@ -122,6 +125,14 @@ public Version version() {
return version;
}

public void setRocksDBMetricsRecordingTrigger(final RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger) {
this.rocksDBMetricsRecordingTrigger = rocksDBMetricsRecordingTrigger;
}

public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() {
return rocksDBMetricsRecordingTrigger;
}

public final Sensor threadLevelSensor(final String sensorName,
final RecordingLevel recordingLevel,
final Sensor... parents) {
@@ -586,7 +597,24 @@ public static void addSumMetricToSensor(final Sensor sensor,
final Map<String, String> tags,
final String operation,
final String description) {
sensor.add(new MetricName(operation + TOTAL_SUFFIX, group, description, tags), new CumulativeSum());
addSumMetricToSensor(sensor, group, tags, operation, true, description);
}

public static void addSumMetricToSensor(final Sensor sensor,
final String group,
final Map<String, String> tags,
final String operation,
final boolean withSuffix,
final String description) {
sensor.add(
new MetricName(
withSuffix ? operation + TOTAL_SUFFIX : operation,
group,
description,
tags
),
new CumulativeSum()
);
}

public static void addValueMetricToSensor(final Sensor sensor,
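The new addSumMetricToSensor overload above lets a caller register a CumulativeSum metric under the bare operation name instead of operation + TOTAL_SUFFIX. A hedged usage sketch — the group string and tag values below are illustrative placeholders, not the constants the real call sites use:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

public class SumMetricExample {
    public static void main(final String[] args) {
        final Metrics metrics = new Metrics();
        final Sensor sensor = metrics.sensor("example-sensor");
        final Map<String, String> tags = new HashMap<>();
        tags.put("task-id", "0_0");               // illustrative tag
        tags.put("rocksdb-state-id", "my-store"); // illustrative tag

        // Pre-existing overload: appends the total suffix, registering e.g. "bytes-written-total".
        StreamsMetricsImpl.addSumMetricToSensor(sensor, "stream-state-metrics", tags,
            "bytes-written", "total number of bytes written");

        // New overload with withSuffix=false: registers the bare name "number-open-files",
        // appropriate when the accumulated sum is itself the current value.
        StreamsMetricsImpl.addSumMetricToSensor(sensor, "stream-state-metrics", tags,
            "number-open-files", false, "current number of open files");
    }
}
```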
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
@@ -24,7 +24,7 @@
*/
class KeyValueSegments extends AbstractSegments<KeyValueSegment> {

final private RocksDBMetricsRecorder metricsRecorder;
private final RocksDBMetricsRecorder metricsRecorder;

KeyValueSegments(final String name,
final String metricsScope,
@@ -51,10 +51,4 @@ public KeyValueSegment getOrCreateSegment(final long segmentId,
return newSegment;
}
}

@Override
public void close() {
metricsRecorder.close();
super.close();
}
}
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -104,8 +104,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
private RocksDBConfigSetter configSetter;

private final RocksDBMetricsRecorder metricsRecorder;
private boolean closeMetricsRecorder = false;
private boolean removeStatisticsFromMetricsRecorder = false;
private boolean isStatisticsRegistered = false;

private volatile boolean prepareForBulkload = false;
ProcessorContext internalProcessorContext;
@@ -117,7 +116,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BulkLoadingSt
RocksDBStore(final String name,
final String metricsScope) {
this(name, DB_FILE_DIR, new RocksDBMetricsRecorder(metricsScope, name));
closeMetricsRecorder = true;
}

RocksDBStore(final String name,
@@ -190,26 +188,21 @@ void openDB(final ProcessorContext context) {
throw new ProcessorStateException(fatal);
}

setUpMetrics(context, configs);
maybeSetUpMetricsRecorder(context, configs);

openRocksDB(dbOptions, columnFamilyOptions);
open = true;
}

private void setUpMetrics(final ProcessorContext context, final Map<String, Object> configs) {
private void maybeSetUpMetricsRecorder(final ProcessorContext context, final Map<String, Object> configs) {
if (userSpecifiedOptions.statistics() == null &&
RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {

isStatisticsRegistered = true;
// metrics recorder will clean up statistics object
final Statistics statistics = new Statistics();
userSpecifiedOptions.setStatistics(statistics);
metricsRecorder.addStatistics(
name,
statistics,
(StreamsMetricsImpl) context.metrics(),
context.taskId()
);
removeStatisticsFromMetricsRecorder = true;
metricsRecorder.addStatistics(name, statistics, (StreamsMetricsImpl) context.metrics(), context.taskId());
}
}

@@ -434,7 +427,7 @@ public synchronized void close() {
configSetter = null;
}

closeOrUpdateMetricsRecorder();
maybeRemoveStatisticsFromMetricsRecorder();

// Important: do not rearrange the order in which the below objects are closed!
// Order of closing must follow: ColumnFamilyHandle > RocksDB > DBOptions > ColumnFamilyOptions
@@ -455,6 +448,13 @@ public synchronized void close() {
cache = null;
}

private void maybeRemoveStatisticsFromMetricsRecorder() {
if (isStatisticsRegistered) {
metricsRecorder.removeStatistics(name);
isStatisticsRegistered = false;
}
}

private void closeOpenIterators() {
final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
synchronized (openIterators) {
@@ -468,14 +468,6 @@ private void closeOpenIterators() {
}
}

private void closeOrUpdateMetricsRecorder() {
if (closeMetricsRecorder) {
metricsRecorder.close();
} else if (removeStatisticsFromMetricsRecorder) {
metricsRecorder.removeStatistics(name);
}
}

interface RocksDBAccessor {

void put(final byte[] key,
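With the changes above, a RocksDBStore registers its Statistics object with the shared recorder when it opens at DEBUG recording level and deregisters it on close; for segmented stores every segment registers with the same recorder, which is how metrics get aggregated over segments. A heavily simplified, hypothetical sketch of such an aggregating recorder — the real RocksDBMetricsRecorder also takes the streams metrics and task id, and the method bodies here are illustrative:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.rocksdb.Statistics;
import org.rocksdb.TickerType;

// Hypothetical, heavily simplified stand-in for RocksDBMetricsRecorder:
// one Statistics object is registered per segment, and the recorder sums
// ticker counts across all of them when triggered.
public class SegmentedStatisticsAggregatorSketch {

    private final Map<String, Statistics> statisticsPerSegment = new ConcurrentHashMap<>();

    public void addStatistics(final String segmentName, final Statistics statistics) {
        statisticsPerSegment.put(segmentName, statistics);
    }

    public void removeStatistics(final String segmentName) {
        final Statistics removed = statisticsPerSegment.remove(segmentName);
        if (removed != null) {
            removed.close(); // per the diff comment, the recorder cleans up the statistics object
        }
    }

    // called once per recording interval from the trigger thread
    public long aggregate(final TickerType tickerType) {
        long sum = 0;
        for (final Statistics statistics : statisticsPerSegment.values()) {
            sum += statistics.getTickerCount(tickerType);
        }
        return sum;
    }
}
```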
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
@@ -51,10 +51,4 @@ public TimestampedSegment getOrCreateSegment(final long segmentId,
return newSegment;
}
}

@Override
public void close() {
metricsRecorder.close();
super.close();
}
}
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java
@@ -366,12 +366,13 @@ public static Sensor compactionTimeMaxSensor(final StreamsMetricsImpl streamsMet
public static Sensor numberOfOpenFilesSensor(final StreamsMetricsImpl streamsMetrics,
final RocksDBMetricContext metricContext) {
final Sensor sensor = createSensor(streamsMetrics, metricContext, NUMBER_OF_OPEN_FILES);
addValueMetricToSensor(
addSumMetricToSensor(
sensor,
STATE_LEVEL_GROUP,
streamsMetrics
.storeLevelTagMap(metricContext.taskName(), metricContext.metricsScope(), metricContext.storeName()),
NUMBER_OF_OPEN_FILES,
false,
NUMBER_OF_OPEN_FILES_DESCRIPTION
);
return sensor;
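The switch above from a value metric to a suffix-less sum metric for the number-of-open-files sensor suggests the recorder feeds the sensor deltas of file opens minus file closes, so that the sensor's running CumulativeSum equals the number of files currently open. That reading of the recording logic is an assumption, not code from this commit; NO_FILE_OPENS and NO_FILE_CLOSES, however, are real RocksDB ticker types:

```java
import org.rocksdb.Statistics;
import org.rocksdb.TickerType;

// Hedged sketch: compute the per-interval delta of (opens - closes) so that a
// cumulative-sum sensor tracks the current number of open files. Illustrative only.
public class OpenFilesDeltaSketch {

    private long previousOpens = 0;
    private long previousCloses = 0;

    // returns the value to record into the cumulative-sum sensor
    public long openFilesDelta(final Statistics statistics) {
        final long opens = statistics.getTickerCount(TickerType.NO_FILE_OPENS);
        final long closes = statistics.getTickerCount(TickerType.NO_FILE_CLOSES);
        final long delta = (opens - previousOpens) - (closes - previousCloses);
        previousOpens = opens;
        previousCloses = closes;
        return delta;
    }
}
```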
