From a3d78812befd04f5e7e156158d83c10a966a7c1c Mon Sep 17 00:00:00 2001 From: penghui Date: Sun, 30 Jan 2022 22:44:56 +0800 Subject: [PATCH 1/8] Fix unack message count for transaction Ack while disabled batch index ack. ### Motivation Fix unack message count for transaction Ack while disabled batch index ack. Transaction Ack is different with normal message ack for a batch message. For normal message, we are using a bitset to carry the batch index state, for example ``` 1. Ack with `00111111` means acks batch index 0 and 1 2. For ack batch index 2 and 3, the client will send `00001111` to broker 3. After all the batch been acked, send `00000000` to broker ``` The following is for transaction ack: ``` 1. `00111111` means acks batch index 0 and 1 1. `11001111` means acks batch index 2 and 3 ``` ### Verification Enabled transaction e2e test for batch index ack disabled --- .../pulsar/broker/service/Consumer.java | 47 +++++----- .../broker/service/BatchMessageTest.java | 86 +++++++++++++++++++ .../BatchMessageWithBatchIndexLevelTest.java | 70 +-------------- .../transaction/TransactionTestBase.java | 3 +- .../client/impl/TransactionEndToEndTest.java | 9 +- ...ctionEndToEndWithoutBatchIndexAckTest.java | 39 +++++++++ 6 files changed, 160 insertions(+), 94 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndWithoutBatchIndexAckTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 4805eb5404957..9340f8ce606ad 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -404,7 +404,7 @@ private CompletableFuture individualAckNormal(CommandAck ack, Map individualAckNormal(CommandAck ack, Map individualAckWithTransaction(CommandAck ack) { ackSets[j] = msgId.getAckSetAt(j); } position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), ackSets); - ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, ackSets); + ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, ackSets, true); } else { position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId()); - ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position); + ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position, true); } if (msgId.hasBatchIndex()) { @@ -518,32 +518,39 @@ private long getBatchSize(MessageIdData msgId) { return batchSize; } - private long getAckedCountForMsgIdNoAckSets(long batchSize, PositionImpl position) { + private long getAckedCountForMsgIdNoAckSets(long batchSize, PositionImpl position, boolean isTransactionAck) { if (Subscription.isIndividualAckMode(subType) && isAcknowledgmentAtBatchIndexLevelEnabled) { long[] cursorAckSet = getCursorAckSet(position); if (cursorAckSet != null) { - return getAckedCountForBatchIndexLevelEnabled(position, batchSize, EMPTY_ACK_SET); + return getAckedCountForBatchIndexLevelEnabled(position, batchSize, EMPTY_ACK_SET, isTransactionAck); } } return batchSize; } - private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize, long[] ackSets) { + private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize, long[] ackSets, + boolean isTransactionAck) { long ackedCount = 0; - if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)) { - long[] cursorAckSet = getCursorAckSet(position); - if (cursorAckSet != null) { - BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet); - int lastCardinality = cursorBitSet.cardinality(); - BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(ackSets); - cursorBitSet.and(givenBitSet); - givenBitSet.recycle(); - int currentCardinality = cursorBitSet.cardinality(); - ackedCount = lastCardinality - currentCardinality; - cursorBitSet.recycle(); - } else if (pendingAcks.get(position.getLedgerId(), position.getEntryId()) != null) { - ackedCount = batchSize - BitSet.valueOf(ackSets).cardinality(); + if (isAcknowledgmentAtBatchIndexLevelEnabled) { + if(Subscription.isIndividualAckMode(subType)) { + long[] cursorAckSet = getCursorAckSet(position); + if (cursorAckSet != null) { + BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet); + int lastCardinality = cursorBitSet.cardinality(); + BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(ackSets); + cursorBitSet.and(givenBitSet); + givenBitSet.recycle(); + int currentCardinality = cursorBitSet.cardinality(); + ackedCount = lastCardinality - currentCardinality; + cursorBitSet.recycle(); + } else if (pendingAcks.get(position.getLedgerId(), position.getEntryId()) != null) { + ackedCount = batchSize - BitSet.valueOf(ackSets).cardinality(); + } } + } else if (isTransactionAck) { + BitSetRecyclable bitset = BitSetRecyclable.create().resetWords(ackSets); + ackedCount = batchSize - bitset.cardinality(); + bitset.recycle(); } return ackedCount; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java index 6d8b31c4586cb..3a1b229c70bbc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java @@ -39,7 +39,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import lombok.Cleanup; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.BatcherBuilder; import org.apache.pulsar.client.api.CompressionType; @@ -49,9 +51,13 @@ import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -95,6 +101,15 @@ public Object[][] containerBuilderProvider() { }; } + @DataProvider(name = "testSubTypeAndEnableBatch") + public Object[][] testSubTypeAndEnableBatch() { + return new Object[][] { { SubscriptionType.Shared, Boolean.TRUE }, + { SubscriptionType.Failover, Boolean.TRUE }, + { SubscriptionType.Shared, Boolean.FALSE }, + { SubscriptionType.Failover, Boolean.FALSE } + }; + } + @Test(dataProvider = "codecAndContainerBuilder") public void testSimpleBatchProducerWithFixedBatchSize(CompressionType compressionType, BatcherBuilder builder) throws Exception { int numMsgs = 50; @@ -919,5 +934,76 @@ public void testBatchMessageDispatchingAccordingToPermits() throws Exception { consumer1.close(); } + @Test(dataProvider="testSubTypeAndEnableBatch") + private void testDecreaseUnAckMessageCountWithAckReceipt(SubscriptionType subType, + boolean enableBatch) throws Exception { + final int messageCount = 50; + final String topicName = "persistent://prop/ns-abc/testDecreaseWithAckReceipt" + UUID.randomUUID(); + final String subscriptionName = "sub-batch-1"; + @Cleanup + ConsumerImpl consumer = (ConsumerImpl) pulsarClient + .newConsumer(Schema.BYTES) + .topic(topicName) + .isAckReceiptEnabled(true) + .subscriptionName(subscriptionName) + .subscriptionType(subType) + .enableBatchIndexAcknowledgment(true) + .subscribe(); + + @Cleanup + Producer producer = pulsarClient + .newProducer() + .enableBatching(enableBatch) + .topic(topicName) + .batchingMaxMessages(10) + .create(); + + CountDownLatch countDownLatch = new CountDownLatch(messageCount); + for (int i = 0; i < messageCount; i++) { + producer.sendAsync((i + "").getBytes()).thenRun(countDownLatch::countDown); + } + + countDownLatch.await(); + + MessageId lastAckedMessageId = null; + for (int i = 0; i < messageCount; i++) { + Message message = consumer.receive(); + // wait for receipt + if (i < messageCount / 2) { + lastAckedMessageId = message.getMessageId(); + consumer.acknowledgeAsync(lastAckedMessageId).get(); + } + } + + String topic = TopicName.get(topicName).toString(); + PersistentSubscription persistentSubscription = (PersistentSubscription) pulsar.getBrokerService() + .getTopic(topic, false).get().get().getSubscription(subscriptionName); + + MessageId finalLastAckedMessageId = lastAckedMessageId; + Awaitility.await().untilAsserted(() -> { + if (subType == SubscriptionType.Shared) { + if (conf.isAcknowledgmentAtBatchIndexLevelEnabled()) { + assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), messageCount / 2); + } else { + if (finalLastAckedMessageId instanceof BatchMessageIdImpl) { + BatchMessageIdImpl batchMessageId = ((BatchMessageIdImpl) finalLastAckedMessageId); + if (batchMessageId.getBatchSize() + 1 == batchMessageId.getBatchIndex()) { + assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), + messageCount / 2); + } else { + assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), + messageCount / 2 + batchMessageId.getBatchIndex() + 1); + } + } else { + assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), + messageCount / 2); + } + } + } else { + assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 0); + } + }); + } + private static final Logger LOG = LoggerFactory.getLogger(BatchMessageTest.class); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java index 5e09def401663..c3785c7d3cb40 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java @@ -19,28 +19,24 @@ package org.apache.pulsar.broker.service; import com.google.common.collect.Lists; -import lombok.Cleanup; import lombok.SneakyThrows; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; -import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.ConsumerImpl; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; import org.testng.annotations.BeforeClass; -import org.testng.annotations.DataProvider; import org.testng.annotations.Test; + import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; + import static org.testng.Assert.assertEquals; @Test(groups = "broker") @@ -111,66 +107,4 @@ public void testBatchMessageAck() { assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16); }); } - - @DataProvider(name = "testSubTypeAndEnableBatch") - public Object[][] testSubTypeAndEnableBatch() { - return new Object[][] { { SubscriptionType.Shared, Boolean.TRUE }, - { SubscriptionType.Failover, Boolean.TRUE }, - { SubscriptionType.Shared, Boolean.FALSE }, - { SubscriptionType.Failover, Boolean.FALSE }}; - } - - - @Test(dataProvider="testSubTypeAndEnableBatch") - private void testDecreaseUnAckMessageCountWithAckReceipt(SubscriptionType subType, - boolean enableBatch) throws Exception { - - final int messageCount = 50; - final String topicName = "persistent://prop/ns-abc/testDecreaseWithAckReceipt" + UUID.randomUUID(); - final String subscriptionName = "sub-batch-1"; - @Cleanup - ConsumerImpl consumer = (ConsumerImpl) pulsarClient - .newConsumer(Schema.BYTES) - .topic(topicName) - .isAckReceiptEnabled(true) - .subscriptionName(subscriptionName) - .subscriptionType(subType) - .enableBatchIndexAcknowledgment(true) - .subscribe(); - - @Cleanup - Producer producer = pulsarClient - .newProducer() - .enableBatching(enableBatch) - .topic(topicName) - .batchingMaxMessages(10) - .create(); - - CountDownLatch countDownLatch = new CountDownLatch(messageCount); - for (int i = 0; i < messageCount; i++) { - producer.sendAsync((i + "").getBytes()).thenRun(countDownLatch::countDown); - } - - countDownLatch.await(); - - for (int i = 0; i < messageCount; i++) { - Message message = consumer.receive(); - // wait for receipt - if (i < messageCount / 2) { - consumer.acknowledgeAsync(message.getMessageId()).get(); - } - } - - String topic = TopicName.get(topicName).toString(); - PersistentSubscription persistentSubscription = (PersistentSubscription) pulsar.getBrokerService() - .getTopic(topic, false).get().get().getSubscription(subscriptionName); - - Awaitility.await().untilAsserted(() -> { - if (subType == SubscriptionType.Shared) { - assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), messageCount / 2); - } else { - assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 0); - } - }); - } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java index ab248a2ba017e..de1ffdb593763 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java @@ -84,6 +84,7 @@ public abstract class TransactionTestBase extends TestRetrySupport { public static final String TENANT = "tnx"; protected static final String NAMESPACE1 = TENANT + "/ns1"; + protected ServiceConfiguration conf = new ServiceConfiguration(); public void internalSetup() throws Exception { incrementSetupNumber(); @@ -145,7 +146,6 @@ protected void setUpBase(int numBroker,int numPartitionsOfTC, String topic, int protected void startBroker() throws Exception { for (int i = 0; i < brokerCount; i++) { - ServiceConfiguration conf = new ServiceConfiguration(); conf.setClusterName(CLUSTER_NAME); conf.setAdvertisedAddress("localhost"); conf.setManagedLedgerCacheSizeMB(8); @@ -155,7 +155,6 @@ protected void startBroker() throws Exception { conf.setConfigurationStoreServers("localhost:3181"); conf.setAllowAutoTopicCreationType("non-partitioned"); conf.setBookkeeperClientExposeStatsToPrometheus(true); - conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); conf.setBrokerShutdownTimeoutMs(0L); conf.setBrokerServicePort(Optional.of(0)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index 343791b7ae57f..ac734a43e86f0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -83,12 +83,13 @@ @Test(groups = "flaky") public class TransactionEndToEndTest extends TransactionTestBase { - private static final int TOPIC_PARTITION = 3; - private static final String TOPIC_OUTPUT = NAMESPACE1 + "/output"; - private static final String TOPIC_MESSAGE_ACK_TEST = NAMESPACE1 + "/message-ack-test"; - private static final int NUM_PARTITIONS = 16; + protected static final int TOPIC_PARTITION = 3; + protected static final String TOPIC_OUTPUT = NAMESPACE1 + "/output"; + protected static final String TOPIC_MESSAGE_ACK_TEST = NAMESPACE1 + "/message-ack-test"; + protected static final int NUM_PARTITIONS = 16; @BeforeMethod protected void setup() throws Exception { + conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION); admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, 1); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndWithoutBatchIndexAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndWithoutBatchIndexAckTest.java new file mode 100644 index 0000000000000..5d52db1404d3d --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndWithoutBatchIndexAckTest.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import lombok.extern.slf4j.Slf4j; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * End to end transaction test. + */ +@Slf4j +@Test(groups = "flaky") +public class TransactionEndToEndWithoutBatchIndexAckTest extends TransactionEndToEndTest { + + @BeforeMethod + protected void setup() throws Exception { + conf.setAcknowledgmentAtBatchIndexLevelEnabled(false); + setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION); + admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, 1); + } + +} From c8c267c542a151a92909183245c173f7c9325ff8 Mon Sep 17 00:00:00 2001 From: penghui Date: Mon, 31 Jan 2022 00:26:29 +0800 Subject: [PATCH 2/8] Fix checkstyle --- .../main/java/org/apache/pulsar/broker/service/Consumer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 9340f8ce606ad..2d18c2d8aa706 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -532,7 +532,7 @@ private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long boolean isTransactionAck) { long ackedCount = 0; if (isAcknowledgmentAtBatchIndexLevelEnabled) { - if(Subscription.isIndividualAckMode(subType)) { + if (Subscription.isIndividualAckMode(subType)) { long[] cursorAckSet = getCursorAckSet(position); if (cursorAckSet != null) { BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet); From add5041bb76e38c50353ea5d764242eae7391212 Mon Sep 17 00:00:00 2001 From: penghui Date: Tue, 8 Feb 2022 12:36:29 +0800 Subject: [PATCH 3/8] Apply comment --- .../pulsar/broker/service/Consumer.java | 56 +++++++++---------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 2d18c2d8aa706..0303db40ea3c5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -404,7 +404,7 @@ private CompletableFuture individualAckNormal(CommandAck ack, Map individualAckNormal(CommandAck ack, Map individualAckWithTransaction(CommandAck ack) { ackSets[j] = msgId.getAckSetAt(j); } position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), ackSets); - ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, ackSets, true); + ackedCount = getAckedCountForTransactionAck(batchSize, ackSets); } else { position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId()); - ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position, true); + ackedCount = batchSize; } if (msgId.hasBatchIndex()) { @@ -518,43 +518,43 @@ private long getBatchSize(MessageIdData msgId) { return batchSize; } - private long getAckedCountForMsgIdNoAckSets(long batchSize, PositionImpl position, boolean isTransactionAck) { - if (Subscription.isIndividualAckMode(subType) && isAcknowledgmentAtBatchIndexLevelEnabled) { + private long getAckedCountForMsgIdNoAckSets(long batchSize, PositionImpl position) { + if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)) { long[] cursorAckSet = getCursorAckSet(position); if (cursorAckSet != null) { - return getAckedCountForBatchIndexLevelEnabled(position, batchSize, EMPTY_ACK_SET, isTransactionAck); + return getAckedCountForBatchIndexLevelEnabled(position, batchSize, EMPTY_ACK_SET); } } return batchSize; } - private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize, long[] ackSets, - boolean isTransactionAck) { + private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize, long[] ackSets) { long ackedCount = 0; - if (isAcknowledgmentAtBatchIndexLevelEnabled) { - if (Subscription.isIndividualAckMode(subType)) { - long[] cursorAckSet = getCursorAckSet(position); - if (cursorAckSet != null) { - BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet); - int lastCardinality = cursorBitSet.cardinality(); - BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(ackSets); - cursorBitSet.and(givenBitSet); - givenBitSet.recycle(); - int currentCardinality = cursorBitSet.cardinality(); - ackedCount = lastCardinality - currentCardinality; - cursorBitSet.recycle(); - } else if (pendingAcks.get(position.getLedgerId(), position.getEntryId()) != null) { - ackedCount = batchSize - BitSet.valueOf(ackSets).cardinality(); - } + if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)) { + long[] cursorAckSet = getCursorAckSet(position); + if (cursorAckSet != null) { + BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet); + int lastCardinality = cursorBitSet.cardinality(); + BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(ackSets); + cursorBitSet.and(givenBitSet); + givenBitSet.recycle(); + int currentCardinality = cursorBitSet.cardinality(); + ackedCount = lastCardinality - currentCardinality; + cursorBitSet.recycle(); + } else if (pendingAcks.get(position.getLedgerId(), position.getEntryId()) != null) { + ackedCount = batchSize - BitSet.valueOf(ackSets).cardinality(); } - } else if (isTransactionAck) { - BitSetRecyclable bitset = BitSetRecyclable.create().resetWords(ackSets); - ackedCount = batchSize - bitset.cardinality(); - bitset.recycle(); } return ackedCount; } + private long getAckedCountForTransactionAck(long batchSize, long[] ackSets) { + BitSetRecyclable bitset = BitSetRecyclable.create().resetWords(ackSets); + long ackedCount = batchSize - bitset.cardinality(); + bitset.recycle(); + return ackedCount; + } + private long getUnAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize) { long unAckedCount = batchSize; if (isAcknowledgmentAtBatchIndexLevelEnabled) { From db2731f3934f4ab700894a5285e8050db3d577cd Mon Sep 17 00:00:00 2001 From: penghui Date: Tue, 8 Feb 2022 12:36:41 +0800 Subject: [PATCH 4/8] Apply comment --- .../broker/service/BatchMessageTest.java | 50 +++++++++++-------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java index 3a1b229c70bbc..e35d2a0fd6ff7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java @@ -69,6 +69,8 @@ @Test(groups = "broker") public class BatchMessageTest extends BrokerTestBase { + private static final Logger log = LoggerFactory.getLogger(BatchMessageTest.class); + @BeforeClass @Override protected void setup() throws Exception { @@ -955,23 +957,36 @@ private void testDecreaseUnAckMessageCountWithAckReceipt(SubscriptionType subTyp .newProducer() .enableBatching(enableBatch) .topic(topicName) - .batchingMaxMessages(10) + .batchingMaxPublishDelay(Integer.MAX_VALUE, TimeUnit.MILLISECONDS) .create(); CountDownLatch countDownLatch = new CountDownLatch(messageCount); for (int i = 0; i < messageCount; i++) { - producer.sendAsync((i + "").getBytes()).thenRun(countDownLatch::countDown); + producer.sendAsync((i + "").getBytes()).thenAccept(msgId -> { + log.info("Published message with msgId: {}", msgId); + countDownLatch.countDown(); + }); + // To generate batch message with different batch size + // 31 total batches, 5 batches with 3 messages, 8 batches with 2 messages and 37 batches with 1 message + if (((i / 3) % (i % 3 + 1)) == 0) { + producer.flush(); + } } countDownLatch.await(); - MessageId lastAckedMessageId = null; for (int i = 0; i < messageCount; i++) { Message message = consumer.receive(); - // wait for receipt - if (i < messageCount / 2) { - lastAckedMessageId = message.getMessageId(); - consumer.acknowledgeAsync(lastAckedMessageId).get(); + if (enableBatch) { + // only ack messages which batch index < 2, which means we will not to ack the + // whole batch for the batch that with more than 2 messages + if (((BatchMessageIdImpl) message.getMessageId()).getBatchIndex() < 2) { + consumer.acknowledgeAsync(message).get(); + } + } else { + if (i % 2 == 0) { + consumer.acknowledgeAsync(message).get(); + } } } @@ -979,25 +994,16 @@ private void testDecreaseUnAckMessageCountWithAckReceipt(SubscriptionType subTyp PersistentSubscription persistentSubscription = (PersistentSubscription) pulsar.getBrokerService() .getTopic(topic, false).get().get().getSubscription(subscriptionName); - MessageId finalLastAckedMessageId = lastAckedMessageId; Awaitility.await().untilAsserted(() -> { if (subType == SubscriptionType.Shared) { - if (conf.isAcknowledgmentAtBatchIndexLevelEnabled()) { - assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), messageCount / 2); - } else { - if (finalLastAckedMessageId instanceof BatchMessageIdImpl) { - BatchMessageIdImpl batchMessageId = ((BatchMessageIdImpl) finalLastAckedMessageId); - if (batchMessageId.getBatchSize() + 1 == batchMessageId.getBatchIndex()) { - assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), - messageCount / 2); - } else { - assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), - messageCount / 2 + batchMessageId.getBatchIndex() + 1); - } + if (enableBatch) { + if (conf.isAcknowledgmentAtBatchIndexLevelEnabled()) { + assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 5 * 1); } else { - assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), - messageCount / 2); + assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 5 * 3); } + } else { + assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), messageCount / 2); } } else { assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 0); From e39cdc59d5908b893b308cd67b59abca8d52df58 Mon Sep 17 00:00:00 2001 From: penghui Date: Tue, 8 Feb 2022 21:01:20 +0800 Subject: [PATCH 5/8] Fix test. --- .../org/apache/pulsar/broker/service/Consumer.java | 6 +----- .../pendingack/impl/PendingAckHandleImpl.java | 11 ++++++++--- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 0303db40ea3c5..3bcb4920b5f8a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -471,11 +471,7 @@ private CompletableFuture individualAckWithTransaction(CommandAck ack) { ackedCount = batchSize; } - if (msgId.hasBatchIndex()) { - positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize())); - } else { - positionsAcked.add(new MutablePair<>(position, 0)); - } + positionsAcked.add(new MutablePair<>(position, (int) batchSize)); addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index 1c0b10fbd57f6..6fafc686169d7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -674,13 +674,18 @@ private void individualAckAbortCommon(TxnID txnID, HashMap Date: Wed, 9 Feb 2022 00:19:24 +0800 Subject: [PATCH 6/8] Fix test. --- .../transaction/pendingack/PendingAckInMemoryDeleteTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java index bc22473e285f0..da2a3a940bd2c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java @@ -62,6 +62,7 @@ public class PendingAckInMemoryDeleteTest extends TransactionTestBase { private static final int NUM_PARTITIONS = 16; @BeforeMethod protected void setup() throws Exception { + conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); setUpBase(1, NUM_PARTITIONS, NAMESPACE1 +"/test", 0); } From 6346dcbaec05e21800f42da7f3bda4813c84a294 Mon Sep 17 00:00:00 2001 From: penghui Date: Wed, 9 Feb 2022 20:09:21 +0800 Subject: [PATCH 7/8] Fix test. --- .../impl/TransactionEndToEndWithoutBatchIndexAckTest.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndWithoutBatchIndexAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndWithoutBatchIndexAckTest.java index 5d52db1404d3d..1ef3998c3467d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndWithoutBatchIndexAckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndWithoutBatchIndexAckTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.SubscriptionType; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -36,4 +37,10 @@ protected void setup() throws Exception { admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, 1); } + // TODO need to fix which using transaction with individual ack for failover subscription + @Test + public void txnIndividualAckTestBatchAndFailoverSub() throws Exception { + conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); + txnAckTest(true, 200, SubscriptionType.Failover); + } } From 65f26b4db329017a9a2c43a359fe619c10585233 Mon Sep 17 00:00:00 2001 From: penghui Date: Wed, 9 Feb 2022 20:09:39 +0800 Subject: [PATCH 8/8] Fix test. --- .../org/apache/pulsar/client/impl/TransactionEndToEndTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index ac734a43e86f0..bfa19b2496abb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -324,7 +324,7 @@ public void txnIndividualAckTestBatchAndFailoverSub() throws Exception { txnAckTest(true, 200, SubscriptionType.Failover); } - private void txnAckTest(boolean batchEnable, int maxBatchSize, + protected void txnAckTest(boolean batchEnable, int maxBatchSize, SubscriptionType subscriptionType) throws Exception { String normalTopic = NAMESPACE1 + "/normal-topic";