-
Notifications
You must be signed in to change notification settings - Fork 18
[KIP-848]: Add testing changes and describe consumer group changes for KIP-848 #329
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Adds support for running performance tests with the new Consumer Group Protocol (KIP-848) alongside the classic protocol by extending test helpers, consumer configuration, and CI setup.
- Introduces
TEST_CONSUMER_GROUP_PROTOCOL
env var and helper functions to toggle protocol behavior in tests. - Updates
createConsumer
and internal config mapping to validate and strip classic-only settings when using the new protocol. - Extends Docker Compose files and Semaphore CI pipeline to run perf tests under both classic and consumer protocols.
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
Show a summary per file
File | Description |
---|---|
test/promisified/testhelpers.js | Adds testConsumerGroupProtocol* helpers and adjusts createConsumer to apply/remove settings based on protocol |
test/docker/kraft/server.properties | Configures KRaft broker with consumer group protocol settings |
test/docker/docker-compose-kraft.yml | Provides a Docker Compose definition for the KRaft broker used in consumer-protocol tests |
lib/kafkajs/_consumer.js | Extends #kafkaJSToConsumerConfig to accept an isClassicProtocol flag and enforce validations |
.semaphore/semaphore.yml | Adds two Semaphore CI jobs running perf tests for classic and consumer protocols |
Comments suppressed due to low confidence (3)
test/promisified/testhelpers.js:18
- Add a JSDoc or inline comment explaining that
TEST_CONSUMER_GROUP_PROTOCOL
should be set to either 'classic' or 'consumer', and what the helper returns in each case.
function testConsumerGroupProtocol() {
lib/kafkajs/_consumer.js:515
- For non-classic protocols, no default
partition.assignment.strategy
is set. If this is intentional, consider adding a comment to clarify fallback behavior or explicitly document the absence of a default strategy.
} else if (isClassicProtocol) {
.semaphore/semaphore.yml:170
- This line and the following commands lack a '+' prefix in the diff, so they won’t be added under the new consumer-protocol job. Ensure the
NODE_OPTIONS
, working directory change, andnpm install
commands are included in the patch.
- export NODE_OPTIONS='--max-old-space-size=1536'
@@ -129,6 +155,7 @@ class SequentialPromises { | |||
} | |||
|
|||
module.exports = { | |||
testConsumerGroupProtocolClassic, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Since testConsumerGroupProtocol
is defined but not exported, either export it for clarity or rename it (e.g., _testConsumerGroupProtocol
) to indicate it’s intended as a private helper.
Copilot uses AI. Check for mistakes.
This comment has been minimized.
This comment has been minimized.
6 similar comments
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
334c1d2
to
6b868fa
Compare
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
1 similar comment
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
* Added new fields in DescribeConsumerGroup * error debug * test changes * test enabled
This comment has been minimized.
This comment has been minimized.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Majority of changes look good. I tested locally with AK 4 also. Some comments to address here and here.
Please add a CHANGELOG.md entry
lib/kafkajs/_consumer.js
Outdated
@@ -605,8 +621,10 @@ class Consumer { | |||
} | |||
|
|||
#finalizedConfig() { | |||
const protocol = this.#userConfig['group.protocol']; | |||
const isClassicProtocol = protocol === undefined || protocol === 'classic'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this comparison should be case insensitive
@@ -12,7 +13,9 @@ describe('Consumer > incremental rebalance', () => { | |||
|
|||
const consumerConfig = { | |||
groupId, | |||
partitionAssigners: [PartitionAssigners.cooperativeSticky], | |||
...(testConsumerGroupProtocolClassic() && { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this syntax is quite hard to read, do it with an if block instead later
@@ -1,5 +1,5 @@ | |||
// require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS. | |||
const { Kafka, ConsumerGroupStates } = require('@confluentinc/kafka-javascript').KafkaJS; | |||
const { Kafka, ConsumerGroupStates, ConsumerGroupTypes } = require('@confluentinc/kafka-javascript').KafkaJS; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: since we're not actually using it, why have it here?
@@ -81,11 +82,12 @@ describe('Admin > describeGroups', () => { | |||
expect(describeGroupsResult.groups[0]).toEqual( | |||
expect.objectContaining({ | |||
groupId, | |||
protocol: 'roundrobin', | |||
partitionAssignor: 'roundrobin', | |||
protocol: expect.any(String), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it possible to decide the actual value beforehand, stored it in a variable and replace it with 'expectedProtocol' or something like that?
This comment has been minimized.
This comment has been minimized.
CHANGELOG.md
Outdated
## Enhancements | ||
|
||
1. References librdkafka v2.11.0. Refer to the [librdkafka v2.11.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.11.0) for more information. | ||
2. [KIP-848] `describeGroups()` now supports KIP-848 introduced `consumer` groups. Two new fields for consumer group type and target assignment have also been added. Type defines whether this group is a `classic` or `consumer` group. Target assignment is only valid for the `consumer` protocol and its defaults to nil. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
its defaults to nil
change to
"it defaults to being undefined"
This PR intends to make changes to run Performance Tests on Semaphore with the newly introduced Consumer Group Protocol (KIP 848).