Skip to content

Commit

Permalink
[fix][txn] Fix append txn message is lower than lowWaterMark decrease…
Browse files Browse the repository at this point in the history
… pendingWriteOps (#16266)
  • Loading branch information
congbobo184 committed Jun 28, 2022
1 parent b2146fd commit af7990d
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 0 deletions.
Expand Up @@ -2955,6 +2955,7 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
throwable = throwable.getCause();
if (throwable instanceof NotAllowedException) {
publishContext.completed((NotAllowedException) throwable, -1, -1);
decrementPendingWriteOpsAndCheck();
return null;
} else if (!(throwable instanceof ManagedLedgerException)) {
throwable = new ManagedLedgerException(throwable);
Expand Down
Expand Up @@ -29,6 +29,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
Expand Down Expand Up @@ -60,6 +61,7 @@
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
import org.powermock.reflect.Whitebox;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -321,12 +323,18 @@ public void testTBLowWaterMarkEndToEnd() throws Exception {
Field field = TransactionImpl.class.getDeclaredField("state");
field.setAccessible(true);
field.set(txn1, TransactionImpl.State.OPEN);

AtomicLong pendingWriteOps = Whitebox.getInternalState(getPulsarServiceList().get(0)
.getBrokerService().getTopic(TopicName.get(TOPIC).toString(),
false).get().get(), "pendingWriteOps");
try {
producer.newMessage(txn1).send();
fail();
} catch (PulsarClientException.NotAllowedException ignore) {
// no-op
}

assertEquals(pendingWriteOps.get(), 0);
}

@Test
Expand Down

0 comments on commit af7990d

Please sign in to comment.