diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js
index ada184f8..d0c5f24d 100644
--- a/lib/kafkajs/_consumer.js
+++ b/lib/kafkajs/_consumer.js
@@ -1214,6 +1214,7 @@ class Consumer {
    * @param {function?} config.eachBatch - The function to call for processing each batch of messages - can only be set if eachMessage is not set.
    * @param {boolean?} config.eachBatchAutoResolve - Whether to automatically resolve offsets for each batch (only applicable if eachBatch is set, true by default).
    * @param {number?} config.partitionsConsumedConcurrently - The limit to the number of partitions consumed concurrently (1 by default).
+   * @param {number?} config.maxBatchSize - The maximum number of messages to include in a batch when using eachBatch (32 by default).
    */
   async run(config) {
     if (this.#state !== ConsumerState.CONNECTED) {
@@ -1249,6 +1250,18 @@ class Consumer {
       configCopy.partitionsConsumedConcurrently = 1;
     }
 
+    if (!Object.hasOwn(config, 'maxBatchSize')) {
+      configCopy.maxBatchSize = 32;
+    } else if (typeof config.maxBatchSize !== 'number' ||
+               config.maxBatchSize <= 0 ||
+               !Number.isInteger(config.maxBatchSize)) {
+      throw new error.KafkaJSError(
+        'maxBatchSize must be a positive integer.',
+        { code: error.ErrorCodes.ERR__INVALID_ARG });
+    } else {
+      configCopy.maxBatchSize = config.maxBatchSize;
+    }
+
     this.#messageCache = new MessageCache(this.#logger);
     /* We deliberately don't await this because we want to return from this method immediately. */
     this.#runInternal(configCopy);
@@ -1559,8 +1572,7 @@ class Consumer {
   async #runInternal(config) {
     this.#concurrency = config.partitionsConsumedConcurrently;
     const perMessageProcessor = config.eachMessage ? this.#messageProcessor : this.#batchProcessor;
-    /* TODO: make this dynamic, based on max batch size / size of last message seen. */
-    const maxBatchSize = 32;
+    const maxBatchSize = config.maxBatchSize;
     const fetcher = config.eachMessage
       ? (savedIdx) => this.#consumeSingleCached(savedIdx)
       : (savedIdx) => this.#consumeCachedN(savedIdx, maxBatchSize);
diff --git a/test/promisified/consumer/consumeMessages.spec.js b/test/promisified/consumer/consumeMessages.spec.js
index ca9204c6..b2250441 100644
--- a/test/promisified/consumer/consumeMessages.spec.js
+++ b/test/promisified/consumer/consumeMessages.spec.js
@@ -907,4 +907,157 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit
     await waitFor(() => (messagesConsumed === (partition0ProducedMessages + messages1.length + messages2.length)), () => { }, 100);
   });
 
+  it('uses default maxBatchSize of 32 when not specified', async () => {
+    await producer.connect();
+    await consumer.connect();
+    await consumer.subscribe({ topic: topicName });
+
+    const batchSizes = [];
+    consumer.run({
+      partitionsConsumedConcurrently,
+      eachBatch: async ({ batch }) => {
+        batchSizes.push(batch.messages.length);
+      }
+    });
+
+    /* Produce enough messages to see batch size growth pattern */
+    const messages = Array(128)
+      .fill()
+      .map(() => {
+        const value = secureRandom();
+        return { value: `value-${value}`, partition: 0 };
+      });
+
+    await producer.send({ topic: topicName, messages });
+    await waitFor(() => batchSizes.length >= 7, () => { }, 100);
+
+    /* With default batch size of 32, we should see growth pattern:
+     * 1, 1, 2, 2, 4, 4, 8, 8, 16, 16, 32, 32, ...
+     * The maximum batch size should be capped at 32 */
+    const maxBatchSize = Math.max(...batchSizes);
+    expect(maxBatchSize).toBeLessThanOrEqual(32);
+  });
+
+  it('respects custom maxBatchSize value', async () => {
+    await producer.connect();
+    await consumer.connect();
+    await consumer.subscribe({ topic: topicName });
+
+    const customMaxBatchSize = 10;
+    const batchSizes = [];
+    consumer.run({
+      partitionsConsumedConcurrently,
+      maxBatchSize: customMaxBatchSize,
+      eachBatch: async ({ batch }) => {
+        batchSizes.push(batch.messages.length);
+      }
+    });
+
+    /* Produce enough messages to exceed the custom batch size */
+    const messages = Array(64)
+      .fill()
+      .map(() => {
+        const value = secureRandom();
+        return { value: `value-${value}`, partition: 0 };
+      });
+
+    await producer.send({ topic: topicName, messages });
+    await waitFor(() => batchSizes.length >= 5, () => { }, 100);
+
+    /* All batches should be at most customMaxBatchSize */
+    const maxBatchSize = Math.max(...batchSizes);
+    expect(maxBatchSize).toBeLessThanOrEqual(customMaxBatchSize);
+  });
+
+  it('handles large maxBatchSize correctly', async () => {
+    await producer.connect();
+    await consumer.connect();
+    await consumer.subscribe({ topic: topicName });
+
+    const customMaxBatchSize = 200;
+    const batchSizes = [];
+    let messagesConsumed = 0;
+    consumer.run({
+      partitionsConsumedConcurrently,
+      maxBatchSize: customMaxBatchSize,
+      eachBatch: async ({ batch }) => {
+        batchSizes.push(batch.messages.length);
+        messagesConsumed += batch.messages.length;
+      }
+    });
+
+    const totalMessages = 150;
+    const messages = Array(totalMessages)
+      .fill()
+      .map(() => {
+        const value = secureRandom();
+        return { value: `value-${value}`, partition: 0 };
+      });
+
+    await producer.send({ topic: topicName, messages });
+    await waitFor(() => messagesConsumed >= totalMessages, () => { }, 100);
+
+    /* All batches should be at most customMaxBatchSize */
+    const maxBatchSize = Math.max(...batchSizes);
+    expect(maxBatchSize).toBeLessThanOrEqual(customMaxBatchSize);
+    expect(messagesConsumed).toEqual(totalMessages);
+  });
+
+  it('throws error when maxBatchSize is not a positive integer', async () => {
+    await consumer.connect();
+    await consumer.subscribe({ topic: topicName });
+
+    /* Negative value */
+    await expect(consumer.run({
+      maxBatchSize: -1,
+      eachBatch: async () => { }
+    })).rejects.toThrow('maxBatchSize must be a positive integer');
+
+    /* Zero */
+    await expect(consumer.run({
+      maxBatchSize: 0,
+      eachBatch: async () => { }
+    })).rejects.toThrow('maxBatchSize must be a positive integer');
+
+    /* Non-integer */
+    await expect(consumer.run({
+      maxBatchSize: 3.5,
+      eachBatch: async () => { }
+    })).rejects.toThrow('maxBatchSize must be a positive integer');
+
+    /* Non-number */
+    await expect(consumer.run({
+      maxBatchSize: 'invalid',
+      eachBatch: async () => { }
+    })).rejects.toThrow('maxBatchSize must be a positive integer');
+  });
+
+  it('maxBatchSize only affects eachBatch, not eachMessage', async () => {
+    await producer.connect();
+    await consumer.connect();
+    await consumer.subscribe({ topic: topicName });
+
+    const messagesConsumed = [];
+    consumer.run({
+      partitionsConsumedConcurrently,
+      maxBatchSize: 5, /* Should be ignored for eachMessage */
+      eachMessage: async (event) => {
+        messagesConsumed.push(event);
+      }
+    });
+
+    const messages = Array(20)
+      .fill()
+      .map(() => {
+        const value = secureRandom();
+        return { value: `value-${value}`, partition: 0 };
+      });
+
+    await producer.send({ topic: topicName, messages });
+    await waitForMessages(messagesConsumed, { number: messages.length });
+
+    /* Each message should be processed individually */
+    expect(messagesConsumed.length).toEqual(messages.length);
+  });
+
 });
diff --git a/types/kafkajs.d.ts b/types/kafkajs.d.ts
index 6045a77e..cab95f6a 100644
--- a/types/kafkajs.d.ts
+++ b/types/kafkajs.d.ts
@@ -328,6 +328,7 @@ export type ConsumerSubscribeTopics = { topics: (string | RegExp)[]; replace?: b
 export type ConsumerRunConfig = {
   eachBatchAutoResolve?: boolean,
   partitionsConsumedConcurrently?: number,
+  maxBatchSize?: number,
   eachMessage?: EachMessageHandler
   eachBatch?: EachBatchHandler
 }
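
A minimal usage sketch of the new option, for reference. The broker address, topic name, and group id below are illustrative placeholders, not part of this change; the import path and config shape follow the library's promisified (KafkaJS-compatible) API.

    const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;

    /* Placeholder broker/topic/group values for illustration only. */
    const kafka = new Kafka({ kafkaJS: { brokers: ['localhost:9092'] } });
    const consumer = kafka.consumer({ kafkaJS: { groupId: 'example-group' } });

    async function main() {
      await consumer.connect();
      await consumer.subscribe({ topic: 'example-topic' });

      await consumer.run({
        /* Cap each eachBatch invocation at 64 messages instead of the
         * default 32. Anything other than a positive integer makes run()
         * reject with ERR__INVALID_ARG, and the option is ignored when
         * eachMessage is used instead of eachBatch. */
        maxBatchSize: 64,
        eachBatch: async ({ batch }) => {
          console.log(`processing ${batch.messages.length} messages`);
        },
      });
    }

    main().catch(console.error);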