From 4f22bf3305a14912303437e2507f5063dee6ab08 Mon Sep 17 00:00:00 2001 From: Vahid Hashemian Date: Tue, 2 Aug 2016 16:07:27 -0700 Subject: [PATCH 1/4] KAFKA-3999: Record raw size of fetch responses Currently, only the decompressed size of fetch response is recorded. This PR adds a sensor to keep track of its raw size as well. --- .../clients/consumer/internals/Fetcher.java | 8 +++ .../internals/FetcherMetricsRegistry.java | 64 +++++++++++-------- 2 files changed, 46 insertions(+), 26 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index e73ff4e4f393..ed67971cc509 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -205,6 +205,8 @@ public int sendFetches() { @Override public void onSuccess(ClientResponse resp) { FetchResponse response = (FetchResponse) resp.responseBody(); + sensors.rawBytesFetched.record( + FetchResponse.sizeOf(request.desiredOrLatestVersion(), response.responseData())); if (!matchesRequestedPartitions(request, response)) { // obviously we expect the broker to always send us valid responses, so this check // is mainly for test cases where mock fetch responses must be manually crafted. @@ -1220,6 +1222,7 @@ protected void increment(int bytes, int records) { private static class FetchManagerMetrics { private final Metrics metrics; private FetcherMetricsRegistry metricsRegistry; + private final Sensor rawBytesFetched; private final Sensor bytesFetched; private final Sensor recordsFetched; private final Sensor fetchLatency; @@ -1231,6 +1234,11 @@ private FetchManagerMetrics(Metrics metrics, FetcherMetricsRegistry metricsRegis this.metrics = metrics; this.metricsRegistry = metricsRegistry; + this.rawBytesFetched = metrics.sensor("raw-bytes-fetched"); + this.rawBytesFetched.add(metrics.metricInstance(metricsRegistry.rawFetchSizeAvg), new Avg()); + this.rawBytesFetched.add(metrics.metricInstance(metricsRegistry.rawFetchSizeMax), new Max()); + this.rawBytesFetched.add(metrics.metricInstance(metricsRegistry.rawBytesConsumedRate), new Rate()); + this.bytesFetched = metrics.sensor("bytes-fetched"); this.bytesFetched.add(metrics.metricInstance(metricsRegistry.fetchSizeAvg), new Avg()); this.bytesFetched.add(metrics.metricInstance(metricsRegistry.fetchSizeMax), new Max()); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java index 0c98342fca9e..3277b0bad53a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java @@ -25,6 +25,9 @@ public class FetcherMetricsRegistry { + public MetricNameTemplate rawFetchSizeAvg; + public MetricNameTemplate rawFetchSizeMax; + public MetricNameTemplate rawBytesConsumedRate; public MetricNameTemplate fetchSizeAvg; public MetricNameTemplate fetchSizeMax; public MetricNameTemplate bytesConsumedRate; @@ -54,67 +57,76 @@ public FetcherMetricsRegistry(String metricGrpPrefix) { } public FetcherMetricsRegistry(Set tags, String metricGrpPrefix) { - + /***** Client level *****/ String groupName = metricGrpPrefix + "-fetch-manager-metrics"; - - this.fetchSizeAvg = new MetricNameTemplate("fetch-size-avg", groupName, - "The average number of bytes fetched per request", tags); - this.fetchSizeMax = new MetricNameTemplate("fetch-size-max", groupName, + this.rawFetchSizeAvg = new MetricNameTemplate("raw-fetch-size-avg", groupName, + "The average number of raw bytes fetched per request", tags); + this.rawFetchSizeMax = new MetricNameTemplate("raw-fetch-size-max", groupName, + "The maximum number of raw bytes fetched per request", tags); + this.rawBytesConsumedRate = new MetricNameTemplate("raw-bytes-consumed-rate", groupName, + "The average number of raw bytes consumed per second", tags); + + this.fetchSizeAvg = new MetricNameTemplate("fetch-size-avg", groupName, + "The average number of bytes fetched per request", tags); + this.fetchSizeMax = new MetricNameTemplate("fetch-size-max", groupName, "The maximum number of bytes fetched per request", tags); - this.bytesConsumedRate = new MetricNameTemplate("bytes-consumed-rate", groupName, + this.bytesConsumedRate = new MetricNameTemplate("bytes-consumed-rate", groupName, "The average number of bytes consumed per second", tags); - this.recordsPerRequestAvg = new MetricNameTemplate("records-per-request-avg", groupName, + this.recordsPerRequestAvg = new MetricNameTemplate("records-per-request-avg", groupName, "The average number of records in each request", tags); - this.recordsConsumedRate = new MetricNameTemplate("records-consumed-rate", groupName, + this.recordsConsumedRate = new MetricNameTemplate("records-consumed-rate", groupName, "The average number of records consumed per second", tags); - this.fetchLatencyAvg = new MetricNameTemplate("fetch-latency-avg", groupName, + this.fetchLatencyAvg = new MetricNameTemplate("fetch-latency-avg", groupName, "The average time taken for a fetch request.", tags); - this.fetchLatencyMax = new MetricNameTemplate("fetch-latency-max", groupName, + this.fetchLatencyMax = new MetricNameTemplate("fetch-latency-max", groupName, "The max time taken for any fetch request.", tags); - this.fetchRequestRate = new MetricNameTemplate("fetch-rate", groupName, + this.fetchRequestRate = new MetricNameTemplate("fetch-rate", groupName, "The number of fetch requests per second.", tags); - this.recordsLagMax = new MetricNameTemplate("records-lag-max", groupName, + this.recordsLagMax = new MetricNameTemplate("records-lag-max", groupName, "The maximum lag in terms of number of records for any partition in this window", tags); - this.fetchThrottleTimeAvg = new MetricNameTemplate("fetch-throttle-time-avg", groupName, + this.fetchThrottleTimeAvg = new MetricNameTemplate("fetch-throttle-time-avg", groupName, "The average throttle time in ms", tags); - this.fetchThrottleTimeMax = new MetricNameTemplate("fetch-throttle-time-max", groupName, + this.fetchThrottleTimeMax = new MetricNameTemplate("fetch-throttle-time-max", groupName, "The maximum throttle time in ms", tags); /***** Topic level *****/ Set topicTags = new HashSet<>(tags); topicTags.add("topic"); - this.topicFetchSizeAvg = new MetricNameTemplate("fetch-size-avg", groupName, + this.topicFetchSizeAvg = new MetricNameTemplate("fetch-size-avg", groupName, "The average number of bytes fetched per request for a topic", topicTags); - this.topicFetchSizeMax = new MetricNameTemplate("fetch-size-max", groupName, + this.topicFetchSizeMax = new MetricNameTemplate("fetch-size-max", groupName, "The maximum number of bytes fetched per request for a topic", topicTags); - this.topicBytesConsumedRate = new MetricNameTemplate("bytes-consumed-rate", groupName, + this.topicBytesConsumedRate = new MetricNameTemplate("bytes-consumed-rate", groupName, "The average number of bytes consumed per second for a topic", topicTags); - this.topicRecordsPerRequestAvg = new MetricNameTemplate("records-per-request-avg", groupName, + this.topicRecordsPerRequestAvg = new MetricNameTemplate("records-per-request-avg", groupName, "The average number of records in each request for a topic", topicTags); - this.topicRecordsConsumedRate = new MetricNameTemplate("records-consumed-rate", groupName, + this.topicRecordsConsumedRate = new MetricNameTemplate("records-consumed-rate", groupName, "The average number of records consumed per second for a topic", topicTags); - + /***** Partition level *****/ - this.partitionRecordsLag = new MetricNameTemplate("{topic}-{partition}.records-lag", groupName, + this.partitionRecordsLag = new MetricNameTemplate("{topic}-{partition}.records-lag", groupName, "The latest lag of the partition", tags); - this.partitionRecordsLagMax = new MetricNameTemplate("{topic}-{partition}.records-lag-max", groupName, + this.partitionRecordsLagMax = new MetricNameTemplate("{topic}-{partition}.records-lag-max", groupName, "The max lag of the partition", tags); - this.partitionRecordsLagAvg = new MetricNameTemplate("{topic}-{partition}.records-lag-avg", groupName, + this.partitionRecordsLagAvg = new MetricNameTemplate("{topic}-{partition}.records-lag-avg", groupName, "The average lag of the partition", tags); - - + + } - + public List getAllTemplates() { return Arrays.asList( + rawFetchSizeAvg, + rawFetchSizeMax, + rawBytesConsumedRate, fetchSizeAvg, fetchSizeMax, bytesConsumedRate, From 402aa093db243965d2b6c04118ac7ee6d196fd45 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 22 Sep 2017 07:33:50 +0800 Subject: [PATCH 2/4] KAFKA-5937: Improve ProcessorStateManager exception handling Author: Matthias J. Sax Reviewers: Ted Yu , Damian Guy , Guozhang Wang Closes #3913 from mjsax/kafka-5937-exceptions-processor-state-manager --- .../internals/ProcessorStateManager.java | 47 ++++---- .../processor/internals/StateDirectory.java | 36 ++++--- .../internals/ProcessorStateManagerTest.java | 102 +++++++++++++++++- 3 files changed, 147 insertions(+), 38 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 942e41a8304d..2f16547e3105 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.BatchingStateRestoreCallback; @@ -67,8 +66,7 @@ public class ProcessorStateManager implements StateManager { private OffsetCheckpoint checkpoint; /** - * @throws LockException if the state directory cannot be locked because another thread holds the lock - * (this might be recoverable by retrying) + * @throws ProcessorStateException if the task directory does not exist and could not be created * @throws IOException if any severe error happens while creating or locking the state directory */ public ProcessorStateManager(final TaskId taskId, @@ -96,15 +94,7 @@ public ProcessorStateManager(final TaskId taskId, restoreCallbacks = isStandby ? new HashMap() : null; this.storeToChangelogTopic = storeToChangelogTopic; - // get a handle on the parent/base directory of the task directory - // note that the parent directory could have been accidentally deleted here, - // so catch that exception if that is the case - try { - baseDir = stateDirectory.directoryForTask(taskId); - } catch (final ProcessorStateException e) { - throw new LockException(String.format("%sFailed to get the directory for task %s. Exception %s", - logPrefix, taskId, e)); - } + baseDir = stateDirectory.directoryForTask(taskId); // load the checkpoint information checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME)); @@ -256,40 +246,49 @@ public StateStore getStore(final String name) { @Override public void flush() { + ProcessorStateException firstException = null; + // attempting to flush the stores if (!stores.isEmpty()) { log.debug("Flushing all stores registered in the state manager"); for (final StateStore store : stores.values()) { + log.trace("Flushing store {}", store.name()); try { - log.trace("Flushing store={}", store.name()); store.flush(); } catch (final Exception e) { - throw new ProcessorStateException(String.format("%sFailed to flush state store %s", logPrefix, store.name()), e); + if (firstException == null) { + firstException = new ProcessorStateException(String.format("%sFailed to flush state store %s", logPrefix, store.name()), e); + } + log.error("Failed to flush state store {}: ", store.name(), e); } } } + + if (firstException != null) { + throw firstException; + } } /** * {@link StateStore#close() Close} all stores (even in case of failure). - * Re-throw the first + * Log all exception and re-throw the first exception that did occur at the end. * @throws ProcessorStateException if any error happens when closing the state stores */ @Override public void close(final Map ackedOffsets) throws ProcessorStateException { - RuntimeException firstException = null; + ProcessorStateException firstException = null; // attempting to close the stores, just in case they // are not closed by a ProcessorNode yet if (!stores.isEmpty()) { log.debug("Closing its state manager and all the registered state stores"); - for (final Map.Entry entry : stores.entrySet()) { - log.debug("Closing storage engine {}", entry.getKey()); + for (final StateStore store : stores.values()) { + log.debug("Closing storage engine {}", store.name()); try { - entry.getValue().close(); + store.close(); } catch (final Exception e) { if (firstException == null) { - firstException = new ProcessorStateException(String.format("%sFailed to close state store %s", logPrefix, entry.getKey()), e); + firstException = new ProcessorStateException(String.format("%sFailed to close state store %s", logPrefix, store.name()), e); } - log.error("Failed to close state store {}: ", entry.getKey(), e); + log.error("Failed to close state store {}: ", store.name(), e); } } @@ -309,11 +308,11 @@ public void close(final Map ackedOffsets) throws Processor public void checkpoint(final Map ackedOffsets) { log.trace("Writing checkpoint: {}", ackedOffsets); checkpointedOffsets.putAll(changelogReader.restoredOffsets()); - for (final Map.Entry entry : stores.entrySet()) { - final String storeName = entry.getKey(); + for (final StateStore store : stores.values()) { + final String storeName = store.name(); // only checkpoint the offset to the offsets file if // it is persistent AND changelog enabled - if (entry.getValue().persistent() && storeToChangelogTopic.containsKey(storeName)) { + if (store.persistent() && storeToChangelogTopic.containsKey(storeName)) { final String changelogTopic = storeToChangelogTopic.get(storeName); final TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName)); if (ackedOffsets.containsKey(topicPartition)) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index c4262bc857f6..b7bc45cfdf06 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -62,39 +62,50 @@ private static class LockAndOwner { } } + /** + * Ensures that the state base directory as well as the application's sub-directory are created. + * + * @throws ProcessorStateException if the base state directory or application state directory does not exist + * and could not be created + */ public StateDirectory(final String applicationId, final String stateDirConfig, final Time time) { this.time = time; final File baseDir = new File(stateDirConfig); if (!baseDir.exists() && !baseDir.mkdirs()) { - throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created", - stateDirConfig)); + throw new ProcessorStateException( + String.format("base state directory [%s] doesn't exist and couldn't be created", stateDirConfig)); } stateDir = new File(baseDir, applicationId); if (!stateDir.exists() && !stateDir.mkdir()) { - throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created", - stateDir.getPath())); + throw new ProcessorStateException( + String.format("state directory [%s] doesn't exist and couldn't be created", stateDir.getPath())); } } /** - * Get or create the directory for the {@link TaskId} - * @param taskId + * Get or create the directory for the provided {@link TaskId}. * @return directory for the {@link TaskId} + * @throws ProcessorStateException if the task directory does not exists and could not be created */ File directoryForTask(final TaskId taskId) { final File taskDir = new File(stateDir, taskId.toString()); if (!taskDir.exists() && !taskDir.mkdir()) { - throw new ProcessorStateException(String.format("task directory [%s] doesn't exist and couldn't be created", - taskDir.getPath())); + throw new ProcessorStateException( + String.format("task directory [%s] doesn't exist and couldn't be created", taskDir.getPath())); } return taskDir; } + /** + * Get or create the directory for the global stores. + * @return directory for the global stores + * @throws ProcessorStateException if the global store directory does not exists and could not be created + */ File globalStateDir() { final File dir = new File(stateDir, "global"); if (!dir.exists() && !dir.mkdir()) { - throw new ProcessorStateException(String.format("global state directory [%s] doesn't exist and couldn't be created", - dir.getPath())); + throw new ProcessorStateException( + String.format("global state directory [%s] doesn't exist and couldn't be created", dir.getPath())); } return dir; } @@ -102,6 +113,7 @@ File globalStateDir() { private String logPrefix() { return String.format("stream-thread [%s]", Thread.currentThread().getName()); } + /** * Get the lock for the {@link TaskId}s directory if it is available * @param taskId @@ -192,9 +204,7 @@ synchronized void unlockGlobalState() throws IOException { } /** - * Unlock the state directory for the given {@link TaskId} - * @param taskId - * @throws IOException + * Unlock the state directory for the given {@link TaskId}. */ synchronized void unlock(final TaskId taskId) throws IOException { final LockAndOwner lockAndOwner = locks.get(taskId); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 1db2200a59d3..f3135d581012 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.test.MockStateStoreSupplier; import org.apache.kafka.test.TestUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -41,6 +42,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -79,7 +81,6 @@ public class ProcessorStateManagerTest { private OffsetCheckpoint checkpoint; private StateDirectory stateDirectory; - @Before public void setup() { baseDir = TestUtils.tempDirectory(); @@ -486,6 +487,35 @@ public void shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeen } + @Test + public void shouldThrowProcessorStateExceptionOnFlushIfStoreThrowsAnException() throws IOException { + + final ProcessorStateManager stateManager = new ProcessorStateManager( + taskId, + Collections.singleton(changelogTopicPartition), + false, + stateDirectory, + Collections.singletonMap(storeName, changelogTopic), + changelogReader, + false, + logContext); + + final MockStateStoreSupplier.MockStateStore stateStore = new MockStateStoreSupplier.MockStateStore(storeName, true) { + @Override + public void flush() { + throw new RuntimeException("KABOOM!"); + } + }; + stateManager.register(stateStore, false, stateStore.stateRestoreCallback); + + try { + stateManager.flush(); + fail("Should throw ProcessorStateException if store flush throws exception"); + } catch (final ProcessorStateException e) { + // pass + } + } + @Test public void shouldThrowProcessorStateExceptionOnCloseIfStoreThrowsAnException() throws IOException { @@ -515,6 +545,76 @@ public void close() { } } + @Test + public void shouldFlushAllStoresEvenIfStoreThrowsExcepiton() throws IOException { + final ProcessorStateManager stateManager = new ProcessorStateManager( + taskId, + Collections.singleton(changelogTopicPartition), + false, + stateDirectory, + Collections.singletonMap(storeName, changelogTopic), + changelogReader, + false, + logContext); + + final AtomicBoolean flushedStore = new AtomicBoolean(false); + + final MockStateStoreSupplier.MockStateStore stateStore1 = new MockStateStoreSupplier.MockStateStore(storeName, true) { + @Override + public void flush() { + throw new RuntimeException("KABOOM!"); + } + }; + final MockStateStoreSupplier.MockStateStore stateStore2 = new MockStateStoreSupplier.MockStateStore(storeName + "2", true) { + @Override + public void flush() { + flushedStore.set(true); + } + }; + stateManager.register(stateStore1, false, stateStore1.stateRestoreCallback); + stateManager.register(stateStore2, false, stateStore2.stateRestoreCallback); + + try { + stateManager.flush(); + } catch (final ProcessorStateException expected) { /* ignode */ } + Assert.assertTrue(flushedStore.get()); + } + + @Test + public void shouldCloseAllStoresEvenIfStoreThrowsExcepiton() throws IOException { + final ProcessorStateManager stateManager = new ProcessorStateManager( + taskId, + Collections.singleton(changelogTopicPartition), + false, + stateDirectory, + Collections.singletonMap(storeName, changelogTopic), + changelogReader, + false, + logContext); + + final AtomicBoolean closedStore = new AtomicBoolean(false); + + final MockStateStoreSupplier.MockStateStore stateStore1 = new MockStateStoreSupplier.MockStateStore(storeName, true) { + @Override + public void close() { + throw new RuntimeException("KABOOM!"); + } + }; + final MockStateStoreSupplier.MockStateStore stateStore2 = new MockStateStoreSupplier.MockStateStore(storeName + "2", true) { + @Override + public void close() { + closedStore.set(true); + } + }; + stateManager.register(stateStore1, false, stateStore1.stateRestoreCallback); + stateManager.register(stateStore2, false, stateStore2.stateRestoreCallback); + + try { + stateManager.close(Collections.emptyMap()); + } catch (final ProcessorStateException expected) { /* ignode */ } + Assert.assertTrue(closedStore.get()); + } + @Test public void shouldDeleteCheckpointFileOnCreationIfEosEnabled() throws IOException { checkpoint.write(Collections.emptyMap()); From a25548d2617a78ded2db339efaab718b9c5b0a20 Mon Sep 17 00:00:00 2001 From: Vahid Hashemian Date: Thu, 21 Sep 2017 16:38:10 -0700 Subject: [PATCH 3/4] Update FetcherMetricsRegistry.java --- .../kafka/clients/consumer/internals/FetcherMetricsRegistry.java | 1 + 1 file changed, 1 insertion(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java index 50733de1e73a..657f48b8de02 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java @@ -125,6 +125,7 @@ public FetcherMetricsRegistry(Set tags, String metricGrpPrefix) { "The average number of records consumed per second for a topic", topicTags); this.topicRecordsConsumedTotal = new MetricNameTemplate("records-consumed-total", groupName, "The total number of records consumed for a topic", topicTags); + /***** Partition level *****/ this.partitionRecordsLag = new MetricNameTemplate("{topic}-{partition}.records-lag", groupName, "The latest lag of the partition", tags); From 8790f4f35a5407ac1eec82033871ed80822d8555 Mon Sep 17 00:00:00 2001 From: Vahid Hashemian Date: Thu, 21 Sep 2017 16:53:14 -0700 Subject: [PATCH 4/4] Additional rebase --- .../apache/kafka/clients/consumer/internals/Fetcher.java | 3 ++- .../clients/consumer/internals/FetcherMetricsRegistry.java | 6 +++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 921e7de02c04..4f4c1727c90f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -1263,7 +1263,8 @@ private FetchManagerMetrics(Metrics metrics, FetcherMetricsRegistry metricsRegis this.rawBytesFetched = metrics.sensor("raw-bytes-fetched"); this.rawBytesFetched.add(metrics.metricInstance(metricsRegistry.rawFetchSizeAvg), new Avg()); this.rawBytesFetched.add(metrics.metricInstance(metricsRegistry.rawFetchSizeMax), new Max()); - this.rawBytesFetched.add(metrics.metricInstance(metricsRegistry.rawBytesConsumedRate), new Rate()); + this.rawBytesFetched.add(new Meter(metrics.metricInstance(metricsRegistry.rawBytesConsumedRate), + metrics.metricInstance(metricsRegistry.rawBytesConsumedTotal))); this.bytesFetched = metrics.sensor("bytes-fetched"); this.bytesFetched.add(metrics.metricInstance(metricsRegistry.fetchSizeAvg), new Avg()); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java index 657f48b8de02..0e23fdda62c1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java @@ -28,6 +28,7 @@ public class FetcherMetricsRegistry { public MetricNameTemplate rawFetchSizeAvg; public MetricNameTemplate rawFetchSizeMax; public MetricNameTemplate rawBytesConsumedRate; + public MetricNameTemplate rawBytesConsumedTotal; public MetricNameTemplate fetchSizeAvg; public MetricNameTemplate fetchSizeMax; public MetricNameTemplate bytesConsumedRate; @@ -72,6 +73,8 @@ public FetcherMetricsRegistry(Set tags, String metricGrpPrefix) { "The maximum number of raw bytes fetched per request", tags); this.rawBytesConsumedRate = new MetricNameTemplate("raw-bytes-consumed-rate", groupName, "The average number of raw bytes consumed per second", tags); + this.rawBytesConsumedTotal = new MetricNameTemplate("raw-bytes-consumed-total", groupName, + "The total number of raw bytes consumed", tags); this.fetchSizeAvg = new MetricNameTemplate("fetch-size-avg", groupName, "The average number of bytes fetched per request", tags); @@ -125,7 +128,7 @@ public FetcherMetricsRegistry(Set tags, String metricGrpPrefix) { "The average number of records consumed per second for a topic", topicTags); this.topicRecordsConsumedTotal = new MetricNameTemplate("records-consumed-total", groupName, "The total number of records consumed for a topic", topicTags); - + /***** Partition level *****/ this.partitionRecordsLag = new MetricNameTemplate("{topic}-{partition}.records-lag", groupName, "The latest lag of the partition", tags); @@ -142,6 +145,7 @@ public List getAllTemplates() { rawFetchSizeAvg, rawFetchSizeMax, rawBytesConsumedRate, + rawBytesConsumedTotal, fetchSizeAvg, fetchSizeMax, bytesConsumedRate,