Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Client] Reduce redundant FLOW requests for non-durable multi-topics consumer #11802

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,25 @@
import java.lang.reflect.Field;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.PulsarChannelInitializer;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSubscription;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.api.proto.CommandFlow;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertNull;
Expand All @@ -42,6 +49,8 @@
@Slf4j
public class NonDurableSubscriptionTest extends ProducerConsumerBase {

private final AtomicInteger numFlow = new AtomicInteger(0);

@BeforeMethod
@Override
protected void setup() throws Exception {
Expand All @@ -56,6 +65,34 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@Override
protected PulsarService newPulsarService(ServiceConfiguration conf) throws Exception {
return new PulsarService(conf) {

@Override
protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
BrokerService broker = new BrokerService(this, ioEventLoopGroup);
broker.setPulsarChannelInitializerFactory(
(_pulsar, tls) -> {
return new PulsarChannelInitializer(_pulsar, tls) {
@Override
protected ServerCnx newServerCnx(PulsarService pulsar) throws Exception {
return new ServerCnx(pulsar) {

@Override
protected void handleFlow(CommandFlow flow) {
super.handleFlow(flow);
numFlow.incrementAndGet();
}
};
}
};
});
return broker;
}
};
}

@Test
public void testNonDurableSubscription() throws Exception {
String topicName = "persistent://my-property/my-ns/nonDurable-topic1";
Expand Down Expand Up @@ -250,4 +287,22 @@ public void testNonDurableSubscriptionRecovery(SubscriptionType subscriptionType
}

}

@Test
public void testFlowCountForMultiTopics() throws Exception {
String topicName = "persistent://my-property/my-ns/test-flow-count";
int numPartitions = 5;
admin.topics().createPartitionedTopic(topicName, numPartitions);
numFlow.set(0);

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("my-nonDurable-subscriber")
.subscriptionMode(SubscriptionMode.NonDurable)
.subscribe();
consumer.receive(1, TimeUnit.SECONDS);
consumer.close();

assertEquals(numFlow.get(), numPartitions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -769,9 +769,7 @@ public void connectionOpened(final ClientCnx cnx) {
boolean firstTimeConnect = subscribeFuture.complete(this);
// if the consumer is not partitioned or is re-connected and is partitioned, we send the flow
// command to receive messages.
// For readers too (isDurable==false), the partition idx will be set though we have to
// send available permits immediately after establishing the reader session
if (!(firstTimeConnect && hasParentConsumer && isDurable) && conf.getReceiverQueueSize() != 0) {
if (!(firstTimeConnect && hasParentConsumer) && conf.getReceiverQueueSize() != 0) {
increaseAvailablePermits(cnx, conf.getReceiverQueueSize());
}
}).exceptionally((e) -> {
Expand Down