Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,13 @@ public void close() {
open = false;
}

long numEntries() {
return endTimeMap.values().stream()
.flatMap(keyMap -> keyMap.values().stream())
.mapToLong(Map::size)
.sum();
}

private void removeExpiredSegments() {
long minLiveTime = Math.max(0L, observedStreamTime - retentionPeriod + 1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,12 @@ public void close() {
open = false;
}

long numEntries() {
return segmentMap.values().stream()
.mapToLong(Map::size)
.sum();
}

private void removeExpiredSegments() {
long minLiveTime = Math.max(0L, observedStreamTime - retentionPeriod + 1);
for (final InMemoryWindowStoreIteratorWrapper it : openIterators) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ private void registerMetrics() {
}
}
);
if (!persistent()) {
StateStoreMetrics.addNumKeysGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> wrapped().approximateNumEntries());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,24 @@ private void registerMetrics() {
}
}
);
if (!persistent()) {
StateStoreMetrics.addNumKeysGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> {
final InMemorySessionStore inMemoryStore = findInMemorySessionStore(wrapped());
return inMemoryStore != null ? inMemoryStore.numEntries() : -1L;
}
);
}
}

private static InMemorySessionStore findInMemorySessionStore(final StateStore store) {
if (store instanceof InMemorySessionStore) {
return (InMemorySessionStore) store;
} else if (store instanceof WrappedStateStore) {
return findInMemorySessionStore(((WrappedStateStore<?, ?, ?>) store).wrapped());
} else {
return null;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,24 @@ private void registerMetrics() {
}
}
);
if (!persistent()) {
StateStoreMetrics.addNumKeysGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> {
final InMemoryWindowStore inMemoryStore = findInMemoryWindowStore(wrapped());
return inMemoryStore != null ? inMemoryStore.numEntries() : -1L;
}
);
}
}

private static InMemoryWindowStore findInMemoryWindowStore(final StateStore store) {
if (store instanceof InMemoryWindowStore) {
return (InMemoryWindowStore) store;
} else if (store instanceof WrappedStateStore) {
return findInMemoryWindowStore(((WrappedStateStore<?, ?, ?>) store).wrapped());
} else {
return null;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ private StateStoreMetrics() {}
private static final String ITERATOR_DURATION_MAX_DESCRIPTION =
MAX_DESCRIPTION_PREFIX + ITERATOR_DURATION_DESCRIPTION;

private static final String NUM_KEYS = "num-keys";
private static final String NUM_KEYS_DESCRIPTION =
"The current number of keys in the in-memory state store";

private static final String OLDEST_ITERATOR_OPEN_SINCE_MS = "oldest-iterator-open-since-ms";
private static final String OLDEST_ITERATOR_OPEN_SINCE_MS_DESCRIPTION =
"The UNIX timestamp the oldest still open iterator was created, in milliseconds";
Expand Down Expand Up @@ -515,4 +519,20 @@ private static Sensor throughputAndLatencySensor(final String taskId,
);
return sensor;
}

public static void addNumKeysGauge(final String taskId,
final String storeType,
final String storeName,
final StreamsMetricsImpl streamsMetrics,
final Gauge<Long> numKeysGauge) {
streamsMetrics.addStoreLevelMutableMetric(
taskId,
storeType,
storeName,
NUM_KEYS,
NUM_KEYS_DESCRIPTION,
RecordingLevel.INFO,
numKeysGauge
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.state.KeyValueIterator;
Expand All @@ -35,17 +36,78 @@ StoreType storeType() {
return StoreType.InMemoryStore;
}

@Test
public void shouldCountNumEntries() {
final InMemorySessionStore store = new InMemorySessionStore("test", RETENTION_PERIOD, "scope");
store.init(context, store);

assertEquals(0L, store.numEntries());

store.put(
new Windowed<>(
Bytes.wrap("a".getBytes()),
new SessionWindow(0, 0)
),
"1".getBytes()
);
assertEquals(1L, store.numEntries());

store.put(
new Windowed<>(
Bytes.wrap("b".getBytes()),
new SessionWindow(0, 10)
),
"2".getBytes()
);
assertEquals(2L, store.numEntries());

store.put(
new Windowed<>(
Bytes.wrap("a".getBytes()),
new SessionWindow(5, 15)
),
"3".getBytes()
);
assertEquals(3L, store.numEntries());

// remove one entry
store.remove(
new Windowed<>(
Bytes.wrap("a".getBytes()),
new SessionWindow(0, 0)
)
);
assertEquals(2L, store.numEntries());

store.close();
}

@Test
public void shouldNotExpireFromOpenIterator() {

sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 2L);
sessionStore.put(new Windowed<>("a", new SessionWindow(10, 20)), 3L);
sessionStore.put(
new Windowed<>("a", new SessionWindow(0, 0)),
1L
);
sessionStore.put(
new Windowed<>("aa", new SessionWindow(0, 10)),
2L
);
sessionStore.put(
new Windowed<>("a", new SessionWindow(10, 20)),
3L
);

final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessions("a", "b", 0L, RETENTION_PERIOD);

// Advance stream time to expire the first three record
sessionStore.put(new Windowed<>("aa", new SessionWindow(100, 2 * RETENTION_PERIOD)), 4L);
sessionStore.put(
new Windowed<>(
"aa",
new SessionWindow(100, 2 * RETENTION_PERIOD)
),
4L
);

assertEquals(Set.of(1L, 2L, 3L, 4L), valuesToSet(iterator));
assertFalse(iterator.hasNext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,53 @@ <K, V> WindowStore<K, V> buildWindowStore(final long retentionPeriod,
.build();
}

@Test
public void shouldCountNumEntries() {
final InMemoryWindowStore store = new InMemoryWindowStore("test", RETENTION_PERIOD, WINDOW_SIZE, false, "scope");
store.init(context, store);

assertEquals(0L, store.numEntries());

store.put(
Bytes.wrap("a".getBytes()),
"1".getBytes(),
0L
);
assertEquals(1L, store.numEntries());

store.put(
Bytes.wrap("b".getBytes()),
"2".getBytes(),
0L
);
assertEquals(2L, store.numEntries());

store.put(
Bytes.wrap("a".getBytes()),
"3".getBytes(),
10L
);
assertEquals(3L, store.numEntries());

// overwrite existing entry (same key, same timestamp)
store.put(
Bytes.wrap("a".getBytes()),
"4".getBytes(),
0L
);
assertEquals(3L, store.numEntries());

// delete entry by putting null
store.put(
Bytes.wrap("b".getBytes()),
null,
0L
);
assertEquals(2L, store.numEntries());

store.close();
}

@SuppressWarnings("unchecked")
@Test
public void shouldRestore() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,17 @@ public void shouldGetRecordsWithPrefixKey() {
assertTrue((Double) metric.metricValue() > 0);
}

@Test
public void shouldTrackNumKeysMetric() {
setUp();
when(inner.approximateNumEntries()).thenReturn(42L);
init();

final KafkaMetric numKeysMetric = metric("num-keys");
assertThat(numKeysMetric, not(nullValue()));
assertThat((Long) numKeysMetric.metricValue(), equalTo(42L));
}

@SuppressWarnings("unused")
@Test
public void shouldTrackOpenIteratorsMetric() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,17 @@ public void shouldRemoveMetricsEvenIfWrappedStoreThrowsOnClose() {
assertThat(storeMetrics(), empty());
}

@Test
public void shouldTrackNumKeysMetric() {
setUp();
init();

final KafkaMetric numKeysMetric = metric("num-keys");
assertThat(numKeysMetric, not(nullValue()));
// inner store is a mock (not InMemorySessionStore), so returns -1
assertThat((Long) numKeysMetric.metricValue(), equalTo(-1L));
}

@SuppressWarnings("unused")
@Test
public void shouldTrackOpenIteratorsMetric() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,16 @@ public void shouldThrowNullPointerOnBackwardFetchIfKeyIsNull() {
assertThrows(NullPointerException.class, () -> store.backwardFetch(null, 0L, 1L));
}

@Test
public void shouldTrackNumKeysMetric() {
store.init(context, store);

final KafkaMetric numKeysMetric = metric("num-keys");
assertThat(numKeysMetric, not(nullValue()));
// inner store is a mock (not InMemoryWindowStore), so returns -1
assertThat((Long) numKeysMetric.metricValue(), equalTo(-1L));
}

@SuppressWarnings("unused")
@Test
public void shouldTrackOpenIteratorsMetric() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,24 @@ public void shouldGetSuppressionBufferSizeSensor() {
);
}

@Test
public void shouldAddNumKeysGauge() {
@SuppressWarnings("unchecked")
final org.apache.kafka.common.metrics.Gauge<Long> gauge = mock(org.apache.kafka.common.metrics.Gauge.class);

StateStoreMetrics.addNumKeysGauge(TASK_ID, STORE_TYPE, STORE_NAME, streamsMetrics, gauge);

org.mockito.Mockito.verify(streamsMetrics).addStoreLevelMutableMetric(
TASK_ID,
STORE_TYPE,
STORE_NAME,
"num-keys",
"The current number of keys in the in-memory state store",
RecordingLevel.INFO,
gauge
);
}

@Test
public void shouldGetRecordE2ELatencySensor() {
final String metricName = "record-e2e-latency";
Expand Down
Loading