From b88939ac4ef082ba3108eee24fd230572941570b Mon Sep 17 00:00:00 2001 From: lipenghui Date: Wed, 6 Nov 2019 17:53:19 +0800 Subject: [PATCH 1/6] Fix create consumer on partitioned topic while disable topic auto creation. --- .../pulsar/broker/service/ServerCnx.java | 4 + .../client/api/ConsumerCreationTest.java | 78 +++++++++++++++++++ .../client/api/PulsarClientException.java | 4 +- 3 files changed, 85 insertions(+), 1 deletion(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index cb4e203d54732..4435806399698 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -692,6 +692,10 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { boolean createTopicIfDoesNotExist = forceTopicCreation && service.pulsar().getConfig().isAllowAutoTopicCreation(); + if (topicName.isPartitioned()) { + createTopicIfDoesNotExist = true; + } + service.getTopic(topicName.toString(), createTopicIfDoesNotExist) .thenCompose(optTopic -> { if (!optTopic.isPresent()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java new file mode 100644 index 0000000000000..dcb872039b728 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class ConsumerCreationTest extends ProducerConsumerBase { + + @BeforeClass + @Override + protected void setup() throws Exception { + conf.setManagedLedgerCacheEvictionFrequency(0.1); + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testCreateConsumerForPartitionedTopicWhenDisableTopicAutoCreation() throws PulsarAdminException, PulsarClientException { + conf.setAllowAutoTopicCreation(false); + final String topic = "testCreateConsumerWhenDisableTopicAutoCreation"; + admin.topics().createPartitionedTopic(topic, 3); + Assert.assertNotNull(pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe()); + } + + @Test + public void testCreateConsumerForNonPartitionedTopicWhenDisableTopicAutoCreation() throws PulsarClientException { + conf.setAllowAutoTopicCreation(false); + final String topic = "testCreateConsumerForNonPartitionedTopicWhenDisableTopicAutoCreation"; + try { + pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe(); + Assert.fail("should be failed"); + } catch (PulsarClientException.TopicDoesNotExistException e) { + //ok + } + } + + @Test + public void testCreateConsumerForPartitionedTopicWhenEnableTopicAutoCreation() throws PulsarAdminException, PulsarClientException { + conf.setAllowAutoTopicCreation(true); + final String topic = "testCreateConsumerForPartitionedTopicWhenEnableTopicAutoCreation"; + admin.topics().createPartitionedTopic(topic, 3); + Assert.assertNotNull(pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe()); + } + + @Test + public void testCreateConsumerForNonPartitionedTopicWhenEnableTopicAutoCreation() throws PulsarClientException { + conf.setAllowAutoTopicCreation(true); + final String topic = "testCreateConsumerForNonPartitionedTopicWhenEnableTopicAutoCreation"; + Assert.assertNotNull(pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe()); + } + +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index 9f9c38fb3abe2..b498753197c3b 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -644,7 +644,9 @@ public static PulsarClientException unwrap(Throwable t) { return new ChecksumException(msg); } else if (cause instanceof CryptoException) { return new CryptoException(msg); - } else { + } else if (cause instanceof TopicDoesNotExistException) { + return new TopicDoesNotExistException(msg); + }else { return new PulsarClientException(t); } } From dd281b8f1c15c9e34df3b3450d695fd7b7e035e2 Mon Sep 17 00:00:00 2001 From: lipenghui Date: Wed, 6 Nov 2019 17:59:40 +0800 Subject: [PATCH 2/6] fix checkstyle --- .../org/apache/pulsar/client/api/PulsarClientException.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index b498753197c3b..ac567f2f6091e 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -646,7 +646,7 @@ public static PulsarClientException unwrap(Throwable t) { return new CryptoException(msg); } else if (cause instanceof TopicDoesNotExistException) { return new TopicDoesNotExistException(msg); - }else { + } else { return new PulsarClientException(t); } } From d308dd9049090c6157a5aceb4e6b5971dfb44543 Mon Sep 17 00:00:00 2001 From: lipenghui Date: Tue, 12 Nov 2019 15:49:09 +0800 Subject: [PATCH 3/6] Try to create partitions when create partitioned topic. --- .../pulsar/broker/admin/AdminResource.java | 17 ++++ .../pulsar/broker/admin/ZkAdminPaths.java | 4 + .../admin/impl/PersistentTopicsBase.java | 8 ++ .../broker/admin/v2/PersistentTopics.java | 26 ++++- .../pulsar/broker/service/ServerCnx.java | 4 - .../client/api/ConsumerCreationTest.java | 94 ++++++++++++++++--- .../apache/pulsar/client/admin/Topics.java | 20 ++++ .../client/admin/internal/TopicsImpl.java | 21 +++++ .../apache/pulsar/admin/cli/CmdTopics.java | 16 ++++ site2/docs/admin-api-partitioned-topics.md | 28 ++++++ site2/docs/reference-pulsar-admin.md | 10 ++ 11 files changed, 229 insertions(+), 19 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 02ebd064aa4d3..228e21ec295b6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -233,6 +233,23 @@ protected List getListOfNamespaces(String property) throws Exception { return namespaces; } + protected void tryCreatePartitions(int numPartitions) { + if (!topicName.isPersistent()) { + return; + } + for (int i = 0; i < numPartitions; i++) { + try { + zkCreateOptimistic(ZkAdminPaths.managedLedgerPath(topicName.getPartition(i)), new byte[0]); + } catch (KeeperException.NodeExistsException e) { + log.warn("[{}] Topic partition is already existing, doing nothing {}", clientAppId(), topicName.getPartition(i)); + } catch (KeeperException.BadVersionException e) { + log.warn("[{}] Fail to create topic partition {}: concurrent modification", clientAppId(), topicName.getPartition(i)); + } catch (Exception e) { + log.error("[{}] Fail to create topic partition {}", clientAppId(), topicName.getPartition(i), e); + } + } + } + protected NamespaceName namespaceName; protected void validateNamespaceName(String property, String namespace) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/ZkAdminPaths.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/ZkAdminPaths.java index c60422d16995f..95954f610d25e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/ZkAdminPaths.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/ZkAdminPaths.java @@ -31,6 +31,10 @@ public static String partitionedTopicPath(TopicName name) { name.getNamespace(), name.getDomain().value(), name.getEncodedLocalName()); } + public static String managedLedgerPath(TopicName name) { + return "/managed-ledgers/" + name.getPersistenceNamingEncoding(); + } + public static String namespacePoliciesPath(NamespaceName name) { return adminPath(POLICIES, name.toString()); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index d8cde658d6ec2..38eb8343ac693 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -394,6 +394,7 @@ protected void internalCreatePartitionedTopic(int numPartitions) { String path = ZkAdminPaths.partitionedTopicPath(topicName); byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions)); zkCreateOptimistic(path, data); + tryCreatePartitions(numPartitions); // we wait for the data to be synced in all quorums and the observers Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS); log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName); @@ -513,6 +514,13 @@ protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateL } } + protected void internalCreateMissedPartitions() { + PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName, false, false); + if (metadata != null) { + tryCreatePartitions(metadata.partitions); + } + } + private CompletableFuture updatePartitionInOtherCluster(int numPartitions, Set clusters) { List> results = new ArrayList<>(clusters.size() -1); clusters.forEach(cluster -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 9eec8ae23ef84..6568687f97baa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -246,7 +246,7 @@ public void createNonPartitionedTopic( */ @POST @Path("/{tenant}/{namespace}/{topic}/partitions") - @ApiOperation(value = "Increment partitons of an existing partitioned topic.", notes = "It only increments partitions of existing non-global partitioned-topic") + @ApiOperation(value = "Increment partitions of an existing partitioned topic.", notes = "It only increments partitions of existing non-global partitioned-topic") @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have permission to adminisActions to be grantedtrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), @@ -270,6 +270,30 @@ public void updatePartitionedTopic( internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly); } + + @POST + @Path("/{tenant}/{namespace}/{topic}/createMissedPartitions") + @ApiOperation(value = "Create missed partitions of an existing partitioned topic.", notes = "It only try to create missed partitions of existing non-global partitioned-topic") + @ApiResponses(value = { + @ApiResponse(code = 401, message = "Don't have permission to adminisActions to be grantedtrate resources on this tenant"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant does not exist"), + @ApiResponse(code = 409, message = "Partitioned topic does not exist"), + @ApiResponse(code = 412, message = "Partitioned topic name is invalid"), + @ApiResponse(code = 500, message = "Internal server error") + }) + public void createMissedPartitions( + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic) { + + validatePartitionedTopicName(tenant, namespace, encodedTopic); + internalCreateMissedPartitions(); + } + @GET @Path("/{tenant}/{namespace}/{topic}/partitions") @ApiOperation(value = "Get partitioned topic metadata.") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 4435806399698..cb4e203d54732 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -692,10 +692,6 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { boolean createTopicIfDoesNotExist = forceTopicCreation && service.pulsar().getConfig().isAllowAutoTopicCreation(); - if (topicName.isPartitioned()) { - createTopicIfDoesNotExist = true; - } - service.getTopic(topicName.toString(), createTopicIfDoesNotExist) .thenCompose(optTopic -> { if (!optTopic.isPresent()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java index dcb872039b728..52cf915b26267 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java @@ -18,14 +18,37 @@ */ package org.apache.pulsar.client.api; +import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.bookkeeper.util.ZkUtils; +import org.apache.pulsar.broker.admin.ZkAdminPaths; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.util.concurrent.TimeUnit; + +import static org.apache.pulsar.broker.admin.AdminResource.jsonMapper; + public class ConsumerCreationTest extends ProducerConsumerBase { + @DataProvider(name = "topicDomainProvider") + public Object[][] topicDomainProvider() { + return new Object[][] { + { TopicDomain.persistent }, + { TopicDomain.non_persistent } + }; + } + @BeforeClass @Override protected void setup() throws Exception { @@ -40,39 +63,82 @@ protected void cleanup() throws Exception { super.internalCleanup(); } - @Test - public void testCreateConsumerForPartitionedTopicWhenDisableTopicAutoCreation() throws PulsarAdminException, PulsarClientException { + @Test(dataProvider = "topicDomainProvider") + public void testCreateConsumerForPartitionedTopicWhenDisableTopicAutoCreation(TopicDomain domain) throws PulsarAdminException, PulsarClientException { conf.setAllowAutoTopicCreation(false); - final String topic = "testCreateConsumerWhenDisableTopicAutoCreation"; + final String topic = domain.value() + "://public/default/testCreateConsumerWhenDisableTopicAutoCreation"; admin.topics().createPartitionedTopic(topic, 3); Assert.assertNotNull(pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe()); } - @Test - public void testCreateConsumerForNonPartitionedTopicWhenDisableTopicAutoCreation() throws PulsarClientException { + @Test(dataProvider = "topicDomainProvider") + public void testCreateConsumerForNonPartitionedTopicWhenDisableTopicAutoCreation(TopicDomain domain) throws PulsarClientException { conf.setAllowAutoTopicCreation(false); - final String topic = "testCreateConsumerForNonPartitionedTopicWhenDisableTopicAutoCreation"; + final String topic = domain.value() + "://public/default/testCreateConsumerForNonPartitionedTopicWhenDisableTopicAutoCreation"; try { - pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe(); - Assert.fail("should be failed"); + Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe(); + if (domain == TopicDomain.persistent) { + Assert.fail("should be failed"); + } else { + // passed non persistent topic here since we can not avoid auto creation on non persistent topic now. + Assert.assertNotNull(consumer); + } } catch (PulsarClientException.TopicDoesNotExistException e) { //ok } } - @Test - public void testCreateConsumerForPartitionedTopicWhenEnableTopicAutoCreation() throws PulsarAdminException, PulsarClientException { + @Test(dataProvider = "topicDomainProvider") + public void testCreateConsumerForPartitionedTopicWhenEnableTopicAutoCreation(TopicDomain domain) throws PulsarAdminException, PulsarClientException { conf.setAllowAutoTopicCreation(true); - final String topic = "testCreateConsumerForPartitionedTopicWhenEnableTopicAutoCreation"; + final String topic = domain.value() + "://public/default/testCreateConsumerForPartitionedTopicWhenEnableTopicAutoCreation"; admin.topics().createPartitionedTopic(topic, 3); Assert.assertNotNull(pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe()); } - @Test - public void testCreateConsumerForNonPartitionedTopicWhenEnableTopicAutoCreation() throws PulsarClientException { + @Test(dataProvider = "topicDomainProvider") + public void testCreateConsumerForNonPartitionedTopicWhenEnableTopicAutoCreation(TopicDomain domain) throws PulsarClientException { conf.setAllowAutoTopicCreation(true); - final String topic = "testCreateConsumerForNonPartitionedTopicWhenEnableTopicAutoCreation"; + final String topic = domain.value() + "://public/default/testCreateConsumerForNonPartitionedTopicWhenEnableTopicAutoCreation"; Assert.assertNotNull(pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe()); } + @Test + public void testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation() throws PulsarClientException, PulsarAdminException { + conf.setAllowAutoTopicCreation(false); + final String topic = "testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation"; + admin.topics().createPartitionedTopic(topic, 3); + MultiTopicsConsumerImpl consumer = (MultiTopicsConsumerImpl) pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe(); + Assert.assertNotNull(consumer); + Assert.assertEquals(consumer.getConsumers().size(), 3); + consumer.close(); + admin.topics().updatePartitionedTopic(topic, 5); + consumer = (MultiTopicsConsumerImpl) pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe(); + Assert.assertNotNull(consumer); + Assert.assertEquals(consumer.getConsumers().size(), 5); + } + + @Test + public void testCreateMissedPartitions() throws JsonProcessingException, KeeperException, InterruptedException, PulsarAdminException, PulsarClientException { + conf.setAllowAutoTopicCreation(false); + final String topic = "testCreateMissedPartitions"; + String path = ZkAdminPaths.partitionedTopicPath(TopicName.get(topic)); + int numPartitions = 3; + byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions)); + // simulate partitioned topic without partitions + ZkUtils.createFullPathOptimistic(pulsar.getGlobalZkCache().getZooKeeper(), path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + Consumer consumer = null; + try { + consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribeAsync().get(3, TimeUnit.SECONDS); + } catch (Exception e) { + //ok here, consumer will create failed with 'Topic does not exist' + } + Assert.assertNull(consumer); + admin.topics().createMissedPartitions(topic); + consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe(); + Assert.assertNotNull(consumer); + Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl); + Assert.assertEquals(((MultiTopicsConsumerImpl)consumer).getConsumers().size(), 3); + } + } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java index 39c007c7ffba0..c5ec04120c022 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -211,6 +211,16 @@ List getListInBundle(String namespace, String bundleRange) */ void createNonPartitionedTopic(String topic) throws PulsarAdminException; + /** + * Create missed partitions for partitioned topic. + *

+ * When disable topic auto creation, use this method to try create missed partitions while + * partitions create failed or users already have partitioned topic without partitions. + * + * @param topic partitioned topic name + */ + void createMissedPartitions(String topic) throws PulsarAdminException; + /** * Create a partitioned topic asynchronously. *

@@ -233,6 +243,16 @@ List getListInBundle(String namespace, String bundleRange) */ CompletableFuture createNonPartitionedTopicAsync(String topic); + /** + * Create missed partitions for partitioned topic asynchronously. + *

+ * When disable topic auto creation, use this method to try create missed partitions while + * partitions create failed or users already have partitioned topic without partitions. + * + * @param topic partitioned topic name + */ + CompletableFuture createMissedPartitionsAsync(String topic); + /** * Update number of partitions of a non-global partitioned topic. *

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 3c318453eefac..75c3d59a55d7c 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -224,6 +224,20 @@ public void createNonPartitionedTopic(String topic) throws PulsarAdminException } } + @Override + public void createMissedPartitions(String topic) throws PulsarAdminException { + try { + createMissedPartitionsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + @Override public CompletableFuture createNonPartitionedTopicAsync(String topic){ TopicName tn = validateTopic(topic); @@ -239,6 +253,13 @@ public CompletableFuture createPartitionedTopicAsync(String topic, int num return asyncPutRequest(path, Entity.entity(numPartitions, MediaType.APPLICATION_JSON)); } + @Override + public CompletableFuture createMissedPartitionsAsync(String topic) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "createMissedPartitions"); + return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); + } + @Override public void updatePartitionedTopic(String topic, int numPartitions) throws PulsarAdminException { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 9a2ae1b6149f0..af5aa02e7a2a3 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -82,6 +82,7 @@ public CmdTopics(PulsarAdmin admin) { jcommander.addCommand("expire-messages-all-subscriptions", new ExpireMessagesForAllSubscriptions()); jcommander.addCommand("create-partitioned-topic", new CreatePartitionedCmd()); + jcommander.addCommand("create-missed-partitions", new CreateMissedPartitionsCmd()); jcommander.addCommand("create", new CreateNonPartitionedCmd()); jcommander.addCommand("update-partitioned-topic", new UpdatePartitionedCmd()); jcommander.addCommand("get-partitioned-topic-metadata", new GetPartitionedTopicMetadataCmd()); @@ -214,6 +215,21 @@ void run() throws Exception { } } + @Parameters(commandDescription = "Try to create partitions for partitioned topic. \n" + + "\t\t The partitions of partition topic has to be created, can be used by repair partitions when \n" + + "\t\t topic auto creation is disabled") + private class CreateMissedPartitionsCmd extends CliCommand { + + @Parameter(description = "persistent://tenant/namespace/topic\n", required = true) + private java.util.List params; + + @Override + void run() throws Exception { + String topic = validateTopicName(params); + topics.createMissedPartitions(topic); + } + } + @Parameters(commandDescription = "Create a non-partitioned topic.") private class CreateNonPartitionedCmd extends CliCommand { diff --git a/site2/docs/admin-api-partitioned-topics.md b/site2/docs/admin-api-partitioned-topics.md index fe0ab9853d35a..a6507e073b759 100644 --- a/site2/docs/admin-api-partitioned-topics.md +++ b/site2/docs/admin-api-partitioned-topics.md @@ -62,6 +62,34 @@ int numPartitions = 4; admin.persistentTopics().createPartitionedTopic(topicName, numPartitions); ``` +### Create missed partitions + +Try to create partitions for partitioned topic. The partitions of partition topic has to be created, +can be used by repair partitions when topic auto creation is disabled + +#### pulsar-admin + +You can create missed partitions using the [`create-missed-partitions`](reference-pulsar-admin.md#create-missed-partitions) +command and specifying the topic name as an argument. + +Here's an example: + +```shell +$ bin/pulsar-admin topics create-missed-partitions \ + persistent://my-tenant/my-namespace/my-topic \ +``` + +#### REST API + +{@inject: endpoint|POST|/admin/v2/persistent/:tenant/:namespace/:topic|operation/createMissedPartitions} + +#### Java + +```java +String topicName = "persistent://my-tenant/my-namespace/my-topic"; +admin.persistentTopics().createMissedPartitions(topicName); +``` + ### Get metadata Partitioned topics have metadata associated with them that you can fetch as a JSON object. diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md index d9644270a84de..1ad8df78350e6 100644 --- a/site2/docs/reference-pulsar-admin.md +++ b/site2/docs/reference-pulsar-admin.md @@ -1603,6 +1603,7 @@ Subcommands * `offload` * `offload-status` * `create-partitioned-topic` +* `create-missed-partitions` * `delete-partitioned-topic` * `create` * `get-partitioned-topic-metadata` @@ -1704,6 +1705,15 @@ Options |---|---|---| |`-p`, `--partitions`|The number of partitions for the topic|0| +### `create-missed-partitions` +Try to create partitions for partitioned topic. The partitions of partition topic has to be created, +can be used by repair partitions when topic auto creation is disabled + +Usage +```bash +$ pulsar-admin topics create-missed-partitions persistent://tenant/namespace/topic +``` + ### `delete-partitioned-topic` Delete a partitioned topic. This will also delete all the partitions of the topic if they exist. From ff5379d4f0e78953fb2ecf7605f271720c73d193 Mon Sep 17 00:00:00 2001 From: lipenghui Date: Wed, 13 Nov 2019 11:36:00 +0800 Subject: [PATCH 4/6] Fix unit tests --- .../org/apache/zookeeper/MockZooKeeper.java | 2 +- .../admin/impl/PersistentTopicsBase.java | 34 +++++++++++-------- .../pulsar/broker/admin/AdminApiTest.java | 14 ++------ .../broker/admin/PersistentTopicsTest.java | 20 +++-------- .../broker/admin/v1/V1_AdminApiTest.java | 6 ++-- .../impl/PatternTopicsConsumerImplTest.java | 8 ++--- 6 files changed, 36 insertions(+), 48 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java index c417604d4885e..0b5cc015a3e5b 100644 --- a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java +++ b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java @@ -460,7 +460,7 @@ public void getChildren(final String path, boolean watcher, final Children2Callb List children = Lists.newArrayList(); for (String item : tree.tailMap(path).keySet()) { log.debug("Checking path {}", item); - if (!item.startsWith(path)) { + if (!item.startsWith(path) || !item.replace(path, "").contains("/")) { break; } else if (item.equals(path)) { continue; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 38eb8343ac693..f84931290310a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1827,24 +1827,28 @@ private CompletableFuture createSubscriptions(TopicName topicName, int num } admin.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats -> { - stats.subscriptions.keySet().forEach(subscription -> { - List> subscriptionFutures = new ArrayList<>(); - for (int i = partitionMetadata.partitions; i < numPartitions; i++) { - final String topicNamePartition = topicName.getPartition(i).toString(); + if (stats.subscriptions.size() == 0) { + result.complete(null); + } else { + stats.subscriptions.keySet().forEach(subscription -> { + List> subscriptionFutures = new ArrayList<>(); + for (int i = partitionMetadata.partitions; i < numPartitions; i++) { + final String topicNamePartition = topicName.getPartition(i).toString(); - subscriptionFutures.add(admin.topics().createSubscriptionAsync(topicNamePartition, - subscription, MessageId.latest)); - } + subscriptionFutures.add(admin.topics().createSubscriptionAsync(topicNamePartition, + subscription, MessageId.latest)); + } - FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> { - log.info("[{}] Successfully created new partitions {}", clientAppId(), topicName); - result.complete(null); - }).exceptionally(ex -> { - log.warn("[{}] Failed to create subscriptions on new partitions for {}", clientAppId(), topicName, ex); - result.completeExceptionally(ex); - return null; + FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> { + log.info("[{}] Successfully created new partitions {}", clientAppId(), topicName); + result.complete(null); + }).exceptionally(ex -> { + log.warn("[{}] Failed to create subscriptions on new partitions for {}", clientAppId(), topicName, ex); + result.completeExceptionally(ex); + return null; + }); }); - }); + } }).exceptionally(ex -> { if (ex.getCause() instanceof PulsarAdminException.NotFoundException) { // The first partition doesn't exist, so there are currently to subscriptions to recreate diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index ea96cecf380f7..42d8b973d2adc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -812,9 +812,8 @@ public void partitionedTopics(String topicName) throws Exception { assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 4); - // check if the virtual topic doesn't get created List topics = admin.topics().getList("prop-xyz/ns1"); - assertEquals(topics.size(), 0); + assertEquals(topics.size(), 4); assertEquals(admin.topics().getPartitionedTopicMetadata("persistent://prop-xyz/ns1/ds2").partitions, 0); @@ -826,15 +825,8 @@ public void partitionedTopics(String topicName) throws Exception { assertEquals(admin.topics().getPartitionedStats(partitionedTopicName, false).partitions.size(), 0); - try { - admin.topics().getSubscriptions(partitionedTopicName); - fail("should have failed"); - } catch (PulsarAdminException e) { - // ok - assertEquals(e.getStatusCode(), Status.NOT_FOUND.getStatusCode()); - } catch (Exception e) { - fail(e.getMessage()); - } + List subscriptions = admin.topics().getSubscriptions(partitionedTopicName); + assertEquals(subscriptions.size(), 0); // create consumer and subscription URL pulsarUrl = new URL("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 300e3de173674..389ba199ee102 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -155,17 +155,7 @@ public void testGetSubscriptions() { // 3) Create the partitioned topic persistentTopics.createPartitionedTopic(testTenant, testNamespace, testLocalTopicName, 3); - // 4) Confirm that the topic partitions has not been created yet - response = mock(AsyncResponse.class); - persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName + "-partition-0", - true); - errorCaptor = ArgumentCaptor.forClass(RestException.class); - verify(response, timeout(5000).times(1)).resume(errorCaptor.capture()); - Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(), - Response.Status.NOT_FOUND.getStatusCode()); - Assert.assertEquals(errorCaptor.getValue().getMessage(), "Topic partitions were not yet created"); - - // 5) Create a subscription + // 4) Create a subscription response = mock(AsyncResponse.class); persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName, "test", true, (MessageIdImpl) MessageId.earliest, false); @@ -173,26 +163,26 @@ public void testGetSubscriptions() { verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); - // 6) Confirm that the subscription exists + // 5) Confirm that the subscription exists response = mock(AsyncResponse.class); persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName + "-partition-0", true); verify(response, timeout(5000).times(1)).resume(Lists.newArrayList("test")); - // 7) Delete the subscription + // 6) Delete the subscription response = mock(AsyncResponse.class); persistentTopics.deleteSubscription(response, testTenant, testNamespace, testLocalTopicName, "test", true); responseCaptor = ArgumentCaptor.forClass(Response.class); verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); - // 8) Confirm that the subscription does not exist + // 7) Confirm that the subscription does not exist response = mock(AsyncResponse.class); persistentTopics.getSubscriptions(response, testTenant, testNamespace, testLocalTopicName + "-partition-0", true); verify(response, timeout(5000).times(1)).resume(Lists.newArrayList()); - // 9) Delete the partitioned topic + // 8) Delete the partitioned topic response = mock(AsyncResponse.class); persistentTopics.deletePartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, true, true); responseCaptor = ArgumentCaptor.forClass(Response.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index 24adb74a5a5e5..4dbd4480fc1ce 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -781,9 +781,8 @@ public void partitionedTopics(String topicName) throws Exception { assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 4); - // check if the virtual topic doesn't get created List topics = admin.topics().getList("prop-xyz/use/ns1"); - assertEquals(topics.size(), 0); + assertEquals(topics.size(), 4); assertEquals( admin.topics().getPartitionedTopicMetadata("persistent://prop-xyz/use/ns1/ds2").partitions, @@ -807,6 +806,9 @@ public void partitionedTopics(String topicName) throws Exception { fail(e.getMessage()); } + List subscriptions = admin.topics().getSubscriptions(partitionedTopicName); + assertEquals(subscriptions.size(), 1); + Consumer consumer1 = client.newConsumer().topic(partitionedTopicName).subscriptionName("my-sub-1") .subscribe(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index 7413825d9668a..a5b20f3586929 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -487,11 +487,11 @@ public void testStartEmptyPatternConsumer() throws Exception { .receiverQueueSize(4) .subscribe(); - // 3. verify consumer get methods, to get 0 number of partitions and topics. + // 3. verify consumer get methods, to get 5 number of partitions and topics. assertSame(pattern, ((PatternMultiTopicsConsumerImpl) consumer).getPattern()); - assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 0); - assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getConsumers().size(), 0); - assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getTopics().size(), 0); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 5); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getConsumers().size(), 5); + assertEquals(((PatternMultiTopicsConsumerImpl) consumer).getTopics().size(), 2); // 4. create producer String messagePredicate = "my-message-" + key + "-"; From 6ae3c5c3b9a0c3ffc88cb79d4f874528ad80819c Mon Sep 17 00:00:00 2001 From: lipenghui Date: Mon, 18 Nov 2019 18:46:19 +0800 Subject: [PATCH 5/6] Fix unit tests --- .../org/apache/zookeeper/MockZooKeeper.java | 24 ++++++++++++------- ...onTest.java => PartitionCreationTest.java} | 2 +- 2 files changed, 16 insertions(+), 10 deletions(-) rename pulsar-broker/src/test/java/org/apache/pulsar/client/api/{ConsumerCreationTest.java => PartitionCreationTest.java} (99%) diff --git a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java index 0b5cc015a3e5b..f4160edf3737b 100644 --- a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java +++ b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java @@ -425,10 +425,13 @@ public List getChildren(String path, boolean watch) throws KeeperExcepti if (path.length() >= item.length()) { continue; } - - String child = item.substring(path.length() + 1); - if (!child.contains("/")) { - children.add(child); + String child = item.substring(path.length()); + if (child.indexOf("/") == 0) { + child = child.substring(1); + log.debug("child: '{}'", child); + if (!child.contains("/")) { + children.add(child); + } } } } @@ -460,15 +463,18 @@ public void getChildren(final String path, boolean watcher, final Children2Callb List children = Lists.newArrayList(); for (String item : tree.tailMap(path).keySet()) { log.debug("Checking path {}", item); - if (!item.startsWith(path) || !item.replace(path, "").contains("/")) { + if (!item.startsWith(path)) { break; } else if (item.equals(path)) { continue; } else { - String child = item.substring(path.length() + 1); - log.debug("child: '{}'", child); - if (!child.contains("/")) { - children.add(child); + String child = item.substring(path.length()); + if (child.indexOf("/") == 0) { + child = child.substring(1); + log.debug("child: '{}'", child); + if (!child.contains("/")) { + children.add(child); + } } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java similarity index 99% rename from pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java rename to pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java index 52cf915b26267..2647bbba380a8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java @@ -39,7 +39,7 @@ import static org.apache.pulsar.broker.admin.AdminResource.jsonMapper; -public class ConsumerCreationTest extends ProducerConsumerBase { +public class PartitionCreationTest extends ProducerConsumerBase { @DataProvider(name = "topicDomainProvider") public Object[][] topicDomainProvider() { From 06bb67f0c1b5075ce0a4c9fa19cab2f540f1ca5c Mon Sep 17 00:00:00 2001 From: lipenghui Date: Tue, 24 Dec 2019 11:51:36 +0800 Subject: [PATCH 6/6] Fix comments --- .../pulsar/broker/admin/AdminResource.java | 58 +++++++++++++++---- .../admin/impl/PersistentTopicsBase.java | 13 ++--- .../broker/admin/v1/NonPersistentTopics.java | 4 +- .../broker/admin/v2/NonPersistentTopics.java | 4 +- .../broker/admin/v2/PersistentTopics.java | 2 +- .../pulsar/broker/service/BrokerService.java | 15 +++-- 6 files changed, 67 insertions(+), 29 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 228e21ec295b6..e9d559ef014e6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -30,6 +30,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import javax.servlet.ServletContext; @@ -66,9 +68,9 @@ import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.zookeeper.ZooKeeperCache; -import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer; import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache; import org.apache.pulsar.zookeeper.ZooKeeperDataCache; +import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; @@ -111,6 +113,11 @@ protected void zkCreateOptimistic(String path, byte[] content) throws Exception ZkUtils.createFullPathOptimistic(globalZk(), path, content, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } + protected void zkCreateOptimisticAsync(String path, byte[] content, AsyncCallback.StringCallback callback) { + ZkUtils.asyncCreateFullPathOptimistic(globalZk(), path, content, ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, callback, null); + } + protected boolean zkPathExists(String path) throws KeeperException, InterruptedException { Stat stat = globalZk().exists(path, false); if (null != stat) { @@ -119,6 +126,21 @@ protected boolean zkPathExists(String path) throws KeeperException, InterruptedE return false; } + protected void zkSync(String path) throws Exception { + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger rc = new AtomicInteger(KeeperException.Code.OK.intValue()); + globalZk().sync(path, (rc2, s, ctx) -> { + if (KeeperException.Code.OK.intValue() != rc2) { + rc.set(rc2); + } + latch.countDown(); + }, null); + latch.await(); + if (KeeperException.Code.OK.intValue() != rc.get()) { + throw KeeperException.create(KeeperException.Code.get(rc.get())); + } + } + /** * Get the domain of the topic (whether it's persistent or non-persistent) */ @@ -233,23 +255,37 @@ protected List getListOfNamespaces(String property) throws Exception { return namespaces; } - protected void tryCreatePartitions(int numPartitions) { + protected void tryCreatePartitionsAsync(int numPartitions) { if (!topicName.isPersistent()) { return; } for (int i = 0; i < numPartitions; i++) { - try { - zkCreateOptimistic(ZkAdminPaths.managedLedgerPath(topicName.getPartition(i)), new byte[0]); - } catch (KeeperException.NodeExistsException e) { - log.warn("[{}] Topic partition is already existing, doing nothing {}", clientAppId(), topicName.getPartition(i)); - } catch (KeeperException.BadVersionException e) { - log.warn("[{}] Fail to create topic partition {}: concurrent modification", clientAppId(), topicName.getPartition(i)); - } catch (Exception e) { - log.error("[{}] Fail to create topic partition {}", clientAppId(), topicName.getPartition(i), e); - } + tryCreatePartitionAsync(i); } } + private void tryCreatePartitionAsync(final int partition) { + zkCreateOptimisticAsync(ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0], + (rc, s, o, s1) -> { + if (KeeperException.Code.OK.intValue() == rc) { + if (log.isDebugEnabled()) { + log.debug("[{}] Topic partition {} created.", clientAppId(), + topicName.getPartition(partition)); + } + } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) { + log.info("[{}] Topic partition {} is exists, doing nothing.", clientAppId(), + topicName.getPartition(partition)); + } else if (KeeperException.Code.BADVERSION.intValue() == rc) { + log.warn("[{}] Fail to create topic partition {} with concurrent modification, retry now.", + clientAppId(), topicName.getPartition(partition)); + tryCreatePartitionAsync(partition); + } else { + log.error("[{}] Fail to create topic partition {}", clientAppId(), + topicName.getPartition(partition), KeeperException.create(KeeperException.Code.get(rc))); + } + }); + } + protected NamespaceName namespaceName; protected void validateNamespaceName(String property, String namespace) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index f84931290310a..3379c38532e09 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -116,7 +116,6 @@ public class PersistentTopicsBase extends AdminResource { private static final Logger log = LoggerFactory.getLogger(PersistentTopicsBase.class); - public static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000; private static final int OFFLINE_TOPIC_STAT_TTL_MINS = 10; private static final String DEPRECATED_CLIENT_VERSION_PREFIX = "Pulsar-CPP-v"; private static final Version LEAST_SUPPORTED_CLIENT_VERSION_PREFIX = Version.forIntegers(1, 21); @@ -394,9 +393,9 @@ protected void internalCreatePartitionedTopic(int numPartitions) { String path = ZkAdminPaths.partitionedTopicPath(topicName); byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions)); zkCreateOptimistic(path, data); - tryCreatePartitions(numPartitions); - // we wait for the data to be synced in all quorums and the observers - Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS); + tryCreatePartitionsAsync(numPartitions); + // Sync data to all quorums and the observers + zkSync(path); log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName); } catch (KeeperException.NodeExistsException e) { log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName); @@ -517,7 +516,7 @@ protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateL protected void internalCreateMissedPartitions() { PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName, false, false); if (metadata != null) { - tryCreatePartitions(metadata.partitions); + tryCreatePartitionsAsync(metadata.partitions); } } @@ -608,8 +607,8 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boole try { globalZk().delete(path, -1); globalZkCache().invalidate(path); - // we wait for the data to be synced in all quorums and the observers - Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS); + // Sync data to all quorums and the observers + zkSync(path); log.info("[{}] Deleted partitioned topic {}", clientAppId(), topicName); asyncResponse.resume(Response.noContent().build()); return; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java index 76671672d42b8..0179847590bb7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java @@ -144,8 +144,8 @@ public void createPartitionedTopic(@PathParam("property") String property, @Path topicName.getEncodedLocalName()); byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions)); zkCreateOptimistic(path, data); - // we wait for the data to be synced in all quorums and the observers - Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS); + // Sync data to all quorums and the observers + zkSync(path); log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName); } catch (KeeperException.NodeExistsException e) { log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index eeaeb96a649ee..a41db33b9f19c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -190,8 +190,8 @@ public void createPartitionedTopic( topicName.getEncodedLocalName()); byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions)); zkCreateOptimistic(path, data); - // we wait for the data to be synced in all quorums and the observers - Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS); + // Sync data to all quorums and the observers + zkSync(path); log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName); } catch (KeeperException.NodeExistsException e) { log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 6568687f97baa..08411dd5a33d7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -273,7 +273,7 @@ public void updatePartitionedTopic( @POST @Path("/{tenant}/{namespace}/{topic}/createMissedPartitions") - @ApiOperation(value = "Create missed partitions of an existing partitioned topic.", notes = "It only try to create missed partitions of existing non-global partitioned-topic") + @ApiOperation(value = "Create missed partitions of an existing partitioned topic.", notes = "This is a best-effort operation for create missed partitions of existing non-global partitioned-topic and does't throw any exceptions when create failed") @ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have permission to adminisActions to be grantedtrate resources on this tenant"), @ApiResponse(code = 403, message = "Don't have admin permission"), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index d75731c3ec5ae..ab8da42eef91c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -79,7 +79,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; -import org.apache.bookkeeper.util.SafeRunnable; import org.apache.bookkeeper.util.ZkUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -87,7 +86,6 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.admin.AdminResource; -import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.cache.ConfigurationCacheService; @@ -1717,10 +1715,15 @@ private CompletableFuture createDefaultPartitionedTopi partitionedTopicPath(topicName), content, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path1, ctx, name) -> { if (rc == KeeperException.Code.OK.intValue()) { - // we wait for the data to be synced in all quorums and the observers - executor().schedule( - SafeRunnable.safeRun(() -> partitionedTopicFuture.complete(configMetadata)), - PersistentTopicsBase.PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS, TimeUnit.MILLISECONDS); + // Sync data to all quorums and the observers + pulsar.getGlobalZkCache().getZooKeeper().sync(partitionedTopicPath(topicName), + (rc2, path2, ctx2) -> { + if (rc2 == KeeperException.Code.OK.intValue()) { + partitionedTopicFuture.complete(configMetadata); + } else { + partitionedTopicFuture.completeExceptionally(KeeperException.create(rc2)); + } + }, null); } else { partitionedTopicFuture.completeExceptionally(KeeperException.create(rc)); }