Motivation
Currently, each namespace has a system topic that stores the TransactionBuffer snapshots of all topics under that namespace. When the broker recovers, the transaction buffer of every topic in the namespace creates its own reader to read the snapshots from the system topic, so a large number of read operations reach BookKeeper. The read operations performed by all topics under the namespace are identical, so letting all topics under a namespace share one reader reduces the read pressure on BookKeeper.
Goal
By adding a map<String, List> snapshotBuffer to the TransactionBufferSnapshotReader to cache snapshots, all topic transaction buffers in a namespace can share a single TransactionBufferSnapshotReader, which reduces the read pressure on BookKeeper when the broker recovers.
Next, we briefly introduce the current implementation and the replacement we plan to adopt.
API change
Add a default method to the internal interface Reader of the SystemTopicClient interface.
/**
 * Read event from system topic by reused reader.
 * @param topicName Need a topic name to distinguish different topic calls,
 *                  when a Reader is shared by multiple topics
 * @return pulsar event
 */
default Message<T> readNext(String topicName) throws PulsarClientException {
    return readNext();
}
Implementation
Existing implementation
As shown in the figure below, in the current implementation each topic has a TransactionBuffer, and each TransactionBuffer creates a transactionBufferReader and a managed ledger reader when recovering. Each topic reads the snapshots in the TransactionBufferSystemTopic from the first position to the last.
Such an implementation creates a large number of readers that all read the same data. It not only wastes memory but also puts heavy read pressure on BookKeeper.

Alternative Implementation
As shown in the figure below, we will add a map to the TransactionBufferSnapshotReader to cache the data that the reader has read from BookKeeper. The topic name is the key of the map, and the ordered list of snapshots read for that topic is the value.
The TransactionBuffers of all topics under a namespace share one TransactionBufferSnapshotReader and one managed ledger reader.

Process flow
- When the TransactionBufferSnapshotReader reads data, it first checks whether the cache map holds unprocessed snapshots for this topic.
- If it does, the oldest unprocessed snapshot is returned to the caller and removed from the cache map.
- If not, the reader reads the next snapshot from the TransactionBufferSystemTopic.
- If the snapshot read does not belong to this topic, it is added to the cache map and reading continues.
- If the snapshot read belongs to this topic, it is returned directly to the caller.
- If the reader has no more entries, null is returned to tell the caller that recovery is complete.
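The steps above can be tried in isolation with a small, dependency-free model. This is only a sketch under stated assumptions: `Snapshot`, `SharedSnapshotReaderSketch`, and the `underlyingReads` counter are hypothetical names introduced for illustration, and an in-memory `Iterator` stands in for the system topic reader. None of them are Pulsar APIs.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for one snapshot message, keyed by the owning topic. */
final class Snapshot {
    final String topic;
    final long maxReadEntryId;

    Snapshot(String topic, long maxReadEntryId) {
        this.topic = topic;
        this.maxReadEntryId = maxReadEntryId;
    }
}

/** Simplified model of one reader shared by all topics in a namespace. */
class SharedSnapshotReaderSketch {
    // Stand-in for the underlying system topic reader (assumption).
    private final Iterator<Snapshot> systemTopic;
    // Per-topic FIFO cache of snapshots that were read on behalf of other topics.
    private final Map<String, Deque<Snapshot>> cache = new HashMap<>();
    // Counts reads that actually reached the simulated system topic.
    int underlyingReads = 0;

    SharedSnapshotReaderSketch(List<Snapshot> systemTopicContents) {
        this.systemTopic = systemTopicContents.iterator();
    }

    /** Returns the next snapshot for topicName, or null when none remain. */
    synchronized Snapshot readNext(String topicName) {
        // Step 1: serve the oldest cached snapshot for this topic, if any.
        Deque<Snapshot> cached = cache.get(topicName);
        if (cached != null && !cached.isEmpty()) {
            return cached.pollFirst();
        }
        // Step 2: otherwise keep reading from the system topic.
        while (systemTopic.hasNext()) {
            Snapshot snapshot = systemTopic.next();
            underlyingReads++;
            if (snapshot.topic.equals(topicName)) {
                return snapshot; // belongs to the caller
            }
            // Belongs to another topic: park it in that topic's queue.
            cache.computeIfAbsent(snapshot.topic, k -> new ArrayDeque<>())
                 .addLast(snapshot);
        }
        // Step 3: nothing left to read, so recovery for this topic is done.
        return null;
    }
}
```

In this model each entry of the simulated system topic is read once no matter how many topics call `readNext`, because snapshots read for other topics are parked in the cache rather than discarded.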
Code implementation
Implement the default readNext method.
@Override
public synchronized Message<TransactionBufferSnapshot> readNext(String topicName)
        throws PulsarClientException {
    // Serve the oldest cached snapshot for this topic first, if there is one.
    LinkedList<Message<TransactionBufferSnapshot>> cache = transactionBufferSnapshotCache.get(topicName);
    if (cache != null && !cache.isEmpty()) {
        return cache.removeFirst();
    } else {
        Message<TransactionBufferSnapshot> message;
        while (reader.hasMessageAvailable()) {
            message = reader.readNext();
            if (message.getKey().equals(topicName)) {
                // The snapshot belongs to the calling topic.
                return message;
            } else {
                // The snapshot belongs to another topic: cache it in read order.
                LinkedList<Message<TransactionBufferSnapshot>> linkedList =
                        transactionBufferSnapshotCache.computeIfAbsent(message.getKey(),
                                k -> new LinkedList<>());
                linkedList.add(message);
            }
        }
        // No more entries: recovery for this topic is complete.
        return null;
    }
}
With the new implementation, the caller no longer needs to evaluate reader.hasMoreEvents() and topic.getName().equals(message.getKey()).
while (true) {
    Message<TransactionBufferSnapshot> message = reader.readNext(topic.getName());
    if (message == null) {
        break;
    }
    TransactionBufferSnapshot transactionBufferSnapshot = message.getValue();
    if (transactionBufferSnapshot != null) {
        hasSnapshot = true;
        callBack.handleSnapshot(transactionBufferSnapshot);
        this.startReadCursorPosition = PositionImpl.get(
                transactionBufferSnapshot.getMaxReadPositionLedgerId(),
                transactionBufferSnapshot.getMaxReadPositionEntryId());
    }
}
Alternative Feasibility Analysis
From the perspective of memory usage
- A map is added to store snapshots that have not yet been processed. Because the CPU processes snapshots much faster than they can be read from BookKeeper, the map is expected to stay small and occupy limited memory.
- The number of TransactionBufferSnapshotReaders and the number of managed ledger readers are reduced.
So overall, memory usage in the new scheme increases slightly, but the increase is negligible.
From an IO perspective
- Let reading the transactionBufferSystemTopic from its start position to its end position count as x IO operations, let m be the number of namespaces, and let n be the number of topics under one namespace.
- Then the number of IO operations drops from m * n * x to m * x, a reduction by a factor of n.
Therefore, the IO pressure in the new scheme will be greatly reduced.
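To make the estimate concrete, here is a small worked calculation. The class name, method names, and input values are all hypothetical and chosen only for illustration. The shared-reader figure also assumes that a single pass over each system topic serves every topic in that namespace.

```java
/** Back-of-the-envelope read counts for the two schemes; all inputs are hypothetical. */
class RecoveryReadEstimate {
    /** Current scheme: every topic replays its namespace's system topic itself. */
    static long perTopicReaders(long namespaces, long topicsPerNamespace, long readsPerReplay) {
        return namespaces * topicsPerNamespace * readsPerReplay;
    }

    /** Proposed scheme: one shared reader replays each system topic once. */
    static long sharedReader(long namespaces, long readsPerReplay) {
        return namespaces * readsPerReplay;
    }

    public static void main(String[] args) {
        long m = 10, n = 1_000, x = 500; // illustrative values only
        System.out.println(perTopicReaders(m, n, x)); // prints 5000000
        System.out.println(sharedReader(m, x));       // prints 5000
    }
}
```

With these example values the read count falls by a factor of n = 1,000, which matches the m * n * x to m * x comparison above.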
From the perspective of recovery performance
- Because the CPU processes snapshots much faster than they can be read from BookKeeper, the time needed to complete recovery depends mainly on the time BookKeeper takes to serve the snapshot reads.
- Reads are slower under high IO concurrency than under low IO concurrency, so the new implementation can read the snapshot data from BookKeeper faster.
- Reading data from the cache is faster than reading it from BookKeeper, which reduces the time threads spend waiting.
Therefore, transaction buffer recovery in the new scheme will be faster and perform better.