### Description

### Environment Information
- OS: Mac M3 Sonoma 14.6.1
- Node Version: 22.12.0
- NPM Version: 10.9.0
- confluent-kafka-javascript version: 1.2.0
### Context
The migration guide states:

> An API compatible version of eachBatch is available, but the batch size calculation is not as per configured parameters, rather, a constant maximum size is configured internally. This is subject to change.
What is not stated:

- The maximum batch size is 32.
- The batch size of 32 is only reached after a ramp-up through 10 smaller batches of sizes 1, 1, 2, 2, 4, 4, 8, 8, 16, and 16.
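The ramp-up and cap can be modeled with a small helper (a sketch based only on the observed behavior above; `expectedBatchSizes` is a hypothetical name, not a library API):

```typescript
// Hypothetical model of the batch sizes delivered to eachBatch by
// confluent-kafka-javascript 1.2.0, inferred from observed output.
// Not library code: the ramp and the cap of 32 are observations.
function expectedBatchSizes(totalMessages: number): number[] {
  const ramp = [1, 1, 2, 2, 4, 4, 8, 8, 16, 16]; // observed warm-up sequence
  const cap = 32;                                 // observed internal maximum
  const sizes: number[] = [];
  let remaining = totalMessages;
  for (const size of ramp) {
    if (remaining <= 0) break;
    sizes.push(Math.min(size, remaining));
    remaining -= sizes[sizes.length - 1];
  }
  while (remaining > 0) {
    sizes.push(Math.min(cap, remaining));
    remaining -= sizes[sizes.length - 1];
  }
  return sizes;
}

console.log(expectedBatchSizes(10));  // [1, 1, 2, 2, 4]
console.log(expectedBatchSizes(50));  // [1, 1, 2, 2, 4, 4, 8, 8, 16, 4]
```

These match the measured sequences in the output at the bottom of this report.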
An inquiry to Confluent received the following response:

> Regarding the actual batch size of 32:
>
> This value is arbitrary for now, and we plan to make it configurable in the future. However, it is currently capped at 32, with no set timeline for when this will change.
>
> Regarding the implementation:
>
> The batch size of messages retrieved by eachBatch does not directly correspond to the actual batch size received from Kafka. While we maintain API compatibility with KafkaJS, the behavior differs.
>
> Internally, we prefetch larger batches of messages and then split them into smaller batches before delivering them to the user. This approach ensures that concerns about downstream messaging or additional network overhead for fetching messages do not cause issues.
>
> We implemented it this way because, for now, the Kafka client library abstracts away per-partition details required by eachBatch, so we handle the splitting ourselves.
>
> The actual size of messages fetched from Kafka over the network is configurable through the Kafka client library's configuration properties.
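As I understand that response, the internals amount to something like the following sketch (purely illustrative, not the library's actual code; the cap of 32 is the observed value):

```typescript
// Illustrative only: models "prefetch a large batch, then split it into
// smaller sub-batches before delivering to eachBatch", as described in
// Confluent's response. The cap of 32 is an observed value, not documented.
function splitIntoSubBatches<T>(prefetched: T[], cap: number = 32): T[][] {
  const subBatches: T[][] = [];
  for (let i = 0; i < prefetched.length; i += cap) {
    subBatches.push(prefetched.slice(i, i + cap));
  }
  return subBatches;
}

// A prefetch of 70 messages reaches the user as sub-batches of 32, 32, 6.
const chunks = splitIntoSubBatches(Array.from({length: 70}, (_, i) => i));
console.log(chunks.map(c => c.length)); // [32, 32, 6]
```

This is why the cap is independent of the network fetch size: however large the prefetched batch is, the user-visible batches never exceed the internal cap.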
I'd like to raise two points:

- We have several apps that are designed to process large batches of small messages, and their efficiency depends directly on the ability to aggregate many messages at once and reduce the number of downstream network interactions with databases or other services. With KafkaJS our batch sizes are typically in the hundreds and thousands. The Confluent cap of 32 is severely hampering, resulting in a 5-200x (or more) increase in the number of downstream network interactions required to process the same volume of data.
- The ramp-up through single-message batches makes it annoyingly difficult to write integration tests that exercise batch functionality. Tests written for KafkaJS batches become invalid after converting to Confluent; you can no longer assume that if you seed 5 messages in a topic and start a consumer, you'll receive 1 batch of 5 messages. Now it's 4 batches of 1, 1, 2, and 1.
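One workaround for the testing problem is to assert on message coverage rather than batch shape. A minimal sketch (the helper name is mine, not from either library):

```typescript
// Batch-shape-agnostic test assertion: instead of expecting one batch of
// N messages, accept any sequence of non-empty batches whose sizes sum
// to the number of seeded messages.
function batchesCoverAllMessages(batchSizes: number[], seeded: number): boolean {
  const allNonEmpty = batchSizes.every(size => size > 0);
  const total = batchSizes.reduce((sum, size) => sum + size, 0);
  return allNonEmpty && total === seeded;
}

console.log(batchesCoverAllMessages([5], 5));          // true (KafkaJS)
console.log(batchesCoverAllMessages([1, 1, 2, 1], 5)); // true (Confluent)
```

The same assertion then passes against both clients, but it also means the tests no longer exercise batch aggregation itself, which is the capability we actually care about.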
### Steps to Reproduce
The following code illustrates the batch size sequence/limit and provides a comparison with KafkaJS:
```typescript
import {KafkaJS as Confluent, RdKafka} from "@confluentinc/kafka-javascript";
import {Admin, Consumer, EachBatchPayload, Kafka, logLevel, Producer} from "kafkajs";

const topicFn = () => `test-batch-sizes-${Date.now()}`;
const groupIdFn = () => `test-group-${Date.now()}`;

describe("Batch sizes", () => {
  let kafka: Kafka | Confluent.Kafka;
  let admin: Admin | Confluent.Admin | undefined;
  let producer: Producer | Confluent.Producer | undefined;
  let consumer: Consumer | Confluent.Consumer | undefined;
  let ready: boolean;
  let topic: string;
  let received: number;
  let batches: number[];

  before(() => {
    ready = false;
  });

  beforeEach(async () => {
    consumer!.pause([{topic}]);
    received = 0;
    batches = [];
  });

  describe("with KafkaJS", () => {
    before(async () => {
      kafka = new Kafka({brokers: ["localhost:9092"], logLevel: logLevel.NOTHING});
      admin = kafka.admin();
      await admin.connect();
      topic = topicFn();
      await admin.createTopics({topics: [{topic}]});
      producer = kafka.producer();
      await producer.connect();
      consumer = kafka.consumer({groupId: groupIdFn()});
      consumer.on(consumer.events.GROUP_JOIN, () => {
        ready = true;
      });
      await consumer.connect();
      await consumer.subscribe({topic, fromBeginning: true});
      await consumer.run({eachBatch: doConsumer});
      await until(() => ready);
    });

    it("10 messages", async () => doTest(10));
    it("50 messages", async () => doTest(50));
    it("100 messages", async () => doTest(100));
    it("500 messages", async () => doTest(500));
    it("1000 messages", async () => doTest(1000));
    it("5000 messages", async () => doTest(5000));

    after(async () => {
      await consumer?.disconnect();
      await producer?.disconnect();
      await admin?.disconnect();
    });
  });

  describe("with Confluent", () => {
    before(async () => {
      kafka = new Confluent.Kafka({kafkaJS: {brokers: ["localhost:9092"], logLevel: Confluent.logLevel.NOTHING}});
      admin = kafka.admin();
      await admin.connect();
      topic = topicFn();
      await admin.createTopics({topics: [{topic}]});
      producer = kafka.producer();
      await producer.connect();
      consumer = kafka.consumer({
        kafkaJS: {groupId: groupIdFn(), fromBeginning: true},
        rebalance_cb: (err: any, assignment: any, consumer: any) => {
          if (err.code !== RdKafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) return;
          ready = true;
        }
      });
      await consumer.connect();
      await consumer.subscribe({topic});
      await consumer.run({eachBatch: doConsumer});
      await until(() => ready);
    });

    it("10 messages", async () => doTest(10));
    it("50 messages", async () => doTest(50));
    it("100 messages", async () => doTest(100));
    it("500 messages", async () => doTest(500));
    it("1000 messages", async () => doTest(1000));
    it("5000 messages", async () => doTest(5000));

    after(async () => {
      await consumer?.disconnect();
      await producer?.disconnect();
      await admin?.disconnect();
    });
  });

  async function doConsumer(payload: EachBatchPayload | Confluent.EachBatchPayload) {
    const size = payload.batch.messages.length;
    received += size;
    batches.push(size);
  }

  async function doTest(count: number, sizeInKb: number = 1) {
    await sendMessages(count, sizeInKb);
    consumer!.resume([{topic}]);
    await until(() => received === count);
    console.log(`Processed ${received} ${sizeInKb}KB messages in ${batches.length} batches: ${batches}`);
  }

  async function sendMessages(count: number, sizeInKb: number) {
    const messages = [];
    for (let i = 0; i < count; i++) {
      messages.push({value: payload(sizeInKb)});
    }
    await producer!.send({topic, messages});
  }

  // Build a payload of sizeInKb kilobytes (1024 bytes per KB).
  function payload(sizeInKb: number): string {
    return `${"0".repeat(1023)}1`.repeat(sizeInKb);
  }

  async function until(condition: () => boolean) {
    const timeout = 60000;
    const finish = Date.now() + timeout;
    while (Date.now() <= finish) {
      if (condition()) return;
      await new Promise(resolve => setTimeout(resolve, 50));
    }
    throw new Error(`Failed within ${timeout}ms`);
  }
});
```
With KafkaJS:

```
Processed 10 1KB messages in 1 batches: 10
Processed 50 1KB messages in 1 batches: 50
Processed 100 1KB messages in 1 batches: 100
Processed 500 1KB messages in 1 batches: 500
Processed 1000 1KB messages in 1 batches: 1000
Processed 5000 1KB messages in 1 batches: 5000
```

With Confluent:

```
Processed 10 1KB messages in 5 batches: 1,1,2,2,4
Processed 50 1KB messages in 10 batches: 1,1,2,2,4,4,8,8,16,4
Processed 100 1KB messages in 12 batches: 1,1,2,2,4,4,8,8,16,16,32,6
Processed 500 1KB messages in 24 batches: 1,1,2,2,4,4,8,8,16,16,32,32,32,32,32,32,32,32,32,32,32,32,32,22
Processed 1000 1KB messages in 40 batches: 1,1,2,2,4,4,8,8,16,16,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,10
Processed 5000 1KB messages in 165 batches: 1,1,2,2,4,4,8,8,16,16,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,32,10
```
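For completeness, the logged Confluent sequences are internally consistent: no batch exceeds 32, and each sequence sums to the seeded message count. A quick check (values copied from the output above):

```typescript
// Sanity check over the logged Confluent output above. The sequences are
// copied from the test runs; nothing here comes from the library itself.
function sequenceIsConsistent(total: number, sizes: number[]): boolean {
  const sumsToTotal = sizes.reduce((a, b) => a + b, 0) === total;
  const respectsCap = sizes.every(size => size <= 32);
  return sumsToTotal && respectsCap;
}

console.log(sequenceIsConsistent(10, [1, 1, 2, 2, 4]));                            // true
console.log(sequenceIsConsistent(50, [1, 1, 2, 2, 4, 4, 8, 8, 16, 4]));            // true
console.log(sequenceIsConsistent(100, [1, 1, 2, 2, 4, 4, 8, 8, 16, 16, 32, 6]));   // true
```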