-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Transaction] Produce transaction messages and commit #7552
[Transaction] Produce transaction messages and commit #7552
Conversation
3c564a0
to
23a279b
Compare
/pulsarbot run-failure-checks |
4 similar comments
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
2 similar comments
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
656c9af
to
cc99b42
Compare
/pulsarbot run-failure-checks |
32081ba
to
8fbcf12
Compare
/pulsarbot run-failure-checks |
8fbcf12
to
ce460c9
Compare
transactionBufferProvider = TransactionBufferProvider.newProvider( | ||
PersistentTransactionBufferProvider.class.getName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to config in the broker.conf.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I'll fix it.
@@ -1212,6 +1221,33 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) { | |||
|
|||
startSendOperation(producer, headersAndPayload.readableBytes(), send.getNumMessages()); | |||
|
|||
final long produceId = producer.getProducerId(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to handle the transaction message in the topic, not the server cnx.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's more reasonable.
/pulsarbot run-failure-checks |
1 similar comment
/pulsarbot run-failure-checks |
.thenRun(() -> { | ||
if (log.isDebugEnabled()) { | ||
log.debug("Send response success for end txn request {}", command.getRequestId()); | ||
} | ||
updateTBStatus(txnID, newStatus).thenRun(() -> { | ||
service.pulsar().getTransactionMetadataStoreService() | ||
.updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING); | ||
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, | ||
txnID.getLeastSigBits(), txnID.getMostSigBits())); | ||
}).exceptionally(throwable -> { | ||
log.error("Failed to get txn `" + txnID + "` txnMeta.", throwable); | ||
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(), | ||
BrokerServiceException.getClientErrorCode(throwable), throwable.getMessage())); | ||
return null; | ||
}); | ||
}).exceptionally(throwable -> { | ||
if (log.isDebugEnabled()) { | ||
log.debug("Send response error for end txn request {}", command.getRequestId()); | ||
} | ||
ctx.writeAndFlush(Commands.newEndTxnResponse(command.getRequestId(), txnID.getMostSigBits(), | ||
BrokerServiceException.getClientErrorCode(throwable), throwable.getMessage())); | ||
return null; | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should handle it in the TC and TC should retry to make sure the TB can handle success.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I'll fix this.
2. Adjust publish transaction message logic; 2. Integrate with TransactionBufferClientImpl; 3. Update unit test, support multi brokers.
d8c208e
to
a1f5490
Compare
/pulsarbot run-failure-checks |
### Motivation Currently, the transaction components are all independent, the relationship between transaction client and transaction server needs to be established. The target of this PR is making the Pulsar client could send transaction messages to the Pulsar broker and execute commit command.
### Motivation Currently, the transaction components are all independent, the relationship between transaction client and transaction server needs to be established. The target of this PR is making the Pulsar client could send transaction messages to the Pulsar broker and execute commit command.
### Motivation Currently, the transaction components are all independent, the relationship between transaction client and transaction server needs to be established. The target of this PR is making the Pulsar client could send transaction messages to the Pulsar broker and execute commit command.
### Motivation Currently, the transaction components are all independent, the relationship between transaction client and transaction server needs to be established. The target of this PR is making the Pulsar client could send transaction messages to the Pulsar broker and execute commit command.
Master Issue: #2664
Motivation
Currently, the transaction components are all independent, the relationship between transaction client and transaction server needs to be established.
The target of this PR is making the Pulsar client could send transaction messages to the Pulsar broker and execute commit command.
Modifications
Verifying this change
This change added tests
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation