
KAFKA-19042 Rewrite ConsumerBounceTest by Java #19822


Merged
48 commits merged into apache:trunk from the KAFKA-19042_ConsumerBounceTest branch on Jun 29, 2025

Conversation

TaiJuWu
Collaborator

@TaiJuWu TaiJuWu commented May 27, 2025

This PR does the following:

  1. Rewrites ConsumerBounceTest in Java.
  2. Moves the test to clients-integration-test.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>

@github-actions github-actions bot added triage PRs from the community clients labels May 27, 2025
@TaiJuWu TaiJuWu force-pushed the KAFKA-19042_ConsumerBounceTest branch from 83f8f02 to 419729c Compare May 31, 2025 23:52
@TaiJuWu TaiJuWu force-pushed the KAFKA-19042_ConsumerBounceTest branch from 45770eb to 7e162ae Compare June 1, 2025 13:05

github-actions bot commented Jun 4, 2025

A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.

@TaiJuWu TaiJuWu force-pushed the KAFKA-19042_ConsumerBounceTest branch from 4832b9b to 7ddfa18 Compare June 11, 2025 07:18
@github-actions github-actions bot removed the triage PRs from the community label Jun 14, 2025
@TaiJuWu TaiJuWu changed the title (wip )KAFKA-19042 consumer bounce test KAFKA-19042 consumer bounce test Jun 20, 2025
future1.get
future2.get
}

/**
* If we have a running consumer group of size N, configure consumer.group.max.size = N-1 and restart all brokers,
* the group should be forced to rebalance when it becomes hosted on a Coordinator with the new config.
Collaborator Author

This test is disabled, and I'm afraid of missing something during the migration, so I left it here.

int numRecords = 10;
ClientsTestUtils.sendRecords(clusterInstance, topicPartition, numRecords);

checkCloseGoodPath(numRecords, "group1");
Member

You have to set the GroupProtocol explicitly; otherwise, all consumers in this test will use the classic group protocol.
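For illustration, a minimal sketch of what setting the protocol explicitly could look like, assuming the consumer is built from a config map and that `topic` and `clusterInstance.consumer(...)` behave as elsewhere in these integration tests (`ConsumerConfig.GROUP_PROTOCOL_CONFIG` and `GroupProtocol` are the standard org.apache.kafka.clients.consumer classes); the actual change in the PR may differ:

// Sketch only: set group.protocol explicitly so the consumer uses the intended
// protocol instead of silently falling back to the classic one.
Map<String, Object> configs = Map.of(
    ConsumerConfig.GROUP_ID_CONFIG, "group1",
    ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
);
try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(configs)) {
    consumer.subscribe(List.of(topic));
    // poll and assert the received records, as checkCloseGoodPath already does
}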

Collaborator Author

Thanks for catching it!

Consumer<byte[], byte[]> dynamicConsumer = createConsumerAndReceive(dynamicGroup, false, numRecords);
Consumer<byte[], byte[]> manualConsumer = createConsumerAndReceive(manualGroup, true, numRecords);

findCoordinators(List.of(dynamicGroup, manualGroup)).forEach(clusterInstance::shutdownBroker);
Member

Could you please use Admin to find out the coordinator?

Collaborator Author

We can't use Admin to find the coordinator because group3 is not created on the broker side.
From my understanding, consumer.assign won't go through the group manager, so group3 should not be created.
Admin will throw GroupIdNotFoundException.

But when we send the RPC directly, we can find the coordinator for group3.
I'm trying to find the difference.

You can run cfc0ee8 to observe this behaviour.
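For reference, the suggested Admin-based lookup would look roughly like the sketch below, assuming `clusterInstance` exposes an `admin()` helper and `dynamicGroup`/`manualGroup` are the group ids from the quoted snippet; for an assign()-only group such as group3, the describe call is expected to fail with GroupIdNotFoundException as described above:

// Sketch: resolve the coordinators through Admin#describeConsumerGroups and shut them down.
// This works for groups that registered with a coordinator, but for an assign()-only
// group the future completes with GroupIdNotFoundException, which is why the test
// falls back to sending the FindCoordinator RPC directly.
try (Admin admin = clusterInstance.admin()) {
    admin.describeConsumerGroups(List.of(dynamicGroup, manualGroup))
        .all()
        .get()
        .values()
        .forEach(description -> clusterInstance.shutdownBroker(description.coordinator().id()));
}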

@chia7712
Member

@TaiJuWu please fix the conflicts

assertInstanceOf(GroupMaxSizeReachedException.class, rejectedConsumer.getThrownException().get());

// assert group continues to live
ClientsTestUtils.sendRecordsToTopic(clusterInstance, topic, Integer.parseInt(MAX_GROUP_SIZE), 0, Integer.parseInt(MAX_GROUP_SIZE) * 100);
Member

I'm not sure about the relation between MAX_GROUP_SIZE and the records. It seems the scenario is to send over 100 records, right?

Collaborator Author
@TaiJuWu TaiJuWu Jun 27, 2025

From my understanding, the test needs to ensure every group keeps receiving records continuously, so we need to send records to each partition.
We make the number of partitions equal to MAX_GROUP_SIZE and let every partition have records, so the value is really the partition count.

I will rename the variable to partition to improve readability.


private final Set<TopicPartition> partitionAssignment = new HashSet<>();
private final Set<TopicPartition> partitionsToAssign;
private final ConsumerRebalanceListener userRebalanceListener;
Member

this variable is useless


assertInstanceOf(GroupMaxSizeReachedException.class, rejectedConsumer.getThrownException().get());

// assert group continues to live
Member

Could you please enrich the comment? We expect the records to be distributed across all partitions. Also, perhaps we can simplify the code:

        var data = "data".getBytes(StandardCharsets.UTF_8);
        try (Producer<byte[], byte[]> producer = clusterInstance.producer()) {
            IntStream.range(0, numPartition * 100).forEach(index ->
                producer.send(new ProducerRecord<>(topic, index % numPartition, data, data)));
        }

@chia7712 chia7712 changed the title KAFKA-19042 consumer bounce test KAFKA-19042 Rewrite ConsumerBounceTest by Java Jun 29, 2025
@chia7712 chia7712 merged commit a95522a into apache:trunk Jun 29, 2025
24 checks passed
@TaiJuWu TaiJuWu deleted the KAFKA-19042_ConsumerBounceTest branch June 30, 2025 01:44
jiafu1115 pushed a commit to jiafu1115/kafka that referenced this pull request Jul 2, 2025
This PR does the following:
1) Rewrites consumerBounceTest in Java.
2) Moves the test to clients-integration-test.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
jiafu1115 pushed a commit to jiafu1115/kafka that referenced this pull request Jul 3, 2025
Labels
build (Gradle build or GitHub Actions), clients, core (Kafka Broker)
3 participants