diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index ad7903ba8d882..268ad1f0b8b5a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2982,6 +2982,7 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon throwable = throwable.getCause(); if (throwable instanceof NotAllowedException) { publishContext.completed((NotAllowedException) throwable, -1, -1); + decrementPendingWriteOpsAndCheck(); return null; } else if (!(throwable instanceof ManagedLedgerException)) { throwable = new ManagedLedgerException(throwable); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java index a1ff3e7d34aec..5d91c16e76ca9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java @@ -29,6 +29,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; @@ -61,6 +62,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; import org.awaitility.Awaitility; +import org.powermock.reflect.Whitebox; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -322,12 +324,18 @@ public void testTBLowWaterMarkEndToEnd() throws Exception { Field field = TransactionImpl.class.getDeclaredField("state"); field.setAccessible(true); field.set(txn1, TransactionImpl.State.OPEN); + + AtomicLong pendingWriteOps = Whitebox.getInternalState(getPulsarServiceList().get(0) + .getBrokerService().getTopic(TopicName.get(TOPIC).toString(), + false).get().get(), "pendingWriteOps"); try { producer.newMessage(txn1).send(); fail(); } catch (PulsarClientException.NotAllowedException ignore) { // no-op } + + assertEquals(pendingWriteOps.get(), 0); } @Test