KAFKA-19042 Rewrite ConsumerBounceTest by Java #19822
Conversation
Force-pushed from 83f8f02 to 419729c

Force-pushed from 45770eb to 7e162ae
A label of 'needs-attention' was automatically added to this PR in order to raise the …

…of IntegrationTestUtils, and helper to class ClusterInstance

Force-pushed from 4832b9b to 7ddfa18
```scala
future1.get
future2.get
}

/**
 * If we have a running consumer group of size N, configure consumer.group.max.size = N-1 and restart all brokers,
 * the group should be forced to rebalance when it becomes hosted on a Coordinator with the new config.
```
This test is disabled, and I'm afraid of missing something when migrating, so I'm leaving it here.
```java
int numRecords = 10;
ClientsTestUtils.sendRecords(clusterInstance, topicPartition, numRecords);

checkCloseGoodPath(numRecords, "group1");
```
You have to set the GroupProtocol explicitly; otherwise, all consumers in this test will use the classic group protocol.
Thanks for catching that!
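As a side note, a minimal stdlib-only sketch of what pinning the group protocol in the consumer configuration might look like — the `group.protocol` config key is Kafka's, but the override map and group name here are illustrative, not taken from the PR:

```java
import java.util.Map;

public class GroupProtocolOverride {
    public static void main(String[] args) {
        // Illustrative consumer overrides: "group.protocol" is the Kafka
        // consumer config key; setting it to "consumer" opts the client into
        // the new group protocol instead of the default "classic" one.
        Map<String, String> overrides = Map.of(
                "group.id", "group1",
                "group.protocol", "consumer");
        System.out.println(overrides.get("group.protocol"));
    }
}
```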
```java
Consumer<byte[], byte[]> dynamicConsumer = createConsumerAndReceive(dynamicGroup, false, numRecords);
Consumer<byte[], byte[]> manualConsumer = createConsumerAndReceive(manualGroup, true, numRecords);

findCoordinators(List.of(dynamicGroup, manualGroup)).forEach(clusterInstance::shutdownBroker);
```
Could you please use Admin to find out the coordinator?
We can't use Admin to find the coordinator because group3 is not created on the broker side. From my understanding, consumer.assign won't go through the group manager, so group3 should not be created, and Admin will throw GroupIdNotFoundException. But when we send the RPC directly, we can find the coordinator for group3. I'm trying to find out why they differ. You can run cfc0ee8 to observe this behaviour.
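One possible explanation for the difference (my own reading of how Kafka picks coordinators, not something stated in this thread): FindCoordinator maps a group id to a partition of `__consumer_offsets` purely by hashing, so it can answer even for a group that was never materialized, while `Admin#describeConsumerGroups` needs actual group state and therefore throws GroupIdNotFoundException. A stdlib-only sketch of that mapping, assuming the default of 50 offsets-topic partitions:

```java
public class CoordinatorMapping {
    public static void main(String[] args) {
        // The coordinator for a group is the leader of the __consumer_offsets
        // partition chosen by hashing the group id; no group state is needed.
        // (Kafka uses a MIN_VALUE-safe abs; Math.abs suffices for this sketch.)
        int numOffsetsPartitions = 50; // default offsets.topic.num.partitions
        String groupId = "group3";
        int partition = Math.abs(groupId.hashCode()) % numOffsetsPartitions;
        System.out.println("group3 maps to __consumer_offsets partition " + partition);
    }
}
```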
@TaiJuWu please fix the conflicts
```java
assertInstanceOf(GroupMaxSizeReachedException.class, rejectedConsumer.getThrownException().get());

// assert group continues to live
ClientsTestUtils.sendRecordsToTopic(clusterInstance, topic, Integer.parseInt(MAX_GROUP_SIZE), 0, Integer.parseInt(MAX_GROUP_SIZE) * 100);
```
I'm not sure about the relation between MAX_GROUP_SIZE and the records. It seems the scenario is to send over 100 records, right?
From my understanding, the test needs to ensure every group can keep receiving records continuously, so we need to send records to each partition. We make the number of partitions equal to MAX_GROUP_SIZE and give every partition records, so the value really represents the partition count. I will rename the variable to improve readability.
```java
private final Set<TopicPartition> partitionAssignment = new HashSet<>();
private final Set<TopicPartition> partitionsToAssign;
private final ConsumerRebalanceListener userRebalanceListener;
```
This variable is unused.
```java
assertInstanceOf(GroupMaxSizeReachedException.class, rejectedConsumer.getThrownException().get());

// assert group continues to live
```
Could you please enrich the comment? We expect the records to be distributed across all partitions. Also, perhaps we can simplify the code:

```java
var data = "data".getBytes(StandardCharsets.UTF_8);
try (Producer<byte[], byte[]> producer = clusterInstance.producer()) {
    IntStream.range(0, numPartition * 100).forEach(index ->
        producer.send(new ProducerRecord<>(topic, index % numPartition, data, data)));
}
```
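To double-check the suggested loop, here is a stdlib-only simulation (`numPartition = 5` is an arbitrary illustrative value, not from the test) showing that `index % numPartition` spreads `numPartition * 100` sends evenly, 100 records per partition:

```java
import java.util.stream.IntStream;

public class EvenDistribution {
    public static void main(String[] args) {
        int numPartition = 5; // illustrative; the test uses MAX_GROUP_SIZE partitions
        int[] counts = new int[numPartition];
        // Same indexing as the suggested snippet: record index -> index % numPartition
        IntStream.range(0, numPartition * 100).forEach(index -> counts[index % numPartition]++);
        for (int c : counts) {
            System.out.println(c); // each partition receives exactly 100 records
        }
    }
}
```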
This PR does the following: 1) Rewrites consumerBounceTest in Java. 2) Moves the test to clients-integration-test. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>