[fix][client] Fixes batch_size not checked in MessageId#fromByteArrayWithTopic #18405

Merged

Conversation

BewareMyPower
Contributor

Fixes #18395

Motivation

Older Pulsar clients might not set the batch_size field in a batched message ID. In that case MessageId#fromByteArrayWithTopic, which only checks the batch_index field, fails with an IllegalStateException.

Modifications

Check whether the batch_size field is set in fromByteArrayWithTopic. If it is not, create the BatchMessageIdImpl instance with the default batch size (0) and a disabled acker.

Move MessageIdSerializationTest to the pulsar-client module and add testBatchSizeNotSet to verify that the change works.
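The branch described above can be sketched in isolation. The following is a minimal, self-contained illustration and not the actual Pulsar code: `IdData`, `DecodedId`, and `decode` are hypothetical stand-ins for the generated protobuf message, `BatchMessageIdImpl`, and `fromByteArrayWithTopic`. Modelling field presence with a nullable `Integer` and marking non-batched IDs with `-1` are assumptions made for brevity.

```java
// Hypothetical stand-ins for illustration; these are NOT the real Pulsar classes.
final class BatchIdDecodeSketch {
    static DecodedId decode(IdData data) {
        if (!data.hasBatchIndex()) {
            // Not a batched id at all.
            return new DecodedId(data.ledgerId, data.entryId, -1, 0, false);
        }
        if (data.hasBatchSize()) {
            // Sender provided both fields: keep the size and enable the acker.
            return new DecodedId(data.ledgerId, data.entryId, data.batchIndex, data.batchSize, true);
        }
        // Older sender: batch_index without batch_size. Fall back instead of failing.
        return new DecodedId(data.ledgerId, data.entryId, data.batchIndex, 0, false);
    }

    public static void main(String[] args) {
        DecodedId legacy = decode(new IdData(1L, 2L, 3, null));
        System.out.println(legacy.batchIndex + " " + legacy.batchSize + " " + legacy.ackerEnabled);
        // prints "3 0 false"
    }
}

final class IdData {
    final long ledgerId;
    final long entryId;
    final Integer batchIndex; // null models "batch_index not set"
    final Integer batchSize;  // null models "batch_size not set"

    IdData(long ledgerId, long entryId, Integer batchIndex, Integer batchSize) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.batchIndex = batchIndex;
        this.batchSize = batchSize;
    }

    boolean hasBatchIndex() { return batchIndex != null; }
    boolean hasBatchSize() { return batchSize != null; }
}

final class DecodedId {
    final long ledgerId;
    final long entryId;
    final int batchIndex;       // -1 marks a non-batched id (an assumption of this sketch)
    final int batchSize;        // 0 when the sender did not provide a batch size
    final boolean ackerEnabled; // false when there is no batch size to track

    DecodedId(long ledgerId, long entryId, int batchIndex, int batchSize, boolean ackerEnabled) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.batchIndex = batchIndex;
        this.batchSize = batchSize;
        this.ackerEnabled = ackerEnabled;
    }
}
```

The point of the third branch is that a missing batch_size is treated as a legitimate legacy input rather than as corrupted data.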

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: BewareMyPower#7

Contributor

@eolivelli eolivelli left a comment

LGTM

@@ -155,8 +155,14 @@ public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName)

         MessageId messageId;
         if (idData.hasBatchIndex()) {
-            messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
-                idData.getBatchIndex(), idData.getBatchSize(), BatchMessageAcker.newAcker(idData.getBatchSize()));
+            if (idData.hasBatchSize()) {
Contributor

Is there any case in which we have a batch index but no batch size?
Maybe we should throw an exception, since that would indicate that corrupted data was provided.

Contributor Author

The batch_size field was introduced in #8659; the batch_index field already existed before that. For example, the 2.7.1 client could send such a message ID, see #18395 (comment).

Clients in other languages might also send a message ID that has the batch_index field but no batch_size field.

Contributor Author

And when I revisited the existing code, I found many places that set the batch_index field but don't set the batch_size field. The batch_size field is easy to overlook because it isn't considered in the equals and hashCode methods. BTW, I'm not sure whether the default value of the batch_index field might increase the number of cases where batch_index is set while batch_size is not. IMO, adding default values to optional fields increases the chance of such errors. We should be careful with new fields if they are optional.
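To illustrate why a field that is left out of equals and hashCode is easy to overlook, here is a minimal sketch. `SketchBatchId` is a hypothetical class, not Pulsar's BatchMessageIdImpl; it only mirrors the property described above, namely that the batch size takes no part in equality.

```java
import java.util.Objects;

// Hypothetical demo for illustration only; NOT Pulsar's BatchMessageIdImpl.
final class EqualsSketchDemo {
    public static void main(String[] args) {
        SketchBatchId withSize = new SketchBatchId(1L, 2L, 3, 10);
        SketchBatchId withoutSize = new SketchBatchId(1L, 2L, 3, 0);
        // The two ids differ only in batchSize, yet they compare equal.
        System.out.println(withSize.equals(withoutSize)); // prints "true"
    }
}

final class SketchBatchId {
    final long ledgerId;
    final long entryId;
    final int batchIndex;
    final int batchSize; // deliberately excluded from equals/hashCode in this sketch

    SketchBatchId(long ledgerId, long entryId, int batchIndex, int batchSize) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.batchIndex = batchIndex;
        this.batchSize = batchSize;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SketchBatchId)) {
            return false;
        }
        SketchBatchId other = (SketchBatchId) o;
        return ledgerId == other.ledgerId
                && entryId == other.entryId
                && batchIndex == other.batchIndex;
    }

    @Override
    public int hashCode() {
        return Objects.hash(ledgerId, entryId, batchIndex);
    }
}
```

A round-trip test that only asserts equality of the original and the deserialized ID would therefore pass even when the batch size was dropped along the way.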

Contributor

Got it.

Member

@codelipenghui FWIW, we have already done this for fromByteArray (as this patch modifies).

Member

@BewareMyPower So you may simplify the code as fromByteArray did:

            if (idData.hasBatchSize()) {
                messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
                    idData.getBatchIndex(), idData.getBatchSize(), BatchMessageAcker.newAcker(idData.getBatchSize()));
            } else {
                messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
                    idData.getBatchIndex());
            }

That is, a simpler constructor. Or factor it out to a helper method.

Contributor Author

I would like to refactor these two classes in another PR. IMO, BatchMessageIdImpl should be deprecated, and I'm not willing to add new public APIs to it. The design of BatchMessageIdImpl is bad. We should add the optional batch_size and batch_index fields to MessageIdImpl instead. Had we followed the design of BatchMessageIdImpl when batch_size was introduced, we would have added another class like BatchMessageIdImplWithSize that inherits from BatchMessageIdImpl and adds the batch_size field as well as the acker.
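The alternative design suggested here, a single message ID type carrying optional batch fields, might look roughly like the following. This is only a sketch under assumptions: `SketchMessageId` and its factory methods are hypothetical names and do not reflect any actual or planned Pulsar API.

```java
import java.util.OptionalInt;

// Hypothetical design sketch; NOT an actual or planned Pulsar API.
final class OptionalFieldsDemo {
    public static void main(String[] args) {
        // A batched id from an older sender: index present, size absent.
        SketchMessageId legacy = SketchMessageId.batched(1L, 2L, 3, OptionalInt.empty());
        System.out.println(legacy.isBatched() + " " + legacy.batchSize().isPresent());
        // prints "true false"
    }
}

final class SketchMessageId {
    private final long ledgerId;
    private final long entryId;
    private final OptionalInt batchIndex; // empty for a non-batched message
    private final OptionalInt batchSize;  // empty when the sender did not set it

    private SketchMessageId(long ledgerId, long entryId, OptionalInt batchIndex, OptionalInt batchSize) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.batchIndex = batchIndex;
        this.batchSize = batchSize;
    }

    static SketchMessageId single(long ledgerId, long entryId) {
        return new SketchMessageId(ledgerId, entryId, OptionalInt.empty(), OptionalInt.empty());
    }

    static SketchMessageId batched(long ledgerId, long entryId, int batchIndex, OptionalInt batchSize) {
        return new SketchMessageId(ledgerId, entryId, OptionalInt.of(batchIndex), batchSize);
    }

    long ledgerId() { return ledgerId; }
    long entryId() { return entryId; }
    boolean isBatched() { return batchIndex.isPresent(); }
    OptionalInt batchIndex() { return batchIndex; }
    OptionalInt batchSize() { return batchSize; }
}
```

With one type, adding a further optional field later would not require another subclass, which is the concern raised about following the BatchMessageIdImpl approach.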

@codelipenghui
Contributor

/pulsarbot run-failure-checks

@codecov-commenter

codecov-commenter commented Nov 11, 2022

Codecov Report

Merging #18405 (928e845) into master (b31c5a6) will increase coverage by 0.12%.
The diff coverage is 11.11%.


@@             Coverage Diff              @@
##             master   #18405      +/-   ##
============================================
+ Coverage     46.98%   47.11%   +0.12%     
- Complexity    10343    10388      +45     
============================================
  Files           692      692              
  Lines         67766    67770       +4     
  Branches       7259     7259              
============================================
+ Hits          31842    31929      +87     
+ Misses        32344    32275      -69     
+ Partials       3580     3566      -14     
| Flag | Coverage Δ |
|---|---|
| unittests | 47.11% <11.11%> (+0.12%) ⬆️ |

| Impacted Files | Coverage Δ |
|---|---|
| ...va/org/apache/pulsar/broker/service/ServerCnx.java | 50.05% <ø> (+1.36%) ⬆️ |
| .../java/org/apache/pulsar/client/impl/ClientCnx.java | 30.16% <ø> (ø) |
| ...a/org/apache/pulsar/client/impl/TableViewImpl.java | 0.00% <0.00%> (ø) |
| ...ar/client/impl/conf/ProducerConfigurationData.java | 84.70% <ø> (-0.18%) ⬇️ |
| ...a/org/apache/pulsar/client/impl/MessageIdImpl.java | 80.55% <33.33%> (+18.05%) ⬆️ |
| .../pulsar/broker/service/SharedConsumerAssignor.java | 3.70% <0.00%> (-64.82%) ⬇️ |
| ...apache/pulsar/broker/service/EntryAndMetadata.java | 0.00% <0.00%> (-40.75%) ⬇️ |
| .../apache/pulsar/broker/loadbalance/LoadManager.java | 61.11% <0.00%> (-16.67%) ⬇️ |
| ...rg/apache/pulsar/broker/lookup/v1/TopicLookup.java | 60.00% <0.00%> (-13.34%) ⬇️ |
| ...roker/service/persistent/MessageDeduplication.java | 46.28% <0.00%> (-8.30%) ⬇️ |
| ... and 53 more | |

Member

@tisonkun tisonkun left a comment

My inline comments are only about style. +1 to merge.

@BewareMyPower BewareMyPower merged commit 8246e3b into apache:master Nov 14, 2022
congbobo184 pushed a commit that referenced this pull request Nov 14, 2022
…WithTopic (#18405)

(cherry picked from commit 8246e3b)
@congbobo184 congbobo184 added the cherry-picked/branch-2.9 Archived: 2.9 is end of life label Nov 14, 2022
congbobo184 pushed a commit that referenced this pull request Dec 7, 2022
…WithTopic (#18405)

(cherry picked from commit 8246e3b)
liangyepianzhou pushed a commit that referenced this pull request Dec 14, 2022
…WithTopic (#18405)

(cherry picked from commit 8246e3b)
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Jan 10, 2023
…WithTopic (apache#18405)

(cherry picked from commit 8246e3b)
(cherry picked from commit ca4bdf1)
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Jan 11, 2023
…WithTopic (apache#18405)

(cherry picked from commit 8246e3b)
(cherry picked from commit ca4bdf1)
@Technoboy- Technoboy- added this to the 3.0.0 milestone Feb 8, 2023
Technoboy- pushed a commit that referenced this pull request Feb 8, 2023
…WithTopic (#18405)

Successfully merging this pull request may close these issues.

[Bug] Pulsar client - MessageId.fromByteArrayWithTopic is not backward compatible from 2.7.2
8 participants