From a5b56f12ff9894e65966ff20794b8e060bab85da Mon Sep 17 00:00:00 2001 From: Vahid Hashemian Date: Fri, 3 Jun 2016 11:35:05 -0700 Subject: [PATCH] KAFKA-3111: Fix ConsumerPerformance reporting to use time-based instead of message-based intervals --- .../kafka/tools/ConsumerPerformance.scala | 49 ++++++++++--------- .../main/scala/kafka/tools/PerfConfig.scala | 4 +- 2 files changed, 29 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala index 6480ff5ae4dd..9680ffadc63f 100644 --- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala @@ -58,7 +58,7 @@ object ConsumerPerformance { } var startMs, endMs = 0L - if(config.useNewConsumer) { + if (config.useNewConsumer) { val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props) consumer.subscribe(List(config.topic)) startMs = System.currentTimeMillis @@ -95,7 +95,7 @@ object ConsumerPerformance { totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get, totalMessagesRead.get / elapsedSecs)) } } - + def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], topics: List[String], count: Long, timeout: Long, config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) { var bytesRead = 0L var messagesRead = 0L @@ -125,32 +125,34 @@ object ConsumerPerformance { val startMs = System.currentTimeMillis var lastReportTime: Long = startMs var lastConsumedTime = System.currentTimeMillis - - while(messagesRead < count && System.currentTimeMillis() - lastConsumedTime <= timeout) { + var currentTimeMillis = lastConsumedTime + + while (messagesRead < count && currentTimeMillis - lastConsumedTime <= timeout) { val records = consumer.poll(100) - if(records.count() > 0) - lastConsumedTime = System.currentTimeMillis - for(record <- records) { + currentTimeMillis = System.currentTimeMillis + if (records.count() > 0) + lastConsumedTime = currentTimeMillis + for (record <- records) { messagesRead += 1 - if(record.key != null) + if (record.key != null) bytesRead += record.key.size - if(record.value != null) - bytesRead += record.value.size - - if (messagesRead % config.reportingInterval == 0) { + if (record.value != null) + bytesRead += record.value.size + + if (currentTimeMillis - lastReportTime >= config.reportingInterval) { if (config.showDetailedStats) - printProgressMessage(0, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, System.currentTimeMillis, config.dateFormat) - lastReportTime = System.currentTimeMillis + printProgressMessage(0, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, currentTimeMillis, config.dateFormat) + lastReportTime = currentTimeMillis lastMessagesRead = messagesRead lastBytesRead = bytesRead } } } - + totalMessagesRead.set(messagesRead) totalBytesRead.set(bytesRead) } - + def printProgressMessage(id: Int, bytesRead: Long, lastBytesRead: Long, messagesRead: Long, lastMessagesRead: Long, startMs: Long, endMs: Long, dateFormat: SimpleDateFormat) = { val elapsedMs: Double = endMs - startMs @@ -210,14 +212,14 @@ object ConsumerPerformance { val options = parser.parse(args: _*) CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt) - + val useNewConsumer = options.has(useNewConsumerOpt) - + val props = if (options.has(consumerConfigOpt)) Utils.loadProps(options.valueOf(consumerConfigOpt)) else new Properties - if(useNewConsumer) { + if (useNewConsumer) { import org.apache.kafka.clients.consumer.ConsumerConfig props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServersOpt)) props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt)) @@ -241,6 +243,8 @@ object ConsumerPerformance { val topic = options.valueOf(topicOpt) val numMessages = options.valueOf(numMessagesOpt).longValue val reportingInterval = options.valueOf(reportingIntervalOpt).intValue + if (reportingInterval <= 0) + throw new IllegalArgumentException("Reporting interval must be greater than 0.") val showDetailedStats = options.has(showDetailedStatsOpt) val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) val hideHeader = options.has(hideHeaderOpt) @@ -264,11 +268,12 @@ object ConsumerPerformance { val messageAndMetadata = iter.next messagesRead += 1 bytesRead += messageAndMetadata.message.length + val currentTimeMillis = System.currentTimeMillis - if (messagesRead % config.reportingInterval == 0) { + if (currentTimeMillis - lastReportTime >= config.reportingInterval) { if (config.showDetailedStats) - printProgressMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, System.currentTimeMillis, config.dateFormat) - lastReportTime = System.currentTimeMillis + printProgressMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, currentTimeMillis, config.dateFormat) + lastReportTime = currentTimeMillis lastMessagesRead = messagesRead lastBytesRead = bytesRead } diff --git a/core/src/main/scala/kafka/tools/PerfConfig.scala b/core/src/main/scala/kafka/tools/PerfConfig.scala index 298bb29fe797..26704c277a58 100644 --- a/core/src/main/scala/kafka/tools/PerfConfig.scala +++ b/core/src/main/scala/kafka/tools/PerfConfig.scala @@ -26,9 +26,9 @@ class PerfConfig(args: Array[String]) { .withRequiredArg .describedAs("count") .ofType(classOf[java.lang.Long]) - val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval at which to print progress info.") + val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in milliseconds at which to print progress info.") .withRequiredArg - .describedAs("size") + .describedAs("interval_ms") .ofType(classOf[java.lang.Integer]) .defaultsTo(5000) val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " +