-
Notifications
You must be signed in to change notification settings - Fork 13.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-16598 Mirgrate ResetConsumerGroupOffsetTest
to new test infra
#15779
Conversation
@chia7712 Could you please take a look at this question? Thank you. |
Hey @m1a2st , the way to get the servers' addresses is |
@m1a2st Please add following server property: @ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
}) The default replica of offset topic is 3, and the new infra create single broker. Hence, creating offset topic get failed, and hence you can't find a coordinator as the offset partition can't get be ready. Also, please notice @lianetm comments that bootstrap servers need to be updated manually if you kill any broker. |
Rely on #15766 |
@chia7712 , please take a look for this PR, thank you |
@m1a2st Could you please use |
ConsumerGroupExecutor executor = addConsumerGroupExecutor(numConsumers, topic, group, GroupProtocol.CLASSIC.name); | ||
awaitConsumerProgress(topic, group, totalMessages); | ||
executor.shutdown(); | ||
private Consumer<String, String> createNoAutoCommitConsumer(ClusterInstance cluster, String group) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you test the new consumer also?
https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/test/ClusterInstance.java#L157
ClusterInstance
can return the supported protocol, and you can leverage it to create specify consumer.
@m1a2st Could you please make sure "all" available consumer groups are included in this test? For example: |
produceMessages(cluster, topic, 100); | ||
for (int i = 1; i <= 3; i++) { | ||
String group = GROUP_PREFIX + i; | ||
try (AutoCloseable consumerGroupCloseable = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chia7712, There is a weird problem, I use the try with auto closeable resource, I should not use onsumerGroupCloseable.close()
this method to close resource. But if I don't use close()
method, when I testing Kraft and co-Kraft mode with multi groupProtocol, it will be fail. Please take a look, thank you
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @m1a2st, I remove consumerGroupCloseable.close()
and both KRAFT and CO_KRAFT can pass. Not sure whether you can use String group = GROUP_PREFIX + groupProtocol.name() + i;
and try it again, because you use same group names for both group protocols.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529, Thanks for your comment, and I change to use random group id and then tests will pass.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@m1a2st thanks for updated PR!
.collect(Collectors.toSet()); | ||
long count, | ||
GroupProtocol groupProtocol) throws Exception { | ||
try (Consumer<String, String> consumer = createNoAutoCommitConsumer(cluster, group, groupProtocol)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems we can rewrite this by Admin
. for example:
try (Admin admin = Admin.create(Collections.singletonMap(BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()))) {
TestUtils.waitForCondition(() -> admin.listConsumerGroupOffsets(group).all().get().get(group).entrySet()
.stream().filter(e -> e.getKey().topic().equals(topic))
.mapToLong(e -> e.getValue().offset()).sum() == count, "Expected that consumer group has consumed all messages from topic/partition. " +
"Expected offset: " + count +
". Actual offset: " +
admin.listConsumerGroupOffsets(group).all().get().get(group).entrySet()
.stream().filter(e -> e.getKey().topic().equals(topic)));
}
TopicPartition tp0 = new TopicPartition(topic, 0); | ||
TopicPartition tp1 = new TopicPartition(topic, 1); | ||
createTopic(topic, 2, 1, new Properties(), listenerName(), new Properties()); | ||
private Map<TopicPartition, Long> committedOffsets(ClusterInstance cluster, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
.collect(Collectors.toList()); | ||
kafka.utils.TestUtils.produceMessages(servers(), seq(records), 1); | ||
private static List<String> generateIds(String name) { | ||
return IntStream.rangeClosed(1, 3) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we reduce the number? There are 5 (cluster) * 3 (groups) * 3 (topics) * 2 (protocols) = 90 ...
@chia7712, Thanks for your review, I have been changed according to your conversations. |
@m1a2st Those tests verify all groups so the loop of try (Admin admin = cluster.createAdminClient()) {
admin.deleteConsumerGroups(groups).all().get();
} |
@chia7712 , Thanks for your review, these test passed. |
@m1a2st could you please rebase code to trigger QA again? |
@chia7712, Thanks for your conversations, I already rebase this code. |
String group, | ||
long count) throws Exception { | ||
try (Admin admin = Admin.create(singletonMap(BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()))) { | ||
TestUtils.waitForCondition(() -> admin.listConsumerGroupOffsets(group) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you rewrite it by committedOffsets
?
@chia7712, Please review, Thanks for your comments. |
private static final String TOPIC_PREFIX = "foo-"; | ||
private static final String GROUP_PREFIX = "test.group-"; | ||
|
||
private static void generator(ClusterGenerator clusterGenerator) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please rebase code to fix this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I rebase it.
It seems there are something is failed. Let's wait for https://issues.apache.org/jira/browse/KAFKA-16828 |
|
||
Map<String, Map<TopicPartition, OffsetAndMetadata>> exportedOffsets = service.resetOffsets(); | ||
bw.write(service.exportOffsetsToCsv(exportedOffsets)); | ||
bw.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BufferedWriter
is in try-resource so do we need to call it again? Maybe we can add a little helper to output string content to specific file
String topic, | ||
String group, | ||
long count) throws Exception { | ||
TestUtils.waitForCondition(() -> committedOffsets(cluster, topic, group) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we avoid creating Admin
repeatedly?
private Map<String, Object> composeConsumerConfigs(ClusterInstance cluster, | ||
String group, | ||
GroupProtocol groupProtocol) { | ||
HashMap<String, Object> configs = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This configs could make the test slower. In those test cases they expect to wait "committed consumer offsets", and hence we have to wait (balance time: 3s + internal commit time: 5s) at least ...
Maybe we can reduce the value of both group.initial.rebalance.delay.ms
and auto.commit.interval.ms
to "1000"? WDYT?
…apache#15779) Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
…apache#15779) Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Currently, I'm working on the ResetConsumerGroupOffsetTest. I'm a bit puzzled about the parameter
bootstrapServers(listenerName())
in the old code. It seems to return localhost:+random port number. My understanding is that in the Scala part of the code, it first creates a broker and then returns the IP and port number of that broker. However, I'm not sure which part of the new testing framework corresponds to this parameter. Currently, I'm usingClusterInstance
, but in the subsequent segmentconsumerGroupCommand.collectGroupState(group).coordinator.host()
, it times out.