diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java index 1a5c3197442ea..8bd7185139ae6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java @@ -381,6 +381,13 @@ public void close() { open = false; } + long numEntries() { + return endTimeMap.values().stream() + .flatMap(keyMap -> keyMap.values().stream()) + .mapToLong(Map::size) + .sum(); + } + private void removeExpiredSegments() { long minLiveTime = Math.max(0L, observedStreamTime - retentionPeriod + 1); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index c883576977d96..8d2228db5c3d5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -403,6 +403,12 @@ public void close() { open = false; } + long numEntries() { + return segmentMap.values().stream() + .mapToLong(Map::size) + .sum(); + } + private void removeExpiredSegments() { long minLiveTime = Math.max(0L, observedStreamTime - retentionPeriod + 1); for (final InMemoryWindowStoreIteratorWrapper it : openIterators) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 0535e2f89e95b..cd6fa23fa5311 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -177,6 +177,10 @@ private void registerMetrics() { } } ); + if (!persistent()) { + StateStoreMetrics.addNumKeysGauge(taskId.toString(), metricsScope, name(), streamsMetrics, + (config, now) -> wrapped().approximateNumEntries()); + } } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index d27095e19fd44..cf5e00d403bb5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -138,6 +138,24 @@ private void registerMetrics() { } } ); + if (!persistent()) { + StateStoreMetrics.addNumKeysGauge(taskId.toString(), metricsScope, name(), streamsMetrics, + (config, now) -> { + final InMemorySessionStore inMemoryStore = findInMemorySessionStore(wrapped()); + return inMemoryStore != null ? inMemoryStore.numEntries() : -1L; + } + ); + } + } + + private static InMemorySessionStore findInMemorySessionStore(final StateStore store) { + if (store instanceof InMemorySessionStore) { + return (InMemorySessionStore) store; + } else if (store instanceof WrappedStateStore) { + return findInMemorySessionStore(((WrappedStateStore) store).wrapped()); + } else { + return null; + } } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index db016071511e5..2658772b7bded 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -158,6 +158,24 @@ private void registerMetrics() { } } ); + if (!persistent()) { + StateStoreMetrics.addNumKeysGauge(taskId.toString(), metricsScope, name(), streamsMetrics, + (config, now) -> { + final InMemoryWindowStore inMemoryStore = findInMemoryWindowStore(wrapped()); + return inMemoryStore != null ? inMemoryStore.numEntries() : -1L; + } + ); + } + } + + private static InMemoryWindowStore findInMemoryWindowStore(final StateStore store) { + if (store instanceof InMemoryWindowStore) { + return (InMemoryWindowStore) store; + } else if (store instanceof WrappedStateStore) { + return findInMemoryWindowStore(((WrappedStateStore) store).wrapped()); + } else { + return null; + } } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java index cfaece063e479..356d26bbe9715 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java @@ -158,6 +158,10 @@ private StateStoreMetrics() {} private static final String ITERATOR_DURATION_MAX_DESCRIPTION = MAX_DESCRIPTION_PREFIX + ITERATOR_DURATION_DESCRIPTION; + private static final String NUM_KEYS = "num-keys"; + private static final String NUM_KEYS_DESCRIPTION = + "The current number of keys in the in-memory state store"; + private static final String OLDEST_ITERATOR_OPEN_SINCE_MS = "oldest-iterator-open-since-ms"; private static final String OLDEST_ITERATOR_OPEN_SINCE_MS_DESCRIPTION = "The UNIX timestamp the oldest still open iterator was created, in milliseconds"; @@ -515,4 +519,20 @@ private static Sensor throughputAndLatencySensor(final String taskId, ); return sensor; } + + public static void addNumKeysGauge(final String taskId, + final String storeType, + final String storeName, + final StreamsMetricsImpl streamsMetrics, + final Gauge numKeysGauge) { + streamsMetrics.addStoreLevelMutableMetric( + taskId, + storeType, + storeName, + NUM_KEYS, + NUM_KEYS_DESCRIPTION, + RecordingLevel.INFO, + numKeysGauge + ); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java index 4769fde37e76a..eca2fa4502fe9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.state.KeyValueIterator; @@ -35,17 +36,78 @@ StoreType storeType() { return StoreType.InMemoryStore; } + @Test + public void shouldCountNumEntries() { + final InMemorySessionStore store = new InMemorySessionStore("test", RETENTION_PERIOD, "scope"); + store.init(context, store); + + assertEquals(0L, store.numEntries()); + + store.put( + new Windowed<>( + Bytes.wrap("a".getBytes()), + new SessionWindow(0, 0) + ), + "1".getBytes() + ); + assertEquals(1L, store.numEntries()); + + store.put( + new Windowed<>( + Bytes.wrap("b".getBytes()), + new SessionWindow(0, 10) + ), + "2".getBytes() + ); + assertEquals(2L, store.numEntries()); + + store.put( + new Windowed<>( + Bytes.wrap("a".getBytes()), + new SessionWindow(5, 15) + ), + "3".getBytes() + ); + assertEquals(3L, store.numEntries()); + + // remove one entry + store.remove( + new Windowed<>( + Bytes.wrap("a".getBytes()), + new SessionWindow(0, 0) + ) + ); + assertEquals(2L, store.numEntries()); + + store.close(); + } + @Test public void shouldNotExpireFromOpenIterator() { - sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); - sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 2L); - sessionStore.put(new Windowed<>("a", new SessionWindow(10, 20)), 3L); + sessionStore.put( + new Windowed<>("a", new SessionWindow(0, 0)), + 1L + ); + sessionStore.put( + new Windowed<>("aa", new SessionWindow(0, 10)), + 2L + ); + sessionStore.put( + new Windowed<>("a", new SessionWindow(10, 20)), + 3L + ); final KeyValueIterator, Long> iterator = sessionStore.findSessions("a", "b", 0L, RETENTION_PERIOD); // Advance stream time to expire the first three record - sessionStore.put(new Windowed<>("aa", new SessionWindow(100, 2 * RETENTION_PERIOD)), 4L); + sessionStore.put( + new Windowed<>( + "aa", + new SessionWindow(100, 2 * RETENTION_PERIOD) + ), + 4L + ); assertEquals(Set.of(1L, 2L, 3L, 4L), valuesToSet(iterator)); assertFalse(iterator.hasNext()); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java index b39449cca3a5e..cdc23218e7305 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java @@ -84,6 +84,53 @@ WindowStore buildWindowStore(final long retentionPeriod, .build(); } + @Test + public void shouldCountNumEntries() { + final InMemoryWindowStore store = new InMemoryWindowStore("test", RETENTION_PERIOD, WINDOW_SIZE, false, "scope"); + store.init(context, store); + + assertEquals(0L, store.numEntries()); + + store.put( + Bytes.wrap("a".getBytes()), + "1".getBytes(), + 0L + ); + assertEquals(1L, store.numEntries()); + + store.put( + Bytes.wrap("b".getBytes()), + "2".getBytes(), + 0L + ); + assertEquals(2L, store.numEntries()); + + store.put( + Bytes.wrap("a".getBytes()), + "3".getBytes(), + 10L + ); + assertEquals(3L, store.numEntries()); + + // overwrite existing entry (same key, same timestamp) + store.put( + Bytes.wrap("a".getBytes()), + "4".getBytes(), + 0L + ); + assertEquals(3L, store.numEntries()); + + // delete entry by putting null + store.put( + Bytes.wrap("b".getBytes()), + null, + 0L + ); + assertEquals(2L, store.numEntries()); + + store.close(); + } + @SuppressWarnings("unchecked") @Test public void shouldRestore() { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java index f4a9efa08004a..69f9f71003990 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java @@ -470,6 +470,17 @@ public void shouldGetRecordsWithPrefixKey() { assertTrue((Double) metric.metricValue() > 0); } + @Test + public void shouldTrackNumKeysMetric() { + setUp(); + when(inner.approximateNumEntries()).thenReturn(42L); + init(); + + final KafkaMetric numKeysMetric = metric("num-keys"); + assertThat(numKeysMetric, not(nullValue())); + assertThat((Long) numKeysMetric.metricValue(), equalTo(42L)); + } + @SuppressWarnings("unused") @Test public void shouldTrackOpenIteratorsMetric() { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java index 06023d6256ae6..f3ca4cf3786a8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java @@ -655,6 +655,17 @@ public void shouldRemoveMetricsEvenIfWrappedStoreThrowsOnClose() { assertThat(storeMetrics(), empty()); } + @Test + public void shouldTrackNumKeysMetric() { + setUp(); + init(); + + final KafkaMetric numKeysMetric = metric("num-keys"); + assertThat(numKeysMetric, not(nullValue())); + // inner store is a mock (not InMemorySessionStore), so returns -1 + assertThat((Long) numKeysMetric.metricValue(), equalTo(-1L)); + } + @SuppressWarnings("unused") @Test public void shouldTrackOpenIteratorsMetric() { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java index 2538873396f63..4af34c9bd8d4a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java @@ -442,6 +442,16 @@ public void shouldThrowNullPointerOnBackwardFetchIfKeyIsNull() { assertThrows(NullPointerException.class, () -> store.backwardFetch(null, 0L, 1L)); } + @Test + public void shouldTrackNumKeysMetric() { + store.init(context, store); + + final KafkaMetric numKeysMetric = metric("num-keys"); + assertThat(numKeysMetric, not(nullValue())); + // inner store is a mock (not InMemoryWindowStore), so returns -1 + assertThat((Long) numKeysMetric.metricValue(), equalTo(-1L)); + } + @SuppressWarnings("unused") @Test public void shouldTrackOpenIteratorsMetric() { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java index 3b814db667945..151d78da06ca9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java @@ -300,6 +300,24 @@ public void shouldGetSuppressionBufferSizeSensor() { ); } + @Test + public void shouldAddNumKeysGauge() { + @SuppressWarnings("unchecked") + final org.apache.kafka.common.metrics.Gauge gauge = mock(org.apache.kafka.common.metrics.Gauge.class); + + StateStoreMetrics.addNumKeysGauge(TASK_ID, STORE_TYPE, STORE_NAME, streamsMetrics, gauge); + + org.mockito.Mockito.verify(streamsMetrics).addStoreLevelMutableMetric( + TASK_ID, + STORE_TYPE, + STORE_NAME, + "num-keys", + "The current number of keys in the in-memory state store", + RecordingLevel.INFO, + gauge + ); + } + @Test public void shouldGetRecordE2ELatencySensor() { final String metricName = "record-e2e-latency";