From 16a0772f39a91195941cdb4be9e4639bdb20c84e Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Thu, 22 Jun 2017 16:30:48 +0100 Subject: [PATCH] prevent creation of window store with < 2 segments --- .../apache/kafka/streams/state/Stores.java | 5 +++- .../internals/RocksDBWindowStoreSupplier.java | 5 +++- .../kafka/streams/state/StoresTest.java | 15 ++++++++++ .../RocksDBWindowStoreSupplierTest.java | 29 ++++++++++++------- 4 files changed, 41 insertions(+), 13 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index 86ee1d25b9477..fef4ade5a5e1d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -102,7 +102,7 @@ public StateStoreSupplier build() { @Override public PersistentKeyValueFactory persistent() { return new PersistentKeyValueFactory() { - public boolean cachingEnabled; + boolean cachingEnabled; private long windowSize; private final Map logConfig = new HashMap<>(); private int numSegments = 0; @@ -113,6 +113,9 @@ public PersistentKeyValueFactory persistent() { @Override public PersistentKeyValueFactory windowed(final long windowSize, final long retentionPeriod, final int numSegments, final boolean retainDuplicates) { + if (numSegments < RocksDBWindowStoreSupplier.MIN_SEGMENTS) { + throw new IllegalArgumentException("numSegments must be >= " + RocksDBWindowStoreSupplier.MIN_SEGMENTS); + } this.windowSize = windowSize; this.numSegments = numSegments; this.retentionPeriod = retentionPeriod; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java index b1e0b0296db38..e19d09fde84c5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java @@ -33,7 +33,7 @@ */ public class RocksDBWindowStoreSupplier extends AbstractStoreSupplier implements WindowStoreSupplier { - + public static final int MIN_SEGMENTS = 2; private final long retentionPeriod; private final boolean retainDuplicates; private final int numSegments; @@ -47,6 +47,9 @@ public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegm public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde keySerde, Serde valueSerde, Time time, long windowSize, boolean logged, Map logConfig, boolean enableCaching) { super(name, keySerde, valueSerde, time, logged, logConfig); + if (numSegments < MIN_SEGMENTS) { + throw new IllegalArgumentException("numSegments must be >= " + MIN_SEGMENTS); + } this.retentionPeriod = retentionPeriod; this.retainDuplicates = retainDuplicates; this.numSegments = numSegments; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java index c2aa14d4f3f46..66adbf52cd80f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java @@ -26,6 +26,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class StoresTest { @@ -80,4 +81,18 @@ public void shouldCreatePersistenStoreSupplierNotLogged() throws Exception { assertFalse(supplier.loggingEnabled()); } + + @Test + public void shouldThrowIllegalArgumentExceptionWhenTryingToConstructWindowStoreWithLessThanTwoSegments() throws Exception { + final Stores.PersistentKeyValueFactory storeFactory = Stores.create("store") + .withKeys(Serdes.String()) + .withValues(Serdes.String()) + .persistent(); + try { + storeFactory.windowed(1, 1, 1, false); + fail("Should have thrown illegal argument exception as number of segments is less than 2"); + } catch (final IllegalArgumentException e) { + // ok + } + } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java index c9301a1486892..77fe8ee45fe37 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java @@ -53,12 +53,14 @@ public class RocksDBWindowStoreSupplierTest { @After public void close() { context.close(); - store.close(); + if (store != null) { + store.close(); + } } @Test public void shouldCreateLoggingEnabledStoreWhenWindowStoreLogged() throws Exception { - store = createStore(true, false); + store = createStore(true, false, 3); final List logged = new ArrayList<>(); final NoOpRecordCollector collector = new NoOpRecordCollector() { @Override @@ -85,7 +87,7 @@ public void send(final String topic, @Test public void shouldNotBeLoggingEnabledStoreWhenLogginNotEnabled() throws Exception { - store = createStore(false, false); + store = createStore(false, false, 3); final List logged = new ArrayList<>(); final NoOpRecordCollector collector = new NoOpRecordCollector() { @Override @@ -112,7 +114,7 @@ public void send(final String topic, @Test public void shouldBeCachedWindowStoreWhenCachingEnabled() throws Exception { - store = createStore(false, true); + store = createStore(false, true, 3); store.init(context, store); context.setTime(1); store.put("a", "b"); @@ -123,20 +125,20 @@ public void shouldBeCachedWindowStoreWhenCachingEnabled() throws Exception { @Test public void shouldReturnRocksDbStoreWhenCachingAndLoggingDisabled() throws Exception { - store = createStore(false, false); + store = createStore(false, false, 3); assertThat(store, is(instanceOf(RocksDBWindowStore.class))); } @Test public void shouldReturnRocksDbStoreWhenCachingDisabled() throws Exception { - store = createStore(true, false); + store = createStore(true, false, 3); assertThat(store, is(instanceOf(RocksDBWindowStore.class))); } @SuppressWarnings("unchecked") @Test public void shouldHaveMeteredStoreWhenCached() throws Exception { - store = createStore(false, true); + store = createStore(false, true, 3); store.init(context, store); final StreamsMetrics metrics = context.metrics(); assertFalse(metrics.metrics().isEmpty()); @@ -145,7 +147,7 @@ public void shouldHaveMeteredStoreWhenCached() throws Exception { @SuppressWarnings("unchecked") @Test public void shouldHaveMeteredStoreWhenLogged() throws Exception { - store = createStore(true, false); + store = createStore(true, false, 3); store.init(context, store); final StreamsMetrics metrics = context.metrics(); assertFalse(metrics.metrics().isEmpty()); @@ -154,17 +156,22 @@ public void shouldHaveMeteredStoreWhenLogged() throws Exception { @SuppressWarnings("unchecked") @Test public void shouldHaveMeteredStoreWhenNotLoggedOrCached() throws Exception { - store = createStore(false, false); + store = createStore(false, false, 3); store.init(context, store); final StreamsMetrics metrics = context.metrics(); assertFalse(metrics.metrics().isEmpty()); } + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIllegalArgumentExceptionIfNumSegmentsLessThanTwo() throws Exception { + createStore(true, true, 1); + } + @SuppressWarnings("unchecked") - private WindowStore createStore(final boolean logged, final boolean cached) { + private WindowStore createStore(final boolean logged, final boolean cached, final int numSegments) { return new RocksDBWindowStoreSupplier<>(STORE_NAME, 10, - 3, + numSegments, false, Serdes.String(), Serdes.String(),