Skip to content

Commit

Permalink
[ISSUE#4468] Optimize broker buffer length initialization (#4469)
Browse files Browse the repository at this point in the history
* optimize broker buffer length initialization

* delete constructor with MaxMessageSize parameter in commitLog

* add junit test testEncodeLongMessage()

* Modify unit test to remove the implementation of reflection

* [maven-release-plugin] prepare release rocketmq-all-4.9.4

* [maven-release-plugin] prepare for next development iteration

* add a blank line to do a new CI test

* add some illegal message's junit test cases

* modify batch message test, using assertThrows

Co-authored-by: hill007299 <hill007299@126.com>
  • Loading branch information
shengminw and hill007299 committed Jun 22, 2022
1 parent 32733e7 commit da01deb
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 4 deletions.
19 changes: 15 additions & 4 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -1482,11 +1482,15 @@ public static class MessageExtEncoder {
private final ByteBuf byteBuf;
// The maximum length of the message body.
private final int maxMessageBodySize;

// The maximum length of the full message.
private final int maxMessageSize;
MessageExtEncoder(final int maxMessageBodySize) {
ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
byteBuf = alloc.directBuffer(maxMessageBodySize);
//Reserve 64kb for encoding buffer outside body
int maxMessageSize = maxMessageBodySize + 64 * 1024;
byteBuf = alloc.directBuffer(maxMessageSize);
this.maxMessageBodySize = maxMessageBodySize;
this.maxMessageSize = maxMessageSize;
}

protected PutMessageResult encode(MessageExtBrokerInner msgInner) {
Expand All @@ -1511,13 +1515,20 @@ protected PutMessageResult encode(MessageExtBrokerInner msgInner) {

final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);

// Exceeds the maximum message
// Exceeds the maximum message body
if (bodyLength > this.maxMessageBodySize) {
CommitLog.log.warn("message body size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ ", maxMessageSize: " + this.maxMessageBodySize);
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}

// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ ", maxMessageSize: " + this.maxMessageSize);
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}

// 1 TOTALSIZE
this.byteBuf.writeInt(msgLen);
// 2 MAGICCODE
Expand Down Expand Up @@ -1575,7 +1586,7 @@ protected ByteBuffer encode(final MessageExtBatch messageExtBatch, PutMessageCon
int totalLength = messagesByteBuff.limit();
if (totalLength > this.maxMessageBodySize) {
CommitLog.log.warn("message body size exceeded, msg body size: " + totalLength + ", maxMessageSize: " + this.maxMessageBodySize);
throw new RuntimeException("message size exceeded");
throw new RuntimeException("message body size exceeded");
}

// properties from MessageExtBatch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

import static org.apache.rocketmq.common.message.MessageDecoder.messageProperties2String;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

public class BatchPutMessageTest {
Expand Down Expand Up @@ -186,6 +187,31 @@ public void testPutIPv6HostMessages() throws Exception {
}

}
private String generateKey(StringBuilder keyBuilder, MessageExt messageExt) {
keyBuilder.setLength(0);
keyBuilder.append(messageExt.getTopic());
keyBuilder.append('-');
keyBuilder.append(messageExt.getQueueId());
return keyBuilder.toString();
}

@Test
public void testPutLongBatchMessage() throws Exception{
String topic = "batch-long-topic";
MessageStoreConfig messageStoreConfig = ((DefaultMessageStore) messageStore).getMessageStoreConfig();
CommitLog commitLog = ((DefaultMessageStore) messageStore).getCommitLog();
CommitLog.PutMessageThreadLocal putMessageThreadLocal = commitLog.getPutMessageThreadLocal().get();

MessageExtBatch messageExtBatch = new MessageExtBatch();
messageExtBatch.setBody(new byte[messageStoreConfig.getMaxMessageSize() + 1]);
messageExtBatch.setTopic(topic);
CommitLog.PutMessageContext putMessageContext = new CommitLog.PutMessageContext(generateKey(
putMessageThreadLocal.getKeyBuilder(), messageExtBatch));
RuntimeException runtimeException = assertThrows(RuntimeException.class,
() -> putMessageThreadLocal.getEncoder().encode(messageExtBatch, putMessageContext));
assertThat("message body size exceeded").isEqualTo(runtimeException.getMessage());
}


private int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
final int msgLen = 4 //TOTALSIZE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.File;
import java.io.RandomAccessFile;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetAddress;
Expand Down Expand Up @@ -657,6 +658,44 @@ public void testCleanUnusedLmqTopic() throws Exception {

}

@Test
public void testPutLongMessage() throws Exception{
MessageExtBrokerInner messageExtBrokerInner = buildMessage();
CommitLog commitLog = ((DefaultMessageStore) messageStore).getCommitLog();
MessageStoreConfig messageStoreConfig = ((DefaultMessageStore) messageStore).getMessageStoreConfig();
CommitLog.PutMessageThreadLocal putMessageThreadLocal = commitLog.getPutMessageThreadLocal().get();

//body size, topic size, properties size exactly equal to max size
messageExtBrokerInner.setBody(new byte[messageStoreConfig.getMaxMessageSize()]);
messageExtBrokerInner.setTopic(new String(new byte[127]));
messageExtBrokerInner.setPropertiesString(new String(new byte[Short.MAX_VALUE]));
PutMessageResult encodeResult1 = putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner);
assertTrue(encodeResult1 == null);

//body size exactly more than max message body size
messageExtBrokerInner.setBody(new byte[messageStoreConfig.getMaxMessageSize() + 1]);
PutMessageResult encodeResult2 = putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner);
assertTrue(encodeResult2.getPutMessageStatus() == PutMessageStatus.MESSAGE_ILLEGAL);

//body size exactly equal to max message size
messageExtBrokerInner.setBody(new byte[messageStoreConfig.getMaxMessageSize() + 64 * 1024]);
PutMessageResult encodeResult3 = putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner);
assertTrue(encodeResult3.getPutMessageStatus() == PutMessageStatus.MESSAGE_ILLEGAL);

//message properties length more than properties maxSize
messageExtBrokerInner.setBody(new byte[messageStoreConfig.getMaxMessageSize()]);
messageExtBrokerInner.setPropertiesString(new String(new byte[Short.MAX_VALUE+1]));
PutMessageResult encodeResult4 = putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner);
assertTrue(encodeResult4.getPutMessageStatus() == PutMessageStatus.PROPERTIES_SIZE_EXCEEDED);

//message length more than buffer length capacity
messageExtBrokerInner.setBody(new byte[messageStoreConfig.getMaxMessageSize()]);
messageExtBrokerInner.setTopic(new String(new byte[Short.MAX_VALUE]));
messageExtBrokerInner.setPropertiesString(new String(new byte[Short.MAX_VALUE]));
PutMessageResult encodeResult5 = putMessageThreadLocal.getEncoder().encode(messageExtBrokerInner);
assertTrue(encodeResult5.getPutMessageStatus() == PutMessageStatus.MESSAGE_ILLEGAL);
}

private class MyMessageArrivingListener implements MessageArrivingListener {
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
Expand Down

0 comments on commit da01deb

Please sign in to comment.