Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support MaxUnackedMessagesOnConsumer on topic level #7818

Merged
merged 2 commits into from
Aug 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,21 @@ protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnSubscription(In
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnConsumer(Integer maxUnackedNum) {
TopicPolicies topicPolicies = null;
try {
topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.error("Topic {} policies cache have not init.", topicName);
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Policies cache have not init"));
}
if (topicPolicies == null) {
topicPolicies = new TopicPolicies();
}
topicPolicies.setMaxUnackedMessagesOnConsumer(maxUnackedNum);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

private void internalUnloadNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
Topic topic;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,69 @@ public void createNonPartitionedTopic(
internalCreateNonPartitionedTopic(authoritative);
}

@GET
@Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
@ApiOperation(value = "Get max unacked messages per consumer config on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error"),})
public void getMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
if (topicPolicies.isMaxUnackedMessagesOnConsumerSet()) {
asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnConsumer());
} else {
asyncResponse.resume(Response.noContent().build());
}
}

@POST
@Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
@ApiOperation(value = "Set max unacked messages per consumer config on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
public void setMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Max unacked messages on consumer policies for the specified topic")
Integer maxUnackedNum) {
validateTopicName(tenant, namespace, encodedTopic);
validateAdminAccessForTenant(tenant);
validatePoliciesReadOnlyAccess();
checkTopicLevelPolicyEnable();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum).whenComplete((res, ex) -> {
if (ex instanceof RestException) {
log.error("Failed set MaxUnackedMessagesOnConsumer", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed set MaxUnackedMessagesOnConsumer", ex);
asyncResponse.resume(new RestException(ex));
} else {
asyncResponse.resume(Response.noContent().build());
}
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
@ApiOperation(value = "Delete max unacked messages per consumer config on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
public void deleteMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
setMaxUnackedMessagesOnConsumer(asyncResponse, tenant, namespace, encodedTopic, null);
315157973 marked this conversation as resolved.
Show resolved Hide resolved
}

@GET
@Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnSubscription")
@ApiOperation(value = "Get max unacked messages per subscription config on a topic.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri
: getNonDurableSubscription(subscriptionName, startMessageId, initialPosition, startMessageRollbackDurationSec);

int maxUnackedMessages = isDurable
? maxUnackedMessagesOnConsumer
? getMaxUnackedMessagesOnConsumer()
: 0;

subscriptionFuture.thenAccept(subscription -> {
Expand Down Expand Up @@ -2317,6 +2317,14 @@ public long getDelayedDeliveryTickTimeMillis() {
return delayedDeliveryTickTimeMillis;
}

public int getMaxUnackedMessagesOnConsumer() {
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
if (topicPolicies != null && topicPolicies.isMaxUnackedMessagesOnConsumerSet()) {
return topicPolicies.getMaxUnackedMessagesOnConsumer();
}
return maxUnackedMessagesOnConsumer;
}

public boolean isDelayedDeliveryEnabled() {
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
//Topic level setting has higher priority than namespace level
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,30 @@
*/
package org.apache.pulsar.broker.admin;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.collect.Maps;
import lombok.Cleanup;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.testng.collections.Lists;

Expand All @@ -46,12 +50,12 @@
import static org.testng.Assert.fail;

public class MaxUnackedMessagesTest extends ProducerConsumerBase {
private final String testTenant = "public";
private final String testNamespace = "default";
private final String testTenant = "my-property";
private final String testNamespace = "my-ns";
private final String myNamespace = testTenant + "/" + testNamespace;
private final String testTopic = "persistent://" + myNamespace + "/max-unacked-";

@BeforeMethod
@BeforeClass
@Override
protected void setup() throws Exception {
this.conf.setSystemTopicEnabled(true);
Expand All @@ -60,7 +64,7 @@ protected void setup() throws Exception {
super.producerBaseSetup();
}

@AfterMethod
@AfterClass
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
Expand Down Expand Up @@ -188,6 +192,117 @@ public void testMaxUnackedMessagesOnSubscription() throws Exception {
});
}

@Test(timeOut = 20000)
public void testMaxUnackedMessagesOnConsumerApi() throws Exception {
final String topicName = testTopic + UUID.randomUUID().toString();
admin.topics().createPartitionedTopic(topicName, 3);
waitCacheInit(topicName);
Integer max = admin.topics().getMaxUnackedMessagesOnConsumer(topicName);
assertNull(max);

admin.topics().setMaxUnackedMessagesOnConsumer(topicName, 2048);
for (int i = 0; i < 50; i++) {
if (admin.topics().getMaxUnackedMessagesOnConsumer(topicName) != null) {
break;
}
Thread.sleep(100);
}
assertEquals(admin.topics().getMaxUnackedMessagesOnConsumer(topicName).intValue(), 2048);
admin.topics().removeMaxUnackedMessagesOnConsumer(topicName);
for (int i = 0; i < 50; i++) {
if (admin.topics().getMaxUnackedMessagesOnConsumer(topicName) == null) {
break;
}
Thread.sleep(100);
}
assertNull(admin.topics().getMaxUnackedMessagesOnConsumer(topicName));
}

@Test(timeOut = 30000)
public void testMaxUnackedMessagesOnConsumer() throws Exception {
final String topicName = testTopic + System.currentTimeMillis();
final String subscriberName = "test-sub" + System.currentTimeMillis();
final int unackMsgAllowed = 100;
final int receiverQueueSize = 10;
final int totalProducedMsgs = 300;

ConsumerBuilder<String> consumerBuilder = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
.subscriptionName(subscriberName).receiverQueueSize(receiverQueueSize)
.ackTimeout(1, TimeUnit.MINUTES)
.subscriptionType(SubscriptionType.Shared);
@Cleanup
Consumer<String> consumer1 = consumerBuilder.subscribe();
// 1) Produced Messages
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
for (int i = 0; i < totalProducedMsgs; i++) {
String message = "my-message-" + i;
producer.send(message);
}
// 2) Unlimited, so all messages can be consumed
int count = 0;
List<Message<String>> list = new ArrayList<>(totalProducedMsgs);
for (int i = 0; i < totalProducedMsgs; i++) {
Message<String> message = consumer1.receive(1, TimeUnit.SECONDS);
if (message == null) {
break;
}
count++;
list.add(message);
}
assertEquals(count, totalProducedMsgs);
list.forEach(message -> {
try {
consumer1.acknowledge(message);
} catch (PulsarClientException e) {
}
});
// 3) Set restrictions, so only part of the data can be consumed
waitCacheInit(topicName);
admin.topics().setMaxUnackedMessagesOnConsumer(topicName, unackMsgAllowed);
for (int i = 0; i < 50; i++) {
if (admin.topics().getMaxUnackedMessagesOnConsumer(topicName) != null) {
break;
}
Thread.sleep(100);
}
assertEquals(admin.topics().getMaxUnackedMessagesOnConsumer(topicName).intValue(), unackMsgAllowed);
// 4) Start 2 consumer, each consumer can only consume 100 messages
@Cleanup
Consumer<String> consumer2 = consumerBuilder.subscribe();
@Cleanup
Consumer<String> consumer3 = consumerBuilder.subscribe();
for (int i = 0; i < totalProducedMsgs; i++) {
String message = "my-message-" + i;
producer.send(message);
}
AtomicInteger consumer2Counter = new AtomicInteger(0);
AtomicInteger consumer3Counter = new AtomicInteger(0);
CountDownLatch countDownLatch = new CountDownLatch(2);
startConsumer(consumer2, consumer2Counter, countDownLatch);
startConsumer(consumer3, consumer3Counter, countDownLatch);
countDownLatch.await(10, TimeUnit.SECONDS);
assertEquals(consumer2Counter.get(), unackMsgAllowed);
assertEquals(consumer3Counter.get(), unackMsgAllowed);
}

private void startConsumer(Consumer<String> consumer, AtomicInteger consumerCounter, CountDownLatch countDownLatch) {
new Thread(() -> {
while (true) {
try {
Message<String> message = consumer.receive(500, TimeUnit.MILLISECONDS);
if (message == null) {
countDownLatch.countDown();
break;
}
consumerCounter.incrementAndGet();
} catch (PulsarClientException e) {
break;
}
}
}).start();
}

private void waitCacheInit(String topicName) throws Exception {
for (int i = 0; i < 50; i++) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1664,6 +1664,51 @@ CompletableFuture<Void> setDelayedDeliveryPolicyAsync(String topic
*/
CompletableFuture<Void> removeRetentionAsync(String topic);

/**
* get max unacked messages on consumer of a topic.
* @param topic
* @return
* @throws PulsarAdminException
*/
Integer getMaxUnackedMessagesOnConsumer(String topic) throws PulsarAdminException;

/**
* get max unacked messages on consumer of a topic asynchronously.
* @param topic
* @return
*/
CompletableFuture<Integer> getMaxUnackedMessagesOnConsumerAsync(String topic);

/**
* set max unacked messages on consumer of a topic.
* @param topic
* @param maxNum
* @throws PulsarAdminException
*/
void setMaxUnackedMessagesOnConsumer(String topic, int maxNum) throws PulsarAdminException;

/**
* set max unacked messages on consumer of a topic asynchronously.
* @param topic
* @param maxNum
* @return
*/
CompletableFuture<Void> setMaxUnackedMessagesOnConsumerAsync(String topic, int maxNum);

/**
* remove max unacked messages on consumer of a topic.
* @param topic
* @throws PulsarAdminException
*/
void removeMaxUnackedMessagesOnConsumer(String topic) throws PulsarAdminException;

/**
* remove max unacked messages on consumer of a topic asynchronously.
* @param topic
* @return
*/
CompletableFuture<Void> removeMaxUnackedMessagesOnConsumerAsync(String topic);

/**
* get max unacked messages on subscription of a topic.
* @param topic
Expand Down
Loading