Skip to content

Commit

Permalink
Merge 9f0b5cc into 35a15b6
Browse files Browse the repository at this point in the history
  • Loading branch information
panzhi33 committed Jul 20, 2021
2 parents 35a15b6 + 9f0b5cc commit 658dacc
Show file tree
Hide file tree
Showing 6 changed files with 299 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -495,10 +495,10 @@ public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr,
}

@Override
public TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr,
public TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr,
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException {
return this.defaultMQAdminExtImpl.getAllTopicGroup(brokerAddr, timeoutMillis);
return this.defaultMQAdminExtImpl.getAllTopicConfig(brokerAddr, timeoutMillis);
}

/* (non-Javadoc)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,7 @@ public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr,
}

@Override
public TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr,
public TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr,
long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
return this.mqClientInstance.getMQClientAPIImpl().getAllTopicConfig(brokerAddr, timeoutMillis);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr,
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException;

TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr,
TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr,
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.rocketmq.tools.command.consumer.ConsumerProgressSubCommand;
import org.apache.rocketmq.tools.command.consumer.ConsumerStatusSubCommand;
import org.apache.rocketmq.tools.command.consumer.DeleteSubscriptionGroupCommand;
import org.apache.rocketmq.tools.command.consumer.ExportSubscriptionCommand;
import org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand;
import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand;
import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand;
Expand All @@ -72,6 +73,7 @@
import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
import org.apache.rocketmq.tools.command.topic.AllocateMQSubCommand;
import org.apache.rocketmq.tools.command.topic.DeleteTopicSubCommand;
import org.apache.rocketmq.tools.command.topic.ExportTopicCommand;
import org.apache.rocketmq.tools.command.topic.TopicClusterSubCommand;
import org.apache.rocketmq.tools.command.topic.TopicListSubCommand;
import org.apache.rocketmq.tools.command.topic.TopicRouteSubCommand;
Expand Down Expand Up @@ -213,6 +215,9 @@ public static void initCommand() {
initCommand(new ClusterAclConfigVersionListSubCommand());
initCommand(new UpdateGlobalWhiteAddrSubCommand());
initCommand(new GetAccessConfigSubCommand());

initCommand(new ExportSubscriptionCommand());
initCommand(new ExportTopicCommand());
}

private static void initLogback() throws JoranException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.consumer;

import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;

public class ExportSubscriptionCommand implements SubCommand {
@Override
public String commandName() {
return "exportSubscription";
}

@Override
public String commandDesc() {
return "export subscription.csv";
}

@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("c", "clusterName", true, "choose a cluster to export");
opt.setRequired(true);
options.addOption(opt);

opt = new Option("f", "filePath", true, "export subscription.csv path | default /tmp/rocketmq/config");
opt.setRequired(false);
options.addOption(opt);
return options;
}

@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook)
throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);

defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
defaultMQAdminExt.start();

String clusterName = commandLine.getOptionValue('c').trim();
String filePath = !commandLine.hasOption('f') ? "/tmp/rocketmq/config" : commandLine.getOptionValue('f')
.trim();

Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
ConcurrentMap<String, SubscriptionGroupConfig> configMap = new ConcurrentHashMap<>();

for (String addr : masterSet) {
SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getAllSubscriptionGroup(
addr, 10000);
for (Map.Entry<String, SubscriptionGroupConfig> entry : subscriptionGroupWrapper
.getSubscriptionGroupTable().entrySet()) {
if (!MixAll.isSysConsumerGroup(entry.getKey())) {
SubscriptionGroupConfig subscriptionGroupConfig = configMap.get(entry.getKey());
if (null != subscriptionGroupConfig) {
entry.getValue().setRetryQueueNums(
subscriptionGroupConfig.getRetryQueueNums() + entry.getValue().getRetryQueueNums());
}
configMap.put(entry.getKey(), entry.getValue());
}
}
}

StringBuilder subConfigStr = new StringBuilder(
"groupName,consumeEnable,consumeFromMinEnable,consumeBroadcastEnable,retryQueueNums,retryMaxTimes,"
+ "brokerId,whichBrokerWhenConsumeSlowly,notifyConsumerIdsChangedEnable\r\n");

for (Map.Entry<String, SubscriptionGroupConfig> entry : configMap.entrySet()) {
subConfigStr.append(entry.getValue().getGroupName());
subConfigStr.append(",");
subConfigStr.append(entry.getValue().isConsumeEnable());
subConfigStr.append(",");
subConfigStr.append(entry.getValue().isConsumeFromMinEnable());
subConfigStr.append(",");
subConfigStr.append(entry.getValue().isConsumeBroadcastEnable());
subConfigStr.append(",");
subConfigStr.append(entry.getValue().getRetryQueueNums());
subConfigStr.append(",");
subConfigStr.append(entry.getValue().getRetryMaxTimes());
subConfigStr.append(",");
subConfigStr.append(entry.getValue().getBrokerId());
subConfigStr.append(",");
subConfigStr.append(entry.getValue().getWhichBrokerWhenConsumeSlowly());
subConfigStr.append(",");
subConfigStr.append(entry.getValue().isNotifyConsumerIdsChangedEnable());
subConfigStr.append("\r\n");
}

String path = filePath + "/subscription.csv";

write2CSV(subConfigStr.toString(), path);

System.out.printf("export %s success", path);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}

private void write2CSV(final String subConfigStr, final String path) throws IOException {
FileWriter fileWriter = null;

try {
fileWriter = new FileWriter(path);
fileWriter.write(subConfigStr);
} catch (IOException e) {
throw e;
} finally {
if (fileWriter != null) {
fileWriter.close();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.topic;

import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;

public class ExportTopicCommand implements SubCommand {
@Override
public String commandName() {
return "exportTopic";
}

@Override
public String commandDesc() {
return "export topic.csv";
}

@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("c", "clusterName", true, "choose a cluster to export");
opt.setRequired(true);
options.addOption(opt);

opt = new Option("f", "filePath", true, "export topic.csv path | default /tmp/rocketmq/config");
opt.setRequired(false);
options.addOption(opt);
return options;
}

@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook)
throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);

defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));

try {
defaultMQAdminExt.start();

String clusterName = commandLine.getOptionValue('c').trim();
String filePath = !commandLine.hasOption('f') ? "/tmp/rocketmq/config" : commandLine.getOptionValue('f')
.trim();

Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);

ConcurrentMap<String, TopicConfig> topicConfigMap = new ConcurrentHashMap<>();
for (String addr : masterSet) {
TopicConfigSerializeWrapper topicConfigSerializeWrapper = defaultMQAdminExt.getAllTopicConfig(
addr, 10000);
for (Map.Entry<String, TopicConfig> entry : topicConfigSerializeWrapper.getTopicConfigTable()
.entrySet()) {
if (!TopicValidator.isSystemTopic(entry.getKey()) && !entry.getKey().startsWith(
MixAll.RETRY_GROUP_TOPIC_PREFIX) && !entry.getKey().startsWith(
MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
TopicConfig topicConfig = topicConfigMap.get(entry.getKey());
if (null != topicConfig) {
entry.getValue().setWriteQueueNums(
topicConfig.getWriteQueueNums() + entry.getValue().getWriteQueueNums());
entry.getValue().setReadQueueNums(
topicConfig.getReadQueueNums() + entry.getValue().getReadQueueNums());
}
topicConfigMap.put(entry.getKey(), entry.getValue());
}
}
}

StringBuilder topicConfigStr = new StringBuilder(
"topicName,readQueueNums,writeQueueNums,perm,order,topicFilterType,topicSysFlag\r\n");
for (Map.Entry<String, TopicConfig> entry : topicConfigMap.entrySet()) {
topicConfigStr.append(entry.getValue().getTopicName());
topicConfigStr.append(",");
topicConfigStr.append(entry.getValue().getReadQueueNums());
topicConfigStr.append(",");
topicConfigStr.append(entry.getValue().getWriteQueueNums());
topicConfigStr.append(",");
topicConfigStr.append(entry.getValue().getPerm());
topicConfigStr.append(",");
topicConfigStr.append(entry.getValue().isOrder());
topicConfigStr.append(",");
topicConfigStr.append(entry.getValue().getTopicFilterType());
topicConfigStr.append(",");
topicConfigStr.append(entry.getValue().getTopicSysFlag());
topicConfigStr.append("\r\n");
}

String path = filePath + "/topic.csv";

write2CSV(topicConfigStr.toString(), path);

System.out.printf("export %s success", path);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}

private void write2CSV(final String topicConfigStr, final String path) throws IOException {
FileWriter fileWriter = null;

try {
fileWriter = new FileWriter(path);
fileWriter.write(topicConfigStr);
} catch (IOException e) {
throw e;
} finally {
if (fileWriter != null) {
fileWriter.close();
}
}
}
}

0 comments on commit 658dacc

Please sign in to comment.