Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unack message count for transaction Ack while disabled batch index ack #14071

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -465,17 +465,13 @@ private CompletableFuture<Void> individualAckWithTransaction(CommandAck ack) {
ackSets[j] = msgId.getAckSetAt(j);
}
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), ackSets);
ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, ackSets);
ackedCount = getAckedCountForTransactionAck(batchSize, ackSets);
} else {
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position);
ackedCount = batchSize;
}

if (msgId.hasBatchIndex()) {
positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize()));
} else {
positionsAcked.add(new MutablePair<>(position, 0));
}
positionsAcked.add(new MutablePair<>(position, (int) batchSize));

addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);

Expand Down Expand Up @@ -519,7 +515,7 @@ private long getBatchSize(MessageIdData msgId) {
}

private long getAckedCountForMsgIdNoAckSets(long batchSize, PositionImpl position) {
if (Subscription.isIndividualAckMode(subType) && isAcknowledgmentAtBatchIndexLevelEnabled) {
if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)) {
long[] cursorAckSet = getCursorAckSet(position);
if (cursorAckSet != null) {
return getAckedCountForBatchIndexLevelEnabled(position, batchSize, EMPTY_ACK_SET);
Expand Down Expand Up @@ -548,6 +544,13 @@ private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long
return ackedCount;
}

private long getAckedCountForTransactionAck(long batchSize, long[] ackSets) {
BitSetRecyclable bitset = BitSetRecyclable.create().resetWords(ackSets);
long ackedCount = batchSize - bitset.cardinality();
bitset.recycle();
return ackedCount;
}

private long getUnAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize) {
long unAckedCount = batchSize;
if (isAcknowledgmentAtBatchIndexLevelEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -674,13 +674,18 @@ private void individualAckAbortCommon(TxnID txnID, HashMap<PositionImpl, Positio
&& individualAckPositions.containsKey(entry.getValue())) {
BitSetRecyclable thisBitSet =
BitSetRecyclable.valueOf(entry.getValue().getAckSet());
thisBitSet.flip(0, individualAckPositions.get(entry.getValue()).right);
int batchSize = individualAckPositions.get(entry.getValue()).right;
thisBitSet.flip(0, batchSize);
BitSetRecyclable otherBitSet =
BitSetRecyclable.valueOf(individualAckPositions
.get(entry.getValue()).left.getAckSet());
otherBitSet.or(thisBitSet);
individualAckPositions.get(entry.getKey())
.left.setAckSet(otherBitSet.toLongArray());
if (otherBitSet.cardinality() == batchSize) {
individualAckPositions.remove(entry.getValue());
} else {
individualAckPositions.get(entry.getKey())
.left.setAckSet(otherBitSet.toLongArray());
}
otherBitSet.recycle();
thisBitSet.recycle();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import lombok.Cleanup;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
Expand All @@ -49,9 +51,13 @@
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand All @@ -63,6 +69,8 @@
@Test(groups = "broker")
public class BatchMessageTest extends BrokerTestBase {

private static final Logger log = LoggerFactory.getLogger(BatchMessageTest.class);

@BeforeClass
@Override
protected void setup() throws Exception {
Expand Down Expand Up @@ -95,6 +103,15 @@ public Object[][] containerBuilderProvider() {
};
}

@DataProvider(name = "testSubTypeAndEnableBatch")
public Object[][] testSubTypeAndEnableBatch() {
return new Object[][] { { SubscriptionType.Shared, Boolean.TRUE },
{ SubscriptionType.Failover, Boolean.TRUE },
{ SubscriptionType.Shared, Boolean.FALSE },
{ SubscriptionType.Failover, Boolean.FALSE }
};
}

@Test(dataProvider = "codecAndContainerBuilder")
public void testSimpleBatchProducerWithFixedBatchSize(CompressionType compressionType, BatcherBuilder builder) throws Exception {
int numMsgs = 50;
Expand Down Expand Up @@ -919,5 +936,80 @@ public void testBatchMessageDispatchingAccordingToPermits() throws Exception {
consumer1.close();
}

@Test(dataProvider="testSubTypeAndEnableBatch")
private void testDecreaseUnAckMessageCountWithAckReceipt(SubscriptionType subType,
boolean enableBatch) throws Exception {
final int messageCount = 50;
final String topicName = "persistent://prop/ns-abc/testDecreaseWithAckReceipt" + UUID.randomUUID();
final String subscriptionName = "sub-batch-1";
@Cleanup
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient
.newConsumer(Schema.BYTES)
.topic(topicName)
.isAckReceiptEnabled(true)
.subscriptionName(subscriptionName)
.subscriptionType(subType)
.enableBatchIndexAcknowledgment(true)
.subscribe();

@Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer()
.enableBatching(enableBatch)
.topic(topicName)
.batchingMaxPublishDelay(Integer.MAX_VALUE, TimeUnit.MILLISECONDS)
.create();

CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
producer.sendAsync((i + "").getBytes()).thenAccept(msgId -> {
log.info("Published message with msgId: {}", msgId);
countDownLatch.countDown();
});
// To generate batch message with different batch size
// 31 total batches, 5 batches with 3 messages, 8 batches with 2 messages and 37 batches with 1 message
if (((i / 3) % (i % 3 + 1)) == 0) {
producer.flush();
}
}

countDownLatch.await();

for (int i = 0; i < messageCount; i++) {
Message<byte[]> message = consumer.receive();
if (enableBatch) {
// only ack messages which batch index < 2, which means we will not to ack the
// whole batch for the batch that with more than 2 messages
if (((BatchMessageIdImpl) message.getMessageId()).getBatchIndex() < 2) {
consumer.acknowledgeAsync(message).get();
}
} else {
if (i % 2 == 0) {
consumer.acknowledgeAsync(message).get();
}
}
}

String topic = TopicName.get(topicName).toString();
PersistentSubscription persistentSubscription = (PersistentSubscription) pulsar.getBrokerService()
.getTopic(topic, false).get().get().getSubscription(subscriptionName);

Awaitility.await().untilAsserted(() -> {
if (subType == SubscriptionType.Shared) {
if (enableBatch) {
if (conf.isAcknowledgmentAtBatchIndexLevelEnabled()) {
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 5 * 1);
} else {
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 5 * 3);
}
} else {
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), messageCount / 2);
}
} else {
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 0);
}
});
}

private static final Logger LOG = LoggerFactory.getLogger(BatchMessageTest.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,24 @@
package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import lombok.Cleanup;
import lombok.SneakyThrows;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.testng.Assert.assertEquals;

@Test(groups = "broker")
Expand Down Expand Up @@ -111,66 +107,4 @@ public void testBatchMessageAck() {
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16);
});
}

@DataProvider(name = "testSubTypeAndEnableBatch")
public Object[][] testSubTypeAndEnableBatch() {
return new Object[][] { { SubscriptionType.Shared, Boolean.TRUE },
{ SubscriptionType.Failover, Boolean.TRUE },
{ SubscriptionType.Shared, Boolean.FALSE },
{ SubscriptionType.Failover, Boolean.FALSE }};
}


@Test(dataProvider="testSubTypeAndEnableBatch")
private void testDecreaseUnAckMessageCountWithAckReceipt(SubscriptionType subType,
boolean enableBatch) throws Exception {

final int messageCount = 50;
final String topicName = "persistent://prop/ns-abc/testDecreaseWithAckReceipt" + UUID.randomUUID();
final String subscriptionName = "sub-batch-1";
@Cleanup
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient
.newConsumer(Schema.BYTES)
.topic(topicName)
.isAckReceiptEnabled(true)
.subscriptionName(subscriptionName)
.subscriptionType(subType)
.enableBatchIndexAcknowledgment(true)
.subscribe();

@Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer()
.enableBatching(enableBatch)
.topic(topicName)
.batchingMaxMessages(10)
.create();

CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
producer.sendAsync((i + "").getBytes()).thenRun(countDownLatch::countDown);
}

countDownLatch.await();

for (int i = 0; i < messageCount; i++) {
Message<byte[]> message = consumer.receive();
// wait for receipt
if (i < messageCount / 2) {
consumer.acknowledgeAsync(message.getMessageId()).get();
}
}

String topic = TopicName.get(topicName).toString();
PersistentSubscription persistentSubscription = (PersistentSubscription) pulsar.getBrokerService()
.getTopic(topic, false).get().get().getSubscription(subscriptionName);

Awaitility.await().untilAsserted(() -> {
if (subType == SubscriptionType.Shared) {
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), messageCount / 2);
} else {
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 0);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public abstract class TransactionTestBase extends TestRetrySupport {

public static final String TENANT = "tnx";
protected static final String NAMESPACE1 = TENANT + "/ns1";
protected ServiceConfiguration conf = new ServiceConfiguration();

public void internalSetup() throws Exception {
incrementSetupNumber();
Expand Down Expand Up @@ -145,7 +146,6 @@ protected void setUpBase(int numBroker,int numPartitionsOfTC, String topic, int

protected void startBroker() throws Exception {
for (int i = 0; i < brokerCount; i++) {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setClusterName(CLUSTER_NAME);
conf.setAdvertisedAddress("localhost");
conf.setManagedLedgerCacheSizeMB(8);
Expand All @@ -155,7 +155,6 @@ protected void startBroker() throws Exception {
conf.setConfigurationStoreServers("localhost:3181");
conf.setAllowAutoTopicCreationType("non-partitioned");
conf.setBookkeeperClientExposeStatsToPrometheus(true);
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);

conf.setBrokerShutdownTimeoutMs(0L);
conf.setBrokerServicePort(Optional.of(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class PendingAckInMemoryDeleteTest extends TransactionTestBase {
private static final int NUM_PARTITIONS = 16;
@BeforeMethod
protected void setup() throws Exception {
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
setUpBase(1, NUM_PARTITIONS, NAMESPACE1 +"/test", 0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,13 @@
@Test(groups = "flaky")
public class TransactionEndToEndTest extends TransactionTestBase {

private static final int TOPIC_PARTITION = 3;
private static final String TOPIC_OUTPUT = NAMESPACE1 + "/output";
private static final String TOPIC_MESSAGE_ACK_TEST = NAMESPACE1 + "/message-ack-test";
private static final int NUM_PARTITIONS = 16;
protected static final int TOPIC_PARTITION = 3;
protected static final String TOPIC_OUTPUT = NAMESPACE1 + "/output";
protected static final String TOPIC_MESSAGE_ACK_TEST = NAMESPACE1 + "/message-ack-test";
protected static final int NUM_PARTITIONS = 16;
@BeforeMethod
protected void setup() throws Exception {
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION);
admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, 1);
}
Expand Down Expand Up @@ -323,7 +324,7 @@ public void txnIndividualAckTestBatchAndFailoverSub() throws Exception {
txnAckTest(true, 200, SubscriptionType.Failover);
}

private void txnAckTest(boolean batchEnable, int maxBatchSize,
protected void txnAckTest(boolean batchEnable, int maxBatchSize,
SubscriptionType subscriptionType) throws Exception {
String normalTopic = NAMESPACE1 + "/normal-topic";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.SubscriptionType;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
* End to end transaction test.
*/
@Slf4j
@Test(groups = "flaky")
gaoran10 marked this conversation as resolved.
Show resolved Hide resolved
public class TransactionEndToEndWithoutBatchIndexAckTest extends TransactionEndToEndTest {

@BeforeMethod
protected void setup() throws Exception {
conf.setAcknowledgmentAtBatchIndexLevelEnabled(false);
setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION);
admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, 1);
}

// TODO need to fix which using transaction with individual ack for failover subscription
@Test
public void txnIndividualAckTestBatchAndFailoverSub() throws Exception {
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
txnAckTest(true, 200, SubscriptionType.Failover);
}
}