From fb1a6da01c27ca32d98207b20150b767cb9b68f6 Mon Sep 17 00:00:00 2001
From: "Colin P. Mccabe"
Date: Wed, 7 Jun 2017 14:24:19 -0700
Subject: [PATCH] KAFKA-5404: Add more AdminClient checks to ClientCompatibilityTest

---
 .../client_compatibility_features_test.py    |  16 ++-
 .../kafka/tools/ClientCompatibilityTest.java | 121 ++++++++++++++++--
 2 files changed, 119 insertions(+), 18 deletions(-)

diff --git a/tests/kafkatest/tests/client/client_compatibility_features_test.py b/tests/kafkatest/tests/client/client_compatibility_features_test.py
index 978b72e99cdb9..a10c376b90f4e 100644
--- a/tests/kafkatest/tests/client/client_compatibility_features_test.py
+++ b/tests/kafkatest/tests/client/client_compatibility_features_test.py
@@ -23,18 +23,24 @@
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from ducktape.tests.test import Test
-from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, V_0_10_1_0, KafkaVersion
+from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, V_0_11_0_0, V_0_10_1_0, KafkaVersion
 
 def get_broker_features(broker_version):
     features = {}
     if (broker_version < V_0_10_1_0):
+        features["create-topics-supported"] = False
         features["offsets-for-times-supported"] = False
         features["cluster-id-supported"] = False
         features["expect-record-too-large-exception"] = True
     else:
+        features["create-topics-supported"] = True
         features["offsets-for-times-supported"] = True
         features["cluster-id-supported"] = True
         features["expect-record-too-large-exception"] = False
+    if (broker_version < V_0_11_0_0):
+        features["describe-acls-supported"] = False
+    else:
+        features["describe-acls-supported"] = True
     return features
 
 def run_command(node, cmd, ssh_log_file):
@@ -74,17 +80,13 @@ def invoke_compatibility_program(self, features):
         node = self.zk.nodes[0]
         cmd = ("%s org.apache.kafka.tools.ClientCompatibilityTest "
                "--bootstrap-server %s "
-               "--offsets-for-times-supported %s "
-               "--cluster-id-supported %s "
-               "--expect-record-too-large-exception %s "
                "--num-cluster-nodes %d "
                "--topic %s " % (self.zk.path.script("kafka-run-class.sh", node),
                                self.kafka.bootstrap_servers(),
-                               features["offsets-for-times-supported"],
-                               features["cluster-id-supported"],
-                               features["expect-record-too-large-exception"],
                                len(self.kafka.nodes),
                                self.topics.keys()[0]))
+        for k, v in features.iteritems():
+            cmd = cmd + ("--%s %s " % (k, v))
         results_dir = TestContext.results_dir(self.test_context, 0)
         os.makedirs(results_dir)
         ssh_log_file = "%s/%s" % (results_dir, "client_compatibility_test_output.txt")
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
index 4fce8ced94c27..431b53b4e8043 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
@@ -22,6 +22,8 @@
 import net.sourceforge.argparse4j.inf.Namespace;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicListing;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -37,7 +39,10 @@
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.SecurityDisabledException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -49,6 +54,7 @@
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -56,6 +62,7 @@
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import static net.sourceforge.argparse4j.impl.Arguments.store;
@@ -75,6 +82,8 @@ static class TestConfig {
         final boolean expectClusterId;
         final boolean expectRecordTooLargeException;
         final int numClusterNodes;
+        final boolean createTopicsSupported;
+        final boolean describeAclsSupported;
 
         TestConfig(Namespace res) {
             this.bootstrapServer = res.getString("bootstrapServer");
@@ -83,6 +92,8 @@ static class TestConfig {
             this.expectClusterId = res.getBoolean("clusterIdSupported");
             this.expectRecordTooLargeException = res.getBoolean("expectRecordTooLargeException");
             this.numClusterNodes = res.getInt("numClusterNodes");
+            this.createTopicsSupported = res.getBoolean("createTopicsSupported");
+            this.describeAclsSupported = res.getBoolean("describeAclsSupported");
         }
     }
 
@@ -135,6 +146,20 @@ public static void main(String[] args) throws Exception {
             .dest("numClusterNodes")
             .metavar("NUM_CLUSTER_NODES")
             .help("The number of cluster nodes we should expect to see from the AdminClient.");
+        parser.addArgument("--create-topics-supported")
+            .action(store())
+            .required(true)
+            .type(Boolean.class)
+            .dest("createTopicsSupported")
+            .metavar("CREATE_TOPICS_SUPPORTED")
+            .help("Whether we should be able to create topics via the AdminClient.");
+        parser.addArgument("--describe-acls-supported")
+            .action(store())
+            .required(true)
+            .type(Boolean.class)
+            .dest("describeAclsSupported")
+            .metavar("DESCRIBE_ACLS_SUPPORTED")
+            .help("Whether describeAcls is supported in the AdminClient.");
 
         Namespace res = null;
         try {
@@ -196,7 +221,7 @@ private static void compareArrays(byte[] a, byte[] b) {
         this.message2 = buf2.array();
     }
 
-    void run() throws Exception {
+    void run() throws Throwable {
         long prodTimeMs = Time.SYSTEM.milliseconds();
         testAdminClient();
         testProduce();
@@ -218,10 +243,10 @@ public void testProduce() throws Exception {
         producer.close();
     }
 
-    void testAdminClient() throws Exception {
+    void testAdminClient() throws Throwable {
         Properties adminProps = new Properties();
         adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, testConfig.bootstrapServer);
-        try (AdminClient client = AdminClient.create(adminProps)) {
+        try (final AdminClient client = AdminClient.create(adminProps)) {
             while (true) {
                 Collection<Node> nodes = client.describeCluster().nodes().get();
                 if (nodes.size() == testConfig.numClusterNodes) {
@@ -234,6 +259,63 @@ void testAdminClient() throws Throwable {
                 log.info("Saw only {} cluster nodes. Waiting to see {}.",
                     nodes.size(), testConfig.numClusterNodes);
             }
+            tryFeature("createTopics", testConfig.createTopicsSupported,
+                new Invoker() {
+                    @Override
+                    public void invoke() throws Throwable {
+                        try {
+                            client.createTopics(Collections.singleton(
+                                new NewTopic("newtopic", 1, (short) 1))).all().get();
+                        } catch (ExecutionException e) {
+                            throw e.getCause();
+                        }
+                    }
+                },
+                new ResultTester() {
+                    @Override
+                    public void test() throws Throwable {
+                        while (true) {
+                            try {
+                                client.describeTopics(Collections.singleton("newtopic")).all().get();
+                                break;
+                            } catch (ExecutionException e) {
+                                if (e.getCause() instanceof UnknownTopicOrPartitionException)
+                                    continue;
+                                throw e;
+                            }
+                        }
+                    }
+                });
+            while (true) {
+                Collection<TopicListing> listings = client.listTopics().descriptions().get();
+                if (!testConfig.createTopicsSupported)
+                    break;
+                boolean foundNewTopic = false;
+                for (TopicListing listing : listings) {
+                    if (listing.name().equals("newtopic")) {
+                        if (listing.internal())
+                            throw new KafkaException("Did not expect newtopic to be an internal topic.");
+                        foundNewTopic = true;
+                    }
+                }
+                if (foundNewTopic)
+                    break;
+                Thread.sleep(1);
+                log.info("Did not see newtopic. Retrying listTopics...");
+            }
+            tryFeature("describeAclsSupported", testConfig.describeAclsSupported,
+                new Invoker() {
+                    @Override
+                    public void invoke() throws Throwable {
+                        try {
+                            client.describeAcls(AclBindingFilter.ANY).all().get();
+                        } catch (ExecutionException e) {
+                            if (e.getCause() instanceof SecurityDisabledException)
+                                return;
+                            throw e.getCause();
+                        }
+                    }
+                });
         }
     }
 
@@ -282,7 +364,7 @@ public void onUpdate(ClusterResource clusterResource) {
         }
     }
 
-    public void testConsume(final long prodTimeMs) throws Exception {
+    public void testConsume(final long prodTimeMs) throws Throwable {
         Properties consumerProps = new Properties();
         consumerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, testConfig.bootstrapServer);
         consumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 512);
@@ -301,15 +383,15 @@ public void testConsume(final long prodTimeMs) throws Exception {
         }
         final OffsetsForTime offsetsForTime = new OffsetsForTime();
         tryFeature("offsetsForTimes", testConfig.offsetsForTimesSupported,
-            new Runnable() {
+            new Invoker() {
                 @Override
-                public void run() {
+                public void invoke() {
                     offsetsForTime.result = consumer.offsetsForTimes(timestampsToSearch);
                 }
             },
-            new Runnable() {
+            new ResultTester() {
                 @Override
-                public void run() {
+                public void test() {
                     log.info("offsetsForTime = {}", offsetsForTime.result);
                 }
             });
@@ -393,9 +475,26 @@ public void remove() {
         log.info("Closed consumer.");
     }
 
-    private void tryFeature(String featureName, boolean supported, Runnable invoker, Runnable resultTester) {
+    private interface Invoker {
+        void invoke() throws Throwable;
+    }
+
+    private interface ResultTester {
+        void test() throws Throwable;
+    }
+
+    private void tryFeature(String featureName, boolean supported, Invoker invoker) throws Throwable {
+        tryFeature(featureName, supported, invoker, new ResultTester() {
+            @Override
+            public void test() {
+            }
+        });
+    }
+
+    private void tryFeature(String featureName, boolean supported, Invoker invoker, ResultTester resultTester)
+            throws Throwable {
         try {
-            invoker.run();
+            invoker.invoke();
             log.info("Successfully used feature {}", featureName);
         } catch (UnsupportedVersionException e) {
             log.info("Got UnsupportedVersionException when attempting to use feature {}", featureName);
@@ -407,6 +506,6 @@ private void tryFeature(String featureName, boolean supported, Runnable invoker,
         if (!supported) {
             throw new RuntimeException("Did not expect " + featureName + " to be supported, but it was.");
         }
-        resultTester.run();
+        resultTester.test();
     }
 }