From 71b11fac33a67ef9f0ac8ac09bcbb5305a56047f Mon Sep 17 00:00:00 2001
From: Ben Stopford
Date: Fri, 21 Aug 2015 17:44:48 +0100
Subject: [PATCH 01/12] KAFKA-2453: migrated EndToEndLatencyTest to the new
 consumer API. Added a feature for configuring message size. Added an inline
 assertion.

---
 .../scala/kafka/tools/EndToEndLatency.scala | 107 ++++++++++++------
 1 file changed, 73 insertions(+), 34 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
index 7bb69b71e9b6..2a53a1dcfb3e 100755
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,37 +19,48 @@ package kafka.tools

 import java.util.{Arrays, Properties}

-import kafka.consumer._
+import org.apache.kafka.clients.consumer.{CommitType, ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
-import scala.Option.option2Iterable
+
+/**
+ * This class records the average end-to-end latency for a single message to travel through Kafka.
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = number of messages to send
+ * producer_acks = see ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ * use_busy_wait = if false, the client will sleep for 1ms (ConsumerConfig.RETRY_BACKOFF_MS_CONFIG) between requests, which limits the measurement granularity. Setting it to true provides finer-grained results (useful when running locally).
+ *
+ * e.g. [localhost:9092 test 10000 1 20 true]
+ */

 object EndToEndLatency {
   def main(args: Array[String]) {
     if (args.length != 6) {
-      System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages consumer_fetch_max_wait producer_acks")
+      System.err.println("USAGE: java " + getClass.getName + " broker_list topic num_messages producer_acks message_size_bytes use_busy_wait")
       System.exit(1)
     }

     val brokerList = args(0)
-    val zkConnect = args(1)
-    val topic = args(2)
-    val numMessages = args(3).toInt
-    val consumerFetchMaxWait = args(4).toInt
-    val producerAcks = args(5).toInt
+    val topic = args(1)
+    val numMessages = args(2).toInt
+    val producerAcks = args(3).toInt
+    val messageLen = if (args.length > 4) args(4).toInt else 20
+    val busyWait = args(5).toBoolean

     val consumerProps = new Properties()
-    consumerProps.put("group.id", topic)
-    consumerProps.put("auto.commit.enable", "false")
-    consumerProps.put("auto.offset.reset", "largest")
-    consumerProps.put("zookeeper.connect", zkConnect)
-    consumerProps.put("fetch.wait.max.ms", consumerFetchMaxWait.toString)
-    consumerProps.put("socket.timeout.ms", 1201000.toString)
-
-    val config = new ConsumerConfig(consumerProps)
-    val connector = Consumer.create(config)
-    val stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head
-    val iter = stream.iterator
+    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, topic)
+    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+    consumerProps.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, "1")
+    consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0")
+
+    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
+    consumer.subscribe(topic)

     val producerProps = new Properties()
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
@@ -58,35 +69,63 @@ object EndToEndLatency {
     producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString)
     producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
-    val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
+    val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
+
+    def finalise() {
+      consumer.commit(CommitType.SYNC)
+      producer.close()
+      consumer.close()
+    }

-    // make sure the consumer fetcher has started before sending data since otherwise
-    // the consumption from the tail will skip the first message and hence be blocked
-    Thread.sleep(5000)
+    //Ensure we are at latest offset
+    var recordIter = consumer.poll(0).iterator
+    Thread.sleep(2000)
+    consumer.seekToEnd()

-    val message = "hello there beautiful".getBytes
     var totalTime = 0.0
     val latencies = new Array[Long](numMessages)
+
     for (i <- 0 until numMessages) {
+      val message = arrayOfLen(messageLen)
       val begin = System.nanoTime
-      producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, message))
-      val received = iter.next
+
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, message))
+
+      if (busyWait)
+        while (!recordIter.hasNext)
+          recordIter = consumer.poll(0).iterator
+      else
+        recordIter = consumer.poll(Long.MaxValue).iterator
+
       val elapsed = System.nanoTime - begin
-      // poor man's progress bar
+
+      //Check result matches original record
+      val sent = new String(message)
+      val read = new String(recordIter.next().value())
+      if (!read.equals(sent)) {
+        finalise()
+        throw new RuntimeException(s"The message read [$read] did not match the message sent [$sent]")
+      }
+
+      //Report progress
       if (i % 1000 == 0)
         println(i + "\t" + elapsed / 1000.0 / 1000.0)
       totalTime += elapsed
-      latencies(i) = (elapsed / 1000 / 1000)
+      latencies(i) = elapsed / 1000 / 1000
     }
+
+    //Results
     println("Avg latency: %.4f ms\n".format(totalTime / numMessages / 1000.0 / 1000.0))
     Arrays.sort(latencies)
     val p50 = latencies((latencies.length * 0.5).toInt)
-    val p99 = latencies((latencies.length * 0.99).toInt)
+    val p99 = latencies((latencies.length * 0.99).toInt)
     val p999 = latencies((latencies.length * 0.999).toInt)
     println("Percentiles: 50th = %d, 99th = %d, 99.9th = %d".format(p50, p99, p999))
-    producer.close()
-    connector.commitOffsets(true)
-    connector.shutdown()
-    System.exit(0)
+
+    finalise()
+  }
+
+  def arrayOfLen(len: Int): Array[Byte] = {
+    Array.fill(len)((scala.util.Random.nextInt(26) + 65).toByte)
   }
 }
\ No newline at end of file
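A note on the shape of the tool after this first patch: each loop iteration times one produce followed by one consume of the same record, so the reported figure is a full produce-to-consume round trip, not broker-internal latency. The fragment below is a minimal sketch of that round trip in its busy-wait form; it assumes a producer and consumer already configured as in the patch, and a hypothetical topic name "test":

    // Busy-wait round trip: poll(0) returns immediately, so we spin until the
    // record we just wrote comes back, then stop the clock.
    val begin = System.nanoTime
    producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("test", message))
    var records = consumer.poll(0).iterator
    while (!records.hasNext)
      records = consumer.poll(0).iterator
    val elapsedMs = (System.nanoTime - begin) / 1000.0 / 1000.0

This is the trade-off the use_busy_wait flag exposes: polling with a timeout lets the client sleep between fetch attempts, which is kinder to the CPU but adds up to the 1ms retry backoff of noise per sample, while spinning on poll(0) burns a core and keeps the measurement tight.
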
From 43d6a0678fd37d7f382d5f89c898571ae4e3cfbb Mon Sep 17 00:00:00 2001
From: Ben Stopford
Date: Fri, 21 Aug 2015 17:45:19 +0100
Subject: [PATCH 02/12] KAFKA-2453: small change that prevents the
 ConsoleConsumer from throwing an exception when the Finalizer thread tries
 to close it

---
 core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index a9c542725b0f..28c8d228a6f6 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -345,3 +345,4 @@ class ChecksumMessageFormatter extends MessageFormatter {
     output.println(topicStr + "checksum:" + chksum)
   }
 }
+
From cac85029c7d802da29d68073545c118804bd41cb Mon Sep 17 00:00:00 2001
From: Ben Stopford
Date: Fri, 21 Aug 2015 18:08:29 +0100
Subject: [PATCH 03/12] KAFKA-2453: Added the additional arguments to the
 EndToEndLatency call in the performance tests

---
 tests/kafkatest/services/performance.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/kafkatest/services/performance.py b/tests/kafkatest/services/performance.py
index 65c1a4d6ea07..2e8e80be1ac2 100644
--- a/tests/kafkatest/services/performance.py
+++ b/tests/kafkatest/services/performance.py
@@ -127,8 +127,8 @@ def _worker(self, idx, node):
             'bootstrap_servers': self.kafka.bootstrap_servers(),
         })
         cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency "\
-              "%(bootstrap_servers)s %(zk_connect)s %(topic)s %(num_records)d "\
-              "%(consumer_fetch_max_wait)d %(acks)d" % args
+              "%(bootstrap_servers)s %(topic)s %(num_records)d "\
+              "%(acks)d 20 true" % args
         self.logger.debug("End-to-end latency %d command: %s", idx, cmd)
         results = {}
         for line in node.account.ssh_capture(cmd):
From 119a6fa545bcaf586c9cb110f0e13c1cfee1f56c Mon Sep 17 00:00:00 2001
From: Ben Stopford
Date: Tue, 1 Sep 2015 11:20:14 +0100
Subject: [PATCH 04/12] KAFKA-2453: Rebased to trunk

KAFKA-2453: removed whitespace
KAFKA-2453: Formatting only
---
 core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 1 -
 core/src/main/scala/kafka/tools/EndToEndLatency.scala | 4 +++-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 28c8d228a6f6..a9c542725b0f 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -345,4 +345,3 @@ class ChecksumMessageFormatter extends MessageFormatter {
     output.println(topicStr + "checksum:" + chksum)
   }
 }
-

diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
index 2a53a1dcfb3e..5c16c609cec5 100755
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -22,6 +22,8 @@ import java.util.{Arrays, Properties}
 import org.apache.kafka.clients.consumer.{CommitType, ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

+import scala.collection.JavaConversions._
+
 /**
  * This class records the average end-to-end latency for a single message to travel through Kafka.
@@ -60,7 +62,7 @@ object EndToEndLatency {
     consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0")

     val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
-    consumer.subscribe(topic)
+    consumer.subscribe(List(topic))

     val producerProps = new Properties()
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
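The new scala.collection.JavaConversions._ import is what lets the Scala List(topic) be passed where the rebased consumer API expects a Java collection; the conversion happens implicitly. The explicit equivalent, shown as a sketch here, uses JavaConverters instead, which makes the Scala-to-Java conversion visible at the call site:

    import scala.collection.JavaConverters._

    // Same call with the conversion made explicit via .asJava
    consumer.subscribe(List(topic).asJava)
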
From 954f076701dd8961e3f835b08a022fc31ae74943 Mon Sep 17 00:00:00 2001
From: Ben Stopford
Date: Tue, 1 Sep 2015 16:30:17 +0100
Subject: [PATCH 05/12] KAFKA-2453: Incorporate changes from KAFKA-2486

The previous version used an optional busy loop to get better performance by
avoiding sleeps inside the API. Those sleeps turned out to be a bug, fixed in
KAFKA-2486, so the optional busy loop has been removed.
---
 .../scala/kafka/tools/EndToEndLatency.scala | 19 ++++++-------------
 tests/kafkatest/services/performance.py     |  2 +-
 2 files changed, 7 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
index 5c16c609cec5..796228d2e982 100755
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -32,15 +32,14 @@ import scala.collection.JavaConversions._
  * num_messages = number of messages to send
  * producer_acks = see ProducerConfig.ACKS_DOC
  * message_size_bytes = size of each message in bytes
- * use_busy_wait = if false, the client will sleep for 1ms (ConsumerConfig.RETRY_BACKOFF_MS_CONFIG) between requests, which limits the measurement granularity. Setting it to true provides finer-grained results (useful when running locally).
  *
- * e.g. [localhost:9092 test 10000 1 20 true]
+ * e.g. [localhost:9092 test 10000 1 20]
  */

 object EndToEndLatency {
   def main(args: Array[String]) {
-    if (args.length != 6) {
-      System.err.println("USAGE: java " + getClass.getName + " broker_list topic num_messages producer_acks message_size_bytes use_busy_wait")
+    if (args.length != 5) {
+      System.err.println("USAGE: java " + getClass.getName + " broker_list topic num_messages producer_acks message_size_bytes")
       System.exit(1)
     }

@@ -48,12 +47,11 @@ object EndToEndLatency {
     val topic = args(1)
     val numMessages = args(2).toInt
     val producerAcks = args(3).toInt
-    val messageLen = if (args.length > 4) args(4).toInt else 20
-    val busyWait = args(5).toBoolean
+    val messageLen = args(4).toInt

     val consumerProps = new Properties()
     consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, topic)
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + System.currentTimeMillis())
     consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
     consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
     consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
@@ -92,12 +90,7 @@ object EndToEndLatency {
       val begin = System.nanoTime

       producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, message))
-
-      if (busyWait)
-        while (!recordIter.hasNext)
-          recordIter = consumer.poll(0).iterator
-      else
-        recordIter = consumer.poll(Long.MaxValue).iterator
+      recordIter = consumer.poll(Long.MaxValue).iterator

       val elapsed = System.nanoTime - begin

diff --git a/tests/kafkatest/services/performance.py b/tests/kafkatest/services/performance.py
index 2e8e80be1ac2..34892e0b7ebd 100644
--- a/tests/kafkatest/services/performance.py
+++ b/tests/kafkatest/services/performance.py
@@ -128,7 +128,7 @@ def _worker(self, idx, node):
         })
         cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency "\
               "%(bootstrap_servers)s %(topic)s %(num_records)d "\
-              "%(acks)d 20 true" % args
+              "%(acks)d 20" % args
         self.logger.debug("End-to-end latency %d command: %s", idx, cmd)
         results = {}
         for line in node.account.ssh_capture(cmd):
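Worth calling out in the patch above: the group id switched from the topic name to a timestamped name. With a group id that has never committed offsets, auto.offset.reset=latest applies and each run starts from the log end instead of replaying a previous run's backlog; it also stops two concurrent runs against the same topic from joining one group and splitting the partitions between them. A sketch of the same idea using a UUID (an illustration, not what the patch does):

    // A fresh, never-before-seen group per run: no committed offsets exist,
    // so the consumer starts at the log end rather than an old position.
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + java.util.UUID.randomUUID())
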
From 4b96fab3037e6ed2ee00a0ff370168000fddcd09 Mon Sep 17 00:00:00 2001
From: Ben Stopford
Date: Tue, 1 Sep 2015 19:17:29 +0100
Subject: [PATCH 06/12] KAFKA-2453: removed a sleep which I believe is no
 longer needed now that we have consumer.seekToEnd()

---
 core/src/main/scala/kafka/tools/EndToEndLatency.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
index 796228d2e982..9419b649f3fb 100755
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -79,7 +79,6 @@ object EndToEndLatency {

     //Ensure we are at latest offset
     var recordIter = consumer.poll(0).iterator
-    Thread.sleep(2000)
     consumer.seekToEnd()

     var totalTime = 0.0
From 3e52bca1f0f711b7abdee680253f808bb871057e Mon Sep 17 00:00:00 2001
From: Ben Stopford
Date: Tue, 1 Sep 2015 23:04:17 +0100
Subject: [PATCH 07/12] KAFKA-2453: Producer acks can be a string

---
 core/src/main/scala/kafka/tools/EndToEndLatency.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
index 9419b649f3fb..bd174bdcb668 100755
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -46,7 +46,7 @@ object EndToEndLatency {
     val brokerList = args(0)
     val topic = args(1)
     val numMessages = args(2).toInt
-    val producerAcks = args(3).toInt
+    val producerAcks = args(3)
     val messageLen = args(4).toInt

     val consumerProps = new Properties()
From 29ce7cd9c95980f391bf3315099d285c120ace6e Mon Sep 17 00:00:00 2001
From: Ben Stopford
Date: Wed, 2 Sep 2015 14:13:06 +0100
Subject: [PATCH 08/12] KAFKA-2453: Feedback from Gwen + a fix to the seek
 problem

- Fixed an issue with seekToEnd evaluating lazily (i.e. performing the seek
  only when poll is called), meaning messages could be missed in slower
  environments (discovered when I ran this on EC2). Detailed in the comments.
- Removed the redundant retry-backoff override (an artifact of KAFKA-2486)
- Forced producer acks to be 1 or all (i.e. synchronous)
- Reduced the poll to a reasonable timeout to avoid hangs in erroneous
  situations
- Added a check that the results are non-empty
- Added a check that only a single message is returned
---
 .../scala/kafka/tools/EndToEndLatency.scala | 35 ++++++++++++++-----
 1 file changed, 26 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
index bd174bdcb668..bc20529c8bb6 100755
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -20,7 +20,7 @@ package kafka.tools
 import java.util.{Arrays, Properties}

 import org.apache.kafka.clients.consumer.{CommitType, ConsumerConfig, KafkaConsumer}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer._

 import scala.collection.JavaConversions._

@@ -49,6 +49,9 @@ object EndToEndLatency {
     val producerAcks = args(3)
     val messageLen = args(4).toInt

+    if(!List("1","all").contains(producerAcks))
+      throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all")
+
     val consumerProps = new Properties()
     consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + System.currentTimeMillis())
@@ -56,8 +59,7 @@ object EndToEndLatency {
     consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
     consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
-    consumerProps.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, "1")
-    consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0")
+    consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0") //ensure we have no temporal batching

     val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
     consumer.subscribe(List(topic))
@@ -77,23 +79,31 @@ object EndToEndLatency {
       consumer.close()
     }

-    //Ensure we are at latest offset
-    var recordIter = consumer.poll(0).iterator
+    //Ensure we are at latest offset. seekToEnd evaluates lazily, that is to say it actually performs the seek only when
+    //a poll() or position() request is issued. Hence we need to poll after we seek to ensure we see our first write.
     consumer.seekToEnd()
+    consumer.poll(0).iterator

     var totalTime = 0.0
     val latencies = new Array[Long](numMessages)

     for (i <- 0 until numMessages) {
-      val message = arrayOfLen(messageLen)
+      val message = randomBytesOfLen(messageLen)
       val begin = System.nanoTime

+      //Send message (of random bytes) synchronously then immediately poll for it
       producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, message))
-      recordIter = consumer.poll(Long.MaxValue).iterator
+      val recordIter = consumer.poll(60000).iterator

       val elapsed = System.nanoTime - begin

-      //Check result matches original record
+      //Check we got results
+      if(!recordIter.hasNext){
+        finalise()
+        throw new RuntimeException("poll() timed out before finding a result")
+      }
+
+      //Check result matches the original record
       val sent = new String(message)
       val read = new String(recordIter.next().value())
       if (!read.equals(sent)) {
@@ -101,6 +111,13 @@ object EndToEndLatency {
         throw new RuntimeException(s"The message read [$read] did not match the message sent [$sent]")
       }

+      //Check we only got the one message
+      if(recordIter.hasNext){
+        val additional = recordIter.next()
+        finalise()
+        throw new RuntimeException(s"Only one result was expected during this test. We found both [$read] and [$additional]")
+      }
+
       //Report progress
       if (i % 1000 == 0)
         println(i + "\t" + elapsed / 1000.0 / 1000.0)
@@ -119,7 +136,7 @@ object EndToEndLatency {
     finalise()
   }

-  def arrayOfLen(len: Int): Array[Byte] = {
+  def randomBytesOfLen(len: Int): Array[Byte] = {
     Array.fill(len)((scala.util.Random.nextInt(26) + 65).toByte)
   }
 }
\ No newline at end of file
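The seek fix in this patch is subtle enough to deserve a standalone illustration. As the new comment says, seekToEnd() only records the intent to seek; the position actually moves on the next poll() or position() call. The sketch below (hypothetical single consumer and producer on one topic) shows why a poll must sit between the seek and the first timed write:

    consumer.seekToEnd()  // intent recorded; no position change yet
    producer.send(record) // wrong order: this offset lands *below* the
                          // eventual seek target and is silently skipped
    consumer.poll(0)      // the seek materialises here, past the record

    // Correct order, as in the patch:
    consumer.seekToEnd()
    consumer.poll(0)      // forces the seek to take effect
    producer.send(record) // now visible to subsequent polls
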
From d2d7378c7fdaf0a095141ece1dff8c52aa72a9ac Mon Sep 17 00:00:00 2001
From: Ben Stopford
Date: Wed, 2 Sep 2015 15:09:33 +0100
Subject: [PATCH 09/12] KAFKA-2453: downgrade the duplicate-message exception
 to a warning

---
 core/src/main/scala/kafka/tools/EndToEndLatency.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
index bc20529c8bb6..310ef4dbc7a2 100755
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -66,7 +66,7 @@ object EndToEndLatency {

     val producerProps = new Properties()
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0")
+    producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0") //ensure writes are synchronous
     producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
     producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString)
     producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
@@ -113,9 +113,9 @@ object EndToEndLatency {

       //Check we only got the one message
       if(recordIter.hasNext){
-        val additional = recordIter.next()
-        finalise()
-        throw new RuntimeException(s"Only one result was expected during this test. We found both [$read] and [$additional]")
+        var count = 1
+        for (elem <- recordIter) count+=1
+        Console.err.println(s"Only one result was expected during this test. We found [$count]")
       }

       //Report progress
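A small stylistic note on the counting loop introduced above: with JavaConversions in scope, the record iterator can also be drained with Scala's Iterator.length, which consumes the remaining elements. A sketch of the equivalent (assuming the same recordIter, from which one record has already been read):

    // 1 for the record already consumed above, plus everything still buffered
    val count = 1 + recordIter.length
    Console.err.println(s"Only one result was expected during this test. We found [$count]")
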
From a6544d19cb8d1da58a67f878d8204c10a3c42c1a Mon Sep 17 00:00:00 2001
From: Ben Stopford
Date: Wed, 2 Sep 2015 17:05:12 +0100
Subject: [PATCH 10/12] KAFKA-2453: Added support for an SSL properties file

Ismael's changes
---
 .../scala/kafka/tools/EndToEndLatency.scala | 23 +++++++++++--------
 1 file changed, 13 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
index 310ef4dbc7a2..8fce94251fac 100755
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -21,6 +21,7 @@ import java.util.{Arrays, Properties}

 import org.apache.kafka.clients.consumer.{CommitType, ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer._
+import org.apache.kafka.common.utils.Utils

 import scala.collection.JavaConversions._

@@ -33,13 +34,14 @@ import scala.collection.JavaConversions._
  * producer_acks = see ProducerConfig.ACKS_DOC
  * message_size_bytes = size of each message in bytes
  *
- * e.g. [localhost:9092 test 10000 1 20]
+ * e.g. [localhost:9092 test 10000 1 20]
  */

 object EndToEndLatency {
   def main(args: Array[String]) {
-    if (args.length != 5) {
-      System.err.println("USAGE: java " + getClass.getName + " broker_list topic num_messages producer_acks message_size_bytes")
+    println(args.length)
+    if (args.length != 5 && args.length != 6) {
+      System.err.println("USAGE: java " + getClass.getName + " broker_list topic num_messages producer_acks message_size_bytes [optional] ssl_properties_file")
       System.exit(1)
     }

@@ -48,11 +50,12 @@ object EndToEndLatency {
     val numMessages = args(2).toInt
     val producerAcks = args(3)
     val messageLen = args(4).toInt
+    val sslPropsFile = if (args.length == 6) args(5) else ""

-    if(!List("1","all").contains(producerAcks))
+    if (!List("1", "all").contains(producerAcks))
       throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all")

-    val consumerProps = new Properties()
+    val consumerProps = if (sslPropsFile.equals("")) new Properties() else Utils.loadProps(sslPropsFile)
     consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + System.currentTimeMillis())
     consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@@ -64,7 +67,7 @@ object EndToEndLatency {
     val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
     consumer.subscribe(List(topic))

-    val producerProps = new Properties()
+    val producerProps = if (sslPropsFile.equals("")) new Properties() else Utils.loadProps(sslPropsFile)
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0") //ensure writes are synchronous
     producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
@@ -82,7 +85,7 @@ object EndToEndLatency {
     //Ensure we are at latest offset. seekToEnd evaluates lazily, that is to say it actually performs the seek only when
     //a poll() or position() request is issued. Hence we need to poll after we seek to ensure we see our first write.
     consumer.seekToEnd()
-    consumer.poll(0).iterator
+    consumer.poll(0)

     var totalTime = 0.0
     val latencies = new Array[Long](numMessages)
@@ -98,7 +101,7 @@ object EndToEndLatency {
       val elapsed = System.nanoTime - begin

       //Check we got results
-      if(!recordIter.hasNext){
+      if (!recordIter.hasNext) {
         finalise()
         throw new RuntimeException("poll() timed out before finding a result")
       }
@@ -112,9 +115,9 @@ object EndToEndLatency {
       }

       //Check we only got the one message
-      if(recordIter.hasNext){
+      if (recordIter.hasNext) {
         var count = 1
-        for (elem <- recordIter) count+=1
+        for (elem <- recordIter) count += 1
         Console.err.println(s"Only one result was expected during this test. We found [$count]")
       }
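On the SSL support just added: Utils.loadProps returns a java.util.Properties pre-populated from the given file, and the tool then layers its own settings on top with put(), so anything the tool sets (bootstrap servers, serializers, etc.) wins over the file. A sketch of how the override file is expected to be used (the path and keys here are illustrative, not mandated by the patch):

    // ssl.properties is expected to carry the client security settings,
    // e.g. security.protocol=SSL plus the ssl.truststore.* keys.
    val props = Utils.loadProps("/path/to/ssl.properties")
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) // tool settings win over the file
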
From d5a46a0da5695bc6f4d35c87e9562c03edeea15d Mon Sep 17 00:00:00 2001
From: Ben Stopford
Date: Tue, 8 Sep 2015 14:41:30 -0700
Subject: [PATCH 11/12] KAFKA-2453: incorporated feedback

- improved the documentation of seekToEnd()
- added a test to describe the behaviour
- added .get() to the end of the send() call in EndToEndLatency to ensure the
  call is synchronous
- made the timeout a constant
- added the timeout to the error message
- upgraded the warning for additional returned messages to an error
---
 .../kafka/clients/consumer/KafkaConsumer.java |  3 +-
 .../scala/kafka/tools/EndToEndLatency.scala   | 10 +++--
 .../integration/kafka/api/ConsumerTest.scala  | 44 +++++++++++++------
 3 files changed, 38 insertions(+), 19 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 8cd285c8852e..93e091a538e1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -909,7 +909,8 @@ public void seekToBeginning(TopicPartition... partitions) {
     }

     /**
-     * Seek to the last offset for each of the given partitions
+     * Seek to the last offset for each of the given partitions. This function evaluates lazily, seeking to the
+     * final offset in all partitions only when poll() or position() are called.
      */
     public void seekToEnd(TopicPartition... partitions) {
         acquire();

diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
index 8fce94251fac..cbaed0aca6ee 100755
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -38,6 +38,8 @@ import scala.collection.JavaConversions._
  */

 object EndToEndLatency {
+  private val timeout: Long = 60000
+
   def main(args: Array[String]) {
     println(args.length)
     if (args.length != 5 && args.length != 6) {
@@ -95,15 +97,15 @@ object EndToEndLatency {
       val begin = System.nanoTime

       //Send message (of random bytes) synchronously then immediately poll for it
-      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, message))
-      val recordIter = consumer.poll(60000).iterator
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, message)).get()
+      val recordIter = consumer.poll(timeout).iterator

       val elapsed = System.nanoTime - begin

       //Check we got results
       if (!recordIter.hasNext) {
         finalise()
-        throw new RuntimeException("poll() timed out before finding a result")
+        throw new RuntimeException(s"poll() timed out before finding a result (timeout:[$timeout])")
       }

       //Check result matches the original record
@@ -118,7 +120,7 @@ object EndToEndLatency {
       if (recordIter.hasNext) {
         var count = 1
         for (elem <- recordIter) count += 1
-        Console.err.println(s"Only one result was expected during this test. We found [$count]")
+        throw new RuntimeException(s"Only one result was expected during this test. We found [$count]")
       }

       //Report progress

diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index c1e5d02206c2..2fba9a73ea62 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -74,7 +74,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     assertEquals(0, this.consumers(0).assignment.size)
     this.consumers(0).assign(List(tp))
     assertEquals(1, this.consumers(0).assignment.size)
-    
+
     this.consumers(0).seek(tp, 0)
     consumeRecords(this.consumers(0), numRecords = numRecords, startingOffset = 0)
@@ -99,7 +99,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     this.consumers(0).poll(50)
     val pos1 = this.consumers(0).position(tp)
     val pos2 = this.consumers(0).position(tp2)
-    this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp, 3L)).asJava, CommitType.SYNC)
+    this.consumers(0).commit(Map[TopicPartition, java.lang.Long]((tp, 3L)).asJava, CommitType.SYNC)
     assertEquals(3, this.consumers(0).committed(tp))
     intercept[NoOffsetForPartitionException] {
       this.consumers(0).committed(tp2)
@@ -107,13 +107,13 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     // positions should not change
     assertEquals(pos1, this.consumers(0).position(tp))
     assertEquals(pos2, this.consumers(0).position(tp2))
-    this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp2, 5L)).asJava, CommitType.SYNC)
+    this.consumers(0).commit(Map[TopicPartition, java.lang.Long]((tp2, 5L)).asJava, CommitType.SYNC)
     assertEquals(3, this.consumers(0).committed(tp))
     assertEquals(5, this.consumers(0).committed(tp2))

     // Using async should pick up the committed changes after commit completes
     val commitCallback = new CountConsumerCommitCallback()
-    this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp2, 7L)).asJava, CommitType.ASYNC, commitCallback)
+    this.consumers(0).commit(Map[TopicPartition, java.lang.Long]((tp2, 7L)).asJava, CommitType.ASYNC, commitCallback)
     awaitCommitCallback(this.consumers(0), commitCallback)
     assertEquals(7, this.consumers(0).committed(tp2))
   }
@@ -146,6 +146,22 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     consumeRecords(consumer, numRecords = 1, startingOffset = mid.toInt)
   }

+  @Test
+  def testSeekShouldEvaluateLazily() {
+    val consumer = this.consumers(0)
+
+    //Given
+    sendRecords(5)
+    consumer.assign(List(tp))
+    consumer.seekToEnd(tp)
+
+    //When
+    sendRecords(1)
+
+    //Then - should include record written after seekToEnd()
+    assertEquals(5 + 1, consumer.position(tp))
+  }
+
   @Test
   def testGroupConsumption() {
     sendRecords(10)
@@ -221,24 +237,24 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30");
     val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
     consumer0.subscribe(List(topic), listener)
-    
+
     // the initial subscription should cause a callback execution
-    while(listener.callsToAssigned == 0)
+    while (listener.callsToAssigned == 0)
       consumer0.poll(50)
-    
+
     // get metadata for the topic
     var parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName).asScala
-    while(parts == null)
+    while (parts == null)
       parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName).asScala
     assertEquals(1, parts.size)
     assertNotNull(parts(0).leader())
-    
+
     // shutdown the coordinator
     val coordinator = parts(0).leader().id()
     this.servers(coordinator).shutdown()
-    
+
     // this should cause another callback execution
-    while(listener.callsToAssigned < 2)
+    while (listener.callsToAssigned < 2)
       consumer0.poll(50)

     assertEquals(2, listener.callsToAssigned)
@@ -341,11 +357,11 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
   private class TestConsumerReassignmentListener extends ConsumerRebalanceListener {
     var callsToAssigned = 0
     var callsToRevoked = 0
-    def onPartitionsAssigned(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition]) {
+    def onPartitionsAssigned(consumer: Consumer[_, _], partitions: java.util.Collection[TopicPartition]) {
       info("onPartitionsAssigned called.")
       callsToAssigned += 1
     }
-    def onPartitionsRevoked(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition]) {
+    def onPartitionsRevoked(consumer: Consumer[_, _], partitions: java.util.Collection[TopicPartition]) {
       info("onPartitionsRevoked called.")
       callsToRevoked += 1
     }
@@ -369,7 +385,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     while (records.size < numRecords) {
       for (record <- consumer.poll(50).asScala)
         records.add(record)
-      if(iters > maxIters)
+      if (iters > maxIters)
         throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.")
       iters += 1
     }
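The .get() added in this patch is the important functional change: KafkaProducer.send() is asynchronous and returns a java.util.concurrent.Future[RecordMetadata], so without blocking on it the timer could start polling while the record is still sitting in the client's buffer. A sketch of the distinction:

    // Asynchronous: returns as soon as the record is queued client-side
    val future = producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, message))

    // Blocking on the future waits for the broker acknowledgement (per the
    // acks setting), which is what a latency measurement needs
    val metadata = future.get()
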
From aae920e25ab1b2de20e2bc77d956b2e9d9cc1494 Mon Sep 17 00:00:00 2001
From: Ben Stopford
Date: Tue, 8 Sep 2015 15:04:11 -0700
Subject: [PATCH 12/12] KAFKA-2453: reverting the changes to
 KafkaConsumer/ConsumerTest -> these will go in their own PR

---
 .../kafka/clients/consumer/KafkaConsumer.java |  3 +-
 .../integration/kafka/api/ConsumerTest.scala  | 44 ++++++-------------
 2 files changed, 15 insertions(+), 32 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 93e091a538e1..8cd285c8852e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -909,8 +909,7 @@ public void seekToBeginning(TopicPartition... partitions) {
     }

     /**
-     * Seek to the last offset for each of the given partitions. This function evaluates lazily, seeking to the
-     * final offset in all partitions only when poll() or position() are called.
+     * Seek to the last offset for each of the given partitions
      */
     public void seekToEnd(TopicPartition... partitions) {
         acquire();

diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index 2fba9a73ea62..c1e5d02206c2 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -74,7 +74,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     assertEquals(0, this.consumers(0).assignment.size)
     this.consumers(0).assign(List(tp))
     assertEquals(1, this.consumers(0).assignment.size)
-
+    
     this.consumers(0).seek(tp, 0)
     consumeRecords(this.consumers(0), numRecords = numRecords, startingOffset = 0)
@@ -99,7 +99,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     this.consumers(0).poll(50)
     val pos1 = this.consumers(0).position(tp)
     val pos2 = this.consumers(0).position(tp2)
-    this.consumers(0).commit(Map[TopicPartition, java.lang.Long]((tp, 3L)).asJava, CommitType.SYNC)
+    this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp, 3L)).asJava, CommitType.SYNC)
     assertEquals(3, this.consumers(0).committed(tp))
     intercept[NoOffsetForPartitionException] {
       this.consumers(0).committed(tp2)
@@ -107,13 +107,13 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     // positions should not change
     assertEquals(pos1, this.consumers(0).position(tp))
     assertEquals(pos2, this.consumers(0).position(tp2))
-    this.consumers(0).commit(Map[TopicPartition, java.lang.Long]((tp2, 5L)).asJava, CommitType.SYNC)
+    this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp2, 5L)).asJava, CommitType.SYNC)
     assertEquals(3, this.consumers(0).committed(tp))
     assertEquals(5, this.consumers(0).committed(tp2))

     // Using async should pick up the committed changes after commit completes
     val commitCallback = new CountConsumerCommitCallback()
-    this.consumers(0).commit(Map[TopicPartition, java.lang.Long]((tp2, 7L)).asJava, CommitType.ASYNC, commitCallback)
+    this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp2, 7L)).asJava, CommitType.ASYNC, commitCallback)
     awaitCommitCallback(this.consumers(0), commitCallback)
     assertEquals(7, this.consumers(0).committed(tp2))
   }
@@ -146,22 +146,6 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     consumeRecords(consumer, numRecords = 1, startingOffset = mid.toInt)
   }

-  @Test
-  def testSeekShouldEvaluateLazily() {
-    val consumer = this.consumers(0)
-
-    //Given
-    sendRecords(5)
-    consumer.assign(List(tp))
-    consumer.seekToEnd(tp)
-
-    //When
-    sendRecords(1)
-
-    //Then - should include record written after seekToEnd()
-    assertEquals(5 + 1, consumer.position(tp))
-  }
-
   @Test
   def testGroupConsumption() {
     sendRecords(10)
@@ -237,24 +221,24 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30");
     val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
     consumer0.subscribe(List(topic), listener)
-
+    
     // the initial subscription should cause a callback execution
-    while (listener.callsToAssigned == 0)
+    while(listener.callsToAssigned == 0)
       consumer0.poll(50)
-
+    
     // get metadata for the topic
     var parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName).asScala
-    while (parts == null)
+    while(parts == null)
       parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName).asScala
     assertEquals(1, parts.size)
     assertNotNull(parts(0).leader())
-
+    
     // shutdown the coordinator
     val coordinator = parts(0).leader().id()
     this.servers(coordinator).shutdown()
-
+    
     // this should cause another callback execution
-    while (listener.callsToAssigned < 2)
+    while(listener.callsToAssigned < 2)
       consumer0.poll(50)

     assertEquals(2, listener.callsToAssigned)
@@ -357,11 +341,11 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
   private class TestConsumerReassignmentListener extends ConsumerRebalanceListener {
     var callsToAssigned = 0
     var callsToRevoked = 0
-    def onPartitionsAssigned(consumer: Consumer[_, _], partitions: java.util.Collection[TopicPartition]) {
+    def onPartitionsAssigned(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition]) {
       info("onPartitionsAssigned called.")
       callsToAssigned += 1
     }
-    def onPartitionsRevoked(consumer: Consumer[_, _], partitions: java.util.Collection[TopicPartition]) {
+    def onPartitionsRevoked(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition]) {
       info("onPartitionsRevoked called.")
       callsToRevoked += 1
     }
@@ -385,7 +369,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     while (records.size < numRecords) {
       for (record <- consumer.poll(50).asScala)
         records.add(record)
-      if (iters > maxIters)
+      if(iters > maxIters)
         throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.")
       iters += 1
     }