Skip to content

Commit

Permalink
Reduce redundant FLOW requests for non-durable multi-topics consumer (#…
Browse files Browse the repository at this point in the history
…11802)

### Motivation

#3960 fixed the bug that reader will get stuck if it's reading a partition of a partitioned topic. The fix is using `isDurable` to check whether the consumer is a reader's internal consumer because it used `partitionIndex` to check whether the target topic is a partition while reader's `partitionIndex` is already set. However, for a non-durable multi-topics consumer, `isDurable` is false and each internal consumer will send FLOW request once the connection is established, which is unnecessary because `MultiTopicsConsumerImpl#startReceivingMessages` will send FLOW requests for each internal consumer after all internal consumers are connected.

After #4591 introduced `hasParentConsumer` field, the check works for even a reader without the `isDurable` check.

### Modifications

- Remove the check for `isDurable` before sending FLOW request and update the related comment.
- Add a test for non-durable multi-topics consumer to verify the number of FLOW requests is the topics number, not the twice the topics number.

### Verifying this change

- [x] Make sure that the change passes the CI checks.

This change added `NonDurableSubscriptionTest#testFlowCountForMultiTopics` and the existing test `ReaderTest#testReadFromPartition` added in #3960 can also pass after this change.
This change added tests and can be verified as follows:

(cherry picked from commit 1303e7d)
  • Loading branch information
BewareMyPower authored and codelipenghui committed Sep 9, 2021
1 parent 7c6828a commit 0669247
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,25 @@
import java.lang.reflect.Field;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.PulsarChannelInitializer;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSubscription;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.api.proto.CommandFlow;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertNull;
Expand All @@ -42,6 +49,8 @@
@Slf4j
public class NonDurableSubscriptionTest extends ProducerConsumerBase {

private final AtomicInteger numFlow = new AtomicInteger(0);

@BeforeMethod
@Override
protected void setup() throws Exception {
Expand All @@ -56,6 +65,34 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@Override
protected PulsarService newPulsarService(ServiceConfiguration conf) throws Exception {
return new PulsarService(conf) {

@Override
protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
BrokerService broker = new BrokerService(this, ioEventLoopGroup);
broker.setPulsarChannelInitializerFactory(
(_pulsar, tls) -> {
return new PulsarChannelInitializer(_pulsar, tls) {
@Override
protected ServerCnx newServerCnx(PulsarService pulsar) throws Exception {
return new ServerCnx(pulsar) {

@Override
protected void handleFlow(CommandFlow flow) {
super.handleFlow(flow);
numFlow.incrementAndGet();
}
};
}
};
});
return broker;
}
};
}

@Test
public void testNonDurableSubscription() throws Exception {
String topicName = "persistent://my-property/my-ns/nonDurable-topic1";
Expand Down Expand Up @@ -188,4 +225,22 @@ public void testNonDurableSubscriptionRecovery(SubscriptionType subscriptionType
}

}

@Test
public void testFlowCountForMultiTopics() throws Exception {
String topicName = "persistent://my-property/my-ns/test-flow-count";
int numPartitions = 5;
admin.topics().createPartitionedTopic(topicName, numPartitions);
numFlow.set(0);

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("my-nonDurable-subscriber")
.subscriptionMode(SubscriptionMode.NonDurable)
.subscribe();
consumer.receive(1, TimeUnit.SECONDS);
consumer.close();

assertEquals(numFlow.get(), numPartitions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -755,9 +755,7 @@ public void connectionOpened(final ClientCnx cnx) {
boolean firstTimeConnect = subscribeFuture.complete(this);
// if the consumer is not partitioned or is re-connected and is partitioned, we send the flow
// command to receive messages.
// For readers too (isDurable==false), the partition idx will be set though we have to
// send available permits immediately after establishing the reader session
if (!(firstTimeConnect && hasParentConsumer && isDurable) && conf.getReceiverQueueSize() != 0) {
if (!(firstTimeConnect && hasParentConsumer) && conf.getReceiverQueueSize() != 0) {
increaseAvailablePermits(cnx, conf.getReceiverQueueSize());
}
}).exceptionally((e) -> {
Expand Down

0 comments on commit 0669247

Please sign in to comment.