Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions lib/kafkajs/_consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -1214,6 +1214,7 @@ class Consumer {
* @param {function?} config.eachBatch - The function to call for processing each batch of messages - can only be set if eachMessage is not set.
* @param {boolean?} config.eachBatchAutoResolve - Whether to automatically resolve offsets for each batch (only applicable if eachBatch is set, true by default).
* @param {number?} config.partitionsConsumedConcurrently - The limit to the number of partitions consumed concurrently (1 by default).
* @param {number?} config.maxBatchSize - The maximum number of messages to include in a batch when using eachBatch (32 by default).
*/
async run(config) {
if (this.#state !== ConsumerState.CONNECTED) {
Expand Down Expand Up @@ -1249,6 +1250,18 @@ class Consumer {
configCopy.partitionsConsumedConcurrently = 1;
}

if (!Object.hasOwn(config, 'maxBatchSize')) {
configCopy.maxBatchSize = 32;
} else if (typeof config.maxBatchSize !== 'number' ||
config.maxBatchSize <= 0 ||
!Number.isInteger(config.maxBatchSize)) {
throw new error.KafkaJSError(
'maxBatchSize must be a positive integer.',
{ code: error.ErrorCodes.ERR__INVALID_ARG });
} else {
configCopy.maxBatchSize = config.maxBatchSize;
}

this.#messageCache = new MessageCache(this.#logger);
/* We deliberately don't await this because we want to return from this method immediately. */
this.#runInternal(configCopy);
Expand Down Expand Up @@ -1559,8 +1572,7 @@ class Consumer {
async #runInternal(config) {
this.#concurrency = config.partitionsConsumedConcurrently;
const perMessageProcessor = config.eachMessage ? this.#messageProcessor : this.#batchProcessor;
/* TODO: make this dynamic, based on max batch size / size of last message seen. */
const maxBatchSize = 32;
const maxBatchSize = config.maxBatchSize;
const fetcher = config.eachMessage
? (savedIdx) => this.#consumeSingleCached(savedIdx)
: (savedIdx) => this.#consumeCachedN(savedIdx, maxBatchSize);
Expand Down
153 changes: 153 additions & 0 deletions test/promisified/consumer/consumeMessages.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -907,4 +907,157 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit
await waitFor(() => (messagesConsumed === (partition0ProducedMessages + messages1.length + messages2.length)), () => { }, 100);
});

it('uses default maxBatchSize of 32 when not specified', async () => {
    await producer.connect();
    await consumer.connect();
    await consumer.subscribe({ topic: topicName });

    /* Record the size of every batch handed to eachBatch. */
    const observedSizes = [];
    consumer.run({
        partitionsConsumedConcurrently,
        eachBatch: async ({ batch }) => {
            observedSizes.push(batch.messages.length);
        }
    });

    /* Produce enough messages for the batch size to ramp up to its cap. */
    const toProduce = Array.from({ length: 128 }, () => ({
        value: `value-${secureRandom()}`,
        partition: 0,
    }));

    await producer.send({ topic: topicName, messages: toProduce });
    await waitFor(() => observedSizes.length >= 7, () => { }, 100);

    /* Batch sizes grow geometrically (1, 1, 2, 2, 4, 4, 8, 8, ...) but
     * must never exceed the default cap of 32. */
    expect(Math.max(...observedSizes)).toBeLessThanOrEqual(32);
});

it('respects custom maxBatchSize value', async () => {
    await producer.connect();
    await consumer.connect();
    await consumer.subscribe({ topic: topicName });

    const cap = 10;
    /* Record the size of every batch handed to eachBatch. */
    const observedSizes = [];
    consumer.run({
        partitionsConsumedConcurrently,
        maxBatchSize: cap,
        eachBatch: async ({ batch }) => {
            observedSizes.push(batch.messages.length);
        }
    });

    /* Produce more messages than the cap so batching actually engages. */
    const toProduce = Array.from({ length: 64 }, () => ({
        value: `value-${secureRandom()}`,
        partition: 0,
    }));

    await producer.send({ topic: topicName, messages: toProduce });
    await waitFor(() => observedSizes.length >= 5, () => { }, 100);

    /* No batch may exceed the configured maximum. */
    expect(Math.max(...observedSizes)).toBeLessThanOrEqual(cap);
});

it('handles large maxBatchSize correctly', async () => {
    await producer.connect();
    await consumer.connect();
    await consumer.subscribe({ topic: topicName });

    const cap = 200;
    /* Track both per-batch sizes and the running total consumed. */
    const observedSizes = [];
    let consumedCount = 0;
    consumer.run({
        partitionsConsumedConcurrently,
        maxBatchSize: cap,
        eachBatch: async ({ batch }) => {
            observedSizes.push(batch.messages.length);
            consumedCount += batch.messages.length;
        }
    });

    /* Fewer messages than the cap — every message must still arrive. */
    const total = 150;
    const toProduce = Array.from({ length: total }, () => ({
        value: `value-${secureRandom()}`,
        partition: 0,
    }));

    await producer.send({ topic: topicName, messages: toProduce });
    await waitFor(() => consumedCount >= total, () => { }, 100);

    /* Batches stay within the cap and nothing is lost or duplicated. */
    expect(Math.max(...observedSizes)).toBeLessThanOrEqual(cap);
    expect(consumedCount).toEqual(total);
});

it('throws error when maxBatchSize is not a positive integer', async () => {
    await consumer.connect();
    await consumer.subscribe({ topic: topicName });

    /* Negative, zero, fractional, and non-numeric values must all be
     * rejected with the same validation error. */
    for (const invalidValue of [-1, 0, 3.5, 'invalid']) {
        await expect(consumer.run({
            maxBatchSize: invalidValue,
            eachBatch: async () => { }
        })).rejects.toThrow('maxBatchSize must be a positive integer');
    }
});

it('maxBatchSize only affects eachBatch, not eachMessage', async () => {
    await producer.connect();
    await consumer.connect();
    await consumer.subscribe({ topic: topicName });

    const received = [];
    consumer.run({
        partitionsConsumedConcurrently,
        maxBatchSize: 5, /* Should be ignored for eachMessage */
        eachMessage: async (event) => {
            received.push(event);
        }
    });

    const toProduce = Array.from({ length: 20 }, () => ({
        value: `value-${secureRandom()}`,
        partition: 0,
    }));

    await producer.send({ topic: topicName, messages: toProduce });
    await waitForMessages(received, { number: toProduce.length });

    /* Each message should be processed individually. */
    expect(received.length).toEqual(toProduce.length);
});

});
1 change: 1 addition & 0 deletions types/kafkajs.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ export type ConsumerSubscribeTopics = { topics: (string | RegExp)[]; replace?: b
export type ConsumerRunConfig = {
  /** Whether to automatically resolve offsets for each batch (eachBatch only; true by default). */
  eachBatchAutoResolve?: boolean,
  /** Limit on the number of partitions consumed concurrently (1 by default). */
  partitionsConsumedConcurrently?: number,
  /** Maximum number of messages per batch passed to eachBatch (32 by default). */
  maxBatchSize?: number,
  /** Per-message handler — can only be set if eachBatch is not set. */
  eachMessage?: EachMessageHandler,
  /** Per-batch handler — can only be set if eachMessage is not set. */
  eachBatch?: EachBatchHandler,
}
Expand Down