Commit

apply review suggestions
bachmanity1 committed Aug 8, 2023
1 parent d42a3e0 commit 8a1ce9d
Showing 1 changed file with 34 additions and 50 deletions.
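The substance of this commit is a test-double refactor: instead of wrapping the store in Mockito's spy() and stubbing its protected factory methods with doReturn(), the test now defines a MockedKafkaBasedLog subclass that overrides createProducer() and createConsumer() to hand back the test's mocks directly. As a rough sketch of why that pattern needs no Mockito plumbing, here is a minimal, self-contained example; the EventLog/createSink names are hypothetical stand-ins for illustration, not the real KafkaBasedLog API.

import java.util.ArrayList;
import java.util.List;

// Hypothetical production class: collaborator creation is isolated in a
// protected factory method so a test can substitute a double.
class EventLog {
    private List<String> sink;

    public void start() {
        sink = createSink();
    }

    public void append(String event) {
        sink.add(event);
    }

    // Real code would build an external client here; tests override this.
    protected List<String> createSink() {
        throw new UnsupportedOperationException("no real sink in this sketch");
    }
}

// Test subclass: a plain override swaps in the double -- no spy(),
// no doReturn(), no stubbing of protected methods.
class MockedEventLog extends EventLog {
    private final List<String> testSink = new ArrayList<>();

    @Override
    protected List<String> createSink() {
        return testSink;
    }

    List<String> recorded() {
        return testSink;
    }
}

public class OverrideFactorySketch {
    public static void main(String[] args) {
        MockedEventLog log = new MockedEventLog();
        log.start();
        log.append("VAL0");
        System.out.println(log.recorded()); // prints [VAL0]
    }
}

The same idea drives the diff below: MockedKafkaBasedLog overrides the two create* methods, so setUp() constructs it directly and the setupWithAdmin()/expectProducerAndConsumerCreate() helpers disappear.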
@@ -69,9 +69,7 @@
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyLong;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;

@@ -115,17 +113,16 @@ public class KafkaBasedLogTest {
     private static final String TP1_VALUE_NEW = "VAL1_NEW";

     private final Time time = new MockTime();
-    private KafkaBasedLog<String, String> store;
+    private MockedKafkaBasedLog store;

     @Mock
-    private TopicAdmin admin = null;
-    @Mock
     private Consumer<TopicAdmin> initializer;
     @Mock
     private KafkaProducer<String, String> producer;
-    private final Supplier<TopicAdmin> topicAdminSupplier = () -> admin;
     private MockConsumer<String, String> consumer;
+    @Mock
+    private TopicAdmin admin;
+    @Mock
+    private Supplier<TopicAdmin> topicAdminSupplier;

     private final Map<TopicPartition, List<ConsumerRecord<String, String>>> consumedRecords = new HashMap<>();
     private final Callback<ConsumerRecord<String, String>> consumedCallback = (error, record) -> {
@@ -134,9 +131,31 @@ public class KafkaBasedLogTest {
         records.add(record);
     };

+    private class MockedKafkaBasedLog extends KafkaBasedLog<String, String> {
+        public MockedKafkaBasedLog(String topic,
+                                   Map<String, Object> producerConfigs,
+                                   Map<String, Object> consumerConfigs,
+                                   Supplier<TopicAdmin> topicAdminSupplier,
+                                   Callback<ConsumerRecord<String, String>> consumedCallback,
+                                   Time time,
+                                   Consumer<TopicAdmin> initializer) {
+            super(topic, producerConfigs, consumerConfigs, topicAdminSupplier, consumedCallback, time, initializer);
+        }
+
+        @Override
+        protected KafkaProducer<String, String> createProducer() {
+            return producer;
+        }
+
+        @Override
+        protected MockConsumer<String, String> createConsumer() {
+            return consumer;
+        }
+    }
+
     @Before
     public void setUp() {
-        store = spy(new KafkaBasedLog<>(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, topicAdminSupplier, consumedCallback, time, initializer));
+        store = new MockedKafkaBasedLog(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, topicAdminSupplier, consumedCallback, time, initializer);
         consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
         consumer.updatePartitions(TOPIC, Arrays.asList(TPINFO0, TPINFO1));
         Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
@@ -146,9 +165,7 @@ public void setUp() {
     }

     @Test
-    public void testStartStop() throws Exception {
-        expectProducerAndConsumerCreate();
-
+    public void testStartStop() {
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 0L);
         endOffsets.put(TP1, 0L);
@@ -162,8 +179,6 @@ public void testStartStop() throws Exception {

     @Test
     public void testReloadOnStart() throws Exception {
-        expectProducerAndConsumerCreate();
-
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 1L);
         endOffsets.put(TP1, 1L);
@@ -201,9 +216,7 @@ public void testReloadOnStart() throws Exception {
     }

     @Test
-    public void testReloadOnStartWithNoNewRecordsPresent() throws Exception {
-        expectProducerAndConsumerCreate();
-
+    public void testReloadOnStartWithNoNewRecordsPresent() {
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 7L);
         endOffsets.put(TP1, 7L);
@@ -231,7 +244,6 @@ public void testReloadOnStartWithNoNewRecordsPresent() {

     @Test
     public void testSendAndReadToEnd() throws Exception {
-        expectProducerAndConsumerCreate();
         TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
         ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE);
         ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback0 = ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
@@ -317,8 +329,6 @@ public void testSendAndReadToEnd() throws Exception {

     @Test
     public void testPollConsumerError() throws Exception {
-        expectProducerAndConsumerCreate();
-
         final CountDownLatch finishedLatch = new CountDownLatch(1);
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 1L);
@@ -353,8 +363,6 @@ public void testPollConsumerError() throws Exception {

     @Test
     public void testGetOffsetsConsumerErrorOnReadToEnd() throws Exception {
-        expectProducerAndConsumerCreate();
-
         final CountDownLatch finishedLatch = new CountDownLatch(1);
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 0L);
@@ -401,8 +409,7 @@ public void testGetOffsetsConsumerErrorOnReadToEnd() throws Exception {
     }

     @Test
-    public void testProducerError() throws Exception {
-        expectProducerAndConsumerCreate();
+    public void testProducerError() {
         TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
         ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE);
         ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback0 = ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
@@ -433,11 +440,7 @@ public void testProducerError() {
     }

     @Test
-    public void testReadEndOffsetsUsingAdmin() throws Exception {
-        // Create a log that uses the admin supplier
-        setupWithAdmin();
-        expectProducerAndConsumerCreate();
-
+    public void testReadEndOffsetsUsingAdmin() {
         Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 0L);
@@ -450,11 +453,7 @@ public void testReadEndOffsetsUsingAdmin() {
     }

     @Test
-    public void testReadEndOffsetsUsingAdminThatFailsWithUnsupported() throws Exception {
-        // Create a log that uses the admin supplier
-        setupWithAdmin();
-        expectProducerAndConsumerCreate();
-
+    public void testReadEndOffsetsUsingAdminThatFailsWithUnsupported() {
         Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
         // Getting end offsets using the admin client should fail with unsupported version
         when(admin.retryEndOffsets(eq(tps), any(), anyLong())).thenThrow(new UnsupportedVersionException("too old"));
@@ -470,11 +469,7 @@
     }

     @Test
-    public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() throws Exception {
-        // Create a log that uses the admin supplier
-        setupWithAdmin();
-        expectProducerAndConsumerCreate();
-
+    public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() {
         Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 0L);
@@ -488,19 +483,8 @@ public void testReadEndOffsetsUsingAdminThatFailsWithRetriable() {
         assertThrows(LeaderNotAvailableException.class, () -> store.readEndOffsets(tps, false));
     }

-    private void setupWithAdmin() {
-        Supplier<TopicAdmin> adminSupplier = () -> admin;
-        java.util.function.Consumer<TopicAdmin> initializer = admin -> { };
-        store = spy(new KafkaBasedLog<>(TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, adminSupplier, consumedCallback, time, initializer));
-    }
-
-    private void expectProducerAndConsumerCreate() {
-        doReturn(producer).when(store).createProducer();
-        doReturn(consumer).when(store).createConsumer();
-    }
-
     private void verifyStartAndStop() {
-        verify(initializer).accept(any());
+        verify(initializer).accept(admin);
         verify(producer).close();
         assertTrue(consumer.closed());
         assertFalse(store.thread.isAlive());
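A second review suggestion is visible in verifyStartAndStop() above: verify(initializer).accept(any()) became verify(initializer).accept(admin), so Mockito now checks which argument the initializer received instead of accepting anything. A small illustrative sketch of the difference, using a generic Consumer<String> mock rather than the test's own types:

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.util.function.Consumer;

public class ExactArgumentVerifySketch {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        Consumer<String> callback = (Consumer<String>) mock(Consumer.class);
        callback.accept("expected");

        verify(callback).accept(any());      // passes for any recorded argument
        verify(callback).accept("expected"); // passes only for the exact value
        // verify(callback).accept("other"); // would fail: argument mismatch
    }
}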
