Skip to content

Commit

Permalink
[Transaction] add method to clear up transaction buffer snapshot (#11934
Browse files Browse the repository at this point in the history
)

(cherry picked from commit d86db3f)
  • Loading branch information
gaoran10 authored and codelipenghui committed Sep 9, 2021
1 parent 9ce29f6 commit 2cd8d47
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
Expand Down Expand Up @@ -218,6 +219,7 @@ protected TopicStatsHelper initialValue() {

// this future is for publish txn message in order.
private volatile CompletableFuture<Void> transactionCompletableFuture;
@Getter
protected final TransactionBuffer transactionBuffer;

private final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
Expand Down Expand Up @@ -1108,7 +1110,8 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
CompletableFuture<SchemaVersion> deleteSchemaFuture =
deleteSchema ? deleteSchema() : CompletableFuture.completedFuture(null);

deleteSchemaFuture.thenAccept(__ -> deleteTopicPolicies()).whenComplete((v, ex) -> {
deleteSchemaFuture.thenAccept(__ -> deleteTopicPolicies())
.thenCompose(__ -> transactionBuffer.clearSnapshot()).whenComplete((v, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
unfenceTopicToResume();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,25 @@ interface Writer<T> {
*/
CompletableFuture<MessageId> writeAsync(T t);

/**
* Delete event in the system topic.
* @param t pulsar event
* @return message id
* @throws PulsarClientException exception while write event cause
*/
default MessageId delete(T t) throws PulsarClientException {
throw new UnsupportedOperationException("Unsupported operation");
}

/**
* Async delete event in the system topic.
* @param t pulsar event
* @return message id future
*/
default CompletableFuture<MessageId> deleteAsync(T t) {
throw new UnsupportedOperationException("Unsupported operation");
}

/**
* Close the system topic writer.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,22 @@ public CompletableFuture<MessageId> writeAsync(TransactionBufferSnapshot transac
.value(transactionBufferSnapshot).sendAsync();
}

@Override
public MessageId delete(TransactionBufferSnapshot transactionBufferSnapshot) throws PulsarClientException {
return producer.newMessage()
.key(transactionBufferSnapshot.getTopicName())
.value(null)
.send();
}

@Override
public CompletableFuture<MessageId> deleteAsync(TransactionBufferSnapshot transactionBufferSnapshot) {
return producer.newMessage()
.key(transactionBufferSnapshot.getTopicName())
.value(null)
.sendAsync();
}

@Override
public void close() throws IOException {
this.producer.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,13 @@ public interface TransactionBuffer {
*/
CompletableFuture<Void> purgeTxns(List<Long> dataLedgers);

/**
* Clear up the snapshot of the TransactionBuffer.
*
* @return Clear up operation result.
*/
CompletableFuture<Void> clearSnapshot();

/**
* Close the buffer asynchronously.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,11 @@ public CompletableFuture<Void> purgeTxns(List<Long> dataLedgers) {
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> clearSnapshot() {
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> closeAsync() {
buffers.values().forEach(TxnBuffer::close);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,15 @@ public CompletableFuture<Void> purgeTxns(List<Long> dataLedgers) {
return null;
}

@Override
public CompletableFuture<Void> clearSnapshot() {
return this.takeSnapshotWriter.thenCompose(writer -> {
TransactionBufferSnapshot snapshot = new TransactionBufferSnapshot();
snapshot.setTopicName(topic.getName());
return writer.deleteAsync(snapshot);
}).thenCompose(__ -> CompletableFuture.completedFuture(null));
}

@Override
public CompletableFuture<Void> closeAsync() {
changeToCloseState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ public CompletableFuture<Void> purgeTxns(List<Long> dataLedgers) {
return null;
}

@Override
public CompletableFuture<Void> clearSnapshot() {
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> closeAsync() {
return CompletableFuture.completedFuture(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -33,9 +34,11 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.commons.lang3.RandomUtils;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
import org.apache.pulsar.client.api.Consumer;
Expand All @@ -50,11 +53,11 @@
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -403,4 +406,71 @@ private void testTopicTransactionBufferDeleteAbort() throws Exception {
}
assertTrue(exist);
}

@Test
public void clearTransactionBufferSnapshotTest() throws Exception {
String topic = NAMESPACE1 + "/tb-snapshot-delete-" + RandomUtils.nextInt();

@Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer()
.topic(topic)
.sendTimeout(0, TimeUnit.SECONDS)
.create();

Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
.build().get();
producer.newMessage(txn).value("test".getBytes()).sendAsync();
producer.newMessage(txn).value("test".getBytes()).sendAsync();
txn.commit().get();

// take snapshot
PersistentTopic originalTopic = (PersistentTopic) getPulsarServiceList().get(0)
.getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get();
TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) originalTopic.getTransactionBuffer();
Method takeSnapshotMethod = TopicTransactionBuffer.class.getDeclaredMethod("takeSnapshot");
takeSnapshotMethod.setAccessible(true);
takeSnapshotMethod.invoke(topicTransactionBuffer);

TopicName transactionBufferTopicName =
NamespaceEventsSystemTopicFactory.getSystemTopicName(
TopicName.get(topic).getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT);
PersistentTopic snapshotTopic = (PersistentTopic) getPulsarServiceList().get(0)
.getBrokerService().getTopic(transactionBufferTopicName.toString(), false).get().get();
Field field = PersistentTopic.class.getDeclaredField("currentCompaction");
field.setAccessible(true);

// Trigger compaction and make sure it is finished.
checkSnapshotCount(transactionBufferTopicName, true, snapshotTopic, field);
admin.topics().delete(topic, true);
checkSnapshotCount(transactionBufferTopicName, false, snapshotTopic, field);
}

private void checkSnapshotCount(TopicName topicName, boolean hasSnapshot,
PersistentTopic persistentTopic, Field field) throws Exception {
persistentTopic.triggerCompaction();
CompletableFuture<Long> compactionFuture = (CompletableFuture<Long>) field.get(persistentTopic);
Awaitility.await().untilAsserted(() -> assertTrue(compactionFuture.isDone()));

Reader<TransactionBufferSnapshot> reader = pulsarClient.newReader(Schema.AVRO(TransactionBufferSnapshot.class))
.readCompacted(true)
.startMessageId(MessageId.earliest)
.startMessageIdInclusive()
.topic(topicName.toString())
.create();

int count = 0;
while (true) {
Message<TransactionBufferSnapshot> snapshotMsg = reader.readNext(2, TimeUnit.SECONDS);
if (snapshotMsg != null) {
count++;
} else {
break;
}
}
assertTrue(hasSnapshot ? count > 0 : count == 0);
reader.close();
}

}

0 comments on commit 2cd8d47

Please sign in to comment.