From 0d0d32e524b4d4d2c67f62607a0ae5bcbe3944d8 Mon Sep 17 00:00:00 2001 From: lindzh Date: Wed, 13 Dec 2017 17:13:20 +0800 Subject: [PATCH 1/4] update retry topic and dlq topic queue nums --- .../processor/SendMessageProcessor.java | 17 +++- .../consumer/UpdateSubGroupSubCommand.java | 78 ++++++++++++++++++- 2 files changed, 90 insertions(+), 5 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 227a23e6b49..428e5d2ac76 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -133,7 +133,7 @@ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, fin } String newTopic = MixAll.getRetryTopic(requestHeader.getGroup()); - int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums(); + int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % fixedCandidateQueueNums(newTopic, subscriptionGroupConfig.getRetryQueueNums()); int topicSysFlag = 0; if (requestHeader.isUnitMode()) { @@ -179,7 +179,7 @@ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, fin if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) { newTopic = MixAll.getDLQTopic(requestHeader.getGroup()); - queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; + queueIdInt = Math.abs(this.random.nextInt() % 99999999) % fixedCandidateQueueNums(newTopic, DLQ_NUMS_PER_GROUP); topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, DLQ_NUMS_PER_GROUP, @@ -246,6 +246,17 @@ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, fin return response; } + private int fixedCandidateQueueNums(String topic, int defaultQueueNums) { + if (topic == null) { + return defaultQueueNums; + } + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic); + if (topicConfig != null) { + return topicConfig.getWriteQueueNums(); + } + return defaultQueueNums; + } + private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response, RemotingCommand request, MessageExt msg, TopicConfig topicConfig) { @@ -268,7 +279,7 @@ private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, Remoti int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes(); if (reconsumeTimes >= maxReconsumeTimes) { newTopic = MixAll.getDLQTopic(groupName); - int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; + int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % fixedCandidateQueueNums(newTopic, DLQ_NUMS_PER_GROUP); topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, DLQ_NUMS_PER_GROUP, PermName.PERM_WRITE, 0 diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java index 7316526be01..a229c4ef8a6 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java @@ -16,12 +16,23 @@ */ package org.apache.rocketmq.tools.command.consumer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.QueueData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.CommandUtil; @@ -89,6 +100,32 @@ public Options buildCommandlineOptions(Options options) { return options; } + + private void updateRetryTopicQueueNums(DefaultMQAdminExt defaultMQAdminExt, String topic, String addr, Map brokerAddrMap, Map queueDataMap, int retryQueueNums) { + BrokerData brokerData = brokerAddrMap.get(addr); + if (brokerAddrMap == null) { + return; + } + + QueueData queueData = queueDataMap.get(brokerData.getBrokerName()); + if (queueData == null) { + return; + } + + if (retryQueueNums != queueData.getWriteQueueNums()) { + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName(topic); + topicConfig.setPerm(queueData.getPerm()); + topicConfig.setWriteQueueNums(retryQueueNums); + topicConfig.setReadQueueNums(retryQueueNums); + try { + defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig); + } catch (Exception e) { + System.out.print("update subscription retry topic " + topic + " queue nums to " + addr + " failed\r\n"); + } + } + } + @Override public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException { @@ -98,6 +135,7 @@ public void execute(final CommandLine commandLine, final Options options, try { SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); + int defaultRetryQueueNums = subscriptionGroupConfig.getRetryQueueNums(); subscriptionGroupConfig.setConsumeBroadcastEnable(false); subscriptionGroupConfig.setConsumeFromMinEnable(false); @@ -151,12 +189,45 @@ public void execute(final CommandLine commandLine, final Options options, .getOptionValue('a').trim())); } + + boolean needCheckAndUpdate = false; + String topic = MixAll.getRetryTopic(subscriptionGroupConfig.getGroupName()); + HashMap brokerAddrMap = new HashMap<>(); + HashMap queueDataMap = new HashMap<>(); + int retryQueueNums = subscriptionGroupConfig.getRetryQueueNums(); + if (retryQueueNums != defaultRetryQueueNums) { + needCheckAndUpdate = true; + TopicRouteData topicRouteData = null; + try { + topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic); + } catch (Exception e) { + System.out.print("get subscription retry topic route info null " + e.getClass() + ":" + e.getMessage() + "\r\n"); + } + if (topicRouteData != null) { + List brokerDatas = topicRouteData.getBrokerDatas(); + for (BrokerData brokerData : brokerDatas) { + brokerAddrMap.put(brokerData.getBrokerAddrs().get(MixAll.MASTER_ID), brokerData); + } + + List queueDatas = topicRouteData.getQueueDatas(); + for (QueueData queueData : queueDatas) { + String brokerName = queueData.getBrokerName(); + queueDataMap.put(brokerName, queueData); + } + } + } + if (commandLine.hasOption('b')) { String addr = commandLine.getOptionValue('b').trim(); defaultMQAdminExt.start(); defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, subscriptionGroupConfig); + + if(needCheckAndUpdate){ + updateRetryTopicQueueNums(defaultMQAdminExt,topic,addr,brokerAddrMap,queueDataMap,subscriptionGroupConfig.getRetryQueueNums()); + } + System.out.printf("create subscription group to %s success.%n", addr); System.out.printf("%s", subscriptionGroupConfig); return; @@ -165,12 +236,15 @@ public void execute(final CommandLine commandLine, final Options options, String clusterName = commandLine.getOptionValue('c').trim(); defaultMQAdminExt.start(); - Set masterSet = - CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); + Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); for (String addr : masterSet) { try { defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, subscriptionGroupConfig); System.out.printf("create subscription group to %s success.%n", addr); + + if(needCheckAndUpdate){ + updateRetryTopicQueueNums(defaultMQAdminExt,topic,addr,brokerAddrMap,queueDataMap,subscriptionGroupConfig.getRetryQueueNums()); + } } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000 * 1); From dc55f443ef2ede292b7cd9ca2f9900c67c520196 Mon Sep 17 00:00:00 2001 From: lindzh Date: Wed, 13 Dec 2017 17:34:46 +0800 Subject: [PATCH 2/4] add auto retry topic queue nums config --- .../broker/subscription/SubscriptionGroupManager.java | 1 + .../java/org/apache/rocketmq/common/BrokerConfig.java | 10 ++++++++++ 2 files changed, 11 insertions(+) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java index 0cbb76172ca..254d4bfc0a0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java @@ -123,6 +123,7 @@ public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) { if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) { subscriptionGroupConfig = new SubscriptionGroupConfig(); subscriptionGroupConfig.setGroupName(group); + subscriptionGroupConfig.setRetryQueueNums(brokerController.getBrokerConfig().getAutoCreateRetryTopicQueueNums()); SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig); if (null == preConfig) { log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString()); diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index c344a7ce6d9..87896df978c 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -131,6 +131,8 @@ public class BrokerConfig { private boolean filterSupportRetry = false; private boolean enablePropertyFilter = false; + private int autoCreateRetryTopicQueueNums = 1; + public boolean isTraceOn() { return traceOn; } @@ -580,4 +582,12 @@ public boolean isEnablePropertyFilter() { public void setEnablePropertyFilter(boolean enablePropertyFilter) { this.enablePropertyFilter = enablePropertyFilter; } + + public int getAutoCreateRetryTopicQueueNums() { + return autoCreateRetryTopicQueueNums; + } + + public void setAutoCreateRetryTopicQueueNums(int autoCreateRetryTopicQueueNums) { + this.autoCreateRetryTopicQueueNums = autoCreateRetryTopicQueueNums; + } } From 81dc274d12477186d764a9f560339e99576af6e1 Mon Sep 17 00:00:00 2001 From: lindzh Date: Thu, 28 Dec 2017 14:58:36 +0800 Subject: [PATCH 3/4] add test case for update subgroup retry queue nums --- .../processor/SendMessageProcessor.java | 3 - .../consumer/UpdateSubGroupSubCommand.java | 26 ++-- .../UpdateSubGroupSubCommandTest.java | 113 ++++++++++++++++++ 3 files changed, 131 insertions(+), 11 deletions(-) create mode 100644 tools/src/test/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommandTest.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 428e5d2ac76..899fe95ec94 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -247,9 +247,6 @@ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, fin } private int fixedCandidateQueueNums(String topic, int defaultQueueNums) { - if (topic == null) { - return defaultQueueNums; - } TopicConfig topicConfig = this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic); if (topicConfig != null) { return topicConfig.getWriteQueueNums(); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java index a229c4ef8a6..50761a9aa57 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; import java.util.Set; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; @@ -41,6 +42,8 @@ public class UpdateSubGroupSubCommand implements SubCommand { + private DefaultMQAdminExt defaultMQAdminExt; + @Override public String commandName() { return "updateSubGroup"; @@ -126,12 +129,19 @@ private void updateRetryTopicQueueNums(DefaultMQAdminExt defaultMQAdminExt, Stri } } + private DefaultMQAdminExt getMQAdminExt(RPCHook rpcHook) { + if (this.defaultMQAdminExt != null) { + return defaultMQAdminExt; + } + defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + return defaultMQAdminExt; + } + @Override public void execute(final CommandLine commandLine, final Options options, - RPCHook rpcHook) throws SubCommandException { - DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); - - defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + RPCHook rpcHook) throws SubCommandException { + DefaultMQAdminExt defaultMQAdminExt = getMQAdminExt(rpcHook); try { SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); @@ -224,8 +234,8 @@ public void execute(final CommandLine commandLine, final Options options, defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, subscriptionGroupConfig); - if(needCheckAndUpdate){ - updateRetryTopicQueueNums(defaultMQAdminExt,topic,addr,brokerAddrMap,queueDataMap,subscriptionGroupConfig.getRetryQueueNums()); + if (needCheckAndUpdate) { + updateRetryTopicQueueNums(defaultMQAdminExt, topic, addr, brokerAddrMap, queueDataMap, subscriptionGroupConfig.getRetryQueueNums()); } System.out.printf("create subscription group to %s success.%n", addr); @@ -242,8 +252,8 @@ public void execute(final CommandLine commandLine, final Options options, defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, subscriptionGroupConfig); System.out.printf("create subscription group to %s success.%n", addr); - if(needCheckAndUpdate){ - updateRetryTopicQueueNums(defaultMQAdminExt,topic,addr,brokerAddrMap,queueDataMap,subscriptionGroupConfig.getRetryQueueNums()); + if (needCheckAndUpdate) { + updateRetryTopicQueueNums(defaultMQAdminExt, topic, addr, brokerAddrMap, queueDataMap, subscriptionGroupConfig.getRetryQueueNums()); } } catch (Exception e) { e.printStackTrace(); diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommandTest.java new file mode 100644 index 00000000000..499952c7de8 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommandTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.tools.command.consumer; + + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.QueueData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.CommandUtil; +import org.apache.rocketmq.tools.command.SubCommandException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; + +@RunWith(MockitoJUnitRunner.class) +public class UpdateSubGroupSubCommandTest { + + @Mock + private DefaultMQAdminExt defaultMQAdminExt; + + @Mock + private CommandUtil commandUtil; + + private String consumerGroup = "consumer1"; + + private TopicConfig topicConfig; + + @Before + public void init() throws Exception { + TopicRouteData topicRouteData = new TopicRouteData(); + ArrayList brokerDatas = new ArrayList(); + + BrokerData brokerData = new BrokerData(); + brokerData.setBrokerName("broker1"); + brokerData.setCluster("cluster1"); + brokerData.setBrokerAddrs(new HashMap()); + brokerData.getBrokerAddrs().put(MixAll.MASTER_ID, "192.168.1.21:10911"); + + brokerDatas.add(brokerData); + + topicRouteData.setBrokerDatas(brokerDatas); + + ArrayList queueDatas = new ArrayList<>(); + QueueData queueData = new QueueData(); + queueData.setBrokerName("broker1"); + queueData.setPerm(6); + queueData.setReadQueueNums(1); + queueData.setWriteQueueNums(1); + queueDatas.add(queueData); + topicRouteData.setQueueDatas(queueDatas); + + Mockito.when(defaultMQAdminExt.examineTopicRouteInfo(Mockito.anyString())).thenReturn(topicRouteData); + Mockito.doNothing().when(defaultMQAdminExt).start(); + Mockito.doNothing().when(defaultMQAdminExt).shutdown(); + Mockito.doNothing().when(defaultMQAdminExt).createAndUpdateSubscriptionGroupConfig(Mockito.anyString(), Mockito.any(SubscriptionGroupConfig.class)); + Mockito.doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + topicConfig = (TopicConfig) invocationOnMock.getArguments()[1]; + return null; + } + }).when(defaultMQAdminExt).createAndUpdateTopicConfig(Mockito.anyString(), Mockito.any(TopicConfig.class)); + } + + @Test + public void testExec() throws SubCommandException, NoSuchFieldException, IllegalAccessException { + UpdateSubGroupSubCommand updateSubGroupSubCommand = new UpdateSubGroupSubCommand(); + Field defaultMQAdminExtMethod = UpdateSubGroupSubCommand.class.getDeclaredField("defaultMQAdminExt"); + defaultMQAdminExtMethod.setAccessible(true); + defaultMQAdminExtMethod.set(updateSubGroupSubCommand, defaultMQAdminExt); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[]{"-g " + consumerGroup, "-q 8", "-b 192.168.1.21:10911"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + updateSubGroupSubCommand.commandName(), subargs, updateSubGroupSubCommand.buildCommandlineOptions(options), new PosixParser()); + updateSubGroupSubCommand.execute(commandLine, options, null); + Assert.assertEquals(topicConfig.getWriteQueueNums(), 8); + } + + +} From 7fef3baa056dedbb2c4784c2eeb65243d9716989 Mon Sep 17 00:00:00 2001 From: lindzh Date: Thu, 28 Dec 2017 15:27:04 +0800 Subject: [PATCH 4/4] fix valid import --- .../tools/command/consumer/UpdateSubGroupSubCommand.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java index 50761a9aa57..04729a7daa4 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java @@ -24,8 +24,6 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; -import org.apache.rocketmq.client.exception.MQBrokerException; -import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.protocol.route.BrokerData; @@ -33,7 +31,6 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.CommandUtil;