diff --git a/.gitignore b/.gitignore index 07f5f6b2..d49064a2 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,5 @@ coverage .nyc_output/ *lcov.info **/lcov-report +examples/performance/*.json +*.log diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index cc1d5ce5..6a0c8c45 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -191,7 +191,7 @@ blocks: - export NODE_OPTIONS='--max-old-space-size=1536' - cd examples/performance - npm install - - bash -c '../../ci/tests/run_perf_test.sh' + - node '../../ci/tests/run_perf_test.js' - rm -rf ./node_modules - name: "Linux amd64: Release" diff --git a/ci/tests/run_perf_test.js b/ci/tests/run_perf_test.js new file mode 100644 index 00000000..b8e9e808 --- /dev/null +++ b/ci/tests/run_perf_test.js @@ -0,0 +1,382 @@ +#!/usr/bin/env node + +const util = require('util'); +const exec = util.promisify(require('child_process').exec); + +async function runCommand(command) { + try { + const output = await exec(command, { encoding: 'utf8', stdio: 'pipe' }); + return output.stdout; + } catch (error) { + return (error.stdout || '') + (error.stderr || '') + (error.message || ''); + } +} + +function extractValue(content, pattern) { + try { + const lines = content.split('\n'); + const matchingLine = lines.find(line => line.includes(pattern)); + if (matchingLine) { + const value = matchingLine.split(':')[1]?.trim(); + return Number(value || ''); + } + return NaN; + } catch (error) { + return NaN; + } +} + +function belowThreshold(value, target, threshold = 0.7) { + if (isNaN(value) || isNaN(target)) + return false; + return value < (target * threshold); +} + +function belowTarget(value, target) { + return belowThreshold(value, target, 1); +} + + + + +async function main() { + // Run performance tests and store outputs in memory + const messageCount = process.env.MESSAGE_COUNT ? +process.env.MESSAGE_COUNT : 50000; + const skipCtpTest = process.env.SKIP_CTP_TEST ? process.env.SKIP_CTP_TEST === 'true' : false; + const concurrentRun = process.env.CONCURRENT_RUN ? process.env.CONCURRENT_RUN === 'true' : false; + const consumerMode = process.env.CONSUMER_MODE || 'all'; + const produceToSecondTopic = process.env.PRODUCE_TO_SECOND_TOPIC ? process.env.PRODUCE_TO_SECOND_TOPIC === 'true' : false; + const produceToSecondTopicParam = produceToSecondTopic ? 
'--produce-to-second-topic' : ''; + const consumerModeAll = consumerMode === 'all'; + const consumerModeEachMessage = consumerMode === 'eachMessage'; + const consumerModeEachBatch = consumerMode === 'eachBatch'; + let consumerParam = '--consumer'; + if (consumerModeEachMessage) { + consumerParam = '--consumer-each-message'; + } else if (consumerModeEachBatch) { + consumerParam = '--consumer-each-batch'; + } + let outputConfluentProducerConsumer; + let outputKjsProducerConsumer; + const groupIdEachMessageConfluent = `test-group-confluent-message-` + Math.random(); + const groupIdEachBatchConfluent = `test-group-confluent-batch-` + Math.random(); + const groupIdEachMessageKafkaJS = `test-group-kafkajs-message-` + Math.random(); + const groupIdEachBatchKafkaJS = `test-group-kafkajs-batch-` + Math.random(); + if (consumerModeAll || consumerModeEachMessage) { + console.log(`Confluent eachMessage group id: ${groupIdEachMessageConfluent}`); + console.log(`KafkaJS eachMessage group id: ${groupIdEachMessageKafkaJS}`); + } + if (consumerModeAll || consumerModeEachBatch) { + console.log(`Confluent eachBatch group id: ${groupIdEachBatchConfluent}`); + console.log(`KafkaJS eachBatch group id: ${groupIdEachBatchKafkaJS}`); + } + + const runProducerConsumerMode = async (mode) => { + const modeLabel = mode === 'confluent' ? 'Confluent' : 'KafkaJS'; + const groupIdEachMessage = mode === 'confluent' ? groupIdEachMessageConfluent : groupIdEachMessageKafkaJS; + const groupIdEachBatch = mode === 'confluent' ? groupIdEachBatchConfluent : groupIdEachBatchKafkaJS; + + if (concurrentRun) { + console.log(`Running ${modeLabel} Producer/Consumer test (concurrently)...`); + const TERMINATE_TIMEOUT_MS = process.env.TERMINATE_TIMEOUT_MS ? +process.env.TERMINATE_TIMEOUT_MS : 600000; + // Wait 2s more to see if all lag is caught up + const TERMINATE_TIMEOUT_MS_CONSUMERS = TERMINATE_TIMEOUT_MS + 2000; + + await runCommand(`MODE=${mode} node performance-consolidated.js --create-topics`); + const allPromises = []; + allPromises.push(runCommand(`MODE=${mode} MESSAGE_COUNT=${messageCount} node performance-consolidated.js --producer`)); + if (consumerModeAll || consumerModeEachMessage) { + allPromises.push(runCommand(`MODE=${mode} MESSAGE_COUNT=${messageCount} TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_CONSUMERS} GROUPID_MESSAGE=${groupIdEachMessage} node performance-consolidated.js --consumer-each-message ${produceToSecondTopicParam}`)); + } + if (consumerModeAll || consumerModeEachBatch) { + allPromises.push(runCommand(`MODE=${mode} MESSAGE_COUNT=${messageCount} TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_CONSUMERS} GROUPID_BATCH=${groupIdEachBatch} node performance-consolidated.js --consumer-each-batch ${produceToSecondTopicParam}`)); + } + if (consumerModeAll || consumerModeEachMessage) { + allPromises.push(runCommand(`MODE=${mode} TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_CONSUMERS} GROUPID_MONITOR=${groupIdEachMessage} node performance-consolidated.js --monitor-lag`)); + } + if (consumerModeAll || consumerModeEachBatch) { + allPromises.push(runCommand(`MODE=${mode} TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_CONSUMERS} GROUPID_MONITOR=${groupIdEachBatch} node performance-consolidated.js --monitor-lag`)); + } + const results = await Promise.allSettled(allPromises); + return results.map(r => r.status === 'fulfilled' ? 
r.value : '').join('\n'); + } else { + console.log(`Running ${modeLabel} Producer/Consumer test...`); + return runCommand(`MODE=${mode} MESSAGE_COUNT=${messageCount} GROUPID_MESSAGE=${groupIdEachMessage} GROUPID_BATCH=${groupIdEachBatch} node performance-consolidated.js --create-topics ${consumerParam} ${produceToSecondTopicParam} --producer`); + } + } + + const runProducerConsumer = async () => { + console.log(`Running Producer/Consumer tests with Confluent Kafka JS at ${new Date().toISOString()}`); + outputConfluentProducerConsumer = await runProducerConsumerMode('confluent'); + console.log(`Running Producer/Consumer tests with KafkaJS at ${new Date().toISOString()}`); + outputKjsProducerConsumer = await runProducerConsumerMode('kafkajs'); + console.log(`Producer/Consumer tests completed at ${new Date().toISOString()}`); + } + + await runProducerConsumer(); + console.log(outputConfluentProducerConsumer); + console.log(outputKjsProducerConsumer); + + console.log('Running Confluent CTP test...'); + const outputConfluentCtp = skipCtpTest ? '' : + await runCommand('MODE=confluent MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp'); + + console.log('Running KafkaJS CTP test...'); + const outputKjsCtp = skipCtpTest ? '' : + await runCommand('MODE=kafkajs MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp'); + + // Extract Confluent results + let ctpConfluent, ctpKjs; + let consumerConfluentMessage; + let consumerConfluentMessageRate; + let consumerConfluentMessageAvgLatencyT0T1; + let consumerConfluentMessageMaxLatencyT0T1; + let consumerConfluentMessageAvgLatencyT0T2; + let consumerConfluentMessageMaxLatencyT0T2; + let consumerConfluentTime; + let consumerConfluentMessageAverageRSS; + let consumerConfluentMessageMaxRSS; + let consumerConfluentMessageAverageBrokerLag; + let consumerConfluentMessageMaxBrokerLag; + let consumerConfluentMessageTotalLagMeasurements; + + let consumerConfluentBatch; + let consumerConfluentBatchRate; + let consumerConfluentBatchAvgLatencyT0T1; + let consumerConfluentBatchMaxLatencyT0T1; + let consumerConfluentBatchAvgLatencyT0T2; + let consumerConfluentBatchMaxLatencyT0T2; + let consumerConfluentBatchTime; + let consumerConfluentBatchAverageLag; + let consumerConfluentBatchMaxLag; + let consumerConfluentBatchAverageSize; + let consumerConfluentBatchAverageRSS; + let consumerConfluentBatchMaxRSS; + let consumerConfluentBatchAverageBrokerLag; + let consumerConfluentBatchMaxBrokerLag; + let consumerConfluentBatchTotalLagMeasurements; + + const producerConfluent = extractValue(outputConfluentProducerConsumer, '=== Producer Rate:'); + const producerConfluentAverageRSS = extractValue(outputConfluentProducerConsumer, '=== Average producer RSS KB:'); + const producerConfluentMaxRSS = extractValue(outputConfluentProducerConsumer, '=== Max producer RSS KB:'); + if (consumerModeAll || consumerModeEachMessage) { + consumerConfluentMessage = extractValue(outputConfluentProducerConsumer, '=== Consumer Rate MB/s (eachMessage):'); + consumerConfluentMessageRate = extractValue(outputConfluentProducerConsumer, '=== Consumer Rate msg/s (eachMessage):'); + consumerConfluentMessageAvgLatencyT0T1 = extractValue(outputConfluentProducerConsumer, '=== Consumer average E2E latency T0-T1 (eachMessage):'); + consumerConfluentMessageMaxLatencyT0T1 = extractValue(outputConfluentProducerConsumer, '=== Consumer max E2E latency T0-T1 (eachMessage):'); + consumerConfluentMessageAvgLatencyT0T2 = extractValue(outputConfluentProducerConsumer, '=== Consumer average E2E 
latency T0-T2 (eachMessage):'); + consumerConfluentMessageMaxLatencyT0T2 = extractValue(outputConfluentProducerConsumer, '=== Consumer max E2E latency T0-T2 (eachMessage):'); + consumerConfluentTime = extractValue(outputConfluentProducerConsumer, '=== Consumption time (eachMessage):'); + consumerConfluentMessageAverageRSS = extractValue(outputConfluentProducerConsumer, '=== Average consumer-each-message RSS KB:'); + consumerConfluentMessageMaxRSS = extractValue(outputConfluentProducerConsumer, '=== Max consumer-each-message RSS KB:'); + consumerConfluentMessageAverageBrokerLag = extractValue(outputConfluentProducerConsumer, `=== Average broker lag (${groupIdEachMessageConfluent}):`); + consumerConfluentMessageMaxBrokerLag = extractValue(outputConfluentProducerConsumer, `=== Max broker lag (${groupIdEachMessageConfluent}):`); + consumerConfluentMessageTotalLagMeasurements = extractValue(outputConfluentProducerConsumer, `=== Sample size for broker lag measurement (${groupIdEachMessageConfluent}):`); + } + if (consumerModeAll || consumerModeEachBatch) { + consumerConfluentBatch = extractValue(outputConfluentProducerConsumer, '=== Consumer Rate MB/s (eachBatch):'); + consumerConfluentBatchRate = extractValue(outputConfluentProducerConsumer, '=== Consumer Rate msg/s (eachBatch):'); + consumerConfluentBatchAvgLatencyT0T1 = extractValue(outputConfluentProducerConsumer, '=== Consumer average E2E latency T0-T1 (eachBatch):'); + consumerConfluentBatchMaxLatencyT0T1 = extractValue(outputConfluentProducerConsumer, '=== Consumer max E2E latency T0-T1 (eachBatch):'); + consumerConfluentBatchAvgLatencyT0T2 = extractValue(outputConfluentProducerConsumer, '=== Consumer average E2E latency T0-T2 (eachBatch):'); + consumerConfluentBatchMaxLatencyT0T2 = extractValue(outputConfluentProducerConsumer, '=== Consumer max E2E latency T0-T2 (eachBatch):'); + consumerConfluentBatchTime = extractValue(outputConfluentProducerConsumer, '=== Consumption time (eachBatch):'); + consumerConfluentBatchAverageLag = extractValue(outputConfluentProducerConsumer, '=== Average eachBatch lag:'); + consumerConfluentBatchMaxLag = extractValue(outputConfluentProducerConsumer, '=== Max eachBatch lag:'); + consumerConfluentBatchAverageSize = extractValue(outputConfluentProducerConsumer, '=== Average eachBatch size:'); + consumerConfluentBatchAverageRSS = extractValue(outputConfluentProducerConsumer, '=== Average consumer-each-batch RSS KB:'); + consumerConfluentBatchMaxRSS = extractValue(outputConfluentProducerConsumer, '=== Max consumer-each-batch RSS KB:'); + consumerConfluentBatchAverageBrokerLag = extractValue(outputConfluentProducerConsumer, `=== Average broker lag (${groupIdEachBatchConfluent}):`); + consumerConfluentBatchMaxBrokerLag = extractValue(outputConfluentProducerConsumer, `=== Max broker lag (${groupIdEachBatchConfluent}):`); + consumerConfluentBatchTotalLagMeasurements = extractValue(outputConfluentProducerConsumer, `=== Sample size for broker lag measurement (${groupIdEachBatchConfluent}):`); + } + const consumerConfluentAverageRSS = extractValue(outputConfluentProducerConsumer, '=== Max Average RSS across tests:'); + const consumerConfluentMaxRSS = extractValue(outputConfluentProducerConsumer, '=== Max RSS across tests:'); + if (!skipCtpTest) { + ctpConfluent = extractValue(outputConfluentCtp, '=== Consume-Transform-Produce Rate:'); + } + + // Extract KafkaJS results + let consumerKjsMessage; + let consumerKjsMessageRate; + let consumerKjsMessageAvgLatencyT0T1; + let consumerKjsMessageMaxLatencyT0T1; + let 
consumerKjsTime; + let consumerKjsMessageAverageRSS; + let consumerKjsMessageMaxRSS; + let consumerKjsMessageAverageBrokerLag; + let consumerKjsMessageMaxBrokerLag; + let consumerKjsMessageTotalLagMeasurements; + + let consumerKjsBatch; + let consumerKjsBatchRate; + let consumerKjsBatchAvgLatencyT0T1; + let consumerKjsBatchMaxLatencyT0T1; + let consumerKjsBatchTime; + let consumerKjsBatchAverageLag; + let consumerKjsBatchMaxLag; + let consumerKjsBatchAverageSize; + let consumerKjsBatchAverageRSS; + let consumerKjsBatchMaxRSS; + let consumerKjsBatchAverageBrokerLag; + let consumerKjsBatchMaxBrokerLag; + let consumerKjsBatchTotalLagMeasurements; + + const producerKjs = extractValue(outputKjsProducerConsumer, '=== Producer Rate:'); + const producerKjsAverageRSS = extractValue(outputKjsProducerConsumer, '=== Average producer RSS KB:'); + const producerKjsMaxRSS = extractValue(outputKjsProducerConsumer, '=== Max producer RSS KB:'); + if (consumerModeAll || consumerModeEachMessage) { + consumerKjsMessage = extractValue(outputKjsProducerConsumer, '=== Consumer Rate MB/s (eachMessage):'); + consumerKjsMessageRate = extractValue(outputKjsProducerConsumer, '=== Consumer Rate msg/s (eachMessage):'); + consumerKjsMessageAvgLatencyT0T1 = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency T0-T1 (eachMessage):'); + consumerKjsMessageMaxLatencyT0T1 = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency T0-T1 (eachMessage):'); + consumerKjsMessageAvgLatencyT0T2 = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency T0-T2 (eachMessage):'); + consumerKjsMessageMaxLatencyT0T2 = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency T0-T2 (eachMessage):'); + consumerKjsTime = extractValue(outputKjsProducerConsumer, '=== Consumption time (eachMessage):'); + consumerKjsMessageAverageRSS = extractValue(outputKjsProducerConsumer, '=== Average consumer-each-message RSS KB:'); + consumerKjsMessageMaxRSS = extractValue(outputKjsProducerConsumer, '=== Max consumer-each-message RSS KB:'); + consumerKjsMessageAverageBrokerLag = extractValue(outputKjsProducerConsumer, `=== Average broker lag (${groupIdEachMessageKafkaJS}):`); + consumerKjsMessageMaxBrokerLag = extractValue(outputKjsProducerConsumer, `=== Max broker lag (${groupIdEachMessageKafkaJS}):`); + consumerKjsMessageTotalLagMeasurements = extractValue(outputKjsProducerConsumer, `=== Sample size for broker lag measurement (${groupIdEachMessageKafkaJS}):`); + } + if (consumerModeAll || consumerModeEachBatch) { + consumerKjsBatch = extractValue(outputKjsProducerConsumer, '=== Consumer Rate MB/s (eachBatch):'); + consumerKjsBatchRate = extractValue(outputKjsProducerConsumer, '=== Consumer Rate msg/s (eachBatch):'); + consumerKjsBatchAvgLatencyT0T1 = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency T0-T1 (eachBatch):'); + consumerKjsBatchMaxLatencyT0T1 = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency T0-T1 (eachBatch):'); + consumerKjsBatchAvgLatencyT0T2 = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency T0-T2 (eachBatch):'); + consumerKjsBatchMaxLatencyT0T2 = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency T0-T2 (eachBatch):'); + consumerKjsBatchTime = extractValue(outputKjsProducerConsumer, '=== Consumption time (eachBatch):'); + consumerKjsBatchAverageLag = extractValue(outputKjsProducerConsumer, '=== Average eachBatch lag:'); + consumerKjsBatchMaxLag = extractValue(outputKjsProducerConsumer, '=== 
Max eachBatch lag:'); + consumerKjsBatchAverageSize = extractValue(outputKjsProducerConsumer, '=== Average eachBatch size:'); + consumerKjsBatchAverageRSS = extractValue(outputKjsProducerConsumer, '=== Average consumer-each-batch RSS KB:'); + consumerKjsBatchMaxRSS = extractValue(outputKjsProducerConsumer, '=== Max consumer-each-batch RSS KB:'); + consumerKjsBatchAverageBrokerLag = extractValue(outputKjsProducerConsumer, `=== Average broker lag (${groupIdEachBatchKafkaJS}):`); + consumerKjsBatchMaxBrokerLag = extractValue(outputKjsProducerConsumer, `=== Max broker lag (${groupIdEachBatchKafkaJS}):`); + consumerKjsBatchTotalLagMeasurements = extractValue(outputKjsProducerConsumer, `=== Sample size for broker lag measurement (${groupIdEachBatchKafkaJS}):`); + } + const consumerKjsAverageRSS = extractValue(outputKjsProducerConsumer, '=== Max Average RSS across tests:'); + const consumerKjsMaxRSS = extractValue(outputKjsProducerConsumer, '=== Max RSS across tests:'); + if (!skipCtpTest) { + ctpKjs = extractValue(outputKjsCtp, '=== Consume-Transform-Produce Rate:'); + } + + // Print results + console.log(`Producer rates: confluent ${producerConfluent}, kafkajs ${producerKjs}`); + console.log(`Average RSS (produce): confluent ${producerConfluentAverageRSS}, kafkajs ${producerKjsAverageRSS}`); + console.log(`Max RSS (produce): confluent ${producerConfluentMaxRSS}, kafkajs ${producerKjsMaxRSS}`); + if (consumerModeAll || consumerModeEachMessage) { + console.log(`Consumer rates MB/s (eachMessage): confluent ${consumerConfluentMessage}, kafkajs ${consumerKjsMessage}`); + console.log(`Consumer rates msg/s (eachMessage): confluent ${consumerConfluentMessageRate}, kafkajs ${consumerKjsMessageRate}`); + console.log(`Consumer average E2E latency T0-T1 (eachMessage): confluent ${consumerConfluentMessageAvgLatencyT0T1}, kafkajs ${consumerKjsMessageAvgLatencyT0T1}`); + console.log(`Consumer max E2E latency T0-T1 (eachMessage): confluent ${consumerConfluentMessageMaxLatencyT0T1}, kafkajs ${consumerKjsMessageMaxLatencyT0T1}`); + if (produceToSecondTopic) { + console.log(`Consumer average E2E latency T0-T2 (eachMessage): confluent ${consumerConfluentMessageAvgLatencyT0T2}, kafkajs ${consumerKjsMessageAvgLatencyT0T2}`); + console.log(`Consumer max E2E latency T0-T2 (eachMessage): confluent ${consumerConfluentMessageMaxLatencyT0T2}, kafkajs ${consumerKjsMessageMaxLatencyT0T2}`); + } + console.log(`Consumption time (eachMessage): confluent ${consumerConfluentTime}, kafkajs ${consumerKjsTime}`); + console.log(`Average RSS (eachMessage): confluent ${consumerConfluentMessageAverageRSS}, kafkajs ${consumerKjsMessageAverageRSS}`); + console.log(`Max RSS (eachMessage): confluent ${consumerConfluentMessageMaxRSS}, kafkajs ${consumerKjsMessageMaxRSS}`); + console.log(`Average broker lag (eachMessage): confluent ${consumerConfluentMessageAverageBrokerLag}, kafkajs ${consumerKjsMessageAverageBrokerLag}`); + console.log(`Max broker lag (eachMessage): confluent ${consumerConfluentMessageMaxBrokerLag}, kafkajs ${consumerKjsMessageMaxBrokerLag}`); + console.log(`Sample size for broker lag measurement (eachMessage): confluent ${consumerConfluentMessageTotalLagMeasurements}, kafkajs ${consumerKjsMessageTotalLagMeasurements}`); + } + if (consumerModeAll || consumerModeEachBatch) { + console.log(`Consumer rates MB/s (eachBatch): confluent ${consumerConfluentBatch}, kafkajs ${consumerKjsBatch}`); + console.log(`Consumer rates msg/s (eachBatch): confluent ${consumerConfluentBatchRate}, kafkajs ${consumerKjsBatchRate}`); + 
console.log(`Consumer average E2E latency T0-T1 (eachBatch): confluent ${consumerConfluentBatchAvgLatencyT0T1}, kafkajs ${consumerKjsBatchAvgLatencyT0T1}`); + console.log(`Consumer max E2E latency T0-T1 (eachBatch): confluent ${consumerConfluentBatchMaxLatencyT0T1}, kafkajs ${consumerKjsBatchMaxLatencyT0T1}`); + if (produceToSecondTopic) { + console.log(`Consumer average E2E latency T0-T2 (eachBatch): confluent ${consumerConfluentBatchAvgLatencyT0T2}, kafkajs ${consumerKjsBatchAvgLatencyT0T2}`); + console.log(`Consumer max E2E latency T0-T2 (eachBatch): confluent ${consumerConfluentBatchMaxLatencyT0T2}, kafkajs ${consumerKjsBatchMaxLatencyT0T2}`); + } + console.log(`Consumption time (eachBatch): confluent ${consumerConfluentBatchTime}, kafkajs ${consumerKjsBatchTime}`); + console.log(`Average eachBatch lag: confluent ${consumerConfluentBatchAverageLag}, kafkajs ${consumerKjsBatchAverageLag}`); + console.log(`Max eachBatch lag: confluent ${consumerConfluentBatchMaxLag}, kafkajs ${consumerKjsBatchMaxLag}`); + console.log(`Average eachBatch size: confluent ${consumerConfluentBatchAverageSize}, kafkajs ${consumerKjsBatchAverageSize}`); + console.log(`Average RSS (eachBatch): confluent ${consumerConfluentBatchAverageRSS}, kafkajs ${consumerKjsBatchAverageRSS}`); + console.log(`Max RSS (eachBatch): confluent ${consumerConfluentBatchMaxRSS}, kafkajs ${consumerKjsBatchMaxRSS}`); + console.log(`Average broker lag (eachBatch): confluent ${consumerConfluentBatchAverageBrokerLag}, kafkajs ${consumerKjsBatchAverageBrokerLag}`); + console.log(`Max broker lag (eachBatch): confluent ${consumerConfluentBatchMaxBrokerLag}, kafkajs ${consumerKjsBatchMaxBrokerLag}`); + console.log(`Sample size for broker lag measurement (eachBatch): confluent ${consumerConfluentBatchTotalLagMeasurements}, kafkajs ${consumerKjsBatchTotalLagMeasurements}`); + } + if (!concurrentRun) { + console.log(`Average RSS: confluent ${consumerConfluentAverageRSS}, kafkajs ${consumerKjsAverageRSS}`); + console.log(`Max RSS: confluent ${consumerConfluentMaxRSS}, kafkajs ${consumerKjsMaxRSS}`); + } + if (!skipCtpTest) { + console.log(`CTP rates: confluent ${ctpConfluent}, kafkajs ${ctpKjs}`); + } + + let errcode = 0; + const maxPerformanceDifference = 0.7; + + // Compare against KJS (30% threshold) + if (belowThreshold(producerConfluent, producerKjs, maxPerformanceDifference)) { + console.log(`Producer rates differ by more than 30%: confluent ${producerConfluent}, kafkajs ${producerKjs}`); + errcode = 1; + } + + if (consumerModeAll || consumerModeEachMessage) { + if (belowThreshold(consumerConfluentMessage, consumerKjsMessage, maxPerformanceDifference)) { + console.log(`Consumer rates MB/s (eachMessage) differ by more than 30%: confluent ${consumerConfluentMessage}, kafkajs ${consumerKjsMessage}`); + // FIXME: improve consumer performance at least to KafkaJS level + errcode = 0; + } + + // Lower is better for time + if (belowThreshold(consumerKjsTime, consumerConfluentTime, maxPerformanceDifference)) { + console.log(`Consumption time (eachMessage) differ by more than 30%: confluent ${consumerConfluentTime}, kafkajs ${consumerKjsTime}`); + errcode = 0; + } + } + + if (consumerModeAll || consumerModeEachBatch) { + if (belowThreshold(consumerConfluentBatch, consumerKjsBatch, maxPerformanceDifference)) { + console.log(`Consumer rates (eachBatch) differ by more than 30%: confluent ${consumerConfluentBatch}, kafkajs ${consumerKjsBatch}`); + errcode = 0; + } + + // Lower is better for time + if (belowThreshold(consumerKjsBatchTime, 
consumerConfluentBatchTime, maxPerformanceDifference)) { + console.log(`Consumption time (eachBatch) differ by more than 30%: confluent ${consumerConfluentBatchTime}, kafkajs ${consumerKjsBatchTime}`); + errcode = 0; + } + } + + if (!skipCtpTest && belowThreshold(ctpConfluent, ctpKjs, maxPerformanceDifference)) { + console.log(`CTP rates differ by more than 30%: confluent ${ctpConfluent}, kafkajs ${ctpKjs}`); + errcode = 1; + } + + // Compare against target numbers + const TARGET_PRODUCE = process.env.TARGET_PRODUCE_PERFORMANCE || '35'; + const TARGET_CONSUME = process.env.TARGET_CONSUME_PERFORMANCE || '18'; + const TARGET_CTP = process.env.TARGET_CTP_PERFORMANCE || '0.02'; + + if (belowTarget(producerConfluent, TARGET_PRODUCE)) { + console.log(`Confluent producer rate is below target: ${producerConfluent}`); + errcode = 1; + } + + if ((consumerModeAll || consumerModeEachMessage) && belowTarget(consumerConfluentMessage, TARGET_CONSUME)) { + console.log(`Confluent consumer rate (eachMessage) is below target: ${consumerConfluentMessage}`); + errcode = 1; + } + + if ((consumerModeAll || consumerModeEachBatch) && belowTarget(consumerConfluentBatch, TARGET_CONSUME)) { + console.log(`Confluent consumer rate (eachBatch) is below target: ${consumerConfluentBatch}`); + errcode = 1; + } + + if (!skipCtpTest && belowTarget(ctpConfluent, TARGET_CTP)) { + console.log(`Confluent CTP rate is below target: ${ctpConfluent}`); + errcode = 1; + } + + process.exit(errcode); +} + +if (require.main === module) + main(); \ No newline at end of file diff --git a/ci/tests/run_perf_test.sh b/ci/tests/run_perf_test.sh deleted file mode 100755 index c55a67ab..00000000 --- a/ci/tests/run_perf_test.sh +++ /dev/null @@ -1,64 +0,0 @@ -#!/bin/bash - -testresultConfluentProducerConsumer=$(mktemp) -testresultConfluentCtp=$(mktemp) -testresultKjsProducerConsumer=$(mktemp) -testresultKjsCtp=$(mktemp) - -MODE=confluent MESSAGE_COUNT=500000 node performance-consolidated.js --create-topics --consumer --producer 2>&1 | tee "$testresultConfluentProducerConsumer" -MODE=kafkajs MESSAGE_COUNT=500000 node performance-consolidated.js --create-topics --consumer --producer 2>&1 | tee "$testresultKjsProducerConsumer" -MODE=confluent MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp 2>&1 | tee "$testresultConfluentCtp" -MODE=kafkajs MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp 2>&1 | tee "$testresultKjsCtp" - -producerConfluent=$(grep "=== Producer Rate:" "$testresultConfluentProducerConsumer" | cut -d':' -f2 | tr -d ' ') -consumerConfluent=$(grep "=== Consumer Rate:" "$testresultConfluentProducerConsumer" | cut -d':' -f2 | tr -d ' ') -ctpConfluent=$(grep "=== Consume-Transform-Produce Rate:" "$testresultConfluentCtp" | cut -d':' -f2 | tr -d ' ') -producerKjs=$(grep "=== Producer Rate:" "$testresultKjsProducerConsumer" | cut -d':' -f2 | tr -d ' ') -consumerKjs=$(grep "=== Consumer Rate:" "$testresultKjsProducerConsumer" | cut -d':' -f2 | tr -d ' ') -ctpKjs=$(grep "=== Consume-Transform-Produce Rate:" "$testresultKjsCtp" | cut -d':' -f2 | tr -d ' ') - -echo "Producer rates: confluent $producerConfluent, kafkajs $producerKjs" -echo "Consumer rates: confluent $consumerConfluent, kafkajs $consumerKjs" -echo "CTP rates: confluent $ctpConfluent, kafkajs $ctpKjs" - -errcode=0 - -# Compare against KJS -if [[ $(echo "$producerConfluent < $producerKjs * 70 / 100" | bc -l) -eq 1 ]]; then - echo "Producer rates differ by more than 30%: confluent $producerConfluent, kafkajs $producerKjs" - errcode=1 
-fi - -if [[ $(echo "$consumerConfluent < $consumerKjs * 70 / 100" | bc -l) -eq 1 ]]; then - echo "Consumer rates differ by more than 30%: confluent $consumerConfluent, kafkajs $consumerKjs" - # FIXME: improve consumer performance at least to KafkaJS level - errcode=0 -fi - -if [[ $(echo "$ctpConfluent < $ctpKjs * 70 / 100" | bc -l) -eq 1 ]]; then - echo "CTP rates differ by more than 30%: confluent $ctpConfluent, kafkajs $ctpKjs" - errcode=1 -fi - -# Compare against numbers set within semaphore config -TARGET_PRODUCE="${TARGET_PRODUCE_PERFORMANCE:-35}" -TARGET_CONSUME="${TARGET_CONSUME_PERFORMANCE:-18}" -TARGET_CTP="${TARGET_CTP_PERFORMANCE:-0.02}" - -if [[ $(echo "$producerConfluent < $TARGET_PRODUCE" | bc -l) -eq 1 ]]; then - echo "Confluent producer rate is below target: $producerConfluent" - errcode=1 -fi - -if [[ $(echo "$consumerConfluent < $TARGET_CONSUME" | bc -l) -eq 1 ]]; then - echo "Confluent consumer rate is below target: $consumerConfluent" - errcode=1 -fi - -if [[ $(echo "$ctpConfluent < $TARGET_CTP" | bc -l) -eq 1 ]]; then - echo "Confluent CTP rate is below target: $ctpConfluent" - errcode=1 -fi - -exit $errcode - diff --git a/examples/performance/performance-consolidated.js b/examples/performance/performance-consolidated.js index 7ef9bce6..4684a709 100644 --- a/examples/performance/performance-consolidated.js +++ b/examples/performance/performance-consolidated.js @@ -1,84 +1,229 @@ +const fs = require('fs'); const mode = process.env.MODE ? process.env.MODE : 'confluent'; -let runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics, runProducerConsumerTogether; +let runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics, runProducerConsumerTogether, runProducerCKJS; if (mode === 'confluent') { - ({ runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics, runProducerConsumerTogether } = require('./performance-primitives')); + ({ runProducer, runConsumer, runConsumeTransformProduce, + runCreateTopics, runLagMonitoring, + runProducerConsumerTogether } = require('./performance-primitives')); + runProducerCKJS = runProducer; } else { - ({ runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics, runProducerConsumerTogether } = require('./performance-primitives-kafkajs')); + ({ runProducer, runConsumer, runConsumeTransformProduce, runProducerConsumerTogether } = require('./performance-primitives-kafkajs')); + /* createTopics is more reliable in CKJS */ + ({ runCreateTopics, runLagMonitoring, runProducer: runProducerCKJS } = require('./performance-primitives')); } const brokers = process.env.KAFKA_BROKERS || 'localhost:9092'; +const securityProtocol = process.env.SECURITY_PROTOCOL; +const saslUsername = process.env.SASL_USERNAME; +const saslPassword = process.env.SASL_PASSWORD; const topic = process.env.KAFKA_TOPIC || 'test-topic'; const topic2 = process.env.KAFKA_TOPIC2 || 'test-topic2'; const messageCount = process.env.MESSAGE_COUNT ? +process.env.MESSAGE_COUNT : 1000000; const messageSize = process.env.MESSAGE_SIZE ? +process.env.MESSAGE_SIZE : 256; const batchSize = process.env.BATCH_SIZE ? +process.env.BATCH_SIZE : 100; const compression = process.env.COMPRESSION || 'None'; +// Between 0 and 1, percentage of random bytes in each message +const randomness = process.env.RANDOMNESS ? +process.env.RANDOMNESS : 0.5; +const numPartitions = process.env.PARTITIONS ? +process.env.PARTITIONS : 3; +const partitionsConsumedConcurrently = process.env.PARTITIONS_CONSUMED_CONCURRENTLY ? 
+process.env.PARTITIONS_CONSUMED_CONCURRENTLY : 1; const warmupMessages = process.env.WARMUP_MESSAGES ? +process.env.WARMUP_MESSAGES : (batchSize * 10); const messageProcessTimeMs = process.env.MESSAGE_PROCESS_TIME_MS ? +process.env.MESSAGE_PROCESS_TIME_MS : 5; const ctpConcurrency = process.env.CONSUME_TRANSFORM_PRODUCE_CONCURRENCY ? +process.env.CONSUME_TRANSFORM_PRODUCE_CONCURRENCY : 1; const consumerProcessingTime = process.env.CONSUMER_PROCESSING_TIME ? +process.env.CONSUMER_PROCESSING_TIME : 100; const producerProcessingTime = process.env.PRODUCER_PROCESSING_TIME ? +process.env.PRODUCER_PROCESSING_TIME : 100; +const limitRPS = process.env.LIMIT_RPS ? +process.env.LIMIT_RPS : null; +const useCKJSProducerEverywhere = process.env.USE_CKJS_PRODUCER_EVERYWHERE === 'true'; +const parameters = { + brokers, + securityProtocol, + saslUsername, + saslPassword, +} + +function logParameters(parameters) { + console.log(` Brokers: ${parameters.brokers}`); + if (parameters.securityProtocol && parameters.saslUsername && parameters.saslPassword) { + console.log(` Security Protocol: ${parameters.securityProtocol}`); + console.log(` SASL Username: ${parameters.saslUsername ? parameters.saslUsername : 'not set'}`); + console.log(` SASL Password: ${parameters.saslPassword ? '******' : 'not set'}`); + } else { + console.log(" No security protocol configured"); + } +} (async function () { const producer = process.argv.includes('--producer'); const consumer = process.argv.includes('--consumer'); + const consumerEachMessage = process.argv.includes('--consumer-each-message'); + const consumerEachBatch = process.argv.includes('--consumer-each-batch'); + const produceToSecondTopic = process.argv.includes('--produce-to-second-topic'); const ctp = process.argv.includes('--ctp'); const produceConsumeLatency = process.argv.includes('--latency'); const all = process.argv.includes('--all'); const createTopics = process.argv.includes('--create-topics'); + const monitorLag = process.argv.includes('--monitor-lag'); + let maxAverageRSSKB, maxMaxRSSKB; + const stats = {}; + + let measures = []; + let interval; + const startTrackingMemory = () => { + interval = setInterval(() => { + measures.push({ rss: process.memoryUsage().rss, + timestamp: Date.now() }); + }, 100); + }; + + const datapointToJSON = (m) => + ({ rss: m.rss.toString(), timestamp: m.timestamp.toString() }); + + const endTrackingMemory = (name, fileName) => { + clearInterval(interval); + interval = null; + const averageRSS = measures.reduce((sum, m) => sum + m.rss, 0) / measures.length; + const averageRSSKB = averageRSS / 1024; + maxAverageRSSKB = !maxAverageRSSKB ? averageRSSKB : Math.max(averageRSSKB, maxAverageRSSKB); + console.log(`=== Average ${name} RSS KB: ${averageRSSKB}`); + const max = measures.reduce((prev, current) => (prev.rss > current.rss) ? prev : current); + const maxRSSKB = max.rss / 1024; + maxMaxRSSKB = !maxMaxRSSKB ? 
maxRSSKB : Math.max(maxRSSKB, maxMaxRSSKB); + console.log(`=== Max ${name} RSS KB: ${maxRSSKB}`); + if (fileName) { + const measuresJSON = JSON.stringify({ + measures: measures.map(datapointToJSON), + averageRSS: averageRSS.toString(), + maxRSS: datapointToJSON(max) + }, null, 2); + fs.writeFileSync(fileName, measuresJSON); + } + measures = []; + } + + console.log(`=== Starting Performance Tests - Mode ${mode} ===`); if (createTopics || all) { console.log("=== Creating Topics (deleting if they exist already):"); - console.log(` Brokers: ${brokers}`); + logParameters(parameters); console.log(` Topic: ${topic}`); console.log(` Topic2: ${topic2}`); - await runCreateTopics(brokers, topic, topic2); + console.log(` Partitions: ${numPartitions}`); + await runCreateTopics(parameters, topic, topic2, numPartitions); + } + + if (monitorLag) { + console.log("=== Starting Lag Monitoring:"); + logParameters(parameters); + console.log(` Topic: ${topic}`); + const { + averageLag, + maxLag, + totalMeasurements + } = await runLagMonitoring(parameters, topic); + const monitoredGroupId = process.env.GROUPID_MONITOR; + console.log(`=== Average broker lag (${monitoredGroupId}): `, averageLag); + console.log(`=== Max broker lag (${monitoredGroupId}): `, maxLag); + console.log(`=== Sample size for broker lag measurement (${monitoredGroupId}): `, totalMeasurements); } if (producer || all) { console.log("=== Running Basic Producer Performance Test:") - console.log(` Brokers: ${brokers}`); + logParameters(parameters); console.log(` Topic: ${topic}`); console.log(` Message Count: ${messageCount}`); console.log(` Message Size: ${messageSize}`); console.log(` Batch Size: ${batchSize}`); console.log(` Compression: ${compression}`); + console.log(` Limit RPS: ${limitRPS}`); console.log(` Warmup Messages: ${warmupMessages}`); - const producerRate = await runProducer(brokers, topic, batchSize, warmupMessages, messageCount, messageSize, compression); + console.log(` Use CKJS Producer Everywhere: ${useCKJSProducerEverywhere}`); + startTrackingMemory(); + let runProducerFunction = runProducer; + if (useCKJSProducerEverywhere) { + runProducerFunction = runProducerCKJS; + } + const producerRate = await runProducerFunction(parameters, topic, batchSize, + warmupMessages, messageCount, messageSize, compression, + randomness, limitRPS); + endTrackingMemory('producer', `producer-memory-${mode}.json`); console.log("=== Producer Rate: ", producerRate); } - if (consumer || all) { + if (consumer || consumerEachMessage || all) { // If user runs this without --producer then they are responsible for seeding the topic. - console.log("=== Running Basic Consumer Performance Test:") - console.log(` Brokers: ${brokers}`); + console.log("=== Running Basic Consumer Performance Test (eachMessage):") + logParameters(parameters); console.log(` Topic: ${topic}`); console.log(` Message Count: ${messageCount}`); - const consumerRate = await runConsumer(brokers, topic, messageCount); - console.log("=== Consumer Rate: ", consumerRate); + console.log(` Partitions consumed concurrently: ${partitionsConsumedConcurrently}`); + startTrackingMemory(); + const consumerRate = await runConsumer(parameters, topic, + warmupMessages, messageCount, + false, partitionsConsumedConcurrently, stats, + produceToSecondTopic ? 
topic2 : null, compression, useCKJSProducerEverywhere); + endTrackingMemory('consumer-each-message', `consumer-memory-message-${mode}.json`); + console.log("=== Consumer Rate MB/s (eachMessage): ", consumerRate); + console.log("=== Consumer Rate msg/s (eachMessage): ", stats.messageRate); + console.log("=== Consumer average E2E latency T0-T1 (eachMessage): ", stats.avgLatencyT0T1); + console.log("=== Consumer max E2E latency T0-T1 (eachMessage): ", stats.maxLatencyT0T1); + if (produceToSecondTopic) { + console.log("=== Consumer average E2E latency T0-T2 (eachMessage): ", stats.avgLatencyT0T2); + console.log("=== Consumer max E2E latency T0-T2 (eachMessage): ", stats.maxLatencyT0T2); + } + console.log("=== Consumption time (eachMessage): ", stats.durationSeconds); + } + + if (consumer || consumerEachBatch || all) { + // If user runs this without --producer then they are responsible for seeding the topic. + console.log("=== Running Basic Consumer Performance Test (eachBatch):") + logParameters(parameters); + console.log(` Topic: ${topic}`); + console.log(` Message Count: ${messageCount}`); + console.log(` Partitions consumed concurrently: ${partitionsConsumedConcurrently}`); + startTrackingMemory(); + const consumerRate = await runConsumer(parameters, topic, + warmupMessages, messageCount, + true, partitionsConsumedConcurrently, stats, + produceToSecondTopic ? topic2 : null, compression, useCKJSProducerEverywhere); + endTrackingMemory('consumer-each-batch', `consumer-memory-batch-${mode}.json`); + console.log("=== Consumer Rate MB/s (eachBatch): ", consumerRate); + console.log("=== Consumer Rate msg/s (eachBatch): ", stats.messageRate); + console.log("=== Average eachBatch lag: ", stats.averageOffsetLag); + console.log("=== Max eachBatch lag: ", stats.maxOffsetLag); + console.log("=== Average eachBatch size: ", stats.averageBatchSize); + console.log("=== Consumer average E2E latency T0-T1 (eachBatch): ", stats.avgLatencyT0T1); + console.log("=== Consumer max E2E latency T0-T1 (eachBatch): ", stats.maxLatencyT0T1); + if (produceToSecondTopic) { + console.log("=== Consumer average E2E latency T0-T2 (eachBatch): ", stats.avgLatencyT0T2); + console.log("=== Consumer max E2E latency T0-T2 (eachBatch): ", stats.maxLatencyT0T2); + } + console.log("=== Consumption time (eachBatch): ", stats.durationSeconds); } if (ctp || all) { console.log("=== Running Consume-Transform-Produce Performance Test:") - console.log(` Brokers: ${brokers}`); + logParameters(parameters); console.log(` ConsumeTopic: ${topic}`); console.log(` ProduceTopic: ${topic2}`); console.log(` Message Count: ${messageCount}`); // Seed the topic with messages - await runProducer(brokers, topic, batchSize, warmupMessages, messageCount, messageSize, compression); - const ctpRate = await runConsumeTransformProduce(brokers, topic, topic2, warmupMessages, messageCount, messageProcessTimeMs, ctpConcurrency); + await runProducer(parameters, topic, batchSize, warmupMessages, messageCount, messageSize, compression); + startTrackingMemory(); + const ctpRate = await runConsumeTransformProduce(parameters, topic, topic2, warmupMessages, messageCount, messageProcessTimeMs, ctpConcurrency); + endTrackingMemory('consume-transform-produce', `consume-transform-produce-${mode}.json`); console.log("=== Consume-Transform-Produce Rate: ", ctpRate); } if (produceConsumeLatency || all) { console.log("=== Running Produce-To-Consume Latency Performance Test:") - console.log(` Brokers: ${brokers}`); + logParameters(parameters); console.log(` Topic: ${topic}`); 
console.log(` Message Count: ${messageCount}`); console.log(` Consumer Processing Time: ${consumerProcessingTime}`); console.log(` Producer Processing Time: ${producerProcessingTime}`); - const { mean, p50, p90, p95, outliers } = await runProducerConsumerTogether(brokers, topic, messageCount, messageSize, producerProcessingTime, consumerProcessingTime); + startTrackingMemory(); + const { mean, p50, p90, p95, outliers } = await runProducerConsumerTogether(parameters, topic, messageCount, messageSize, producerProcessingTime, consumerProcessingTime); + endTrackingMemory('producer-consumer-together', `producer-consumer-together-${mode}.json`); console.log(`=== Produce-To-Consume Latency (ms): Mean: ${mean}, P50: ${p50}, P90: ${p90}, P95: ${p95}`); // The presence of outliers invalidates the mean measurement, and rasies concerns as to why there are any. @@ -87,4 +232,9 @@ const producerProcessingTime = process.env.PRODUCER_PROCESSING_TIME ? +process.e console.log("=== Outliers (ms): ", outliers); } } + + if (maxAverageRSSKB !== undefined && maxMaxRSSKB !== undefined) { + console.log(`=== Max Average RSS across tests: `, maxAverageRSSKB); + console.log(`=== Max RSS across tests: `, maxMaxRSSKB); + } })(); \ No newline at end of file diff --git a/examples/performance/performance-primitives-common.js b/examples/performance/performance-primitives-common.js new file mode 100644 index 00000000..1103a1fa --- /dev/null +++ b/examples/performance/performance-primitives-common.js @@ -0,0 +1,431 @@ +const { hrtime } = require('process'); +const { randomBytes } = require('crypto'); + +const TERMINATE_TIMEOUT_MS = process.env.TERMINATE_TIMEOUT_MS ? +process.env.TERMINATE_TIMEOUT_MS : 600000; +const AUTO_COMMIT = process.env.AUTO_COMMIT || 'false'; +const AUTO_COMMIT_ON_BATCH_END = process.env.AUTO_COMMIT_ON_BATCH_END === 'true'; +let autoCommit; +if (AUTO_COMMIT && AUTO_COMMIT === 'false') + autoCommit = null; +else { + autoCommit = Number(AUTO_COMMIT); + if (isNaN(autoCommit)) { + autoCommit = null; + } +} + +function installHandlers() { + const handlers = { + terminationRequested: false, + terminateTimeout: null, + }; + const terminationRequestedCallback = () => { + console.log('Termination requested, waiting for current operation to finish...'); + handlers.terminationRequested = true; + } + process.on('SIGINT', terminationRequestedCallback); + process.on('SIGTERM', terminationRequestedCallback); + handlers.terminationRequestedCallback = terminationRequestedCallback; + handlers.terminateTimeout = setTimeout(terminationRequestedCallback, + TERMINATE_TIMEOUT_MS); + return handlers; +} + +function removeHandlers(handlers) { + process.off('SIGINT', handlers.terminationRequestedCallback); + process.off('SIGTERM', handlers.terminationRequestedCallback); + clearTimeout(handlers.terminateTimeout); +} + +function getAutoCommit() { + return autoCommit; +} + +function genericProduceToTopic(producer, topic, messages) { + return producer.send({ + topic, + messages, + }).catch((err) => { + if (producer.isQueueFullError(err)) { + /* do just send them again */ + return genericProduceToTopic(producer, topic, messages); + } else { + console.error(err); + throw err; + } + }); +} + +async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eachBatch, partitionsConsumedConcurrently, stats, actionOnMessages) { + const handlers = installHandlers(); + await consumer.connect(); + await consumer.subscribe({ topic }); + + let messagesReceived = 0; + let messagesMeasured = 0; + let totalMessageSize = 0; + let 
totalBatches = 0; + let totalOffsetLag = 0; + let maxOffsetLag = 0; + let durationSeconds = 0; + let startTime; + let rate; + let consumptionStopped = false; + const skippedMessages = warmupMessages; + const decoder = new TextDecoder('utf-8'); + + const updateLatency = (receivedAt, numMessages, message, isT0T2) => { + if (!stats) + return; + + const sentAt = Number(decoder.decode(message.value.slice(0, 13))); + const latency = receivedAt - sentAt; + + if (!isT0T2) { + if (!stats.maxLatencyT0T1) { + stats.maxLatencyT0T1 = latency; + stats.avgLatencyT0T1 = latency; + } else { + stats.maxLatencyT0T1 = Math.max(stats.maxLatencyT0T1, latency); + stats.avgLatencyT0T1 = ((stats.avgLatencyT0T1 * (numMessages - 1)) + latency) / numMessages; + } + } else { + if (!stats.maxLatencyT0T2) { + stats.maxLatencyT0T2 = latency; + stats.avgLatencyT0T2 = latency; + } else { + stats.maxLatencyT0T2 = Math.max(stats.maxLatencyT0T2, latency); + stats.avgLatencyT0T2 = ((stats.avgLatencyT0T2 * (numMessages - 1)) + latency) / numMessages; + } + } + }; + + const shouldTerminate = () => { + return handlers.terminationRequested || + (totalMessageCnt > 0 && messagesMeasured >= totalMessageCnt); + }; + + const stopConsuming = () => { + if (consumptionStopped) + return; + consumptionStopped = true; + let durationNanos = Number(hrtime.bigint() - startTime); + durationSeconds = durationNanos / 1e9; + rate = (totalMessageSize / durationNanos) * 1e9 / (1024 * 1024); /* MB/s */ + console.log(`Recvd ${messagesMeasured} messages in ${durationSeconds} seconds, ${totalMessageSize} bytes; rate is ${rate} MB/s`); + consumer.pause([{ topic }]); + } + + console.log("Starting consumer."); + let consumeMethod = { + partitionsConsumedConcurrently, + eachMessage: async ({ topic, partition, message }) => { + messagesReceived++; + if (messagesReceived >= skippedMessages) { + messagesMeasured++; + totalMessageSize += message.value.length; + updateLatency(Date.now(), messagesMeasured, message, false); + + if (messagesReceived === skippedMessages) { + startTime = hrtime.bigint(); + } else if (totalMessageCnt > 0 && messagesMeasured === totalMessageCnt) { + stopConsuming(); + } + } + + if (actionOnMessages) { + await actionOnMessages([message]); + updateLatency(Date.now(), messagesMeasured, message, true); + } + + if (autoCommit !== null && AUTO_COMMIT_ON_BATCH_END) { + await consumer.commitOffsetsOnBatchEnd([{ + topic, + partition, + offset: (Number(message.offset) + 1).toString(), + }]); + } + } + } + if (eachBatch) { + consumeMethod = { + partitionsConsumedConcurrently, + eachBatch: async ({ batch }) => { + const messagesBeforeBatch = messagesReceived; + const topic = batch.topic; + const partition = batch.partition; + let messagesBase; + let messages; + messagesReceived += batch.messages.length; + if (messagesReceived >= skippedMessages) { + const offsetLag = batch.offsetLag(); + totalOffsetLag += Number(offsetLag); + maxOffsetLag = Math.max(offsetLag, maxOffsetLag); + totalBatches++; + messagesMeasured = messagesReceived - skippedMessages; + messages = batch.messages; + if (messagesBeforeBatch < skippedMessages) { + messages = messages.slice(messages.length - messagesMeasured); + } + const now = Date.now(); + messagesBase = messagesMeasured - messages.length; + let i = 1; + for (const message of messages) { + totalMessageSize += message.value.length; + updateLatency(now, messagesBase + i, message, false); + i++; + } + + if (!startTime) { + startTime = hrtime.bigint(); + } else if (totalMessageCnt > 0 && messagesMeasured >= totalMessageCnt) 
{ + let durationNanos = Number(hrtime.bigint() - startTime); + durationSeconds = durationNanos / 1e9; + rate = durationNanos === 0 ? Infinity : + (totalMessageSize / durationNanos) * 1e9 / (1024 * 1024); /* MB/s */ + console.log(`Recvd ${messagesMeasured} messages in ${durationSeconds} seconds, ${totalMessageSize} bytes; rate is ${rate} MB/s`); + consumer.pause([{ topic }]); + } + } + + if (actionOnMessages) { + await actionOnMessages(batch.messages); + if (messagesMeasured > 0) { + let i = 1; + const now = Date.now(); + for (const message of messages) { + totalMessageSize += message.value.length; + updateLatency(now, messagesBase + i, message, true); + i++; + } + } + } + + if (autoCommit !== null && AUTO_COMMIT_ON_BATCH_END) { + await consumer.commitOffsetsOnBatchEnd([{ + topic, + partition, + offset: (Number(batch.lastOffset()) + 1).toString(), + }]); + } + } + }; + } + + consumer.run({ + ...consumeMethod + }); + + await new Promise((resolve) => { + let interval = setInterval(() => { + if (shouldTerminate()) { + stopConsuming(); + clearInterval(interval); + resolve(); + } + }, 1000); + }); + + await consumer.disconnect(); + + if (stats) { + if (eachBatch) { + stats.averageOffsetLag = totalBatches > 0 ? (totalOffsetLag / totalBatches) : 0; + stats.maxOffsetLag = maxOffsetLag; + stats.averageBatchSize = totalBatches > 0 ? (messagesMeasured / totalBatches) : 0; + } + stats.messageRate = durationSeconds > 0 ? + (messagesMeasured / durationSeconds) : Infinity; + stats.durationSeconds = durationSeconds; + } + removeHandlers(handlers); + return rate; +} + +async function runProducer(producer, topic, batchSize, warmupMessages, totalMessageCnt, msgSize, compression, randomness, limitRPS) { + const handlers = installHandlers(); + let totalMessagesSent = 0; + let totalBytesSent = 0; + + const encoder = new TextEncoder(); + let staticValueLength = Math.floor(msgSize * (1 - randomness)); + if (staticValueLength < 13) + staticValueLength = 13; + let staticValueRemainder = staticValueLength - 13; + if (staticValueRemainder > 0) { + staticValueRemainder = randomBytes(staticValueRemainder); + } else { + staticValueRemainder = Buffer.alloc(0); + } + + let messageCnt = totalMessageCnt; + if (messageCnt === -1) { + messageCnt = batchSize * 10000; + } + const messages = Array(messageCnt); + for (let i = 0; i < messageCnt; i++) { + /* Generate a different random value for each message */ + messages[i] = { + value: Buffer.concat([staticValueRemainder, randomBytes(msgSize - staticValueLength)]), + }; + } + + await producer.connect(); + + console.log('Sending ' + warmupMessages + ' warmup messages.'); + while (warmupMessages > 0) { + await producer.send({ + topic, + messages: messages.slice(0, batchSize) + }, compression); + warmupMessages -= batchSize; + } + console.log('Sent warmup messages'); + + // Now that warmup is done, start measuring... + let startTime; + let promises = []; + startTime = hrtime(); + let messagesDispatched = 0; + + // The double while-loop allows us to send a bunch of messages and then + // await them all at once. We need the second while loop to keep sending + // in case of queue full errors, which surface only on awaiting. + while (totalMessageCnt == -1 || messagesDispatched < totalMessageCnt) { + const startTimeBatch = hrtime.bigint(); + const maxToAwait = limitRPS ? 
limitRPS : 10000; + let messagesNotAwaited = 0; + while (totalMessageCnt == -1 || messagesDispatched < totalMessageCnt) { + const modifiedMessages = []; + const batchStart = messagesDispatched % messageCnt; + for (const msg of messages.slice(batchStart, batchStart + batchSize)) { + modifiedMessages.push({ + value: Buffer.concat([encoder.encode(Date.now().toString()), msg.value]) + }); + } + promises.push(producer.send({ + topic, + messages: modifiedMessages, + }, compression).then(() => { + totalMessagesSent += batchSize; + totalBytesSent += batchSize * msgSize; + }).catch((err) => { + if (producer.isQueueFullError(err)) { + /* do nothing, just send them again */ + messagesDispatched -= batchSize; + totalMessagesSent -= batchSize; + totalBytesSent -= batchSize * msgSize; + } else { + console.error(err); + throw err; + } + })); + messagesDispatched += batchSize; + messagesNotAwaited += batchSize; + if (handlers.terminationRequested || messagesNotAwaited >= maxToAwait) + break; + } + await Promise.all(promises); + promises = []; + if (handlers.terminationRequested) + break; + + const now = hrtime.bigint(); + const elapsedBatch = now - startTimeBatch; + if (limitRPS && elapsedBatch < 1000000000n) { + await new Promise(resolve => setTimeout(resolve, Number(1000000000n - elapsedBatch) / 1e6)); + } + } + let elapsed = hrtime(startTime); + let durationNanos = elapsed[0] * 1e9 + elapsed[1]; + let rate = (totalBytesSent / durationNanos) * 1e9 / (1024 * 1024); /* MB/s */ + console.log(`Sent ${totalMessagesSent} messages, ${totalBytesSent} bytes; rate is ${rate} MB/s`); + + await producer.disconnect(); + removeHandlers(handlers); + return rate; +} + +async function runLagMonitoring(admin, topic) { + const handlers = installHandlers(); + let groupId = process.env.GROUPID_MONITOR; + if (!groupId) { + throw new Error("GROUPID_MONITOR environment variable not set"); + } + + await admin.connect(); + + const fetchTotalLag = async () => { + const partitionCompleteLag = {}; + const partitionHWM = {}; + const partitionLag = {}; + let totalLag = 0n; + const operations = [ + admin.fetchTopicOffsets(topic, { timeout: 30000 }), + admin.fetchOffsets({ groupId, topics: [topic], timeout: 30000 }), + ] + let [topicOffsets, groupOffsets] = await Promise.all(operations); + groupOffsets = groupOffsets[0]; + + for (const partitionOffset of topicOffsets) { + partitionHWM[partitionOffset.partition] = BigInt(partitionOffset.high); + partitionCompleteLag[partitionOffset.partition] = BigInt(partitionOffset.high) - BigInt(partitionOffset.low); + } + + if (groupOffsets && groupOffsets.partitions) { + for (const partitionOffset of groupOffsets.partitions) { + const partition = partitionOffset.partition; + const hwm = partitionHWM[partition]; + if (hwm && partitionOffset.offset && hwm >= BigInt(partitionOffset.offset)) { + const currentLag = hwm - BigInt(partitionOffset.offset); + partitionLag[partition] = currentLag; + totalLag += currentLag; + } + } + } else { + throw new Error(`No offsets found for group ${groupId} on topic ${topic}`); + } + for (const partition of Object.keys(partitionHWM)) { + if (partitionLag[partition] === undefined) { + const currentLag = partitionCompleteLag[partition]; + partitionLag[partition] = currentLag; + totalLag += currentLag; + } + } + return totalLag; + } + + let totalAverageLag = 0n; + let maxLag = 0n; + let totalMeasurements = 0; + + while (!handlers.terminationRequested) { + try { + const lag = await fetchTotalLag(); + totalAverageLag += lag; + maxLag = lag > maxLag ? 
lag : maxLag;
+      totalMeasurements++;
+    } catch (e) {
+      console.error(`Error fetching lag: ${e}`);
+    }
+    await new Promise(resolve => setTimeout(resolve, 100));
+  }
+  const averageLag = totalMeasurements > 0 ? (Number(totalAverageLag) / totalMeasurements) : NaN;
+  maxLag = Number(maxLag);
+
+  await admin.disconnect();
+  removeHandlers(handlers);
+  return {
+    averageLag,
+    maxLag,
+    totalMeasurements
+  };
+}
+
+module.exports = {
+  runConsumer,
+  runProducer,
+  runLagMonitoring,
+  genericProduceToTopic,
+  getAutoCommit,
+};
\ No newline at end of file
diff --git a/examples/performance/performance-primitives-kafkajs.js b/examples/performance/performance-primitives-kafkajs.js
index 1dfc530e..1b302297 100644
--- a/examples/performance/performance-primitives-kafkajs.js
+++ b/examples/performance/performance-primitives-kafkajs.js
@@ -1,6 +1,16 @@
 const { Kafka, CompressionTypes } = require('kafkajs');
 const { randomBytes } = require('crypto');
 const { hrtime } = require('process');
+const {
+  runConsumer: runConsumerCommon,
+  runProducer: runProducerCommon,
+  genericProduceToTopic,
+  getAutoCommit,
+  } = require('./performance-primitives-common');
+
+const {
+  newCompatibleProducer: newCompatibleProducerCKJS,
+} = require('./performance-primitives');
 
 module.exports = {
   runProducer,
@@ -10,21 +20,39 @@ module.exports = {
   runProducerConsumerTogether,
 };
 
-async function runCreateTopics(brokers, topic, topic2) {
-  const kafka = new Kafka({
+function baseConfiguration(parameters) {
+  let ret = {
     clientId: 'kafka-test-performance',
-    brokers: brokers.split(','),
-  });
+    brokers: parameters.brokers.split(','),
+  };
+  if (parameters.securityProtocol &&
+      parameters.saslUsername &&
+      parameters.saslPassword) {
+    ret = {
+      ...ret,
+      ssl: parameters.securityProtocol.toLowerCase().includes('ssl'),
+      sasl: {
+        mechanism: 'plain',
+        username: parameters.saslUsername,
+        password: parameters.saslPassword
+      }
+    };
+  }
+  return ret;
+}
+
+async function runCreateTopics(parameters, topic, topic2, numPartitions) {
+  const kafka = new Kafka(baseConfiguration(parameters));
   const admin = kafka.admin();
   await admin.connect();
 
   for (let t of [topic, topic2]) {
     let topicCreated = await admin.createTopics({
-      topics: [{ topic: t, numPartitions: 3 }],
+      topics: [{ topic: t, numPartitions }],
     }).catch(console.error);
 
     if (topicCreated) {
-      console.log(`Created topic ${t}`);
+      console.log(`Created topic ${t} with ${numPartitions} partitions`);
       continue;
     }
 
@@ -33,142 +61,125 @@ async function runCreateTopics(brokers, topic, topic2) {
     await new Promise(resolve => setTimeout(resolve, 1000)); /* Propagate. */
     await admin.createTopics({
       topics: [
-        { topic: t, numPartitions: 3 },
+        { topic: t, numPartitions },
       ],
     }).catch(console.error);
-    console.log(`Created topic ${t}`);
+    console.log(`Created topic ${t} with ${numPartitions} partitions`);
   }
 
   await admin.disconnect();
 }
 
-async function runProducer(brokers, topic, batchSize, warmupMessages, totalMessageCnt, msgSize, compression) {
-  let totalMessagesSent = 0;
-  let totalBytesSent = 0;
-
-  const message = {
-    value: randomBytes(msgSize),
+class CompatibleProducer {
+  constructor(producer) {
+    this.producer = producer;
   }
 
-  const messages = Array(batchSize).fill(message);
+  async connect() {
+    return this.producer.connect();
+  }
 
-  const kafka = new Kafka({
-    clientId: 'kafka-test-performance',
-    brokers: brokers.split(','),
-  });
+  async disconnect() {
+    return this.producer.disconnect();
+  }
 
-  const producer = kafka.producer();
-  await producer.connect();
+  isQueueFullError(err) {
+    return false;
+  }
 
-  console.log('Sending ' + warmupMessages + ' warmup messages.');
-  while (warmupMessages > 0) {
-    await producer.send({
-      topic,
-      messages,
+  send(opts, compression) {
+    return this.producer.send({
+      ...opts,
       compression: CompressionTypes[compression],
     });
-    warmupMessages -= batchSize;
-  }
-  console.log('Sent warmup messages');
-
-  // Now that warmup is done, start measuring...
-  let startTime;
-  let promises = [];
-  startTime = hrtime();
-  let messagesDispatched = 0;
-
-  // The double while-loop allows us to send a bunch of messages and then
-  // await them all at once. We need the second while loop to keep sending
-  // in case of queue full errors, which surface only on awaiting.
-  while (totalMessageCnt == -1 || messagesDispatched < totalMessageCnt) {
-    while (totalMessageCnt == -1 || messagesDispatched < totalMessageCnt) {
-      promises.push(producer.send({
-        topic,
-        messages,
-        compression: CompressionTypes[compression],
-      }).then(() => {
-        totalMessagesSent += batchSize;
-        totalBytesSent += batchSize * msgSize;
-      }).catch((err) => {
-        console.error(err);
-        throw err;
-      }));
-      messagesDispatched += batchSize;
-    }
-    await Promise.all(promises);
   }
-  let elapsed = hrtime(startTime);
-  let durationNanos = elapsed[0] * 1e9 + elapsed[1];
-  let rate = (totalBytesSent / durationNanos) * 1e9 / (1024 * 1024); /* MB/s */
-  console.log(`Sent ${totalMessagesSent} messages, ${totalBytesSent} bytes; rate is ${rate} MB/s`);
+}
+function newCompatibleProducer(parameters, compression) {
+  return new CompatibleProducer(
+    new Kafka(baseConfiguration(parameters)
+    ).producer());
+}
 
-  await producer.disconnect();
-  return rate;
+async function runProducer(parameters, topic, batchSize, warmupMessages, totalMessageCnt, msgSize, compression, randomness, limitRPS) {
+  return runProducerCommon(newCompatibleProducer(parameters, compression), topic, batchSize, warmupMessages, totalMessageCnt, msgSize, compression, randomness, limitRPS);
 }
 
-async function runConsumer(brokers, topic, totalMessageCnt) {
-  const kafka = new Kafka({
-    clientId: 'kafka-test-performance',
-    brokers: brokers.split(','),
-  });
+class CompatibleConsumer {
+  constructor(consumer) {
+    this.consumer = consumer;
+  }
 
-  const consumer = kafka.consumer({
-    groupId: 'test-group' + Math.random(),
-  });
-  await consumer.connect();
-  await consumer.subscribe({ topic, fromBeginning: true });
+  async connect() {
+    return this.consumer.connect();
+  }
 
-  let messagesReceived = 0;
-  let messagesMeasured = 0;
-  let totalMessageSize = 0;
-  let startTime;
-  let rate;
-  const skippedMessages = 100;
+  async disconnect() {
+    return this.consumer.disconnect();
+  }
 
-  console.log("Starting consumer.");
+  async subscribe(opts) {
+    return this.consumer.subscribe({
+      ...opts,
+      fromBeginning: true
+    });
+  }
 
-  consumer.run({
-    autoCommit: false,
-    eachMessage: async ({ topic, partition, message }) => {
-      messagesReceived++;
+  pause(topics) {
+    return this.consumer.pause(topics);
+  }
 
-      if (messagesReceived >= skippedMessages) {
-        messagesMeasured++;
-        totalMessageSize += message.value.length;
+  commitOffsetsOnBatchEnd(offsets) {
+    // do nothing, done by KafkaJS after each* if autoCommit is enabled
+  }
 
-        if (messagesReceived === skippedMessages) {
-          startTime = hrtime();
-        } else if (messagesMeasured === totalMessageCnt) {
-          let elapsed = hrtime(startTime);
-          let durationNanos = elapsed[0] * 1e9 + elapsed[1];
-          rate = (totalMessageSize / durationNanos) * 1e9 / (1024 * 1024); /* MB/s */
-          console.log(`Recvd ${messagesMeasured} messages, ${totalMessageSize} bytes; rate is ${rate} MB/s`);
-          consumer.pause([{ topic }]);
-        }
-      }
-    }
-  });
+  run(opts) {
+    const autoCommit = getAutoCommit();
+    const autoCommitOpts = autoCommit > 0 ?
+      { autoCommit: true, autoCommitInterval: autoCommit } :
+      { autoCommit: false };
+    return this.consumer.run({
+      ...opts,
+      ...autoCommitOpts,
+    });
+  }
+}
 
-  totalMessageSize = 0;
-  await new Promise((resolve) => {
-    let interval = setInterval(() => {
-      if (messagesReceived >= totalMessageCnt) {
-        clearInterval(interval);
-        resolve();
-      }
-    }, 1000);
+function newCompatibleConsumer(parameters, eachBatch) {
+  const kafka = new Kafka(baseConfiguration(parameters));
+
+  let groupId = eachBatch ? process.env.GROUPID_BATCH : process.env.GROUPID_MESSAGE;
+  if (!groupId) {
+    groupId = 'test-group' + Math.random();
+  }
+  console.log(`New KafkaJS group id: ${groupId}`);
+  const consumer = kafka.consumer({
+    groupId,
   });
+  return new CompatibleConsumer(consumer);
+}
 
-  await consumer.disconnect();
-  return rate;
+
+async function runConsumer(parameters, topic, warmupMessages, totalMessageCnt, eachBatch, partitionsConsumedConcurrently, stats, produceToTopic, produceCompression, useCKJSProducerEverywhere) {
+  let actionOnMessages = null;
+  let producer;
+  if (produceToTopic) {
+    const newCompatibleProducerFunction = useCKJSProducerEverywhere ?
+      newCompatibleProducerCKJS : newCompatibleProducer;
+    producer = newCompatibleProducerFunction(parameters, produceCompression);
+    await producer.connect();
+    actionOnMessages = (messages) =>
+      genericProduceToTopic(producer, produceToTopic, messages);
+  }
+  const ret = await runConsumerCommon(newCompatibleConsumer(parameters, eachBatch), topic, warmupMessages, totalMessageCnt, eachBatch, partitionsConsumedConcurrently, stats, actionOnMessages);
+  if (producer) {
+    await producer.disconnect();
+  }
+  return ret;
 }
 
-async function runConsumeTransformProduce(brokers, consumeTopic, produceTopic, warmupMessages, totalMessageCnt, messageProcessTimeMs, ctpConcurrency) {
-  const kafka = new Kafka({
-    clientId: 'kafka-test-performance',
-    brokers: brokers.split(','),
-  });
+async function runConsumeTransformProduce(parameters, consumeTopic, produceTopic, warmupMessages, totalMessageCnt, messageProcessTimeMs, ctpConcurrency) {
+  const kafka = new Kafka(baseConfiguration(parameters));
 
   const producer = kafka.producer({});
   await producer.connect();
@@ -213,8 +224,9 @@ async function runConsumeTransformProduce(brokers, consumeTopic, produceTopic, w
         if (messagesMeasured === totalMessageCnt) {
           let elapsed = hrtime(startTime);
           let durationNanos = elapsed[0] * 1e9 + elapsed[1];
+          let durationSeconds = durationNanos / 1e9;
           rate = (totalMessageSize / durationNanos) * 1e9 / (1024 * 1024); /* MB/s */
-          console.log(`Recvd, transformed and sent ${messagesMeasured} messages, ${totalMessageSize} bytes; rate is ${rate} MB/s`);
+          console.log(`Recvd, transformed and sent ${messagesMeasured} messages in ${durationSeconds}, ${totalMessageSize} bytes; rate is ${rate} MB/s`);
           consumer.pause([{ topic }]);
         }
       } else {
@@ -242,11 +254,8 @@ async function runConsumeTransformProduce(brokers, consumeTopic, produceTopic, w
 }
 
-async function runProducerConsumerTogether(brokers, topic, totalMessageCnt, msgSize, produceMessageProcessTimeMs, consumeMessageProcessTimeMs) {
-  const kafka = new Kafka({
-    clientId: 'kafka-test-performance',
-    brokers: brokers.split(','),
-  });
+async function runProducerConsumerTogether(parameters, topic, totalMessageCnt, msgSize, produceMessageProcessTimeMs, consumeMessageProcessTimeMs) {
+  const kafka = new Kafka(baseConfiguration(parameters));
 
   const producer = kafka.producer({});
   await producer.connect();
 
diff --git a/examples/performance/performance-primitives.js b/examples/performance/performance-primitives.js
index 8dce6b54..84d56383 100644
--- a/examples/performance/performance-primitives.js
+++ b/examples/performance/performance-primitives.js
@@ -1,6 +1,13 @@
 const { Kafka, ErrorCodes, CompressionTypes } = require('../../').KafkaJS;
 const { randomBytes } = require('crypto');
 const { hrtime } = require('process');
+const {
+  runConsumer: runConsumerCommon,
+  runProducer: runProducerCommon,
+  runLagMonitoring: runLagMonitoringCommon,
+  genericProduceToTopic,
+  getAutoCommit,
+  } = require('./performance-primitives-common');
 
 module.exports = {
   runProducer,
@@ -8,23 +15,41 @@ module.exports = {
   runConsumeTransformProduce,
   runCreateTopics,
   runProducerConsumerTogether,
+  runLagMonitoring,
+  newCompatibleProducer,
 };
 
-async function runCreateTopics(brokers, topic, topic2) {
-  const kafka = new Kafka({
+function baseConfiguration(parameters) {
+  let ret = {
     'client.id': 'kafka-test-performance',
-    "metadata.broker.list": brokers,
-  });
+    'metadata.broker.list': parameters.brokers,
+  };
+  if (parameters.securityProtocol &&
+      parameters.saslUsername &&
+      parameters.saslPassword) {
+    ret = {
+      ...ret,
+      'security.protocol': parameters.securityProtocol,
+      'sasl.mechanism': 'PLAIN',
+      'sasl.username': parameters.saslUsername,
+      'sasl.password': parameters.saslPassword,
+    };
+  }
+  return ret;
+}
+
+async function runCreateTopics(parameters, topic, topic2, numPartitions) {
+  const kafka = new Kafka(baseConfiguration(parameters));
   const admin = kafka.admin();
   await admin.connect();
 
   for (let t of [topic, topic2]) {
     let topicCreated = await admin.createTopics({
-      topics: [{ topic: t, numPartitions: 3 }],
+      topics: [{ topic: t, numPartitions }],
     }).catch(console.error);
 
     if (topicCreated) {
-      console.log(`Created topic ${t}`);
+      console.log(`Created topic ${t} with ${numPartitions} partitions`);
       continue;
     }
 
@@ -33,148 +58,128 @@ async function runCreateTopics(brokers, topic, topic2) {
     await new Promise(resolve => setTimeout(resolve, 1000)); /* Propagate. */
     await admin.createTopics({
       topics: [
-        { topic: t, numPartitions: 3 },
+        { topic: t, numPartitions },
      ],
     }).catch(console.error);
-    console.log(`Created topic ${t}`);
+    console.log(`Created topic ${t} with ${numPartitions} partitions`);
     await new Promise(resolve => setTimeout(resolve, 1000)); /* Propagate. */
   }
 
   await admin.disconnect();
 }
 
-async function runProducer(brokers, topic, batchSize, warmupMessages, totalMessageCnt, msgSize, compression) {
-  let totalMessagesSent = 0;
-  let totalBytesSent = 0;
+function runLagMonitoring(parameters, topic) {
+  const kafka = new Kafka(baseConfiguration(parameters));
+  const admin = kafka.admin();
 
-  const message = {
-    value: randomBytes(msgSize),
-  }
+  return runLagMonitoringCommon(admin, topic);
+}
 
-  const messages = Array(batchSize).fill(message);
+class CompatibleProducer {
+  constructor(producer) {
+    this.producer = producer;
+  }
 
-  const kafka = new Kafka({
-    'client.id': 'kafka-test-performance',
-    'metadata.broker.list': brokers,
-    'compression.codec': CompressionTypes[compression],
-  });
+  async connect() {
+    return this.producer.connect();
+  }
 
-  const producer = kafka.producer();
-  await producer.connect();
+  async disconnect() {
+    return this.producer.disconnect();
+  }
 
-  console.log('Sending ' + warmupMessages + ' warmup messages.');
-  while (warmupMessages > 0) {
-    await producer.send({
-      topic,
-      messages,
-    });
-    warmupMessages -= batchSize
+  isQueueFullError(err) {
+    return err.code === ErrorCodes.ERR__QUEUE_FULL;
   }
-  console.log('Sent warmup messages');
-  // Now that warmup is done, start measuring...
-  let startTime;
-  let promises = [];
-  startTime = hrtime();
-  let messagesDispatched = 0;
-
-  // The double while-loop allows us to send a bunch of messages and then
-  // await them all at once. We need the second while loop to keep sending
-  // in case of queue full errors, which surface only on awaiting.
-  while (totalMessageCnt == -1 || messagesDispatched < totalMessageCnt) {
-    while (totalMessageCnt == -1 || messagesDispatched < totalMessageCnt) {
-      promises.push(producer.send({
-        topic,
-        messages,
-      }).then(() => {
-        totalMessagesSent += batchSize;
-        totalBytesSent += batchSize * msgSize;
-      }).catch((err) => {
-        if (err.code === ErrorCodes.ERR__QUEUE_FULL) {
-          /* do nothing, just send them again */
-          messagesDispatched -= batchSize;
-        } else {
-          console.error(err);
-          throw err;
-        }
-      }));
-      messagesDispatched += batchSize;
-    }
-    await Promise.all(promises);
+  send(opts) {
+    return this.producer.send(opts);
   }
-  let elapsed = hrtime(startTime);
-  let durationNanos = elapsed[0] * 1e9 + elapsed[1];
-  let rate = (totalBytesSent / durationNanos) * 1e9 / (1024 * 1024); /* MB/s */
-  console.log(`Sent ${totalMessagesSent} messages, ${totalBytesSent} bytes; rate is ${rate} MB/s`);
+}
+function newCompatibleProducer(parameters, compression) {
+  return new CompatibleProducer(
+    new Kafka({
+      ...baseConfiguration(parameters),
+      'compression.codec': CompressionTypes[compression],
+    }).producer());
+}
 
-  await producer.disconnect();
-  return rate;
+async function runProducer(parameters, topic, batchSize, warmupMessages, totalMessageCnt, msgSize, compression, randomness, limitRPS) {
+  return runProducerCommon(newCompatibleProducer(parameters, compression), topic, batchSize, warmupMessages, totalMessageCnt, msgSize, compression, randomness, limitRPS);
 }
 
-async function runConsumer(brokers, topic, totalMessageCnt) {
-  const kafka = new Kafka({
-    'client.id': 'kafka-test-performance',
-    'metadata.broker.list': brokers,
-  });
+class CompatibleConsumer {
+  constructor(consumer) {
+    this.consumer = consumer;
+  }
 
-  const consumer = kafka.consumer({
-    'group.id': 'test-group' + Math.random(),
-    'enable.auto.commit': false,
-    'auto.offset.reset': 'earliest',
-    'fetch.queue.backoff.ms': '100',
-  });
-  await consumer.connect();
-  await consumer.subscribe({ topic });
+  async connect() {
+    return this.consumer.connect();
+  }
 
-  let messagesReceived = 0;
-  let messagesMeasured = 0;
-  let totalMessageSize = 0;
-  let startTime;
-  let rate;
-  const skippedMessages = 100;
+  async disconnect() {
+    return this.consumer.disconnect();
+  }
 
-  console.log("Starting consumer.");
+  async subscribe(opts) {
+    return this.consumer.subscribe(opts);
+  }
 
-  consumer.run({
-    eachMessage: async ({ topic, partition, message }) => {
-      messagesReceived++;
+  pause(topics) {
+    return this.consumer.pause(topics);
+  }
 
-      if (messagesReceived >= skippedMessages) {
-        messagesMeasured++;
-        totalMessageSize += message.value.length;
+  commitOffsetsOnBatchEnd(offsets) {
+    return this.consumer.commitOffsets(offsets);
+  }
 
-        if (messagesReceived === skippedMessages) {
-          startTime = hrtime();
-        } else if (messagesMeasured === totalMessageCnt) {
-          let elapsed = hrtime(startTime);
-          let durationNanos = elapsed[0] * 1e9 + elapsed[1];
-          rate = (totalMessageSize / durationNanos) * 1e9 / (1024 * 1024); /* MB/s */
-          console.log(`Recvd ${messagesMeasured} messages, ${totalMessageSize} bytes; rate is ${rate} MB/s`);
-          consumer.pause([{ topic }]);
-        }
-      }
-    }
-  });
+  run(opts) {
+    return this.consumer.run({
+      ...opts
+    });
+  }
+}
 
-  totalMessageSize = 0;
-  await new Promise((resolve) => {
-    let interval = setInterval(() => {
-      if (messagesMeasured >= totalMessageCnt) {
-        clearInterval(interval);
-        resolve();
-      }
-    }, 1000);
+function newCompatibleConsumer(parameters, eachBatch) {
+  const kafka = new Kafka(baseConfiguration(parameters));
+  const autoCommit = getAutoCommit();
+  const autoCommitOpts = autoCommit > 0 ?
+    { 'enable.auto.commit': true, 'auto.commit.interval.ms': autoCommit } :
+    { 'enable.auto.commit': false };
+
+  let groupId = eachBatch ? process.env.GROUPID_BATCH : process.env.GROUPID_MESSAGE;
+  if (!groupId) {
+    groupId = 'test-group' + Math.random();
+  }
+  console.log(`New Confluent group id: ${groupId}`);
+  const consumer = kafka.consumer({
+    'group.id': groupId,
+    'auto.offset.reset': 'earliest',
+    'fetch.queue.backoff.ms': '100',
+    ...autoCommitOpts,
   });
+  return new CompatibleConsumer(consumer);
+}
 
-  await consumer.disconnect();
-  return rate;
+
+async function runConsumer(parameters, topic, warmupMessages, totalMessageCnt, eachBatch, partitionsConsumedConcurrently, stats, produceToTopic, produceCompression, useCKJSProducerEverywhere) {
+  let actionOnMessages = null;
+  let producer;
+  if (produceToTopic) {
+    producer = newCompatibleProducer(parameters, produceCompression);
+    await producer.connect();
+    actionOnMessages = (messages) =>
+      genericProduceToTopic(producer, produceToTopic, messages);
+  }
+  const ret = await runConsumerCommon(newCompatibleConsumer(parameters, eachBatch), topic, warmupMessages, totalMessageCnt, eachBatch, partitionsConsumedConcurrently, stats, actionOnMessages);
+  if (producer) {
+    await producer.disconnect();
+  }
+  return ret;
 }
 
-async function runConsumeTransformProduce(brokers, consumeTopic, produceTopic, warmupMessages, totalMessageCnt, messageProcessTimeMs, ctpConcurrency) {
-  const kafka = new Kafka({
-    'client.id': 'kafka-test-performance',
-    'metadata.broker.list': brokers,
-  });
+async function runConsumeTransformProduce(parameters, consumeTopic, produceTopic, warmupMessages, totalMessageCnt, messageProcessTimeMs, ctpConcurrency) {
+  const kafka = new Kafka(baseConfiguration(parameters));
 
   const producer = kafka.producer({
     /* We want things to be flushed immediately as we'll be awaiting this. */
@@ -260,11 +265,8 @@ async function runConsumeTransformProduce(brokers, consumeTopic, produceTopic, w
   return rate;
 }
 
-async function runProducerConsumerTogether(brokers, topic, totalMessageCnt, msgSize, produceMessageProcessTimeMs, consumeMessageProcessTimeMs) {
-  const kafka = new Kafka({
-    'client.id': 'kafka-test-performance',
-    'metadata.broker.list': brokers,
-  });
+async function runProducerConsumerTogether(parameters, topic, totalMessageCnt, msgSize, produceMessageProcessTimeMs, consumeMessageProcessTimeMs) {
+  const kafka = new Kafka(baseConfiguration(parameters));
 
   const producer = kafka.producer({
     /* We want things to be flushed immediately as we'll be awaiting this. */