From ee279efe38d42077de93713694101dcdb625fced Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Tue, 27 Sep 2016 09:45:40 +0100 Subject: [PATCH] make sure ProcessorStateManager can handle State Stores that are not backed by a topic and have logging disabled --- .../processor/internals/ProcessorStateManagerTest.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 5802b29113f58..7c2220227ae86 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -48,6 +48,7 @@ import java.util.Set; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; @@ -461,4 +462,12 @@ public void testClose() throws IOException { assertEquals(new Long(123L + 1L), checkpointedOffsets.get(new TopicPartition(persistentStoreTopicName, 1))); } + @Test + public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws Exception { + MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, null, Collections.emptyMap()); + stateMgr.register(mockStateStore, false, mockStateStore.stateRestoreCallback); + assertNotNull(stateMgr.getStore(nonPersistentStoreName)); + } + }