diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 78e3d2fd1963d..b8841c1ef705f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -856,7 +856,7 @@ protected void internalRemoveAutoSubscriptionCreation(AsyncResponse asyncRespons internalSetAutoSubscriptionCreation(asyncResponse, null); } - protected void internalModifyDeduplication(boolean enableDeduplication) { + protected void internalModifyDeduplication(Boolean enableDeduplication) { validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE); validatePoliciesReadOnlyAccess(); updatePolicies(path(POLICIES, namespaceName.toString()), policies ->{ @@ -1999,6 +1999,11 @@ protected void internalSetMaxProducersPerTopic(Integer maxProducersPerTopic) { } } + protected Boolean internalGetDeduplication() { + validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.READ); + return getNamespacePolicies(namespaceName).deduplicationEnabled; + } + protected Integer internalGetMaxConsumersPerTopic() { validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ); return getNamespacePolicies(namespaceName).max_consumers_per_topic; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 159a6be03c989..b6844cba0485a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2561,7 +2561,20 @@ protected void internalSetBacklogQuota(AsyncResponse asyncResponse, }); } - protected CompletableFuture internalSetDeduplicationEnabled(Boolean enabled) { + protected CompletableFuture internalGetDeduplication(boolean applied) { + Boolean deduplicationEnabled = getTopicPolicies(topicName) + .map(TopicPolicies::getDeduplicationEnabled) + .orElseGet(() -> { + if (applied) { + Boolean enabled = getNamespacePolicies(namespaceName).deduplicationEnabled; + return enabled == null ? config().isBrokerDeduplicationEnabled() : enabled; + } + return null; + }); + return CompletableFuture.completedFuture(deduplicationEnabled); + } + + protected CompletableFuture internalSetDeduplication(Boolean enabled) { TopicPolicies topicPolicies = null; try { topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index a9ff3ddf297c2..79c4cc4f82e0c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -336,6 +336,16 @@ public void setSubscriptionExpirationTime(@PathParam("tenant") String tenant, internalSetSubscriptionExpirationTime(expirationTime); } + @GET + @Path("/{tenant}/{namespace}/deduplication") + @ApiOperation(value = "Get broker side deduplication for all topics in a namespace") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") }) + public Boolean getDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) { + validateNamespaceName(tenant, namespace); + return internalGetDeduplication(); + } + @POST @Path("/{tenant}/{namespace}/deduplication") @ApiOperation(value = "Enable or disable broker side deduplication for all topics in a namespace") @@ -349,6 +359,16 @@ public void modifyDeduplication(@PathParam("tenant") String tenant, @PathParam(" internalModifyDeduplication(enableDeduplication); } + @DELETE + @Path("/{tenant}/{namespace}/deduplication") + @ApiOperation(value = "Remove broker side deduplication for all topics in a namespace") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") }) + public void removeDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) { + validateNamespaceName(tenant, namespace); + internalModifyDeduplication(null); + } + @POST @Path("/{tenant}/{namespace}/autoTopicCreation") @ApiOperation(value = "Override broker's allowAutoTopicCreation setting for a namespace") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index f98a2ee073a25..1c41475fa5d58 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -1657,17 +1657,23 @@ public void removeMessageTTL(@Suspended final AsyncResponse asyncResponse, @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry")}) - public void getDeduplicationEnabled(@Suspended final AsyncResponse asyncResponse, + public void getDeduplication(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("applied") boolean applied) { validateTopicName(tenant, namespace, encodedTopic); - TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies()); - if (topicPolicies.isDeduplicationSet()) { - asyncResponse.resume(topicPolicies.getDeduplicationEnabled()); - } else { - asyncResponse.resume(Response.noContent().build()); - } + internalGetDeduplication(applied).whenComplete((res, ex) -> { + if (ex instanceof RestException) { + log.error("Failed get Deduplication", ex); + asyncResponse.resume(ex); + } else if (ex != null) { + log.error("Failed get Deduplication", ex); + asyncResponse.resume(new RestException(ex)); + } else { + asyncResponse.resume(res); + } + }); } @POST @@ -1677,7 +1683,7 @@ public void getDeduplicationEnabled(@Suspended final AsyncResponse asyncResponse @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry")}) - public void setDeduplicationEnabled( + public void setDeduplication( @Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @@ -1685,7 +1691,7 @@ public void setDeduplicationEnabled( @ApiParam(value = "DeduplicationEnabled policies for the specified topic") Boolean enabled) { validateTopicName(tenant, namespace, encodedTopic); - internalSetDeduplicationEnabled(enabled).whenComplete((r, ex) -> { + internalSetDeduplication(enabled).whenComplete((r, ex) -> { if (ex instanceof RestException) { log.error("Failed updated deduplication", ex); asyncResponse.resume(ex); @@ -1706,12 +1712,12 @@ public void setDeduplicationEnabled( @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) - public void removeDeduplicationEnabled(@Suspended final AsyncResponse asyncResponse, + public void removeDeduplication(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic) { validateTopicName(tenant, namespace, encodedTopic); - setDeduplicationEnabled(asyncResponse, tenant, namespace, encodedTopic, null); + setDeduplication(asyncResponse, tenant, namespace, encodedTopic, null); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 56689a27b3f5b..5ad45e30b367a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2738,6 +2738,8 @@ public void onUpdate(TopicPolicies policies) { replicators.forEach((name, replicator) -> replicator.getRateLimiter() .ifPresent(DispatchRateLimiter::updateDispatchRate)); updateUnackedMessagesExceededOnConsumer(null); + + checkDeduplicationStatus(); } private Optional getNamespacePolicies() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java index fdea26499ad20..4a37d638ca7d0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java @@ -38,7 +38,9 @@ import org.testng.annotations.Test; import org.awaitility.Awaitility; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; @@ -86,6 +88,121 @@ public void testDuplicationApi() throws Exception { assertNull(admin.topics().getDeduplicationEnabled(topicName)); } + @Test(timeOut = 10000) + public void testTopicDuplicationApi2() throws Exception { + final String topicName = testTopic + UUID.randomUUID().toString(); + admin.topics().createPartitionedTopic(topicName, 3); + waitCacheInit(topicName); + Boolean enabled = admin.topics().getDeduplicationStatus(topicName); + assertNull(enabled); + + admin.topics().setDeduplicationStatus(topicName, true); + Awaitility.await().atMost(5, TimeUnit.SECONDS) + .until(() -> admin.topics().getDeduplicationStatus(topicName) != null); + Assert.assertEquals(admin.topics().getDeduplicationStatus(topicName), true); + + admin.topics().removeDeduplicationStatus(topicName); + Awaitility.await().atMost(5, TimeUnit.SECONDS) + .until(() -> admin.topics().getMaxUnackedMessagesOnSubscription(topicName) == null); + assertNull(admin.topics().getDeduplicationStatus(topicName)); + } + + @Test(timeOut = 10000) + public void testTopicDuplicationAppliedApi() throws Exception { + final String topicName = testTopic + UUID.randomUUID().toString(); + waitCacheInit(topicName); + assertNull(admin.namespaces().getDeduplicationStatus(myNamespace)); + assertNull(admin.topics().getDeduplicationStatus(topicName)); + assertEquals(admin.topics().getDeduplicationStatus(topicName, true).booleanValue(), + conf.isBrokerDeduplicationEnabled()); + + admin.namespaces().setDeduplicationStatus(myNamespace, false); + Awaitility.await().untilAsserted(() -> assertFalse(admin.topics().getDeduplicationStatus(topicName, true))); + admin.topics().setDeduplicationStatus(topicName, true); + Awaitility.await().untilAsserted(() -> assertTrue(admin.topics().getDeduplicationStatus(topicName, true))); + + admin.topics().removeDeduplicationStatus(topicName); + Awaitility.await().untilAsserted(() -> assertFalse(admin.topics().getDeduplicationStatus(topicName, true))); + admin.namespaces().removeDeduplicationStatus(myNamespace); + Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getDeduplicationStatus(topicName, true).booleanValue(), + conf.isBrokerDeduplicationEnabled())); + } + + @Test(timeOut = 30000) + public void testDeduplicationPriority() throws Exception { + final String topicName = testTopic + UUID.randomUUID().toString(); + final String producerName = "my-producer"; + final int maxMsgNum = 5; + waitCacheInit(topicName); + //1) Start up producer and send msg.We specified the max sequenceId + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topicName) + .producerName(producerName).create(); + long maxSeq = sendMessageAndGetMaxSeq(maxMsgNum, producer); + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get(); + MessageDeduplication messageDeduplication = persistentTopic.getMessageDeduplication(); + //broker-level deduplication is enabled in setup() by default + checkDeduplicationEnabled(producerName, messageDeduplication, maxSeq); + //disabled in namespace-level + admin.namespaces().setDeduplicationStatus(myNamespace, false); + Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getDeduplicationStatus(myNamespace))); + sendMessageAndGetMaxSeq(maxMsgNum, producer); + checkDeduplicationDisabled(producerName, messageDeduplication); + //enabled in topic-level + admin.topics().setDeduplicationStatus(topicName, true); + Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getDeduplicationStatus(topicName))); + Awaitility.await().untilAsserted(() -> assertTrue(messageDeduplication.isEnabled())); + long maxSeq2 = sendMessageAndGetMaxSeq(maxMsgNum, producer); + checkDeduplicationEnabled(producerName, messageDeduplication, maxSeq2); + //remove topic-level, use namespace-level + admin.topics().removeDeduplicationStatus(topicName); + Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getDeduplicationStatus(topicName))); + Awaitility.await().untilAsserted(() -> assertFalse(messageDeduplication.isEnabled())); + producer.newMessage().value("msg").sequenceId(1).send(); + checkDeduplicationDisabled(producerName, messageDeduplication); + //remove namespace-level , use broker-level + admin.namespaces().removeDeduplicationStatus(myNamespace); + Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getDeduplicationStatus(myNamespace))); + Awaitility.await().untilAsserted(() -> assertTrue(messageDeduplication.isEnabled())); + long maxSeq3 = sendMessageAndGetMaxSeq(maxMsgNum, producer); + checkDeduplicationEnabled(producerName, messageDeduplication, maxSeq3); + } + + private long sendMessageAndGetMaxSeq(int maxMsgNum, Producer producer) throws Exception{ + long seq = System.nanoTime(); + for (int i = 0; i <= maxMsgNum; i++) { + producer.newMessage().value("msg-" + i).sequenceId(seq + i).send(); + } + return seq + maxMsgNum; + } + + private void checkDeduplicationDisabled(String producerName, MessageDeduplication messageDeduplication) throws Exception { + messageDeduplication.checkStatus().whenComplete((res, ex) -> { + if (ex != null) { + fail("should not fail"); + } + assertEquals(messageDeduplication.getLastPublishedSequenceId(producerName), -1); + assertEquals(messageDeduplication.highestSequencedPersisted.size(), 0); + assertEquals(messageDeduplication.highestSequencedPushed.size(), 0); + }).get(); + } + + private void checkDeduplicationEnabled(String producerName, MessageDeduplication messageDeduplication, + long maxSeq) throws Exception { + messageDeduplication.checkStatus().whenComplete((res, ex) -> { + if (ex != null) { + fail("should not fail"); + } + assertNotNull(messageDeduplication.highestSequencedPersisted); + assertNotNull(messageDeduplication.highestSequencedPushed); + long seqId = messageDeduplication.getLastPublishedSequenceId(producerName); + assertEquals(seqId, maxSeq); + assertEquals(messageDeduplication.highestSequencedPersisted.get(producerName).longValue(), maxSeq); + assertEquals(messageDeduplication.highestSequencedPushed.get(producerName).longValue(), maxSeq); + }).get(); + } + @Test(timeOut = 10000) public void testDuplicationSnapshotApi() throws Exception { final String topicName = testTopic + UUID.randomUUID().toString(); @@ -192,27 +309,13 @@ public void testDuplicationMethod() throws Exception { @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topicName) .producerName(producerName).create(); - long seq = System.currentTimeMillis(); - for (int i = 0; i <= maxMsgNum; i++) { - producer.newMessage().value("msg-" + i).sequenceId(seq + i).send(); - } - long maxSeq = seq + maxMsgNum; + long maxSeq = sendMessageAndGetMaxSeq(maxMsgNum, producer); //2) Max sequenceId should be recorded correctly CompletableFuture> completableFuture = pulsar.getBrokerService().getTopics().get(topicName); Topic topic = completableFuture.get(1, TimeUnit.SECONDS).get(); PersistentTopic persistentTopic = (PersistentTopic) topic; MessageDeduplication messageDeduplication = persistentTopic.getMessageDeduplication(); - messageDeduplication.checkStatus().whenComplete((res, ex) -> { - if (ex != null) { - fail("should not fail"); - } - assertNotNull(messageDeduplication.highestSequencedPersisted); - assertNotNull(messageDeduplication.highestSequencedPushed); - long seqId = messageDeduplication.getLastPublishedSequenceId(producerName); - assertEquals(seqId, maxSeq); - assertEquals(messageDeduplication.highestSequencedPersisted.get(producerName).longValue(), maxSeq); - assertEquals(messageDeduplication.highestSequencedPushed.get(producerName).longValue(), maxSeq); - }).get(); + checkDeduplicationEnabled(producerName, messageDeduplication, maxSeq); //3) disable the deduplication check admin.topics().enableDeduplication(topicName, false); Awaitility.await().atMost(5, TimeUnit.SECONDS) @@ -221,14 +324,7 @@ public void testDuplicationMethod() throws Exception { producer.newMessage().value("msg-" + i).sequenceId(maxSeq + i).send(); } //4) Max sequenceId record should be clear - messageDeduplication.checkStatus().whenComplete((res, ex) -> { - if (ex != null) { - fail("should not fail"); - } - assertEquals(messageDeduplication.getLastPublishedSequenceId(producerName), -1); - assertEquals(messageDeduplication.highestSequencedPersisted.size(), 0); - assertEquals(messageDeduplication.highestSequencedPushed.size(), 0); - }).get(); + checkDeduplicationDisabled(producerName, messageDeduplication); } @@ -315,7 +411,7 @@ public void testNamespacePolicyApi() throws Exception { } @Test(timeOut = 30000) - private void testNamespacePolicyTakeSnapshot() throws Exception { + public void testNamespacePolicyTakeSnapshot() throws Exception { resetConfig(); conf.setBrokerDeduplicationEnabled(true); conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1); @@ -367,7 +463,7 @@ private void testNamespacePolicyTakeSnapshot() throws Exception { } @Test(timeOut = 30000) - private void testDisableNamespacePolicyTakeSnapshot() throws Exception { + public void testDisableNamespacePolicyTakeSnapshot() throws Exception { resetConfig(); conf.setBrokerDeduplicationEnabled(true); conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java index 347903b540acf..bdb0cc4726a57 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java @@ -19,9 +19,13 @@ package org.apache.pulsar.client.api; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + import java.util.concurrent.TimeUnit; +import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -40,6 +44,18 @@ protected void cleanup() throws Exception { super.internalCleanup(); } + @Test + public void testNamespaceDeduplicationApi() throws Exception { + final String namespace = "my-property/my-ns"; + assertNull(admin.namespaces().getDeduplicationStatus(namespace)); + admin.namespaces().setDeduplicationStatus(namespace, true); + Awaitility.await().untilAsserted(() -> assertTrue(admin.namespaces().getDeduplicationStatus(namespace))); + admin.namespaces().setDeduplicationStatus(namespace, false); + Awaitility.await().untilAsserted(() -> assertFalse(admin.namespaces().getDeduplicationStatus(namespace))); + admin.namespaces().removeDeduplicationStatus(namespace); + Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getDeduplicationStatus(namespace))); + } + @Test public void testProducerSequenceAfterReconnect() throws Exception { String topic = "persistent://my-property/my-ns/testProducerSequenceAfterReconnect"; diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 226877fcc1f81..13230243379c7 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -1085,6 +1085,33 @@ CompletableFuture> getAntiAffinityNamespacesAsync( */ CompletableFuture deleteNamespaceAntiAffinityGroupAsync(String namespace); + /** + * Remove the deduplication status for all topics within a namespace. + * @param namespace + * @throws PulsarAdminException + */ + void removeDeduplicationStatus(String namespace) throws PulsarAdminException; + + /** + * Get the deduplication status for all topics within a namespace asynchronously. + * @param namespace + * @return + */ + CompletableFuture removeDeduplicationStatusAsync(String namespace); + /** + * Get the deduplication status for all topics within a namespace . + * @param namespace + * @return + * @throws PulsarAdminException + */ + Boolean getDeduplicationStatus(String namespace) throws PulsarAdminException; + + /** + * Get the deduplication status for all topics within a namespace asynchronously. + * @param namespace + * @return + */ + CompletableFuture getDeduplicationStatusAsync(String namespace); /** * Set the deduplication status for all topics within a namespace. *

diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 1cbf890529a64..b5cc4ac41a1ca 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -2118,6 +2118,7 @@ void setInactiveTopicPolicies(String topic * @return * @throws PulsarAdminException */ + @Deprecated Boolean getDeduplicationEnabled(String topic) throws PulsarAdminException; /** @@ -2125,14 +2126,45 @@ void setInactiveTopicPolicies(String topic * @param topic * @return */ + @Deprecated CompletableFuture getDeduplicationEnabledAsync(String topic); + /** + * get deduplication enabled of a topic. + * @param topic + * @return + * @throws PulsarAdminException + */ + Boolean getDeduplicationStatus(String topic) throws PulsarAdminException; + + /** + * get deduplication enabled of a topic asynchronously. + * @param topic + * @return + */ + CompletableFuture getDeduplicationStatusAsync(String topic); + /** + * get applied deduplication enabled of a topic. + * @param topic + * @return + * @throws PulsarAdminException + */ + Boolean getDeduplicationStatus(String topic, boolean applied) throws PulsarAdminException; + + /** + * get applied deduplication enabled of a topic asynchronously. + * @param topic + * @return + */ + CompletableFuture getDeduplicationStatusAsync(String topic, boolean applied); + /** * set deduplication enabled of a topic. * @param topic * @param enabled * @throws PulsarAdminException */ + @Deprecated void enableDeduplication(String topic, boolean enabled) throws PulsarAdminException; /** @@ -2141,13 +2173,31 @@ void setInactiveTopicPolicies(String topic * @param enabled * @return */ + @Deprecated CompletableFuture enableDeduplicationAsync(String topic, boolean enabled); + /** + * set deduplication enabled of a topic. + * @param topic + * @param enabled + * @throws PulsarAdminException + */ + void setDeduplicationStatus(String topic, boolean enabled) throws PulsarAdminException; + + /** + * set deduplication enabled of a topic asynchronously. + * @param topic + * @param enabled + * @return + */ + CompletableFuture setDeduplicationStatusAsync(String topic, boolean enabled); + /** * remove deduplication enabled of a topic. * @param topic * @throws PulsarAdminException */ + @Deprecated void disableDeduplication(String topic) throws PulsarAdminException; /** @@ -2155,8 +2205,23 @@ void setInactiveTopicPolicies(String topic * @param topic * @return */ + @Deprecated CompletableFuture disableDeduplicationAsync(String topic); + /** + * remove deduplication enabled of a topic. + * @param topic + * @throws PulsarAdminException + */ + void removeDeduplicationStatus(String topic) throws PulsarAdminException; + + /** + * remove deduplication enabled of a topic asynchronously. + * @param topic + * @return + */ + CompletableFuture removeDeduplicationStatusAsync(String topic); + /** * Set message-dispatch-rate (topic can dispatch this many messages per second). * diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 40ef320c653dd..1b70f79d4a2e6 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -869,6 +869,63 @@ public CompletableFuture deleteNamespaceAntiAffinityGroupAsync(String name return asyncDeleteRequest(path); } + @Override + public void removeDeduplicationStatus(String namespace) throws PulsarAdminException { + try { + removeDeduplicationStatusAsync(namespace) + .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture removeDeduplicationStatusAsync(String namespace) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "deduplication"); + return asyncDeleteRequest(path); + } + + @Override + public Boolean getDeduplicationStatus(String namespace) throws PulsarAdminException { + try { + return getDeduplicationStatusAsync(namespace) + .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture getDeduplicationStatusAsync(String namespace) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "deduplication"); + final CompletableFuture future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback() { + @Override + public void completed(Boolean enabled) { + future.complete(enabled); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + @Override public void setDeduplicationStatus(String namespace, boolean enableDeduplication) throws PulsarAdminException { try { diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index e4ca396cfdaa9..4e5ee73e80360 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -1848,6 +1848,51 @@ public void failed(Throwable throwable) { return future; } + @Override + public Boolean getDeduplicationStatus(String topic) throws PulsarAdminException { + return getDeduplicationStatus(topic, false); + } + + @Override + public CompletableFuture getDeduplicationStatusAsync(String topic) { + return getDeduplicationStatusAsync(topic, false); + } + + @Override + public Boolean getDeduplicationStatus(String topic, boolean applied) throws PulsarAdminException { + try { + return getDeduplicationStatusAsync(topic, applied). + get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture getDeduplicationStatusAsync(String topic, boolean applied) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "deduplicationEnabled"); + path = path.queryParam("applied", applied); + final CompletableFuture future = new CompletableFuture<>(); + asyncGetRequest(path, new InvocationCallback() { + @Override + public void completed(Boolean enabled) { + future.complete(enabled); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + @Override public void enableDeduplication(String topic, boolean enabled) throws PulsarAdminException { try { @@ -1870,6 +1915,28 @@ public CompletableFuture enableDeduplicationAsync(String topic, boolean en return asyncPostRequest(path, Entity.entity(enabled, MediaType.APPLICATION_JSON)); } + @Override + public void setDeduplicationStatus(String topic, boolean enabled) throws PulsarAdminException { + try { + enableDeduplicationAsync(topic, enabled). + get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture setDeduplicationStatusAsync(String topic, boolean enabled) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "deduplicationEnabled"); + return asyncPostRequest(path, Entity.entity(enabled, MediaType.APPLICATION_JSON)); + } + @Override public void disableDeduplication(String topic) throws PulsarAdminException { try { @@ -1892,6 +1959,27 @@ public CompletableFuture disableDeduplicationAsync(String topic) { return asyncDeleteRequest(path); } + @Override + public void removeDeduplicationStatus(String topic) throws PulsarAdminException { + try { + removeDeduplicationStatusAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture removeDeduplicationStatusAsync(String topic) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "deduplicationEnabled"); + return asyncDeleteRequest(path); + } + @Override public OffloadPolicies getOffloadPolicies(String topic) throws PulsarAdminException { return getOffloadPolicies(topic, false); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index ee098c109d89f..e7b201702038a 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -365,8 +365,12 @@ public void namespaces() throws Exception { namespaces.run(split("set-subscription-expiration-time myprop/clust/ns1 -t 60")); verify(mockNamespaces).setSubscriptionExpirationTime("myprop/clust/ns1", 60); + namespaces.run(split("get-deduplication myprop/clust/ns1")); + verify(mockNamespaces).getDeduplicationStatus("myprop/clust/ns1"); namespaces.run(split("set-deduplication myprop/clust/ns1 --enable")); verify(mockNamespaces).setDeduplicationStatus("myprop/clust/ns1", true); + namespaces.run(split("remove-deduplication myprop/clust/ns1")); + verify(mockNamespaces).removeDeduplicationStatus("myprop/clust/ns1"); namespaces.run(split("set-auto-topic-creation myprop/clust/ns1 -e -t non-partitioned")); verify(mockNamespaces).setAutoTopicCreation("myprop/clust/ns1", @@ -773,7 +777,10 @@ public void topics() throws Exception { verify(mockTopics).enableDeduplication("persistent://myprop/clust/ns1/ds1", false); cmdTopics.run(split("set-deduplication persistent://myprop/clust/ns1/ds1 --disable")); - verify(mockTopics, times(2)).enableDeduplication("persistent://myprop/clust/ns1/ds1", false); + verify(mockTopics).setDeduplicationStatus("persistent://myprop/clust/ns1/ds1", false); + + cmdTopics.run(split("remove-deduplication persistent://myprop/clust/ns1/ds1")); + verify(mockTopics).removeDeduplicationStatus("persistent://myprop/clust/ns1/ds1"); cmdTopics.run(split("get-replicator-dispatch-rate persistent://myprop/clust/ns1/ds1")); verify(mockTopics).getReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1"); @@ -793,9 +800,9 @@ public void topics() throws Exception { verify(mockTopics).removeReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1"); cmdTopics.run(split("get-deduplication-enabled persistent://myprop/clust/ns1/ds1")); - verify(mockTopics).getDeduplicationEnabled("persistent://myprop/clust/ns1/ds1"); + verify(mockTopics).getDeduplicationStatus("persistent://myprop/clust/ns1/ds1"); cmdTopics.run(split("get-deduplication persistent://myprop/clust/ns1/ds1")); - verify(mockTopics, times(2)).getDeduplicationEnabled("persistent://myprop/clust/ns1/ds1"); + verify(mockTopics, times(2)).getDeduplicationStatus("persistent://myprop/clust/ns1/ds1"); cmdTopics.run(split("get-offload-policies persistent://myprop/clust/ns1/ds1")); verify(mockTopics).getOffloadPolicies("persistent://myprop/clust/ns1/ds1", false); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index 4b05ae0806cc4..fcf6a99aaf33a 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -478,6 +478,29 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Get Deduplication for a namespace") + private class GetDeduplication extends CliCommand { + @Parameter(description = "tenant/namespace\n", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + print(getAdmin().namespaces().getDeduplicationStatus(namespace)); + } + } + + @Parameters(commandDescription = "Remove Deduplication for a namespace") + private class RemoveDeduplication extends CliCommand { + @Parameter(description = "tenant/namespace\n", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + getAdmin().namespaces().removeDeduplicationStatus(namespace); + } + } @Parameters(commandDescription = "Enable or disable deduplication for a namespace") private class SetDeduplication extends CliCommand { @@ -2014,6 +2037,8 @@ public CmdNamespaces(Supplier admin) { jcommander.addCommand("delete-anti-affinity-group", new DeleteAntiAffinityGroup()); jcommander.addCommand("set-deduplication", new SetDeduplication()); + jcommander.addCommand("get-deduplication", new GetDeduplication()); + jcommander.addCommand("remove-deduplication", new RemoveDeduplication()); jcommander.addCommand("set-auto-topic-creation", new SetAutoTopicCreation()); jcommander.addCommand("remove-auto-topic-creation", new RemoveAutoTopicCreation()); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 0549d7aa4cc83..76297a342489b 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -131,10 +131,11 @@ public CmdTopics(Supplier admin) { //deprecated commands jcommander.addCommand("enable-deduplication", new EnableDeduplication()); jcommander.addCommand("disable-deduplication", new DisableDeduplication()); - jcommander.addCommand("get-deduplication-enabled", new GetDeduplicationEnabled()); + jcommander.addCommand("get-deduplication-enabled", new GetDeduplicationStatus()); - jcommander.addCommand("set-deduplication", new SetDeduplication()); - jcommander.addCommand("get-deduplication", new GetDeduplicationEnabled()); + jcommander.addCommand("set-deduplication", new SetDeduplicationStatus()); + jcommander.addCommand("get-deduplication", new GetDeduplicationStatus()); + jcommander.addCommand("remove-deduplication", new RemoveDeduplicationStatus()); jcommander.addCommand("get-deduplication-snapshot-interval", new GetDeduplicationSnapshotInterval()); jcommander.addCommand("set-deduplication-snapshot-interval", new SetDeduplicationSnapshotInterval()); @@ -1346,7 +1347,7 @@ void run() throws PulsarAdminException { } @Parameters(commandDescription = "Enable or disable deduplication for a topic") - private class SetDeduplication extends CliCommand { + private class SetDeduplicationStatus extends CliCommand { @Parameter(description = "persistent://tenant/namespace/topic", required = true) private java.util.List params; @@ -1363,19 +1364,31 @@ void run() throws PulsarAdminException { if (enable == disable) { throw new ParameterException("Need to specify either --enable or --disable"); } - getAdmin().topics().enableDeduplication(persistentTopic, enable); + getAdmin().topics().setDeduplicationStatus(persistentTopic, enable); } } @Parameters(commandDescription = "Get the deduplication policy for a topic") - private class GetDeduplicationEnabled extends CliCommand { + private class GetDeduplicationStatus extends CliCommand { @Parameter(description = "persistent://tenant/namespace/topic", required = true) private java.util.List params; @Override void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(params); - print(getAdmin().topics().getDeduplicationEnabled(persistentTopic)); + print(getAdmin().topics().getDeduplicationStatus(persistentTopic)); + } + } + + @Parameters(commandDescription = "Remove the deduplication policy for a topic") + private class RemoveDeduplicationStatus extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + getAdmin().topics().removeDeduplicationStatus(persistentTopic); } }