Skip to content

Commit

Permalink
[fix][client] Fixes batch_size not checked in MessageId#fromByteArray…
Browse files Browse the repository at this point in the history
…WithTopic (apache#18405)

Fixes apache#18395

### Motivation

The old version Pulsar clients might not set the `batch_size` field in a
batched message id, it will cause `MessageId#fromByteArrayWithTopic`,
which only checks the `batch_index` field, fail with
IllegalStateException.

### Modifications

Check if the `batch_size` field exists in `fromByteArrayWithTopic`. If
it doesn't exist, create the `BatchMessageIdImpl` instance with the
default batch size (0) and the acker (disabled).

Move `MessageIdSerializationTest` to the `pulsar-client` module and add
the `testBatchSizeNotSet` to verify the change works.

(cherry picked from commit 8246e3b)
(cherry picked from commit ca4bdf1)
  • Loading branch information
BewareMyPower authored and nicoloboschi committed Jan 11, 2023
1 parent 20a38ae commit b51e24f
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 5 deletions.
Expand Up @@ -155,8 +155,14 @@ public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName)

MessageId messageId;
if (idData.hasBatchIndex()) {
messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
idData.getBatchIndex(), idData.getBatchSize(), BatchMessageAcker.newAcker(idData.getBatchSize()));
if (idData.hasBatchSize()) {
messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
idData.getBatchIndex(), idData.getBatchSize(),
BatchMessageAcker.newAcker(idData.getBatchSize()));
} else {
messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
idData.getBatchIndex(), 0, BatchMessageAckerDisabled.INSTANCE);
}
} else {
messageId = new MessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition());
}
Expand Down
Expand Up @@ -16,29 +16,38 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertEquals;
import java.io.IOException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class MessageIdSerializationTest {

@Test
public void testProtobufSerialization1() throws Exception {
MessageId id = new MessageIdImpl(1, 2, 3);
byte[] serializedId = id.toByteArray();
assertEquals(MessageId.fromByteArray(serializedId), id);
assertEquals(MessageId.fromByteArrayWithTopic(serializedId, "my-topic"), id);
}

@Test
public void testProtobufSerialization2() throws Exception {
MessageId id = new MessageIdImpl(1, 2, -1);
byte[] serializedId = id.toByteArray();
assertEquals(MessageId.fromByteArray(serializedId), id);
assertEquals(MessageId.fromByteArrayWithTopic(serializedId, "my-topic"), id);
}

@Test
public void testBatchSizeNotSet() throws Exception {
MessageId id = new BatchMessageIdImpl(1L, 2L, 3, 4, -1,
BatchMessageAckerDisabled.INSTANCE);
byte[] serialized = id.toByteArray();
assertEquals(MessageId.fromByteArray(serialized), id);
assertEquals(MessageId.fromByteArrayWithTopic(serialized, "my-topic"), id);
}

@Test(expectedExceptions = NullPointerException.class)
Expand Down

0 comments on commit b51e24f

Please sign in to comment.