From 452296733bb9bf0211942c38835d68cff5fccc73 Mon Sep 17 00:00:00 2001 From: bachmanity1 Date: Sat, 5 Aug 2023 00:15:35 +0900 Subject: [PATCH 01/12] remove easymock & powermock --- .../kafka/connect/util/KafkaBasedLogTest.java | 84 ++++++------------- 1 file changed, 26 insertions(+), 58 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java index e1c7e6dd5db4..717a3eed8ce0 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java @@ -43,12 +43,9 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; import org.powermock.api.easymock.PowerMock; -import org.powermock.api.easymock.annotation.Mock; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; -import org.powermock.reflect.Whitebox; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -67,15 +64,18 @@ import java.util.function.Supplier; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; - -@RunWith(PowerMockRunner.class) -@PrepareForTest(KafkaBasedLog.class) -@PowerMockIgnore("javax.management.*") +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; + +@RunWith(MockitoJUnitRunner.class) public class KafkaBasedLogTest { private static final String TOPIC = "connect-log"; @@ -132,11 +132,9 @@ public class KafkaBasedLogTest { records.add(record); }; - @SuppressWarnings("unchecked") @Before public void setUp() { - store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"}, - TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time, initializer); + store = spy(new KafkaBasedLog<>(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time, initializer)); consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); consumer.updatePartitions(TOPIC, Arrays.asList(TPINFO0, TPINFO1)); Map beginningOffsets = new HashMap<>(); @@ -150,8 +148,6 @@ public void testStartStop() throws Exception { expectStart(); expectStop(); - PowerMock.replayAll(); - Map endOffsets = new HashMap<>(); endOffsets.put(TP0, 0L); endOffsets.put(TP1, 0L); @@ -160,10 +156,7 @@ public void testStartStop() throws Exception { assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment()); store.stop(); - - assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); assertTrue(consumer.closed()); - PowerMock.verifyAll(); } @Test @@ -171,8 +164,6 @@ public void testReloadOnStart() throws Exception { expectStart(); expectStop(); - PowerMock.replayAll(); - Map endOffsets = new HashMap<>(); endOffsets.put(TP0, 1L); endOffsets.put(TP1, 1L); @@ -206,10 +197,8 @@ public void testReloadOnStart() throws Exception { assertEquals(TP1_VALUE, consumedRecords.get(TP1).get(0).value()); store.stop(); - - 
assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); + //assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); assertTrue(consumer.closed()); - PowerMock.verifyAll(); } @Test @@ -217,8 +206,6 @@ public void testReloadOnStartWithNoNewRecordsPresent() throws Exception { expectStart(); expectStop(); - PowerMock.replayAll(); - Map endOffsets = new HashMap<>(); endOffsets.put(TP0, 7L); endOffsets.put(TP1, 7L); @@ -241,9 +228,8 @@ public void testReloadOnStartWithNoNewRecordsPresent() throws Exception { store.stop(); - assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); + //assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); assertTrue(consumer.closed()); - PowerMock.verifyAll(); } @Test @@ -264,8 +250,6 @@ public void testSendAndReadToEnd() throws Exception { expectStop(); - PowerMock.replayAll(); - Map endOffsets = new HashMap<>(); endOffsets.put(TP0, 0L); endOffsets.put(TP1, 0L); @@ -335,9 +319,8 @@ public void testSendAndReadToEnd() throws Exception { // Cleanup store.stop(); - assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); + //assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); assertTrue(consumer.closed()); - PowerMock.verifyAll(); } @Test @@ -345,8 +328,6 @@ public void testPollConsumerError() throws Exception { expectStart(); expectStop(); - PowerMock.replayAll(); - final CountDownLatch finishedLatch = new CountDownLatch(1); Map endOffsets = new HashMap<>(); endOffsets.put(TP0, 1L); @@ -376,9 +357,8 @@ public void testPollConsumerError() throws Exception { store.stop(); - assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); + //assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); assertTrue(consumer.closed()); - PowerMock.verifyAll(); } @Test @@ -387,11 +367,10 @@ public void testGetOffsetsConsumerErrorOnReadToEnd() throws Exception { // Producer flushes when read to log end is called producer.flush(); - PowerMock.expectLastCall(); + // PowerMock.expectLastCall(); expectStop(); - PowerMock.replayAll(); final CountDownLatch finishedLatch = new CountDownLatch(1); Map endOffsets = new HashMap<>(); endOffsets.put(TP0, 0L); @@ -433,9 +412,8 @@ public void testGetOffsetsConsumerErrorOnReadToEnd() throws Exception { store.stop(); - assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); + //assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); assertTrue(consumer.closed()); - PowerMock.verifyAll(); } @Test @@ -471,7 +449,7 @@ public void testProducerError() throws Exception { store.stop(); - assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); + //assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); assertTrue(consumer.closed()); PowerMock.verifyAll(); } @@ -491,8 +469,6 @@ public void testReadEndOffsetsUsingAdmin() throws Exception { admin.endOffsets(EasyMock.eq(tps)); PowerMock.expectLastCall().andReturn(endOffsets).times(1); - PowerMock.replayAll(); - store.start(); assertEquals(endOffsets, store.readEndOffsets(tps, false)); } @@ -531,43 +507,35 @@ public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exceptio endOffsets.put(TP0, 0L); endOffsets.put(TP1, 0L); // Getting end offsets upon startup should work fine - admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), EasyMock.anyLong()); - PowerMock.expectLastCall().andReturn(endOffsets).times(1); + when(admin.retryEndOffsets(eq(tps), any(), anyLong())).thenReturn(endOffsets); // Getting end offsets using the admin client should fail with leader 
not available - admin.endOffsets(EasyMock.eq(tps)); - PowerMock.expectLastCall().andThrow(new LeaderNotAvailableException("retry")); - - PowerMock.replayAll(); + when(admin.endOffsets(eq(tps))).thenThrow(new LeaderNotAvailableException("retry")); store.start(); assertThrows(LeaderNotAvailableException.class, () -> store.readEndOffsets(tps, false)); } - @SuppressWarnings("unchecked") private void setupWithAdmin() { Supplier adminSupplier = () -> admin; java.util.function.Consumer initializer = admin -> { }; - store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"}, - TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer); + store = spy(new KafkaBasedLog<>(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer)); } - private void expectProducerAndConsumerCreate() throws Exception { - PowerMock.expectPrivate(store, "createProducer") - .andReturn(producer); - PowerMock.expectPrivate(store, "createConsumer") - .andReturn(consumer); + private void expectProducerAndConsumerCreate() { + doReturn(producer).when(store).createProducer(); + doReturn(consumer).when(store).createConsumer(); } private void expectStart() throws Exception { initializer.run(); - EasyMock.expectLastCall().times(1); + // EasyMock.expectLastCall().times(1); expectProducerAndConsumerCreate(); } private void expectStop() { producer.close(); - PowerMock.expectLastCall(); + // PowerMock.expectLastCall(); // MockConsumer close is checked after test. } From 675731e79a310ab430b66b84a72eec5d8d9bb3aa Mon Sep 17 00:00:00 2001 From: bachmanity1 Date: Sat, 5 Aug 2023 00:44:56 +0900 Subject: [PATCH 02/12] almost done --- .../kafka/connect/util/KafkaBasedLogTest.java | 39 +++++++------------ 1 file changed, 14 insertions(+), 25 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java index 717a3eed8ce0..4a8d83376fa2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java @@ -38,14 +38,12 @@ import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.MockTime; -import org.easymock.Capture; -import org.easymock.EasyMock; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import org.powermock.api.easymock.PowerMock; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -114,7 +112,7 @@ public class KafkaBasedLogTest { private static final String TP0_VALUE_NEW = "VAL0_NEW"; private static final String TP1_VALUE_NEW = "VAL1_NEW"; - private Time time = new MockTime(); + private final Time time = new MockTime(); private KafkaBasedLog store; @Mock @@ -125,8 +123,8 @@ public class KafkaBasedLogTest { @Mock private TopicAdmin admin; - private Map>> consumedRecords = new HashMap<>(); - private Callback> consumedCallback = (error, record) -> { + private final Map>> consumedRecords = new HashMap<>(); + private final Callback> consumedCallback = (error, record) -> { TopicPartition partition = new TopicPartition(record.topic(), record.partition()); List> records = consumedRecords.computeIfAbsent(partition, k -> new ArrayList<>()); records.add(record); @@ -237,17 
+235,16 @@ public void testSendAndReadToEnd() throws Exception { expectStart(); TestFuture tp0Future = new TestFuture<>(); ProducerRecord tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE); - Capture callback0 = EasyMock.newCapture(); - EasyMock.expect(producer.send(EasyMock.eq(tp0Record), EasyMock.capture(callback0))).andReturn(tp0Future); + ArgumentCaptor callback0 = ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class); + when(producer.send(eq(tp0Record), callback0.capture())).thenReturn(tp0Future); TestFuture tp1Future = new TestFuture<>(); ProducerRecord tp1Record = new ProducerRecord<>(TOPIC, TP1_KEY, TP1_VALUE); - Capture callback1 = EasyMock.newCapture(); - EasyMock.expect(producer.send(EasyMock.eq(tp1Record), EasyMock.capture(callback1))).andReturn(tp1Future); + ArgumentCaptor callback1 = ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class); + when(producer.send(eq(tp1Record), callback1.capture())).thenReturn(tp1Future); // Producer flushes when read to log end is called producer.flush(); - PowerMock.expectLastCall(); - + // PowerMock.expectLastCall(); expectStop(); Map endOffsets = new HashMap<>(); @@ -421,13 +418,11 @@ public void testProducerError() throws Exception { expectStart(); TestFuture tp0Future = new TestFuture<>(); ProducerRecord tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE); - Capture callback0 = EasyMock.newCapture(); - EasyMock.expect(producer.send(EasyMock.eq(tp0Record), EasyMock.capture(callback0))).andReturn(tp0Future); + ArgumentCaptor callback0 = ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class); + when(producer.send(eq(tp0Record), callback0.capture())).thenReturn(tp0Future); expectStop(); - PowerMock.replayAll(); - Map endOffsets = new HashMap<>(); endOffsets.put(TP0, 0L); endOffsets.put(TP1, 0L); @@ -451,7 +446,6 @@ public void testProducerError() throws Exception { //assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); assertTrue(consumer.closed()); - PowerMock.verifyAll(); } @Test @@ -464,10 +458,8 @@ public void testReadEndOffsetsUsingAdmin() throws Exception { Map endOffsets = new HashMap<>(); endOffsets.put(TP0, 0L); endOffsets.put(TP1, 0L); - admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), EasyMock.anyLong()); - PowerMock.expectLastCall().andReturn(endOffsets).times(1); - admin.endOffsets(EasyMock.eq(tps)); - PowerMock.expectLastCall().andReturn(endOffsets).times(1); + when(admin.retryEndOffsets(eq(tps), any(), anyLong())).thenReturn(endOffsets); + when(admin.endOffsets(eq(tps))).thenReturn(endOffsets); store.start(); assertEquals(endOffsets, store.readEndOffsets(tps, false)); @@ -481,8 +473,7 @@ public void testReadEndOffsetsUsingAdminThatFailsWithUnsupported() throws Except Set tps = new HashSet<>(Arrays.asList(TP0, TP1)); // Getting end offsets using the admin client should fail with unsupported version - admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), EasyMock.anyLong()); - PowerMock.expectLastCall().andThrow(new UnsupportedVersionException("too old")); + when(admin.retryEndOffsets(eq(tps), any(), anyLong())).thenThrow(new UnsupportedVersionException("too old")); // Falls back to the consumer Map endOffsets = new HashMap<>(); @@ -490,8 +481,6 @@ public void testReadEndOffsetsUsingAdminThatFailsWithUnsupported() throws Except endOffsets.put(TP1, 0L); consumer.updateEndOffsets(endOffsets); - PowerMock.replayAll(); - store.start(); assertEquals(endOffsets, store.readEndOffsets(tps, false)); } From 
0f194a011ea3f6cdc66069d6e0f775da4e358994 Mon Sep 17 00:00:00 2001 From: bachmanity1 Date: Sat, 5 Aug 2023 00:53:53 +0900 Subject: [PATCH 03/12] only whitebox references left --- .../kafka/connect/util/KafkaBasedLogTest.java | 55 +++++++------------ 1 file changed, 20 insertions(+), 35 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java index 4a8d83376fa2..388200f70225 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java @@ -72,6 +72,7 @@ import static org.mockito.Mockito.eq; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.verify; @RunWith(MockitoJUnitRunner.class) public class KafkaBasedLogTest { @@ -143,8 +144,7 @@ public void setUp() { @Test public void testStartStop() throws Exception { - expectStart(); - expectStop(); + expectProducerAndConsumerCreate(); Map endOffsets = new HashMap<>(); endOffsets.put(TP0, 0L); @@ -155,12 +155,12 @@ public void testStartStop() throws Exception { store.stop(); assertTrue(consumer.closed()); + verifyStartAndStop(); } @Test public void testReloadOnStart() throws Exception { - expectStart(); - expectStop(); + expectProducerAndConsumerCreate(); Map endOffsets = new HashMap<>(); endOffsets.put(TP0, 1L); @@ -197,12 +197,12 @@ public void testReloadOnStart() throws Exception { store.stop(); //assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); assertTrue(consumer.closed()); + verifyStartAndStop(); } @Test public void testReloadOnStartWithNoNewRecordsPresent() throws Exception { - expectStart(); - expectStop(); + expectProducerAndConsumerCreate(); Map endOffsets = new HashMap<>(); endOffsets.put(TP0, 7L); @@ -228,11 +228,12 @@ public void testReloadOnStartWithNoNewRecordsPresent() throws Exception { //assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); assertTrue(consumer.closed()); + verifyStartAndStop(); } @Test public void testSendAndReadToEnd() throws Exception { - expectStart(); + expectProducerAndConsumerCreate(); TestFuture tp0Future = new TestFuture<>(); ProducerRecord tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE); ArgumentCaptor callback0 = ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class); @@ -242,11 +243,6 @@ public void testSendAndReadToEnd() throws Exception { ArgumentCaptor callback1 = ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class); when(producer.send(eq(tp1Record), callback1.capture())).thenReturn(tp1Future); - // Producer flushes when read to log end is called - producer.flush(); - // PowerMock.expectLastCall(); - expectStop(); - Map endOffsets = new HashMap<>(); endOffsets.put(TP0, 0L); endOffsets.put(TP1, 0L); @@ -318,12 +314,13 @@ public void testSendAndReadToEnd() throws Exception { //assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); assertTrue(consumer.closed()); + verify(producer).flush(); + verifyStartAndStop(); } @Test public void testPollConsumerError() throws Exception { - expectStart(); - expectStop(); + expectProducerAndConsumerCreate(); final CountDownLatch finishedLatch = new CountDownLatch(1); Map endOffsets = new HashMap<>(); @@ -356,17 +353,12 @@ public void testPollConsumerError() throws Exception { //assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); 
assertTrue(consumer.closed()); + verifyStartAndStop(); } @Test public void testGetOffsetsConsumerErrorOnReadToEnd() throws Exception { - expectStart(); - - // Producer flushes when read to log end is called - producer.flush(); - // PowerMock.expectLastCall(); - - expectStop(); + expectProducerAndConsumerCreate(); final CountDownLatch finishedLatch = new CountDownLatch(1); Map endOffsets = new HashMap<>(); @@ -411,18 +403,18 @@ public void testGetOffsetsConsumerErrorOnReadToEnd() throws Exception { //assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); assertTrue(consumer.closed()); + verify(producer).flush(); + verifyStartAndStop(); } @Test public void testProducerError() throws Exception { - expectStart(); + expectProducerAndConsumerCreate(); TestFuture tp0Future = new TestFuture<>(); ProducerRecord tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE); ArgumentCaptor callback0 = ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class); when(producer.send(eq(tp0Record), callback0.capture())).thenReturn(tp0Future); - expectStop(); - Map endOffsets = new HashMap<>(); endOffsets.put(TP0, 0L); endOffsets.put(TP1, 0L); @@ -446,6 +438,7 @@ public void testProducerError() throws Exception { //assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); assertTrue(consumer.closed()); + verifyStartAndStop(); } @Test @@ -515,17 +508,9 @@ private void expectProducerAndConsumerCreate() { doReturn(consumer).when(store).createConsumer(); } - private void expectStart() throws Exception { - initializer.run(); - // EasyMock.expectLastCall().times(1); - - expectProducerAndConsumerCreate(); - } - - private void expectStop() { - producer.close(); - // PowerMock.expectLastCall(); - // MockConsumer close is checked after test. 
+ private void verifyStartAndStop() { + verify(initializer).run(); + verify(producer).close(); } private static ByteBuffer buffer(String v) { From f52125a3010c03abc8dd5dd028cdc4f361cd0cb7 Mon Sep 17 00:00:00 2001 From: bachmanity1 Date: Sat, 5 Aug 2023 01:02:02 +0900 Subject: [PATCH 04/12] just remove whitebox --- .../kafka/connect/util/KafkaBasedLogTest.java | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java index 388200f70225..1ac9dc53843d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java @@ -74,7 +74,7 @@ import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.verify; -@RunWith(MockitoJUnitRunner.class) +@RunWith(MockitoJUnitRunner.StrictStubs.class) public class KafkaBasedLogTest { private static final String TOPIC = "connect-log"; @@ -154,7 +154,6 @@ public void testStartStop() throws Exception { assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment()); store.stop(); - assertTrue(consumer.closed()); verifyStartAndStop(); } @@ -195,8 +194,6 @@ public void testReloadOnStart() throws Exception { assertEquals(TP1_VALUE, consumedRecords.get(TP1).get(0).value()); store.stop(); - //assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); - assertTrue(consumer.closed()); verifyStartAndStop(); } @@ -226,8 +223,6 @@ public void testReloadOnStartWithNoNewRecordsPresent() throws Exception { store.stop(); - //assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); - assertTrue(consumer.closed()); verifyStartAndStop(); } @@ -312,8 +307,6 @@ public void testSendAndReadToEnd() throws Exception { // Cleanup store.stop(); - //assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); - assertTrue(consumer.closed()); verify(producer).flush(); verifyStartAndStop(); } @@ -351,8 +344,6 @@ public void testPollConsumerError() throws Exception { store.stop(); - //assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); - assertTrue(consumer.closed()); verifyStartAndStop(); } @@ -401,8 +392,6 @@ public void testGetOffsetsConsumerErrorOnReadToEnd() throws Exception { store.stop(); - //assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); - assertTrue(consumer.closed()); verify(producer).flush(); verifyStartAndStop(); } @@ -436,8 +425,6 @@ public void testProducerError() throws Exception { store.stop(); - //assertFalse(Whitebox.getInternalState(store, "thread").isAlive()); - assertTrue(consumer.closed()); verifyStartAndStop(); } @@ -511,6 +498,7 @@ private void expectProducerAndConsumerCreate() { private void verifyStartAndStop() { verify(initializer).run(); verify(producer).close(); + assertTrue(consumer.closed()); } private static ByteBuffer buffer(String v) { From 9bb96616f20daa3018352e8b146662b9cf19d370 Mon Sep 17 00:00:00 2001 From: bachmanity1 Date: Tue, 8 Aug 2023 01:06:21 +0900 Subject: [PATCH 05/12] fix build errors --- .../kafka/connect/util/KafkaBasedLogTest.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java index 1ac9dc53843d..9ce0831f2d7b 100644 --- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java @@ -45,7 +45,6 @@ import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -59,6 +58,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.Supplier; import static org.junit.Assert.assertEquals; @@ -117,12 +117,14 @@ public class KafkaBasedLogTest { private KafkaBasedLog store; @Mock - private Runnable initializer; + private Consumer initializer; @Mock private KafkaProducer producer; private MockConsumer consumer; @Mock private TopicAdmin admin; + @Mock + private Supplier topicAdminSupplier; private final Map>> consumedRecords = new HashMap<>(); private final Callback> consumedCallback = (error, record) -> { @@ -133,7 +135,7 @@ public class KafkaBasedLogTest { @Before public void setUp() { - store = spy(new KafkaBasedLog<>(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time, initializer)); + store = spy(new KafkaBasedLog<>(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, topicAdminSupplier, consumedCallback, time, initializer)); consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); consumer.updatePartitions(TOPIC, Arrays.asList(TPINFO0, TPINFO1)); Map beginningOffsets = new HashMap<>(); @@ -496,13 +498,8 @@ private void expectProducerAndConsumerCreate() { } private void verifyStartAndStop() { - verify(initializer).run(); + verify(initializer).accept(any()); verify(producer).close(); assertTrue(consumer.closed()); } - - private static ByteBuffer buffer(String v) { - return ByteBuffer.wrap(v.getBytes()); - } - } From c504dab21078071baef96fef754ffb84d0bbce1d Mon Sep 17 00:00:00 2001 From: bachmanity1 Date: Tue, 8 Aug 2023 01:15:07 +0900 Subject: [PATCH 06/12] reorder imports --- .../apache/kafka/connect/util/KafkaBasedLogTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java index 9ce0831f2d7b..789cf3b83d62 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java @@ -36,8 +36,8 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.TimestampType; -import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -66,13 +66,13 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.eq; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; 
+import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.StrictStubs.class) public class KafkaBasedLogTest { From 000891d9723a16ff4682b615c3092c8d3f2b9bef Mon Sep 17 00:00:00 2001 From: bachmanity1 Date: Tue, 8 Aug 2023 19:14:42 +0900 Subject: [PATCH 07/12] remove KafkaBasedLogTest from build.gradle --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 76dec6966800..bc11cd20332f 100644 --- a/build.gradle +++ b/build.gradle @@ -417,7 +417,7 @@ subprojects { // connect tests "**/DistributedHerderTest.*", "**/KafkaConfigBackingStoreTest.*", - "**/KafkaBasedLogTest.*", "**/StandaloneHerderTest.*", + "**/StandaloneHerderTest.*", "**/WorkerSinkTaskTest.*", "**/WorkerSinkTaskThreadedTest.*" ]) } From d42a3e0017e0955d9a4683225710d8ead9413563 Mon Sep 17 00:00:00 2001 From: bachmanity1 Date: Tue, 8 Aug 2023 19:57:09 +0900 Subject: [PATCH 08/12] make thread package private --- .../main/java/org/apache/kafka/connect/util/KafkaBasedLog.java | 2 +- .../java/org/apache/kafka/connect/util/KafkaBasedLogTest.java | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java index 046d84923aec..b122d33f57e3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java @@ -103,7 +103,7 @@ public class KafkaBasedLog { private Optional> producer; private TopicAdmin admin; - private Thread thread; + Thread thread; private boolean stopRequested; private final Queue> readLogEndOffsetCallbacks; private final java.util.function.Consumer initializer; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java index 789cf3b83d62..be8be5fecd85 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java @@ -62,6 +62,7 @@ import java.util.function.Supplier; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; @@ -309,6 +310,7 @@ public void testSendAndReadToEnd() throws Exception { // Cleanup store.stop(); + // Producer flushes when read to log end is called verify(producer).flush(); verifyStartAndStop(); } @@ -501,5 +503,6 @@ private void verifyStartAndStop() { verify(initializer).accept(any()); verify(producer).close(); assertTrue(consumer.closed()); + assertFalse(store.thread.isAlive()); } } From 8a1ce9dfca151cf97c84b6e3ad513de500dbc89a Mon Sep 17 00:00:00 2001 From: bachmanity1 Date: Tue, 8 Aug 2023 20:27:33 +0900 Subject: [PATCH 09/12] apply review suggestions --- .../kafka/connect/util/KafkaBasedLogTest.java | 84 ++++++++----------- 1 file changed, 34 insertions(+), 50 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java index be8be5fecd85..206b3cad3bba 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java +++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java @@ -69,9 +69,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyLong; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -115,17 +113,16 @@ public class KafkaBasedLogTest { private static final String TP1_VALUE_NEW = "VAL1_NEW"; private final Time time = new MockTime(); - private KafkaBasedLog store; + private MockedKafkaBasedLog store; + @Mock + private TopicAdmin admin = null; @Mock private Consumer initializer; @Mock private KafkaProducer producer; + private final Supplier topicAdminSupplier = () -> admin; private MockConsumer consumer; - @Mock - private TopicAdmin admin; - @Mock - private Supplier topicAdminSupplier; private final Map>> consumedRecords = new HashMap<>(); private final Callback> consumedCallback = (error, record) -> { @@ -134,9 +131,31 @@ public class KafkaBasedLogTest { records.add(record); }; + private class MockedKafkaBasedLog extends KafkaBasedLog { + public MockedKafkaBasedLog(String topic, + Map producerConfigs, + Map consumerConfigs, + Supplier topicAdminSupplier, + Callback> consumedCallback, + Time time, + Consumer initializer) { + super(topic, producerConfigs, consumerConfigs, topicAdminSupplier, consumedCallback, time, initializer); + } + + @Override + protected KafkaProducer createProducer() { + return producer; + } + + @Override + protected MockConsumer createConsumer() { + return consumer; + } + } + @Before public void setUp() { - store = spy(new KafkaBasedLog<>(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, topicAdminSupplier, consumedCallback, time, initializer)); + store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, topicAdminSupplier, consumedCallback, time, initializer); consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); consumer.updatePartitions(TOPIC, Arrays.asList(TPINFO0, TPINFO1)); Map beginningOffsets = new HashMap<>(); @@ -146,9 +165,7 @@ public void setUp() { } @Test - public void testStartStop() throws Exception { - expectProducerAndConsumerCreate(); - + public void testStartStop() { Map endOffsets = new HashMap<>(); endOffsets.put(TP0, 0L); endOffsets.put(TP1, 0L); @@ -162,8 +179,6 @@ public void testStartStop() throws Exception { @Test public void testReloadOnStart() throws Exception { - expectProducerAndConsumerCreate(); - Map endOffsets = new HashMap<>(); endOffsets.put(TP0, 1L); endOffsets.put(TP1, 1L); @@ -201,9 +216,7 @@ public void testReloadOnStart() throws Exception { } @Test - public void testReloadOnStartWithNoNewRecordsPresent() throws Exception { - expectProducerAndConsumerCreate(); - + public void testReloadOnStartWithNoNewRecordsPresent() { Map endOffsets = new HashMap<>(); endOffsets.put(TP0, 7L); endOffsets.put(TP1, 7L); @@ -231,7 +244,6 @@ public void testReloadOnStartWithNoNewRecordsPresent() throws Exception { @Test public void testSendAndReadToEnd() throws Exception { - expectProducerAndConsumerCreate(); TestFuture tp0Future = new TestFuture<>(); ProducerRecord tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE); ArgumentCaptor callback0 = ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class); @@ -317,8 +329,6 @@ public void testSendAndReadToEnd() throws Exception { @Test public void testPollConsumerError() throws Exception { - expectProducerAndConsumerCreate(); - 
final CountDownLatch finishedLatch = new CountDownLatch(1); Map endOffsets = new HashMap<>(); endOffsets.put(TP0, 1L); @@ -353,8 +363,6 @@ public void testPollConsumerError() throws Exception { @Test public void testGetOffsetsConsumerErrorOnReadToEnd() throws Exception { - expectProducerAndConsumerCreate(); - final CountDownLatch finishedLatch = new CountDownLatch(1); Map endOffsets = new HashMap<>(); endOffsets.put(TP0, 0L); @@ -401,8 +409,7 @@ public void testGetOffsetsConsumerErrorOnReadToEnd() throws Exception { } @Test - public void testProducerError() throws Exception { - expectProducerAndConsumerCreate(); + public void testProducerError() { TestFuture tp0Future = new TestFuture<>(); ProducerRecord tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE); ArgumentCaptor callback0 = ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class); @@ -433,11 +440,7 @@ public void testProducerError() throws Exception { } @Test - public void testReadEndOffsetsUsingAdmin() throws Exception { - // Create a log that uses the admin supplier - setupWithAdmin(); - expectProducerAndConsumerCreate(); - + public void testReadEndOffsetsUsingAdmin() { Set tps = new HashSet<>(Arrays.asList(TP0, TP1)); Map endOffsets = new HashMap<>(); endOffsets.put(TP0, 0L); @@ -450,11 +453,7 @@ public void testReadEndOffsetsUsingAdmin() throws Exception { } @Test - public void testReadEndOffsetsUsingAdminThatFailsWithUnsupported() throws Exception { - // Create a log that uses the admin supplier - setupWithAdmin(); - expectProducerAndConsumerCreate(); - + public void testReadEndOffsetsUsingAdminThatFailsWithUnsupported() { Set tps = new HashSet<>(Arrays.asList(TP0, TP1)); // Getting end offsets using the admin client should fail with unsupported version when(admin.retryEndOffsets(eq(tps), any(), anyLong())).thenThrow(new UnsupportedVersionException("too old")); @@ -470,11 +469,7 @@ public void testReadEndOffsetsUsingAdminThatFailsWithUnsupported() throws Except } @Test - public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exception { - // Create a log that uses the admin supplier - setupWithAdmin(); - expectProducerAndConsumerCreate(); - + public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() { Set tps = new HashSet<>(Arrays.asList(TP0, TP1)); Map endOffsets = new HashMap<>(); endOffsets.put(TP0, 0L); @@ -488,19 +483,8 @@ public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exceptio assertThrows(LeaderNotAvailableException.class, () -> store.readEndOffsets(tps, false)); } - private void setupWithAdmin() { - Supplier adminSupplier = () -> admin; - java.util.function.Consumer initializer = admin -> { }; - store = spy(new KafkaBasedLog<>(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer)); - } - - private void expectProducerAndConsumerCreate() { - doReturn(producer).when(store).createProducer(); - doReturn(consumer).when(store).createConsumer(); - } - private void verifyStartAndStop() { - verify(initializer).accept(any()); + verify(initializer).accept(admin); verify(producer).close(); assertTrue(consumer.closed()); assertFalse(store.thread.isAlive()); From 2c7d2e380d02d0f7dc77944ca5775ddaa3540457 Mon Sep 17 00:00:00 2001 From: bachmanity1 Date: Tue, 8 Aug 2023 20:39:15 +0900 Subject: [PATCH 10/12] mock admin when needed only --- .../org/apache/kafka/connect/util/KafkaBasedLogTest.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java index 206b3cad3bba..c5e793cfbfd9 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java @@ -70,6 +70,7 @@ import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -115,12 +116,11 @@ public class KafkaBasedLogTest { private final Time time = new MockTime(); private MockedKafkaBasedLog store; - @Mock - private TopicAdmin admin = null; @Mock private Consumer initializer; @Mock private KafkaProducer producer; + private TopicAdmin admin; private final Supplier topicAdminSupplier = () -> admin; private MockConsumer consumer; @@ -445,6 +445,7 @@ public void testReadEndOffsetsUsingAdmin() { Map endOffsets = new HashMap<>(); endOffsets.put(TP0, 0L); endOffsets.put(TP1, 0L); + admin = mock(TopicAdmin.class); when(admin.retryEndOffsets(eq(tps), any(), anyLong())).thenReturn(endOffsets); when(admin.endOffsets(eq(tps))).thenReturn(endOffsets); @@ -455,6 +456,7 @@ public void testReadEndOffsetsUsingAdmin() { @Test public void testReadEndOffsetsUsingAdminThatFailsWithUnsupported() { Set tps = new HashSet<>(Arrays.asList(TP0, TP1)); + admin = mock(TopicAdmin.class); // Getting end offsets using the admin client should fail with unsupported version when(admin.retryEndOffsets(eq(tps), any(), anyLong())).thenThrow(new UnsupportedVersionException("too old")); @@ -474,6 +476,7 @@ public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() { Map endOffsets = new HashMap<>(); endOffsets.put(TP0, 0L); endOffsets.put(TP1, 0L); + admin = mock(TopicAdmin.class); // Getting end offsets upon startup should work fine when(admin.retryEndOffsets(eq(tps), any(), anyLong())).thenReturn(endOffsets); // Getting end offsets using the admin client should fail with leader not available From c91f102d638d586e60ded4b4a25cdb8aa5dad4c2 Mon Sep 17 00:00:00 2001 From: bachmanity1 Date: Tue, 8 Aug 2023 20:50:05 +0900 Subject: [PATCH 11/12] make sure that TopicAdmin methods are called one time only --- .../org/apache/kafka/connect/util/KafkaBasedLogTest.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java index c5e793cfbfd9..c8199419ea05 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java @@ -451,6 +451,8 @@ public void testReadEndOffsetsUsingAdmin() { store.start(); assertEquals(endOffsets, store.readEndOffsets(tps, false)); + verify(admin).retryEndOffsets(eq(tps), any(), anyLong()); + verify(admin).endOffsets(eq(tps)); } @Test @@ -468,6 +470,7 @@ public void testReadEndOffsetsUsingAdminThatFailsWithUnsupported() { store.start(); assertEquals(endOffsets, store.readEndOffsets(tps, false)); + verify(admin).retryEndOffsets(eq(tps), any(), anyLong()); } @Test @@ -484,6 +487,8 @@ public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() { store.start(); assertThrows(LeaderNotAvailableException.class, () -> store.readEndOffsets(tps, false)); + 
verify(admin).retryEndOffsets(eq(tps), any(), anyLong()); + verify(admin).endOffsets(eq(tps)); } private void verifyStartAndStop() { From 65c33d4bdaa40927cb961e6cff02f1e1eb8bc025 Mon Sep 17 00:00:00 2001 From: bachmanity1 Date: Tue, 8 Aug 2023 23:47:59 +0900 Subject: [PATCH 12/12] apply pr suggestions --- .../kafka/connect/util/KafkaBasedLog.java | 2 +- .../kafka/connect/util/KafkaBasedLogTest.java | 37 +++++++------------ 2 files changed, 14 insertions(+), 25 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java index b122d33f57e3..3a9014eda65a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java @@ -102,7 +102,7 @@ public class KafkaBasedLog { private Consumer consumer; private Optional> producer; private TopicAdmin admin; - + // Visible for testing Thread thread; private boolean stopRequested; private final Queue> readLogEndOffsetCallbacks; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java index c8199419ea05..40ad5ba5c622 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java @@ -114,7 +114,7 @@ public class KafkaBasedLogTest { private static final String TP1_VALUE_NEW = "VAL1_NEW"; private final Time time = new MockTime(); - private MockedKafkaBasedLog store; + private KafkaBasedLog store; @Mock private Consumer initializer; @@ -131,31 +131,19 @@ public class KafkaBasedLogTest { records.add(record); }; - private class MockedKafkaBasedLog extends KafkaBasedLog { - public MockedKafkaBasedLog(String topic, - Map producerConfigs, - Map consumerConfigs, - Supplier topicAdminSupplier, - Callback> consumedCallback, - Time time, - Consumer initializer) { - super(topic, producerConfigs, consumerConfigs, topicAdminSupplier, consumedCallback, time, initializer); - } - - @Override - protected KafkaProducer createProducer() { - return producer; - } - - @Override - protected MockConsumer createConsumer() { - return consumer; - } - } - @Before public void setUp() { - store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, topicAdminSupplier, consumedCallback, time, initializer); + store = new KafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, topicAdminSupplier, consumedCallback, time, initializer) { + @Override + protected KafkaProducer createProducer() { + return producer; + } + + @Override + protected MockConsumer createConsumer() { + return consumer; + } + }; consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); consumer.updatePartitions(TOPIC, Arrays.asList(TPINFO0, TPINFO1)); Map beginningOffsets = new HashMap<>(); @@ -404,6 +392,7 @@ public void testGetOffsetsConsumerErrorOnReadToEnd() throws Exception { store.stop(); + // Producer flushes when read to log end is called verify(producer).flush(); verifyStartAndStop(); }
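
Note on the end state of this series: taken together, the twelve patches drop the PowerMock partial mock of KafkaBasedLog (which stubbed the private createConsumer/createProducer methods by name) in favor of a plain subclass that overrides those factory methods, Mockito @Mock fields for the producer, initializer and TopicAdmin, and explicit verify(...) calls in place of PowerMock.replayAll()/verifyAll() and the Whitebox reflection on the consumer thread. Below is a condensed sketch of that wiring, not the full test file: the <String, String> type parameters (which the extracted hunks above lost) and the sketch class name are assumptions, and TOPIC, PRODUCER_PROPS, CONSUMER_PROPS and consumedCallback are the fields already defined in KafkaBasedLogTest.

    // Condensed sketch of the post-migration wiring from PATCH 09-12 above.
    // Assumptions (not verbatim from the hunks): the <String, String> generics and the class name;
    // TOPIC, PRODUCER_PROPS, CONSUMER_PROPS and consumedCallback are the existing test fields.
    @RunWith(MockitoJUnitRunner.StrictStubs.class)
    public class KafkaBasedLogTestSketch {
        private final Time time = new MockTime();
        private KafkaBasedLog<String, String> store;
        @Mock
        private java.util.function.Consumer<TopicAdmin> initializer;
        @Mock
        private KafkaProducer<String, String> producer;
        private TopicAdmin admin;                      // mock(TopicAdmin.class) only in the admin-based tests
        private final Supplier<TopicAdmin> topicAdminSupplier = () -> admin;
        private MockConsumer<String, String> consumer;

        @Before
        public void setUp() {
            // Replaces PowerMock.createPartialMock(KafkaBasedLog.class, {"createConsumer", "createProducer"}, ...):
            // a subclass overrides the factory methods and hands back the test doubles.
            store = new KafkaBasedLog<String, String>(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS,
                    topicAdminSupplier, consumedCallback, time, initializer) {
                @Override
                protected KafkaProducer<String, String> createProducer() {
                    return producer;
                }

                @Override
                protected MockConsumer<String, String> createConsumer() {
                    return consumer;
                }
            };
            consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        }

        // Replaces PowerMock.verifyAll() plus the Whitebox peek at the private "thread" field,
        // which PATCH 08/12 makes package-private so the test can assert on it directly.
        private void verifyStartAndStop() {
            verify(initializer).accept(admin);
            verify(producer).close();
            assertTrue(consumer.closed());
            assertFalse(store.thread.isAlive());
        }
    }

The same shape appears first in PATCH 09/12 as a named MockedKafkaBasedLog inner class and is then inlined into setUp() as an anonymous subclass in PATCH 12/12.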