Skip to content

Commit 6ae2d0e

Browse files
3paccccccganesh-ctds
authored andcommitted
[fix][broker] Fix potential NPE in InMemTransactionBuffer.appendBufferToTxn by returning a valid Position (apache#25039)
(cherry picked from commit 6199148)
1 parent 7612bc6 commit 6ae2d0e

File tree

2 files changed

+30
-3
lines changed

2 files changed

+30
-3
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.concurrent.ConcurrentHashMap;
3232
import java.util.concurrent.ConcurrentMap;
3333
import org.apache.bookkeeper.mledger.Position;
34+
import org.apache.bookkeeper.mledger.PositionFactory;
3435
import org.apache.pulsar.broker.service.Topic;
3536
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
3637
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
@@ -47,7 +48,7 @@
4748
/**
4849
* The in-memory implementation of {@link TransactionBuffer}.
4950
*/
50-
class InMemTransactionBuffer implements TransactionBuffer {
51+
public class InMemTransactionBuffer implements TransactionBuffer {
5152

5253
/**
5354
* A class represents the buffer of a transaction.
@@ -269,10 +270,10 @@ public CompletableFuture<Position> appendBufferToTxn(TxnID txnId,
269270
ByteBuf buffer) {
270271
TxnBuffer txnBuffer = getTxnBufferOrCreateIfNotExist(txnId);
271272

272-
CompletableFuture appendFuture = new CompletableFuture();
273+
CompletableFuture<Position> appendFuture = new CompletableFuture<>();
273274
try {
274275
txnBuffer.appendEntry(sequenceId, buffer);
275-
appendFuture.complete(null);
276+
appendFuture.complete(PositionFactory.EARLIEST);
276277
} catch (TransactionBufferException.TransactionSealedException e) {
277278
appendFuture.completeExceptionally(e);
278279
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import lombok.Cleanup;
3939
import org.apache.bookkeeper.mledger.ManagedLedger;
4040
import org.apache.bookkeeper.mledger.ManagedLedgerException;
41+
import org.apache.bookkeeper.mledger.Position;
4142
import org.apache.bookkeeper.mledger.PositionFactory;
4243
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
4344
import org.apache.commons.lang3.RandomUtils;
@@ -50,6 +51,8 @@
5051
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
5152
import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
5253
import org.apache.pulsar.broker.transaction.TransactionTestBase;
54+
import org.apache.pulsar.broker.transaction.buffer.impl.InMemTransactionBuffer;
55+
import org.apache.pulsar.broker.transaction.buffer.impl.InMemTransactionBufferProvider;
5356
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
5457
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
5558
import org.apache.pulsar.broker.transaction.buffer.utils.TransactionBufferTestImpl;
@@ -574,4 +577,27 @@ public void testRefCountWhenAppendBufferToTxn() throws Exception {
574577
byteBuf2.release();
575578
byteBuf3.release();
576579
}
580+
581+
@Test(timeOut = 10000)
582+
public void testAppendBufferToTxnWithInMemTransactionBuffer() throws Exception {
583+
// 1. Prepare test resource
584+
this.pulsarServiceList.forEach(pulsarService -> {
585+
pulsarService.setTransactionBufferProvider(new InMemTransactionBufferProvider());
586+
});
587+
String topic = "persistent://" + NAMESPACE1 + "/testAppendBufferToTxnWithInMemTransactionBuffer";
588+
admin.topics().createNonPartitionedTopic(topic);
589+
PersistentTopic persistentTopic = (PersistentTopic) pulsarServiceList.get(0).getBrokerService()
590+
.getTopic(topic, false)
591+
.get()
592+
.get();
593+
InMemTransactionBuffer topicTransactionBuffer = (InMemTransactionBuffer) persistentTopic
594+
.getTransactionBuffer();
595+
ByteBuf byteBuf = Unpooled.buffer();
596+
Position position = topicTransactionBuffer.appendBufferToTxn(new TxnID(1, 1), 1L, byteBuf)
597+
.get(5, TimeUnit.SECONDS);
598+
// 2.position should be PositionFactory.EARLIEST with InMemTransactionBuffer
599+
assertEquals(PositionFactory.EARLIEST, position);
600+
// 3. release resource
601+
byteBuf.release();
602+
}
577603
}

0 commit comments

Comments
 (0)