Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
1ae0693
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Aug 11, 2017
f0e243c
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Aug 11, 2017
1810be4
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Aug 14, 2017
53dcd8d
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Aug 22, 2017
4abfa4f
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Aug 28, 2017
d576e38
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Aug 30, 2017
bb446c4
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Sep 21, 2017
11d40c2
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Sep 25, 2017
ae5c41e
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Oct 11, 2017
89dbf04
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Oct 12, 2017
bc1c880
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Oct 25, 2017
48d93e8
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Dec 7, 2017
962dfbf
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Dec 13, 2017
0d0d32e
update retry topic and dlq topic queue nums
lindzh Dec 13, 2017
dc55f44
add auto retry topic queue nums config
lindzh Dec 13, 2017
844f0a9
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Dec 14, 2017
cc212d9
Merge branch 'develop' into fix_retry_dlq_nums
lindzh Dec 14, 2017
5567377
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Dec 18, 2017
c1f503c
Merge branch 'develop' of github.com:apache/incubator-rocketmq into d…
lindzh Dec 20, 2017
2d696c0
Merge branch 'develop' into fix_retry_dlq_nums
lindzh Dec 27, 2017
81dc274
add test case for update subgroup retry queue nums
lindzh Dec 28, 2017
7fef3ba
fix valid import
lindzh Dec 28, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, fin
}

String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % fixedCandidateQueueNums(newTopic, subscriptionGroupConfig.getRetryQueueNums());

int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
Expand Down Expand Up @@ -179,7 +179,7 @@ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, fin
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % fixedCandidateQueueNums(newTopic, DLQ_NUMS_PER_GROUP);

topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
Expand Down Expand Up @@ -246,6 +246,14 @@ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, fin
return response;
}

private int fixedCandidateQueueNums(String topic, int defaultQueueNums) {
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic);
if (topicConfig != null) {
return topicConfig.getWriteQueueNums();
}
return defaultQueueNums;
}

private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,
RemotingCommand request,
MessageExt msg, TopicConfig topicConfig) {
Expand All @@ -268,7 +276,7 @@ private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, Remoti
int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
if (reconsumeTimes >= maxReconsumeTimes) {
newTopic = MixAll.getDLQTopic(groupName);
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % fixedCandidateQueueNums(newTopic, DLQ_NUMS_PER_GROUP);
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {
subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(group);
subscriptionGroupConfig.setRetryQueueNums(brokerController.getBrokerConfig().getAutoCreateRetryTopicQueueNums());
SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);
if (null == preConfig) {
log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());
Expand Down
10 changes: 10 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ public class BrokerConfig {
private boolean filterSupportRetry = false;
private boolean enablePropertyFilter = false;

private int autoCreateRetryTopicQueueNums = 1;

public boolean isTraceOn() {
return traceOn;
}
Expand Down Expand Up @@ -598,4 +600,12 @@ public boolean isEnablePropertyFilter() {
public void setEnablePropertyFilter(boolean enablePropertyFilter) {
this.enablePropertyFilter = enablePropertyFilter;
}

public int getAutoCreateRetryTopicQueueNums() {
return autoCreateRetryTopicQueueNums;
}

public void setAutoCreateRetryTopicQueueNums(int autoCreateRetryTopicQueueNums) {
this.autoCreateRetryTopicQueueNums = autoCreateRetryTopicQueueNums;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,19 @@
*/
package org.apache.rocketmq.tools.command.consumer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
Expand All @@ -30,6 +39,8 @@

public class UpdateSubGroupSubCommand implements SubCommand {

private DefaultMQAdminExt defaultMQAdminExt;

@Override
public String commandName() {
return "updateSubGroup";
Expand Down Expand Up @@ -89,15 +100,49 @@ public Options buildCommandlineOptions(Options options) {
return options;
}

@Override
public void execute(final CommandLine commandLine, final Options options,
RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);

private void updateRetryTopicQueueNums(DefaultMQAdminExt defaultMQAdminExt, String topic, String addr, Map<String, BrokerData> brokerAddrMap, Map<String, QueueData> queueDataMap, int retryQueueNums) {
BrokerData brokerData = brokerAddrMap.get(addr);
if (brokerAddrMap == null) {
return;
}

QueueData queueData = queueDataMap.get(brokerData.getBrokerName());
if (queueData == null) {
return;
}

if (retryQueueNums != queueData.getWriteQueueNums()) {
TopicConfig topicConfig = new TopicConfig();
topicConfig.setTopicName(topic);
topicConfig.setPerm(queueData.getPerm());
topicConfig.setWriteQueueNums(retryQueueNums);
topicConfig.setReadQueueNums(retryQueueNums);
try {
defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
} catch (Exception e) {
System.out.print("update subscription retry topic " + topic + " queue nums to " + addr + " failed\r\n");
}
}
}

private DefaultMQAdminExt getMQAdminExt(RPCHook rpcHook) {
if (this.defaultMQAdminExt != null) {
return defaultMQAdminExt;
}
defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
return defaultMQAdminExt;
}

@Override
public void execute(final CommandLine commandLine, final Options options,
RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = getMQAdminExt(rpcHook);

try {
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
int defaultRetryQueueNums = subscriptionGroupConfig.getRetryQueueNums();
subscriptionGroupConfig.setConsumeBroadcastEnable(false);
subscriptionGroupConfig.setConsumeFromMinEnable(false);

Expand Down Expand Up @@ -151,12 +196,45 @@ public void execute(final CommandLine commandLine, final Options options,
.getOptionValue('a').trim()));
}


boolean needCheckAndUpdate = false;
String topic = MixAll.getRetryTopic(subscriptionGroupConfig.getGroupName());
HashMap<String, BrokerData> brokerAddrMap = new HashMap<>();
HashMap<String, QueueData> queueDataMap = new HashMap<>();
int retryQueueNums = subscriptionGroupConfig.getRetryQueueNums();
if (retryQueueNums != defaultRetryQueueNums) {
needCheckAndUpdate = true;
TopicRouteData topicRouteData = null;
try {
topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
} catch (Exception e) {
System.out.print("get subscription retry topic route info null " + e.getClass() + ":" + e.getMessage() + "\r\n");
}
if (topicRouteData != null) {
List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
for (BrokerData brokerData : brokerDatas) {
brokerAddrMap.put(brokerData.getBrokerAddrs().get(MixAll.MASTER_ID), brokerData);
}

List<QueueData> queueDatas = topicRouteData.getQueueDatas();
for (QueueData queueData : queueDatas) {
String brokerName = queueData.getBrokerName();
queueDataMap.put(brokerName, queueData);
}
}
}

if (commandLine.hasOption('b')) {
String addr = commandLine.getOptionValue('b').trim();

defaultMQAdminExt.start();

defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, subscriptionGroupConfig);

if (needCheckAndUpdate) {
updateRetryTopicQueueNums(defaultMQAdminExt, topic, addr, brokerAddrMap, queueDataMap, subscriptionGroupConfig.getRetryQueueNums());
}

System.out.printf("create subscription group to %s success.%n", addr);
System.out.printf("%s", subscriptionGroupConfig);
return;
Expand All @@ -165,12 +243,15 @@ public void execute(final CommandLine commandLine, final Options options,
String clusterName = commandLine.getOptionValue('c').trim();

defaultMQAdminExt.start();
Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
for (String addr : masterSet) {
try {
defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, subscriptionGroupConfig);
System.out.printf("create subscription group to %s success.%n", addr);

if (needCheckAndUpdate) {
updateRetryTopicQueueNums(defaultMQAdminExt, topic, addr, brokerAddrMap, queueDataMap, subscriptionGroupConfig.getRetryQueueNums());
}
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000 * 1);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.tools.command.consumer;


import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;

@RunWith(MockitoJUnitRunner.class)
public class UpdateSubGroupSubCommandTest {

@Mock
private DefaultMQAdminExt defaultMQAdminExt;

@Mock
private CommandUtil commandUtil;

private String consumerGroup = "consumer1";

private TopicConfig topicConfig;

@Before
public void init() throws Exception {
TopicRouteData topicRouteData = new TopicRouteData();
ArrayList<BrokerData> brokerDatas = new ArrayList<BrokerData>();

BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("broker1");
brokerData.setCluster("cluster1");
brokerData.setBrokerAddrs(new HashMap<Long, String>());
brokerData.getBrokerAddrs().put(MixAll.MASTER_ID, "192.168.1.21:10911");

brokerDatas.add(brokerData);

topicRouteData.setBrokerDatas(brokerDatas);

ArrayList<QueueData> queueDatas = new ArrayList<>();
QueueData queueData = new QueueData();
queueData.setBrokerName("broker1");
queueData.setPerm(6);
queueData.setReadQueueNums(1);
queueData.setWriteQueueNums(1);
queueDatas.add(queueData);
topicRouteData.setQueueDatas(queueDatas);

Mockito.when(defaultMQAdminExt.examineTopicRouteInfo(Mockito.anyString())).thenReturn(topicRouteData);
Mockito.doNothing().when(defaultMQAdminExt).start();
Mockito.doNothing().when(defaultMQAdminExt).shutdown();
Mockito.doNothing().when(defaultMQAdminExt).createAndUpdateSubscriptionGroupConfig(Mockito.anyString(), Mockito.any(SubscriptionGroupConfig.class));
Mockito.doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
topicConfig = (TopicConfig) invocationOnMock.getArguments()[1];
return null;
}
}).when(defaultMQAdminExt).createAndUpdateTopicConfig(Mockito.anyString(), Mockito.any(TopicConfig.class));
}

@Test
public void testExec() throws SubCommandException, NoSuchFieldException, IllegalAccessException {
UpdateSubGroupSubCommand updateSubGroupSubCommand = new UpdateSubGroupSubCommand();
Field defaultMQAdminExtMethod = UpdateSubGroupSubCommand.class.getDeclaredField("defaultMQAdminExt");
defaultMQAdminExtMethod.setAccessible(true);
defaultMQAdminExtMethod.set(updateSubGroupSubCommand, defaultMQAdminExt);
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[]{"-g " + consumerGroup, "-q 8", "-b 192.168.1.21:10911"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + updateSubGroupSubCommand.commandName(), subargs, updateSubGroupSubCommand.buildCommandlineOptions(options), new PosixParser());
updateSubGroupSubCommand.execute(commandLine, options, null);
Assert.assertEquals(topicConfig.getWriteQueueNums(), 8);
}


}