List consumer group offsets #49

Merged: 118 commits, Nov 4, 2024

Changes from all commits (118 commits)
63f05de
1st commit
PratRanj07 May 20, 2024
4445321
2nd commit
PratRanj07 May 20, 2024
28aff0a
3rd commit
PratRanj07 May 20, 2024
0b3a617
Added tests and examples
PratRanj07 May 21, 2024
3c4a907
Formatting
PratRanj07 May 21, 2024
76639a6
Formatting
PratRanj07 May 21, 2024
6ff253b
little change
PratRanj07 May 21, 2024
925fd21
some small changes
PratRanj07 May 21, 2024
5b769a8
Merge branch 'dev_early_access_development_branch' into listConsumerG…
PratRanj07 May 22, 2024
195a922
changes requested
PratRanj07 May 30, 2024
380ee83
requested changes
PratRanj07 May 30, 2024
254765b
requested changes
PratRanj07 Jun 3, 2024
3ba54a1
name change
PratRanj07 Jun 3, 2024
ffe16f7
indentation
PratRanj07 Jun 3, 2024
a98699a
indentation
PratRanj07 Jun 4, 2024
160f762
Add data-governance to code owners for schema registry clients (#52)
emasab Jun 4, 2024
2a99288
Fix deprecation warning
milindl Jun 5, 2024
28e28e3
Separate eachMessage and eachBatch internal consume loop
milindl Jun 5, 2024
ffbbafd
Update performance example with more cases
milindl Jun 10, 2024
42004b1
Add per-partition cache with global expiry
milindl Jun 10, 2024
f949d88
Add per-partition cache expiry logic
milindl Jun 12, 2024
7f24913
Allow cache to disburse multiple messages at once
milindl Jun 13, 2024
fbbf9f2
Add per-partition concurrency
milindl Jun 17, 2024
8c11d0e
Add partition level concurrency to faux-eachBatch
milindl Jun 17, 2024
98ba984
Create persistent workers for per-partition concurrency, prevents exc…
milindl Jun 20, 2024
4f0f25b
Fix tests for Per Partition Concurrency
milindl Jun 24, 2024
ba0603b
Add message set capability to message cache
milindl Jun 25, 2024
fdf56ef
Add naive batching (without resolution handling)
milindl Jul 1, 2024
5ecf261
Add batch staleness, resolution, and offset management to eachBatch
milindl Jul 2, 2024
b0e4372
Update tests for true eachBatch
milindl Jul 2, 2024
ac0bece
Remove debug-only properties
milindl Jul 8, 2024
72305d2
Update MIGRATION.md for eachBatch
milindl Jul 8, 2024
f4b4aaf
Bump version
milindl Jul 8, 2024
31e325c
Fix linting and Makefile issues (#2)
milindl Aug 6, 2024
89e8227
Add SchemaRegistryClient, RestService, and testing (#1)
Claimundefine Aug 8, 2024
f34e086
Add mock client for testing
Claimundefine Aug 9, 2024
6c919ff
Remove testing artifacts
Claimundefine Aug 9, 2024
69daca9
Fix flaky e2e tests (#54)
PratRanj07 Aug 12, 2024
4f2d255
Preset fix (#6)
Claimundefine Aug 13, 2024
14d33b6
Do not modify RegExps which don't start with a ^
milindl Aug 13, 2024
9fe9571
Fix argument mutation in run, pause and resume
milindl Aug 14, 2024
1dcfe39
Dekregistry client (#67)
Claimundefine Aug 19, 2024
b69e87f
Add clientConfig, baseUrl retry, RestError, encodeURIComponent (#12) …
Claimundefine Aug 21, 2024
d73a14d
Update tsconfig.json (#69)
rayokota Aug 21, 2024
bc059a4
Fix broken tests (#70)
Claimundefine Aug 21, 2024
a85cda0
Add commitCb method (#59)
emasab Aug 22, 2024
4b9b340
Fix eslint config (#71)
rayokota Aug 22, 2024
3aab3c2
Add eslint rules (#72)
rayokota Aug 22, 2024
2bbb2af
First cut at JavaScript serdes (#73)
rayokota Aug 23, 2024
f724ed8
Add assign/unassign within rebalance callbacks
milindl Aug 27, 2024
a348985
Add performance benchmarking script modes and README
milindl Sep 11, 2024
15fff05
Add confluent debian repo for performance benchmark
milindl Sep 11, 2024
ffae694
Remove store from promisified API
milindl Aug 12, 2024
aceae76
Add binding level debug logging and client name to logs
milindl Sep 11, 2024
eddaabc
Fix typo in script name
milindl Sep 11, 2024
8bd4940
First cut at Data Contract rules (#77)
rayokota Sep 11, 2024
3d54a18
Separate SR into a different workspace (#78)
rayokota Sep 12, 2024
34302ba
Refactor to always use a barrier for pending operation (#26)
emasab Sep 13, 2024
ad06919
Schemaregistry rebase (#33) (#80)
Claimundefine Sep 13, 2024
9b88c91
Add Docker environment for integration tests (#34) (#81)
Claimundefine Sep 13, 2024
5424a4a
Fix log level config in light of binding logs
milindl Sep 14, 2024
3ca8437
Remove consumerGroupId argument from sendOffsets and add tests (#82)
milindl Sep 14, 2024
d2b7227
Performance measurement improvements
emasab Sep 15, 2024
546df33
Admin examples for available APIs (#84)
emasab Sep 15, 2024
cd0887a
Fix listGroups segfault when passing an undefined matchConsumerGroupS…
emasab Sep 16, 2024
5c637c0
Add more unit tests; minor fixes for KMS clients (#86)
rayokota Sep 16, 2024
cbc69be
Bump version to 0.1.17-devel
milindl Sep 17, 2024
ecdd836
Add complex encryption tests (#89)
rayokota Sep 17, 2024
1b77019
Add index.ts (#91)
rayokota Sep 20, 2024
ac1367c
Enhance HighLevelProducer to take schema serializers (#92)
rayokota Sep 20, 2024
71c4aeb
Add auth features (#47) (#94)
Claimundefine Sep 20, 2024
ffbffe8
Add more JSON Schema validation tests (#95)
rayokota Sep 20, 2024
5adb821
Move ts-jest to dev dependencies (#96)
rayokota Sep 20, 2024
b6379d3
Add JSON integration tests (#46) (#97)
Claimundefine Sep 22, 2024
49e12c6
Unsubscribe before disconnecting to mitigate hangs on destroy (#98)
milindl Sep 24, 2024
5356f81
Pass creds to DEK Registry client (#99)
rayokota Sep 24, 2024
a8e5b39
Bump version to 0.2.0 and drop -devel (#100)
milindl Sep 25, 2024
8b41c1e
Remove mandatory basic or bearer auth credentials (#57) (#101)
Claimundefine Sep 25, 2024
69b28a5
Add build script and readme (#104)
rayokota Sep 26, 2024
a8e3914
Add license (#105)
rayokota Sep 26, 2024
acc94a4
Add clearLatestCaches/clearCaches API, fix test to call clearLatestCa…
rayokota Sep 26, 2024
12cf126
Add avro integration tests (#56) (#106)
Claimundefine Sep 27, 2024
12e33c9
Add tsdoc (#107)
rayokota Sep 27, 2024
98f12f8
Enhance docs (#108)
rayokota Sep 27, 2024
9c7f096
Update schemaregistry README (#109)
rayokota Sep 28, 2024
63a949f
Add restService interfaces to exported types (#110)
rayokota Sep 28, 2024
ad0ff8c
Rename DekClient to avoid conflict with Client (#112)
rayokota Sep 30, 2024
52944ea
Schemaregistry examples (#69) (#113)
Claimundefine Sep 30, 2024
228f64b
Add schemaregistry examples workspace with avro, json, and csfle exam…
Claimundefine Oct 1, 2024
3431a92
bugfix integ tests for registering -value (#71) (#115)
Claimundefine Oct 1, 2024
5cc2dee
Bump version to v0.2.1 (#116)
milindl Oct 3, 2024
73ca334
Update version to 0.2.1 for EA release (#72) (#117)
Claimundefine Oct 10, 2024
4c7c8df
Add Kafka Oauth implementation (#74) (#119)
Claimundefine Oct 11, 2024
fad64ce
Upgrade librdkafka to v2.6.0 (#120)
emasab Oct 14, 2024
a86c3b4
Bump version to 0.3.0-RC1 and: (#122)
emasab Oct 17, 2024
b3712ba
v0.3.0 (#126)
emasab Oct 17, 2024
1501a64
Minor optimization to reduce schema ID lookups (#123)
rayokota Oct 17, 2024
29bc526
v0.3.0-RC2 (#127)
emasab Oct 18, 2024
4e42726
v0.3.0 final release (#128)
emasab Oct 18, 2024
e3de7e4
Fix header conversion in eachBatch (#130)
milindl Oct 21, 2024
0f3a167
1st commit
PratRanj07 May 20, 2024
1bde73a
2nd commit
PratRanj07 May 20, 2024
ce5a4e9
3rd commit
PratRanj07 May 20, 2024
beafa7c
changes requested
PratRanj07 May 30, 2024
a7c5aca
requested changes
PratRanj07 May 30, 2024
99b0252
required Changes
PratRanj07 Oct 25, 2024
f6f5b54
Merge master
PratRanj07 Oct 25, 2024
603ca2e
remove unnecessary changes
PratRanj07 Oct 25, 2024
2f86c63
indentation and unnecessary changes
PratRanj07 Oct 25, 2024
a758b90
indentation
PratRanj07 Oct 25, 2024
7085111
comment removed
PratRanj07 Oct 25, 2024
b2e28fa
comment added
PratRanj07 Oct 25, 2024
92f262d
changelog entry
PratRanj07 Oct 25, 2024
2d90d5b
Changed topic partition js to c conversion structure
PratRanj07 Oct 28, 2024
c435397
refactoring
PratRanj07 Oct 30, 2024
7f6dd40
Requested changes
PratRanj07 Oct 30, 2024
1c1cfe8
final changes
PratRanj07 Nov 4, 2024
3c494e4
Merge master
PratRanj07 Nov 4, 2024
5 changes: 3 additions & 2 deletions CHANGELOG.md
@@ -1,10 +1,11 @@
-# confluent-kafka-javascript v0.3.1
+# confluent-kafka-javascript v0.4.0

-v0.3.1 is a limited availability maintenance release. It is supported for all usage.
+v0.4.0 is a limited availability feature release. It is supported for all usage.

 ## Enhancements

 1. Fixes an issue where headers were not passed correctly to the `eachBatch` callback (#130).
+2. Add support for an Admin API to list a consumer group's offsets (#49).

 # confluent-kafka-javascript v0.3.0
2 changes: 2 additions & 0 deletions MIGRATION.md
@@ -331,6 +331,8 @@ The admin-client only has support for a limited subset of methods, with more to
* The `describeGroups` method is supported with additional `timeout` and `includeAuthorizedOperations` options.
A number of additional properties have been added to the returned groups.
* The `deleteGroups` method is supported with an additional `timeout` option.
* The `fetchOffsets` method is supported with additional `timeout` and
  `requireStableOffsets` options, but the `resolveOffsets` option is not yet supported.
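As a usage sketch (illustrative only, not code from this PR), `fetchOffsets` resolves with an array of `{ topic, partitions }` entries whose offsets are strings; a small helper can flatten that documented shape for display. The helper name and sample data below are made up:

```javascript
// Illustrative only: flatten the result shape documented for
// admin.fetchOffsets() into printable strings. In real usage the data
// would come from something like:
//   const offsets = await admin.fetchOffsets({ groupId: 'my-group', topics: ['orders'] });
function formatFetchedOffsets(results) {
  return results.flatMap(({ topic, partitions }) =>
    partitions.map(p => `${topic}[${p.partition}] = ${p.offset}`)
  );
}

// Sample data mirroring the documented return value (offsets are strings):
const sample = [
  {
    topic: 'orders',
    partitions: [
      { partition: 0, offset: '42', metadata: null, leaderEpoch: null, error: null },
      { partition: 1, offset: '7', metadata: null, leaderEpoch: null, error: null },
    ],
  },
];

console.log(formatFetchedOffsets(sample));
// → [ 'orders[0] = 42', 'orders[1] = 7' ]
```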

### Using the Schema Registry

96 changes: 96 additions & 0 deletions examples/kafkajs/admin/fetch-offsets.js
@@ -0,0 +1,96 @@
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;
const { parseArgs } = require('node:util');

async function fetchOffsets() {
  const args = parseArgs({
    allowPositionals: true,
    options: {
      'bootstrap-servers': {
        type: 'string',
        short: 'b',
        default: 'localhost:9092',
      },
      'timeout': {
        type: 'string',
        short: 'm',
        default: '5000',
      },
      'require-stable-offsets': {
        type: 'boolean',
        short: 'r',
        default: false,
      },
    },
  });

  const {
    'bootstrap-servers': bootstrapServers,
    timeout,
    'require-stable-offsets': requireStableOffsets,
  } = args.values;

  const [groupId, ...rest] = args.positionals;

  if (!groupId) {
    console.error('Group ID is required');
    process.exit(1);
  }

  const kafka = new Kafka({
    kafkaJS: {
      brokers: [bootstrapServers],
    },
  });

  const admin = kafka.admin();
  await admin.connect();

  try {
    // Parse topics and partitions from the remaining arguments
    const topicInput = parseTopicsAndPartitions(rest);

    // Fetch offsets for the specified consumer group
    const offsets = await admin.fetchOffsets({
      groupId: groupId,
      topics: topicInput,
      requireStableOffsets,
      timeout: Number(timeout),
    });

    console.log(`Offsets for Consumer Group "${groupId}":`, JSON.stringify(offsets, null, 2));
  } catch (err) {
    console.error('Error fetching consumer group offsets:', err);
  } finally {
    await admin.disconnect();
  }
}

// Helper function to parse topics and partitions from arguments
function parseTopicsAndPartitions(args) {
  if (args.length === 0) return undefined;

  const topicInput = [];
  let i = 0;

  while (i < args.length) {
    const topic = args[i];
    i++;

    const partitions = [];
    while (i < args.length && !isNaN(args[i])) {
      partitions.push(Number(args[i]));
      i++;
    }

    // Add topic with partitions (or an empty array if no partitions specified)
    if (partitions.length > 0) {
      topicInput.push({ topic, partitions });
    } else {
      topicInput.push(topic); // Add as a string if no partitions specified
    }
  }

  return topicInput;
}

fetchOffsets();
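For reference, the example's parser groups each topic positional with any numeric arguments that follow it. Restated standalone (same logic as the helper above), a hypothetical argument list parses as:

```javascript
// Standalone copy of parseTopicsAndPartitions from the example above,
// shown with a sample argument list (the group ID would already have
// been taken off the front of the positionals).
function parseTopicsAndPartitions(args) {
  if (args.length === 0) return undefined;

  const topicInput = [];
  let i = 0;

  while (i < args.length) {
    const topic = args[i];
    i++;

    // Collect any numeric arguments following the topic name as partitions
    const partitions = [];
    while (i < args.length && !isNaN(args[i])) {
      partitions.push(Number(args[i]));
      i++;
    }

    if (partitions.length > 0) {
      topicInput.push({ topic, partitions });
    } else {
      topicInput.push(topic); // Bare string selects all partitions of the topic
    }
  }

  return topicInput;
}

console.log(parseTopicsAndPartitions(['topic-a', '0', '1', 'topic-b']));
// → [ { topic: 'topic-a', partitions: [ 0, 1 ] }, 'topic-b' ]
```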
47 changes: 47 additions & 0 deletions lib/admin.js
@@ -437,3 +437,50 @@ AdminClient.prototype.listTopics = function (options, cb) {
}
});
};

/**
 * List offsets for topic partition(s) for consumer group(s).
 *
 * @param {import("../../types/rdkafka").ListGroupOffsets} listGroupOffsets - The list of groupId, partitions to fetch offsets for.
 *        If partitions is null, offsets are listed for all partitions in the group.
 * @param {object} options - The options object.
 * @param {number?} options.timeout - The request timeout in milliseconds. May be unset (default: 5000).
 * @param {boolean?} options.requireStableOffsets - Whether the broker should return stable offsets
 *        (transaction-committed). (default: false)
 *
 * @param {function} cb - The callback to be executed when finished.
 */
AdminClient.prototype.listConsumerGroupOffsets = function (listGroupOffsets, options, cb) {
  if (!this._isConnected) {
    throw new Error('Client is disconnected');
  }

  if (!Array.isArray(listGroupOffsets) || listGroupOffsets.length === 0 || !listGroupOffsets[0].groupId) {
    throw new Error('groupId must be provided');
  }

  if (!Object.hasOwn(options, 'timeout')) {
    options.timeout = 5000;
  }

  if (!Object.hasOwn(options, 'requireStableOffsets')) {
    options.requireStableOffsets = false;
  }

  this._client.listConsumerGroupOffsets(listGroupOffsets, options, function (err, offsets) {
    if (err) {
      if (cb) {
        cb(LibrdKafkaError.create(err));
      }
      return;
    }

    if (cb) {
      cb(null, offsets);
    }
  });
};

117 changes: 117 additions & 0 deletions lib/kafkajs/_admin.js
@@ -417,6 +417,123 @@ class Admin {
});
});
}

  /**
   * Fetch the offsets for topic partition(s) for consumer group(s).
   *
   * @param {object} options - The options object.
   * @param {string} options.groupId - The group ID to fetch offsets for.
   * @param {import("../../types/kafkajs").TopicInput} options.topics - The topics to fetch offsets for.
   * @param {boolean} options.resolveOffsets - Not yet implemented.
   * @param {number?} options.timeout - The request timeout in milliseconds. May be unset (default: 5000).
   * @param {boolean?} options.requireStableOffsets - Whether the broker should return stable offsets
   *        (transaction-committed). (default: false)
   *
   * @returns {Promise<Array<{topic: string, partitions: import('../../types/kafkajs').FetchOffsetsPartition[]}>>}
   */
  async fetchOffsets(options = {}) {
    if (this.#state !== AdminState.CONNECTED) {
      throw new error.KafkaJSError("Admin client is not connected.", { code: error.ErrorCodes.ERR__STATE });
    }

    if (Object.hasOwn(options, "resolveOffsets")) {
      throw new error.KafkaJSError("resolveOffsets is not yet implemented.", { code: error.ErrorCodes.ERR__NOT_IMPLEMENTED });
    }

    const { groupId, topics } = options;

    if (!groupId) {
      throw new error.KafkaJSError("groupId is required.", { code: error.ErrorCodes.ERR__INVALID_ARG });
    }

    let partitions = null;
    let originalTopics = null;

    /*
      If the input is a list of topic strings, the user expects us to
      fetch offsets for all partitions of all the input topics. In
      librdkafka, we can only fetch offsets by topic partition, or else
      we can fetch all of them. Thus, we must fetch offsets for all topic
      partitions (by setting partitions to null) and filter by the topic
      strings later.
    */
    if (topics && Array.isArray(topics)) {
      if (typeof topics[0] === 'string') {
        originalTopics = topics;
        partitions = null;
      } else if (typeof topics[0] === 'object' && Array.isArray(topics[0].partitions)) {
        partitions = topics.flatMap(topic => topic.partitions.map(partition => ({
          topic: topic.topic,
          partition
        })));
      } else {
        throw new error.KafkaJSError("Invalid topics format.", { code: error.ErrorCodes.ERR__INVALID_ARG });
      }
    }

    const listGroupOffsets = [{
      groupId,
      partitions
    }];

    return new Promise((resolve, reject) => {
      this.#internalClient.listConsumerGroupOffsets(listGroupOffsets, options, (err, offsets) => {
        if (err) {
          reject(createKafkaJsErrorFromLibRdKafkaError(err));
        } else {
          /**
           * Offsets is an array of group results, each containing a group id,
           * an error and an array of partitions.
           * We need to convert it to the required format of an array of topics,
           * each containing an array of partitions.
           */
          const topicPartitionMap = new Map();

          if (offsets.length !== 1) {
            reject(new error.KafkaJSError("Unexpected number of group results."));
            return;
          }

          const groupResult = offsets[0];

          if (groupResult.error) {
            reject(createKafkaJsErrorFromLibRdKafkaError(groupResult.error));
            return;
          }

          // Traverse the partitions and group them by topic
          groupResult.partitions.forEach(partitionObj => {
            const { topic, partition, offset, leaderEpoch, metadata, error } = partitionObj;
            const fetchOffsetsPartition = {
              partition: partition,
              offset: String(offset),
              metadata: metadata || null,
              leaderEpoch: leaderEpoch || null,
              error: error || null
            };

            // Group partitions by topic
            if (!topicPartitionMap.has(topic)) {
              topicPartitionMap.set(topic, []);
            }
            topicPartitionMap.get(topic).push(fetchOffsetsPartition);
          });

          // Convert the map back to the desired array format
          let convertedOffsets = Array.from(topicPartitionMap, ([topic, partitions]) => ({
            topic,
            partitions
          }));

          if (originalTopics !== null) {
            convertedOffsets = convertedOffsets.filter(convertedOffset => originalTopics.includes(convertedOffset.topic));
          }
          resolve(convertedOffsets);
        }
      });
    });
  }
}

module.exports = {