From d21209556a1e4fec7bf6f9340239e03cc64e2437 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Thu, 26 Jan 2017 18:19:47 +0000 Subject: [PATCH 1/2] improve test coverage --- .../internals/GlobalStateManagerImplTest.java | 50 +++++++++++++------ 1 file changed, 35 insertions(+), 15 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index db51cefabf587..53e8e4fd6b436 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -68,6 +68,7 @@ public class GlobalStateManagerImplTest { private MockConsumer consumer; private File checkpointFile; private final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback(); + private ProcessorTopology topology; @Before public void before() throws IOException { @@ -80,12 +81,12 @@ public void before() throws IOException { storeToProcessorNode.put(store1, new MockProcessorNode(-1)); store2 = new NoOpReadOnlyStore("t2-store"); storeToProcessorNode.put(store2, new MockProcessorNode(-1)); - final ProcessorTopology topology = new ProcessorTopology(Collections.emptyList(), - Collections.emptyMap(), - Collections.emptyMap(), - Collections.emptyList(), - storeToTopic, - Arrays.asList(store1, store2)); + topology = new ProcessorTopology(Collections.emptyList(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyList(), + storeToTopic, + Arrays.asList(store1, store2)); context = new NoOpProcessorContext(); stateDirPath = TestUtils.tempDirectory().getPath(); @@ -107,7 +108,7 @@ public void shouldLockGlobalStateDirectory() throws Exception { } @Test(expected = LockException.class) - public void shouldThrowStreamsExceptionIfCantGetLock() throws Exception { + public void shouldThrowLockExceptionIfCantGetLock() throws Exception { final StateDirectory stateDir = new StateDirectory("appId", stateDirPath); try { stateDir.lockGlobalState(1); @@ -135,10 +136,7 @@ public void shouldDeleteCheckpointFileAfteLoaded() throws Exception { @Test(expected = StreamsException.class) public void shouldThrowStreamsExceptionIfFailedToReadCheckpointedOffsets() throws Exception { - final File checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME); - try (final FileOutputStream stream = new FileOutputStream(checkpointFile)) { - stream.write("0\n1\nblah".getBytes()); - } + writeCorruptCheckpoint(); stateManager.initialize(context); } @@ -361,10 +359,7 @@ public void close() { @Test public void shouldReleaseLockIfExceptionWhenLoadingCheckpoints() throws Exception { - final File checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME); - try (final FileOutputStream stream = new FileOutputStream(checkpointFile)) { - stream.write("0\n1\nblah".getBytes()); - } + writeCorruptCheckpoint(); try { stateManager.initialize(context); } catch (StreamsException e) { @@ -379,6 +374,31 @@ public void shouldReleaseLockIfExceptionWhenLoadingCheckpoints() throws Exceptio } } + @Test(expected = StreamsException.class) + public void shouldThrowStreamsExceptionIfFailedToReadCheckpoints() throws Exception { + writeCorruptCheckpoint(); + stateManager.initialize(context); + } + + @Test(expected = LockException.class) + public void shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir() throws Exception { + stateManager = new GlobalStateManagerImpl(topology, consumer, new StateDirectory("appId", stateDirPath) { + @Override + public boolean lockGlobalState(final int retry) throws IOException { + throw new IOException("KABOOM!"); + } + }); + + stateManager.initialize(context); + } + + private void writeCorruptCheckpoint() throws IOException { + final File checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME); + try (final FileOutputStream stream = new FileOutputStream(checkpointFile)) { + stream.write("0\n1\nblah".getBytes()); + } + } + private void initializeConsumer(final long numRecords, final long startOffset, final TopicPartition topicPartition) { final HashMap startOffsets = new HashMap<>(); startOffsets.put(topicPartition, 1L); From 4d1b9e65601ef428092694567c9a21db8ab08725 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Thu, 2 Feb 2017 08:50:08 +0000 Subject: [PATCH 2/2] remove redundant test --- .../processor/internals/GlobalStateManagerImplTest.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index 53e8e4fd6b436..8d896904418ed 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -374,11 +374,6 @@ public void shouldReleaseLockIfExceptionWhenLoadingCheckpoints() throws Exceptio } } - @Test(expected = StreamsException.class) - public void shouldThrowStreamsExceptionIfFailedToReadCheckpoints() throws Exception { - writeCorruptCheckpoint(); - stateManager.initialize(context); - } @Test(expected = LockException.class) public void shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir() throws Exception { @@ -395,7 +390,7 @@ public boolean lockGlobalState(final int retry) throws IOException { private void writeCorruptCheckpoint() throws IOException { final File checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME); try (final FileOutputStream stream = new FileOutputStream(checkpointFile)) { - stream.write("0\n1\nblah".getBytes()); + stream.write("0\n1\nfoo".getBytes()); } }