Skip to content

Commit fa36eb1

Browse files
fix(kafkajs): include kafka_cluster_id in DSM backlog offset tracking (#7569)
* fix(kafkajs): include kafka_cluster_id in DSM backlog offset tracking

  DSM checkpoints correctly included kafka_cluster_id in edge tags, but the backlog/offset tracking (which feeds lag metrics like data_streams.kafka.lag_messages and data_streams.kafka.lag_seconds) did not. This caused cross-cluster offset mixing when the same topic exists on multiple Kafka clusters, producing wildly incorrect lag values.

  Thread clusterId through to setOffset calls for both producer and consumer commit paths so that backlog entries are scoped per cluster.

  DSMON-1226

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test(kafkajs): add comments explaining clusterIdAvailable version guard

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* test(kafkajs): remove irrelevant comment from getDsmPathwayHash

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(kafkajs): add comment explaining resolvedClusterId safety

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent ef2b498 commit fa36eb1

File tree

4 files changed

+47
-26
lines changed

4 files changed

+47
-26
lines changed

packages/datadog-instrumentations/src/kafkajs.js

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,22 +24,6 @@ const batchConsumerErrorCh = channel('apm:kafkajs:consume-batch:error')
2424

2525
const disabledHeaderWeakSet = new WeakSet()
2626

27-
function commitsFromEvent (event) {
28-
const { payload: { groupId, topics } } = event
29-
const commitList = []
30-
for (const { topic, partitions } of topics) {
31-
for (const { partition, offset } of partitions) {
32-
commitList.push({
33-
groupId,
34-
partition,
35-
offset,
36-
topic,
37-
})
38-
}
39-
}
40-
consumerCommitCh.publish(commitList)
41-
}
42-
4327
addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKafka) => {
4428
class Kafka extends BaseKafka {
4529
constructor (options) {
@@ -132,6 +116,7 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf
132116
}
133117

134118
const kafkaClusterIdPromise = getKafkaClusterId(this)
119+
let resolvedClusterId = null
135120

136121
const eachMessageExtractor = (args, clusterId) => {
137122
const { topic, partition, message } = args[0]
@@ -146,13 +131,31 @@ addHook({ name: 'kafkajs', file: 'src/index.js', versions: ['>=1.4'] }, (BaseKaf
146131

147132
const consumer = createConsumer.apply(this, arguments)
148133

149-
consumer.on(consumer.events.COMMIT_OFFSETS, commitsFromEvent)
134+
consumer.on(consumer.events.COMMIT_OFFSETS, (event) => {
135+
const { payload: { groupId: commitGroupId, topics } } = event
136+
const commitList = []
137+
for (const { topic, partitions } of topics) {
138+
for (const { partition, offset } of partitions) {
139+
commitList.push({
140+
groupId: commitGroupId,
141+
partition,
142+
offset,
143+
topic,
144+
clusterId: resolvedClusterId,
145+
})
146+
}
147+
}
148+
consumerCommitCh.publish(commitList)
149+
})
150150

151151
const run = consumer.run
152152
const groupId = arguments[0].groupId
153153

154154
consumer.run = function ({ eachMessage, eachBatch, ...runArgs }) {
155155
const wrapConsume = (clusterId) => {
156+
// In kafkajs COMMIT_OFFSETS always happens in the context of one synchronous run
157+
// So this will always reference a correct cluster id
158+
resolvedClusterId = clusterId
156159
return run({
157160
eachMessage: wrappedCallback(
158161
eachMessage,

packages/datadog-plugin-kafkajs/src/consumer.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,18 @@ class KafkajsConsumerPlugin extends ConsumerPlugin {
4040
* @returns {ConsumerBacklog}
4141
*/
4242
transformCommit (commit) {
43-
const { groupId, partition, offset, topic } = commit
44-
return {
43+
const { groupId, partition, offset, topic, clusterId } = commit
44+
const backlog = {
4545
partition,
4646
topic,
4747
type: 'kafka_commit',
4848
offset: Number(offset),
4949
consumer_group: groupId,
5050
}
51+
if (clusterId) {
52+
backlog.kafka_cluster_id = clusterId
53+
}
54+
return backlog
5155
}
5256

5357
commit (commitList) {

packages/datadog-plugin-kafkajs/src/producer.js

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,16 +37,20 @@ class KafkajsProducerPlugin extends ProducerPlugin {
3737
* @param {ProducerResponseItem} response
3838
* @returns {ProducerBacklog}
3939
*/
40-
transformProduceResponse (response) {
40+
transformProduceResponse (response, clusterId) {
4141
// In produce protocol >=v3, the offset key changes from `offset` to `baseOffset`
4242
const { topicName: topic, partition, offset, baseOffset } = response
4343
const offsetAsLong = offset || baseOffset
44-
return {
44+
const backlog = {
4545
type: 'kafka_produce',
4646
partition,
4747
offset: offsetAsLong ? Number(offsetAsLong) : undefined,
4848
topic,
4949
}
50+
if (clusterId) {
51+
backlog.kafka_cluster_id = clusterId
52+
}
53+
return backlog
5054
}
5155

5256
/**
@@ -56,6 +60,7 @@ class KafkajsProducerPlugin extends ProducerPlugin {
5660
*/
5761
commit (ctx) {
5862
const commitList = ctx.result
63+
const clusterId = ctx.clusterId
5964

6065
if (!this.config.dsmEnabled) return
6166
if (!commitList || !Array.isArray(commitList)) return
@@ -65,7 +70,7 @@ class KafkajsProducerPlugin extends ProducerPlugin {
6570
'offset',
6671
'topic',
6772
]
68-
for (const commit of commitList.map(this.transformProduceResponse)) {
73+
for (const commit of commitList.map(r => this.transformProduceResponse(r, clusterId))) {
6974
if (keys.some(key => !commit.hasOwnProperty(key))) continue
7075
this.tracer.setOffset(commit)
7176
}

packages/datadog-plugin-kafkajs/test/dsm.spec.js

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -310,18 +310,27 @@ describe('Plugin', () => {
310310
assert.strictEqual(runArg?.offset, commitMeta.offset)
311311
assert.strictEqual(runArg?.partition, commitMeta.partition)
312312
assert.strictEqual(runArg?.topic, commitMeta.topic)
313-
assertObjectContains(runArg, {
313+
const expectedBacklog = {
314314
type: 'kafka_commit',
315315
consumer_group: 'test-group',
316-
})
316+
}
317+
// kafka_cluster_id is only available in kafkajs >=1.13
318+
if (clusterIdAvailable) {
319+
expectedBacklog.kafka_cluster_id = testKafkaClusterId
320+
}
321+
assertObjectContains(runArg, expectedBacklog)
317322
})
318323
}
319324

320325
it('Should add backlog on producer response', async () => {
321326
await sendMessages(kafka, testTopic, messages)
322327
sinon.assert.calledOnce(setOffsetSpy)
323-
const { topic } = setOffsetSpy.lastCall.args[0]
324-
assert.strictEqual(topic, testTopic)
328+
const runArg = setOffsetSpy.lastCall.args[0]
329+
assert.strictEqual(runArg.topic, testTopic)
330+
// kafka_cluster_id is only available in kafkajs >=1.13
331+
if (clusterIdAvailable) {
332+
assert.strictEqual(runArg.kafka_cluster_id, testKafkaClusterId)
333+
}
325334
})
326335
})
327336
})

0 commit comments

Comments (0)