From 515e971dd44e2be34253973d136e7e5d4c0fc174 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Fri, 6 Nov 2015 17:02:22 -0800 Subject: [PATCH 1/5] KAFKA-2274: verifiable consumer and integration testing --- bin/kafka-verifiable-consumer.sh | 20 + .../clients/consumer/ConsumerRecords.java | 24 +- .../clients/consumer/internals/Fetcher.java | 1 - .../kafka/coordinator/GroupCoordinator.scala | 1 + .../coordinator/GroupMetadataManager.scala | 3 + .../kafkatest/services/verifiable_consumer.py | 215 +++++++ .../kafkatest/services/verifiable_producer.py | 15 +- .../tests/hello_verifiable_consumer.py | 143 ++++ .../kafka/tools/VerifiableConsumer.java | 609 ++++++++++++++++++ 9 files changed, 1019 insertions(+), 12 deletions(-) create mode 100755 bin/kafka-verifiable-consumer.sh create mode 100644 tests/kafkatest/services/verifiable_consumer.py create mode 100644 tests/kafkatest/tests/hello_verifiable_consumer.py create mode 100644 tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java diff --git a/bin/kafka-verifiable-consumer.sh b/bin/kafka-verifiable-consumer.sh new file mode 100755 index 0000000000000..fae064eca6182 --- /dev/null +++ b/bin/kafka-verifiable-consumer.sh @@ -0,0 +1,20 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then + export KAFKA_HEAP_OPTS="-Xmx512M" +fi +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.VerifiableConsumer $@ diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java index 8e6fef45dbc41..8ee9be28dc742 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java @@ -20,6 +20,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; /** * A container that holds the list {@link ConsumerRecord} per partition for a @@ -27,8 +28,7 @@ * partition returned by a {@link Consumer#poll(long)} operation. */ public class ConsumerRecords implements Iterable> { - public static final ConsumerRecords EMPTY = - new ConsumerRecords(Collections.EMPTY_MAP); + public static final ConsumerRecords EMPTY = new ConsumerRecords<>(Collections.EMPTY_MAP); private final Map>> records; @@ -41,12 +41,12 @@ public ConsumerRecords(Map>> records) * * @param partition The partition to get records for */ - public Iterable> records(TopicPartition partition) { + public List> records(TopicPartition partition) { List> recs = this.records.get(partition); if (recs == null) return Collections.emptyList(); else - return recs; + return Collections.unmodifiableList(recs); } /** @@ -55,19 +55,27 @@ public Iterable> records(TopicPartition partition) { public Iterable> records(String topic) { if (topic == null) throw new IllegalArgumentException("Topic must be non-null."); - List>> recs = new ArrayList>>(); + List>> recs = new ArrayList<>(); for (Map.Entry>> entry : records.entrySet()) { if (entry.getKey().topic().equals(topic)) recs.add(entry.getValue()); } - return new ConcatenatedIterable(recs); + return new ConcatenatedIterable<>(recs); + } + + /** + * Get the partitions which have records contained in this record set. + * @return the set of partitions with data in this record set (may be empty if no data was returned) + */ + public Set partitions() { + return Collections.unmodifiableSet(records.keySet()); } @Override public Iterator> iterator() { - return new ConcatenatedIterable(records.values()).iterator(); + return new ConcatenatedIterable<>(records.values()).iterator(); } - + /** * The number of records for all topics */ diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 5907acaea43a7..4f0fbedcbf181 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -596,7 +596,6 @@ private ConsumerRecord parseRecord(TopicPartition partition, LogEntry logE } catch (RuntimeException e) { throw new KafkaException("Error deserializing key/value for partition " + partition + " at offset " + logEntry.offset(), e); } - } private static class PartitionRecords { diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala index 2acc2237e1445..e9c3c0185b0bc 100644 --- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala @@ -273,6 +273,7 @@ class GroupCoordinator(val brokerId: Int, // if this is the leader, then we can attempt to persist state and transition to stable if (memberId == group.leaderId) { + info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}") // fill any missing members with an empty assignment val missing = group.allMembers -- groupAssignment.keySet diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala index f98fc7402fe2d..8e7b5c1842bab 100644 --- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala @@ -124,6 +124,9 @@ class GroupMetadataManager(val brokerId: Int, groupsCache.get(groupId) } + + + /** * Update the current cached metadata for the group with the given groupId or add the group if there is none. */ diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py new file mode 100644 index 0000000000000..2cb8357cfc266 --- /dev/null +++ b/tests/kafkatest/services/verifiable_consumer.py @@ -0,0 +1,215 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.services.background_thread import BackgroundThreadService + +from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK +from kafkatest.services.kafka.version import TRUNK +from kafkatest.services.security.security_config import SecurityConfig + +from collections import namedtuple +import json +import os +import subprocess +import time +import signal + +TopicPartition = namedtuple('TopicPartition', ['topic', 'partition']) + +class VerifiableConsumer(BackgroundThreadService): + PERSISTENT_ROOT = "/mnt/verifiable_consumer" + STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_consumer.stdout") + STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_consumer.stderr") + LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs") + LOG_FILE = os.path.join(LOG_DIR, "verifiable_consumer.log") + LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties") + CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "verifiable_consumer.properties") + + logs = { + "verifiable_consumer_stdout": { + "path": STDOUT_CAPTURE, + "collect_default": False}, + "verifiable_consumer_stderr": { + "path": STDERR_CAPTURE, + "collect_default": False}, + "verifiable_consumer_log": { + "path": LOG_FILE, + "collect_default": True} + } + + def __init__(self, context, num_nodes, kafka, topic, group_id, + max_messages=-1, session_timeout=30000, version=TRUNK): + super(VerifiableConsumer, self).__init__(context, num_nodes) + self.log_level = "TRACE" + + self.kafka = kafka + self.topic = topic + self.group_id = group_id + self.max_messages = max_messages + self.session_timeout = session_timeout + + self.assignment = {} + self.joined = set() + self.total_records = 0 + self.consumed_positions = {} + self.committed_offsets = {} + self.revoked_count = 0 + self.assigned_count = 0 + + for node in self.nodes: + node.version = version + + self.prop_file = "" + self.security_config = kafka.security_config.client_config(self.prop_file) + self.prop_file += str(self.security_config) + + def _worker(self, idx, node): + node.account.ssh("mkdir -p %s" % VerifiableConsumer.PERSISTENT_ROOT, allow_fail=False) + + # Create and upload log properties + log_config = self.render('tools_log4j.properties', log_file=VerifiableConsumer.LOG_FILE) + node.account.create_file(VerifiableConsumer.LOG4J_CONFIG, log_config) + + # Create and upload config file + self.logger.info("verifiable_consumer.properties:") + self.logger.info(self.prop_file) + node.account.create_file(VerifiableConsumer.CONFIG_FILE, self.prop_file) + self.security_config.setup_node(node) + + cmd = self.start_cmd(node) + self.logger.debug("VerifiableConsumer %d command: %s" % (idx, cmd)) + + for line in node.account.ssh_capture(cmd): + event = self.try_parse_json(line.strip()) + if event is not None: + with self.lock: + name = event["name"] + if name == "offsets_committed": + self._handle_offsets_committed(node, event) + elif name == "records_consumed": + self._handle_records_consumed(node, event) + elif name == "partitions_revoked": + self._handle_partitions_revoked(node, event) + elif name == "partitions_assigned": + self._handle_partitions_assigned(node, event) + + def _handle_offsets_committed(self, node, event): + if event["success"]: + for offset_commit in event["offsets"]: + topic = offset_commit["topic"] + partition = offset_commit["partition"] + tp = TopicPartition(topic, partition) + self.committed_offsets[tp] = offset_commit["offset"] + + def _handle_records_consumed(self, node, event): + for topic_partition in event["partitions"]: + topic = topic_partition["topic"] + partition = topic_partition["partition"] + tp = TopicPartition(topic, partition) + self.consumed_positions[tp] = topic_partition["maxOffset"] + 1 + self.total_records += event["count"] + + def _handle_partitions_revoked(self, node, event): + self.revoked_count += 1 + self.assignment[node] = [] + if node in self.joined: + self.joined.remove(node) + + def _handle_partitions_assigned(self, node, event): + self.assigned_count += 1 + self.joined.add(node) + assignment =[] + for topic_partition in event["partitions"]: + topic = topic_partition["topic"] + partition = topic_partition["partition"] + assignment.append(TopicPartition(topic, partition)) + self.assignment[node] = assignment + + def start_cmd(self, node): + cmd = "" + cmd += "export LOG_DIR=%s;" % VerifiableConsumer.LOG_DIR + cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts + cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableConsumer.LOG4J_CONFIG + cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableConsumer" \ + " --groupId %s --topic %s --broker-list %s --session-timeout %s" % \ + (self.group_id, self.topic, self.kafka.bootstrap_servers(), self.session_timeout) + if self.max_messages > 0: + cmd += " --max-messages %s" % str(self.max_messages) + + cmd += " --consumer.config %s" % VerifiableConsumer.CONFIG_FILE + cmd += " 2>> %s | tee -a %s &" % (VerifiableConsumer.STDOUT_CAPTURE, VerifiableConsumer.STDOUT_CAPTURE) + print(cmd) + return cmd + + def pids(self, node): + try: + cmd = "jps | grep -i VerifiableConsumer | awk '{print $1}'" + pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)] + return pid_arr + except (subprocess.CalledProcessError, ValueError) as e: + return [] + + def try_parse_json(self, string): + """Try to parse a string as json. Return None if not parseable.""" + try: + record = json.loads(string) + return record + except ValueError: + self.logger.debug("Could not parse as json: %s" % str(string)) + return None + + def kill_consumer(self, node, clean_shutdown=True, allow_fail=False): + if clean_shutdown: + sig = signal.SIGTERM + else: + sig = signal.SIGKILL + for pid in self.pids(node): + node.account.signal(pid, sig, allow_fail) + + def stop_node(self, node): + self.kill_consumer(node) + + if self.worker_threads is None: + return + + # block until the corresponding thread exits + if len(self.worker_threads) >= self.idx(node): + # Need to guard this because stop is preemptively called before the worker threads are added and started + self.worker_threads[self.idx(node) - 1].join() + + def clean_node(self, node): + self.kill_consumer(node, clean_shutdown=False) + node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False) + self.security_config.clean_node(node) + + def current_assignment(self): + with self.lock: + return self.assignment + + def position(self, tp): + with self.lock: + return self.consumed_positions[tp] + + def owner(self, tp): + with self.lock: + for node, assignment in self.assignment.iteritems(): + if tp in assignment: + return node + return None + + def committed(self, tp): + with self.lock: + return self.committed_offsets[tp] + diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index bda6af2ccde06..9985bbd731040 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -21,6 +21,7 @@ import json import os +import signal import subprocess import time @@ -128,9 +129,17 @@ def start_cmd(self, node): cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE) return cmd + def kill_node(self, node, clean_shutdown=True, allow_fail=False): + if clean_shutdown: + sig = signal.SIGTERM + else: + sig = signal.SIGKILL + for pid in self.pids(node): + node.account.signal(pid, sig, allow_fail) + def pids(self, node): try: - cmd = "ps ax | grep -i VerifiableProducer | grep java | grep -v grep | awk '{print $1}'" + cmd = "jps | grep -i VerifiableProducer | awk '{print $1}'" pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)] return pid_arr except (subprocess.CalledProcessError, ValueError) as e: @@ -160,7 +169,7 @@ def num_not_acked(self): return len(self.not_acked_values) def stop_node(self, node): - node.account.kill_process("VerifiableProducer", allow_fail=False) + self.kill_node(node, clean_shutdown=False, allow_fail=False) if self.worker_threads is None: return @@ -170,7 +179,7 @@ def stop_node(self, node): self.worker_threads[self.idx(node) - 1].join() def clean_node(self, node): - node.account.kill_process("VerifiableProducer", clean_shutdown=False, allow_fail=False) + self.kill_node(node, clean_shutdown=False, allow_fail=False) node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False) self.security_config.clean_node(node) diff --git a/tests/kafkatest/tests/hello_verifiable_consumer.py b/tests/kafkatest/tests/hello_verifiable_consumer.py new file mode 100644 index 0000000000000..41caaa505086f --- /dev/null +++ b/tests/kafkatest/tests/hello_verifiable_consumer.py @@ -0,0 +1,143 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kafkatest.tests.kafka_test import KafkaTest +from ducktape.utils.util import wait_until + +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.kafka import KafkaService +from kafkatest.services.verifiable_producer import VerifiableProducer +from kafkatest.services.verifiable_consumer import VerifiableConsumer, TopicPartition + +import time + +def partitions_for(topic, num_partitions): + partitions = set() + for i in range(num_partitions): + partitions.add(TopicPartition(topic=topic, partition=i)) + return partitions + +class HelloVerifiableConsumerTest(KafkaTest): + + STOPIC = "simple_topic" + TOPIC = "test_topic" + NUM_PARTITIONS = 3 + PARTITIONS = partitions_for(TOPIC, NUM_PARTITIONS) + GROUP_ID = "test_group_id" + + def __init__(self, test_context): + super(HelloVerifiableConsumerTest, self).__init__(test_context, num_zk=1, num_brokers=2, topics={ + self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 }, + self.STOPIC : { 'partitions': 1, 'replication-factor': 2 } + }) + self.num_producers = 1 + self.num_consumers = 2 + + def min_cluster_size(self): + """Override this since we're adding services outside of the constructor""" + return super(HelloVerifiableConsumerTest, self).min_cluster_size() + self.num_consumers + self.num_producers + + def _partitions(self, assignment): + partitions = [] + for parts in assignment.itervalues(): + partitions += parts + return partitions + + def _valid_assignment(self, assignment): + partitions = self._partitions(assignment) + return len(partitions) == self.NUM_PARTITIONS and set(partitions) == self.PARTITIONS + + def _setup_consumer(self, topic): + return VerifiableConsumer(self.test_context, self.num_consumers, self.kafka, + topic, self.GROUP_ID, session_timeout=10000) + + def _setup_producer(self, topic, max_messages=-1): + return VerifiableProducer(self.test_context, self.num_producers, + self.kafka, topic, max_messages=max_messages) + + def _await_all_members(self, consumer): + # Wait until all members have joined the group + wait_until(lambda: len(consumer.joined) == self.num_consumers, timeout_sec=20, + err_msg="Consumers failed to join in a reasonable amount of time") + + def test_consumer_failure(self): + partition = TopicPartition(self.STOPIC, 0) + + consumer = self._setup_consumer(self.STOPIC) + producer = self._setup_producer(self.STOPIC) + + consumer.start() + self._await_all_members(consumer) + + partition_owner = consumer.owner(partition) + assert partition_owner is not None + + producer.start() + wait_until(lambda: producer.num_acked > 1000, timeout_sec=20, + err_msg="Producer failed waiting for messages to be written") + + consumer.kill_consumer(partition_owner) + time.sleep(10) + + # if the total records consumed matches the current position, + # we haven't seen any duplicates + assert consumer.position(partition) == consumer.total_records + assert consumer.committed(partition) <= consumer.total_records + + def test_broker_failure(self): + partition = TopicPartition(self.STOPIC, 0) + + consumer = self._setup_consumer(self.STOPIC) + producer = self._setup_producer(self.STOPIC) + + producer.start() + consumer.start() + self._await_all_members(consumer) + + # shutdown one of the brokers + self.kafka.signal_node(self.kafka.nodes[0]) + time.sleep(10) + + # if the total records consumed matches the current position, + # we haven't seen any duplicates + assert consumer.position(partition) == consumer.total_records + assert consumer.committed(partition) <= consumer.total_records + + def test_simple_consume(self): + total_records = 1000 + + consumer = self._setup_consumer(self.STOPIC) + producer = self._setup_producer(self.STOPIC, max_messages=total_records) + + partition = TopicPartition(self.STOPIC, 0) + + consumer.start() + self._await_all_members(consumer) + + producer.start() + wait_until(lambda: producer.num_acked == total_records, timeout_sec=20, + err_msg="Producer failed waiting for messages to be written") + + wait_until(lambda: consumer.committed(partition) == total_records, timeout_sec=10, + err_msg="Consumer failed to read all expected messages") + + assert consumer.position(partition) == total_records + + def test_valid_assignment(self): + consumer = self._setup_consumer(self.TOPIC) + consumer.start() + self._await_all_members(consumer) + assert self._valid_assignment(consumer.current_assignment()) + diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java new file mode 100644 index 0000000000000..9cad0c2dcd9e7 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java @@ -0,0 +1,609 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.module.SimpleModule; +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.ArgumentParserException; +import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.serialization.StringDeserializer; + +import java.io.Closeable; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; + +import static net.sourceforge.argparse4j.impl.Arguments.store; + +/** + * Command line consumer designed for system testing. It outputs consumer events to STDOUT as JSON + * formatted objects. The "name" field in each JSON event identifies the event type. The following + * events are currently supported: + * + *
    + *
  • partitions_revoked: outputs the partitions revoked through {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)}. + * See {@link org.apache.kafka.tools.VerifiableConsumer.PartitionsRevoked}
  • + *
  • partitions_assigned: outputs the partitions assigned through {@link ConsumerRebalanceListener#onPartitionsAssigned(Collection)} + * See {@link org.apache.kafka.tools.VerifiableConsumer.PartitionsAssigned}.
  • + *
  • records_consumed: contains a summary of records consumed in a single call to {@link KafkaConsumer#poll(long)}. + * See {@link org.apache.kafka.tools.VerifiableConsumer.RecordsConsumed}.
  • + *
  • record_data: contains the key, value, and offset of an individual consumed record (only included if verbose + * output is enabled). See {@link org.apache.kafka.tools.VerifiableConsumer.RecordData}.
  • + *
  • offsets_committed: The result of every offset commit (only included if auto-commit is not enabled). + * See {@link org.apache.kafka.tools.VerifiableConsumer.OffsetsCommitted}
  • + *
+ */ +public class VerifiableConsumer implements Closeable, OffsetCommitCallback, ConsumerRebalanceListener { + + private final ObjectMapper mapper = new ObjectMapper(); + private final PrintStream out; + private final KafkaConsumer consumer; + private final String topic; + private final boolean useAutoCommit; + private final boolean useAsyncCommit; + private final boolean verbose; + private final int maxMessages; + private int consumedMessages = 0; + + private CountDownLatch shutdownLatch = new CountDownLatch(1); + + public VerifiableConsumer(KafkaConsumer consumer, + PrintStream out, + String topic, + int maxMessages, + boolean useAutoCommit, + boolean useAsyncCommit, + boolean verbose) { + this.consumer = consumer; + this.out = out; + this.topic = topic; + this.maxMessages = maxMessages; + this.useAutoCommit = useAutoCommit; + this.useAsyncCommit = useAsyncCommit; + this.verbose = verbose; + addKafkaSerializerModule(); + } + + private void addKafkaSerializerModule() { + SimpleModule kafka = new SimpleModule(); + kafka.addSerializer(TopicPartition.class, new JsonSerializer() { + @Override + public void serialize(TopicPartition tp, JsonGenerator gen, SerializerProvider serializers) throws IOException { + gen.writeStartObject(); + gen.writeObjectField("topic", tp.topic()); + gen.writeObjectField("partition", tp.partition()); + gen.writeEndObject(); + } + }); + mapper.registerModule(kafka); + } + + private boolean hasMessageLimit() { + return maxMessages >= 0; + } + + private boolean isFinished() { + return hasMessageLimit() && consumedMessages >= maxMessages; + } + + private Map onRecordsReceived(ConsumerRecords records) { + Map offsets = new HashMap<>(); + + List summaries = new ArrayList<>(); + for (TopicPartition tp : records.partitions()) { + List> partitionRecords = records.records(tp); + + if (hasMessageLimit() && consumedMessages + partitionRecords.size() > maxMessages) + partitionRecords = partitionRecords.subList(0, maxMessages - consumedMessages); + + if (partitionRecords.isEmpty()) + continue; + + long minOffset = partitionRecords.get(0).offset(); + long maxOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); + + offsets.put(tp, new OffsetAndMetadata(maxOffset+1)); + summaries.add(new RecordSetSummary(tp.topic(), tp.partition(), + partitionRecords.size(), minOffset, maxOffset)); + + if (verbose) { + for (ConsumerRecord record : partitionRecords) + printJson(new RecordData(record)); + } + + consumedMessages += partitionRecords.size(); + if (isFinished()) + break; + } + + printJson(new RecordsConsumed(records.count(), summaries)); + return offsets; + } + + @Override + public void onComplete(Map offsets, Exception exception) { + List committedOffsets = new ArrayList<>(); + for (Map.Entry offsetEntry : offsets.entrySet()) { + TopicPartition tp = offsetEntry.getKey(); + committedOffsets.add(new CommitData(tp.topic(), tp.partition(), offsetEntry.getValue().offset())); + } + + boolean success = true; + String error = null; + if (exception != null) { + success = false; + error = exception.getMessage(); + } + printJson(new OffsetsCommitted(committedOffsets, error, success)); + } + + @Override + public void onPartitionsAssigned(Collection partitions) { + printJson(new PartitionsAssigned(partitions)); + } + + @Override + public void onPartitionsRevoked(Collection partitions) { + printJson(new PartitionsRevoked(partitions)); + } + + private void printJson(Object data) { + try { + out.println(mapper.writeValueAsString(data)); + } catch (JsonProcessingException e) { + out.println("Bad data can't be written as json: " + e.getMessage()); + } + } + + public void commitSync(Map offsets) { + try { + consumer.commitSync(offsets); + onComplete(offsets, null); + } catch (WakeupException e) { + // we only call wakeup() once to close the consumer, so this recursion should be safe + commitSync(offsets); + throw e; + } catch (Exception e) { + onComplete(offsets, e); + } + } + + public void run() { + try { + consumer.subscribe(Arrays.asList(topic), this); + + while (true) { + ConsumerRecords records = consumer.poll(Long.MAX_VALUE); + Map offsets = onRecordsReceived(records); + + if (!useAutoCommit) { + if (useAsyncCommit) + consumer.commitAsync(offsets, this); + else + commitSync(offsets); + } + } + } catch (WakeupException e) { + // ignore, we are closing + } finally { + consumer.close(); + shutdownLatch.countDown(); + } + } + + public void close() { + boolean interrupted = false; + try { + consumer.wakeup(); + while (true) { + try { + shutdownLatch.await(); + return; + } catch (InterruptedException e) { + interrupted = true; + } + } + } finally { + if (interrupted) + Thread.currentThread().interrupt(); + } + } + + private static abstract class ConsumerEvent { + private static final String clazz = VerifiableConsumer.class.getName(); + + @JsonProperty + public abstract String name(); + + @JsonProperty("class") + public String clazz() { + return clazz; + } + } + + private static class PartitionsRevoked extends ConsumerEvent { + private final Collection partitions; + + public PartitionsRevoked(Collection partitions) { + this.partitions = partitions; + } + + @JsonProperty + public Collection partitions() { + return partitions; + } + + @Override + public String name() { + return "partitions_revoked"; + } + } + + private static class PartitionsAssigned extends ConsumerEvent { + private final Collection partitions; + + public PartitionsAssigned(Collection partitions) { + this.partitions = partitions; + } + + @JsonProperty + public Collection partitions() { + return partitions; + } + + @Override + public String name() { + return "partitions_assigned"; + } + } + + public static class RecordsConsumed extends ConsumerEvent { + private final long count; + private final List partitionSummaries; + + public RecordsConsumed(long count, List partitionSummaries) { + this.count = count; + this.partitionSummaries = partitionSummaries; + } + + @Override + public String name() { + return "records_consumed"; + } + + @JsonProperty + public long count() { + return count; + } + + @JsonProperty + public List partitions() { + return partitionSummaries; + } + } + + public static class RecordData extends ConsumerEvent { + + private final ConsumerRecord record; + + public RecordData(ConsumerRecord record) { + this.record = record; + } + + @Override + public String name() { + return "record_data"; + } + + @JsonProperty + public String topic() { + return record.topic(); + } + + @JsonProperty + public int partition() { + return record.partition(); + } + + @JsonProperty + public String key() { + return record.key(); + } + + @JsonProperty + public String value() { + return record.value(); + } + + @JsonProperty + public long offset() { + return record.offset(); + } + + } + + private static class PartitionData { + private final String topic; + private final int partition; + + public PartitionData(String topic, int partition) { + this.topic = topic; + this.partition = partition; + } + + @JsonProperty + public String topic() { + return topic; + } + + @JsonProperty + public int partition() { + return partition; + } + } + + private static class OffsetsCommitted extends ConsumerEvent { + + private final List offsets; + private final String error; + private final boolean success; + + public OffsetsCommitted(List offsets, String error, boolean success) { + this.offsets = offsets; + this.error = error; + this.success = success; + } + + @Override + public String name() { + return "offsets_committed"; + } + + @JsonProperty + public List offsets() { + return offsets; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public String error() { + return error; + } + + @JsonProperty + public boolean success() { + return success; + } + + } + + private static class CommitData extends PartitionData { + private final long offset; + + public CommitData(String topic, int partition, long offset) { + super(topic, partition); + this.offset = offset; + } + + @JsonProperty + public long offset() { + return offset; + } + } + + private static class RecordSetSummary extends PartitionData { + private final long count; + private final long minOffset; + private final long maxOffset; + + public RecordSetSummary(String topic, int partition, long count, long minOffset, long maxOffset) { + super(topic, partition); + this.count = count; + this.minOffset = minOffset; + this.maxOffset = maxOffset; + } + + @JsonProperty + public long count() { + return count; + } + + @JsonProperty + public long minOffset() { + return minOffset; + } + + @JsonProperty + public long maxOffset() { + return maxOffset; + } + + } + + private static ArgumentParser argParser() { + ArgumentParser parser = ArgumentParsers + .newArgumentParser("verifiable-consumer") + .defaultHelp(true) + .description("This tool produces increasing integers to the specified topic and prints JSON metadata to stdout on each \"send\" request, making externally visible which messages have been acked and which have not."); + + parser.addArgument("--topic") + .action(store()) + .required(true) + .type(String.class) + .metavar("TOPIC") + .help("Produce messages to this topic."); + + parser.addArgument("--groupId") + .action(store()) + .required(true) + .type(String.class) + .metavar("GROUP_ID") + .help("The groupId shared among members of the consumer group"); + + parser.addArgument("--max-messages") + .action(store()) + .required(false) + .type(Integer.class) + .setDefault(-1) + .metavar("MAX-MESSAGES") + .dest("maxMessages") + .help("Consume this many messages. If -1, the consumer will consume until the process is killed externally"); + + parser.addArgument("--session-timeout") + .action(store()) + .required(false) + .type(Integer.class) + .setDefault(30000) + .metavar("TIMEOUT_MS") + .dest("sessionTimeout") + .help("Set the consumer's session timeout"); + + parser.addArgument("--verbose") + .action(store()) + .required(false) + .setDefault(false) + .type(Boolean.class) + .metavar("VERBOSE") + .help("Set to false to only log rebalances and commits"); + + parser.addArgument("--enable-autocommit") + .action(store()) + .required(false) + .setDefault(false) + .type(Boolean.class) + .metavar("ENABLE-AUTOCOMMIT") + .dest("useAutoCommit") + .help("Enable offset auto-commit on consumer"); + + parser.addArgument("--broker-list") + .action(store()) + .required(true) + .type(String.class) + .metavar("HOST1:PORT1[,HOST2:PORT2[...]]") + .dest("brokerList") + .help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,..."); + + parser.addArgument("--consumer.config") + .action(store()) + .required(false) + .type(String.class) + .metavar("CONFIG_FILE") + .help("Consumer config properties file."); + + return parser; + } + + public static Properties loadProps(String filename) throws IOException { + Properties props = new Properties(); + InputStream propStream = null; + try { + propStream = new FileInputStream(filename); + props.load(propStream); + } finally { + if (propStream != null) + propStream.close(); + } + return props; + } + + public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[] args) throws ArgumentParserException { + Namespace res = parser.parseArgs(args); + + String topic = res.getString("topic"); + int maxMessages = res.getInt("maxMessages"); + boolean useAutoCommit = res.getBoolean("useAutoCommit"); + boolean verbose = res.getBoolean("verbose"); + String configFile = res.getString("consumer.config"); + + Properties consumerProps = new Properties(); + if (configFile != null) { + try { + consumerProps.putAll(loadProps(configFile)); + } catch (IOException e) { + throw new ArgumentParserException(e.getMessage(), parser); + } + } + + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, res.getString("groupId")); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList")); + consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.toString(useAutoCommit)); + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(res.getInt("sessionTimeout"))); + + StringDeserializer deserializer= new StringDeserializer(); + KafkaConsumer consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer); + + return new VerifiableConsumer( + consumer, + System.out, + topic, + maxMessages, + useAutoCommit, + false, + verbose); + } + + public static void main(String[] args) { + ArgumentParser parser = argParser(); + if (args.length == 0) { + parser.printHelp(); + System.exit(0); + } + + try { + final VerifiableConsumer consumer = createFromArgs(parser, args); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + consumer.close(); + } + }); + + consumer.run(); + } catch (ArgumentParserException e) { + parser.handleError(e); + System.exit(1); + } + } + +} From 95c179ffa7cde1b588879b8ce91686b32cc8f8f5 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Mon, 9 Nov 2015 10:34:11 -0800 Subject: [PATCH 2/5] rename consumer test class/filename --- .../main/scala/kafka/coordinator/GroupMetadataManager.scala | 3 --- .../{hello_verifiable_consumer.py => consumer_test.py} | 6 +++--- 2 files changed, 3 insertions(+), 6 deletions(-) rename tests/kafkatest/tests/{hello_verifiable_consumer.py => consumer_test.py} (95%) diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala index 8e7b5c1842bab..f98fc7402fe2d 100644 --- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala @@ -124,9 +124,6 @@ class GroupMetadataManager(val brokerId: Int, groupsCache.get(groupId) } - - - /** * Update the current cached metadata for the group with the given groupId or add the group if there is none. */ diff --git a/tests/kafkatest/tests/hello_verifiable_consumer.py b/tests/kafkatest/tests/consumer_test.py similarity index 95% rename from tests/kafkatest/tests/hello_verifiable_consumer.py rename to tests/kafkatest/tests/consumer_test.py index 41caaa505086f..4c554d1bcf15b 100644 --- a/tests/kafkatest/tests/hello_verifiable_consumer.py +++ b/tests/kafkatest/tests/consumer_test.py @@ -29,7 +29,7 @@ def partitions_for(topic, num_partitions): partitions.add(TopicPartition(topic=topic, partition=i)) return partitions -class HelloVerifiableConsumerTest(KafkaTest): +class VerifiableConsumerTest(KafkaTest): STOPIC = "simple_topic" TOPIC = "test_topic" @@ -38,7 +38,7 @@ class HelloVerifiableConsumerTest(KafkaTest): GROUP_ID = "test_group_id" def __init__(self, test_context): - super(HelloVerifiableConsumerTest, self).__init__(test_context, num_zk=1, num_brokers=2, topics={ + super(VerifiableConsumerTest, self).__init__(test_context, num_zk=1, num_brokers=2, topics={ self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 }, self.STOPIC : { 'partitions': 1, 'replication-factor': 2 } }) @@ -47,7 +47,7 @@ def __init__(self, test_context): def min_cluster_size(self): """Override this since we're adding services outside of the constructor""" - return super(HelloVerifiableConsumerTest, self).min_cluster_size() + self.num_consumers + self.num_producers + return super(VerifiableConsumerTest, self).min_cluster_size() + self.num_consumers + self.num_producers def _partitions(self, assignment): partitions = [] From 41cfb38d87e1a46bcbfdec0e4935db2aff29eb22 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Mon, 9 Nov 2015 11:29:57 -0800 Subject: [PATCH 3/5] fix checkstyle errors --- checkstyle/import-control.xml | 1 + .../java/org/apache/kafka/tools/VerifiableConsumer.java | 8 +++----- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 95ea3b7b28cf6..908fd351d792c 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -108,6 +108,7 @@ + diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java index 9cad0c2dcd9e7..0210719b48cb9 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java @@ -142,7 +142,7 @@ private Map onRecordsReceived(ConsumerRecords long minOffset = partitionRecords.get(0).offset(); long maxOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); - offsets.put(tp, new OffsetAndMetadata(maxOffset+1)); + offsets.put(tp, new OffsetAndMetadata(maxOffset + 1)); summaries.add(new RecordSetSummary(tp.topic(), tp.partition(), partitionRecords.size(), minOffset, maxOffset)); @@ -250,14 +250,12 @@ public void close() { } private static abstract class ConsumerEvent { - private static final String clazz = VerifiableConsumer.class.getName(); - @JsonProperty public abstract String name(); @JsonProperty("class") public String clazz() { - return clazz; + return VerifiableConsumer.class.getName(); } } @@ -570,7 +568,7 @@ public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[] consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(res.getInt("sessionTimeout"))); - StringDeserializer deserializer= new StringDeserializer(); + StringDeserializer deserializer = new StringDeserializer(); KafkaConsumer consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer); return new VerifiableConsumer( From ce6a3dfc396976d793d7f6066f79ee14344a3687 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Mon, 9 Nov 2015 17:22:39 -0800 Subject: [PATCH 4/5] address review comments --- tests/kafkatest/services/kafka/util.py | 18 +++++ .../kafkatest/services/verifiable_consumer.py | 25 +++--- tests/kafkatest/tests/consumer_test.py | 30 +++++-- .../kafka/tools/VerifiableConsumer.java | 78 ++++++++++--------- 4 files changed, 97 insertions(+), 54 deletions(-) create mode 100644 tests/kafkatest/services/kafka/util.py diff --git a/tests/kafkatest/services/kafka/util.py b/tests/kafkatest/services/kafka/util.py new file mode 100644 index 0000000000000..983f5881767ac --- /dev/null +++ b/tests/kafkatest/services/kafka/util.py @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections import namedtuple + +TopicPartition = namedtuple('TopicPartition', ['topic', 'partition']) diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py index 2cb8357cfc266..dc2b3cc681237 100644 --- a/tests/kafkatest/services/verifiable_consumer.py +++ b/tests/kafkatest/services/verifiable_consumer.py @@ -18,6 +18,7 @@ from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK from kafkatest.services.kafka.version import TRUNK from kafkatest.services.security.security_config import SecurityConfig +from kafkatest.services.kafka.util import TopicPartition from collections import namedtuple import json @@ -26,8 +27,6 @@ import time import signal -TopicPartition = namedtuple('TopicPartition', ['topic', 'partition']) - class VerifiableConsumer(BackgroundThreadService): PERSISTENT_ROOT = "/mnt/verifiable_consumer" STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_consumer.stdout") @@ -96,6 +95,8 @@ def _worker(self, idx, node): if event is not None: with self.lock: name = event["name"] + if name == "shutdown_complete": + self._handle_shutdown_complete(node) if name == "offsets_committed": self._handle_offsets_committed(node, event) elif name == "records_consumed": @@ -105,6 +106,10 @@ def _worker(self, idx, node): elif name == "partitions_assigned": self._handle_partitions_assigned(node, event) + def _handle_shutdown_complete(self, node): + if node in self.joined: + self.joined.remove(node) + def _handle_offsets_committed(self, node, event): if event["success"]: for offset_commit in event["offsets"]: @@ -143,7 +148,7 @@ def start_cmd(self, node): cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableConsumer.LOG4J_CONFIG cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableConsumer" \ - " --groupId %s --topic %s --broker-list %s --session-timeout %s" % \ + " --group-id %s --topic %s --broker-list %s --session-timeout %s" % \ (self.group_id, self.topic, self.kafka.bootstrap_servers(), self.session_timeout) if self.max_messages > 0: cmd += " --max-messages %s" % str(self.max_messages) @@ -164,13 +169,12 @@ def pids(self, node): def try_parse_json(self, string): """Try to parse a string as json. Return None if not parseable.""" try: - record = json.loads(string) - return record + return json.loads(string) except ValueError: self.logger.debug("Could not parse as json: %s" % str(string)) return None - def kill_consumer(self, node, clean_shutdown=True, allow_fail=False): + def kill_node(self, node, clean_shutdown=True, allow_fail=False): if clean_shutdown: sig = signal.SIGTERM else: @@ -178,8 +182,11 @@ def kill_consumer(self, node, clean_shutdown=True, allow_fail=False): for pid in self.pids(node): node.account.signal(pid, sig, allow_fail) - def stop_node(self, node): - self.kill_consumer(node) + if not clean_shutdown: + self._handle_shutdown_complete(node) + + def stop_node(self, node, clean_shutdown=True, allow_fail=False): + self.kill_node(node, clean_shutdown, allow_fail) if self.worker_threads is None: return @@ -190,7 +197,7 @@ def stop_node(self, node): self.worker_threads[self.idx(node) - 1].join() def clean_node(self, node): - self.kill_consumer(node, clean_shutdown=False) + self.kill_node(node, clean_shutdown=False) node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False) self.security_config.clean_node(node) diff --git a/tests/kafkatest/tests/consumer_test.py b/tests/kafkatest/tests/consumer_test.py index 4c554d1bcf15b..fa74cb982ad25 100644 --- a/tests/kafkatest/tests/consumer_test.py +++ b/tests/kafkatest/tests/consumer_test.py @@ -13,15 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -from kafkatest.tests.kafka_test import KafkaTest +from ducktape.mark import matrix from ducktape.utils.util import wait_until +from kafkatest.tests.kafka_test import KafkaTest from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService from kafkatest.services.verifiable_producer import VerifiableProducer -from kafkatest.services.verifiable_consumer import VerifiableConsumer, TopicPartition - -import time +from kafkatest.services.verifiable_consumer import VerifiableConsumer +from kafkatest.services.kafka.util import TopicPartition def partitions_for(topic, num_partitions): partitions = set() @@ -29,6 +29,7 @@ def partitions_for(topic, num_partitions): partitions.add(TopicPartition(topic=topic, partition=i)) return partitions + class VerifiableConsumerTest(KafkaTest): STOPIC = "simple_topic" @@ -44,6 +45,7 @@ def __init__(self, test_context): }) self.num_producers = 1 self.num_consumers = 2 + self.session_timeout = 10000 def min_cluster_size(self): """Override this since we're adding services outside of the constructor""" @@ -61,7 +63,7 @@ def _valid_assignment(self, assignment): def _setup_consumer(self, topic): return VerifiableConsumer(self.test_context, self.num_consumers, self.kafka, - topic, self.GROUP_ID, session_timeout=10000) + topic, self.GROUP_ID, session_timeout=self.session_timeout) def _setup_producer(self, topic, max_messages=-1): return VerifiableProducer(self.test_context, self.num_producers, @@ -84,12 +86,20 @@ def test_consumer_failure(self): partition_owner = consumer.owner(partition) assert partition_owner is not None + # startup the producer and ensure that some records have been written producer.start() wait_until(lambda: producer.num_acked > 1000, timeout_sec=20, err_msg="Producer failed waiting for messages to be written") - consumer.kill_consumer(partition_owner) - time.sleep(10) + # stop the partition owner and await its shutdown + consumer.kill_node(partition_owner, clean_shutdown=True) + wait_until(lambda: len(consumer.joined) == 1, timeout_sec=20, + err_msg="Timed out waiting for consumer to close") + + # ensure that the remaining consumer does some work after rebalancing + current_total_records = consumer.total_records + wait_until(lambda: consumer.total_records > current_total_records + 1000, timeout_sec=20, + err_msg="Timed out waiting for additional records to be consumed after first consumer failed") # if the total records consumed matches the current position, # we haven't seen any duplicates @@ -108,7 +118,11 @@ def test_broker_failure(self): # shutdown one of the brokers self.kafka.signal_node(self.kafka.nodes[0]) - time.sleep(10) + + # ensure that the remaining consumer does some work after broker failure + current_total_records = consumer.total_records + wait_until(lambda: consumer.total_records > current_total_records + 1000, timeout_sec=20, + err_msg="Timed out waiting for additional records to be consumed after first consumer failed") # if the total records consumed matches the current position, # we haven't seen any duplicates diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java index 0210719b48cb9..93c0bc669d08e 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java @@ -38,11 +38,10 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.Utils; import java.io.Closeable; -import java.io.FileInputStream; import java.io.IOException; -import java.io.InputStream; import java.io.PrintStream; import java.util.ArrayList; import java.util.Arrays; @@ -54,6 +53,8 @@ import java.util.concurrent.CountDownLatch; import static net.sourceforge.argparse4j.impl.Arguments.store; +import static net.sourceforge.argparse4j.impl.Arguments.storeTrue; + /** * Command line consumer designed for system testing. It outputs consumer events to STDOUT as JSON @@ -71,6 +72,8 @@ * output is enabled). See {@link org.apache.kafka.tools.VerifiableConsumer.RecordData}. *
  • offsets_committed: The result of every offset commit (only included if auto-commit is not enabled). * See {@link org.apache.kafka.tools.VerifiableConsumer.OffsetsCommitted}
  • + *
  • shutdown_complete: emitted after the consumer returns from {@link KafkaConsumer#close()}. + * See {@link org.apache.kafka.tools.VerifiableConsumer.ShutdownComplete}.
  • * */ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, ConsumerRebalanceListener { @@ -227,6 +230,7 @@ public void run() { // ignore, we are closing } finally { consumer.close(); + printJson(new ShutdownComplete()); shutdownLatch.countDown(); } } @@ -259,6 +263,14 @@ public String clazz() { } } + private static class ShutdownComplete extends ConsumerEvent { + + @Override + public String name() { + return "shutdown_complete"; + } + } + private static class PartitionsRevoked extends ConsumerEvent { private final Collection partitions; @@ -462,20 +474,29 @@ private static ArgumentParser argParser() { ArgumentParser parser = ArgumentParsers .newArgumentParser("verifiable-consumer") .defaultHelp(true) - .description("This tool produces increasing integers to the specified topic and prints JSON metadata to stdout on each \"send\" request, making externally visible which messages have been acked and which have not."); + .description("This tool consumes messages from a specific topic and emits consumer events (e.g. group rebalances, received messages, and offsets committed) as JSON objects to STDOUT."); + + parser.addArgument("--broker-list") + .action(store()) + .required(true) + .type(String.class) + .metavar("HOST1:PORT1[,HOST2:PORT2[...]]") + .dest("brokerList") + .help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,..."); parser.addArgument("--topic") .action(store()) .required(true) .type(String.class) .metavar("TOPIC") - .help("Produce messages to this topic."); + .help("Consumes messages from this topic."); - parser.addArgument("--groupId") + parser.addArgument("--group-id") .action(store()) .required(true) .type(String.class) .metavar("GROUP_ID") + .dest("groupId") .help("The groupId shared among members of the consumer group"); parser.addArgument("--max-messages") @@ -485,78 +506,61 @@ private static ArgumentParser argParser() { .setDefault(-1) .metavar("MAX-MESSAGES") .dest("maxMessages") - .help("Consume this many messages. If -1, the consumer will consume until the process is killed externally"); + .help("Consume this many messages. If -1 (the default), the consumer will consume until the process is killed externally"); parser.addArgument("--session-timeout") .action(store()) .required(false) - .type(Integer.class) .setDefault(30000) + .type(Integer.class) .metavar("TIMEOUT_MS") .dest("sessionTimeout") .help("Set the consumer's session timeout"); parser.addArgument("--verbose") - .action(store()) - .required(false) - .setDefault(false) + .action(storeTrue()) .type(Boolean.class) .metavar("VERBOSE") - .help("Set to false to only log rebalances and commits"); + .help("Enable to log individual consumed records"); parser.addArgument("--enable-autocommit") - .action(store()) - .required(false) - .setDefault(false) + .action(storeTrue()) .type(Boolean.class) .metavar("ENABLE-AUTOCOMMIT") .dest("useAutoCommit") .help("Enable offset auto-commit on consumer"); - parser.addArgument("--broker-list") + parser.addArgument("--reset-policy") .action(store()) - .required(true) + .required(false) + .setDefault("earliest") .type(String.class) - .metavar("HOST1:PORT1[,HOST2:PORT2[...]]") - .dest("brokerList") - .help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,..."); + .dest("resetPolicy") + .help("Set reset policy (must be either 'earliest', 'latest', or 'none'"); parser.addArgument("--consumer.config") .action(store()) .required(false) .type(String.class) .metavar("CONFIG_FILE") - .help("Consumer config properties file."); + .help("Consumer config properties file (config options shared with command line parameters will be overridden)."); return parser; } - public static Properties loadProps(String filename) throws IOException { - Properties props = new Properties(); - InputStream propStream = null; - try { - propStream = new FileInputStream(filename); - props.load(propStream); - } finally { - if (propStream != null) - propStream.close(); - } - return props; - } - public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[] args) throws ArgumentParserException { Namespace res = parser.parseArgs(args); String topic = res.getString("topic"); - int maxMessages = res.getInt("maxMessages"); boolean useAutoCommit = res.getBoolean("useAutoCommit"); + int maxMessages = res.getInt("maxMessages"); boolean verbose = res.getBoolean("verbose"); String configFile = res.getString("consumer.config"); Properties consumerProps = new Properties(); if (configFile != null) { try { - consumerProps.putAll(loadProps(configFile)); + consumerProps.putAll(Utils.loadProps(configFile)); } catch (IOException e) { throw new ArgumentParserException(e.getMessage(), parser); } @@ -564,8 +568,8 @@ public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[] consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, res.getString("groupId")); consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList")); - consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.toString(useAutoCommit)); - consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, useAutoCommit); + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, res.getString("resetPolicy")); consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(res.getInt("sessionTimeout"))); StringDeserializer deserializer = new StringDeserializer(); From 4de5d22ce8babef11bd265f679ce6167335c861e Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Mon, 9 Nov 2015 18:05:20 -0800 Subject: [PATCH 5/5] address geoff's comments --- tests/kafkatest/services/kafka/__init__.py | 1 + tests/kafkatest/services/verifiable_consumer.py | 2 +- tests/kafkatest/tests/consumer_test.py | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/kafkatest/services/kafka/__init__.py b/tests/kafkatest/services/kafka/__init__.py index 6408b5904e8a7..cd29e4b4ce5cf 100644 --- a/tests/kafkatest/services/kafka/__init__.py +++ b/tests/kafkatest/services/kafka/__init__.py @@ -14,3 +14,4 @@ # limitations under the License. from kafka import KafkaService +from util import TopicPartition diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py index dc2b3cc681237..7d7616638ea69 100644 --- a/tests/kafkatest/services/verifiable_consumer.py +++ b/tests/kafkatest/services/verifiable_consumer.py @@ -18,7 +18,7 @@ from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK from kafkatest.services.kafka.version import TRUNK from kafkatest.services.security.security_config import SecurityConfig -from kafkatest.services.kafka.util import TopicPartition +from kafkatest.services.kafka import TopicPartition from collections import namedtuple import json diff --git a/tests/kafkatest/tests/consumer_test.py b/tests/kafkatest/tests/consumer_test.py index fa74cb982ad25..707ad2f75478d 100644 --- a/tests/kafkatest/tests/consumer_test.py +++ b/tests/kafkatest/tests/consumer_test.py @@ -21,7 +21,7 @@ from kafkatest.services.kafka import KafkaService from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.verifiable_consumer import VerifiableConsumer -from kafkatest.services.kafka.util import TopicPartition +from kafkatest.services.kafka import TopicPartition def partitions_for(topic, num_partitions): partitions = set()