Skip to content

Comments

[Transaction] Add the batch size in transaction ack command.#8612

Closed
congbobo184 wants to merge 8 commits intoapache:masterfrom
congbobo184:congbobo184_txn_batch_pendingack_modify
Closed

[Transaction] Add the batch size in transaction ack command.#8612
congbobo184 wants to merge 8 commits intoapache:masterfrom
congbobo184:congbobo184_txn_batch_pendingack_modify

Conversation

@congbobo184
Copy link
Contributor

Motivation

Now in Failover sub we ack with transaction will not get the batch size from consumer pendingAcks, so we should ack request carry the batch size.

implement

We ack with transaction will carry the bath size for individual ack delete the consumer pendingAcks.

Verifying this change

Add the tests for it

Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes

Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (yes)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)

if (pendingAckHandle == null) {
return FutureUtil.failedFuture(
new TransactionConflictException("Broker does't support Transaction pending ack!"));
new NotAllowedException("Broker does't support Transaction pending ack!"));
Copy link
Contributor

@codelipenghui codelipenghui Nov 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
new NotAllowedException("Broker does't support Transaction pending ack!"));
new NotAllowedException("The transaction is disabled"));

optional int32 partition = 3 [default = -1];
optional int32 batch_index = 4 [default = -1];
repeated int64 ack_set = 5;
optional uint64 batch_size = 6;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
optional uint64 batch_size = 6;
optional int32 batch_size = 6;

int32 is enough for batch size.

@@ -435,9 +435,6 @@ CompletableFuture<Void> messageAcked(CommandAck ack) {
private CompletableFuture<Void> individualAckNormal(CommandAck ack, Map<String,Long> properties) {
List<Position> positionsAcked = new ArrayList<>();
List<PositionImpl> checkBatchPositions = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line can be deleted?

Comment on lines +2421 to +2422
cmd = Commands.newAck(consumerId, ledgerId, entryId, bitSetRecyclable, ackType, validationError, properties,
txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId, batchMessageId.getBatchSize());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only the txn is not null needs to set the batch size.

@congbobo184
Copy link
Contributor Author

/pulsarbot run-failure-checks

1 similar comment
@codelipenghui
Copy link
Contributor

/pulsarbot run-failure-checks

@congbobo184
Copy link
Contributor Author

/pulsarbot run-failure-checks

1 similar comment
@codelipenghui
Copy link
Contributor

/pulsarbot run-failure-checks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants