Skip to content

Commit

Permalink
MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions
Browse files Browse the repository at this point in the history
Signed-off-by: PoAn Yang <payang@apache.org>
  • Loading branch information
FrankYang0529 committed May 9, 2024
1 parent f4fdaa7 commit e8307a0
Show file tree
Hide file tree
Showing 4 changed files with 500 additions and 419 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -139,10 +138,6 @@ ConsumerGroupCommand.ConsumerGroupService getConsumerGroupService(String[] args)
return service;
}

ConsumerGroupExecutor addConsumerGroupExecutor(int numConsumers) {
return addConsumerGroupExecutor(numConsumers, TOPIC, GROUP, RangeAssignor.class.getName(), Optional.empty(), Optional.empty(), false, GroupProtocol.CLASSIC.name);
}

ConsumerGroupExecutor addConsumerGroupExecutor(int numConsumers, String groupProtocol) {
return addConsumerGroupExecutor(numConsumers, TOPIC, GROUP, RangeAssignor.class.getName(), Optional.empty(), Optional.empty(), false, groupProtocol);
}
Expand All @@ -151,10 +146,6 @@ ConsumerGroupExecutor addConsumerGroupExecutor(int numConsumers, String groupPro
return addConsumerGroupExecutor(numConsumers, TOPIC, GROUP, RangeAssignor.class.getName(), remoteAssignor, Optional.empty(), false, groupProtocol);
}

ConsumerGroupExecutor addConsumerGroupExecutor(int numConsumers, String group, String groupProtocol) {
return addConsumerGroupExecutor(numConsumers, TOPIC, group, RangeAssignor.class.getName(), Optional.empty(), Optional.empty(), false, groupProtocol);
}

ConsumerGroupExecutor addConsumerGroupExecutor(int numConsumers, String topic, String group, String groupProtocol) {
return addConsumerGroupExecutor(numConsumers, topic, group, RangeAssignor.class.getName(), Optional.empty(), Optional.empty(), false, groupProtocol);
}
Expand All @@ -167,10 +158,6 @@ ConsumerGroupExecutor addConsumerGroupExecutor(int numConsumers, String topic, S
return executor;
}

SimpleConsumerGroupExecutor addSimpleGroupExecutor(String group) {
return addSimpleGroupExecutor(Arrays.asList(new TopicPartition(TOPIC, 0)), group);
}

SimpleConsumerGroupExecutor addSimpleGroupExecutor(Collection<TopicPartition> partitions, String group) {
SimpleConsumerGroupExecutor executor = new SimpleConsumerGroupExecutor(bootstrapServers(listenerName()), group, partitions);
addExecutor(executor);
Expand Down Expand Up @@ -339,14 +326,6 @@ public static Stream<Arguments> getTestQuorumAndGroupProtocolParametersAll() {
return BaseConsumerTest.getTestQuorumAndGroupProtocolParametersAll();
}

public static Stream<Arguments> getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly() {
return BaseConsumerTest.getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly();
}

public static Stream<Arguments> getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly() {
return BaseConsumerTest.getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly();
}

@SuppressWarnings({"deprecation"})
static <T> Seq<T> seq(Collection<T> seq) {
return JavaConverters.asScalaIteratorConverter(seq.iterator()).asScala().toSeq();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import kafka.test.ClusterConfig;
import kafka.test.ClusterGenerator;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Utils;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -92,6 +94,7 @@ static void generator(ClusterGenerator clusterGenerator) {
static <T> AutoCloseable buildConsumers(int numberOfConsumers,
boolean syncCommit,
String topic,
Collection<TopicPartition> topicPartitions,
Supplier<KafkaConsumer<T, T>> consumerSupplier) {
List<KafkaConsumer<T, T>> consumers = new ArrayList<>(numberOfConsumers);
ExecutorService executor = Executors.newFixedThreadPool(numberOfConsumers);
Expand All @@ -101,7 +104,7 @@ static <T> AutoCloseable buildConsumers(int numberOfConsumers,
for (int i = 0; i < numberOfConsumers; i++) {
KafkaConsumer<T, T> consumer = consumerSupplier.get();
consumers.add(consumer);
executor.execute(() -> initConsumer(topic, syncCommit, consumer, closed));
executor.execute(() -> initConsumer(topic, topicPartitions, syncCommit, consumer, closed));
}
return closeable;
} catch (Throwable e) {
Expand All @@ -117,9 +120,12 @@ private static <T> void releaseConsumers(AtomicBoolean closed, List<KafkaConsume
executor.awaitTermination(1, TimeUnit.MINUTES);
}

private static <T> void initConsumer(String topic, boolean syncCommit, KafkaConsumer<T, T> consumer, AtomicBoolean closed) {
private static <T> void initConsumer(String topic, Collection<TopicPartition> topicPartitions, boolean syncCommit, KafkaConsumer<T, T> consumer, AtomicBoolean closed) {
try (KafkaConsumer<T, T> kafkaConsumer = consumer) {
kafkaConsumer.subscribe(singleton(topic));
if (topicPartitions != null)
kafkaConsumer.assign(topicPartitions);
else
kafkaConsumer.subscribe(singleton(topic));
while (!closed.get()) {
consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
if (syncCommit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ private AutoCloseable consumerGroupClosable(ClusterInstance cluster, GroupProtoc
1,
false,
topicName,
null,
() -> new KafkaConsumer<String, String>(configs)
);
}
Expand Down

0 comments on commit e8307a0

Please sign in to comment.