[Transaction] TransactionBuffer Refactor #8347
@@ -80,10 +80,12 @@ message ClusterMessageId {
message MessageIdData {
    required uint64 ledger_id = 1;
    required uint64 entry_id = 2;
    optional int32 partition = 3 [default = -1];
Why do we need partition information in the marker? A marker can only belong to a single partition, right?
Yes, I'll fix this.
optional MessageIdData message_id = 1;
repeated MessageIdData messageIdList = 2;
I think we can keep `repeated MessageIdData message_ids` here, since we don't need to consider the compatibility guarantee for the first transaction release.
Ok
@@ -34,6 +34,7 @@
/**
 * Persistent transaction buffer provider.
 */
@Deprecated
Some of the old code is no longer useful, so we can delete it directly. We don't want to tell users that there are deprecated components in the first transaction release.
Please check all occurrences.
Ok
@Slf4j
public class TopicTransactionBuffer implements TransactionBuffer {

    private PersistentTopic topic;
Suggested change:
- private PersistentTopic topic;
+ private final PersistentTopic topic;
public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
    return null;
}
Do we need this method?
CompletableFuture<Position> completableFuture = new CompletableFuture<>();
topic.publishMessage(buffer, (e, ledgerId, entryId) -> {
    if (e != null) {
        log.error("Failed to appendBufferToTxn for txn {}", txnId, e);
Suggested change:
- log.error("Failed to appendBufferToTxn for txn {}", txnId, e);
+ log.error("Failed to append buffer to txn {}", txnId, e);
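The hunk above bridges a publish callback to a `CompletableFuture<Position>`. A minimal sketch of that pattern follows, assuming simplified stand-in types: `Position`, `PublishCallback`, `Publisher`, and `AppendSketch` are hypothetical names for illustration and are not the broker's real classes.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a (ledgerId, entryId) position; not the real Pulsar type.
final class Position {
    final long ledgerId;
    final long entryId;

    Position(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }
}

// Hypothetical callback shape mirroring the (e, ledgerId, entryId) lambda in the hunk.
interface PublishCallback {
    void completed(Exception e, long ledgerId, long entryId);
}

// Hypothetical stand-in for the topic's publish entry point.
interface Publisher {
    void publishMessage(byte[] payload, PublishCallback callback);
}

final class AppendSketch {
    // Returns a future that completes with the stored position,
    // or completes exceptionally if the publish callback reports an error.
    static CompletableFuture<Position> appendBufferToTxn(Publisher topic, String txnId, byte[] payload) {
        CompletableFuture<Position> future = new CompletableFuture<>();
        topic.publishMessage(payload, (e, ledgerId, entryId) -> {
            if (e != null) {
                System.err.println("Failed to append buffer to txn " + txnId + ": " + e);
                future.completeExceptionally(e);
                return;
            }
            future.complete(new Position(ledgerId, entryId));
        });
        return future;
    }
}
```

Returning early after `completeExceptionally` keeps the success path from running on a failed publish, so the future is completed exactly once.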
}

private List<PulsarMarkers.MessageIdData> getMessageIdDataList(List<MessageIdData> sendMessageIdList) {
    List<PulsarMarkers.MessageIdData> messageIdDataList = new ArrayList<>();
Suggested change:
- List<PulsarMarkers.MessageIdData> messageIdDataList = new ArrayList<>();
+ List<PulsarMarkers.MessageIdData> messageIdDataList = new ArrayList<>(sendMessageIdList.size());
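The suggestion presizes the target list because its final size is known up front, which avoids intermediate growth of the backing array while copying. A minimal sketch of the same conversion, assuming two hypothetical value types (`ClientMessageId` and `MarkerMessageId`) in place of the generated protobuf classes:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical client-side message id; stands in for PulsarApi.MessageIdData.
final class ClientMessageId {
    final long ledgerId;
    final long entryId;

    ClientMessageId(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }
}

// Hypothetical marker-side message id; stands in for PulsarMarkers.MessageIdData.
final class MarkerMessageId {
    final long ledgerId;
    final long entryId;

    MarkerMessageId(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }
}

final class MessageIdListSketch {
    // Copies every id into the marker-side type, allocating the list once
    // with the exact capacity needed.
    static List<MarkerMessageId> toMarkerIds(List<ClientMessageId> sendMessageIdList) {
        List<MarkerMessageId> result = new ArrayList<>(sendMessageIdList.size());
        for (ClientMessageId id : sendMessageIdList) {
            result.add(new MarkerMessageId(id.ledgerId, id.entryId));
        }
        return result;
    }
}
```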
            .setLedgerId(msgIdData.getLedgerId())
            .setEntryId(msgIdData.getEntryId()).build());
    }
    return messageIdDataList;
Should the MessageIdData list be recycled before returning?
try {
    return Commands.serializeMetadataAndPayload(ChecksumType.Crc32c, msgMetadata, payload);
} finally {
    payload.release();
    msgMetadata.recycle();
    msgMetadataBuilder.recycle();
    if (commitMarkerBuilder != null) {
The commitMarkerBuilder is never null.
I'll fix this.
PulsarMarkers.TxnCommitMarker.Builder commitMarkerBuilder = PulsarMarkers.TxnCommitMarker.newBuilder();

messageIdDataList.ifPresent(commitMarkerBuilder::addAllMessageId);
PulsarMarkers.TxnCommitMarker commitMarker = commitMarkerBuilder.build();
The commitMarker should also be recycled.
Ok
@@ -236,7 +238,16 @@ public void handleAddSubscriptionToTxnResponse(PulsarApi.CommandAddSubscriptionT
return callback;
}
long requestId = client.newRequestId();
ByteBuf cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), PulsarApi.TxnAction.COMMIT);
List<PulsarApi.MessageIdData> messageIdDataList = new ArrayList<>();
This should be recycled once it is no longer in use.
Ok
@@ -255,7 +266,17 @@ public void handleAddSubscriptionToTxnResponse(PulsarApi.CommandAddSubscriptionT
return callback;
}
long requestId = client.newRequestId();
ByteBuf cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), PulsarApi.TxnAction.ABORT);

List<PulsarApi.MessageIdData> messageIdDataList = new ArrayList<>();
This should be recycled once it is no longer in use.
Ok
CompletableFuture<TxnID> cb = new CompletableFuture<>();
if (!canSendRequest(cb)) {
    return cb;
}
long requestId = requestIdGenerator.getAndIncrement();
ByteBuf cmd = Commands.newEndTxnOnPartition(requestId, txnIdLeastBits, txnIdMostBits, topic, action);
List<PulsarApi.MessageIdData> messageIdDataList = new ArrayList<>();
This should be recycled once it is no longer in use.
Ok
2. transaction commit refactor.
2. add new test for topic transaction buffer.
667a25d to 37e56ec
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.PulsarService;

public class TransactionMessageDeduplication extends MessageDeduplication {
This looks unrelated to this PR.
I'll remove this class.
@@ -73,9 +77,11 @@
private final ServiceConfiguration serviceConfig;
private volatile ScheduledFuture<?> readOnActiveConsumerTask = null;

LongPairSet messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
Only instantiate this when transactions are enabled.
I'll fix this.
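One way to follow this suggestion is to allocate the set in the constructor only when the feature flag is on. The sketch below assumes a hypothetical `ReplayDispatcherSketch` class, a plain boolean flag, and a `ConcurrentSkipListSet<Long>` of entry ids standing in for `ConcurrentSortedLongPairSet`; the real dispatcher would read the flag from its service configuration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical dispatcher fragment; only the conditional allocation is illustrated.
final class ReplayDispatcherSketch {
    // Null when transactions are disabled, so no set is allocated for
    // dispatchers that never replay transaction messages.
    private final Set<Long> messagesToReplay;

    ReplayDispatcherSketch(boolean transactionEnabled) {
        this.messagesToReplay = transactionEnabled ? new ConcurrentSkipListSet<>() : null;
    }

    // Records an entry for replay; a no-op when transactions are disabled.
    void addMessageToReplay(long entryId) {
        if (messagesToReplay != null) {
            messagesToReplay.add(entryId);
        }
    }

    // Number of entries waiting to be replayed (always 0 when disabled).
    int pendingReplayCount() {
        return messagesToReplay == null ? 0 : messagesToReplay.size();
    }

    boolean hasReplaySet() {
        return messagesToReplay != null;
    }
}
```

Keeping the field `final` and guarding each access with a null check avoids the need for lazy initialization under concurrency.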
@@ -115,4 +115,9 @@ default void cursorIsReset() {
default void acknowledgementWasProcessed() {
    // No-op
}

default void addMessageToRedelivery(long ledgerId, long entryId) {
Suggested change:
- default void addMessageToRedelivery(long ledgerId, long entryId) {
+ default void addMessageToReplay(long ledgerId, long entryId) {
I think "replay" is more reasonable here. "Redeliver" reads as if the consumer had already received the messages.
ok
transactionReader.read(messagesToRead, consumer, this);
if (havePendingReplayRead) {
    if (log.isDebugEnabled()) {
        log.debug("have pending replay read");
It's better to print the topic name and subscription name in the log. You can refer to the other log patterns in PersistentDispatcherSingleActiveConsumer.java.
Ok
private final Dispatcher dispatcher;
private final Executor executor;

private final ConcurrentLongPairSet pendingReadPosition;
We already have `messageToRedeliver` in the Dispatcher. Why do we need to add a `pendingReadPosition`?
I'll remove the class `TransactionMessageReader`.
int totalMessages = 0;
long totalBytes = 0;
int totalChunkedMessages = 0;

boolean afterTxnCommitMarker = false;
Suggested change:
- boolean afterTxnCommitMarker = false;
+ boolean isAfterTxnCommitMarker = false;
Fix https://github.com/streamnative/pulsar/issues/1575
Fix issue apache#8378

### Motivation

![image](https://user-images.githubusercontent.com/15029908/96908159-d38d3f80-14ce-11eb-9e52-ee066434d960.png)

Use the above approach instead of the sidecar approach.

### Modifications

1. Produce transaction messages to the topic partition.
2. The commit marker needs to record the related message-id list of its transaction.
3. When the dispatcher reads a transaction marker, it gets the messages of the transaction by the message-id list in the marker and sends them to the consumer.
4. TransactionBuffer doesn't maintain any index data.
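The marker-based flow described in the modifications can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `Entry` and `MarkerDispatchSketch` are hypothetical names, message ids are reduced to indexes into a list that stands in for the topic partition, and skipping transactional entries until their commit marker is read is an assumption about the dispatcher, not something confirmed by this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical topic entry: a plain message, a transactional message, or a commit marker.
final class Entry {
    final String payload;           // null for commit markers
    final String txnId;             // null for non-transactional messages
    final List<Integer> messageIds; // non-null only for commit markers

    private Entry(String payload, String txnId, List<Integer> messageIds) {
        this.payload = payload;
        this.txnId = txnId;
        this.messageIds = messageIds;
    }

    static Entry message(String payload) {
        return new Entry(payload, null, null);
    }

    static Entry txnMessage(String txnId, String payload) {
        return new Entry(payload, txnId, null);
    }

    // The marker records the ids (here: list indexes) of the messages in its transaction.
    static Entry commitMarker(String txnId, List<Integer> messageIds) {
        return new Entry(null, txnId, messageIds);
    }
}

final class MarkerDispatchSketch {
    // Walks the partition in order and returns the payloads a consumer would see.
    static List<String> dispatch(List<Entry> partition) {
        List<String> delivered = new ArrayList<>();
        for (Entry entry : partition) {
            if (entry.messageIds != null) {
                // Commit marker: look up each recorded message and deliver it.
                for (int id : entry.messageIds) {
                    delivered.add(partition.get(id).payload);
                }
            } else if (entry.txnId == null) {
                // Ordinary message: deliver immediately.
                delivered.add(entry.payload);
            }
            // Transactional message without a marker yet: assumed to be skipped.
        }
        return delivered;
    }
}
```

In this model no separate index is kept anywhere, which mirrors the last item in the list: everything needed to find a transaction's messages travels in the commit marker itself.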
### Motivation

Currently, sending transaction messages synchronously is disabled. Because the design of the `TransactionBuffer` has changed (refer to #8347), sending transaction messages synchronously can now be allowed.

### Modifications

Remove the transaction check in the `send` method of the class `TypedMessageBuilderImpl`.
Would you please tell me why Pulsar finally chose to replace the sidecar approach with the marker approach, which seems different from the conclusion in the previous PIP-31? @gaoran10
@cytnju please ask your question on dev@pulsar.apache.org
Fix https://github.com/streamnative/pulsar/issues/1575
Fix issue #8378
Motivation
Use the above approach instead of the sidecar approach.
Modifications
Verifying this change
This change added tests and can be verified as follows:
Added unit tests for producing transaction messages, ending the transaction, and reading the messages:
org.apache.pulsar.broker.transaction.TransactionProduceTest
org.apache.pulsar.broker.transaction.TransactionConsumeTest
Does this pull request potentially affect one of the following parts:
If `yes` was chosen, please highlight the changes.
Documentation