From 318eb5d03df20b50f044244c69db484f8b23fae6 Mon Sep 17 00:00:00 2001 From: yanglibo Date: Wed, 20 Feb 2019 19:31:26 +0800 Subject: [PATCH 1/4] Add test case for CommitLog.handleHA --- .../org/apache/rocketmq/store/HATest.java | 149 ++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 store/src/test/java/org/apache/rocketmq/store/HATest.java diff --git a/store/src/test/java/org/apache/rocketmq/store/HATest.java b/store/src/test/java/org/apache/rocketmq/store/HATest.java new file mode 100644 index 00000000000..be4984dbadf --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/HATest.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.store; + +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.store.config.BrokerRole; +import org.apache.rocketmq.store.config.FlushDiskType; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.ha.HAService; +import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * HATest + * + * @author yanglibo@qccr.com + * @version HATest.java 2019年01月14日 17:34:31 + */ +public class HATest { + private final String StoreMessage = "Once, there was a chance for me!"; + private int QUEUE_TOTAL = 100; + private AtomicInteger QueueId = new AtomicInteger(0); + private SocketAddress BornHost; + private SocketAddress StoreHost; + private byte[] MessageBody; + + private MessageStore messageStore; + private MessageStore slaveMessageStore; + private MessageStoreConfig masterMessageStoreConfig; + private MessageStoreConfig slaveStoreConfig; + private BrokerStatsManager brokerStatsManager = new BrokerStatsManager("simpleTest"); + private String storePathRootDir = System.getProperty("user.home") + File.separator + "store"; + @Before + public void init() throws Exception { + StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123); + BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0); + masterMessageStoreConfig = new MessageStoreConfig(); + masterMessageStoreConfig.setBrokerRole(BrokerRole.SYNC_MASTER); + masterMessageStoreConfig.setStorePathRootDir(storePathRootDir+File.separator+"master"); + masterMessageStoreConfig.setStorePathCommitLog(storePathRootDir+File.separator+"master"+ File.separator+"commitlog"); + buildMessageStoreConfig(masterMessageStoreConfig); + slaveStoreConfig = new MessageStoreConfig(); + slaveStoreConfig.setBrokerRole(BrokerRole.SLAVE); + slaveStoreConfig.setStorePathRootDir(storePathRootDir+File.separator+"slave"); + slaveStoreConfig.setStorePathCommitLog(storePathRootDir+File.separator+"slave"+ File.separator+"commitlog"); + slaveStoreConfig.setHaListenPort(10943); + buildMessageStoreConfig(slaveStoreConfig); + messageStore = buildMessageStore(masterMessageStoreConfig,0L); + slaveMessageStore = buildMessageStore(slaveStoreConfig,1L); + boolean load = messageStore.load(); + boolean slaveLoad = slaveMessageStore.load(); + slaveMessageStore.updateHaMasterAddress("127.0.0.1:10912"); + assertTrue(load); + assertTrue(slaveLoad); + messageStore.start(); + slaveMessageStore.start(); + Thread.sleep(6000L);//because the haClient will wait 5s after the first connectMaster failed,sleep 6s + } + + @Test + public void testHandleHA() throws Exception{ + long totalMsgs = 10; + QUEUE_TOTAL = 1; + MessageBody = StoreMessage.getBytes(); + for (long i = 0; i < totalMsgs; i++) { + messageStore.putMessage(buildMessage()); + } + + Thread.sleep(1000L);//sleep 1000 ms + for (long i = 0; i < totalMsgs; i++) { + GetMessageResult result = slaveMessageStore.getMessage("GROUP_A", "FooBar", 0, i, 1024 * 1024, null); + assertThat(result).isNotNull(); + assertTrue(GetMessageStatus.FOUND.equals(result.getStatus())); + result.release(); + } + } + + @After + public void destroy() throws Exception{ + Thread.sleep(5000L); + slaveMessageStore.shutdown(); + slaveMessageStore.destroy(); + messageStore.shutdown(); + messageStore.destroy(); + File file = new File(storePathRootDir); + UtilAll.deleteFile(file); + } + + private MessageStore buildMessageStore(MessageStoreConfig messageStoreConfig,long brokerId) throws Exception { + BrokerConfig brokerConfig = new BrokerConfig(); + brokerConfig.setBrokerId(brokerId); + return new DefaultMessageStore(messageStoreConfig, brokerStatsManager, null, brokerConfig); + } + + private void buildMessageStoreConfig(MessageStoreConfig messageStoreConfig){ + messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024 * 10); + messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 1024 * 10); + messageStoreConfig.setMaxHashSlotNum(10000); + messageStoreConfig.setMaxIndexNum(100 * 100); + messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH); + messageStoreConfig.setFlushIntervalConsumeQueue(1); + } + + private MessageExtBrokerInner buildMessage() { + MessageExtBrokerInner msg = new MessageExtBrokerInner(); + msg.setTopic("FooBar"); + msg.setTags("TAG1"); + msg.setBody(MessageBody); + msg.setKeys(String.valueOf(System.currentTimeMillis())); + msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL); + msg.setSysFlag(0); + msg.setBornTimestamp(System.currentTimeMillis()); + msg.setStoreHost(StoreHost); + msg.setBornHost(BornHost); + return msg; + } + +} From bcbe0c04fa1db8c9eb7ef3a8318252ddb1027158 Mon Sep 17 00:00:00 2001 From: yanglibo Date: Thu, 21 Feb 2019 17:02:47 +0800 Subject: [PATCH 2/4] Add one UUID parent path for RocketMQ's files --- store/src/test/java/org/apache/rocketmq/store/HATest.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/store/src/test/java/org/apache/rocketmq/store/HATest.java b/store/src/test/java/org/apache/rocketmq/store/HATest.java index be4984dbadf..49ab5b9b93f 100644 --- a/store/src/test/java/org/apache/rocketmq/store/HATest.java +++ b/store/src/test/java/org/apache/rocketmq/store/HATest.java @@ -34,6 +34,7 @@ import java.net.SocketAddress; import java.util.HashMap; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -60,7 +61,9 @@ public class HATest { private MessageStoreConfig masterMessageStoreConfig; private MessageStoreConfig slaveStoreConfig; private BrokerStatsManager brokerStatsManager = new BrokerStatsManager("simpleTest"); - private String storePathRootDir = System.getProperty("user.home") + File.separator + "store"; + private String storePathRootParentDir = System.getProperty("user.home") + File.separator + + UUID.randomUUID().toString().replace("-", ""); + private String storePathRootDir = storePathRootParentDir + File.separator + "store"; @Before public void init() throws Exception { StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123); @@ -113,7 +116,7 @@ public void destroy() throws Exception{ slaveMessageStore.destroy(); messageStore.shutdown(); messageStore.destroy(); - File file = new File(storePathRootDir); + File file = new File(storePathRootParentDir); UtilAll.deleteFile(file); } From 529cb44df615aa45ea2fb7b513d07690ffc0df18 Mon Sep 17 00:00:00 2001 From: yanglibo Date: Thu, 21 Feb 2019 18:01:00 +0800 Subject: [PATCH 3/4] remove the auth name --- store/src/test/java/org/apache/rocketmq/store/HATest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/store/src/test/java/org/apache/rocketmq/store/HATest.java b/store/src/test/java/org/apache/rocketmq/store/HATest.java index 49ab5b9b93f..0a166d9551b 100644 --- a/store/src/test/java/org/apache/rocketmq/store/HATest.java +++ b/store/src/test/java/org/apache/rocketmq/store/HATest.java @@ -45,8 +45,6 @@ /** * HATest * - * @author yanglibo@qccr.com - * @version HATest.java 2019年01月14日 17:34:31 */ public class HATest { private final String StoreMessage = "Once, there was a chance for me!"; From dba6ba741c427cdc913ff81129d14e2b5353c3cf Mon Sep 17 00:00:00 2001 From: ylb Date: Thu, 21 Feb 2019 21:36:48 +0800 Subject: [PATCH 4/4] sleep 3s before getMessage --- .../test/java/org/apache/rocketmq/store/ConsumeQueueTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java index 7e01b8513c9..59bd90405dc 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java @@ -212,6 +212,7 @@ public void dispatch(DispatchRequest request) { try { try { putMsg(master); + Thread.sleep(3000L);//wait ConsumeQueue create success. } catch (Exception e) { e.printStackTrace(); assertThat(Boolean.FALSE).isTrue();