Description
Environment Information
- OS [e.g. Mac, Arch, Windows 10]: Node.js Alpine container
- Node Version [e.g. 8.2.1]: v18.20.5
- NPM Version [e.g. 5.4.2]: 10.8.2
- C++ Toolchain [e.g. Visual Studio, llvm, g++]:
- confluent-kafka-javascript version [e.g. 2.3.3]: 1.2.0
Steps to Reproduce
Issue many producer.send calls, each with a large array of messages.
confluent-kafka-javascript Configuration Settings
import { KafkaJS } from "@confluentinc/kafka-javascript";

const Kafka = KafkaJS.Kafka;
const kafakStr = "ws";

const kafka = new Kafka({
  kafkaJS: {
    clientId: kafakStr,
    brokers: [process.env.KAFKA],
  },
});

const kafkaObj = {
  producer: kafka.producer({
    "batch.size": 100000,
    "linger.ms": 50,
    "compression.type": "lz4",
    kafkaJS: { acks: 1 },
  }),
  consumer: kafka.consumer({ kafkaJS: { groupId: kafakStr } }),
};
Additional context
My app is connected over a WebSocket to a stock market data provider.
The provider sends messages at a very high rate, as many as 500,000 messages per 10 s.
To avoid WebSocket delays and dropped messages, I defer the load to later stages by simply sending every incoming message to Kafka.
The Kafka producer is configured with "high throughput" parameters (see the configuration above).
The problematic time is around the market close (16:15 US ET), when the rate is highest.
The first symptom I saw was the Node.js process running out of memory.
I then ran Node.js with --max-old-space-size=8192 and tried to avoid spawning too many promises: instead of sending each message in its own promise, I aggregate messages into two alternating buckets, and one bucket is sent every 10 ms.
After this latest change, I started getting the error RangeError: Too many elements passed to Promise.all.
It seems that, with or without aggregating the messages in the app before sending, a promise is still created for every message.
Any help is appreciated!
logs:
2025-03-12 16:13:47.349 Memory usage: RSS=598.38MB | HeapUsed=209.62MB | arrayBuffers=10.77MB | external=12.12MB | heapTotal=350.04MB
2025-03-12 16:14:17.925 Memory usage: RSS=1174.94MB | HeapUsed=538.16MB | arrayBuffers=57.72MB | external=59.06MB | heapTotal=726.14MB
2025-03-12 16:14:38.151 Memory usage: RSS=1781.49MB | HeapUsed=1254.74MB | arrayBuffers=82.97MB | external=84.31MB | heapTotal=1298.77MB
2025-03-12 16:14:58.199 Memory usage: RSS=2210.79MB | HeapUsed=1687.32MB | arrayBuffers=71.02MB | external=72.37MB | heapTotal=1743.24MB
2025-03-12 16:15:18.199 Memory usage: RSS=3028.63MB | HeapUsed=2464.37MB | arrayBuffers=91.61MB | external=92.95MB | heapTotal=2531.88MB
2025-03-12 16:15:39.413 Memory usage: RSS=3719.82MB | HeapUsed=3098.89MB | arrayBuffers=122.87MB | external=124.21MB | heapTotal=3177.13MB
2025-03-12 16:16:45.464 !!!!!!!!!!! uncaught exception: RangeError: Too many elements passed to Promise.all
2025-03-12 16:16:45.464 RangeError: Too many elements passed to Promise.all
at Function.all (<anonymous>)
at Producer.send (/app/node_modules/.pnpm/@confluentinc+kafka-javascript@1.2.0/node_modules/@confluentinc/kafka-javascript/lib/kafkajs/_producer.js:654:45)
at Timeout._onTimeout (file:///app/server.js:198:22)
at listOnTimeout (node:internal/timers:569:17)
at process.processTimers (node:internal/timers:512:7)
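For reference, memory lines in the format shown above can be produced from Node's process.memoryUsage(). This is only a minimal sketch: the helper names mb and formatMemoryUsage are hypothetical and not taken from server.js, and the timestamp prefix is assumed to come from the logger.

```javascript
// Hypothetical helper: format a byte count as megabytes with two decimals.
const mb = (bytes) => (bytes / 1024 / 1024).toFixed(2) + "MB";

// Hypothetical helper: build one "Memory usage" line from a memoryUsage() result.
// process.memoryUsage() returns rss, heapTotal, heapUsed, external, and arrayBuffers in bytes.
function formatMemoryUsage(m = process.memoryUsage()) {
  return (
    `Memory usage: RSS=${mb(m.rss)} | HeapUsed=${mb(m.heapUsed)} | ` +
    `arrayBuffers=${mb(m.arrayBuffers)} | external=${mb(m.external)} | ` +
    `heapTotal=${mb(m.heapTotal)}`
  );
}

console.log(formatMemoryUsage());
```

Calling this from a timer, for example every 20 seconds, would give a series comparable to the one in the logs; the interval is an assumption based on the spacing of the timestamps.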
code:
// Every 10 ms, swap the bucket being filled and send the other one.
// While a send is pending, the tick is skipped and the active bucket keeps growing.
setInterval(() => {
  if (busySending) return;
  busySending = true;
  if (arrFilling === "A") {
    arrFilling = "B"; // new messages now go to bucket B
    if (messagesA.length) {
      kafkaObj.producer.send({ topic: "trades", messages: messagesA }).then(() => {
        messagesA = [];
        busySending = false;
      });
    } else {
      busySending = false;
    }
  } else {
    arrFilling = "A"; // new messages now go to bucket A
    if (messagesB.length) {
      kafkaObj.producer.send({ topic: "trades", messages: messagesB }).then(() => {
        messagesB = [];
        busySending = false;
      });
    } else {
      busySending = false;
    }
  }
}, 10);
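A possible workaround, not verified against the library: since the stack trace shows Promise.all being called inside Producer.send, and a bucket can grow large while the previous send is still pending, each bucket could be split into bounded chunks before sending. The sketch below assumes that keeping each send call small avoids the RangeError. The names chunk and sendInChunks and the default of 10,000 messages per call are hypothetical choices, not library API.

```javascript
// Hypothetical helper: split an array into slices of at most `size` items.
function chunk(items, size) {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError("size must be a positive integer");
  }
  const out = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

// Hypothetical wrapper: `send` is any function shaped like producer.send,
// taking { topic, messages } and returning a promise.
// Chunks are sent one after another, so only one bounded batch is in flight.
async function sendInChunks(send, topic, messages, maxPerSend = 10000) {
  for (const part of chunk(messages, maxPerSend)) {
    await send({ topic, messages: part });
  }
}
```

In the send loop above, the direct call could then become sendInChunks((record) => kafkaObj.producer.send(record), "trades", messagesA). This only bounds the size of each send call; it does not reduce the total number of messages held in memory.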