diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index eda51045dd259..e5634f6a6ce82 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -815,6 +815,21 @@ protected CompletableFuture internalSetMaxUnackedMessagesOnSubscription(In return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); } + protected CompletableFuture internalSetMaxUnackedMessagesOnConsumer(Integer maxUnackedNum) { + TopicPolicies topicPolicies = null; + try { + topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName); + } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) { + log.error("Topic {} policies cache have not init.", topicName); + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Policies cache have not init")); + } + if (topicPolicies == null) { + topicPolicies = new TopicPolicies(); + } + topicPolicies.setMaxUnackedMessagesOnConsumer(maxUnackedNum); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + } + private void internalUnloadNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) { Topic topic; try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 3139b9c165dbd..21147233104e6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -250,6 +250,69 @@ public void createNonPartitionedTopic( internalCreateNonPartitionedTopic(authoritative); } + @GET + @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer") + @ApiOperation(value = "Get max unacked messages per consumer config on a topic.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), + @ApiResponse(code = 500, message = "Internal server error"),}) + public void getMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic) { + validateTopicName(tenant, namespace, encodedTopic); + TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies()); + if (topicPolicies.isMaxUnackedMessagesOnConsumerSet()) { + asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnConsumer()); + } else { + asyncResponse.resume(Response.noContent().build()); + } + } + + @POST + @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer") + @ApiOperation(value = "Set max unacked messages per consumer config on a topic.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),}) + public void setMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Max unacked messages on consumer policies for the specified topic") + Integer maxUnackedNum) { + validateTopicName(tenant, namespace, encodedTopic); + validateAdminAccessForTenant(tenant); + validatePoliciesReadOnlyAccess(); + checkTopicLevelPolicyEnable(); + if (topicName.isGlobal()) { + validateGlobalNamespaceOwnership(namespaceName); + } + internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum).whenComplete((res, ex) -> { + if (ex instanceof RestException) { + log.error("Failed set MaxUnackedMessagesOnConsumer", ex); + asyncResponse.resume(ex); + } else if (ex != null) { + log.error("Failed set MaxUnackedMessagesOnConsumer", ex); + asyncResponse.resume(new RestException(ex)); + } else { + asyncResponse.resume(Response.noContent().build()); + } + }); + } + + @DELETE + @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer") + @ApiOperation(value = "Delete max unacked messages per consumer config on a topic.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),}) + public void deleteMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic) { + validateTopicName(tenant, namespace, encodedTopic); + setMaxUnackedMessagesOnConsumer(asyncResponse, tenant, namespace, encodedTopic, null); + } + @GET @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnSubscription") @ApiOperation(value = "Get max unacked messages per subscription config on a topic.") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index a31071656356f..daf08e15aacc2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -598,7 +598,7 @@ public CompletableFuture subscribe(final ServerCnx cnx, String subscri : getNonDurableSubscription(subscriptionName, startMessageId, initialPosition, startMessageRollbackDurationSec); int maxUnackedMessages = isDurable - ? maxUnackedMessagesOnConsumer + ? getMaxUnackedMessagesOnConsumer() : 0; subscriptionFuture.thenAccept(subscription -> { @@ -2317,6 +2317,14 @@ public long getDelayedDeliveryTickTimeMillis() { return delayedDeliveryTickTimeMillis; } + public int getMaxUnackedMessagesOnConsumer() { + TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic)); + if (topicPolicies != null && topicPolicies.isMaxUnackedMessagesOnConsumerSet()) { + return topicPolicies.getMaxUnackedMessagesOnConsumer(); + } + return maxUnackedMessagesOnConsumer; + } + public boolean isDelayedDeliveryEnabled() { TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic)); //Topic level setting has higher priority than namespace level diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java index c30e03abdf7e5..93fd08c10e8a4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MaxUnackedMessagesTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.admin; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -25,8 +26,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import com.google.common.collect.Maps; +import lombok.Cleanup; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.Message; @@ -34,10 +37,11 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import org.testng.collections.Lists; @@ -46,12 +50,12 @@ import static org.testng.Assert.fail; public class MaxUnackedMessagesTest extends ProducerConsumerBase { - private final String testTenant = "public"; - private final String testNamespace = "default"; + private final String testTenant = "my-property"; + private final String testNamespace = "my-ns"; private final String myNamespace = testTenant + "/" + testNamespace; private final String testTopic = "persistent://" + myNamespace + "/max-unacked-"; - @BeforeMethod + @BeforeClass @Override protected void setup() throws Exception { this.conf.setSystemTopicEnabled(true); @@ -60,7 +64,7 @@ protected void setup() throws Exception { super.producerBaseSetup(); } - @AfterMethod + @AfterClass @Override protected void cleanup() throws Exception { super.internalCleanup(); @@ -188,6 +192,117 @@ public void testMaxUnackedMessagesOnSubscription() throws Exception { }); } + @Test(timeOut = 20000) + public void testMaxUnackedMessagesOnConsumerApi() throws Exception { + final String topicName = testTopic + UUID.randomUUID().toString(); + admin.topics().createPartitionedTopic(topicName, 3); + waitCacheInit(topicName); + Integer max = admin.topics().getMaxUnackedMessagesOnConsumer(topicName); + assertNull(max); + + admin.topics().setMaxUnackedMessagesOnConsumer(topicName, 2048); + for (int i = 0; i < 50; i++) { + if (admin.topics().getMaxUnackedMessagesOnConsumer(topicName) != null) { + break; + } + Thread.sleep(100); + } + assertEquals(admin.topics().getMaxUnackedMessagesOnConsumer(topicName).intValue(), 2048); + admin.topics().removeMaxUnackedMessagesOnConsumer(topicName); + for (int i = 0; i < 50; i++) { + if (admin.topics().getMaxUnackedMessagesOnConsumer(topicName) == null) { + break; + } + Thread.sleep(100); + } + assertNull(admin.topics().getMaxUnackedMessagesOnConsumer(topicName)); + } + + @Test(timeOut = 30000) + public void testMaxUnackedMessagesOnConsumer() throws Exception { + final String topicName = testTopic + System.currentTimeMillis(); + final String subscriberName = "test-sub" + System.currentTimeMillis(); + final int unackMsgAllowed = 100; + final int receiverQueueSize = 10; + final int totalProducedMsgs = 300; + + ConsumerBuilder consumerBuilder = pulsarClient.newConsumer(Schema.STRING).topic(topicName) + .subscriptionName(subscriberName).receiverQueueSize(receiverQueueSize) + .ackTimeout(1, TimeUnit.MINUTES) + .subscriptionType(SubscriptionType.Shared); + @Cleanup + Consumer consumer1 = consumerBuilder.subscribe(); + // 1) Produced Messages + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); + for (int i = 0; i < totalProducedMsgs; i++) { + String message = "my-message-" + i; + producer.send(message); + } + // 2) Unlimited, so all messages can be consumed + int count = 0; + List> list = new ArrayList<>(totalProducedMsgs); + for (int i = 0; i < totalProducedMsgs; i++) { + Message message = consumer1.receive(1, TimeUnit.SECONDS); + if (message == null) { + break; + } + count++; + list.add(message); + } + assertEquals(count, totalProducedMsgs); + list.forEach(message -> { + try { + consumer1.acknowledge(message); + } catch (PulsarClientException e) { + } + }); + // 3) Set restrictions, so only part of the data can be consumed + waitCacheInit(topicName); + admin.topics().setMaxUnackedMessagesOnConsumer(topicName, unackMsgAllowed); + for (int i = 0; i < 50; i++) { + if (admin.topics().getMaxUnackedMessagesOnConsumer(topicName) != null) { + break; + } + Thread.sleep(100); + } + assertEquals(admin.topics().getMaxUnackedMessagesOnConsumer(topicName).intValue(), unackMsgAllowed); + // 4) Start 2 consumer, each consumer can only consume 100 messages + @Cleanup + Consumer consumer2 = consumerBuilder.subscribe(); + @Cleanup + Consumer consumer3 = consumerBuilder.subscribe(); + for (int i = 0; i < totalProducedMsgs; i++) { + String message = "my-message-" + i; + producer.send(message); + } + AtomicInteger consumer2Counter = new AtomicInteger(0); + AtomicInteger consumer3Counter = new AtomicInteger(0); + CountDownLatch countDownLatch = new CountDownLatch(2); + startConsumer(consumer2, consumer2Counter, countDownLatch); + startConsumer(consumer3, consumer3Counter, countDownLatch); + countDownLatch.await(10, TimeUnit.SECONDS); + assertEquals(consumer2Counter.get(), unackMsgAllowed); + assertEquals(consumer3Counter.get(), unackMsgAllowed); + } + + private void startConsumer(Consumer consumer, AtomicInteger consumerCounter, CountDownLatch countDownLatch) { + new Thread(() -> { + while (true) { + try { + Message message = consumer.receive(500, TimeUnit.MILLISECONDS); + if (message == null) { + countDownLatch.countDown(); + break; + } + consumerCounter.incrementAndGet(); + } catch (PulsarClientException e) { + break; + } + } + }).start(); + } + private void waitCacheInit(String topicName) throws Exception { for (int i = 0; i < 50; i++) { try { diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java index 025a62f48e328..18910acb15fe2 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -1664,6 +1664,51 @@ CompletableFuture setDelayedDeliveryPolicyAsync(String topic */ CompletableFuture removeRetentionAsync(String topic); + /** + * get max unacked messages on consumer of a topic. + * @param topic + * @return + * @throws PulsarAdminException + */ + Integer getMaxUnackedMessagesOnConsumer(String topic) throws PulsarAdminException; + + /** + * get max unacked messages on consumer of a topic asynchronously. + * @param topic + * @return + */ + CompletableFuture getMaxUnackedMessagesOnConsumerAsync(String topic); + + /** + * set max unacked messages on consumer of a topic. + * @param topic + * @param maxNum + * @throws PulsarAdminException + */ + void setMaxUnackedMessagesOnConsumer(String topic, int maxNum) throws PulsarAdminException; + + /** + * set max unacked messages on consumer of a topic asynchronously. + * @param topic + * @param maxNum + * @return + */ + CompletableFuture setMaxUnackedMessagesOnConsumerAsync(String topic, int maxNum); + + /** + * remove max unacked messages on consumer of a topic. + * @param topic + * @throws PulsarAdminException + */ + void removeMaxUnackedMessagesOnConsumer(String topic) throws PulsarAdminException; + + /** + * remove max unacked messages on consumer of a topic asynchronously. + * @param topic + * @return + */ + CompletableFuture removeMaxUnackedMessagesOnConsumerAsync(String topic); + /** * get max unacked messages on subscription of a topic. * @param topic diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 391b0926c84f8..d85c3478b1d42 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -1431,6 +1431,83 @@ public void removeBacklogQuota(String topic) throws PulsarAdminException { } } + @Override + public Integer getMaxUnackedMessagesOnConsumer(String topic) throws PulsarAdminException { + try { + return getMaxUnackedMessagesOnConsumerAsync(topic). + get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture getMaxUnackedMessagesOnConsumerAsync(String topic) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "maxUnackedMessagesOnConsumer"); + final CompletableFuture future = new CompletableFuture<>(); + asyncGetRequest(path, new InvocationCallback() { + @Override + public void completed(Integer maxNum) { + future.complete(maxNum); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + + @Override + public CompletableFuture setMaxUnackedMessagesOnConsumerAsync(String topic, int maxNum) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "maxUnackedMessagesOnConsumer"); + return asyncPostRequest(path, Entity.entity(maxNum, MediaType.APPLICATION_JSON)); + } + + @Override + public void setMaxUnackedMessagesOnConsumer(String topic, int maxNum) throws PulsarAdminException { + try { + setMaxUnackedMessagesOnConsumerAsync(topic, maxNum) + .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture removeMaxUnackedMessagesOnConsumerAsync(String topic) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "maxUnackedMessagesOnConsumer"); + return asyncDeleteRequest(path); + } + + @Override + public void removeMaxUnackedMessagesOnConsumer(String topic) throws PulsarAdminException { + try { + removeMaxUnackedMessagesOnConsumerAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + @Override public DelayedDeliveryPolicies getDelayedDeliveryPolicy(String topic) throws PulsarAdminException { try { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java index 8be73127b64d3..7864e0e6ac28b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java @@ -45,16 +45,22 @@ public class TopicPolicies { private Integer maxProducerPerTopic = null; private Integer maxConsumerPerTopic = null; private Integer maxConsumersPerSubscription = null; + private Integer maxUnackedMessagesOnConsumer = null; + private Integer maxUnackedMessagesOnSubscription = null; private Long delayedDeliveryTickTimeMillis = null; private Boolean delayedDeliveryEnabled = null; + public boolean isMaxUnackedMessagesOnConsumerSet() { + return maxUnackedMessagesOnConsumer != null; + } + public boolean isDelayedDeliveryTickTimeMillisSet(){ return delayedDeliveryTickTimeMillis != null; } + public boolean isDelayedDeliveryEnabledSet(){ return delayedDeliveryEnabled != null; } - private Integer maxUnackedMessagesOnSubscription = null; public boolean isMaxUnackedMessagesOnSubscriptionSet() { return maxUnackedMessagesOnSubscription != null;