Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions #15821

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from

Conversation

FrankYang0529
Copy link
Member

By using ClusterTestExtensions, ListConsumerGroupTest get get away from KafkaServerTestHarness dependency.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@FrankYang0529
Copy link
Member Author

Rely on #15766.

@FrankYang0529 FrankYang0529 force-pushed the minor-migrate-ListConsumerGroup branch 2 times, most recently from 7933c99 to 7c85ff9 Compare May 7, 2024 15:23
@FrankYang0529
Copy link
Member Author

Hi @chia7712, I rebase latest trunk branch, so we have ConsumerGroupCommandTestUtils now. The only remaining part is SimpleConsumerGroupExecutor. This is used by ListConsumerGroupTest and DescribeConsumerGroupTest. I think we can create a new class SimpleConsumerGroupExecutorTestUtils for it. WDYT? Thank you.

class SimpleConsumerGroupExecutor extends AbstractConsumerGroupExecutor {
public SimpleConsumerGroupExecutor(String broker, String groupId, Collection<TopicPartition> partitions) {
super(1);
SimpleConsumerRunnable th = new SimpleConsumerRunnable(broker, groupId, partitions);
th.configure();
submit(th);
}
}

@chia7712
Copy link
Contributor

chia7712 commented May 8, 2024

This is used by ListConsumerGroupTest and DescribeConsumerGroupTest. I think we can create a new class SimpleConsumerGroupExecutorTestUtils for it. WDYT? Thank you.

That is addressed already. see
https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java#L92

@FrankYang0529 FrankYang0529 force-pushed the minor-migrate-ListConsumerGroup branch from 7c85ff9 to e8307a0 Compare May 9, 2024 15:21
@FrankYang0529
Copy link
Member Author

This is used by ListConsumerGroupTest and DescribeConsumerGroupTest. I think we can create a new class SimpleConsumerGroupExecutorTestUtils for it. WDYT? Thank you.

That is addressed already. see https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java#L92

Thanks. The SimpleConsumerGroupExecutor subscribe topic partitions. I have updated ConsumerGroupCommandTestUtils to support it.

class SimpleConsumerRunnable extends AbstractConsumerRunnable {
final Collection<TopicPartition> partitions;
public SimpleConsumerRunnable(String broker, String groupId, Collection<TopicPartition> partitions) {
super(broker, groupId, Optional.empty(), false);
this.partitions = partitions;
}
@Override
void subscribe() {
consumer.assign(partitions);
}
}

@FrankYang0529 FrankYang0529 marked this pull request as ready for review May 9, 2024 15:23
@@ -92,6 +94,7 @@ static void generator(ClusterGenerator clusterGenerator) {
static <T> AutoCloseable buildConsumers(int numberOfConsumers,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we have two buildConsumers to deal with "assign"/"subscribe" individually?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you. I update to define two buildConsumers. May you help me take a look? If we're good with current design. I will update to #15908.

@FrankYang0529 FrankYang0529 force-pushed the minor-migrate-ListConsumerGroup branch from e8307a0 to 37ae9fb Compare May 11, 2024 02:34
@chia7712
Copy link
Contributor

@FrankYang0529 please rebase code to include 334d5d5

Signed-off-by: PoAn Yang <payang@apache.org>
@FrankYang0529 FrankYang0529 force-pushed the minor-migrate-ListConsumerGroup branch from 37ae9fb to 6191872 Compare May 13, 2024 16:17
@@ -93,6 +95,27 @@ static <T> AutoCloseable buildConsumers(int numberOfConsumers,
}
}

static <T> AutoCloseable buildConsumers(int numberOfConsumers,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please DON'T generate a bunch of duplicate code...

    static <T> AutoCloseable buildConsumers(int numberOfConsumers,
                                            boolean syncCommit,
                                            Set<TopicPartition> partitions,
                                            Supplier<KafkaConsumer<T, T>> consumerSupplier) {
        return buildConsumers(numberOfConsumers, syncCommit, consumerSupplier,
            consumer -> consumer.assign(partitions));
    }

    static <T> AutoCloseable buildConsumers(int numberOfConsumers,
                                            boolean syncCommit,
                                            String topic,
                                            Supplier<KafkaConsumer<T, T>> consumerSupplier) {
        return buildConsumers(numberOfConsumers, syncCommit, consumerSupplier,
            consumer -> consumer.subscribe(Collections.singleton(topic)));
    }

    private static <T> AutoCloseable buildConsumers(int numberOfConsumers,
                                                    boolean syncCommit,
                                                    Supplier<KafkaConsumer<T, T>> consumerSupplier,
                                                    Consumer<KafkaConsumer<T, T>> setPartitions) {
        List<KafkaConsumer<T, T>> consumers = new ArrayList<>(numberOfConsumers);
        ExecutorService executor = Executors.newFixedThreadPool(numberOfConsumers);
        AtomicBoolean closed = new AtomicBoolean(false);
        final AutoCloseable closeable = () -> releaseConsumers(closed, consumers, executor);
        try {
            for (int i = 0; i < numberOfConsumers; i++) {
                KafkaConsumer<T, T> consumer = consumerSupplier.get();
                consumers.add(consumer);
                executor.execute(() -> initConsumer(syncCommit, () -> {
                    setPartitions.accept(consumer);
                    return consumer;
                }, closed));
            }
            return closeable;
        } catch (Throwable e) {
            Utils.closeQuietly(closeable, "Release Consumer");
            throw e;
        }
    }

    private static <T> void releaseConsumers(AtomicBoolean closed, List<KafkaConsumer<T, T>> consumers, ExecutorService executor) throws InterruptedException {
        closed.set(true);
        consumers.forEach(KafkaConsumer::wakeup);
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
    }

    private static <T> void initConsumer(boolean syncCommit,
                                         Supplier<KafkaConsumer<T, T>> consumerSupplier,
                                         AtomicBoolean closed) {
        try (KafkaConsumer<T, T> kafkaConsumer = consumerSupplier.get()) {
            while (!closed.get()) {
                kafkaConsumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                if (syncCommit)
                    kafkaConsumer.commitSync();
            }
        } catch (WakeupException e) {
            // OK
        }
    }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants