Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transaction]Transaction pendingack server implement patch #8426

Merged
merged 13 commits into from
Nov 11, 2020
Merged

[Transaction]Transaction pendingack server implement patch #8426

merged 13 commits into from
Nov 11, 2020

Conversation

congbobo184
Copy link
Contributor

@congbobo184 congbobo184 commented Nov 2, 2020

Fix #7981
this PR patch #8256

Motivation

  • we need to support transaction pending-ack stat, and we need to support bacth ack.

implement

  • client ack with transaction will carry only this ack bit set 1 1 1 1 1 0 1 1,the 6 point is this transaction ack bit set point.

  • we will find the batch size from consumer pendingAcks, when we don't find it ,the ack will fail.

  • the normal individual ack will sync the bitch size to pending ack handle.

  • we will remove the position from consumer pending acks after normal ack sync finish then check if batch position in pending ack handle all acked.

  • when the position acked finish, we will check it is less than mark delete position then we will clear the position from pending ack handle.

  • abort the transaction, we will recover the transaction acked batch point to 1.

Verifying this change

Add the tests for it

Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes

Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)

@congbobo184
Copy link
Contributor Author

/pulsarbot run-failure-checks

1 similar comment
@congbobo184
Copy link
Contributor Author

/pulsarbot run-failure-checks

Comment on lines 61 to 63
if (currentPosition == null || otherPosition ==null) {
return -1;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to throw an exception here, if return -1 it means the currentPosition is lower than otherPosition

Comment on lines 68 to 70
if (otherPosition.getAckSet() == null) {
return result;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why otherPosition's ackSet is null then return the result?

Comment on lines 72 to 91
BitSetRecyclable otherAckSet;
if (currentPosition.getAckSet() == null) {
if (otherPosition.getAckSet() != null) {
otherAckSet = BitSetRecyclable.valueOf(otherPosition.getAckSet());
if (otherAckSet.isEmpty()) {
otherAckSet.recycle();
return result;
} else {
otherAckSet.recycle();
return 1;
}

}
return result;
}
otherAckSet = BitSetRecyclable.valueOf(otherPosition.getAckSet());
BitSetRecyclable thisAckSet = BitSetRecyclable.valueOf(currentPosition.getAckSet());
result = thisAckSet.nextSetBit(0) - otherAckSet.nextSetBit(0);
otherAckSet.recycle();
thisAckSet.recycle();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can use an empty ackset to simplify these lines.

Comment on lines 460 to 461
return FutureUtil.failedFuture(
new TransactionConflictException(error));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this for normal ack, shall we need to return TransactionConflictException?



//this method is for individual ack carry the transaction
private CompletableFuture<Void> individualAckWithTransaction(CommandAck ack) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks there is much duplicate code with the individualAckNormal, is it possible to refine it?

@@ -131,12 +132,13 @@
private static final double avgPercent = 0.9;
private boolean preciseDispatcherFlowControl;
private PositionImpl readPositionWhenJoining;
private final boolean isTransactionEnabled;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move the isTransactionEnabled to the topic or subscription? We cannot treat consumers differently in transaction.

@wolfstudy wolfstudy merged commit dafa7e0 into apache:master Nov 11, 2020
flowchartsman pushed a commit to flowchartsman/pulsar that referenced this pull request Nov 17, 2020
Fix apache#7981 
this PR patch apache#8256 
## Motivation
- we need to support transaction pending-ack stat, and we need to support bacth ack.

## implement

- client ack with transaction will carry only this ack bit set 1 1 1 1 1 0 1 1,the 6 point is this transaction ack bit set point.
- commit or abort will recover the bit set point to 1

### Verifying this change
Add the tests for it

Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes

Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Transaction] Support pending-ack state for batch messages
5 participants