[Transaction] Transaction pending ack lazy init.#11091
[Transaction] Transaction pending ack lazy init.#11091congbobo184 merged 18 commits intoapache:masterfrom
Conversation
eolivelli
left a comment
There was a problem hiding this comment.
very interesting work !
it is a big patch. I will do a second pass tomorrow
...src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleState.java
Show resolved
Hide resolved
|
@merlimat @codelipenghui @eolivelli Would you please help review this PR ? |
|
move to 2.8.2. |
…_ack_lazy_load # Conflicts: # managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java # managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java # managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java # managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java # pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
| initPendingAckStore(); | ||
| return completableFuture; | ||
| } else if (checkIfReady()) { | ||
|
|
There was a problem hiding this comment.
When ready, we should let the code continue to execute downward
| synchronized (PendingAckHandleImpl.this) { | ||
| if (!acceptQueue.isEmpty()) { | ||
| CompletableFuture<Void> completableFuture = new CompletableFuture<>(); | ||
| addIndividualAcknowledgeMessageRequest(txnID, positions, completableFuture); |
There was a problem hiding this comment.
Why we can't add to the queue if the queue is empty?
There was a problem hiding this comment.
We need to make sure that Ack is in order
| } | ||
| } | ||
|
|
||
| if (!acceptQueue.isEmpty() && !isInCacheRequest) { |
There was a problem hiding this comment.
Is it able to refine the code here? looks like the above part has check isInCacheRequest but here check it again.
There was a problem hiding this comment.
optimized, please review again
|
|
||
| if (pendingAckHandle.changeToReadyState()) { | ||
| pendingAckHandle.completeHandleFuture(); | ||
| pendingAckHandle.handleCacheRequest(); |
There was a problem hiding this comment.
We will hold the lock for a long time here.
There was a problem hiding this comment.
In order to ensure the order of ack, I think we should continue to acquire this lock
gaoran10
left a comment
There was a problem hiding this comment.
Great work! Left some comments.
It seems that the transaction pending ack lay initialization increases the logic complexity, does this is necessary?
| if (pendingAckHandle.changeToReadyState()) { | ||
| pendingAckHandle.completeHandleFuture(); | ||
| pendingAckHandle.handleCacheRequest(); | ||
| log.info("Topic name : [{}], SubName : [{}] pending ack state reply success!", |
There was a problem hiding this comment.
It seems that this log is the same as the above.
| log.info("Topic name : [{}], SubName : [{}] pending ack state reply success!", | ||
| pendingAckHandle.getTopicName(), pendingAckHandle.getSubName()); | ||
| } else { | ||
| log.error("Topic name : [{}], SubName : [{}] pending ack state reply fail!", |
There was a problem hiding this comment.
If the pending ack handle relay failed, how to reply again?
There was a problem hiding this comment.
if change ready state fail, represent that the pending ack has already close
|
|
||
| private void initPendingAckStore() { | ||
| if (changeToInitializingState()) { | ||
| synchronized (PendingAckHandleImpl.this) { |
There was a problem hiding this comment.
I'm not sure why need this lock, it seems that only one thread will perform the replay operation because only one thread could change the state successfully.
There was a problem hiding this comment.
if change Initializing state success, in this time, we can change the state to closed, when change state to closed, we don't need to do replay op.
| @Override | ||
| public synchronized CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> properties, | ||
| long lowWaterMark) { | ||
| long lowWaterMark, boolean isInCacheRequest) { |
There was a problem hiding this comment.
It seems that the param isInCacheRequest isn't used, do we need to add this param?
There was a problem hiding this comment.
good catch, we should add the commit request !isInCacheRequest in to a queue.
| @Override | ||
| public synchronized CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer, long lowWaterMark) { | ||
| public synchronized CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer, | ||
| long lowWaterMark, boolean isInCacheRequest) { |
There was a problem hiding this comment.
It seems that the param isInCacheRequest isn't used, do we need to add this param?
There was a problem hiding this comment.
good catch, we should add the abort request !isInCacheRequest in to a queue.
| public CompletableFuture<Void> close() { | ||
| return this.pendingAckStoreFuture.thenAccept(PendingAckStore::closeAsync); | ||
| changeToCloseState(); | ||
| synchronized (PendingAckHandleImpl.this) { |
|
|
||
| } else { | ||
| return FutureUtil.failedFuture( | ||
| new ServiceUnitNotReadyException("PendingAckHandle not replay complete!")); |
There was a problem hiding this comment.
Does this error log is right? It seems that the state is Error or Close.
| if (!isInCacheRequest) { | ||
| if (!checkIfReady()) { | ||
| synchronized (PendingAckHandleImpl.this) { | ||
| if (state == State.Initializing) { |
There was a problem hiding this comment.
It seems that this check is repeated many times, maybe we could add a method to make this check.
such as
public boolean needCacheRequest() {
if (state == State.Initializing ||) {
return true;
} else if (state == State.None) {
initPendingAckStore();
return true;
} else if (checkIfReady()) {
// do nothing
} else {
return FutureUtil.failedFuture(
new ServiceUnitNotReadyException("PendingAckHandle replay failed!"));
}
}
There was a problem hiding this comment.
in different state, commit, ack and abort will do different op, so this method return what?
There was a problem hiding this comment.
If the return value is true, we could cache the request in the queue.
| boolean isInCacheRequest) { | ||
| if (!isInCacheRequest) { | ||
| if (!checkIfReady()) { | ||
| synchronized (PendingAckHandleImpl.this) { |
There was a problem hiding this comment.
Does this lock is used to make sure the state doesn't be changed?
|
@gaoran10 thanks for you review, fix some comment, please review again. |
# Conflicts: # pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
## Motivation now, in `broker.conf` `transactionCoordinatorEnabled=true` MLPendingAck will init manageLedger, some ack will not use transaction, so don't need to init manageLedger. When this sub use transaction, we can lazy init `PendingAckHandle`. ## implement When this sub use transaction, we can lazy init `PendingAckHandle`.
Motivation
now, in
broker.conftransactionCoordinatorEnabled=trueMLPendingAck will init manageLedger, some ack will not use transaction, so don't need to init manageLedger. When this sub use transaction, we can lazy initPendingAckHandle.implement
When this sub use transaction, we can lazy init
PendingAckHandle.Verifying this change
Add the tests for it
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)