From 0076cdb715600f9c567118cefef76bd6d1a6dcd9 Mon Sep 17 00:00:00 2001 From: dongeforever Date: Fri, 10 Feb 2017 18:10:09 +0800 Subject: [PATCH] [ROCKETMQ-91] Reduce lock granularity for putMessage --- .../org/apache/rocketmq/store/CommitLog.java | 214 +++++++++++------- .../rocketmq/store/MessageExtBrokerInner.java | 11 + 2 files changed, 149 insertions(+), 76 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 7841febf708..d2678bc4acc 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -59,6 +59,8 @@ public class CommitLog { private final AppendMessageCallback appendMessageCallback; private final ThreadLocal batchEncoderThreadLocal; + + private final ThreadLocal messageEncoderThreadLocal; private HashMap topicQueueTable = new HashMap(1024); private volatile long confirmOffset = -1L; @@ -88,6 +90,13 @@ public CommitLog(final DefaultMessageStore defaultMessageStore) { return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); } }; + + this.messageEncoderThreadLocal = new ThreadLocal() { + @Override + protected MessageEncoder initialValue() { + return new MessageEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); + } + }; } public boolean load() { @@ -577,6 +586,11 @@ public PutMessageResult putMessage(final MessageExtBrokerInner msg) { MappedFile unlockMappedFile = null; MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); + //maybe need to wrap the exception + ByteBuffer encodedBuffer = messageEncoderThreadLocal.get().encode(msg); + + msg.setEncodedBuffer(encodedBuffer); + lockForPutMessage(); //spin... try { long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); @@ -1220,9 +1234,7 @@ public ByteBuffer getMsgStoreItemMemory() { return msgStoreItemMemory; } - public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, - final MessageExtBrokerInner msgInner) { - // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET
+ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner) { // PHY OFFSET long wroteOffset = fileFromOffset + byteBuffer.position(); @@ -1256,33 +1268,10 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer default: break; } - - /** - * Serialize message - */ - final byte[] propertiesData = - msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); - - final int propertiesLength = propertiesData == null ? 0 : propertiesData.length; - - if (propertiesLength > Short.MAX_VALUE) { - log.warn("putMessage message properties length too long. length={}", propertiesData.length); - return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED); - } - - final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); - final int topicLength = topicData.length; - - final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length; - - final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength); - - // Exceeds the maximum message - if (msgLen > this.maxMessageSize) { - CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength - + ", maxMessageSize: " + this.maxMessageSize); - return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED); - } + ByteBuffer messageByteBuff = msgInner.getEncodedBuffer(); + messageByteBuff.mark(); //just for mapped file changes + final int msgPos = messageByteBuff.position(); + final int msgLen = messageByteBuff.getInt(); // Determines whether there is sufficient free space if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { @@ -1294,6 +1283,7 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer // 3 The remaining space may be any value // + messageByteBuff.reset(); // Here the length of the specially set maxBlank final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank); @@ -1301,54 +1291,15 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); } - // Initialization of storage space - this.resetByteBuffer(msgStoreItemMemory, msgLen); - // 1 TOTALSIZE - this.msgStoreItemMemory.putInt(msgLen); - // 2 MAGICCODE - this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE); - // 3 BODYCRC - this.msgStoreItemMemory.putInt(msgInner.getBodyCRC()); - // 4 QUEUEID - this.msgStoreItemMemory.putInt(msgInner.getQueueId()); - // 5 FLAG - this.msgStoreItemMemory.putInt(msgInner.getFlag()); - // 6 QUEUEOFFSET - this.msgStoreItemMemory.putLong(queueOffset); - // 7 PHYSICALOFFSET - this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position()); - // 8 SYSFLAG - this.msgStoreItemMemory.putInt(msgInner.getSysFlag()); - // 9 BORNTIMESTAMP - this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp()); - // 10 BORNHOST - this.resetByteBuffer(hostHolder, 8); - this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder)); - // 11 STORETIMESTAMP - this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp()); - // 12 STOREHOSTADDRESS - this.resetByteBuffer(hostHolder, 8); - this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder)); - //this.msgBatchMemory.put(msgInner.getStoreHostBytes()); - // 13 RECONSUMETIMES - this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes()); - // 14 Prepared Transaction Offset - this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset()); - // 15 BODY - this.msgStoreItemMemory.putInt(bodyLength); - if (bodyLength > 0) - this.msgStoreItemMemory.put(msgInner.getBody()); - // 16 TOPIC - this.msgStoreItemMemory.put((byte) topicLength); - this.msgStoreItemMemory.put(topicData); - // 17 PROPERTIES - this.msgStoreItemMemory.putShort((short) propertiesLength); - if (propertiesLength > 0) - this.msgStoreItemMemory.put(propertiesData); + messageByteBuff.position(msgPos + 20); + messageByteBuff.putLong(queueOffset); + messageByteBuff.putLong(wroteOffset); + final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); - // Write messages to the queue buffer - byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen); + + byteBuffer.put(messageByteBuff.array(), 0, msgLen); + AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); @@ -1451,6 +1402,117 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer return result; } + + private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) { + byteBuffer.flip(); + byteBuffer.limit(limit); + } + } + + + public static class MessageEncoder { + // File at the end of the minimum fixed length empty + private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4; + private final ByteBuffer msgIdMemory; + // Store the message content + private final ByteBuffer msgStoreItemMemory; + // The maximum length of the message + private final int maxMessageSize; + // Build Message Key + private final StringBuilder keyBuilder = new StringBuilder(); + + private final ByteBuffer hostHolder = ByteBuffer.allocate(8); + + MessageEncoder(final int size) { + this.msgIdMemory = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH); + this.msgStoreItemMemory = ByteBuffer.allocate(size + END_FILE_MIN_BLANK_LENGTH); + this.maxMessageSize = size; + } + + public ByteBuffer getMsgStoreItemMemory() { + return msgStoreItemMemory; + } + + public ByteBuffer encode(final MessageExtBrokerInner msgInner) { + + /** + * Serialize message + */ + final byte[] propertiesData = + msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); + + final int propertiesLength = propertiesData == null ? 0 : propertiesData.length; + + if (propertiesLength > Short.MAX_VALUE) { + log.warn("putMessage message properties length too long. length={}", propertiesData.length); + throw new RuntimeException("PROPERTIES_SIZE_EXCEEDED"); + } + + final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); + final int topicLength = topicData.length; + + final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length; + + final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength); + + // Exceeds the maximum message + if (msgLen > this.maxMessageSize) { + CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength + + ", maxMessageSize: " + this.maxMessageSize); + throw new RuntimeException("MESSAGE_SIZE_EXCEEDED"); + } + + // Initialization of storage space + this.resetByteBuffer(msgStoreItemMemory, msgLen); + // 1 TOTALSIZE + this.msgStoreItemMemory.putInt(msgLen); + // 2 MAGICCODE + this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE); + // 3 BODYCRC + this.msgStoreItemMemory.putInt(msgInner.getBodyCRC()); + // 4 QUEUEID + this.msgStoreItemMemory.putInt(msgInner.getQueueId()); + // 5 FLAG + this.msgStoreItemMemory.putInt(msgInner.getFlag()); + // 6 QUEUEOFFSET + this.msgStoreItemMemory.putLong(0); + // 7 PHYSICALOFFSET + this.msgStoreItemMemory.putLong(0); + // 8 SYSFLAG + this.msgStoreItemMemory.putInt(msgInner.getSysFlag()); + // 9 BORNTIMESTAMP + this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp()); + // 10 BORNHOST + this.resetByteBuffer(hostHolder, 8); + this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder)); + // 11 STORETIMESTAMP + this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp()); + // 12 STOREHOSTADDRESS + this.resetByteBuffer(hostHolder, 8); + this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder)); + //this.msgBatchMemory.put(msgInner.getStoreHostBytes()); + // 13 RECONSUMETIMES + this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes()); + // 14 Prepared Transaction Offset + this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset()); + // 15 BODY + this.msgStoreItemMemory.putInt(bodyLength); + if (bodyLength > 0) + this.msgStoreItemMemory.put(msgInner.getBody()); + // 16 TOPIC + this.msgStoreItemMemory.put((byte) topicLength); + this.msgStoreItemMemory.put(topicData); + // 17 PROPERTIES + this.msgStoreItemMemory.putShort((short) propertiesLength); + if (propertiesLength > 0) + this.msgStoreItemMemory.put(propertiesData); + + return ByteBuffer.wrap(this.msgStoreItemMemory.array(), 0, msgLen); + + + } + + private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) { byteBuffer.flip(); byteBuffer.limit(limit); diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java index c7879af0645..39b34edfbec 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.store; +import java.nio.ByteBuffer; import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.message.MessageExt; @@ -24,6 +25,8 @@ public class MessageExtBrokerInner extends MessageExt { private String propertiesString; private long tagsCode; + private ByteBuffer encodedBuffer; + public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) { if (null == tags || tags.length() == 0) return 0; @@ -46,4 +49,12 @@ public long getTagsCode() { public void setTagsCode(long tagsCode) { this.tagsCode = tagsCode; } + + public ByteBuffer getEncodedBuffer() { + return encodedBuffer; + } + + public void setEncodedBuffer(ByteBuffer encodedBuffer) { + this.encodedBuffer = encodedBuffer; + } }