From b7fb57106891b7838cf7dd666fc68790aa40e520 Mon Sep 17 00:00:00 2001 From: lindzh Date: Thu, 11 Jan 2018 14:24:59 +0800 Subject: [PATCH 1/3] add sendMessage command --- .../tools/command/MQAdminStartup.java | 2 + ...mmond.java => DecodeMessageIdCommand.java} | 2 +- .../command/message/SendMessageCommand.java | 151 ++++++++++++++++++ .../broker/SendMessageCommandTest.java | 117 ++++++++++++++ 4 files changed, 271 insertions(+), 1 deletion(-) rename tools/src/main/java/org/apache/rocketmq/tools/command/message/{DecodeMessageIdCommond.java => DecodeMessageIdCommand.java} (97%) create mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java create mode 100644 tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMessageCommandTest.java diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index d3342e818dd..7d7fce875f6 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -54,6 +54,7 @@ import org.apache.rocketmq.tools.command.message.QueryMsgByKeySubCommand; import org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand; import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand; +import org.apache.rocketmq.tools.command.message.SendMessageCommand; import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand; import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand; import org.apache.rocketmq.tools.command.namesrv.UpdateKvConfigCommand; @@ -193,6 +194,7 @@ public static void initCommand() { initCommand(new GetBrokerConfigCommand()); initCommand(new QueryConsumeQueueCommand()); + initCommand(new SendMessageCommand()); } private static void initLogback() throws JoranException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/DecodeMessageIdCommond.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/DecodeMessageIdCommand.java similarity index 97% rename from tools/src/main/java/org/apache/rocketmq/tools/command/message/DecodeMessageIdCommond.java rename to tools/src/main/java/org/apache/rocketmq/tools/command/message/DecodeMessageIdCommand.java index 532508c0b26..c676151b0b2 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/DecodeMessageIdCommond.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/DecodeMessageIdCommand.java @@ -25,7 +25,7 @@ import org.apache.rocketmq.tools.command.SubCommand; import org.apache.rocketmq.tools.command.SubCommandException; -public class DecodeMessageIdCommond implements SubCommand { +public class DecodeMessageIdCommand implements SubCommand { @Override public String commandName() { return "DecodeMessageId"; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java new file mode 100644 index 00000000000..8a94589757f --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.tools.command.message; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; + +public class SendMessageCommand implements SubCommand { + + private DefaultMQProducer producer; + + @Override + public String commandName() { + return "sendMessage"; + } + + @Override + public String commandDesc() { + return "Send Message to a topic"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("t", "topic", true, "topic name"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("b", "body", true, "message body string utf-8 format"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("k", "key", true, "message keys"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("g", "tags", true, "message tags"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("q", "qbroker", true, "send message to which broker"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("i", "qid", true, "send message to which queue id"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + + public DefaultMQProducer createProducer(RPCHook rpcHook) { + if (this.producer != null) { + return producer; + } else { + producer = new DefaultMQProducer(rpcHook); + producer.setProducerGroup(Long.toString(System.currentTimeMillis())); + return producer; + } + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { + Message msg = null; + String topic = commandLine.getOptionValue('t').trim(); + String body = commandLine.getOptionValue('b').trim(); + String tag = null; + String keys = null; + String brokerName = null; + int queueId = -1; + try { + if (commandLine.hasOption('k')) { + keys = commandLine.getOptionValue('k').trim(); + } + if (commandLine.hasOption('g')) { + tag = commandLine.getOptionValue('g').trim(); + } + if (commandLine.hasOption('q')) { + brokerName = commandLine.getOptionValue('q').trim(); + } + if (commandLine.hasOption('i')) { + queueId = Integer.parseInt(commandLine.getOptionValue('i').trim()); + } + msg = new Message(topic, tag, keys, body.getBytes("utf-8")); + } catch (Exception e) { + throw new RuntimeException(this.getClass().getSimpleName() + " command failed", e); + } + + DefaultMQProducer producer = this.createProducer(rpcHook); + SendResult result = null; + try { + producer.start(); + if (brokerName != null && queueId > -1) { + MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId); + result = producer.send(msg, messageQueue); + } else { + result = producer.send(msg); + } + + } catch (Exception e) { + throw new RuntimeException(this.getClass().getSimpleName() + " command failed", e); + } finally { + producer.shutdown(); + } + + System.out.printf("%-32s %-4s %-20s %s%n", + "#Broker Name", + "#QID", + "#Send Result", + "#MsgId" + ); + + if (result != null) { + System.out.printf("%-32s %-4s %-20s %s%n", + result.getMessageQueue().getBrokerName(), + result.getMessageQueue().getQueueId(), + result.getSendStatus(), + result.getMsgId() + ); + } else { + System.out.printf("%-32s %-4s %-20s %s%n", + "Unknown", + "Unknown", + "Failed", + "None" + ); + } + } +} diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMessageCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMessageCommandTest.java new file mode 100644 index 00000000000..0561c384c0a --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMessageCommandTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.tools.command.broker; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.command.SubCommandException; +import org.apache.rocketmq.tools.command.message.SendMessageCommand; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.lang.reflect.Field; + +import static org.mockito.Mockito.doAnswer; + +@RunWith(org.mockito.junit.MockitoJUnitRunner.class) +public class SendMessageCommandTest { + + private SendMessageCommand sendMessageCommand = new SendMessageCommand(); + @Mock + private DefaultMQProducer defaultMQProducer; + + private SendResult sendResult; + + private SendResult sendResultByQueue; + + @Before + public void init() throws MQClientException, SubCommandException, RemotingException, InterruptedException, MQBrokerException, NoSuchFieldException, IllegalAccessException { + sendResult = new SendResult(); + sendResult.setMessageQueue(new MessageQueue()); + sendResult.getMessageQueue().setBrokerName("broker1"); + sendResult.getMessageQueue().setQueueId(1); + sendResult.setSendStatus(SendStatus.SEND_OK); + sendResult.setMsgId("fgwejigherughwueyutyu4t4343t43"); + + sendResultByQueue = sendResult; + + + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + return sendResult; + } + }).when(defaultMQProducer).send(Mockito.any(Message.class)); + + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + return sendResultByQueue; + } + }).when(defaultMQProducer).send(Mockito.any(Message.class),Mockito.any(MessageQueue.class)); + + Field producerField = SendMessageCommand.class.getDeclaredField("producer"); + producerField.setAccessible(true); + producerField.set(sendMessageCommand,defaultMQProducer); + } + + @After + public void terminate() { + + } + + @Test + public void testExecute() throws SubCommandException { + PrintStream out = System.out; + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + System.setOut(new PrintStream(bos)); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] {"-t mytopic","-b 'send message test'","-g tagA","-k order-16546745756"}; + CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + sendMessageCommand.commandName(), subargs, sendMessageCommand.buildCommandlineOptions(options), new PosixParser()); + sendMessageCommand.execute(commandLine, options, null); + + subargs = new String[] {"-t mytopic","-b 'send message test'","-g tagA","-k order-16546745756","-q brokera","-i 1"}; + commandLine = ServerUtil.parseCmdLine("mqadmin " + sendMessageCommand.commandName(), subargs, sendMessageCommand.buildCommandlineOptions(options), new PosixParser()); + sendMessageCommand.execute(commandLine, options, null); + System.setOut(out); + String s = new String(bos.toByteArray()); + Assert.assertTrue(s.contains("SEND_OK")); + } + + +} From 746b27527d72dc2bc844c51d587aed65e073edc7 Mon Sep 17 00:00:00 2001 From: lindzh Date: Mon, 22 Jan 2018 10:56:33 +0800 Subject: [PATCH 2/3] fix command parameter comment --- .../rocketmq/tools/command/message/SendMessageCommand.java | 4 ++-- .../rocketmq/tools/command/broker/SendMessageCommandTest.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java index 8a94589757f..23e2d6a1dba 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java @@ -48,7 +48,7 @@ public Options buildCommandlineOptions(Options options) { opt.setRequired(true); options.addOption(opt); - opt = new Option("b", "body", true, "message body string utf-8 format"); + opt = new Option("b", "body", true, "utf-8 string format of the message body"); opt.setRequired(true); options.addOption(opt); @@ -60,7 +60,7 @@ public Options buildCommandlineOptions(Options options) { opt.setRequired(false); options.addOption(opt); - opt = new Option("q", "qbroker", true, "send message to which broker"); + opt = new Option("o", "qbroker", true, "send message to which broker"); opt.setRequired(false); options.addOption(opt); diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMessageCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMessageCommandTest.java index 0561c384c0a..c11c457531c 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMessageCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMessageCommandTest.java @@ -105,7 +105,7 @@ public void testExecute() throws SubCommandException { ServerUtil.parseCmdLine("mqadmin " + sendMessageCommand.commandName(), subargs, sendMessageCommand.buildCommandlineOptions(options), new PosixParser()); sendMessageCommand.execute(commandLine, options, null); - subargs = new String[] {"-t mytopic","-b 'send message test'","-g tagA","-k order-16546745756","-q brokera","-i 1"}; + subargs = new String[] {"-t mytopic","-b 'send message test'","-g tagA","-k order-16546745756","-o brokera","-i 1"}; commandLine = ServerUtil.parseCmdLine("mqadmin " + sendMessageCommand.commandName(), subargs, sendMessageCommand.buildCommandlineOptions(options), new PosixParser()); sendMessageCommand.execute(commandLine, options, null); System.setOut(out); From ea5f15ba22060cb30aaf5d2325792155718d6e67 Mon Sep 17 00:00:00 2001 From: lindzh Date: Fri, 26 Jan 2018 14:37:43 +0800 Subject: [PATCH 3/3] add tools send msg test case --- .../broker/SendMessageCommandTest.java | 19 ++++ .../src/test/resources/conf/logback_tools.xml | 93 +++++++++++++++++++ 2 files changed, 112 insertions(+) create mode 100644 tools/src/test/resources/conf/logback_tools.xml diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMessageCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMessageCommandTest.java index c11c457531c..00e9b5e28ff 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMessageCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/SendMessageCommandTest.java @@ -25,10 +25,12 @@ import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.command.MQAdminStartup; import org.apache.rocketmq.tools.command.SubCommandException; import org.apache.rocketmq.tools.command.message.SendMessageCommand; import org.junit.After; @@ -42,8 +44,10 @@ import org.mockito.stubbing.Answer; import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.PrintStream; import java.lang.reflect.Field; +import java.net.URL; import static org.mockito.Mockito.doAnswer; @@ -113,5 +117,20 @@ public void testExecute() throws SubCommandException { Assert.assertTrue(s.contains("SEND_OK")); } + @Test + public void testSendMessageCommandAdd() { + URL resource = SendMessageCommandTest.class.getClassLoader().getResource("conf/logback_tools.xml"); + String file = resource.getFile(); + File file1 = new File(file); + String path = file1.getParentFile().getParentFile().getPath(); + System.setProperty(MixAll.ROCKETMQ_HOME_PROPERTY, path); + PrintStream out = System.out; + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + System.setOut(new PrintStream(bos)); + MQAdminStartup.main(new String[]{"sendMessage"}); + System.setOut(out); + String s = new String(bos.toByteArray()); + Assert.assertTrue(s.contains("utf-8 string format of the message body")); + } } diff --git a/tools/src/test/resources/conf/logback_tools.xml b/tools/src/test/resources/conf/logback_tools.xml new file mode 100644 index 00000000000..28283ad1d1d --- /dev/null +++ b/tools/src/test/resources/conf/logback_tools.xml @@ -0,0 +1,93 @@ + + + + + + ${user.home}/logs/rocketmqlogs/tools_default.log + true + + ${user.home}/logs/rocketmqlogs/otherdays/tools_default.%i.log.gz + 1 + 5 + + + 100MB + + + %d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n + UTF-8 + + + + + ${user.home}/logs/rocketmqlogs/tools.log + true + + ${user.home}/logs/rocketmqlogs/otherdays/tools.%i.log.gz + 1 + 5 + + + 100MB + + + %d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n + UTF-8 + + + + + + + + + true + + %d{yyy-MM-dd HH\:mm\:ss,GMT+8} %p %t - %m%n + UTF-8 + + + + + + + + + + + + + + + + + + + + + + + + + + + +