From 0b12b525e410e6e3dfd497a27f750ee58c3aff9d Mon Sep 17 00:00:00 2001 From: Pablo Reszczynski Date: Wed, 15 Oct 2025 15:06:58 -0300 Subject: [PATCH 1/5] Add configurable maxBatchSize to consumer.run() Allow users to configure the maximum batch size when using eachBatch handler instead of the hardcoded value of 32. Changes: - Add maxBatchSize parameter to ConsumerRunConfig type - Update run() method to accept and validate maxBatchSize - Default to 32 to maintain backwards compatibility - Validate that maxBatchSize is a positive integer - Update #runInternal to use configurable value - Add JSDoc documentation for the new parameter - Add comprehensive test coverage for the feature The maxBatchSize parameter controls how many messages are included in a single batch when using the eachBatch handler. Users can now tune this based on their message size and processing requirements. Tests verify default behavior, custom values, validation, large batch sizes, and that the parameter only affects eachBatch (not eachMessage). --- lib/kafkajs/_consumer.js | 14 +- .../consumer/consumeMessages.spec.js | 153 ++++++++++++++++++ types/kafkajs.d.ts | 1 + 3 files changed, 166 insertions(+), 2 deletions(-) diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index ada184f8..982a1453 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -1214,6 +1214,7 @@ class Consumer { * @param {function?} config.eachBatch - The function to call for processing each batch of messages - can only be set if eachMessage is not set. * @param {boolean?} config.eachBatchAutoResolve - Whether to automatically resolve offsets for each batch (only applicable if eachBatch is set, true by default). * @param {number?} config.partitionsConsumedConcurrently - The limit to the number of partitions consumed concurrently (1 by default). + * @param {number?} config.maxBatchSize - The maximum number of messages to include in a batch when using eachBatch (32 by default). */ async run(config) { if (this.#state !== ConsumerState.CONNECTED) { @@ -1249,6 +1250,16 @@ class Consumer { configCopy.partitionsConsumedConcurrently = 1; } + if (!Object.hasOwn(config, 'maxBatchSize')) { + configCopy.maxBatchSize = 32; + } else if (typeof config.maxBatchSize !== 'number' || + config.maxBatchSize <= 0 || + !Number.isInteger(config.maxBatchSize)) { + throw new error.KafkaJSError( + 'maxBatchSize must be a positive integer.', + { code: error.ErrorCodes.ERR__INVALID_ARG }); + } + this.#messageCache = new MessageCache(this.#logger); /* We deliberately don't await this because we want to return from this method immediately. */ this.#runInternal(configCopy); @@ -1559,8 +1570,7 @@ class Consumer { async #runInternal(config) { this.#concurrency = config.partitionsConsumedConcurrently; const perMessageProcessor = config.eachMessage ? this.#messageProcessor : this.#batchProcessor; - /* TODO: make this dynamic, based on max batch size / size of last message seen. */ - const maxBatchSize = 32; + const maxBatchSize = config.maxBatchSize; const fetcher = config.eachMessage ? 
(savedIdx) => this.#consumeSingleCached(savedIdx) : (savedIdx) => this.#consumeCachedN(savedIdx, maxBatchSize); diff --git a/test/promisified/consumer/consumeMessages.spec.js b/test/promisified/consumer/consumeMessages.spec.js index ca9204c6..62d95db4 100644 --- a/test/promisified/consumer/consumeMessages.spec.js +++ b/test/promisified/consumer/consumeMessages.spec.js @@ -907,4 +907,157 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit await waitFor(() => (messagesConsumed === (partition0ProducedMessages + messages1.length + messages2.length)), () => { }, 100); }); + it('uses default maxBatchSize of 32 when not specified', async () => { + await producer.connect(); + await consumer.connect(); + await consumer.subscribe({ topic: topicName }); + + const batchSizes = []; + consumer.run({ + partitionsConsumedConcurrently, + eachBatch: async ({ batch }) => { + batchSizes.push(batch.messages.length); + } + }); + + /* Produce enough messages to see batch size growth pattern */ + const messages = Array(128) + .fill() + .map(() => { + const value = secureRandom(); + return { value: `value-${value}`, partition: 0 }; + }); + + await producer.send({ topic: topicName, messages }); + await waitFor(() => batchSizes.length >= 7, () => { }, 100); + + /* With default batch size of 32, we should see growth pattern: + * 1, 1, 2, 2, 4, 4, 8, 8, 16, 16, 32, 32, ... + * The maximum batch size should be capped at 32 */ + const maxBatchSize = Math.max(...batchSizes); + expect(maxBatchSize).toBeLessThanOrEqual(32); + }); + + it('respects custom maxBatchSize value', async () => { + await producer.connect(); + await consumer.connect(); + await consumer.subscribe({ topic: topicName }); + + const customMaxBatchSize = 10; + const batchSizes = []; + consumer.run({ + partitionsConsumedConcurrently, + maxBatchSize: customMaxBatchSize, + eachBatch: async ({ batch }) => { + batchSizes.push(batch.messages.length); + } + }); + + /* Produce enough messages to exceed the custom batch size */ + const messages = Array(64) + .fill() + .map(() => { + const value = secureRandom(); + return { value: `value-${value}`, partition: 0 }; + }); + + await producer.send({ topic: topicName, messages }); + await waitFor(() => batchSizes.length >= 5, () => { }, 100); + + /* All batches should be at most customMaxBatchSize */ + const maxBatchSize = Math.max(...batchSizes); + expect(maxBatchSize).toBeLessThanOrEqual(customMaxBatchSize); + }); + + it('handles large maxBatchSize correctly', async () => { + await producer.connect(); + await consumer.connect(); + await consumer.subscribe({ topic: topicName }); + + const customMaxBatchSize = 200; + const batchSizes = []; + let messagesConsumed = 0; + consumer.run({ + partitionsConsumedConcurrently, + maxBatchSize: customMaxBatchSize, + eachBatch: async ({ batch }) => { + batchSizes.push(batch.messages.length); + messagesConsumed += batch.messages.length; + } + }); + + const totalMessages = 150; + const messages = Array(totalMessages) + .fill() + .map(() => { + const value = secureRandom(); + return { value: `value-${value}`, partition: 0 }; + }); + + await producer.send({ topic: topicName, messages }); + await waitFor(() => messagesConsumed >= totalMessages, () => { }, 100); + + /* All batches should be at most customMaxBatchSize */ + const maxBatchSize = Math.max(...batchSizes); + expect(maxBatchSize).toBeLessThanOrEqual(customMaxBatchSize); + expect(messagesConsumed).toEqual(totalMessages); + }); + + it('throws error when maxBatchSize is not a positive integer', async 
() => { + await consumer.connect(); + await consumer.subscribe({ topic: topicName }); + + /* Negative value */ + expect(() => consumer.run({ + maxBatchSize: -1, + eachBatch: async () => { } + })).toThrow('maxBatchSize must be a positive integer'); + + /* Zero */ + expect(() => consumer.run({ + maxBatchSize: 0, + eachBatch: async () => { } + })).toThrow('maxBatchSize must be a positive integer'); + + /* Non-integer */ + expect(() => consumer.run({ + maxBatchSize: 3.5, + eachBatch: async () => { } + })).toThrow('maxBatchSize must be a positive integer'); + + /* Non-number */ + expect(() => consumer.run({ + maxBatchSize: 'invalid', + eachBatch: async () => { } + })).toThrow('maxBatchSize must be a positive integer'); + }); + + it('maxBatchSize only affects eachBatch, not eachMessage', async () => { + await producer.connect(); + await consumer.connect(); + await consumer.subscribe({ topic: topicName }); + + const messagesConsumed = []; + consumer.run({ + partitionsConsumedConcurrently, + maxBatchSize: 5, /* Should be ignored for eachMessage */ + eachMessage: async (event) => { + messagesConsumed.push(event); + } + }); + + const messages = Array(20) + .fill() + .map(() => { + const value = secureRandom(); + return { value: `value-${value}`, partition: 0 }; + }); + + await producer.send({ topic: topicName, messages }); + await waitForMessages(messagesConsumed, { number: messages.length }); + + /* Each message should be processed individually */ + expect(messagesConsumed.length).toEqual(messages.length); + }); + }); diff --git a/types/kafkajs.d.ts b/types/kafkajs.d.ts index 6045a77e..cab95f6a 100644 --- a/types/kafkajs.d.ts +++ b/types/kafkajs.d.ts @@ -328,6 +328,7 @@ export type ConsumerSubscribeTopics = { topics: (string | RegExp)[]; replace?: b export type ConsumerRunConfig = { eachBatchAutoResolve?: boolean, partitionsConsumedConcurrently?: number, + maxBatchSize?: number, eachMessage?: EachMessageHandler eachBatch?: EachBatchHandler } From e375e055fdf81aca6110c1fcf8d579bfea02ab92 Mon Sep 17 00:00:00 2001 From: Pablo Reszczynski Date: Wed, 15 Oct 2025 15:34:17 -0300 Subject: [PATCH 2/5] Update test/promisified/consumer/consumeMessages.spec.js Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../promisified/consumer/consumeMessages.spec.js | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/test/promisified/consumer/consumeMessages.spec.js b/test/promisified/consumer/consumeMessages.spec.js index 62d95db4..b2250441 100644 --- a/test/promisified/consumer/consumeMessages.spec.js +++ b/test/promisified/consumer/consumeMessages.spec.js @@ -1008,28 +1008,28 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit await consumer.subscribe({ topic: topicName }); /* Negative value */ - expect(() => consumer.run({ + await expect(consumer.run({ maxBatchSize: -1, eachBatch: async () => { } - })).toThrow('maxBatchSize must be a positive integer'); + })).rejects.toThrow('maxBatchSize must be a positive integer'); /* Zero */ - expect(() => consumer.run({ + await expect(consumer.run({ maxBatchSize: 0, eachBatch: async () => { } - })).toThrow('maxBatchSize must be a positive integer'); + })).rejects.toThrow('maxBatchSize must be a positive integer'); /* Non-integer */ - expect(() => consumer.run({ + await expect(consumer.run({ maxBatchSize: 3.5, eachBatch: async () => { } - })).toThrow('maxBatchSize must be a positive integer'); + })).rejects.toThrow('maxBatchSize must be a positive integer'); /* Non-number */ - expect(() => 
consumer.run({ + await expect(consumer.run({ maxBatchSize: 'invalid', eachBatch: async () => { } - })).toThrow('maxBatchSize must be a positive integer'); + })).rejects.toThrow('maxBatchSize must be a positive integer'); }); it('maxBatchSize only affects eachBatch, not eachMessage', async () => { From bf20f1c0e91a741f2942ce0ef9366252f264d4e7 Mon Sep 17 00:00:00 2001 From: Pablo Reszczynski Date: Wed, 15 Oct 2025 15:34:25 -0300 Subject: [PATCH 3/5] Update lib/kafkajs/_consumer.js Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- lib/kafkajs/_consumer.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index 982a1453..d0c5f24d 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -1258,6 +1258,8 @@ class Consumer { throw new error.KafkaJSError( 'maxBatchSize must be a positive integer.', { code: error.ErrorCodes.ERR__INVALID_ARG }); + } else { + configCopy.maxBatchSize = config.maxBatchSize; } this.#messageCache = new MessageCache(this.#logger); From 1d27ac8a11be1d4f81a1520cba35921233ed7b51 Mon Sep 17 00:00:00 2001 From: Pablo Reszczynski Date: Wed, 15 Oct 2025 15:34:40 -0300 Subject: [PATCH 4/5] Update lib/kafkajs/_consumer.js Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- lib/kafkajs/_consumer.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index d0c5f24d..3687ffc9 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -1256,7 +1256,7 @@ class Consumer { config.maxBatchSize <= 0 || !Number.isInteger(config.maxBatchSize)) { throw new error.KafkaJSError( - 'maxBatchSize must be a positive integer.', + 'maxBatchSize must be a positive integer', { code: error.ErrorCodes.ERR__INVALID_ARG }); } else { configCopy.maxBatchSize = config.maxBatchSize; From 161faf60df3914643b77bcf53ae64842cfcee265 Mon Sep 17 00:00:00 2001 From: Pablo Reszczynski Date: Wed, 15 Oct 2025 15:06:58 -0300 Subject: [PATCH 5/5] Add configurable maxBatchSize to consumer.run() Allow users to configure the maximum batch size when using eachBatch handler instead of the hardcoded value of 32. Changes: - Add maxBatchSize parameter to ConsumerRunConfig type - Update run() method to accept and validate maxBatchSize - Default to 32 to maintain backwards compatibility - Validate that maxBatchSize is a positive integer - Update #runInternal to use configurable value - Add JSDoc documentation for the new parameter - Add comprehensive test coverage for the feature The maxBatchSize parameter controls how many messages are included in a single batch when using the eachBatch handler. Users can now tune this based on their message size and processing requirements. Tests verify default behavior, custom values, validation, large batch sizes, and that the parameter only affects eachBatch (not eachMessage). --- lib/kafkajs/_consumer.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index 3687ffc9..d0c5f24d 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -1256,7 +1256,7 @@ class Consumer { config.maxBatchSize <= 0 || !Number.isInteger(config.maxBatchSize)) { throw new error.KafkaJSError( - 'maxBatchSize must be a positive integer', + 'maxBatchSize must be a positive integer.', { code: error.ErrorCodes.ERR__INVALID_ARG }); } else { configCopy.maxBatchSize = config.maxBatchSize;
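---

Usage note (not part of the patches): a minimal sketch of the new option from the consumer side, assuming the package name, kafkaJS-wrapped config shape, and broker/topic placeholders shown here — the run() config itself matches the tests above.

    const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;

    async function main() {
      /* Broker address, group id, and topic name are illustrative placeholders. */
      const kafka = new Kafka({ kafkaJS: { brokers: ['localhost:9092'] } });
      const consumer = kafka.consumer({ kafkaJS: { groupId: 'batch-size-demo' } });

      await consumer.connect();
      await consumer.subscribe({ topic: 'my-topic' });

      consumer.run({
        /* Cap each eachBatch delivery at 100 messages instead of the
         * previously hardcoded 32. Ignored when eachMessage is used. */
        maxBatchSize: 100,
        eachBatch: async ({ batch }) => {
          console.log(`processing ${batch.messages.length} messages`);
        },
      });
    }

    main().catch(console.error);

Per the validation added in PATCH 1/5 and the async test updates in PATCH 2/5, passing zero, a negative number, a non-integer, or a non-number for maxBatchSize makes run() reject with a KafkaJSError carrying ERR__INVALID_ARG; omitting it keeps the backwards-compatible default of 32.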