From 9808c15b6d232e120f80e72cd5c3b90beba196f9 Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Fri, 2 Jun 2017 00:21:56 +0100
Subject: [PATCH 1/6] KAFKA-5272: Add AlterConfigPolicy

---
 .../server/policy/AlterConfigPolicy.java      | 84 +++++++++++++++++++
 .../scala/kafka/server/AdminManager.scala     | 34 ++++++--
 .../main/scala/kafka/server/KafkaConfig.scala |  5 ++
 3 files changed, 115 insertions(+), 8 deletions(-)
 create mode 100644 clients/src/main/java/org/apache/kafka/server/policy/AlterConfigPolicy.java

diff --git a/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigPolicy.java b/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigPolicy.java
new file mode 100644
index 0000000000000..18536d7d40487
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigPolicy.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.policy;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.PolicyViolationException;
+
+import java.util.Map;
+
+/**
+ * An interface for enforcing a policy on alter configs requests.
+ *
+ * Common use cases are requiring that the min.insync.replicas and/or retention settings for a
+ * topic remain within an allowable range.
+ *
+ * If alter.config.policy.class.name is defined, Kafka will create an instance of the specified class
+ * using the default constructor and will then pass the broker configs to its configure() method. During
+ * broker shutdown, the close() method will be invoked so that resources can be released (if necessary).
+ */
+public interface AlterConfigPolicy extends Configurable, AutoCloseable {
+
+    /**
+     * Class containing the alter configs request parameters.
+     */
+    class RequestMetadata {
+
+        private final ConfigResource resource;
+        private final Map<String, String> configs;
+
+        /**
+         * Create an instance of this class with the provided parameters.
+         *
+         * This constructor is public to make testing of AlterConfigPolicy implementations easier.
+         */
+        public RequestMetadata(ConfigResource resource, Map<String, String> configs) {
+            this.resource = resource;
+            this.configs = configs;
+        }
+
+        /**
+         * Return the configs in the request.
+         */
+        public Map<String, String> configs() {
+            return configs;
+        }
+
+        public ConfigResource resource() {
+            return resource;
+        }
+
+        @Override
+        public String toString() {
+            return "AlterConfigPolicy.RequestMetadata(resource=" + resource +
+                ", configs=" + configs + ")";
+        }
+    }
+
+    /**
+     * Validate the request parameters and throw a PolicyViolationException with a suitable error
+     * message if the alter configs request parameters for the provided resource do not satisfy this policy.
+ * + * Clients will receive the POLICY_VIOLATION error code along with the exception's message. Note that validation + * failure only affects the relevant resource, other resources in the request will still be processed. + * + * @param requestMetadata the alter configs request parameters for the provided resource. + * @throws PolicyViolationException if the request parameters do not satisfy this policy. + */ + void validate(RequestMetadata requestMetadata) throws PolicyViolationException; +} diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala index c147593d0b51b..33c6b77d41007 100644 --- a/core/src/main/scala/kafka/server/AdminManager.scala +++ b/core/src/main/scala/kafka/server/AdminManager.scala @@ -23,14 +23,14 @@ import kafka.common.TopicAlreadyMarkedForDeletionException import kafka.log.LogConfig import kafka.metrics.KafkaMetricsGroup import kafka.utils._ -import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException} +import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource} import org.apache.kafka.common.errors.{ApiException, InvalidRequestException, PolicyViolationException} import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.CreateTopicsRequest._ import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, DescribeConfigsResponse, Resource, ResourceType} -import org.apache.kafka.server.policy.CreateTopicPolicy +import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy} import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata import scala.collection._ @@ -47,6 +47,9 @@ class AdminManager(val config: KafkaConfig, private val createTopicPolicy = Option(config.getConfiguredInstance(KafkaConfig.CreateTopicPolicyClassNameProp, classOf[CreateTopicPolicy])) + private val alterConfigPolicy = + Option(config.getConfiguredInstance(KafkaConfig.AlterConfigPolicyClassNameProp, classOf[AlterConfigPolicy])) + def hasDelayedTopicOperations = topicPurgatory.delayed != 0 /** @@ -255,14 +258,28 @@ class AdminManager(val config: KafkaConfig, resource.`type` match { case ResourceType.TOPIC => val topic = resource.name + val properties = new Properties config.entries.asScala.foreach { configEntry => properties.setProperty(configEntry.name(), configEntry.value()) } - if (validateOnly) - AdminUtils.validateTopicConfig(zkUtils, topic, properties) - else - AdminUtils.changeTopicConfig(zkUtils, topic, properties) + + alterConfigPolicy match { + case Some(policy) => + AdminUtils.validateTopicConfig(zkUtils, topic, properties) + + val configEntriesMap = config.entries.asScala.map(entry => (entry.name, entry.value)).toMap + policy.validate(new AlterConfigPolicy.RequestMetadata( + new ConfigResource(ConfigResource.Type.TOPIC, resource.name), configEntriesMap.asJava)) + + if (!validateOnly) + AdminUtils.changeTopicConfig(zkUtils, topic, properties) + case None => + if (validateOnly) + AdminUtils.validateTopicConfig(zkUtils, topic, properties) + else + AdminUtils.changeTopicConfig(zkUtils, topic, properties) + } resource -> new ApiError(Errors.NONE, null) case resourceType => throw new InvalidRequestException(s"AlterConfigs is only supported for topics, but resource type is $resourceType") @@ -274,8 +291,8 @@ class AdminManager(val config: KafkaConfig, resource -> ApiError.fromThrowable(new 
InvalidRequestException(message, e)) case e: Throwable => // Log client errors at a lower level than unexpected exceptions - val message = s"Error processing alter configs request for resource $resource" - if (e.isInstanceOf[ApiException]) + val message = s"Error processing alter configs request for resource $resource, config $config" + if (e.isInstanceOf[ApiException] || e.isInstanceOf[PolicyViolationException]) info(message, e) else error(message, e) @@ -287,5 +304,6 @@ class AdminManager(val config: KafkaConfig, def shutdown() { topicPurgatory.shutdown() CoreUtils.swallow(createTopicPolicy.foreach(_.close())) + CoreUtils.swallow(alterConfigPolicy.foreach(_.close())) } } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 6e94043c4fb08..fe47fd0f8cb86 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -301,6 +301,7 @@ object KafkaConfig { val AutoCreateTopicsEnableProp = "auto.create.topics.enable" val MinInSyncReplicasProp = "min.insync.replicas" val CreateTopicPolicyClassNameProp = "create.topic.policy.class.name" + val AlterConfigPolicyClassNameProp = "alter.config.policy.class.name" /** ********* Replication configuration ***********/ val ControllerSocketTimeoutMsProp = "controller.socket.timeout.ms" val DefaultReplicationFactorProp = "default.replication.factor" @@ -529,6 +530,9 @@ object KafkaConfig { val CreateTopicPolicyClassNameDoc = "The create topic policy class that should be used for validation. The class should " + "implement the org.apache.kafka.server.policy.CreateTopicPolicy interface." + val AlterConfigPolicyClassNameDoc = "The alter configs policy class that should be used for validation. The class should " + + "implement the org.apache.kafka.server.policy.AlterConfigPolicy interface." 
+ /** ********* Replication configuration ***********/ val ControllerSocketTimeoutMsDoc = "The socket timeout for controller-to-broker channels" val ControllerMessageQueueSizeDoc = "The buffer size for controller-to-broker-channels" @@ -748,6 +752,7 @@ object KafkaConfig { .define(LogMessageTimestampTypeProp, STRING, Defaults.LogMessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, LogMessageTimestampTypeDoc) .define(LogMessageTimestampDifferenceMaxMsProp, LONG, Defaults.LogMessageTimestampDifferenceMaxMs, MEDIUM, LogMessageTimestampDifferenceMaxMsDoc) .define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc) + .define(AlterConfigPolicyClassNameProp, CLASS, null, LOW, AlterConfigPolicyClassNameDoc) /** ********* Replication configuration ***********/ .define(ControllerSocketTimeoutMsProp, INT, Defaults.ControllerSocketTimeoutMs, MEDIUM, ControllerSocketTimeoutMsDoc) From bd0b516ed31ebd4a151617cef14f55e2e0205814 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 2 Jun 2017 17:29:17 +0100 Subject: [PATCH 2/6] Initial tests --- ...minClientWithPoliciesIntegrationTest.scala | 92 ++++++++ .../api/KafkaAdminClientIntegrationTest.scala | 203 ++++++++++-------- 2 files changed, 201 insertions(+), 94 deletions(-) create mode 100644 core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala new file mode 100644 index 0000000000000..e42481c2667c6 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala @@ -0,0 +1,92 @@ +package kafka.api + +import java.util + +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.utils.{Logging, TestUtils} +import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig} +import org.apache.kafka.common.utils.Utils +import org.apache.kafka.server.policy.AlterConfigPolicy +import org.junit.{After, Before, Rule, Test} +import org.junit.rules.Timeout + +import scala.collection.JavaConverters._ + +/** + * Tests AdminClient calls when the broker is configured with policies like AlterConfigPolicy, CreateTopicPolicy, etc. 
+ */ +class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with Logging { + + import AdminClientWithPoliciesIntegrationTest._ + + var client: AdminClient = null + val brokerCount = 2 + + @Rule + def globalTimeout = Timeout.millis(120000) + + @Before + override def setUp(): Unit = { + super.setUp + TestUtils.waitUntilBrokerMetadataIsPropagated(servers) + } + + @After + override def tearDown(): Unit = { + if (client != null) + Utils.closeQuietly(client, "AdminClient") + super.tearDown() + } + + def createConfig: util.Map[String, Object] = + Map[String, Object](AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList).asJava + + override def generateConfigs = { + val configs = TestUtils.createBrokerConfigs(brokerCount, zkConnect) + configs.foreach(props => props.put(KafkaConfig.AlterConfigPolicyClassNameProp, classOf[Policy])) + configs.map(KafkaConfig.fromProps) + } + + @Test + def testValidDescribeAlterConfigs(): Unit = { + client = AdminClient.create(createConfig) + KafkaAdminClientIntegrationTest.checkValidDescribeAlterConfigs(zkUtils, servers, client) + } + + @Test + def testInvalidAlterConfigs(): Unit = { + client = AdminClient.create(createConfig) + KafkaAdminClientIntegrationTest.checkInvalidAlterConfigs(zkUtils, servers, client) + } + + @Test + def testInvalidAlterConfigsDueToPolicy(): Unit = { + client = AdminClient.create(createConfig) + } + + +} + +object AdminClientWithPoliciesIntegrationTest { + + class Policy extends AlterConfigPolicy { + + var configs: Map[String, _] = _ + var closed = false + + def configure(configs: util.Map[String, _]): Unit = { + this.configs = configs.asScala.toMap + } + + def validate(requestMetadata: AlterConfigPolicy.RequestMetadata): Unit = { + require(!closed, "Policy should not be closed") + require(!configs.isEmpty, "configure should have been called with non empty configs") + require(!requestMetadata.configs.isEmpty, "request configs should not be empty") + require(requestMetadata.resource.name.nonEmpty, "resource name should not be empty") + } + + def close(): Unit = closed = true + + } +} diff --git a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala index 0e21da7fd012d..b8cf92a6700a5 100644 --- a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala @@ -23,9 +23,9 @@ import java.util.concurrent.{ExecutionException, TimeUnit} import org.apache.kafka.common.utils.{Time, Utils} import kafka.integration.KafkaServerTestHarness import kafka.log.LogConfig -import kafka.server.{Defaults, KafkaConfig} +import kafka.server.{Defaults, KafkaConfig, KafkaServer} import org.apache.kafka.clients.admin._ -import kafka.utils.{Logging, TestUtils} +import kafka.utils.{Logging, TestUtils, ZkUtils} import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.common.KafkaFuture import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType} @@ -47,6 +47,8 @@ import scala.collection.JavaConverters._ */ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Logging { + import KafkaAdminClientIntegrationTest._ + @Rule def globalTimeout = Timeout.millis(120000) @@ -179,7 +181,111 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin @Test def testDescribeAndAlterConfigs(): Unit = { client = 
AdminClient.create(createConfig) + checkValidDescribeAlterConfigs(zkUtils, servers, client) + } + + @Test + def testInvalidAlterConfigs(): Unit = { + client = AdminClient.create(createConfig) + checkInvalidAlterConfigs(zkUtils, servers, client) + } + + val ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"), + new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)) + + /** + * Test that ACL operations are not possible when the authorizer is disabled. + * Also see {@link kafka.api.SaslSslAdminClientIntegrationTest} for tests of ACL operations + * when the authorizer is enabled. + */ + @Test + def testAclOperations(): Unit = { + client = AdminClient.create(createConfig()) + assertFutureExceptionTypeEquals(client.describeAcls(AclBindingFilter.ANY).all(), classOf[SecurityDisabledException]) + assertFutureExceptionTypeEquals(client.createAcls(Collections.singleton(ACL1)).all(), + classOf[SecurityDisabledException]) + assertFutureExceptionTypeEquals(client.deleteAcls(Collections.singleton(ACL1.toFilter())).all(), + classOf[SecurityDisabledException]) + client.close() + } + + /** + * Test closing the AdminClient with a generous timeout. Calls in progress should be completed, + * since they can be done within the timeout. New calls should receive timeouts. + */ + @Test + def testDelayedClose(): Unit = { + client = AdminClient.create(createConfig()) + val topics = Seq("mytopic", "mytopic2") + val newTopics = topics.map(new NewTopic(_, 1, 1)) + val future = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all() + client.close(2, TimeUnit.HOURS) + val future2 = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all() + assertFutureExceptionTypeEquals(future2, classOf[TimeoutException]) + future.get + client.close(30, TimeUnit.MINUTES) // multiple close-with-timeout should have no effect + } + + /** + * Test closing the AdminClient with a timeout of 0, when there are calls with extremely long + * timeouts in progress. The calls should be aborted after the hard shutdown timeout elapses. + */ + @Test + def testForceClose(): Unit = { + val config = createConfig() + config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:22") + client = AdminClient.create(config) + // Because the bootstrap servers are set up incorrectly, this call will not complete, but must be + // cancelled by the close operation. + val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava, + new CreateTopicsOptions().timeoutMs(900000)).all() + client.close(0, TimeUnit.MILLISECONDS) + assertFutureExceptionTypeEquals(future, classOf[TimeoutException]) + } + + /** + * Check that a call with a timeout does not complete before the minimum timeout has elapsed, + * even when the default request timeout is shorter. 
+ */ + @Test + def testMinimumRequestTimeouts(): Unit = { + val config = createConfig() + config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:22") + config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "0") + client = AdminClient.create(config) + val startTimeMs = Time.SYSTEM.milliseconds() + val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava, + new CreateTopicsOptions().timeoutMs(2)).all() + assertFutureExceptionTypeEquals(future, classOf[TimeoutException]) + val endTimeMs = Time.SYSTEM.milliseconds() + assertTrue("Expected the timeout to take at least one millisecond.", endTimeMs > startTimeMs); + client.close() + } + + override def generateConfigs() = { + val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol), + trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties) + cfgs.foreach { config => + config.setProperty(KafkaConfig.ListenersProp, s"${listenerName.value}://localhost:${TestUtils.RandomPort}") + config.remove(KafkaConfig.InterBrokerSecurityProtocolProp) + config.setProperty(KafkaConfig.InterBrokerListenerNameProp, listenerName.value) + config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${listenerName.value}:${securityProtocol.name}") + config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true") + // We set this in order to test that we don't expose sensitive data via describe configs. This will already be + // set for subclasses with security enabled and we don't want to overwrite it. + if (!config.containsKey(KafkaConfig.SslTruststorePasswordProp)) + config.setProperty(KafkaConfig.SslTruststorePasswordProp, "some.invalid.pass") + } + cfgs.foreach(_.putAll(serverConfig)) + cfgs.map(KafkaConfig.fromProps) + } +} + +object KafkaAdminClientIntegrationTest { + import org.scalatest.Assertions._ + + def checkValidDescribeAlterConfigs(zkUtils: ZkUtils, servers: Seq[KafkaServer], client: AdminClient): Unit = { // Create topics val topic1 = "describe-alter-configs-topic-1" val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1) @@ -305,9 +411,7 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin assertEquals("0.9", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value) } - @Test - def testInvalidAlterConfigs(): Unit = { - client = AdminClient.create(createConfig) + def checkInvalidAlterConfigs(zkUtils: ZkUtils, servers: Seq[KafkaServer], client: AdminClient): Unit = { // Create topics val topic1 = "invalid-alter-configs-topic-1" @@ -383,93 +487,4 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(LogConfig.CompressionTypeProp).value) } - val ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"), - new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)); - - /** - * Test that ACL operations are not possible when the authorizer is disabled. - * Also see {@link kafka.api.SaslSslAdminClientIntegrationTest} for tests of ACL operations - * when the authorizer is enabled. 
- */ - @Test - def testAclOperations(): Unit = { - client = AdminClient.create(createConfig()) - assertFutureExceptionTypeEquals(client.describeAcls(AclBindingFilter.ANY).all(), classOf[SecurityDisabledException]) - assertFutureExceptionTypeEquals(client.createAcls(Collections.singleton(ACL1)).all(), - classOf[SecurityDisabledException]) - assertFutureExceptionTypeEquals(client.deleteAcls(Collections.singleton(ACL1.toFilter())).all(), - classOf[SecurityDisabledException]) - client.close() - } - - /** - * Test closing the AdminClient with a generous timeout. Calls in progress should be completed, - * since they can be done within the timeout. New calls should receive timeouts. - */ - @Test - def testDelayedClose(): Unit = { - client = AdminClient.create(createConfig()) - val topics = Seq("mytopic", "mytopic2") - val newTopics = topics.map(new NewTopic(_, 1, 1)) - val future = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all() - client.close(2, TimeUnit.HOURS) - val future2 = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all() - assertFutureExceptionTypeEquals(future2, classOf[TimeoutException]) - future.get - client.close(30, TimeUnit.MINUTES) // multiple close-with-timeout should have no effect - } - - /** - * Test closing the AdminClient with a timeout of 0, when there are calls with extremely long - * timeouts in progress. The calls should be aborted after the hard shutdown timeout elapses. - */ - @Test - def testForceClose(): Unit = { - val config = createConfig() - config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:22") - client = AdminClient.create(config) - // Because the bootstrap servers are set up incorrectly, this call will not complete, but must be - // cancelled by the close operation. - val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava, - new CreateTopicsOptions().timeoutMs(900000)).all() - client.close(0, TimeUnit.MILLISECONDS) - assertFutureExceptionTypeEquals(future, classOf[TimeoutException]) - } - - /** - * Check that a call with a timeout does not complete before the minimum timeout has elapsed, - * even when the default request timeout is shorter. 
- */ - @Test - def testMinimumRequestTimeouts(): Unit = { - val config = createConfig() - config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:22") - config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "0") - client = AdminClient.create(config) - val startTimeMs = Time.SYSTEM.milliseconds() - val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava, - new CreateTopicsOptions().timeoutMs(2)).all() - assertFutureExceptionTypeEquals(future, classOf[TimeoutException]) - val endTimeMs = Time.SYSTEM.milliseconds() - assertTrue("Expected the timeout to take at least one millisecond.", endTimeMs > startTimeMs); - client.close() - } - - override def generateConfigs() = { - val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol), - trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties) - cfgs.foreach { config => - config.setProperty(KafkaConfig.ListenersProp, s"${listenerName.value}://localhost:${TestUtils.RandomPort}") - config.remove(KafkaConfig.InterBrokerSecurityProtocolProp) - config.setProperty(KafkaConfig.InterBrokerListenerNameProp, listenerName.value) - config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${listenerName.value}:${securityProtocol.name}") - config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true") - // We set this in order to test that we don't expose sensitive data via describe configs. This will already be - // set for subclasses with security enabled and we don't want to overwrite it. - if (!config.containsKey(KafkaConfig.SslTruststorePasswordProp)) - config.setProperty(KafkaConfig.SslTruststorePasswordProp, "some.invalid.pass") - } - cfgs.foreach(_.putAll(serverConfig)) - cfgs.map(KafkaConfig.fromProps) - } } From e06abd2eaaef6a89ee68a249b6e41c733757a8d7 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 2 Jun 2017 18:01:04 +0100 Subject: [PATCH 3/6] Add license --- .../AdminClientWithPoliciesIntegrationTest.scala | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala index e42481c2667c6..4dda88f34c138 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala @@ -1,3 +1,16 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + package kafka.api import java.util From e53c0aad840dfa75af0ede55e955aac5d8c2977d Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Sat, 3 Jun 2017 00:00:06 +0100 Subject: [PATCH 4/6] Add tests for policy and rename KafkaAdminClientIntegrationTest to AdminClientIntegrationTest --- .../clients/admin/KafkaAdminClientTest.java | 2 +- ...scala => AdminClientIntegrationTest.scala} | 149 +++++++++--------- ...minClientWithPoliciesIntegrationTest.scala | 116 +++++++++++++- .../SaslSslAdminClientIntegrationTest.scala | 2 +- 4 files changed, 187 insertions(+), 82 deletions(-) rename core/src/test/scala/integration/kafka/api/{KafkaAdminClientIntegrationTest.scala => AdminClientIntegrationTest.scala} (96%) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 6f9e6af814b06..c0e86e9ef97db 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -67,7 +67,7 @@ /** * A unit test for KafkaAdminClient. * - * See KafkaAdminClientIntegrationTest for an integration test of the KafkaAdminClient. + * See AdminClientIntegrationTest for an integration test. */ public class KafkaAdminClientTest { @Rule diff --git a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala similarity index 96% rename from core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala rename to core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index b8cf92a6700a5..0a1d229563cbb 100644 --- a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -45,9 +45,9 @@ import scala.collection.JavaConverters._ * * Also see {@link org.apache.kafka.clients.admin.KafkaAdminClientTest} for a unit test of the admin client. 
*/ -class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Logging { +class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging { - import KafkaAdminClientIntegrationTest._ + import AdminClientIntegrationTest._ @Rule def globalTimeout = Timeout.millis(120000) @@ -181,7 +181,72 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin @Test def testDescribeAndAlterConfigs(): Unit = { client = AdminClient.create(createConfig) - checkValidDescribeAlterConfigs(zkUtils, servers, client) + + // Create topics + val topic1 = "describe-alter-configs-topic-1" + val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1) + val topicConfig1 = new Properties + topicConfig1.setProperty(LogConfig.MaxMessageBytesProp, "500000") + topicConfig1.setProperty(LogConfig.RetentionMsProp, "60000000") + TestUtils.createTopic(zkUtils, topic1, 1, 1, servers, topicConfig1) + + val topic2 = "describe-alter-configs-topic-2" + val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2) + TestUtils.createTopic(zkUtils, topic2, 1, 1, servers, new Properties) + + // Describe topics and broker + val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER, servers(1).config.brokerId.toString) + val brokerResource2 = new ConfigResource(ConfigResource.Type.BROKER, servers(2).config.brokerId.toString) + val configResources = Seq(topicResource1, topicResource2, brokerResource1, brokerResource2) + var describeResult = client.describeConfigs(configResources.asJava) + var configs = describeResult.all.get + + assertEquals(4, configs.size) + + val maxMessageBytes1 = configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp) + assertEquals(LogConfig.MaxMessageBytesProp, maxMessageBytes1.name) + assertEquals(topicConfig1.get(LogConfig.MaxMessageBytesProp), maxMessageBytes1.value) + assertFalse(maxMessageBytes1.isDefault) + assertFalse(maxMessageBytes1.isSensitive) + assertFalse(maxMessageBytes1.isReadOnly) + + assertEquals(topicConfig1.get(LogConfig.RetentionMsProp), + configs.get(topicResource1).get(LogConfig.RetentionMsProp).value) + + val maxMessageBytes2 = configs.get(topicResource2).get(LogConfig.MaxMessageBytesProp) + assertEquals(Defaults.MessageMaxBytes.toString, maxMessageBytes2.value) + assertEquals(LogConfig.MaxMessageBytesProp, maxMessageBytes2.name) + assertTrue(maxMessageBytes2.isDefault) + assertFalse(maxMessageBytes2.isSensitive) + assertFalse(maxMessageBytes2.isReadOnly) + + assertEquals(servers(1).config.values.size, configs.get(brokerResource1).entries.size) + assertEquals(servers(1).config.brokerId.toString, configs.get(brokerResource1).get(KafkaConfig.BrokerIdProp).value) + val listenerSecurityProtocolMap = configs.get(brokerResource1).get(KafkaConfig.ListenerSecurityProtocolMapProp) + assertEquals(servers(1).config.getString(KafkaConfig.ListenerSecurityProtocolMapProp), listenerSecurityProtocolMap.value) + assertEquals(KafkaConfig.ListenerSecurityProtocolMapProp, listenerSecurityProtocolMap.name) + assertFalse(listenerSecurityProtocolMap.isDefault) + assertFalse(listenerSecurityProtocolMap.isSensitive) + assertTrue(listenerSecurityProtocolMap.isReadOnly) + val truststorePassword = configs.get(brokerResource1).get(KafkaConfig.SslTruststorePasswordProp) + assertEquals(KafkaConfig.SslTruststorePasswordProp, truststorePassword.name) + assertNull(truststorePassword.value) + assertFalse(truststorePassword.isDefault) + assertTrue(truststorePassword.isSensitive) + assertTrue(truststorePassword.isReadOnly) + val 
compressionType = configs.get(brokerResource1).get(KafkaConfig.CompressionTypeProp) + assertEquals(servers(1).config.compressionType.toString, compressionType.value) + assertEquals(KafkaConfig.CompressionTypeProp, compressionType.name) + assertTrue(compressionType.isDefault) + assertFalse(compressionType.isSensitive) + assertTrue(compressionType.isReadOnly) + + assertEquals(servers(2).config.values.size, configs.get(brokerResource2).entries.size) + assertEquals(servers(2).config.brokerId.toString, configs.get(brokerResource2).get(KafkaConfig.BrokerIdProp).value) + assertEquals(servers(2).config.logCleanerThreads.toString, + configs.get(brokerResource2).get(KafkaConfig.LogCleanerThreadsProp).value) + + checkValidAlterConfigs(zkUtils, servers, client, topicResource1, topicResource2) } @Test @@ -281,75 +346,12 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin } } -object KafkaAdminClientIntegrationTest { +object AdminClientIntegrationTest { import org.scalatest.Assertions._ - def checkValidDescribeAlterConfigs(zkUtils: ZkUtils, servers: Seq[KafkaServer], client: AdminClient): Unit = { - // Create topics - val topic1 = "describe-alter-configs-topic-1" - val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1) - val topicConfig1 = new Properties - topicConfig1.setProperty(LogConfig.MaxMessageBytesProp, "500000") - topicConfig1.setProperty(LogConfig.RetentionMsProp, "60000000") - TestUtils.createTopic(zkUtils, topic1, 1, 1, servers, topicConfig1) - - val topic2 = "describe-alter-configs-topic-2" - val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2) - TestUtils.createTopic(zkUtils, topic2, 1, 1, servers, new Properties) - - // Describe topics and broker - val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER, servers(1).config.brokerId.toString) - val brokerResource2 = new ConfigResource(ConfigResource.Type.BROKER, servers(2).config.brokerId.toString) - val configResources = Seq(topicResource1, topicResource2, brokerResource1, brokerResource2) - var describeResult = client.describeConfigs(configResources.asJava) - var configs = describeResult.all.get - - assertEquals(4, configs.size) - - val maxMessageBytes1 = configs.get(topicResource1).get(LogConfig.MaxMessageBytesProp) - assertEquals(LogConfig.MaxMessageBytesProp, maxMessageBytes1.name) - assertEquals(topicConfig1.get(LogConfig.MaxMessageBytesProp), maxMessageBytes1.value) - assertFalse(maxMessageBytes1.isDefault) - assertFalse(maxMessageBytes1.isSensitive) - assertFalse(maxMessageBytes1.isReadOnly) - - assertEquals(topicConfig1.get(LogConfig.RetentionMsProp), - configs.get(topicResource1).get(LogConfig.RetentionMsProp).value) - - val maxMessageBytes2 = configs.get(topicResource2).get(LogConfig.MaxMessageBytesProp) - assertEquals(Defaults.MessageMaxBytes.toString, maxMessageBytes2.value) - assertEquals(LogConfig.MaxMessageBytesProp, maxMessageBytes2.name) - assertTrue(maxMessageBytes2.isDefault) - assertFalse(maxMessageBytes2.isSensitive) - assertFalse(maxMessageBytes2.isReadOnly) - - assertEquals(servers(1).config.values.size, configs.get(brokerResource1).entries.size) - assertEquals(servers(1).config.brokerId.toString, configs.get(brokerResource1).get(KafkaConfig.BrokerIdProp).value) - val listenerSecurityProtocolMap = configs.get(brokerResource1).get(KafkaConfig.ListenerSecurityProtocolMapProp) - assertEquals(servers(1).config.getString(KafkaConfig.ListenerSecurityProtocolMapProp), listenerSecurityProtocolMap.value) - 
assertEquals(KafkaConfig.ListenerSecurityProtocolMapProp, listenerSecurityProtocolMap.name) - assertFalse(listenerSecurityProtocolMap.isDefault) - assertFalse(listenerSecurityProtocolMap.isSensitive) - assertTrue(listenerSecurityProtocolMap.isReadOnly) - val truststorePassword = configs.get(brokerResource1).get(KafkaConfig.SslTruststorePasswordProp) - assertEquals(KafkaConfig.SslTruststorePasswordProp, truststorePassword.name) - assertNull(truststorePassword.value) - assertFalse(truststorePassword.isDefault) - assertTrue(truststorePassword.isSensitive) - assertTrue(truststorePassword.isReadOnly) - val compressionType = configs.get(brokerResource1).get(KafkaConfig.CompressionTypeProp) - assertEquals(servers(1).config.compressionType.toString, compressionType.value) - assertEquals(KafkaConfig.CompressionTypeProp, compressionType.name) - assertTrue(compressionType.isDefault) - assertFalse(compressionType.isSensitive) - assertTrue(compressionType.isReadOnly) - - assertEquals(servers(2).config.values.size, configs.get(brokerResource2).entries.size) - assertEquals(servers(2).config.brokerId.toString, configs.get(brokerResource2).get(KafkaConfig.BrokerIdProp).value) - assertEquals(servers(2).config.logCleanerThreads.toString, - configs.get(brokerResource2).get(KafkaConfig.LogCleanerThreadsProp).value) - + def checkValidAlterConfigs(zkUtils: ZkUtils, servers: Seq[KafkaServer], client: AdminClient, + topicResource1: ConfigResource, topicResource2: ConfigResource): Unit = { // Alter topics var topicConfigEntries1 = Seq( new ConfigEntry(LogConfig.FlushMsProp, "1000") @@ -369,8 +371,8 @@ object KafkaAdminClientIntegrationTest { alterResult.all.get // Verify that topics were updated correctly - describeResult = client.describeConfigs(Seq(topicResource1, topicResource2).asJava) - configs = describeResult.all.get + var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2).asJava) + var configs = describeResult.all.get assertEquals(2, configs.size) @@ -412,7 +414,6 @@ object KafkaAdminClientIntegrationTest { } def checkInvalidAlterConfigs(zkUtils: ZkUtils, servers: Seq[KafkaServer], client: AdminClient): Unit = { - // Create topics val topic1 = "invalid-alter-configs-topic-1" val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1) @@ -456,7 +457,7 @@ object KafkaAdminClientIntegrationTest { assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value) - assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(LogConfig.CompressionTypeProp).value) + assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value) // Alter configs with validateOnly = true: first and third are invalid, second is valid topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp, "gzip")).asJava @@ -484,7 +485,7 @@ object KafkaAdminClientIntegrationTest { assertEquals("snappy", configs.get(topicResource2).get(LogConfig.CompressionTypeProp).value) - assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(LogConfig.CompressionTypeProp).value) + assertEquals(Defaults.CompressionType.toString, configs.get(brokerResource).get(KafkaConfig.CompressionTypeProp).value) } } diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala index 4dda88f34c138..b2371f52e4414 100644 --- 
a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala @@ -14,13 +14,19 @@ package kafka.api import java.util +import java.util.Properties +import java.util.concurrent.ExecutionException import kafka.integration.KafkaServerTestHarness -import kafka.server.KafkaConfig +import kafka.log.LogConfig +import kafka.server.{Defaults, KafkaConfig} import kafka.utils.{Logging, TestUtils} -import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig} +import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, AlterConfigsOptions, Config, ConfigEntry} +import org.apache.kafka.common.config.{ConfigResource, TopicConfig} +import org.apache.kafka.common.errors.{InvalidRequestException, PolicyViolationException} import org.apache.kafka.common.utils.Utils import org.apache.kafka.server.policy.AlterConfigPolicy +import org.junit.Assert.{assertEquals, assertNull, assertTrue} import org.junit.{After, Before, Rule, Test} import org.junit.rules.Timeout @@ -34,7 +40,7 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with import AdminClientWithPoliciesIntegrationTest._ var client: AdminClient = null - val brokerCount = 2 + val brokerCount = 3 @Rule def globalTimeout = Timeout.millis(120000) @@ -62,20 +68,115 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with } @Test - def testValidDescribeAlterConfigs(): Unit = { + def testValidAlterConfigs(): Unit = { client = AdminClient.create(createConfig) - KafkaAdminClientIntegrationTest.checkValidDescribeAlterConfigs(zkUtils, servers, client) + // Create topics + val topic1 = "describe-alter-configs-topic-1" + val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1) + val topicConfig1 = new Properties + topicConfig1.setProperty(LogConfig.MaxMessageBytesProp, "500000") + topicConfig1.setProperty(LogConfig.RetentionMsProp, "60000000") + TestUtils.createTopic(zkUtils, topic1, 1, 1, servers, topicConfig1) + + val topic2 = "describe-alter-configs-topic-2" + val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2) + TestUtils.createTopic(zkUtils, topic2, 1, 1, servers, new Properties) + + AdminClientIntegrationTest.checkValidAlterConfigs(zkUtils, servers, client, topicResource1, topicResource2) } @Test def testInvalidAlterConfigs(): Unit = { client = AdminClient.create(createConfig) - KafkaAdminClientIntegrationTest.checkInvalidAlterConfigs(zkUtils, servers, client) + AdminClientIntegrationTest.checkInvalidAlterConfigs(zkUtils, servers, client) } @Test def testInvalidAlterConfigsDueToPolicy(): Unit = { client = AdminClient.create(createConfig) + + // Create topics + val topic1 = "invalid-alter-configs-due-to-policy-topic-1" + val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1) + TestUtils.createTopic(zkUtils, topic1, 1, 1, servers, new Properties()) + + val topic2 = "invalid-alter-configs-due-to-policy-topic-2" + val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2) + TestUtils.createTopic(zkUtils, topic2, 1, 1, servers, new Properties) + + val topic3 = "invalid-alter-configs-due-to-policy-topic-3" + val topicResource3 = new ConfigResource(ConfigResource.Type.TOPIC, topic3) + TestUtils.createTopic(zkUtils, topic3, 1, 1, servers, new Properties) + + val topicConfigEntries1 = Seq( + new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"), + new ConfigEntry(LogConfig.CompressionTypeProp, 
"lz4") // policy doesn't allow this + ).asJava + + var topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.8")).asJava + + var topicConfigEntries3 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp, "invalid-compression")).asJava + + val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, servers.head.config.brokerId.toString) + val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.SslTruststorePasswordProp, "12313")).asJava + + // Alter configs: second is valid, the others are invalid + var alterResult = client.alterConfigs(Map( + topicResource1 -> new Config(topicConfigEntries1), + topicResource2 -> new Config(topicConfigEntries2), + topicResource3 -> new Config(topicConfigEntries3), + brokerResource -> new Config(brokerConfigEntries) + ).asJava) + + assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.results.keySet) + assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException]) + alterResult.results.get(topicResource2).get + assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException]) + assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException]) + + // Verify that the second resource was updated and the others were not + var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, topicResource3, brokerResource).asJava) + var configs = describeResult.all.get + assertEquals(4, configs.size) + + assertEquals(Defaults.LogCleanerMinCleanRatio.toString, + configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value) + assertEquals(Defaults.CompressionType.toString, + configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value) + + assertEquals("0.8", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value) + + assertNull(configs.get(brokerResource).get(KafkaConfig.SslTruststorePasswordProp).value) + + // Alter configs with validateOnly = true: only second is valid + topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.7")).asJava + + alterResult = client.alterConfigs(Map( + topicResource1 -> new Config(topicConfigEntries1), + topicResource2 -> new Config(topicConfigEntries2), + brokerResource -> new Config(brokerConfigEntries), + topicResource3 -> new Config(topicConfigEntries3) + ).asJava, new AlterConfigsOptions().validateOnly(true)) + + assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.results.keySet) + assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException]) + alterResult.results.get(topicResource2).get + assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException]) + assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException]) + + // Verify that no resources are updated since validate_only = true + describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, topicResource3, brokerResource).asJava) + configs = describeResult.all.get + assertEquals(4, configs.size) + + assertEquals(Defaults.LogCleanerMinCleanRatio.toString, + configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value) 
+ assertEquals(Defaults.CompressionType.toString, + configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value) + + assertEquals("0.8", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value) + + assertNull(configs.get(brokerResource).get(KafkaConfig.SslTruststorePasswordProp).value) } @@ -97,6 +198,9 @@ object AdminClientWithPoliciesIntegrationTest { require(!configs.isEmpty, "configure should have been called with non empty configs") require(!requestMetadata.configs.isEmpty, "request configs should not be empty") require(requestMetadata.resource.name.nonEmpty, "resource name should not be empty") + require(requestMetadata.resource.name.contains("topic")) + if (requestMetadata.configs.containsKey(TopicConfig.COMPRESSION_TYPE_CONFIG)) + throw new PolicyViolationException("Compression type cannot be updated") } def close(): Unit = closed = true diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala index d27b0bfeafe54..9cd86c3b29dec 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala @@ -27,7 +27,7 @@ import org.junit.{After, Assert, Before, Test} import scala.collection.JavaConverters._ -class SaslSslAdminClientIntegrationTest extends KafkaAdminClientIntegrationTest with SaslSetup { +class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with SaslSetup { this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName()) this.serverConfig.setProperty(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true") From 8eddaf21be54ca909aed70e908becb7df3cd58da Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Sat, 3 Jun 2017 00:24:48 +0100 Subject: [PATCH 5/6] Test fixes --- .../AdminClientWithPoliciesIntegrationTest.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala index b2371f52e4414..7d3c54cfef520 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala @@ -110,12 +110,12 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with val topicConfigEntries1 = Seq( new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"), - new ConfigEntry(LogConfig.CompressionTypeProp, "lz4") // policy doesn't allow this + new ConfigEntry(LogConfig.MinInSyncReplicasProp, "2") // policy doesn't allow this ).asJava var topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.8")).asJava - var topicConfigEntries3 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp, "invalid-compression")).asJava + val topicConfigEntries3 = Seq(new ConfigEntry(LogConfig.MinInSyncReplicasProp, "-1")).asJava val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, servers.head.config.brokerId.toString) val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.SslTruststorePasswordProp, "12313")).asJava @@ -141,8 +141,8 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with 
assertEquals(Defaults.LogCleanerMinCleanRatio.toString, configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value) - assertEquals(Defaults.CompressionType.toString, - configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value) + assertEquals(Defaults.MinInSyncReplicas.toString, + configs.get(topicResource1).get(LogConfig.MinInSyncReplicasProp).value) assertEquals("0.8", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value) @@ -171,8 +171,8 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with assertEquals(Defaults.LogCleanerMinCleanRatio.toString, configs.get(topicResource1).get(LogConfig.MinCleanableDirtyRatioProp).value) - assertEquals(Defaults.CompressionType.toString, - configs.get(topicResource1).get(LogConfig.CompressionTypeProp).value) + assertEquals(Defaults.MinInSyncReplicas.toString, + configs.get(topicResource1).get(LogConfig.MinInSyncReplicasProp).value) assertEquals("0.8", configs.get(topicResource2).get(LogConfig.MinCleanableDirtyRatioProp).value) @@ -199,8 +199,8 @@ object AdminClientWithPoliciesIntegrationTest { require(!requestMetadata.configs.isEmpty, "request configs should not be empty") require(requestMetadata.resource.name.nonEmpty, "resource name should not be empty") require(requestMetadata.resource.name.contains("topic")) - if (requestMetadata.configs.containsKey(TopicConfig.COMPRESSION_TYPE_CONFIG)) - throw new PolicyViolationException("Compression type cannot be updated") + if (requestMetadata.configs.containsKey(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) + throw new PolicyViolationException("Min in sync replicas cannot be updated") } def close(): Unit = closed = true From 9cc46e5dce2286ebbdd226861f9f6541280a4022 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Sat, 3 Jun 2017 01:38:55 +0100 Subject: [PATCH 6/6] Mention that topic is the only resource type whose configs can be updated --- .../java/org/apache/kafka/server/policy/AlterConfigPolicy.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigPolicy.java b/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigPolicy.java index 18536d7d40487..ca47efa66458f 100644 --- a/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigPolicy.java +++ b/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigPolicy.java @@ -77,7 +77,8 @@ public String toString() { * Clients will receive the POLICY_VIOLATION error code along with the exception's message. Note that validation * failure only affects the relevant resource, other resources in the request will still be processed. * - * @param requestMetadata the alter configs request parameters for the provided resource. + * @param requestMetadata the alter configs request parameters for the provided resource (topic is the only resource + * type whose configs can be updated currently). * @throws PolicyViolationException if the request parameters do not satisfy this policy. */ void validate(RequestMetadata requestMetadata) throws PolicyViolationException;
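
Note for reviewers: below is a minimal sketch of a custom policy built against the interface added in this series, in case it helps evaluate the extension point. The package and class name (com.example.MinInsyncReplicasPolicy) and the floor of 2 are invented for illustration; the AlterConfigPolicy and RequestMetadata API, TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG and ConfigResource.Type.TOPIC are the identifiers actually used in the patches above.

package com.example;

import java.util.Map;

import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.server.policy.AlterConfigPolicy;

public class MinInsyncReplicasPolicy implements AlterConfigPolicy {

    @Override
    public void configure(Map<String, ?> brokerConfigs) {
        // The broker passes its own configs here; this example does not need them.
    }

    @Override
    public void validate(RequestMetadata requestMetadata) throws PolicyViolationException {
        // Topic is the only resource type whose configs can be altered at this point (see patch 6).
        if (requestMetadata.resource().type() != ConfigResource.Type.TOPIC)
            return;
        String minIsr = requestMetadata.configs().get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG);
        // AdminManager validates the topic config before invoking the policy,
        // so the value parses if it is present at all.
        if (minIsr != null && Integer.parseInt(minIsr) < 2)
            throw new PolicyViolationException("min.insync.replicas must be at least 2");
    }

    @Override
    public void close() {
        // Invoked on broker shutdown; no resources to release in this example.
    }
}

To try it, put the compiled class on the broker classpath and set alter.config.policy.class.name=com.example.MinInsyncReplicasPolicy in the broker properties. A client that attempts to lower min.insync.replicas below the floor then receives the POLICY_VIOLATION error for that resource only; other resources in the same AlterConfigs request are still processed, matching the interface javadoc above.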