From 3ed2cb295a2a3a1c91d267b36291743842862202 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Thu, 26 Jan 2017 13:36:25 +0000 Subject: [PATCH 1/3] Close `createTopicPolicy` in `AdminManager.shutdown()` --- core/src/main/scala/kafka/server/AdminManager.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala index 6dc224dc54603..98a4b1897d635 100644 --- a/core/src/main/scala/kafka/server/AdminManager.scala +++ b/core/src/main/scala/kafka/server/AdminManager.scala @@ -191,5 +191,6 @@ class AdminManager(val config: KafkaConfig, def shutdown() { topicPurgatory.shutdown() + CoreUtils.swallow(createTopicPolicy.foreach(_.close())) } } From ec0ee690fcf7c04a49ff5d1d1a5caac1839732fd Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Thu, 26 Jan 2017 13:36:52 +0000 Subject: [PATCH 2/3] Add a few more tests in `CreateTopicsRequestWithPolicyTest` --- .../CreateTopicsRequestWithPolicyTest.scala | 49 ++++++++++++++++--- 1 file changed, 41 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala index 9affea47e3c48..4bebcc5031c69 100644 --- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala +++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala @@ -20,6 +20,7 @@ package kafka.server import java.util import java.util.Properties +import kafka.log.LogConfig import kafka.utils.TestUtils import org.apache.kafka.common.errors.PolicyViolationException import org.apache.kafka.common.protocol.Errors @@ -48,9 +49,13 @@ class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder( Map("topic2" -> new CreateTopicsRequest.TopicDetails(5, 3.toShort)).asJava, timeout, true).build()) + val configs = Map(LogConfig.RetentionMsProp -> 4999.toString) + validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder( + Map("topic3" -> new CreateTopicsRequest.TopicDetails(11, 2.toShort, configs.asJava)).asJava, timeout, true).build()) + val assignments = replicaAssignmentToJava(Map(0 -> List(1, 0), 1 -> List(0, 1))) validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder( - Map("topic3" -> new CreateTopicsRequest.TopicDetails(assignments)).asJava, timeout).build()) + Map("topic4" -> new CreateTopicsRequest.TopicDetails(assignments)).asJava, timeout).build()) } @Test @@ -65,14 +70,23 @@ class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest Map("policy-topic1" -> error(Errors.POLICY_VIOLATION, Some("Topics should have at least 5 partitions, received 4")))) validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder( - Map("policy-topic2" -> new CreateTopicsRequest.TopicDetails(4, 1.toShort)).asJava, timeout, true).build(), + Map("policy-topic2" -> new CreateTopicsRequest.TopicDetails(4, 3.toShort)).asJava, timeout, true).build(), Map("policy-topic2" -> error(Errors.POLICY_VIOLATION, Some("Topics should have at least 5 partitions, received 4")))) + val configs = Map(LogConfig.RetentionMsProp -> 5001.toString) + validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder( + Map("policy-topic3" -> new CreateTopicsRequest.TopicDetails(11, 2.toShort, configs.asJava)).asJava, timeout, true).build(), + Map("policy-topic3" -> error(Errors.POLICY_VIOLATION, Some("RetentionMs should be less than 5000ms if replicationFactor > 5")))) + + validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder( + Map("policy-topic4" -> new CreateTopicsRequest.TopicDetails(11, 3.toShort, Map.empty.asJava)).asJava, timeout, true).build(), + Map("policy-topic4" -> error(Errors.POLICY_VIOLATION, Some("RetentionMs should be less than 5000ms if replicationFactor > 5")))) + val assignments = replicaAssignmentToJava(Map(0 -> List(1), 1 -> List(0))) validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder( - Map("policy-topic3" -> new CreateTopicsRequest.TopicDetails(assignments)).asJava, timeout).build(), - Map("policy-topic3" -> error(Errors.POLICY_VIOLATION, - Some("""Topic partitions should have at least 2 partitions, received 1 for partition 0"""))), checkErrorMessage = true) + Map("policy-topic5" -> new CreateTopicsRequest.TopicDetails(assignments)).asJava, timeout).build(), + Map("policy-topic5" -> error(Errors.POLICY_VIOLATION, + Some("Topic partitions should have at least 2 partitions, received 1 for partition 0"))), checkErrorMessage = true) // Check that basic errors still work validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder( @@ -94,29 +108,48 @@ class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest object CreateTopicsRequestWithPolicyTest { class Policy extends CreateTopicPolicy { - def configure(configs: util.Map[String, _]): Unit = () + + var configs: Map[String, _] = _ + var closed = false + + def configure(configs: util.Map[String, _]): Unit = { + this.configs = configs.asScala.toMap + } def validate(requestMetadata: RequestMetadata): Unit = { + require(!closed, "Policy should not be closed") + require(!configs.isEmpty, "configure should have been called with non empty configs") + import requestMetadata._ - require(configs.isEmpty, s"Topic configs should be empty, but it is $configs") if (numPartitions != null || replicationFactor != null) { require(numPartitions != null, s"numPartitions should not be null, but it is $numPartitions") require(replicationFactor != null, s"replicationFactor should not be null, but it is $replicationFactor") require(replicasAssignments == null, s"replicaAssigments should be null, but it is $replicasAssignments") + if (numPartitions < 5) throw new PolicyViolationException(s"Topics should have at least 5 partitions, received $numPartitions") + + if (numPartitions > 10) { + if (requestMetadata.configs.asScala.get(LogConfig.RetentionMsProp).fold(true)(_.toInt > 5000)) + throw new PolicyViolationException("RetentionMs should be less than 5000ms if replicationFactor > 5") + } else + require(requestMetadata.configs.isEmpty, s"Topic configs should be empty, but it is ${requestMetadata.configs}") + } else { require(numPartitions == null, s"numPartitions should be null, but it is $numPartitions") require(replicationFactor == null, s"replicationFactor should be null, but it is $replicationFactor") require(replicasAssignments != null, s"replicaAssigments should not be null, but it is $replicasAssignments") + replicasAssignments.asScala.foreach { case (partitionId, assignment) => if (assignment.size < 2) throw new PolicyViolationException("Topic partitions should have at least 2 partitions, received " + s"${assignment.size} for partition $partitionId") } } + } - def close(): Unit = () + def close(): Unit = closed = true + } } From 47164a5814704296d1ade057ef9815b99d197bfa Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 27 Jan 2017 01:30:23 +0000 Subject: [PATCH 3/3] Remove unnecessary `checkErrorMessage = true` --- .../unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala index 4bebcc5031c69..f2020cf8bb41e 100644 --- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala +++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala @@ -86,7 +86,7 @@ class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder( Map("policy-topic5" -> new CreateTopicsRequest.TopicDetails(assignments)).asJava, timeout).build(), Map("policy-topic5" -> error(Errors.POLICY_VIOLATION, - Some("Topic partitions should have at least 2 partitions, received 1 for partition 0"))), checkErrorMessage = true) + Some("Topic partitions should have at least 2 partitions, received 1 for partition 0")))) // Check that basic errors still work validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(