diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java index e0a3b699230..94ebe4f1268 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java @@ -54,7 +54,9 @@ public static void main(String[] args) { public static BrokerController start(BrokerController controller) { try { + controller.start(); + String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", " + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); @@ -242,7 +244,7 @@ private static void properties2SystemEnv(Properties properties) { System.setProperty("rocketmq.namesrv.domain.subgroup", rmqAddressServerSubGroup); } - public static Options buildCommandlineOptions(final Options options) { + private static Options buildCommandlineOptions(final Options options) { Option opt = new Option("c", "configFile", true, "Broker config properties file"); opt.setRequired(false); options.addOption(opt); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java index 2f94de20c34..64c28ece3e6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java @@ -70,7 +70,7 @@ public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit // by tags code. if (ExpressionType.isTagType(subscriptionData.getExpressionType())) { - if (tagsCode == null || tagsCode < 0L) { + if (tagsCode == null) { return true; } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java index a5ad3ac39e2..c8da08d28df 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java @@ -25,6 +25,8 @@ public class BrokerStartupTest { + private String storePathRootDir = "."; + @Test public void testProperties2SystemEnv() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { @@ -36,5 +38,4 @@ public void testProperties2SystemEnv() throws NoSuchMethodException, InvocationT method.invoke(null, properties); Assert.assertEquals("value", System.getProperty("rocketmq.namesrv.domain")); } - } \ No newline at end of file diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java index 7978942e5bb..e544d90a124 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java @@ -24,12 +24,14 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.store.CommitLogDispatcher; +import org.apache.rocketmq.store.ConsumeQueueExt; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DispatchRequest; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageStatus; import org.apache.rocketmq.store.MessageArrivingListener; import org.apache.rocketmq.store.MessageExtBrokerInner; +import org.apache.rocketmq.store.MessageFilter; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.stats.BrokerStatsManager; @@ -77,24 +79,17 @@ public class MessageStoreWithFilterTest { try { StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123); } catch (UnknownHostException e) { - e.printStackTrace(); } try { BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0); } catch (UnknownHostException e) { - e.printStackTrace(); } } @Before - public void init() { + public void init() throws Exception { filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic); - try { - master = gen(filterManager); - } catch (Exception e) { - e.printStackTrace(); - assertThat(true).isFalse(); - } + master = gen(filterManager); } @After @@ -107,7 +102,7 @@ public void destroy() { public MessageExtBrokerInner buildMessage() { MessageExtBrokerInner msg = new MessageExtBrokerInner(); msg.setTopic(topic); - msg.setTags("TAG1"); + msg.setTags(System.currentTimeMillis() + "TAG"); msg.setKeys("Hello"); msg.setBody(msgBody); msg.setKeys(String.valueOf(System.currentTimeMillis())); @@ -125,7 +120,7 @@ public MessageExtBrokerInner buildMessage() { } public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize, - boolean enableCqExt, int cqExtFileSize) { + boolean enableCqExt, int cqExtFileSize) { MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); messageStoreConfig.setMapedFileSizeCommitLog(commitLogFileSize); messageStoreConfig.setMapedFileSizeConsumeQueue(cqFileSize); @@ -155,9 +150,7 @@ protected DefaultMessageStore gen(ConsumerFilterManager filterManager) throws Ex new MessageArrivingListener() { @Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode, - long msgStoreTime, byte[] filterBitMap, Map properties) { -// System.out.println(String.format("Msg coming: %s, %d, %d, %d", -// topic, queueId, logicOffset, tagsCode)); + long msgStoreTime, byte[] filterBitMap, Map properties) { } } , brokerConfig); @@ -166,8 +159,6 @@ public void arriving(String topic, int queueId, long logicOffset, long tagsCode, @Override public void dispatch(DispatchRequest request) { try { -// System.out.println(String.format("offset:%d, bitMap:%s", request.getCommitLogOffset(), -// BitsArray.create(request.getBitMap()).toString())); } catch (Throwable e) { e.printStackTrace(); } @@ -183,7 +174,7 @@ public void dispatch(DispatchRequest request) { } protected List putMsg(DefaultMessageStore master, int topicCount, - int msgCountPerTopic) throws Exception { + int msgCountPerTopic) throws Exception { List msgs = new ArrayList(); for (int i = 0; i < topicCount; i++) { String realTopic = topic + i; @@ -229,22 +220,10 @@ protected List filtered(List msgs, } @Test - public void testGetMessage_withFilterBitMapAndConsumerChanged() { - List msgs = null; - try { - msgs = putMsg(master, topicCount, msgPerTopic); - } catch (Exception e) { - e.printStackTrace(); - assertThat(true).isFalse(); - } + public void testGetMessage_withFilterBitMapAndConsumerChanged() throws Exception { + List msgs = putMsg(master, topicCount, msgPerTopic); - // sleep to wait for consume queue has been constructed. - try { - Thread.sleep(200); - } catch (InterruptedException e) { - e.printStackTrace(); - assertThat(true).isFalse(); - } + Thread.sleep(200); // reset consumer; String topic = "topic" + 0; @@ -303,16 +282,10 @@ public void testGetMessage_withFilterBitMapAndConsumerChanged() { } @Test - public void testGetMessage_withFilterBitMap() { - List msgs = null; - try { - msgs = putMsg(master, topicCount, msgPerTopic); - // sleep to wait for consume queue has been constructed. - Thread.sleep(200); - } catch (Exception e) { - e.printStackTrace(); - assertThat(true).isFalse(); - } + public void testGetMessage_withFilterBitMap() throws Exception { + List msgs = putMsg(master, topicCount, msgPerTopic); + + Thread.sleep(100); for (int i = 0; i < topicCount; i++) { String realTopic = topic + i; @@ -369,4 +342,32 @@ public void testGetMessage_withFilterBitMap() { } } } + + @Test + public void testGetMessage_withFilter_checkTagsCode() throws Exception { + putMsg(master, topicCount, msgPerTopic); + + Thread.sleep(200); + + for (int i = 0; i < topicCount; i++) { + String realTopic = topic + i; + + GetMessageResult getMessageResult = master.getMessage("test", realTopic, queueId, 0, 10000, + new MessageFilter() { + @Override + public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) { + if (tagsCode != null && tagsCode <= ConsumeQueueExt.MAX_ADDR) { + return false; + } + return true; + } + + @Override + public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map properties) { + return true; + } + }); + assertThat(getMessageResult.getMessageCount()).isEqualTo(msgPerTopic); + } + } } diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java index f49d2b31ffe..a8ee0e08ed5 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java @@ -32,7 +32,6 @@ import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.namesrv.NamesrvConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig; -import org.apache.rocketmq.remoting.netty.NettySystemConfig; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.srvutil.ShutdownHookThread; @@ -49,15 +48,6 @@ public static void main(String[] args) { public static NamesrvController main0(String[] args) { System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); - - if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) { - NettySystemConfig.socketSndbufSize = 4096; - } - - if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) { - NettySystemConfig.socketRcvbufSize = 4096; - } - try { //PackageConflictDetect.detectFastjson(); diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 379162d7ddb..4922e3d97b5 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -446,6 +446,13 @@ private boolean putMessagePositionInfo(final long offset, final int size, final if (cqOffset != 0) { long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset(); + + if (expectLogicOffset < currentLogicOffset) { + log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", + expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset); + return true; + } + if (expectLogicOffset != currentLogicOffset) { LOG_ERROR.warn( "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", @@ -569,6 +576,6 @@ protected boolean isExtWriteEnable() { * Check {@code tagsCode} is address of extend file or tags code. */ public boolean isExtAddr(long tagsCode) { - return isExtReadEnable() && this.consumeQueueExt.isExtAddr(tagsCode); + return ConsumeQueueExt.isExtAddr(tagsCode); } } diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java index a118cde73b7..aeb2803e23f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java @@ -95,7 +95,7 @@ public ConsumeQueueExt(final String topic, * Just test {@code address} is less than 0. *

*/ - public boolean isExtAddr(final long address) { + public static boolean isExtAddr(final long address) { return address <= MAX_ADDR; } diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 95a017aee6f..59ef49045a2 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -18,8 +18,10 @@ import java.io.File; import java.io.IOException; +import java.io.RandomAccessFile; import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.FileLock; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -104,6 +106,10 @@ public class DefaultMessageStore implements MessageStore { private final LinkedList dispatcherList; + private RandomAccessFile lockFile; + + private FileLock lock; + public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager, final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException { this.messageArrivingListener = messageArrivingListener; @@ -138,6 +144,10 @@ public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final Br this.dispatcherList = new LinkedList<>(); this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue()); this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex()); + + File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir())); + MappedFile.ensureDirOK(file.getParent()); + lockFile = new RandomAccessFile(file, "rw"); } public void truncateDirtyLogicFiles(long phyOffset) { @@ -196,6 +206,15 @@ public boolean load() { * @throws Exception */ public void start() throws Exception { + + lock = lockFile.getChannel().tryLock(0, 1, false); + if (lock == null || lock.isShared() || !lock.isValid()) { + throw new RuntimeException("Lock failed,MQ already started"); + } + + lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes())); + lockFile.getChannel().force(true); + this.flushConsumeQueueService.start(); this.commitLog.start(); this.storeStatsService.start(); @@ -254,6 +273,14 @@ public void shutdown() { } this.transientStorePool.destroy(); + + if (lockFile != null && lock != null) { + try { + lock.release(); + lockFile.close(); + } catch (IOException e) { + } + } } public void destroy() { @@ -487,7 +514,7 @@ public GetMessageResult getMessage(final String group, final String topic, final break; } - boolean extRet = false; + boolean extRet = false, isTagsCodeLegal = true; if (consumeQueue.isExtAddr(tagsCode)) { extRet = consumeQueue.getExt(tagsCode, cqExtUnit); if (extRet) { @@ -496,11 +523,12 @@ public GetMessageResult getMessage(final String group, final String topic, final // can't find ext content.Client will filter messages by tag also. log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}", tagsCode, offsetPy, sizePy, topic, group); + isTagsCodeLegal = false; } } if (messageFilter != null - && !messageFilter.isMatchedByConsumeQueue(tagsCode, extRet ? cqExtUnit : null)) { + && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; } diff --git a/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java index ef1d670a2b8..ccd76c4f092 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java @@ -40,6 +40,10 @@ public static String getAbortFile(final String rootDir) { return rootDir + File.separator + "abort"; } + public static String getLockFile(final String rootDir) { + return rootDir + File.separator + "lock"; + } + public static String getDelayOffsetStorePath(final String rootDir) { return rootDir + File.separator + "config" + File.separator + "delayOffset.json"; } diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java index b03f2fce7a0..b7d38f8c78f 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java @@ -17,22 +17,21 @@ package org.apache.rocketmq.store; -import org.apache.rocketmq.common.BrokerConfig; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.message.MessageDecoder; -import org.apache.rocketmq.store.config.MessageStoreConfig; -import org.apache.rocketmq.store.stats.BrokerStatsManager; -import org.junit.Test; - import java.io.File; +import java.lang.reflect.Method; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.Map; - +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.stats.BrokerStatsManager; import static org.assertj.core.api.Assertions.assertThat; +import org.junit.Test; public class ConsumeQueueTest { @@ -131,6 +130,65 @@ protected void putMsg(DefaultMessageStore master) throws Exception { } } + protected void deleteDirectory(String rootPath) { + File file = new File(rootPath); + deleteFile(file); + } + + protected void deleteFile(File file) { + File[] subFiles = file.listFiles(); + if (subFiles != null) { + for (File sub : subFiles) { + deleteFile(sub); + } + } + + file.delete(); + } + + @Test + public void testPutMessagePositionInfo_buildCQRepeatedly() throws Exception { + DefaultMessageStore messageStore = null; + try { + + messageStore = gen(); + + int totalMessages = 10; + + for (int i = 0; i < totalMessages; i++) { + putMsg(messageStore); + } + Thread.sleep(5); + + ConsumeQueue cq = messageStore.getConsumeQueueTable().get(topic).get(queueId); + Method method = cq.getClass().getDeclaredMethod("putMessagePositionInfo", long.class, int.class, long.class, long.class); + + assertThat(method).isNotNull(); + + method.setAccessible(true); + + SelectMappedBufferResult result = messageStore.getCommitLog().getData(0); + assertThat(result != null).isTrue(); + + DispatchRequest dispatchRequest = messageStore.getCommitLog().checkMessageAndReturnSize(result.getByteBuffer(), false, false); + + assertThat(cq).isNotNull(); + + Object dispatchResult = method.invoke(cq, dispatchRequest.getCommitLogOffset(), + dispatchRequest.getMsgSize(), dispatchRequest.getTagsCode(), dispatchRequest.getConsumeQueueOffset()); + + assertThat(Boolean.parseBoolean(dispatchResult.toString())).isTrue(); + + } finally { + if (messageStore != null) { + messageStore.shutdown(); + messageStore.destroy(); + } + deleteDirectory(storePath); + } + + } + @Test public void testConsumeQueueWithExtendData() { DefaultMessageStore master = null; diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index 6e37b705f74..9269cdfa772 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -21,6 +21,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.nio.channels.OverlappingFileLockException; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -56,6 +57,31 @@ public void init() throws Exception { messageStore.start(); } + @Test(expected = OverlappingFileLockException.class) + public void test_repate_restart() throws Exception { + long totalMsgs = 100; + QUEUE_TOTAL = 1; + MessageBody = StoreMessage.getBytes(); + + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8); + messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 4); + messageStoreConfig.setMaxHashSlotNum(100); + messageStoreConfig.setMaxIndexNum(100 * 10); + MessageStore master = new DefaultMessageStore(messageStoreConfig, null, new MyMessageArrivingListener(), new BrokerConfig()); + + boolean load = master.load(); + assertTrue(load); + + try { + master.start(); + master.start(); + } finally { + master.shutdown(); + master.destroy(); + } + } + @After public void destory() { messageStore.shutdown(); @@ -164,7 +190,7 @@ public void testPullSize() throws Exception { private class MyMessageArrivingListener implements MessageArrivingListener { @Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime, - byte[] filterBitMap, Map properties) { + byte[] filterBitMap, Map properties) { } } }