[Transaction] Transaction buffer snapshot implementation. #9490
great work
log.debug("[{}]Transaction buffer not recover complete!", topic);
}
publishContext.completed(new BrokerServiceException
.ServiceUnitNotReadyException("[{" + topic + "}]Transaction buffer not "
what about: "Transaction buffer recovery is still running" ?
yes
this.topic = topic;
this.changeToInitializingState();
What about moving this initialisation procedure into a separate method?
Executing non-trivial code in the constructor is problematic: if it fails, the caller never receives a reference to the new object and so cannot dispose of it correctly, yet references to the object may already have leaked.
Thanks. I will move it to the recover method.
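A minimal sketch of the pattern suggested here, assuming a hypothetical RecoverableBuffer class (the names are illustrative and are not the PR's actual classes). The constructor only assigns fields, and the caller starts recovery explicitly once it holds a reference:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a buffer that needs asynchronous recovery.
final class RecoverableBuffer {
    enum State { NONE, INITIALIZING, READY }

    private final String topic;
    private volatile State state = State.NONE;

    // The constructor only assigns fields, so it cannot fail halfway
    // through and never hands `this` to another thread or callback.
    RecoverableBuffer(String topic) {
        this.topic = topic;
    }

    // Recovery is started explicitly by the caller, who already holds a
    // reference and can discard or close the object if recovery fails.
    CompletableFuture<Void> recover() {
        state = State.INITIALIZING;
        return CompletableFuture.runAsync(() -> {
            // Placeholder for the real replay work.
            state = State.READY;
        });
    }

    State state() {
        return state;
    }

    String topic() {
        return topic;
    }
}
```

With this shape, a failure surfaces through the returned future rather than as an exception thrown from a half-built object.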
@@ -168,7 +277,53 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
}
}

private synchronized void takeSnapshotBuyChangeTimes() {
typo: "buy" -> "by"
/pulsarbot run-failure-checks
Great work! left some comments.
protected CompletableFuture<Reader<TransactionBufferSnapshot>> newReaderAsyncInternal() {
return client.newReader(Schema.AVRO(TransactionBufferSnapshot.class))
.topic(topicName.toString())
.startMessageId(MessageId.latest)
Maybe the start message ID should be MessageId.earliest?
+ "messageId : {}", topic.getName(), messageId);
}
}).exceptionally(e -> {
if (log.isDebugEnabled()) {
Maybe we could make this a warn-level log? If many snapshot operations fail, recovery will take too long, and a warn-level log would help users find out why recovery took so long.
yes, great suggestion!
if (!checkIfReady()) {
if (log.isDebugEnabled()) {
log.debug("[{}]Transaction buffer not recover complete!", topic.getName());
}
return FutureUtil.failedFuture(
new ServiceUnitNotReadyException("[{" + topic.getName()
+ "}]Transaction buffer not recover complete!"));
}
How about extracting this code into a method? I found three places that use this check.
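One way the repeated check could be factored out, as a sketch only. ReadyGuard, markReady and whenReady are hypothetical names, and IllegalStateException stands in for whichever broker exception the PR settles on:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical helper; the names are illustrative and not taken from the PR.
final class ReadyGuard {
    private final String topicName;
    private volatile boolean ready;

    ReadyGuard(String topicName) {
        this.topicName = topicName;
    }

    void markReady() {
        ready = true;
    }

    // Runs the action only when recovery has finished; otherwise returns a
    // failed future so every call site reports the same error.
    <T> CompletableFuture<T> whenReady(Supplier<CompletableFuture<T>> action) {
        if (!ready) {
            CompletableFuture<T> failed = new CompletableFuture<>();
            // IllegalStateException stands in for the broker's exception type.
            failed.completeExceptionally(new IllegalStateException(
                    "[" + topicName + "] Transaction buffer recovery is still running"));
            return failed;
        }
        return action.get();
    }
}
```

Each of the three call sites would then wrap its body in whenReady(...) instead of repeating the if block and the log statement.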
this.recover = recover;
}
void fillQueue() {
if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) {
Maybe we could wait until the entry queue size drops to a threshold value, then read more entries to fill the entry queue.
We should make sure there is only one outstanding read operation, so a threshold value alone is not enough.
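The single-outstanding-read constraint can be sketched as below. This is an illustration under assumed names (FillQueueSketch, readComplete), with String standing in for the entry type; it is not the PR's implementation:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; class and method names are illustrative only.
final class FillQueueSketch {
    private final Queue<String> entryQueue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger outstandingReads = new AtomicInteger();
    private final int capacity;

    FillQueueSketch(int capacity) {
        this.capacity = capacity;
    }

    // Returns true if the caller should issue a read. The compare-and-set
    // lets only one caller move the counter from 0 to 1, so at most one
    // read is in flight even if several threads call fillQueue().
    boolean fillQueue() {
        return entryQueue.size() < capacity && outstandingReads.compareAndSet(0, 1);
    }

    // Called from the read callback: store the entry, then allow the next read.
    void readComplete(String entry) {
        entryQueue.add(entry);
        outstandingReads.set(0);
    }

    // Non-blocking: returns null while no completed read has queued an entry.
    String poll() {
        return entryQueue.poll();
    }
}
```

A threshold could still be layered on top by comparing the queue size against a lower bound instead of the capacity, as long as the compare-and-set stays in place.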
}
} else {
if (exceptionNumber.get() > MAX_EXCEPTION_NUMBER) {
log.error("[{}]Transaction buffer recover fail when "
Why could the entry be null?
This is a poll method; when the read operation has not completed, the entry will be null.
}
}

static class TopicTransactionBufferRecover implements Runnable {
Could you add a simple description for the recovery flow?
/**
* Topic transaction buffer recover complete.
*/
void replayComplete();
How about changing the method name to recoverComplete?
void handleSnapshot(TransactionBufferSnapshot snapshot);

/**
* Handle transaction entry.
Suggested change:
- * Handle transaction entry.
+ * Handle transaction entry beyond the snapshot.
/pulsarbot run-failure-checks
conf/broker.conf
Outdated
@@ -1203,6 +1203,13 @@ brokerServicePurgeInactiveFrequencyInSeconds=60
transactionCoordinatorEnabled=false
transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider

# Transaction buffer take snapshot interval number
transactionBufferTakeSnapshotIntervalNumber=1000
Is this number the transaction count or the entry count? And would it be better to use max as the prefix?
conf/broker.conf
Outdated
# Transaction buffer take snapshot interval time
# Unit : millisecond
transactionBufferTakeSnapshotIntervalTime=5000
Suggested change:
- transactionBufferTakeSnapshotIntervalTime=5000
+ transactionBufferSnapshotMinTimeInMills=5000
log.debug("[{}]Transaction buffer not recover complete!", topic.getName());
}
return FutureUtil.failedFuture(
new ServiceUnitNotReadyException("[{" + topic.getName()
If we return ServiceUnitNotReadyException, the client will redo the lookup. It should be a transaction exception.
@@ -168,7 +280,53 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
}
}

private synchronized void takeSnapshotByChangeTimes() {
if (changeMaxReadPositionAndAddAbortTimes.get() >= takeSnapshotIntervalNumber) {
takeSnapshot();
takeSnapshot is an async method. We need to avoid concurrent snapshots.
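A sketch of one way to guard against overlapping snapshots, assuming the snapshot operation returns a CompletableFuture. SnapshotTrigger and onChange are hypothetical names; the counter mirrors the change count checked in the diff above:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical sketch; names are illustrative and not taken from the PR.
final class SnapshotTrigger {
    private final long intervalNumber;
    private final Supplier<CompletableFuture<Void>> takeSnapshot;
    private final AtomicLong changes = new AtomicLong();
    private final AtomicBoolean inProgress = new AtomicBoolean();

    SnapshotTrigger(long intervalNumber, Supplier<CompletableFuture<Void>> takeSnapshot) {
        this.intervalNumber = intervalNumber;
        this.takeSnapshot = takeSnapshot;
    }

    // Records one change and returns true if this call started a snapshot.
    boolean onChange() {
        if (changes.incrementAndGet() < intervalNumber) {
            return false;
        }
        // Only the caller that flips the flag may start a snapshot; later
        // callers skip until the in-flight snapshot has finished.
        if (!inProgress.compareAndSet(false, true)) {
            return false;
        }
        changes.set(0);
        CompletableFuture<Void> snapshot;
        try {
            snapshot = takeSnapshot.get();
        } catch (RuntimeException e) {
            inProgress.set(false); // do not stay locked if the start itself fails
            throw e;
        }
        // Clear the flag on success or failure so the next trigger can run.
        snapshot.whenComplete((ignored, error) -> inProgress.set(false));
        return true;
    }
}
```

Changes that arrive while a snapshot is running keep counting, so the next snapshot starts on the first change after the current one completes.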
private final AtomicLong exceptionNumber = new AtomicLong();

// TODO: MAX_EXCEPTION_NUMBER can config
private static final int MAX_EXCEPTION_NUMBER = 500;
What is the expected behavior if the number of exceptions is below MAX_EXCEPTION_NUMBER? In that case, if the transaction buffer recovers, will it lose some transaction state?
It won't lose transaction state, because when a read fails the read position does not change. The cursor always reads from the correct position.
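The argument in this reply can be illustrated with a small sketch. RetryingReplay is a hypothetical class, the integer position stands in for the cursor's read position, and the limit of 3 is chosen for the example (the diff above uses 500):

```java
import java.util.List;
import java.util.function.IntFunction;

// Hypothetical sketch of replay with retries; not the PR's recovery code.
final class RetryingReplay {
    // Example limit only; the constant in the diff above is 500.
    static final int MAX_EXCEPTION_NUMBER = 3;

    // Replays positions [0, count) into `out` and returns how many reads failed.
    static int replay(int count, IntFunction<String> reader, List<String> out) {
        int position = 0;
        int exceptions = 0;
        while (position < count) {
            try {
                out.add(reader.apply(position));
                position++; // advance only after a successful read
            } catch (RuntimeException e) {
                if (++exceptions > MAX_EXCEPTION_NUMBER) {
                    throw new IllegalStateException(
                            "recover failed at position " + position, e);
                }
                // position is unchanged, so the next iteration re-reads
                // the same entry and nothing is skipped
            }
        }
        return exceptions;
    }
}
```

Because the position only moves after a successful read, a transient failure costs a retry but never drops an entry.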
/pulsarbot run-failure-checks
/pulsarbot run-failure-checks
@@ -224,6 +224,7 @@ enum ServerError {
// use this error to indicate that this producer is now permanently
// fenced. Applications are now supposed to close it and create a
// new producer
TransactionBufferNotRecover = 26; // When transaction buffer not recover completely
Do we need to expose this error to the client side? It would be better to handle it inside the broker.
protected final CompletableFuture<Void> transactionCompletableFuture;
protected final TransactionBuffer transactionBuffer;
Do we need two fields for this case? Can we just use a CompletableFuture?
/pulsarbot run-failure-checks
# Conflicts: # pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
/pulsarbot run-failure-checks
/pulsarbot run-failure-checks
/pulsarbot run-failure-checks
Great work.
Overall looks good to me.
But there is a typo in the configuration bean; I believe this should be fixed.
category = CATEGORY_TRANSACTION,
doc = "Transaction buffer take snapshot min interval time"
)
private int transactionBufferSnapshotMinTimeInMills = 5000;
typo:
transactionBufferSnapshotMinTimeInMills -> transactionBufferSnapshotMinTimeInMillis
/**
* Close transaction buffer snapshot service.
*/
void close() throws Exception;
Do we really need to throw a generic Exception?
How can this error be handled downstream?
This method is only used when closing pulsarService, so I think we don't need to worry too much about it.
Okay
+1
/pulsarbot run-faliure-checks
/pulsarbot run-failure-checks
Motivation
Take transaction buffer snapshots to recover ongoing and aborted transactions.
Implementation
snapshot metadata: __transaction_buffer_snapshot
transactionBufferSnapshotMaxTransactionCount and transactionBufferSnapshotMinTimeInMills to control when a snapshot is taken
SystemTopicBaseTxnBufferSnapshotService to create the Writer and Reader used to take snapshots, and to cache the client for the same namespace.
Verifying this change
Add the tests for it
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (yes)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)