Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][txn] Ack all message ids when ack chunk messages with transaction. #21268

Merged
merged 6 commits into from
Nov 8, 2023

Conversation

liangyepianzhou
Copy link
Contributor

@liangyepianzhou liangyepianzhou commented Sep 28, 2023

Motivation

Now, only the last chunk will be acknowledged when acknowledging chunk messages with transactions.
If the messageId is a ChunkMessageIdImpl, the ledger ID and entry ID will belong to the lastChunkMsgId.

private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId messageId, AckType ackType,
ValidationError validationError,
Map<String, Long> properties, TxnID txnID) {
long requestId = client.newRequestId();
final MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
final long ledgerId = messageIdAdv.getLedgerId();
final long entryId = messageIdAdv.getEntryId();
final ByteBuf cmd;
if (MessageIdAdvUtils.isBatch(messageIdAdv)) {
BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create();
bitSetRecyclable.set(0, messageIdAdv.getBatchSize());
if (ackType == AckType.Cumulative) {
MessageIdAdvUtils.acknowledge(messageIdAdv, false);
bitSetRecyclable.clear(0, messageIdAdv.getBatchIndex() + 1);
} else {
bitSetRecyclable.clear(messageIdAdv.getBatchIndex());
}
cmd = Commands.newAck(consumerId, ledgerId, entryId, bitSetRecyclable, ackType, validationError, properties,
txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId, messageIdAdv.getBatchSize());
bitSetRecyclable.recycle();
} else {
cmd = Commands.newAck(consumerId, ledgerId, entryId, null, ackType, validationError, properties,
txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId);
}

public ChunkMessageIdImpl(MessageIdImpl firstChunkMsgId, MessageIdImpl lastChunkMsgId) {
super(lastChunkMsgId.getLedgerId(), lastChunkMsgId.getEntryId(), lastChunkMsgId.getPartitionIndex());
this.firstChunkMsgId = firstChunkMsgId;
}

Modifications

Flow the common message acknowledge logic, ack all the chunks when acknowledging messages with transactions.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

This can make the fix easier understand and save memory.
@liangyepianzhou liangyepianzhou merged commit f581417 into apache:master Nov 8, 2023
50 checks passed
@liangyepianzhou liangyepianzhou deleted the chunk-txn branch November 8, 2023 03:51
liangyepianzhou added a commit that referenced this pull request Nov 8, 2023
…on. (#21268)

Now, only the last chunk will be acknowledged when acknowledging chunk messages with transactions.
If the messageId is a `ChunkMessageIdImpl`, the ledger ID and entry ID will belong to the `lastChunkMsgId`.
https://github.com/apache/pulsar/blob/2b5c199053a5b2d7f849e6604d619bae9197a8c9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L2791-L2814

https://github.com/apache/pulsar/blob/2b5c199053a5b2d7f849e6604d619bae9197a8c9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ChunkMessageIdImpl.java#L30-L33

Flow the common message acknowledge logic, ack all the chunks when acknowledging messages with transactions.

(cherry picked from commit f581417)
liangyepianzhou added a commit that referenced this pull request Nov 8, 2023
…on. (#21268)

Now, only the last chunk will be acknowledged when acknowledging chunk messages with transactions.
If the messageId is a `ChunkMessageIdImpl`, the ledger ID and entry ID will belong to the `lastChunkMsgId`.
https://github.com/apache/pulsar/blob/2b5c199053a5b2d7f849e6604d619bae9197a8c9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L2791-L2814

https://github.com/apache/pulsar/blob/2b5c199053a5b2d7f849e6604d619bae9197a8c9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ChunkMessageIdImpl.java#L30-L33

Flow the common message acknowledge logic, ack all the chunks when acknowledging messages with transactions.

(cherry picked from commit f581417)
(cherry picked from commit 9ccb3ac)
liangyepianzhou added a commit that referenced this pull request Nov 8, 2023
…on. (#21268)

### Motivation

Now, only the last chunk will be acknowledged when acknowledging chunk messages with transactions.
If the messageId is a `ChunkMessageIdImpl`, the ledger ID and entry ID will belong to the `lastChunkMsgId`.
https://github.com/apache/pulsar/blob/2b5c199053a5b2d7f849e6604d619bae9197a8c9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L2791-L2814

https://github.com/apache/pulsar/blob/2b5c199053a5b2d7f849e6604d619bae9197a8c9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ChunkMessageIdImpl.java#L30-L33
### Modifications

Flow the common message acknowledge logic, ack all the chunks when acknowledging messages with transactions.

(cherry picked from commit f581417)
liangyepianzhou added a commit that referenced this pull request Nov 9, 2023
…on. (#21268)

### Motivation

Now, only the last chunk will be acknowledged when acknowledging chunk messages with transactions.
If the messageId is a `ChunkMessageIdImpl`, the ledger ID and entry ID will belong to the `lastChunkMsgId`.
https://github.com/apache/pulsar/blob/2b5c199053a5b2d7f849e6604d619bae9197a8c9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L2791-L2814

https://github.com/apache/pulsar/blob/2b5c199053a5b2d7f849e6604d619bae9197a8c9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ChunkMessageIdImpl.java#L30-L33
### Modifications

Flow the common message acknowledge logic, ack all the chunks when acknowledging messages with transactions.

(cherry picked from commit f581417)
nborisov pushed a commit to nborisov/pulsar that referenced this pull request Nov 13, 2023
…on. (apache#21268)

### Motivation

Now, only the last chunk will be acknowledged when acknowledging chunk messages with transactions.
If the messageId is a `ChunkMessageIdImpl`, the ledger ID and entry ID will belong to the `lastChunkMsgId`.
https://github.com/apache/pulsar/blob/2b5c199053a5b2d7f849e6604d619bae9197a8c9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L2791-L2814

https://github.com/apache/pulsar/blob/2b5c199053a5b2d7f849e6604d619bae9197a8c9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ChunkMessageIdImpl.java#L30-L33
### Modifications

Flow the common message acknowledge logic, ack all the chunks when acknowledging messages with transactions.
nodece pushed a commit to ascentstream/pulsar that referenced this pull request Dec 2, 2023
…on. (apache#21268)

Now, only the last chunk will be acknowledged when acknowledging chunk messages with transactions.
If the messageId is a `ChunkMessageIdImpl`, the ledger ID and entry ID will belong to the `lastChunkMsgId`.
https://github.com/apache/pulsar/blob/2b5c199053a5b2d7f849e6604d619bae9197a8c9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L2791-L2814

https://github.com/apache/pulsar/blob/2b5c199053a5b2d7f849e6604d619bae9197a8c9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ChunkMessageIdImpl.java#L30-L33

Flow the common message acknowledge logic, ack all the chunks when acknowledging messages with transactions.

(cherry picked from commit f581417)
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Dec 20, 2023
…on. (apache#21268)

### Motivation

Now, only the last chunk will be acknowledged when acknowledging chunk messages with transactions.
If the messageId is a `ChunkMessageIdImpl`, the ledger ID and entry ID will belong to the `lastChunkMsgId`.
https://github.com/apache/pulsar/blob/2b5c199053a5b2d7f849e6604d619bae9197a8c9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L2791-L2814

https://github.com/apache/pulsar/blob/2b5c199053a5b2d7f849e6604d619bae9197a8c9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ChunkMessageIdImpl.java#L30-L33
### Modifications

Flow the common message acknowledge logic, ack all the chunks when acknowledging messages with transactions.

(cherry picked from commit f581417)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Dec 20, 2023
…on. (apache#21268)

### Motivation

Now, only the last chunk will be acknowledged when acknowledging chunk messages with transactions.
If the messageId is a `ChunkMessageIdImpl`, the ledger ID and entry ID will belong to the `lastChunkMsgId`.
https://github.com/apache/pulsar/blob/2b5c199053a5b2d7f849e6604d619bae9197a8c9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L2791-L2814

https://github.com/apache/pulsar/blob/2b5c199053a5b2d7f849e6604d619bae9197a8c9/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ChunkMessageIdImpl.java#L30-L33
### Modifications

Flow the common message acknowledge logic, ack all the chunks when acknowledging messages with transactions.

(cherry picked from commit f581417)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants