From 13f561ba87ecfe062613ac63e23249627d2cf40e Mon Sep 17 00:00:00 2001 From: zhangjidi2016 <1017543663@qq.com> Date: Thu, 3 Jun 2021 11:00:13 +0800 Subject: [PATCH] [ISSUE #2969] Add a skip accumulation message command in mqadmin. --- .../tools/command/MQAdminStartup.java | 2 + .../offset/SkipAccumulationSubCommand.java | 126 ++++++++++++++++++ .../offset/SkipAccumulationCommandTest.java | 77 +++++++++++ 3 files changed, 205 insertions(+) create mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java create mode 100644 tools/src/test/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationCommandTest.java diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index f9477445bea..f70cb6f3df1 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -68,6 +68,7 @@ import org.apache.rocketmq.tools.command.namesrv.WipeWritePermSubCommand; import org.apache.rocketmq.tools.command.offset.CloneGroupOffsetCommand; import org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand; +import org.apache.rocketmq.tools.command.offset.SkipAccumulationSubCommand; import org.apache.rocketmq.tools.command.queue.QueryConsumeQueueCommand; import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand; import org.apache.rocketmq.tools.command.topic.AllocateMQSubCommand; @@ -186,6 +187,7 @@ public static void initCommand() { initCommand(new WipeWritePermSubCommand()); initCommand(new ResetOffsetByTimeCommand()); + initCommand(new SkipAccumulationSubCommand()); initCommand(new UpdateOrderConfCommand()); initCommand(new CleanExpiredCQSubCommand()); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java new file mode 100644 index 00000000000..95ab7e81a99 --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.offset; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.admin.RollbackStats; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; + +public class SkipAccumulationSubCommand implements SubCommand { + + @Override + public String commandName() { + return "skipAccumulatedMessage"; + } + + @Override + public String commandDesc() { + return "Skip all messages that are accumulated (not consumed) currently"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("g", "group", true, "set the consumer group"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("t", "topic", true, "set the topic"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("f", "force", true, "set the force rollback by timestamp switch[true|false]"); + opt.setRequired(false); + options.addOption(opt); + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { + long timestamp = -1; + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + try { + String group = commandLine.getOptionValue("g").trim(); + String topic = commandLine.getOptionValue("t").trim(); + boolean force = true; + if (commandLine.hasOption('f')) { + force = Boolean.valueOf(commandLine.getOptionValue("f").trim()); + } + + defaultMQAdminExt.start(); + Map offsetTable; + try { + offsetTable = defaultMQAdminExt.resetOffsetByTimestamp(topic, group, timestamp, force); + } catch (MQClientException e) { + if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) { + List rollbackStatsList = defaultMQAdminExt.resetOffsetByTimestampOld(group, topic, timestamp, force); + System.out.printf("%-20s %-20s %-20s %-20s %-20s %-20s%n", + "#brokerName", + "#queueId", + "#brokerOffset", + "#consumerOffset", + "#timestampOffset", + "#rollbackOffset" + ); + + for (RollbackStats rollbackStats : rollbackStatsList) { + System.out.printf("%-20s %-20d %-20d %-20d %-20d %-20d%n", + UtilAll.frontStringAtLeast(rollbackStats.getBrokerName(), 32), + rollbackStats.getQueueId(), + rollbackStats.getBrokerOffset(), + rollbackStats.getConsumerOffset(), + rollbackStats.getTimestampOffset(), + rollbackStats.getRollbackOffset() + ); + } + return; + } + throw e; + } + + System.out.printf("%-40s %-40s %-40s%n", + "#brokerName", + "#queueId", + "#offset"); + + Iterator> iterator = offsetTable.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + System.out.printf("%-40s %-40d %-40d%n", + UtilAll.frontStringAtLeast(entry.getKey().getBrokerName(), 32), + entry.getKey().getQueueId(), + entry.getValue()); + } + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationCommandTest.java new file mode 100644 index 00000000000..38c23a3e857 --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationCommandTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.offset; + +import java.lang.reflect.Field; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl; +import org.apache.rocketmq.tools.command.SubCommandException; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import static org.mockito.Mockito.mock; + +public class SkipAccumulationCommandTest { + private static DefaultMQAdminExt defaultMQAdminExt; + private static DefaultMQAdminExtImpl defaultMQAdminExtImpl; + private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); + private static MQClientAPIImpl mQClientAPIImpl; + + @BeforeClass + public static void init() throws Exception { + mQClientAPIImpl = mock(MQClientAPIImpl.class); + defaultMQAdminExt = new DefaultMQAdminExt(); + defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000); + + Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance"); + field.setAccessible(true); + field.set(defaultMQAdminExtImpl, mqClientInstance); + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mqClientInstance, mQClientAPIImpl); + field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl"); + field.setAccessible(true); + field.set(defaultMQAdminExt, defaultMQAdminExtImpl); + } + + @AfterClass + public static void terminate() { + defaultMQAdminExt.shutdown(); + } + + @Ignore + @Test + public void testExecute() throws SubCommandException { + System.setProperty("rocketmq.namesrv.addr", "127.0.0.1:9876"); + SkipAccumulationSubCommand cmd = new SkipAccumulationSubCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-g group-test", "-t topic-test", "-f false"}; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser()); + cmd.execute(commandLine, options, null); + } +}