diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java index 68d1c0e124003..1b588c7d4ca32 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java @@ -65,7 +65,7 @@ public CompletableFuture newTransaction(long timeoutInMills) { tcID.getId(), localID.getAndIncrement() ); - TxnMetaImpl txn = TxnMetaImpl.create(txnID); + TxnMetaImpl txn = new TxnMetaImpl(txnID); transactions.put(txnID, txn); return CompletableFuture.completedFuture(txnID); } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java index ce0fa96a3ce79..77b3086d1caf7 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java @@ -106,7 +106,7 @@ public void handleMetadataEntry(Position position, TransactionMetadataEntry tran } else { List positions = new ArrayList<>(); positions.add(position); - txnMetaMap.put(txnID, MutablePair.of(TxnMetaImpl.create(txnID), positions)); + txnMetaMap.put(txnID, MutablePair.of(new TxnMetaImpl(txnID), positions)); txnIdSortedSet.add(transactionMetadataEntry.getTxnidLeastBits()); timeoutTracker.replayAddTransaction(transactionMetadataEntry.getTxnidLeastBits(), transactionMetadataEntry.getTimeoutMs()); @@ -140,7 +140,6 @@ public void handleMetadataEntry(Position position, TransactionMetadataEntry tran transactionLog.deletePosition(txnMetaMap.get(txnID).getRight()).thenAccept(v -> { TxnMeta txnMeta = txnMetaMap.remove(txnID).getLeft(); txnIdSortedSet.remove(transactionMetadataEntry.getTxnidLeastBits()); - ((TxnMetaImpl) txnMeta).recycle(); }); } else { txnMetaMap.get(txnID).getLeft() @@ -200,7 +199,7 @@ public CompletableFuture newTransaction(long timeOut) { .setLastModificationTime(currentTimeMillis); return transactionLog.append(transactionMetadataEntry) .thenCompose(position -> { - TxnMeta txn = TxnMetaImpl.create(txnID); + TxnMeta txn = new TxnMetaImpl(txnID); List positions = new ArrayList<>(); positions.add(position); Pair> pair = MutablePair.of(txn, positions); @@ -229,11 +228,13 @@ public CompletableFuture addProducedPartitionToTxn(TxnID txnID, List { try { - txnMetaListPair.getLeft().addProducedPartitions(partitions); - txnMetaMap.get(txnID).getRight().add(position); + synchronized (txnMetaListPair.getLeft()) { + txnMetaListPair.getLeft().addProducedPartitions(partitions); + txnMetaMap.get(txnID).getRight().add(position); + } return CompletableFuture.completedFuture(null); } catch (InvalidTxnStatusException e) { - txnMetaMap.get(txnID).getRight().add(position); + transactionLog.deletePosition(Collections.singletonList(position)); log.error("TxnID : " + txnMetaListPair.getLeft().id().toString() + " add produced partition error with TxnStatus : " + txnMetaListPair.getLeft().status().name(), e); @@ -262,11 +263,13 @@ public CompletableFuture addAckedPartitionToTxn(TxnID txnID, return transactionLog.append(transactionMetadataEntry) .thenCompose(position -> { try { - txnMetaListPair.getLeft().addAckedPartitions(txnSubscriptions); - txnMetaMap.get(txnID).getRight().add(position); + synchronized (txnMetaListPair.getLeft()) { + txnMetaListPair.getLeft().addAckedPartitions(txnSubscriptions); + txnMetaMap.get(txnID).getRight().add(position); + } return CompletableFuture.completedFuture(null); } catch (InvalidTxnStatusException e) { - txnMetaMap.get(txnID).getRight().add(position); + transactionLog.deletePosition(Collections.singletonList(position)); log.error("TxnID : " + txnMetaListPair.getLeft().id().toString() + " add acked subscription error with TxnStatus : " + txnMetaListPair.getLeft().status().name(), e); @@ -295,19 +298,20 @@ public CompletableFuture updateTxnStatus(TxnID txnID, TxnStatus newStatus, return transactionLog.append(transactionMetadataEntry).thenCompose(position -> { try { - txnMetaListPair.getLeft().updateTxnStatus(newStatus, expectedStatus); - txnMetaListPair.getRight().add(position); + synchronized (txnMetaListPair.getLeft()) { + txnMetaListPair.getLeft().updateTxnStatus(newStatus, expectedStatus); + txnMetaListPair.getRight().add(position); + } if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) { return transactionLog.deletePosition(txnMetaListPair.getRight()).thenCompose(v -> { txnMetaMap.remove(txnID); txnIdSortedSet.remove(txnID.getLeastSigBits()); - ((TxnMetaImpl) txnMetaListPair.getLeft()).recycle(); return CompletableFuture.completedFuture(null); }); } return CompletableFuture.completedFuture(null); } catch (InvalidTxnStatusException e) { - txnMetaListPair.getRight().add(position); + transactionLog.deletePosition(Collections.singletonList(position)); log.error("TxnID : " + txnMetaListPair.getLeft().id().toString() + " add update txn status error with TxnStatus : " + txnMetaListPair.getLeft().status().name(), e); diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java index 16c5a7b3c9fc6..4462ffc3e4df7 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.transaction.coordinator.impl; -import io.netty.util.Recycler; -import io.netty.util.Recycler.Handle; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -38,38 +36,13 @@ */ class TxnMetaImpl implements TxnMeta { - private TxnID txnID; + private final TxnID txnID; private final Set producedPartitions = new HashSet<>(); private final Set ackedPartitions = new HashSet<>(); private volatile TxnStatus txnStatus = TxnStatus.OPEN; - private final Handle recycleHandle; - private static final Recycler RECYCLER = new Recycler() { - protected TxnMetaImpl newObject(Recycler.Handle handle) { - return new TxnMetaImpl(handle); - } - }; - - TxnMetaImpl(Handle handle) { - this.recycleHandle = handle; - } - - // Constructor for transaction metadata - static TxnMetaImpl create(TxnID txnID) { - @SuppressWarnings("unchecked") - TxnMetaImpl txnMeta = RECYCLER.get(); - txnMeta.txnID = txnID; - return txnMeta; - } - - public void recycle() { - this.producedPartitions.clear(); - this.ackedPartitions.clear(); - this.txnStatus = TxnStatus.OPEN; - - if (recycleHandle != null) { - recycleHandle.recycle(this); - } + TxnMetaImpl(TxnID txnID) { + this.txnID = txnID; } @Override