Skip to content

Commit

Permalink
Support MaxUnackedMessagesOnConsumer on topic level (#7818)
Browse files Browse the repository at this point in the history
### Motivation
support set MaxUnackedMessagesOnConsumer on topic level

### Modifications
Support set/get/remove MaxUnackedMessagesOnConsumer policy on topic level.

### Verifying this change
Added Unit test to verify set/get/remove MaxUnackedMessagesOnConsumer policy at Topic level work as expected when Topic level policy is enabled/disabled

- org.apache.pulsar.broker.admin.MaxUnackedMessagesTest#testMaxUnackedMessagesOnConsumerApi
- org.apache.pulsar.broker.admin.MaxUnackedMessagesTest#testMaxUnackedMessagesOnConsumer
  • Loading branch information
315157973 authored Aug 17, 2020
1 parent e1b76a3 commit 3928005
Show file tree
Hide file tree
Showing 7 changed files with 337 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,21 @@ protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnSubscription(In
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnConsumer(Integer maxUnackedNum) {
TopicPolicies topicPolicies = null;
try {
topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.error("Topic {} policies cache have not init.", topicName);
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Policies cache have not init"));
}
if (topicPolicies == null) {
topicPolicies = new TopicPolicies();
}
topicPolicies.setMaxUnackedMessagesOnConsumer(maxUnackedNum);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

private void internalUnloadNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
Topic topic;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,69 @@ public void createNonPartitionedTopic(
internalCreateNonPartitionedTopic(authoritative);
}

@GET
@Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
@ApiOperation(value = "Get max unacked messages per consumer config on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error"),})
public void getMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
if (topicPolicies.isMaxUnackedMessagesOnConsumerSet()) {
asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnConsumer());
} else {
asyncResponse.resume(Response.noContent().build());
}
}

@POST
@Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
@ApiOperation(value = "Set max unacked messages per consumer config on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
public void setMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Max unacked messages on consumer policies for the specified topic")
Integer maxUnackedNum) {
validateTopicName(tenant, namespace, encodedTopic);
validateAdminAccessForTenant(tenant);
validatePoliciesReadOnlyAccess();
checkTopicLevelPolicyEnable();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum).whenComplete((res, ex) -> {
if (ex instanceof RestException) {
log.error("Failed set MaxUnackedMessagesOnConsumer", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed set MaxUnackedMessagesOnConsumer", ex);
asyncResponse.resume(new RestException(ex));
} else {
asyncResponse.resume(Response.noContent().build());
}
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
@ApiOperation(value = "Delete max unacked messages per consumer config on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
public void deleteMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
setMaxUnackedMessagesOnConsumer(asyncResponse, tenant, namespace, encodedTopic, null);
}

@GET
@Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnSubscription")
@ApiOperation(value = "Get max unacked messages per subscription config on a topic.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri
: getNonDurableSubscription(subscriptionName, startMessageId, initialPosition, startMessageRollbackDurationSec);

int maxUnackedMessages = isDurable
? maxUnackedMessagesOnConsumer
? getMaxUnackedMessagesOnConsumer()
: 0;

subscriptionFuture.thenAccept(subscription -> {
Expand Down Expand Up @@ -2317,6 +2317,14 @@ public long getDelayedDeliveryTickTimeMillis() {
return delayedDeliveryTickTimeMillis;
}

public int getMaxUnackedMessagesOnConsumer() {
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
if (topicPolicies != null && topicPolicies.isMaxUnackedMessagesOnConsumerSet()) {
return topicPolicies.getMaxUnackedMessagesOnConsumer();
}
return maxUnackedMessagesOnConsumer;
}

public boolean isDelayedDeliveryEnabled() {
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
//Topic level setting has higher priority than namespace level
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,30 @@
*/
package org.apache.pulsar.broker.admin;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.collect.Maps;
import lombok.Cleanup;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.testng.collections.Lists;

Expand All @@ -46,12 +50,12 @@
import static org.testng.Assert.fail;

public class MaxUnackedMessagesTest extends ProducerConsumerBase {
private final String testTenant = "public";
private final String testNamespace = "default";
private final String testTenant = "my-property";
private final String testNamespace = "my-ns";
private final String myNamespace = testTenant + "/" + testNamespace;
private final String testTopic = "persistent://" + myNamespace + "/max-unacked-";

@BeforeMethod
@BeforeClass
@Override
protected void setup() throws Exception {
this.conf.setSystemTopicEnabled(true);
Expand All @@ -60,7 +64,7 @@ protected void setup() throws Exception {
super.producerBaseSetup();
}

@AfterMethod
@AfterClass
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
Expand Down Expand Up @@ -188,6 +192,117 @@ public void testMaxUnackedMessagesOnSubscription() throws Exception {
});
}

@Test(timeOut = 20000)
public void testMaxUnackedMessagesOnConsumerApi() throws Exception {
final String topicName = testTopic + UUID.randomUUID().toString();
admin.topics().createPartitionedTopic(topicName, 3);
waitCacheInit(topicName);
Integer max = admin.topics().getMaxUnackedMessagesOnConsumer(topicName);
assertNull(max);

admin.topics().setMaxUnackedMessagesOnConsumer(topicName, 2048);
for (int i = 0; i < 50; i++) {
if (admin.topics().getMaxUnackedMessagesOnConsumer(topicName) != null) {
break;
}
Thread.sleep(100);
}
assertEquals(admin.topics().getMaxUnackedMessagesOnConsumer(topicName).intValue(), 2048);
admin.topics().removeMaxUnackedMessagesOnConsumer(topicName);
for (int i = 0; i < 50; i++) {
if (admin.topics().getMaxUnackedMessagesOnConsumer(topicName) == null) {
break;
}
Thread.sleep(100);
}
assertNull(admin.topics().getMaxUnackedMessagesOnConsumer(topicName));
}

@Test(timeOut = 30000)
public void testMaxUnackedMessagesOnConsumer() throws Exception {
final String topicName = testTopic + System.currentTimeMillis();
final String subscriberName = "test-sub" + System.currentTimeMillis();
final int unackMsgAllowed = 100;
final int receiverQueueSize = 10;
final int totalProducedMsgs = 300;

ConsumerBuilder<String> consumerBuilder = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
.subscriptionName(subscriberName).receiverQueueSize(receiverQueueSize)
.ackTimeout(1, TimeUnit.MINUTES)
.subscriptionType(SubscriptionType.Shared);
@Cleanup
Consumer<String> consumer1 = consumerBuilder.subscribe();
// 1) Produced Messages
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
for (int i = 0; i < totalProducedMsgs; i++) {
String message = "my-message-" + i;
producer.send(message);
}
// 2) Unlimited, so all messages can be consumed
int count = 0;
List<Message<String>> list = new ArrayList<>(totalProducedMsgs);
for (int i = 0; i < totalProducedMsgs; i++) {
Message<String> message = consumer1.receive(1, TimeUnit.SECONDS);
if (message == null) {
break;
}
count++;
list.add(message);
}
assertEquals(count, totalProducedMsgs);
list.forEach(message -> {
try {
consumer1.acknowledge(message);
} catch (PulsarClientException e) {
}
});
// 3) Set restrictions, so only part of the data can be consumed
waitCacheInit(topicName);
admin.topics().setMaxUnackedMessagesOnConsumer(topicName, unackMsgAllowed);
for (int i = 0; i < 50; i++) {
if (admin.topics().getMaxUnackedMessagesOnConsumer(topicName) != null) {
break;
}
Thread.sleep(100);
}
assertEquals(admin.topics().getMaxUnackedMessagesOnConsumer(topicName).intValue(), unackMsgAllowed);
// 4) Start 2 consumer, each consumer can only consume 100 messages
@Cleanup
Consumer<String> consumer2 = consumerBuilder.subscribe();
@Cleanup
Consumer<String> consumer3 = consumerBuilder.subscribe();
for (int i = 0; i < totalProducedMsgs; i++) {
String message = "my-message-" + i;
producer.send(message);
}
AtomicInteger consumer2Counter = new AtomicInteger(0);
AtomicInteger consumer3Counter = new AtomicInteger(0);
CountDownLatch countDownLatch = new CountDownLatch(2);
startConsumer(consumer2, consumer2Counter, countDownLatch);
startConsumer(consumer3, consumer3Counter, countDownLatch);
countDownLatch.await(10, TimeUnit.SECONDS);
assertEquals(consumer2Counter.get(), unackMsgAllowed);
assertEquals(consumer3Counter.get(), unackMsgAllowed);
}

private void startConsumer(Consumer<String> consumer, AtomicInteger consumerCounter, CountDownLatch countDownLatch) {
new Thread(() -> {
while (true) {
try {
Message<String> message = consumer.receive(500, TimeUnit.MILLISECONDS);
if (message == null) {
countDownLatch.countDown();
break;
}
consumerCounter.incrementAndGet();
} catch (PulsarClientException e) {
break;
}
}
}).start();
}

private void waitCacheInit(String topicName) throws Exception {
for (int i = 0; i < 50; i++) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1664,6 +1664,51 @@ CompletableFuture<Void> setDelayedDeliveryPolicyAsync(String topic
*/
CompletableFuture<Void> removeRetentionAsync(String topic);

/**
* get max unacked messages on consumer of a topic.
* @param topic
* @return
* @throws PulsarAdminException
*/
Integer getMaxUnackedMessagesOnConsumer(String topic) throws PulsarAdminException;

/**
* get max unacked messages on consumer of a topic asynchronously.
* @param topic
* @return
*/
CompletableFuture<Integer> getMaxUnackedMessagesOnConsumerAsync(String topic);

/**
* set max unacked messages on consumer of a topic.
* @param topic
* @param maxNum
* @throws PulsarAdminException
*/
void setMaxUnackedMessagesOnConsumer(String topic, int maxNum) throws PulsarAdminException;

/**
* set max unacked messages on consumer of a topic asynchronously.
* @param topic
* @param maxNum
* @return
*/
CompletableFuture<Void> setMaxUnackedMessagesOnConsumerAsync(String topic, int maxNum);

/**
* remove max unacked messages on consumer of a topic.
* @param topic
* @throws PulsarAdminException
*/
void removeMaxUnackedMessagesOnConsumer(String topic) throws PulsarAdminException;

/**
* remove max unacked messages on consumer of a topic asynchronously.
* @param topic
* @return
*/
CompletableFuture<Void> removeMaxUnackedMessagesOnConsumerAsync(String topic);

/**
* get max unacked messages on subscription of a topic.
* @param topic
Expand Down
Loading

0 comments on commit 3928005

Please sign in to comment.