9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,12 @@
# confluent-kafka-javascript 1.6.1

v1.6.1 is a maintenance release. It is supported for all usage.
Contributor comment:
nit: Let's make it 1.7.0


### Enhancements

1. Configurable batch size through the `js.consumer.max.batch.size` property (#389).


# confluent-kafka-javascript 1.6.0

v1.6.0 is a feature release. It is supported for all usage.
6 changes: 3 additions & 3 deletions MIGRATION.md
@@ -303,9 +303,9 @@ producerRun().then(consumerRun).catch(console.error);
- The `heartbeat()` no longer needs to be called by the user in the `eachMessage/eachBatch` callback.
Heartbeats are automatically managed by librdkafka.
- The `partitionsConsumedConcurrently` is supported by both `eachMessage` and `eachBatch`.
- An API compatible version of `eachBatch` is available, but the batch size calculation is not
as per configured parameters, rather, a constant maximum size is configured internally. This is subject
to change.
- An API-compatible version of `eachBatch` is available; the maximum batch size
can be configured through the `js.consumer.max.batch.size` configuration property
and defaults to 32.
Comment on lines +306 to +308
Contributor comment:
Suggested change
- An API compatible version of `eachBatch` is available, maximum batch size
can be configured through the `js.consumer.max.batch.size` configuration property
and defaults to 32.
- An API compatible version of `eachBatch` is available, maximum batch size
can be configured through the `js.consumer.max.batch.size` configuration property
and defaults to 32. It is not dependent on the size of the batches present on the broker, as the batches are constructed client-side.

Contributor comment:
Also, I think we should add that while we start attempting to fill batches with js.consumer.max.batch.size, it may be scaled down based on the actual messages received.

The property `eachBatchAutoResolve` is supported.
Within the `eachBatch` callback, use of `uncommittedOffsets` is unsupported,
and within the returned batch, `offsetLag` and `offsetLagLow` are supported.
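
As a concrete illustration of the property described above, here is a minimal sketch of building a consumer configuration that sets `js.consumer.max.batch.size`. The helper name and the surrounding config keys are assumptions for illustration, not part of this PR.

```javascript
// Hypothetical helper for illustration: builds an rdkafka-style config map.
// 'js.consumer.max.batch.size' is the property introduced in this PR; it caps
// the client-side batch handed to eachBatch and defaults to 32 when unset.
function buildConsumerConfig(groupId, maxBatchSize) {
  const config = {
    'group.id': groupId,
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': false,
  };
  // Only add the key when a size was requested, so the default (32) applies otherwise.
  if (maxBatchSize != null) {
    config['js.consumer.max.batch.size'] = maxBatchSize;
  }
  return config;
}

console.log(buildConsumerConfig('example-group', 64));
```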
64 changes: 40 additions & 24 deletions ci/tests/run_perf_test.js
@@ -39,11 +39,23 @@



async function main() {

Check failure (SonarQube, ci/tests/run_perf_test.js#L42): Refactor this function to reduce its Cognitive Complexity from 103 to the 15 allowed.
// Run performance tests and store outputs in memory
const messageCount = process.env.MESSAGE_COUNT ? +process.env.MESSAGE_COUNT : 50000;
const skipCtpTest = process.env.SKIP_CTP_TEST ? process.env.SKIP_CTP_TEST === 'true' : false;
let skipCtpTest = process.env.SKIP_CTP_TEST ? process.env.SKIP_CTP_TEST === 'true' : false;
const concurrentRun = process.env.CONCURRENT_RUN ? process.env.CONCURRENT_RUN === 'true' : false;
if (concurrentRun) {
skipCtpTest = true;
}
if (!process.env.CONSUMER_MAX_BATCH_SIZE) {
process.env.CONSUMER_MAX_BATCH_SIZE = '-1';
}
if (!process.env.PARTITIONS_CONSUMED_CONCURRENTLY) {
process.env.PARTITIONS_CONSUMED_CONCURRENTLY = '2';
}
if (!process.env.COMPRESSION) {
process.env.COMPRESSION = 'GZIP';
}
const consumerMode = process.env.CONSUMER_MODE || 'all';
const produceToSecondTopic = process.env.PRODUCE_TO_SECOND_TOPIC ? process.env.PRODUCE_TO_SECOND_TOPIC === 'true' : false;
const produceToSecondTopicParam = produceToSecondTopic ? '--produce-to-second-topic' : '';
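
The environment-variable defaulting seen in this hunk can be captured by two small helpers; `envOrDefault` and `envFlag` are hypothetical names for illustration, not part of the PR:

```javascript
// Sketch of the env-var defaulting pattern used in run_perf_test.js.
// Helper names are hypothetical; fallback values mirror the defaults in the diff.
function envOrDefault(name, fallback) {
  return process.env[name] !== undefined ? process.env[name] : fallback;
}

// Boolean flags arrive as the string 'true'/'false'; anything unset uses the fallback.
function envFlag(name, fallback = false) {
  return process.env[name] !== undefined ? process.env[name] === 'true' : fallback;
}

const compression = envOrDefault('COMPRESSION', 'GZIP');
const concurrentRun = envFlag('CONCURRENT_RUN');
console.log(compression, concurrentRun);
```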
@@ -58,10 +70,10 @@
}
let outputConfluentProducerConsumer;
let outputKjsProducerConsumer;
const groupIdEachMessageConfluent = `test-group-confluent-message-` + Math.random();

Check failure (SonarQube, ci/tests/run_perf_test.js#L73): Make sure that using this pseudorandom number generator is safe here.
const groupIdEachBatchConfluent = `test-group-confluent-batch-` + Math.random();

Check failure (SonarQube, ci/tests/run_perf_test.js#L74): Make sure that using this pseudorandom number generator is safe here.
const groupIdEachMessageKafkaJS = `test-group-kafkajs-message-` + Math.random();

Check failure (SonarQube, ci/tests/run_perf_test.js#L75): Make sure that using this pseudorandom number generator is safe here.
const groupIdEachBatchKafkaJS = `test-group-kafkajs-batch-` + Math.random();

Check failure (SonarQube, ci/tests/run_perf_test.js#L76): Make sure that using this pseudorandom number generator is safe here.
if (consumerModeAll || consumerModeEachMessage) {
console.log(`Confluent eachMessage group id: ${groupIdEachMessageConfluent}`);
console.log(`KafkaJS eachMessage group id: ${groupIdEachMessageKafkaJS}`);
@@ -119,11 +131,11 @@

console.log('Running Confluent CTP test...');
const outputConfluentCtp = skipCtpTest ? '' :
runCommand('MODE=confluent MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp');
(await runCommand('MODE=confluent MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp'));

console.log('Running KafkaJS CTP test...');
const outputKjsCtp = skipCtpTest ? '' :
runCommand('MODE=kafkajs MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp');
(await runCommand('MODE=kafkajs MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp'));

// Extract Confluent results
let ctpConfluent, ctpKjs;
@@ -230,22 +242,22 @@
consumerKjsMessageRate = extractValue(outputKjsProducerConsumer, '=== Consumer Rate msg/s (eachMessage):');
consumerKjsMessageAvgLatencyT0T1 = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency T0-T1 (eachMessage):');
consumerKjsMessageMaxLatencyT0T1 = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency T0-T1 (eachMessage):');
consumerKjsMessageAvgLatencyT0T2 = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency T0-T2 (eachMessage):');

Check failure (SonarQube, ci/tests/run_perf_test.js#L245): Add the "let", "const" or "var" keyword to this declaration of "consumerKjsMessageAvgLatencyT0T2" to make it explicit.
consumerKjsMessageMaxLatencyT0T2 = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency T0-T2 (eachMessage):');

Check failure (SonarQube, ci/tests/run_perf_test.js#L246): Add the "let", "const" or "var" keyword to this declaration of "consumerKjsMessageMaxLatencyT0T2" to make it explicit.
consumerKjsTime = extractValue(outputKjsProducerConsumer, '=== Consumption time (eachMessage):');
consumerKjsMessageAverageRSS = extractValue(outputKjsProducerConsumer, '=== Average consumer-each-message RSS KB:');
consumerKjsMessageMaxRSS = extractValue(outputKjsProducerConsumer, '=== Max consumer-each-message RSS KB:');
consumerKjsMessageAverageBrokerLag = extractValue(outputKjsProducerConsumer, `=== Average broker lag (${groupIdEachMessageKafkaJS}):`);
consumerKjsMessageMaxBrokerLag = extractValue(outputKjsProducerConsumer, `=== Max broker lag (${groupIdEachMessageKafkaJS}):`);
consumerKjsMessageTotalLagMeasurements = extractValue(outputKjsProducerConsumer, `=== Sample size for broker lag measurement (${groupIdEachMessageKafkaJS}):`);
}
if (consumerModeAll || consumerModeEachBatch) {
consumerKjsTime = extractValue(outputKjsProducerConsumer, '=== Consumption time (eachMessage):');
consumerKjsBatch = extractValue(outputKjsProducerConsumer, '=== Consumer Rate MB/s (eachBatch):');
consumerKjsBatchRate = extractValue(outputKjsProducerConsumer, '=== Consumer Rate msg/s (eachBatch):');
consumerKjsBatchAvgLatencyT0T1 = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency T0-T1 (eachBatch):');
consumerKjsBatchMaxLatencyT0T1 = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency T0-T1 (eachBatch):');
consumerKjsBatchAvgLatencyT0T2 = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency T0-T2 (eachBatch):');

Check failure (SonarQube, ci/tests/run_perf_test.js#L259): Add the "let", "const" or "var" keyword to this declaration of "consumerKjsBatchAvgLatencyT0T2" to make it explicit.
consumerKjsBatchMaxLatencyT0T2 = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency T0-T2 (eachBatch):');

Check failure (SonarQube, ci/tests/run_perf_test.js#L260): Add the "let", "const" or "var" keyword to this declaration of "consumerKjsBatchMaxLatencyT0T2" to make it explicit.
consumerKjsBatchTime = extractValue(outputKjsProducerConsumer, '=== Consumption time (eachBatch):');
consumerKjsBatchAverageLag = extractValue(outputKjsProducerConsumer, '=== Average eachBatch lag:');
consumerKjsBatchMaxLag = extractValue(outputKjsProducerConsumer, '=== Max eachBatch lag:');
@@ -269,37 +281,41 @@
if (consumerModeAll || consumerModeEachMessage) {
console.log(`Consumer rates MB/s (eachMessage): confluent ${consumerConfluentMessage}, kafkajs ${consumerKjsMessage}`);
console.log(`Consumer rates msg/s (eachMessage): confluent ${consumerConfluentMessageRate}, kafkajs ${consumerKjsMessageRate}`);
console.log(`Consumer average E2E latency T0-T1 (eachMessage): confluent ${consumerConfluentMessageAvgLatencyT0T1}, kafkajs ${consumerKjsMessageAvgLatencyT0T1}`);
console.log(`Consumer max E2E latency T0-T1 (eachMessage): confluent ${consumerConfluentMessageMaxLatencyT0T1}, kafkajs ${consumerKjsMessageMaxLatencyT0T1}`);
if (produceToSecondTopic) {
console.log(`Consumer average E2E latency T0-T2 (eachMessage): confluent ${consumerConfluentMessageAvgLatencyT0T2}, kafkajs ${consumerKjsMessageAvgLatencyT0T2}`);
console.log(`Consumer max E2E latency T0-T2 (eachMessage): confluent ${consumerConfluentMessageMaxLatencyT0T2}, kafkajs ${consumerKjsMessageMaxLatencyT0T2}`);
}
console.log(`Consumption time (eachMessage): confluent ${consumerConfluentTime}, kafkajs ${consumerKjsTime}`);
console.log(`Average RSS (eachMessage): confluent ${consumerConfluentMessageAverageRSS}, kafkajs ${consumerKjsMessageAverageRSS}`);
console.log(`Max RSS (eachMessage): confluent ${consumerConfluentMessageMaxRSS}, kafkajs ${consumerKjsMessageMaxRSS}`);
console.log(`Average broker lag (eachMessage): confluent ${consumerConfluentMessageAverageBrokerLag}, kafkajs ${consumerKjsMessageAverageBrokerLag}`);
console.log(`Max broker lag (eachMessage): confluent ${consumerConfluentMessageMaxBrokerLag}, kafkajs ${consumerKjsMessageMaxBrokerLag}`);
console.log(`Sample size for broker lag measurement (eachMessage): confluent ${consumerConfluentMessageTotalLagMeasurements}, kafkajs ${consumerKjsMessageTotalLagMeasurements}`);
if (concurrentRun) {
console.log(`Consumer average E2E latency T0-T1 (eachMessage): confluent ${consumerConfluentMessageAvgLatencyT0T1}, kafkajs ${consumerKjsMessageAvgLatencyT0T1}`);
console.log(`Consumer max E2E latency T0-T1 (eachMessage): confluent ${consumerConfluentMessageMaxLatencyT0T1}, kafkajs ${consumerKjsMessageMaxLatencyT0T1}`);
if (produceToSecondTopic) {
console.log(`Consumer average E2E latency T0-T2 (eachMessage): confluent ${consumerConfluentMessageAvgLatencyT0T2}, kafkajs ${consumerKjsMessageAvgLatencyT0T2}`);
console.log(`Consumer max E2E latency T0-T2 (eachMessage): confluent ${consumerConfluentMessageMaxLatencyT0T2}, kafkajs ${consumerKjsMessageMaxLatencyT0T2}`);
}
console.log(`Average broker lag (eachMessage): confluent ${consumerConfluentMessageAverageBrokerLag}, kafkajs ${consumerKjsMessageAverageBrokerLag}`);
console.log(`Max broker lag (eachMessage): confluent ${consumerConfluentMessageMaxBrokerLag}, kafkajs ${consumerKjsMessageMaxBrokerLag}`);
console.log(`Sample size for broker lag measurement (eachMessage): confluent ${consumerConfluentMessageTotalLagMeasurements}, kafkajs ${consumerKjsMessageTotalLagMeasurements}`);
}
}
if (consumerModeAll || consumerModeEachBatch) {
console.log(`Consumer rates MB/s (eachBatch): confluent ${consumerConfluentBatch}, kafkajs ${consumerKjsBatch}`);
console.log(`Consumer rates msg/s (eachBatch): confluent ${consumerConfluentBatchRate}, kafkajs ${consumerKjsBatchRate}`);
console.log(`Consumer average E2E latency T0-T1 (eachBatch): confluent ${consumerConfluentBatchAvgLatencyT0T1}, kafkajs ${consumerKjsBatchAvgLatencyT0T1}`);
console.log(`Consumer max E2E latency T0-T1 (eachBatch): confluent ${consumerConfluentBatchMaxLatencyT0T1}, kafkajs ${consumerKjsBatchMaxLatencyT0T1}`);
if (produceToSecondTopic) {
console.log(`Consumer average E2E latency T0-T2 (eachBatch): confluent ${consumerConfluentBatchAvgLatencyT0T2}, kafkajs ${consumerKjsBatchAvgLatencyT0T2}`);
console.log(`Consumer max E2E latency T0-T2 (eachBatch): confluent ${consumerConfluentBatchMaxLatencyT0T2}, kafkajs ${consumerKjsBatchMaxLatencyT0T2}`);
}
console.log(`Consumption time (eachBatch): confluent ${consumerConfluentBatchTime}, kafkajs ${consumerKjsBatchTime}`);
console.log(`Average eachBatch lag: confluent ${consumerConfluentBatchAverageLag}, kafkajs ${consumerKjsBatchAverageLag}`);
console.log(`Max eachBatch lag: confluent ${consumerConfluentBatchMaxLag}, kafkajs ${consumerKjsBatchMaxLag}`);
console.log(`Average eachBatch size: confluent ${consumerConfluentBatchAverageSize}, kafkajs ${consumerKjsBatchAverageSize}`);
console.log(`Average RSS (eachBatch): confluent ${consumerConfluentBatchAverageRSS}, kafkajs ${consumerKjsBatchAverageRSS}`);
console.log(`Max RSS (eachBatch): confluent ${consumerConfluentBatchMaxRSS}, kafkajs ${consumerKjsBatchMaxRSS}`);
console.log(`Average broker lag (eachBatch): confluent ${consumerConfluentBatchAverageBrokerLag}, kafkajs ${consumerKjsBatchAverageBrokerLag}`);
console.log(`Max broker lag (eachBatch): confluent ${consumerConfluentBatchMaxBrokerLag}, kafkajs ${consumerKjsBatchMaxBrokerLag}`);
console.log(`Sample size for broker lag measurement (eachBatch): confluent ${consumerConfluentBatchTotalLagMeasurements}, kafkajs ${consumerKjsBatchTotalLagMeasurements}`);
if (concurrentRun) {
console.log(`Consumer average E2E latency T0-T1 (eachBatch): confluent ${consumerConfluentBatchAvgLatencyT0T1}, kafkajs ${consumerKjsBatchAvgLatencyT0T1}`);
console.log(`Consumer max E2E latency T0-T1 (eachBatch): confluent ${consumerConfluentBatchMaxLatencyT0T1}, kafkajs ${consumerKjsBatchMaxLatencyT0T1}`);
if (produceToSecondTopic) {
console.log(`Consumer average E2E latency T0-T2 (eachBatch): confluent ${consumerConfluentBatchAvgLatencyT0T2}, kafkajs ${consumerKjsBatchAvgLatencyT0T2}`);
console.log(`Consumer max E2E latency T0-T2 (eachBatch): confluent ${consumerConfluentBatchMaxLatencyT0T2}, kafkajs ${consumerKjsBatchMaxLatencyT0T2}`);
}
console.log(`Average eachBatch lag: confluent ${consumerConfluentBatchAverageLag}, kafkajs ${consumerKjsBatchAverageLag}`);
console.log(`Max eachBatch lag: confluent ${consumerConfluentBatchMaxLag}, kafkajs ${consumerKjsBatchMaxLag}`);
console.log(`Average broker lag (eachBatch): confluent ${consumerConfluentBatchAverageBrokerLag}, kafkajs ${consumerKjsBatchAverageBrokerLag}`);
console.log(`Max broker lag (eachBatch): confluent ${consumerConfluentBatchMaxBrokerLag}, kafkajs ${consumerKjsBatchMaxBrokerLag}`);
console.log(`Sample size for broker lag measurement (eachBatch): confluent ${consumerConfluentBatchTotalLagMeasurements}, kafkajs ${consumerKjsBatchTotalLagMeasurements}`);
}
}
if (!concurrentRun) {
console.log(`Average RSS: confluent ${consumerConfluentAverageRSS}, kafkajs ${consumerKjsAverageRSS}`);
@@ -321,7 +337,7 @@
if (consumerModeAll || consumerModeEachMessage) {
if (belowThreshold(consumerConfluentMessage, consumerKjsMessage, maxPerformanceDifference)) {
console.log(`Consumer rates MB/s (eachMessage) differ by more than 30%: confluent ${consumerConfluentMessage}, kafkajs ${consumerKjsMessage}`);
// FIXME: improve consumer performance at least to KafkaJS level

Check warning (SonarQube, ci/tests/run_perf_test.js#L340): Take the required action to fix the issue indicated by this comment.
errcode = 0;
}

2 changes: 1 addition & 1 deletion ci/update-version.js
@@ -89,7 +89,7 @@ function getPackageVersion(tag, branch) {

// publish with a -devel suffix for EA and RC releases.
if (tag.prerelease.length > 0) {
baseVersion += '-' + tag.prerelease.join('-');
baseVersion += '-' + tag.prerelease.join('.');
}

console.log(`Package version is "${baseVersion}"`);
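
The separator change above affects how prerelease identifiers are rendered; dot-separated identifiers are the semver convention for prerelease versions. A sketch with an assumed tag base of `1.7.0` and prerelease parts `['rc', '1']`:

```javascript
// Effect of switching the join separator from '-' to '.' (values are assumed
// placeholders, not taken from an actual tag in this PR).
const base = '1.7.0';
const prerelease = ['rc', '1'];

const before = base + '-' + prerelease.join('-'); // old behaviour
const after = base + '-' + prerelease.join('.');  // new behaviour

console.log(before); // 1.7.0-rc-1
console.log(after);  // 1.7.0-rc.1 (dot-separated, per semver prerelease syntax)
```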
8 changes: 5 additions & 3 deletions examples/performance/performance-consolidated.js
@@ -20,8 +20,8 @@ const saslPassword = process.env.SASL_PASSWORD;
const topic = process.env.KAFKA_TOPIC || 'test-topic';
const topic2 = process.env.KAFKA_TOPIC2 || 'test-topic2';
const messageCount = process.env.MESSAGE_COUNT ? +process.env.MESSAGE_COUNT : 1000000;
const messageSize = process.env.MESSAGE_SIZE ? +process.env.MESSAGE_SIZE : 256;
const batchSize = process.env.BATCH_SIZE ? +process.env.BATCH_SIZE : 100;
const messageSize = process.env.MESSAGE_SIZE ? +process.env.MESSAGE_SIZE : 4096;
const batchSize = process.env.PRODUCER_BATCH_SIZE ? +process.env.PRODUCER_BATCH_SIZE : 100;
const compression = process.env.COMPRESSION || 'None';
// Between 0 and 1, percentage of random bytes in each message
const randomness = process.env.RANDOMNESS ? +process.env.RANDOMNESS : 0.5;
@@ -207,7 +207,9 @@ function logParameters(parameters) {
console.log(` ProduceTopic: ${topic2}`);
console.log(` Message Count: ${messageCount}`);
// Seed the topic with messages
await runProducer(parameters, topic, batchSize, warmupMessages, messageCount, messageSize, compression);
await runProducerCKJS(parameters, topic, batchSize,
warmupMessages, messageCount, messageSize, compression,
randomness, limitRPS);
startTrackingMemory();
const ctpRate = await runConsumeTransformProduce(parameters, topic, topic2, warmupMessages, messageCount, messageProcessTimeMs, ctpConcurrency);
endTrackingMemory('consume-transform-produce', `consume-transform-produce-${mode}.json`);
23 changes: 10 additions & 13 deletions examples/performance/performance-primitives-common.js
@@ -14,7 +14,7 @@ else {
}
}

function installHandlers() {
function installHandlers(useTerminateTimeout) {
const handlers = {
terminationRequested: false,
terminateTimeout: null,
@@ -26,8 +26,10 @@ function installHandlers() {
process.on('SIGINT', terminationRequestedCallback);
process.on('SIGTERM', terminationRequestedCallback);
handlers.terminationRequestedCallback = terminationRequestedCallback;
handlers.terminateTimeout = setTimeout(terminationRequestedCallback,
TERMINATE_TIMEOUT_MS);
if (useTerminateTimeout) {
handlers.terminateTimeout = setTimeout(terminationRequestedCallback,
TERMINATE_TIMEOUT_MS);
}
return handlers;
}

@@ -57,7 +59,7 @@ function genericProduceToTopic(producer, topic, messages) {
}

async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eachBatch, partitionsConsumedConcurrently, stats, actionOnMessages) {
const handlers = installHandlers();
const handlers = installHandlers(totalMessageCnt === -1);
await consumer.connect();
await consumer.subscribe({ topic });

@@ -179,18 +181,13 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
if (!startTime) {
startTime = hrtime.bigint();
} else if (totalMessageCnt > 0 && messagesMeasured >= totalMessageCnt) {
let durationNanos = Number(hrtime.bigint() - startTime);
durationSeconds = durationNanos / 1e9;
rate = durationNanos === 0 ? Infinity :
(totalMessageSize / durationNanos) * 1e9 / (1024 * 1024); /* MB/s */
console.log(`Recvd ${messagesMeasured} messages in ${durationSeconds} seconds, ${totalMessageSize} bytes; rate is ${rate} MB/s`);
consumer.pause([{ topic }]);
stopConsuming();
}
}

if (actionOnMessages) {
await actionOnMessages(batch.messages);
if (messagesMeasured > 0) {
if (messagesMeasured > 0 && messages.length > 0) {
let i = 1;
const now = Date.now();
for (const message of messages) {
@@ -243,7 +240,7 @@
}

async function runProducer(producer, topic, batchSize, warmupMessages, totalMessageCnt, msgSize, compression, randomness, limitRPS) {
const handlers = installHandlers();
const handlers = installHandlers(totalMessageCnt === -1);
let totalMessagesSent = 0;
let totalBytesSent = 0;

@@ -347,7 +344,7 @@
}

async function runLagMonitoring(admin, topic) {
const handlers = installHandlers();
const handlers = installHandlers(true);
let groupId = process.env.GROUPID_MONITOR;
if (!groupId) {
throw new Error("GROUPID_MONITOR environment variable not set");
8 changes: 8 additions & 0 deletions examples/performance/performance-primitives.js
@@ -19,6 +19,9 @@ module.exports = {
newCompatibleProducer,
};


const CONSUMER_MAX_BATCH_SIZE = process.env.CONSUMER_MAX_BATCH_SIZE ? +process.env.CONSUMER_MAX_BATCH_SIZE : null;

function baseConfiguration(parameters) {
let ret = {
'client.id': 'kafka-test-performance',
@@ -146,6 +149,10 @@ function newCompatibleConsumer(parameters, eachBatch) {
const autoCommitOpts = autoCommit > 0 ?
{ 'enable.auto.commit': true, 'auto.commit.interval.ms': autoCommit } :
{ 'enable.auto.commit': false };
const jsOpts = {};
if (eachBatch && CONSUMER_MAX_BATCH_SIZE !== null) {
jsOpts['js.consumer.max.batch.size'] = CONSUMER_MAX_BATCH_SIZE;
}

let groupId = eachBatch ? process.env.GROUPID_BATCH : process.env.GROUPID_MESSAGE;
if (!groupId) {
@@ -157,6 +164,7 @@
'auto.offset.reset': 'earliest',
'fetch.queue.backoff.ms': '100',
...autoCommitOpts,
...jsOpts,
});
return new CompatibleConsumer(consumer);
}
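
The conditional-option merge in `newCompatibleConsumer` can be sketched in isolation: option objects are built up separately and spread into the final config, so an unset option contributes no keys. The input values below are placeholders, not taken from a real run:

```javascript
// Sketch of the spread-based option merge used above; inputs are placeholders.
const autoCommit = 5000;            // stand-in for the parsed auto-commit interval
const eachBatch = true;             // whether the consumer runs in eachBatch mode
const CONSUMER_MAX_BATCH_SIZE = 64; // stand-in for +process.env.CONSUMER_MAX_BATCH_SIZE

const autoCommitOpts = autoCommit > 0
  ? { 'enable.auto.commit': true, 'auto.commit.interval.ms': autoCommit }
  : { 'enable.auto.commit': false };

// The js.* option is only set for eachBatch runs with an explicit size.
const jsOpts = {};
if (eachBatch && CONSUMER_MAX_BATCH_SIZE !== null) {
  jsOpts['js.consumer.max.batch.size'] = CONSUMER_MAX_BATCH_SIZE;
}

const config = {
  'auto.offset.reset': 'earliest',
  ...autoCommitOpts,
  ...jsOpts,
};
console.log(config);
```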