From 06b8a1d28fc11cf3c94bdd83388a4f1b9b4bcd0e Mon Sep 17 00:00:00 2001 From: congbobo184 Date: Tue, 28 Jun 2022 16:40:47 +0800 Subject: [PATCH] [fix][txn] Fix append txn message is lower than lowWaterMark decrease pendingWriteOps --- .../pulsar/broker/service/persistent/PersistentTopic.java | 1 + .../transaction/buffer/TransactionLowWaterMarkTest.java | 8 ++++++++ 2 files changed, 9 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index c312052f6eacc..ae44520ee31a2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2955,6 +2955,7 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon throwable = throwable.getCause(); if (throwable instanceof NotAllowedException) { publishContext.completed((NotAllowedException) throwable, -1, -1); + decrementPendingWriteOpsAndCheck(); return null; } else if (!(throwable instanceof ManagedLedgerException)) { throwable = new ManagedLedgerException(throwable); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java index 7252d21d89827..4cc29f396cfa9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java @@ -29,6 +29,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -60,6 +61,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; import org.awaitility.Awaitility; +import org.powermock.reflect.Whitebox; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -321,12 +323,18 @@ public void testTBLowWaterMarkEndToEnd() throws Exception { Field field = TransactionImpl.class.getDeclaredField("state"); field.setAccessible(true); field.set(txn1, TransactionImpl.State.OPEN); + + AtomicLong pendingWriteOps = Whitebox.getInternalState(getPulsarServiceList().get(0) + .getBrokerService().getTopic(TopicName.get(TOPIC).toString(), + false).get().get(), "pendingWriteOps"); try { producer.newMessage(txn1).send(); fail(); } catch (PulsarClientException.NotAllowedException ignore) { // no-op } + + assertEquals(pendingWriteOps.get(), 0); } @Test