[feat][txn] Transaction buffer snapshot writer reuse#19641
[feat][txn] Transaction buffer snapshot writer reuse#19641gaoran10 merged 7 commits intoapache:masterfrom
Conversation
...oker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
Outdated
Show resolved
Hide resolved
poorbarcode
left a comment
There was a problem hiding this comment.
Great improve, left some comments
| }); | ||
| } | ||
|
|
||
| private SystemTopicClient<T> getTransactionBufferSystemTopicClient(TopicName topicName) { |
There was a problem hiding this comment.
Why not change the param TopicName to Namespace?
...org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
Outdated
Show resolved
Hide resolved
...org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
Outdated
Show resolved
Hide resolved
.../org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
Outdated
Show resolved
Hide resolved
| return future; | ||
| } | ||
|
|
||
| private void retain() { |
There was a problem hiding this comment.
Should we lock in release and retain? Otherwise, it may result in getting a writer whose referenceCount is 0
There was a problem hiding this comment.
We may not only need to lock release and retain.
If we do not lock release and retain.
| time | task execute ReferenceCountedWriter::close |
task execute getReferenceWriter |
|---|---|---|
| 0 | check referenceCount != 0 | retain check referenceCount != 0 |
| 1 | decrement referenceCount = 0 | |
| 2 | remove and close the writer | |
| 3 | increment referenceCount | |
| 4 | return the closed writer |
There was a problem hiding this comment.
If we only lock release and retain.
| time | task execute ReferenceCountedWriter::close |
task execute getReferenceWriter |
|---|---|---|
| 0 | check referenceCount != 0 | |
| 1 | decrement referenceCount = 0 | |
| 2 | remove and close the writer | |
| 3 | retain check referenceCount == 0 |
|
| 4 | throw RuntimeException |
There was a problem hiding this comment.
We can change the retain method to
private synchronized boolean retain() {
if (referenceCount.get() == 0) {
return false;
} else {
this.referenceCount.incrementAndGet();
return true;
}
}And change getReferenceWriter to
public ReferenceCountedWriter<T> getReferenceWriter(TopicName topicName) {
return refCountedWriterMap.compute(topicName.getNamespaceObject(), (k, v) -> {
if (v != null && v.retain()) {
return v;
} else {
return new ReferenceCountedWriter<>(topicName.getNamespaceObject(),
getTransactionBufferSystemTopicClient(topicName).newWriterAsync(), this);
}
});
}There was a problem hiding this comment.
Good catch! I adjust logic.
| this.refCountedWriterMap = new ConcurrentHashMap<>(); | ||
| } | ||
|
|
||
| public CompletableFuture<SystemTopicClient.Writer<T>> createWriter(TopicName topicName) { |
There was a problem hiding this comment.
This method is useless. We can delete it, right?
| return future; | ||
| } | ||
|
|
||
| private void retain() { |
There was a problem hiding this comment.
We may not only need to lock release and retain.
If we do not lock release and retain.
| time | task execute ReferenceCountedWriter::close |
task execute getReferenceWriter |
|---|---|---|
| 0 | check referenceCount != 0 | retain check referenceCount != 0 |
| 1 | decrement referenceCount = 0 | |
| 2 | remove and close the writer | |
| 3 | increment referenceCount | |
| 4 | return the closed writer |
| return future; | ||
| } | ||
|
|
||
| private void retain() { |
There was a problem hiding this comment.
If we only lock release and retain.
| time | task execute ReferenceCountedWriter::close |
task execute getReferenceWriter |
|---|---|---|
| 0 | check referenceCount != 0 | |
| 1 | decrement referenceCount = 0 | |
| 2 | remove and close the writer | |
| 3 | retain check referenceCount == 0 |
|
| 4 | throw RuntimeException |
| return future; | ||
| } | ||
|
|
||
| private void retain() { |
There was a problem hiding this comment.
We can change the retain method to
private synchronized boolean retain() {
if (referenceCount.get() == 0) {
return false;
} else {
this.referenceCount.incrementAndGet();
return true;
}
}And change getReferenceWriter to
public ReferenceCountedWriter<T> getReferenceWriter(TopicName topicName) {
return refCountedWriterMap.compute(topicName.getNamespaceObject(), (k, v) -> {
if (v != null && v.retain()) {
return v;
} else {
return new ReferenceCountedWriter<>(topicName.getNamespaceObject(),
getTransactionBufferSystemTopicClient(topicName).newWriterAsync(), this);
}
});
}|
@liangyepianzhou Good idea! I make some changes, PLTA. |
|
An error was encountered while deleting the namespace, I'm trying to fix it. |
cb945b8 to
4472ee6
Compare
11cd55e to
299d196
Compare
|
Please rebase master branch, the testBatchMetadataMetrics should be fixed |
299d196 to
bdbc228
Compare
Motivation
The transaction buffer(short for TB) snapshot topic is used to persistent snapshot data for TB, the snapshot can reduce read data when recovering.
Currently, if enable the transaction feature, every topic will create a TB snapshot producer, the topic of the producer is a namespace system topic, so we don't need to create a producer for every topic, one producer per namespace is enough.
Modifications
Add a new inner class
ReferenceCountedWriter, if theReferenceCountedWriterfor one namespace does not exist, it will be initialized, or else its reference count will be increased, if the reference count reduces to 0, it will be removed from map-cache.Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: gaoran10#23