Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace EasyMock and PowerMock with Mockito - TimeOrderedWindowStoreTest #12777

Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -53,7 +53,6 @@
import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -90,6 +89,12 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verify;


@RunWith(Parameterized.class)
public class TimeOrderedWindowStoreTest {
Expand All @@ -103,7 +108,7 @@ public class TimeOrderedWindowStoreTest {

private InternalMockProcessorContext context;
private RocksDBTimeOrderedWindowSegmentedBytesStore bytesStore;
private WindowStore<Bytes, byte[]> underlyingStore;
private RocksDBTimeOrderedWindowStore underlyingStore;
private TimeOrderedCachingWindowStore cachingStore;
private CacheFlushListenerStub<Windowed<String>, String> cacheListener;
private ThreadCache cache;
Expand Down Expand Up @@ -144,45 +149,38 @@ public void closeStore() {
@SuppressWarnings("deprecation")
@Test
public void shouldDelegateDeprecatedInit() {
final RocksDBTimeOrderedWindowStore inner = EasyMock.mock(RocksDBTimeOrderedWindowStore.class);
EasyMock.expect(inner.hasIndex()).andReturn(hasIndex);
EasyMock.replay(inner);
final RocksDBTimeOrderedWindowStore inner = mock(RocksDBTimeOrderedWindowStore.class);
when(inner.hasIndex()).thenReturn(hasIndex);
final TimeOrderedCachingWindowStore outer = new TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL);

EasyMock.reset(inner);
EasyMock.expect(inner.name()).andStubReturn("store");
inner.init((ProcessorContext) context, outer);
EasyMock.expectLastCall();
EasyMock.replay(inner);
reset(inner);
when(inner.name()).thenReturn("store");
outer.init((ProcessorContext) context, outer);
EasyMock.verify(inner);
verify(inner).init((ProcessorContext) context, outer);
}

@Test
public void shouldDelegateInit() {
final RocksDBTimeOrderedWindowStore inner = EasyMock.mock(RocksDBTimeOrderedWindowStore.class);
EasyMock.expect(inner.hasIndex()).andReturn(hasIndex);
EasyMock.replay(inner);
final RocksDBTimeOrderedWindowStore inner = mock(RocksDBTimeOrderedWindowStore.class);
when(inner.hasIndex()).thenReturn(hasIndex);
final TimeOrderedCachingWindowStore outer = new TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL);

EasyMock.reset(inner);
EasyMock.expect(inner.name()).andStubReturn("store");
inner.init((StateStoreContext) context, outer);
EasyMock.expectLastCall();
EasyMock.replay(inner);
reset(inner);
when(inner.name()).thenReturn("store");

outer.init((StateStoreContext) context, outer);
EasyMock.verify(inner);
verify(inner).init((StateStoreContext) context, outer);
}

@Test
public void shouldThrowIfWrongStore() {
final RocksDBTimestampedWindowStore innerWrong = EasyMock.mock(RocksDBTimestampedWindowStore.class);
final RocksDBTimestampedWindowStore innerWrong = mock(RocksDBTimestampedWindowStore.class);
final Exception e = assertThrows(IllegalArgumentException.class,
() -> new TimeOrderedCachingWindowStore(innerWrong, WINDOW_SIZE, SEGMENT_INTERVAL));
assertThat(e.getMessage(),
containsString("TimeOrderedCachingWindowStore only supports RocksDBTimeOrderedWindowStore backed store"));

final RocksDBTimeOrderedWindowStore inner = EasyMock.mock(RocksDBTimeOrderedWindowStore.class);
final RocksDBTimeOrderedWindowStore inner = mock(RocksDBTimeOrderedWindowStore.class);
// Nothing happens
new TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL);
}
Expand Down Expand Up @@ -1166,58 +1164,43 @@ public void shouldNotThrowInvalidBackwardRangeExceptionWithNegativeFromKey() {
@Test
public void shouldCloseCacheAndWrappedStoreAfterErrorDuringCacheFlush() {
setUpCloseTests();
EasyMock.reset(cache);
cache.flush(CACHE_NAMESPACE);
EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on flush"));
cache.close(CACHE_NAMESPACE);
EasyMock.replay(cache);
EasyMock.reset(underlyingStore);
underlyingStore.close();
EasyMock.replay(underlyingStore);
doThrow(new RuntimeException(
"Simulating an error on flush"))
.when(cache).flush(CACHE_NAMESPACE);
reset(underlyingStore);

assertThrows(RuntimeException.class, cachingStore::close);
EasyMock.verify(cache, underlyingStore);
verifyAndTearDownCloseTests();
}

@Test
public void shouldCloseWrappedStoreAfterErrorDuringCacheClose() {
setUpCloseTests();
EasyMock.reset(cache);
cache.flush(CACHE_NAMESPACE);
cache.close(CACHE_NAMESPACE);
EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on close"));
EasyMock.replay(cache);
EasyMock.reset(underlyingStore);
underlyingStore.close();
EasyMock.replay(underlyingStore);
doThrow(new RuntimeException("Simulating an error on close"))
.when(cache).close(CACHE_NAMESPACE);

reset(underlyingStore);
shekhar-rajak marked this conversation as resolved.
Show resolved Hide resolved
assertThrows(RuntimeException.class, cachingStore::close);
EasyMock.verify(cache, underlyingStore);
verifyAndTearDownCloseTests();
}

@Test
public void shouldCloseCacheAfterErrorDuringStateStoreClose() {
setUpCloseTests();
EasyMock.reset(cache);
cache.flush(CACHE_NAMESPACE);
cache.close(CACHE_NAMESPACE);
EasyMock.replay(cache);
EasyMock.reset(underlyingStore);
underlyingStore.close();
EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on close"));
EasyMock.replay(underlyingStore);
doThrow(new RuntimeException("Simulating an error on close"))
.when(underlyingStore).close();

assertThrows(RuntimeException.class, cachingStore::close);
EasyMock.verify(cache, underlyingStore);
verifyAndTearDownCloseTests();
}

private void setUpCloseTests() {
underlyingStore = EasyMock.createNiceMock(RocksDBTimeOrderedWindowStore.class);
EasyMock.expect(underlyingStore.name()).andStubReturn("store-name");
EasyMock.expect(underlyingStore.isOpen()).andStubReturn(true);
EasyMock.replay(underlyingStore);
underlyingStore = mock(RocksDBTimeOrderedWindowStore.class);
when(underlyingStore.name()).thenReturn("store-name");
when(underlyingStore.isOpen()).thenReturn(true);

cachingStore = new TimeOrderedCachingWindowStore(underlyingStore, WINDOW_SIZE, SEGMENT_INTERVAL);
cache = EasyMock.createNiceMock(ThreadCache.class);
cache = mock(ThreadCache.class);
context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC, new RecordHeaders()));
cachingStore.init((StateStoreContext) context, cachingStore);
Expand All @@ -1242,4 +1225,15 @@ private int addItemsToCache() {
return i;
}

private void verifyAndTearDownCloseTests() {
verify(underlyingStore).close();
verify(cache).flush(CACHE_NAMESPACE);
verify(cache).close(CACHE_NAMESPACE);

// resets the mocks created in #setUpCloseTests(). It is necessary to
// ensure that @After works correctly.
reset(cache);
reset(underlyingStore);
}

}