From 41b91083d17e8ad4d6e3068de7d1fb958a842a20 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Thu, 19 Oct 2023 15:16:11 +0300 Subject: [PATCH] [aiven] feat: add topic id tagged field to create topic api --- .../apache/kafka/clients/admin/NewTopic.java | 18 +++++++++++++- .../common/message/CreateTopicsRequest.json | 4 +++- .../scala/kafka/server/ZkAdminManager.scala | 13 +++++++--- .../main/scala/kafka/zk/AdminZkClient.scala | 15 ++++++++---- .../unit/kafka/zk/AdminZkClientTest.scala | 17 ++++++++++++- .../controller/ReplicationControlManager.java | 10 +++++++- .../ReplicationControlManagerTest.java | 24 ++++++++++++++++++- .../org/apache/kafka/tools/TopicCommand.java | 4 +++- 8 files changed, 92 insertions(+), 13 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java b/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java index 2f335d02f2f2..2854703ba6b4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java @@ -18,6 +18,8 @@ package org.apache.kafka.clients.admin; import java.util.Optional; + +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig; @@ -35,6 +37,7 @@ */ public class NewTopic { + private final Uuid id; private final String name; private final Optional numPartitions; private final Optional replicationFactor; @@ -48,12 +51,17 @@ public NewTopic(String name, int numPartitions, short replicationFactor) { this(name, Optional.of(numPartitions), Optional.of(replicationFactor)); } + public NewTopic(String name, Optional numPartitions, Optional replicationFactor) { + this(null, name, numPartitions, replicationFactor); + } + /** * A new topic that optionally defaults {@code numPartitions} and {@code replicationFactor} to * the broker configurations for {@code num.partitions} and {@code default.replication.factor} * respectively. */ - public NewTopic(String name, Optional numPartitions, Optional replicationFactor) { + public NewTopic(Uuid id, String name, Optional numPartitions, Optional replicationFactor) { + this.id = id; this.name = name; this.numPartitions = numPartitions; this.replicationFactor = replicationFactor; @@ -68,12 +76,17 @@ public NewTopic(String name, Optional numPartitions, Optional re * generally a good idea for all partitions to have the same number of replicas. */ public NewTopic(String name, Map> replicasAssignments) { + this.id = null; this.name = name; this.numPartitions = Optional.empty(); this.replicationFactor = Optional.empty(); this.replicasAssignments = Collections.unmodifiableMap(replicasAssignments); } + public Uuid id() { + return id; + } + /** * The name of the topic to be created. */ @@ -126,6 +139,9 @@ CreatableTopic convertToCreatableTopic() { setName(name). setNumPartitions(numPartitions.orElse(CreateTopicsRequest.NO_NUM_PARTITIONS)). setReplicationFactor(replicationFactor.orElse(CreateTopicsRequest.NO_REPLICATION_FACTOR)); + if (id != null) { + creatableTopic.setId(id); + } if (replicasAssignments != null) { for (Entry> entry : replicasAssignments.entrySet()) { creatableTopic.assignments().add( diff --git a/clients/src/main/resources/common/message/CreateTopicsRequest.json b/clients/src/main/resources/common/message/CreateTopicsRequest.json index bc9dbb2438b9..4daceaa1d18c 100644 --- a/clients/src/main/resources/common/message/CreateTopicsRequest.json +++ b/clients/src/main/resources/common/message/CreateTopicsRequest.json @@ -54,7 +54,9 @@ "about": "The configuration name." }, { "name": "Value", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The configuration value." } - ]} + ]}, + { "name": "Id", "type": "uuid", "tag": 0, "taggedVersions": "5+", "versions": "5+", "ignorable": true, + "about": "Optional topic id."} ]}, { "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000", "about": "How long to wait in milliseconds before timing out the request." }, diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala b/core/src/main/scala/kafka/server/ZkAdminManager.scala index ac7a82dc0dfa..3470474842ba 100644 --- a/core/src/main/scala/kafka/server/ZkAdminManager.scala +++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala @@ -29,8 +29,7 @@ import org.apache.kafka.admin.AdminUtils import org.apache.kafka.clients.admin.{AlterConfigOp, ScramMechanism} import org.apache.kafka.common.Uuid import org.apache.kafka.common.config.{ConfigDef, ConfigException, ConfigResource} -import org.apache.kafka.common.errors.ThrottlingQuotaExceededException -import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedVersionException} +import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, ThrottlingQuotaExceededException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedVersionException} import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData.AlterUserScramCredentialsResult import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic @@ -167,6 +166,14 @@ class ZkAdminManager(val config: KafkaConfig, try { if (metadataCache.contains(topic.name)) throw new TopicExistsException(s"Topic '${topic.name}' already exists.") + val maybeUuid = topic.id() match { + case Uuid.ZERO_UUID => None + case id => + if (metadataCache.topicNamesToIds().containsValue(id)) { + throw new TopicExistsException(s"Topic id '$id' already exists.") + } + Some(id) + } val nullConfigs = topic.configs.asScala.filter(_.value == null).map(_.name) if (nullConfigs.nonEmpty) @@ -211,7 +218,7 @@ class ZkAdminManager(val config: KafkaConfig, CreatePartitionsMetadata(topic.name, assignments.keySet) } else { controllerMutationQuota.record(assignments.size) - adminZkClient.createTopicWithAssignment(topic.name, configs, assignments, validate = false, config.usesTopicId) + adminZkClient.createTopicWithAssignment(topic.name, configs, assignments, validate = false, config.usesTopicId, maybeUuid) populateIds(includeConfigsAndMetadata, topic.name) CreatePartitionsMetadata(topic.name, assignments.keySet) } diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala index 25d4e17788e5..dd8c91982fe3 100644 --- a/core/src/main/scala/kafka/zk/AdminZkClient.scala +++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala @@ -103,7 +103,8 @@ class AdminZkClient(zkClient: KafkaZkClient, config: Properties, partitionReplicaAssignment: Map[Int, Seq[Int]], validate: Boolean = true, - usesTopicId: Boolean = false): Unit = { + usesTopicId: Boolean = false, + maybeTopicId: Option[Uuid] = None): Unit = { if (validate) validateTopicCreate(topic, partitionReplicaAssignment, config) @@ -115,7 +116,7 @@ class AdminZkClient(zkClient: KafkaZkClient, // create the partition assignment writeTopicPartitionAssignment(topic, partitionReplicaAssignment.map { case (k, v) => k -> ReplicaAssignment(v) }, - isUpdate = false, usesTopicId) + isUpdate = false, usesTopicId, maybeTopicId) } /** @@ -167,12 +168,18 @@ class AdminZkClient(zkClient: KafkaZkClient, } private def writeTopicPartitionAssignment(topic: String, replicaAssignment: Map[Int, ReplicaAssignment], - isUpdate: Boolean, usesTopicId: Boolean = false): Unit = { + isUpdate: Boolean, usesTopicId: Boolean = false, + maybeTopicId: Option[Uuid] = None): Unit = { try { val assignment = replicaAssignment.map { case (partitionId, replicas) => (new TopicPartition(topic,partitionId), replicas) }.toMap if (!isUpdate) { - val topicIdOpt = if (usesTopicId) Some(Uuid.randomUuid()) else None + val topicIdOpt = if (usesTopicId) { + maybeTopicId match { + case None => Some(Uuid.randomUuid()) + case _ => maybeTopicId + } + } else None zkClient.createTopicAssignment(topic, topicIdOpt, assignment.map { case (k, v) => k -> v.replicas }) } else { val topicIds = zkClient.getTopicIdsForTopics(Set(topic)) diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala index e3171647ff18..696691d6caa4 100644 --- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala @@ -26,7 +26,7 @@ import kafka.utils.CoreUtils._ import kafka.utils.TestUtils._ import kafka.utils.{Logging, TestUtils} import kafka.zk.{AdminZkClient, ConfigEntityTypeZNode, KafkaZkClient} -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{TopicPartition, Uuid} import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.config.internals.QuotaConfigs import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidTopicException, TopicExistsException} @@ -53,6 +53,21 @@ class AdminZkClientTest extends QuorumTestHarness with Logging with RackAwareTes super.tearDown() } + @Test + def testCreateTopicWithId(): Unit = { + val brokers = List(0, 1, 2, 3, 4) + TestUtils.createBrokersInZk(zkClient, brokers) + + val topicConfig = new Properties() + + val assignment = Map(0 -> List(0, 1, 2), + 1 -> List(1, 2, 3)) + val uuid = Uuid.randomUuid + adminZkClient.createTopicWithAssignment("test", topicConfig, assignment, usesTopicId = true, maybeTopicId = Some(uuid)) + val found = zkClient.getTopicIdsForTopics(Set("test")) + assertEquals(uuid, found("test")) + } + @Test def testManualReplicaAssignment(): Unit = { val brokers = List(0, 1, 2, 3, 4) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 759e3dfe5c4c..4aca2ef7a765 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -680,6 +680,15 @@ private ApiError createTopic(ControllerRequestContext context, boolean authorizedToReturnConfigs) { Map creationConfigs = translateCreationConfigs(topic.configs()); Map newParts = new HashMap<>(); + Uuid topicId; + if (topic.id() == null || topic.id() == Uuid.ZERO_UUID) { + topicId = Uuid.randomUuid(); + } else { + if (topics.containsKey(topic.id())) { + return ApiError.fromThrowable(new InvalidTopicException("Topic id " + topic.id() + " already exists")); + } + topicId = topic.id(); + } if (!topic.assignments().isEmpty()) { if (topic.replicationFactor() != -1) { return new ApiError(INVALID_REQUEST, @@ -778,7 +787,6 @@ private ApiError createTopic(ControllerRequestContext context, numPartitions, e.throttleTimeMs()); return ApiError.fromThrowable(e); } - Uuid topicId = Uuid.randomUuid(); CreatableTopicResult result = new CreatableTopicResult(). setName(topic.name()). setTopicId(topicId). diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index bf7f6c82e051..f923e74d9788 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -258,12 +258,14 @@ private ReplicationControlTestContext( clusterControl.activate(); } - CreatableTopicResult createTestTopic(String name, + CreatableTopicResult createTestTopic(Uuid id, + String name, int numPartitions, short replicationFactor, short expectedErrorCode) throws Exception { CreateTopicsRequestData request = new CreateTopicsRequestData(); CreatableTopic topic = new CreatableTopic().setName(name); + if (id != null) topic.setId(id); topic.setNumPartitions(numPartitions).setReplicationFactor(replicationFactor); request.topics().add(topic); ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); @@ -278,6 +280,13 @@ CreatableTopicResult createTestTopic(String name, return topicResult; } + CreatableTopicResult createTestTopic(String name, + int numPartitions, + short replicationFactor, + short expectedErrorCode) throws Exception { + return createTestTopic(null, name, numPartitions, replicationFactor, expectedErrorCode); + } + CreatableTopicResult createTestTopic(String name, int[][] replicas) throws Exception { return createTestTopic(name, replicas, Collections.emptyMap(), (short) 0); } @@ -814,6 +823,19 @@ public void testInvalidCreateTopicsWithValidateOnlyFlag() throws Exception { assertEquals(expectedResponse, result.response()); } + @Test + public void testCreateTopicsWithId() throws Exception { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build(); + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + + Uuid id = Uuid.randomUuid(); + CreatableTopicResult initialTopic = ctx.createTestTopic(id, "foo.bar", 2, (short) 2, NONE.code()); + assertEquals(id, ctx.replicationControl.getTopic(initialTopic.topicId()).topicId()); + CreatableTopicResult resultWithErrors = ctx.createTestTopic(id, "foo.baz", 2, (short) 2, INVALID_TOPIC_EXCEPTION.code()); + assertEquals("Topic id " + id + " already exists", resultWithErrors.errorMessage()); + } + @Test public void testCreateTopicsWithPolicy() throws Exception { MockCreateTopicPolicy createTopicPolicy = new MockCreateTopicPolicy(asList( diff --git a/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java b/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java index ae725012dc18..a753f35a95ea 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java @@ -252,6 +252,7 @@ static class CommandTopicPartition { private final Optional replicationFactor; private final Map> replicaAssignment; private final Properties configsToAdd; + private final Optional topicId; private final TopicCommandOptions opts; @@ -262,6 +263,7 @@ public CommandTopicPartition(TopicCommandOptions options) { replicationFactor = options.replicationFactor(); replicaAssignment = options.replicaAssignment().orElse(Collections.emptyMap()); configsToAdd = parseTopicConfigsToBeAdded(options); + topicId = options.topicId().map(Uuid::fromString); } public Boolean hasReplicaAssignment() { @@ -459,7 +461,7 @@ public void createTopic(CommandTopicPartition topic) throws Exception { if (topic.hasReplicaAssignment()) { newTopic = new NewTopic(topic.name, topic.replicaAssignment); } else { - newTopic = new NewTopic(topic.name, topic.partitions, topic.replicationFactor.map(Integer::shortValue)); + newTopic = new NewTopic(topic.topicId.orElse(null), topic.name, topic.partitions, topic.replicationFactor.map(Integer::shortValue)); } Map configsMap = topic.configsToAdd.stringPropertyNames().stream()