diff --git a/CHANGELOG.md b/CHANGELOG.md index b5823789..3e031d8e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +# confluent-kafka-javascript 1.6.1 + +v1.6.1 is a maintenance release. It is supported for all usage. + +### Enhancements + +1. Configurable batch size through the `js.consumer.max.batch.size` property (#389). + + # confluent-kafka-javascript 1.6.0 v1.6.0 is a feature release. It is supported for all usage. diff --git a/MIGRATION.md b/MIGRATION.md index 27ae6438..26ad9efd 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -303,9 +303,9 @@ producerRun().then(consumerRun).catch(console.error); - The `heartbeat()` no longer needs to be called by the user in the `eachMessage/eachBatch` callback. Heartbeats are automatically managed by librdkafka. - The `partitionsConsumedConcurrently` is supported by both `eachMessage` and `eachBatch`. - - An API compatible version of `eachBatch` is available, but the batch size calculation is not - as per configured parameters, rather, a constant maximum size is configured internally. This is subject - to change. + - An API compatible version of `eachBatch` is available. The maximum batch size + can be configured through the `js.consumer.max.batch.size` configuration property + and defaults to 32 (a value of -1 means no limit). The property `eachBatchAutoResolve` is supported. Within the `eachBatch` callback, use of `uncommittedOffsets` is unsupported, and within the returned batch, `offsetLag` and `offsetLagLow` are supported. diff --git a/ci/tests/run_perf_test.js b/ci/tests/run_perf_test.js index b8e9e808..83c71b7f 100644 --- a/ci/tests/run_perf_test.js +++ b/ci/tests/run_perf_test.js @@ -42,8 +42,20 @@ function belowTarget(value, target) { async function main() { // Run performance tests and store outputs in memory const messageCount = process.env.MESSAGE_COUNT ? +process.env.MESSAGE_COUNT : 50000; - const skipCtpTest = process.env.SKIP_CTP_TEST ? process.env.SKIP_CTP_TEST === 'true' : false; + let skipCtpTest = process.env.SKIP_CTP_TEST ? 
process.env.SKIP_CTP_TEST === 'true' : false; const concurrentRun = process.env.CONCURRENT_RUN ? process.env.CONCURRENT_RUN === 'true' : false; + if (concurrentRun) { + skipCtpTest = true; + } + if (!process.env.CONSUMER_MAX_BATCH_SIZE) { + process.env.CONSUMER_MAX_BATCH_SIZE = '-1'; + } + if (!process.env.PARTITIONS_CONSUMED_CONCURRENTLY) { + process.env.PARTITIONS_CONSUMED_CONCURRENTLY = '2'; + } + if (!process.env.COMPRESSION) { + process.env.COMPRESSION = 'GZIP'; + } const consumerMode = process.env.CONSUMER_MODE || 'all'; const produceToSecondTopic = process.env.PRODUCE_TO_SECOND_TOPIC ? process.env.PRODUCE_TO_SECOND_TOPIC === 'true' : false; const produceToSecondTopicParam = produceToSecondTopic ? '--produce-to-second-topic' : ''; @@ -119,11 +131,11 @@ async function main() { console.log('Running Confluent CTP test...'); const outputConfluentCtp = skipCtpTest ? '' : - runCommand('MODE=confluent MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp'); + (await runCommand('MODE=confluent MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp')); console.log('Running KafkaJS CTP test...'); const outputKjsCtp = skipCtpTest ? 
'' : - runCommand('MODE=kafkajs MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp'); + (await runCommand('MODE=kafkajs MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp')); // Extract Confluent results let ctpConfluent, ctpKjs; @@ -232,6 +244,7 @@ async function main() { consumerKjsMessageMaxLatencyT0T1 = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency T0-T1 (eachMessage):'); consumerKjsMessageAvgLatencyT0T2 = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency T0-T2 (eachMessage):'); consumerKjsMessageMaxLatencyT0T2 = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency T0-T2 (eachMessage):'); + consumerKjsTime = extractValue(outputKjsProducerConsumer, '=== Consumption time (eachMessage):'); consumerKjsMessageAverageRSS = extractValue(outputKjsProducerConsumer, '=== Average consumer-each-message RSS KB:'); consumerKjsMessageMaxRSS = extractValue(outputKjsProducerConsumer, '=== Max consumer-each-message RSS KB:'); consumerKjsMessageAverageBrokerLag = extractValue(outputKjsProducerConsumer, `=== Average broker lag (${groupIdEachMessageKafkaJS}):`); @@ -239,7 +252,6 @@ async function main() { consumerKjsMessageTotalLagMeasurements = extractValue(outputKjsProducerConsumer, `=== Sample size for broker lag measurement (${groupIdEachMessageKafkaJS}):`); } if (consumerModeAll || consumerModeEachBatch) { - consumerKjsTime = extractValue(outputKjsProducerConsumer, '=== Consumption time (eachMessage):'); consumerKjsBatch = extractValue(outputKjsProducerConsumer, '=== Consumer Rate MB/s (eachBatch):'); consumerKjsBatchRate = extractValue(outputKjsProducerConsumer, '=== Consumer Rate msg/s (eachBatch):'); consumerKjsBatchAvgLatencyT0T1 = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency T0-T1 (eachBatch):'); @@ -269,37 +281,41 @@ async function main() { if (consumerModeAll || consumerModeEachMessage) { console.log(`Consumer rates MB/s 
(eachMessage): confluent ${consumerConfluentMessage}, kafkajs ${consumerKjsMessage}`); console.log(`Consumer rates msg/s (eachMessage): confluent ${consumerConfluentMessageRate}, kafkajs ${consumerKjsMessageRate}`); - console.log(`Consumer average E2E latency T0-T1 (eachMessage): confluent ${consumerConfluentMessageAvgLatencyT0T1}, kafkajs ${consumerKjsMessageAvgLatencyT0T1}`); - console.log(`Consumer max E2E latency T0-T1 (eachMessage): confluent ${consumerConfluentMessageMaxLatencyT0T1}, kafkajs ${consumerKjsMessageMaxLatencyT0T1}`); - if (produceToSecondTopic) { - console.log(`Consumer average E2E latency T0-T2 (eachMessage): confluent ${consumerConfluentMessageAvgLatencyT0T2}, kafkajs ${consumerKjsMessageAvgLatencyT0T2}`); - console.log(`Consumer max E2E latency T0-T2 (eachMessage): confluent ${consumerConfluentMessageMaxLatencyT0T2}, kafkajs ${consumerKjsMessageMaxLatencyT0T2}`); - } console.log(`Consumption time (eachMessage): confluent ${consumerConfluentTime}, kafkajs ${consumerKjsTime}`); console.log(`Average RSS (eachMessage): confluent ${consumerConfluentMessageAverageRSS}, kafkajs ${consumerKjsMessageAverageRSS}`); console.log(`Max RSS (eachMessage): confluent ${consumerConfluentMessageMaxRSS}, kafkajs ${consumerKjsMessageMaxRSS}`); - console.log(`Average broker lag (eachMessage): confluent ${consumerConfluentMessageAverageBrokerLag}, kafkajs ${consumerKjsMessageAverageBrokerLag}`); - console.log(`Max broker lag (eachMessage): confluent ${consumerConfluentMessageMaxBrokerLag}, kafkajs ${consumerKjsMessageMaxBrokerLag}`); - console.log(`Sample size for broker lag measurement (eachMessage): confluent ${consumerConfluentMessageTotalLagMeasurements}, kafkajs ${consumerKjsMessageTotalLagMeasurements}`); + if (concurrentRun) { + console.log(`Consumer average E2E latency T0-T1 (eachMessage): confluent ${consumerConfluentMessageAvgLatencyT0T1}, kafkajs ${consumerKjsMessageAvgLatencyT0T1}`); + console.log(`Consumer max E2E latency T0-T1 (eachMessage): confluent 
${consumerConfluentMessageMaxLatencyT0T1}, kafkajs ${consumerKjsMessageMaxLatencyT0T1}`); + if (produceToSecondTopic) { + console.log(`Consumer average E2E latency T0-T2 (eachMessage): confluent ${consumerConfluentMessageAvgLatencyT0T2}, kafkajs ${consumerKjsMessageAvgLatencyT0T2}`); + console.log(`Consumer max E2E latency T0-T2 (eachMessage): confluent ${consumerConfluentMessageMaxLatencyT0T2}, kafkajs ${consumerKjsMessageMaxLatencyT0T2}`); + } + console.log(`Average broker lag (eachMessage): confluent ${consumerConfluentMessageAverageBrokerLag}, kafkajs ${consumerKjsMessageAverageBrokerLag}`); + console.log(`Max broker lag (eachMessage): confluent ${consumerConfluentMessageMaxBrokerLag}, kafkajs ${consumerKjsMessageMaxBrokerLag}`); + console.log(`Sample size for broker lag measurement (eachMessage): confluent ${consumerConfluentMessageTotalLagMeasurements}, kafkajs ${consumerKjsMessageTotalLagMeasurements}`); + } } if (consumerModeAll || consumerModeEachBatch) { console.log(`Consumer rates MB/s (eachBatch): confluent ${consumerConfluentBatch}, kafkajs ${consumerKjsBatch}`); console.log(`Consumer rates msg/s (eachBatch): confluent ${consumerConfluentBatchRate}, kafkajs ${consumerKjsBatchRate}`); - console.log(`Consumer average E2E latency T0-T1 (eachBatch): confluent ${consumerConfluentBatchAvgLatencyT0T1}, kafkajs ${consumerKjsBatchAvgLatencyT0T1}`); - console.log(`Consumer max E2E latency T0-T1 (eachBatch): confluent ${consumerConfluentBatchMaxLatencyT0T1}, kafkajs ${consumerKjsBatchMaxLatencyT0T1}`); - if (produceToSecondTopic) { - console.log(`Consumer average E2E latency T0-T2 (eachBatch): confluent ${consumerConfluentBatchAvgLatencyT0T2}, kafkajs ${consumerKjsBatchAvgLatencyT0T2}`); - console.log(`Consumer max E2E latency T0-T2 (eachBatch): confluent ${consumerConfluentBatchMaxLatencyT0T2}, kafkajs ${consumerKjsBatchMaxLatencyT0T2}`); - } console.log(`Consumption time (eachBatch): confluent ${consumerConfluentBatchTime}, kafkajs ${consumerKjsBatchTime}`); - 
console.log(`Average eachBatch lag: confluent ${consumerConfluentBatchAverageLag}, kafkajs ${consumerKjsBatchAverageLag}`); - console.log(`Max eachBatch lag: confluent ${consumerConfluentBatchMaxLag}, kafkajs ${consumerKjsBatchMaxLag}`); console.log(`Average eachBatch size: confluent ${consumerConfluentBatchAverageSize}, kafkajs ${consumerKjsBatchAverageSize}`); console.log(`Average RSS (eachBatch): confluent ${consumerConfluentBatchAverageRSS}, kafkajs ${consumerKjsBatchAverageRSS}`); console.log(`Max RSS (eachBatch): confluent ${consumerConfluentBatchMaxRSS}, kafkajs ${consumerKjsBatchMaxRSS}`); - console.log(`Average broker lag (eachBatch): confluent ${consumerConfluentBatchAverageBrokerLag}, kafkajs ${consumerKjsBatchAverageBrokerLag}`); - console.log(`Max broker lag (eachBatch): confluent ${consumerConfluentBatchMaxBrokerLag}, kafkajs ${consumerKjsBatchMaxBrokerLag}`); - console.log(`Sample size for broker lag measurement (eachBatch): confluent ${consumerConfluentBatchTotalLagMeasurements}, kafkajs ${consumerKjsBatchTotalLagMeasurements}`); + if (concurrentRun) { + console.log(`Consumer average E2E latency T0-T1 (eachBatch): confluent ${consumerConfluentBatchAvgLatencyT0T1}, kafkajs ${consumerKjsBatchAvgLatencyT0T1}`); + console.log(`Consumer max E2E latency T0-T1 (eachBatch): confluent ${consumerConfluentBatchMaxLatencyT0T1}, kafkajs ${consumerKjsBatchMaxLatencyT0T1}`); + if (produceToSecondTopic) { + console.log(`Consumer average E2E latency T0-T2 (eachBatch): confluent ${consumerConfluentBatchAvgLatencyT0T2}, kafkajs ${consumerKjsBatchAvgLatencyT0T2}`); + console.log(`Consumer max E2E latency T0-T2 (eachBatch): confluent ${consumerConfluentBatchMaxLatencyT0T2}, kafkajs ${consumerKjsBatchMaxLatencyT0T2}`); + } + console.log(`Average eachBatch lag: confluent ${consumerConfluentBatchAverageLag}, kafkajs ${consumerKjsBatchAverageLag}`); + console.log(`Max eachBatch lag: confluent ${consumerConfluentBatchMaxLag}, kafkajs ${consumerKjsBatchMaxLag}`); + 
console.log(`Average broker lag (eachBatch): confluent ${consumerConfluentBatchAverageBrokerLag}, kafkajs ${consumerKjsBatchAverageBrokerLag}`); + console.log(`Max broker lag (eachBatch): confluent ${consumerConfluentBatchMaxBrokerLag}, kafkajs ${consumerKjsBatchMaxBrokerLag}`); + console.log(`Sample size for broker lag measurement (eachBatch): confluent ${consumerConfluentBatchTotalLagMeasurements}, kafkajs ${consumerKjsBatchTotalLagMeasurements}`); + } } if (!concurrentRun) { console.log(`Average RSS: confluent ${consumerConfluentAverageRSS}, kafkajs ${consumerKjsAverageRSS}`); diff --git a/ci/update-version.js b/ci/update-version.js index 531ed2ef..796261f9 100644 --- a/ci/update-version.js +++ b/ci/update-version.js @@ -89,7 +89,7 @@ function getPackageVersion(tag, branch) { // publish with a -devel suffix for EA and RC releases. if (tag.prerelease.length > 0) { - baseVersion += '-' + tag.prerelease.join('-'); + baseVersion += '-' + tag.prerelease.join('.'); } console.log(`Package version is "${baseVersion}"`); diff --git a/examples/performance/performance-consolidated.js b/examples/performance/performance-consolidated.js index 4684a709..864c2824 100644 --- a/examples/performance/performance-consolidated.js +++ b/examples/performance/performance-consolidated.js @@ -20,8 +20,8 @@ const saslPassword = process.env.SASL_PASSWORD; const topic = process.env.KAFKA_TOPIC || 'test-topic'; const topic2 = process.env.KAFKA_TOPIC2 || 'test-topic2'; const messageCount = process.env.MESSAGE_COUNT ? +process.env.MESSAGE_COUNT : 1000000; -const messageSize = process.env.MESSAGE_SIZE ? +process.env.MESSAGE_SIZE : 256; -const batchSize = process.env.BATCH_SIZE ? +process.env.BATCH_SIZE : 100; +const messageSize = process.env.MESSAGE_SIZE ? +process.env.MESSAGE_SIZE : 4096; +const batchSize = process.env.PRODUCER_BATCH_SIZE ? 
+process.env.PRODUCER_BATCH_SIZE : 100; const compression = process.env.COMPRESSION || 'None'; // Between 0 and 1, percentage of random bytes in each message const randomness = process.env.RANDOMNESS ? +process.env.RANDOMNESS : 0.5; @@ -207,7 +207,9 @@ function logParameters(parameters) { console.log(` ProduceTopic: ${topic2}`); console.log(` Message Count: ${messageCount}`); // Seed the topic with messages - await runProducer(parameters, topic, batchSize, warmupMessages, messageCount, messageSize, compression); + await runProducerCKJS(parameters, topic, batchSize, + warmupMessages, messageCount, messageSize, compression, + randomness, limitRPS); startTrackingMemory(); const ctpRate = await runConsumeTransformProduce(parameters, topic, topic2, warmupMessages, messageCount, messageProcessTimeMs, ctpConcurrency); endTrackingMemory('consume-transform-produce', `consume-transform-produce-${mode}.json`); diff --git a/examples/performance/performance-primitives-common.js b/examples/performance/performance-primitives-common.js index 1103a1fa..ff1f0027 100644 --- a/examples/performance/performance-primitives-common.js +++ b/examples/performance/performance-primitives-common.js @@ -14,7 +14,7 @@ else { } } -function installHandlers() { +function installHandlers(useTerminateTimeout) { const handlers = { terminationRequested: false, terminateTimeout: null, @@ -26,8 +26,10 @@ function installHandlers() { process.on('SIGINT', terminationRequestedCallback); process.on('SIGTERM', terminationRequestedCallback); handlers.terminationRequestedCallback = terminationRequestedCallback; - handlers.terminateTimeout = setTimeout(terminationRequestedCallback, - TERMINATE_TIMEOUT_MS); + if (useTerminateTimeout) { + handlers.terminateTimeout = setTimeout(terminationRequestedCallback, + TERMINATE_TIMEOUT_MS); + } return handlers; } @@ -57,7 +59,7 @@ function genericProduceToTopic(producer, topic, messages) { } async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, 
eachBatch, partitionsConsumedConcurrently, stats, actionOnMessages) { - const handlers = installHandlers(); + const handlers = installHandlers(totalMessageCnt === -1); await consumer.connect(); await consumer.subscribe({ topic }); @@ -179,18 +181,13 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac if (!startTime) { startTime = hrtime.bigint(); } else if (totalMessageCnt > 0 && messagesMeasured >= totalMessageCnt) { - let durationNanos = Number(hrtime.bigint() - startTime); - durationSeconds = durationNanos / 1e9; - rate = durationNanos === 0 ? Infinity : - (totalMessageSize / durationNanos) * 1e9 / (1024 * 1024); /* MB/s */ - console.log(`Recvd ${messagesMeasured} messages in ${durationSeconds} seconds, ${totalMessageSize} bytes; rate is ${rate} MB/s`); - consumer.pause([{ topic }]); + stopConsuming(); } } if (actionOnMessages) { await actionOnMessages(batch.messages); - if (messagesMeasured > 0) { + if (messagesMeasured > 0 && messages.length > 0) { let i = 1; const now = Date.now(); for (const message of messages) { @@ -243,7 +240,7 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac } async function runProducer(producer, topic, batchSize, warmupMessages, totalMessageCnt, msgSize, compression, randomness, limitRPS) { - const handlers = installHandlers(); + const handlers = installHandlers(totalMessageCnt === -1); let totalMessagesSent = 0; let totalBytesSent = 0; @@ -347,7 +344,7 @@ async function runProducer(producer, topic, batchSize, warmupMessages, totalMess } async function runLagMonitoring(admin, topic) { - const handlers = installHandlers(); + const handlers = installHandlers(true); let groupId = process.env.GROUPID_MONITOR; if (!groupId) { throw new Error("GROUPID_MONITOR environment variable not set"); diff --git a/examples/performance/performance-primitives.js b/examples/performance/performance-primitives.js index 84d56383..6fdd0104 100644 --- 
a/examples/performance/performance-primitives.js +++ b/examples/performance/performance-primitives.js @@ -19,6 +19,9 @@ module.exports = { newCompatibleProducer, }; + +const CONSUMER_MAX_BATCH_SIZE = process.env.CONSUMER_MAX_BATCH_SIZE ? +process.env.CONSUMER_MAX_BATCH_SIZE : null; + function baseConfiguration(parameters) { let ret = { 'client.id': 'kafka-test-performance', @@ -146,6 +149,10 @@ function newCompatibleConsumer(parameters, eachBatch) { const autoCommitOpts = autoCommit > 0 ? { 'enable.auto.commit': true, 'auto.commit.interval.ms': autoCommit } : { 'enable.auto.commit': false }; + const jsOpts = {}; + if (eachBatch && CONSUMER_MAX_BATCH_SIZE !== null) { + jsOpts['js.consumer.max.batch.size'] = CONSUMER_MAX_BATCH_SIZE; + } let groupId = eachBatch ? process.env.GROUPID_BATCH : process.env.GROUPID_MESSAGE; if (!groupId) { @@ -157,6 +164,7 @@ function newCompatibleConsumer(parameters, eachBatch) { 'auto.offset.reset': 'earliest', 'fetch.queue.backoff.ms': '100', ...autoCommitOpts, + ...jsOpts, }); return new CompatibleConsumer(consumer); } diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index ada184f8..6a1d8987 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -161,11 +161,6 @@ class Consumer { */ #messageCacheMaxSize = 1; - /** - * Number of times we tried to increase the cache. - */ - #increaseCount = 0; - /** * Whether the user has enabled manual offset management (commits). */ @@ -182,6 +177,12 @@ class Consumer { */ #partitionCount = 0; + /** + * Maximum batch size passed in eachBatch calls. + */ + #maxBatchSize = 32; + #maxBatchesSize = 32; + /** * Whether worker termination has been scheduled. */ @@ -197,6 +198,9 @@ class Consumer { */ #concurrency = 1; + + #runConfig = null; + /** * Promise that resolves together with last in progress fetch. * It's set to null when no fetch is in progress. @@ -234,7 +238,15 @@ class Consumer { /** * Last fetch real time clock in nanoseconds. 
*/ - #lastFetchClockNs = 0; + #lastFetchClockNs = 0n; + /** + * Last number of messages fetched. + */ + #lastFetchedMessageCnt = 0n; + /** + * Last fetch concurrency used. + */ + #lastFetchedConcurrency = 0n; /** * List of pending operations to be executed after @@ -311,8 +323,6 @@ class Consumer { * consumed messages upto N from the internalClient, but the user has stale'd the cache * after consuming just k (< N) messages. We seek back to last consumed offset + 1. */ this.#messageCache.clear(); - this.#messageCacheMaxSize = 1; - this.#increaseCount = 0; const clearPartitions = this.assignment(); const seeks = []; for (const topicPartition of clearPartitions) { @@ -691,6 +701,21 @@ class Consumer { this.#cacheExpirationTimeoutMs = this.#maxPollIntervalMs; rdKafkaConfig['max.poll.interval.ms'] = this.#maxPollIntervalMs * 2; + if (rdKafkaConfig['js.consumer.max.batch.size'] !== undefined) { + const maxBatchSize = +rdKafkaConfig['js.consumer.max.batch.size']; + if (!Number.isInteger(maxBatchSize) || (maxBatchSize <= 0 && maxBatchSize !== -1)) { + throw new error.KafkaJSError( + "'js.consumer.max.batch.size' must be a positive integer or -1 for unlimited batch size.", + { code: error.ErrorCodes.ERR__INVALID_ARG }); + } + this.#maxBatchSize = maxBatchSize; + this.#maxBatchesSize = maxBatchSize; + if (maxBatchSize === -1) { + this.#messageCacheMaxSize = Number.MAX_SAFE_INTEGER; + } + delete rdKafkaConfig['js.consumer.max.batch.size']; + } + return rdKafkaConfig; } @@ -844,33 +869,6 @@ class Consumer { await this.commitOffsets(); } - /** - * Request a size increase. - * It increases the size by 2x, but only if the size is less than 1024, - * only if the size has been requested to be increased twice in a row. 
- * @private - */ - #increaseMaxSize() { - if (this.#messageCacheMaxSize === 1024) - return; - this.#increaseCount++; - if (this.#increaseCount <= 1) - return; - this.#messageCacheMaxSize = Math.min(this.#messageCacheMaxSize << 1, 1024); - this.#increaseCount = 0; - } - - /** - * Request a size decrease. - * It decreases the size to 80% of the last received size, with a minimum of 1. - * @param {number} recvdSize - the number of messages received in the last poll. - * @private - */ - #decreaseMaxSize(recvdSize) { - this.#messageCacheMaxSize = Math.max(Math.floor((recvdSize * 8) / 10), 1); - this.#increaseCount = 0; - } - /** * Converts a list of messages returned by node-rdkafka into a message that can be used by the eachBatch callback. * @param {import("../..").Message[]} messages - must not be empty. Must contain messages from the same topic and partition. @@ -957,6 +955,47 @@ class Consumer { return returnPayload; } + #updateMaxMessageCacheSize() { + if (this.#maxBatchSize === -1) { + // In case of unbounded max batch size it returns all available messages + // for a partition in each batch. Cache is unbounded as well as + // it takes only one call to process each partition. + return; + } + + const nowNs = hrtime.bigint(); + if (this.#lastFetchedMessageCnt > 0 && this.#lastFetchClockNs > 0n && + nowNs > this.#lastFetchClockNs) { + const consumptionDurationSeconds = Number(nowNs - this.#lastFetchClockNs) / 1e9; + const messagesPerSecondSingleWorker = this.#lastFetchedMessageCnt / this.#lastFetchedConcurrency / consumptionDurationSeconds; + // Keep enough messages in the cache for 1.5 seconds of concurrent consumption. + this.#messageCacheMaxSize = Math.round(1.5 * messagesPerSecondSingleWorker) * this.#concurrency; + const minCacheSize = this.#runConfig.eachBatch ? this.#maxBatchesSize : this.#concurrency; + if (this.#messageCacheMaxSize < minCacheSize) + // Keep at least one batch or one message per worker. 
+ // It's possible less workers than requested were active in previous run. + this.#messageCacheMaxSize = minCacheSize; + else if (this.#messageCacheMaxSize > minCacheSize * 10) + // Keep at most 10 messages or batches per requested worker. + this.#messageCacheMaxSize = minCacheSize * 10; + } + } + + #saveFetchStats(messages) { + this.#lastFetchClockNs = hrtime.bigint(); + const partitionsNum = new Map(); + for (const msg of messages) { + const key = partitionKey(msg); + partitionsNum.set(key, 1); + if (partitionsNum.size >= this.#concurrency) { + break; + } + } + this.#lastFetchedConcurrency = partitionsNum.size; + this.#lastFetchedMessageCnt = messages.length; + } + + async #fetchAndResolveWith(takeFromCache, size) { if (this.#fetchInProgress) { await this.#fetchInProgress; @@ -983,6 +1022,8 @@ class Consumer { const fetchResult = new DeferredPromise(); this.#logger.debug(`Attempting to fetch ${size} messages to the message cache`, this.#createConsumerBindingMessageMetadata()); + + this.#updateMaxMessageCacheSize(); this.#internalClient.consume(size, (err, messages) => fetchResult.resolve([err, messages])); @@ -999,13 +1040,8 @@ class Consumer { this.#messageCache.addMessages(messages); const res = takeFromCache(); - this.#lastFetchClockNs = hrtime.bigint(); + this.#saveFetchStats(messages); this.#maxPollIntervalRestart.resolve(); - if (messages.length === this.#messageCacheMaxSize) { - this.#increaseMaxSize(); - } else { - this.#decreaseMaxSize(messages.length); - } return res; } finally { this.#fetchInProgress.resolve(); @@ -1557,13 +1593,13 @@ class Consumer { * @private */ async #runInternal(config) { + this.#runConfig = config; this.#concurrency = config.partitionsConsumedConcurrently; + this.#maxBatchesSize = this.#maxBatchSize * this.#concurrency; const perMessageProcessor = config.eachMessage ? this.#messageProcessor : this.#batchProcessor; - /* TODO: make this dynamic, based on max batch size / size of last message seen. 
*/ - const maxBatchSize = 32; const fetcher = config.eachMessage ? (savedIdx) => this.#consumeSingleCached(savedIdx) - : (savedIdx) => this.#consumeCachedN(savedIdx, maxBatchSize); + : (savedIdx) => this.#consumeCachedN(savedIdx, this.#maxBatchSize); this.#workers = []; await this.#lock.write(async () => { @@ -1574,6 +1610,8 @@ class Consumer { this.#workerTerminationScheduled = new DeferredPromise(); this.#lastFetchClockNs = hrtime.bigint(); + this.#lastFetchedMessageCnt = 0; + this.#lastFetchedConcurrency = 0; if (this.#pendingOperations.length === 0) { const workersToSpawn = Math.max(1, Math.min(this.#concurrency, this.#partitionCount)); const cacheExpirationLoop = this.#cacheExpirationLoop(); diff --git a/lib/util.js b/lib/util.js index ae539b46..389c3411 100644 --- a/lib/util.js +++ b/lib/util.js @@ -52,4 +52,4 @@ util.dictToStringList = function (mapOrObject) { return list; }; -util.bindingVersion = '1.6.0'; +util.bindingVersion = '1.6.1-alpha.1'; diff --git a/package-lock.json b/package-lock.json index cdf72ab0..8c7bcf24 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@confluentinc/kafka-javascript", - "version": "1.6.0", + "version": "1.6.1-alpha.1", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@confluentinc/kafka-javascript", - "version": "1.6.0", + "version": "1.6.1-alpha.1", "hasInstallScript": true, "license": "MIT", "workspaces": [ @@ -9969,7 +9969,7 @@ }, "schemaregistry": { "name": "@confluentinc/schemaregistry", - "version": "1.6.0", + "version": "1.6.1-alpha.1", "license": "MIT", "dependencies": { "@aws-sdk/client-kms": "^3.637.0", diff --git a/package.json b/package.json index 4d36d2da..0e0e27c3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@confluentinc/kafka-javascript", - "version": "1.6.0", + "version": "1.6.1-alpha.1", "description": "Node.js bindings for librdkafka", "librdkafka": "2.12.0", "librdkafka_win": "2.12.0", diff --git 
a/schemaregistry/package.json b/schemaregistry/package.json index 5f9bd0f9..95cd317e 100644 --- a/schemaregistry/package.json +++ b/schemaregistry/package.json @@ -1,6 +1,6 @@ { "name": "@confluentinc/schemaregistry", - "version": "1.6.0", + "version": "1.6.1-alpha.1", "description": "Node.js client for Confluent Schema Registry", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/test/promisified/admin/fetch_offsets.spec.js b/test/promisified/admin/fetch_offsets.spec.js index 08e5a81a..c6412530 100644 --- a/test/promisified/admin/fetch_offsets.spec.js +++ b/test/promisified/admin/fetch_offsets.spec.js @@ -132,8 +132,7 @@ describe("fetchOffset function", () => { await consumer.run({ eachMessage: async ({ topic, partition, message }) => { - messagesConsumed.push(message); // Populate messagesConsumed - if (messagesConsumed.length === 5) { + if (messagesConsumed.length === 4) { await consumer.commitOffsets([ { topic, @@ -142,6 +141,7 @@ describe("fetchOffset function", () => { }, ]); } + messagesConsumed.push(message); // Populate messagesConsumed }, }); diff --git a/test/promisified/consumer/consumeMessages.spec.js b/test/promisified/consumer/consumeMessages.spec.js index ca9204c6..5c0df12d 100644 --- a/test/promisified/consumer/consumeMessages.spec.js +++ b/test/promisified/consumer/consumeMessages.spec.js @@ -412,6 +412,11 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit partitions: partitions, }); + // With a long per-message processing time and one message consumed at a time, + // the producer batches must be very small to keep the concurrency up. + // Otherwise the cache grows too large, which postpones the next fetch + // and, with it, the rebalance. 
+ const producer = createProducer({}, {'batch.num.messages': '1'}); await producer.connect(); await consumer.connect(); await consumer.subscribe({ topic: topicName }); @@ -448,6 +453,7 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit await producer.send({ topic: topicName, messages }); await maxConcurrentWorkersReached; expect(inProgressMaxValue).toBe(expectedMaxConcurrentWorkers); + await producer.disconnect(); }); it('consume GZIP messages', async () => { @@ -612,6 +618,7 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit let assigns = 0; let revokes = 0; let lost = 0; + let firstBatchProcessing; consumer = createConsumer({ groupId, maxWaitTimeInMs: 100, @@ -642,9 +649,6 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit let errors = false; let receivedMessages = 0; - const batchLengths = [1, 1, 2, - /* cache reset */ - 1, 1]; consumer.run({ partitionsConsumedConcurrently, eachBatchAutoResolve: true, @@ -652,17 +656,14 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit receivedMessages++; try { - expect(event.batch.messages.length) - .toEqual(batchLengths[receivedMessages - 1]); - - if (receivedMessages === 3) { - expect(event.isStale()).toEqual(false); - await sleep(7500); - /* 7.5s 'processing' - * doesn't exceed max poll interval. - * Cache reset is transparent */ - expect(event.isStale()).toEqual(false); - } + expect(event.isStale()).toEqual(false); + await sleep(7500); + /* 7.5s 'processing' + * doesn't exceed max poll interval. 
+ * Cache reset is transparent */ + expect(event.isStale()).toEqual(false); + if (firstBatchProcessing === undefined) + firstBatchProcessing = receivedMessages; } catch (e) { console.error(e); errors = true; @@ -686,6 +687,8 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit /* Triggers revocation */ await consumer.disconnect(); + expect(firstBatchProcessing).toBeDefined(); + expect(receivedMessages).toBeGreaterThan(firstBatchProcessing); /* First assignment */ expect(assigns).toEqual(1); /* Revocation on disconnect */ @@ -732,13 +735,7 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit let errors = false; let receivedMessages = 0; - const batchLengths = [/* first we reach batches of 32 message and fetches of 64 - * max poll interval exceeded happens on second - * 32 messages batch of the 64 msg fetch. */ - 1, 1, 2, 2, 4, 4, 8, 8, 16, 16, 32, 32, 32, 32, - /* max poll interval exceeded, 32 reprocessed + - * 1 new message. 
*/ - 1, 1, 2, 2, 4, 4, 8, 8, 3]; + let firstLongBatchProcessing; consumer.run({ partitionsConsumedConcurrently, eachBatchAutoResolve: true, @@ -746,17 +743,15 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit receivedMessages++; try { - expect(event.batch.messages.length) - .toEqual(batchLengths[receivedMessages - 1]); - - if (receivedMessages === 13) { + if (!firstLongBatchProcessing && event.batch.messages.length >= 32) { expect(event.isStale()).toEqual(false); await sleep(6000); /* 6s 'processing' * cache clearance starts at 7000 */ expect(event.isStale()).toEqual(false); + firstLongBatchProcessing = receivedMessages; } - if ( receivedMessages === 14) { + if (firstLongBatchProcessing && receivedMessages === firstLongBatchProcessing + 1) { expect(event.isStale()).toEqual(false); await sleep(10000); /* 10s 'processing' @@ -791,6 +786,9 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit /* Triggers revocation */ await consumer.disconnect(); + expect(firstLongBatchProcessing).toBeDefined(); + expect(receivedMessages).toBeGreaterThan(firstLongBatchProcessing); + /* First assignment + assignment after partitions lost */ expect(assigns).toEqual(2); /* Partitions lost + revocation on disconnect */ diff --git a/types/kafkajs.d.ts b/types/kafkajs.d.ts index 6045a77e..a9bb92ac 100644 --- a/types/kafkajs.d.ts +++ b/types/kafkajs.d.ts @@ -244,7 +244,17 @@ export interface ConsumerConfig { partitionAssignors?: PartitionAssignors[], } -export type ConsumerGlobalAndTopicConfig = ConsumerGlobalConfig & ConsumerTopicConfig; +export interface JSConsumerConfig { + /** + * Maximum batch size passed in eachBatch calls. + * A value of -1 means no limit. 
+ * + * @default 32 + */ + 'js.consumer.max.batch.size'?: string | number +} + +export type ConsumerGlobalAndTopicConfig = ConsumerGlobalConfig & ConsumerTopicConfig & JSConsumerConfig; export interface ConsumerConstructorConfig extends ConsumerGlobalAndTopicConfig { kafkaJS?: ConsumerConfig;