Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Fixes batch_size not checked in MessageId#fromByteArrayWithTopic #18405

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -155,8 +155,14 @@ public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName)

MessageId messageId;
if (idData.hasBatchIndex()) {
messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
idData.getBatchIndex(), idData.getBatchSize(), BatchMessageAcker.newAcker(idData.getBatchSize()));
if (idData.hasBatchSize()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any case in that we will have a batch index but no batch size?
Maybe we should throw an exception due to it should be the case that corrupted data is provided.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The batch_size field was introduced from #8659, before that, the batch_index field already existed. For example, the 2.7.1 client could send such a message id, see #18395 (comment).

Other languages clients might also sent a message id that has the batch_index field but no batch_size field.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And when I revisited the existing code, there are many places that set the batch_index field but doesn't set the batch_size field. The batch_size field is easily to be ignored because it's not considered in equals and hashCode methods. BTW, I'm not sure whether the default value of the batch_index field might increase the cases that batch_index is set while batch_size is not set. IMO, adding default values to the optional fields increases the chance of such an error. We should be careful for new fields if it's optional.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codelipenghui FWIW we even have already done it for fromByteArray (as this patch modifies).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BewareMyPower So you may simplify the code as fromByteArray did:

            if (idData.hasBatchSize()) {
                messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
                    idData.getBatchIndex(), idData.getBatchSize(), BatchMessageAcker.newAcker(idData.getBatchSize()));
            } else {
                messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
                    idData.getBatchIndex());
            }

That is, a simpler constructor. Or factor it out to a helper method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to refactor these two classes in another PR. IMO, BatchMessageIdImpl should be deprecated and I'm not willing to add new public APIs to it. The design of BatchMessageIdImpl is bad. We should add the optional batch_size and batch_index fields to MessageIdImpl instead. If we followed the design of BatchMessageIdImpl, when batch_size was introduced, we should add another class like BatchMessageIdImplWithSize that inherits the BatchMessageIdImpl and add the batch_size field as well as the acker.

messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
idData.getBatchIndex(), idData.getBatchSize(),
BatchMessageAcker.newAcker(idData.getBatchSize()));
} else {
messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
idData.getBatchIndex(), 0, BatchMessageAckerDisabled.INSTANCE);
}
} else {
messageId = new MessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition());
}
Expand Down
Expand Up @@ -16,29 +16,38 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertEquals;
import java.io.IOException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class MessageIdSerializationTest {

@Test
public void testProtobufSerialization1() throws Exception {
MessageId id = new MessageIdImpl(1, 2, 3);
byte[] serializedId = id.toByteArray();
assertEquals(MessageId.fromByteArray(serializedId), id);
assertEquals(MessageId.fromByteArrayWithTopic(serializedId, "my-topic"), id);
}

@Test
public void testProtobufSerialization2() throws Exception {
MessageId id = new MessageIdImpl(1, 2, -1);
byte[] serializedId = id.toByteArray();
assertEquals(MessageId.fromByteArray(serializedId), id);
assertEquals(MessageId.fromByteArrayWithTopic(serializedId, "my-topic"), id);
}

@Test
public void testBatchSizeNotSet() throws Exception {
MessageId id = new BatchMessageIdImpl(1L, 2L, 3, 4, -1,
BatchMessageAckerDisabled.INSTANCE);
byte[] serialized = id.toByteArray();
assertEquals(MessageId.fromByteArray(serialized), id);
assertEquals(MessageId.fromByteArrayWithTopic(serialized, "my-topic"), id);
}

@Test(expectedExceptions = NullPointerException.class)
Expand Down