fix(kafkajs): read clusterId from existing metadata #8389
Conversation
Overall package size

Self size: 5.82 MB

Dependency sizes:

| name | version | self size | total size |
|------|---------|-----------|------------|
| import-in-the-middle | 3.0.1 | 82.56 kB | 817.39 kB |
| dc-polyfill | 0.1.11 | 25.74 kB | 25.74 kB |

🤖 This report was automatically generated by heaviest-objects-in-the-universe
🎯 Code Coverage (Commit SHA: de9ab75)
Benchmarks

Benchmark execution time: 2026-05-12 12:24:07. Comparing candidate commit de9ab75 in PR branch. Found 0 performance improvements and 0 performance regressions! Performance is the same for 1513 metrics, 80 unstable metrics.
```js
addHook({ name: 'kafkajs', file: 'src/producer/index.js', versions: ['>=1.4'] }, (createProducer) =>
  shimmer.wrapFunction(createProducer, original => function wrappedCreateProducer (params) {
    const producer = original(params)
    if (params?.cluster) {
      clientToCluster.set(producer, params.cluster)
    }
    return producer
  })
)

addHook({ name: 'kafkajs', file: 'src/consumer/index.js', versions: ['>=1.4'] }, (createConsumer) =>
  shimmer.wrapFunction(createConsumer, original => function wrappedCreateConsumer (params) {
    const consumer = original(params)
    if (params?.cluster) {
      clientToCluster.set(consumer, params.cluster)
    }
    return consumer
  })
)
```
Why the need for wrapping the same functions that we already wrap below?
These are different functions from different files. The one here receives the `cluster` property that is created inside the method we instrument below. Capturing it here is the easiest and cleanest way to get access to the cluster.
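A quick sketch of why the captured cluster goes into a `WeakMap` rather than being attached to the producer itself: the producer object stays untouched, so nothing leaks through `Reflect.ownKeys` and user serializers never see a stray key. The `cluster` object below is a stand-in for kafkajs's internal one, shaped after the description in this PR.

```javascript
// Storing the cluster in a WeakMap keeps the producer object untouched:
// no Symbol-keyed property to leak through Reflect.ownKeys, no
// underscore-prefixed string key for user serializers to pick up.
const clientToCluster = new WeakMap()

const producer = { send: async () => {} }
// Stand-in for kafkajs's internal cluster (shape per the PR description).
const cluster = { brokerPool: { metadata: { clusterId: 'abc' } } }

const keysBefore = Reflect.ownKeys(producer).length
clientToCluster.set(producer, cluster)
const keysAfter = Reflect.ownKeys(producer).length

console.log(keysBefore === keysAfter) // true: the producer gained no keys
console.log(clientToCluster.get(producer).brokerPool.metadata.clusterId)
```

A `WeakMap` entry also drops as soon as the producer is garbage-collected, so the mapping never extends the client's lifetime.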
The cluster id discovery used a separate `kafka.admin()` connection whose `describeCluster()` rejection wasn't awaited, surfacing as `process.unhandledRejection` whenever the admin handshake failed (auth, broker restart, network blip).

Read `cluster.brokerPool.metadata.clusterId` instead, priming it on first send via `cluster.refreshMetadataIfNecessary()`; kafkajs's `sharedPromiseTo` collapses our call with its internal one, so total latency is unchanged.

The cluster reference for each producer/consumer is captured at creation time via two new `addHook` entries on `src/producer/index.js` / `src/consumer/index.js` and stored in a `WeakMap` exported from `helpers/kafka`. The kafkajs object stays untouched — no Symbol-keyed property to leak through `Reflect.ownKeys`, no string-keyed underscore for user serializers to pick up, and the entry drops as soon as the producer is GC'd.

Refs: #5270
Refs: #8253
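The metadata-based read described above can be sketched as follows. This is an illustration, not the PR's actual code: `clientToCluster` mirrors the `WeakMap` exported from `helpers/kafka`, and `fakeCluster` is a stand-in whose shape (`brokerPool.metadata.clusterId`, `refreshMetadataIfNecessary()`) follows the description in this PR rather than kafkajs's real internals.

```javascript
// Sketch: read clusterId from the cluster's existing metadata, priming it
// on first use instead of opening a separate admin connection.
const clientToCluster = new WeakMap()

async function getClusterId (client) {
  const cluster = clientToCluster.get(client)
  if (!cluster) return null
  // Prime metadata on first use; kafkajs collapses concurrent refreshes,
  // so this shares the request the client issues for its own send.
  if (!cluster.brokerPool.metadata) {
    await cluster.refreshMetadataIfNecessary()
  }
  return cluster.brokerPool.metadata?.clusterId ?? null
}

// Stand-in cluster: metadata is absent until a refresh runs.
const fakeCluster = {
  brokerPool: { metadata: null },
  async refreshMetadataIfNecessary () {
    this.brokerPool.metadata = { clusterId: 'test-cluster' }
  }
}

const producer = { send: async () => {} }
clientToCluster.set(producer, fakeCluster)

getClusterId(producer).then(id => console.log(id)) // 'test-cluster'
```

Because the refresh is awaited here, a broker failure surfaces as a normal rejection on the send path instead of an unhandled one on a side channel.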
a1ae644 to 0d852ca
Summary
The cluster id discovery used a separate `kafka.admin()` connection whose `describeCluster()` rejection wasn't awaited, surfacing as `process.unhandledRejection` whenever the admin handshake failed (auth, broker restart, network blip).

Read `cluster.brokerPool.metadata.clusterId` instead, priming it on first send via `cluster.refreshMetadataIfNecessary()`. kafkajs's `sharedPromiseTo` collapses our call with its internal one, so total latency is unchanged. The cluster reference for each producer/consumer is captured at creation time via two new `addHook` entries on `src/producer/index.js` / `src/consumer/index.js` and stored in a `WeakMap` exported from `helpers/kafka`. The kafkajs object stays untouched — no Symbol-keyed property to leak through `Reflect.ownKeys`, no string-keyed underscore for user serializers to pick up.

The DSM spec drops its `clusterIdAvailable = semver.intersects(version, '>=1.13')` flag because the metadata path works on every kafkajs version this integration targets (>=1.4); the admin-based discovery only worked on >=1.13.

Test plan

- `PLUGINS=kafkajs SPEC=index npm run test:plugins` — pins that no `kafka.admin()` connection is opened during normal send (regression test for the unhandled-rejection failure mode).
- `PLUGINS=kafkajs SPEC=dsm npm run test:plugins` — pins that the `kafka_cluster_id` tag is set across the entire kafkajs version matrix.

Stacked on top of #8388.
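The latency claim rests on the `sharedPromiseTo` pattern mentioned above. A minimal sketch of that pattern (kafkajs has a utility of this name internally; this is an illustration, not its source):

```javascript
// sharedPromiseTo pattern: concurrent callers share one in-flight promise,
// so the tracer's metadata refresh collapses with kafkajs's own refresh
// instead of issuing a second request.
function sharedPromiseTo (fn) {
  let inFlight = null
  return (...args) => {
    if (inFlight === null) {
      inFlight = fn(...args).finally(() => { inFlight = null })
    }
    return inFlight
  }
}

let refreshes = 0
const refreshMetadata = sharedPromiseTo(async () => {
  refreshes += 1
  return { clusterId: 'abc' }
})

// Two concurrent callers, one underlying refresh.
Promise.all([refreshMetadata(), refreshMetadata()]).then(() => {
  console.log(refreshes) // 1
})
```

Once the shared promise settles, the slot resets, so a later send can trigger a fresh refresh if the metadata ever needs it again.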
Refs: #5270
Refs: #8253