From e798093e834ba9eca386b73da69f2d154f32575e Mon Sep 17 00:00:00 2001 From: hanzhaozhan Date: Fri, 23 Dec 2016 17:08:27 +0800 Subject: [PATCH 01/21] simplify if grammar --- .../alibaba/rocketmq/namesrv/routeinfo/RouteInfoManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/RouteInfoManager.java index 18450c6f588..d1caf3aa09f 100644 --- a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/RouteInfoManager.java +++ b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/RouteInfoManager.java @@ -622,7 +622,7 @@ public byte[] getSystemTopicList() { while (it.hasNext()) { BrokerData bd = brokerAddrTable.get(it.next()); HashMap brokerAddrs = bd.getBrokerAddrs(); - if (bd.getBrokerAddrs() != null && !bd.getBrokerAddrs().isEmpty()) { + if (brokerAddrs != null && !brokerAddrs.isEmpty()) { Iterator it2 = brokerAddrs.keySet().iterator(); topicList.setBrokerAddr(brokerAddrs.get(it2.next())); break; From 9cb0a0cd46cf91b324b7c6c66d7479d21073d6f6 Mon Sep 17 00:00:00 2001 From: shroman Date: Tue, 3 Jan 2017 17:35:47 +0800 Subject: [PATCH 02/21] [ROCKETMQ-5] Avoid creating directories in UtilAll#getDiskPartitionSpaceUsedPercent(), closes apache/incubator-rocketmq#23 --- .../java/org/apache/rocketmq/common/UtilAll.java | 15 +++++++-------- .../org/apache/rocketmq/common/UtilAllTest.java | 13 +++++++++++++ 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java index 56015b36d56..016da0b44c5 100644 --- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java @@ -183,17 +183,16 @@ public static double getDiskPartitionSpaceUsedPercent(final String path) { try { File file = new File(path); - if (!file.exists()) { - boolean result = file.mkdirs(); - if (!result) { - //TO DO - } - } + + if (!file.exists()) + return -1; long totalSpace = file.getTotalSpace(); - long freeSpace = file.getFreeSpace(); - long usedSpace = totalSpace - freeSpace; + if (totalSpace > 0) { + long freeSpace = file.getFreeSpace(); + long usedSpace = totalSpace - freeSpace; + return usedSpace / (double) totalSpace; } } catch (Exception e) { diff --git a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java index 8d8cf79600b..0db84fe461f 100644 --- a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java @@ -21,6 +21,8 @@ import java.util.Properties; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; public class UtilAllTest { @@ -78,6 +80,17 @@ public void test_getpid() { assertTrue(pid > 0); } + @Test + public void test_getDiskPartitionSpaceUsedPercent() { + assertEquals(-1, UtilAll.getDiskPartitionSpaceUsedPercent(null), 0); + assertEquals(-1, UtilAll.getDiskPartitionSpaceUsedPercent(""), 0); + + assertEquals(-1, UtilAll.getDiskPartitionSpaceUsedPercent("nonExistingPath"), 0); + + String tmpDir = System.getProperty("java.io.tmpdir"); + assertNotEquals(-1, UtilAll.getDiskPartitionSpaceUsedPercent(tmpDir), 0); + } + @Test public void test_isBlank() { { From d891d28349b00c5bc8be6333e319a815747c2330 Mon Sep 17 00:00:00 2001 From: hanzhaozhan Date: Fri, 23 Dec 2016 17:08:27 +0800 Subject: [PATCH 03/21] simplify if grammar --- .../org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java index 69b64ca7ef3..16b7847ae92 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java @@ -610,7 +610,7 @@ public byte[] getSystemTopicList() { while (it.hasNext()) { BrokerData bd = brokerAddrTable.get(it.next()); HashMap brokerAddrs = bd.getBrokerAddrs(); - if (bd.getBrokerAddrs() != null && !bd.getBrokerAddrs().isEmpty()) { + if (brokerAddrs != null && !brokerAddrs.isEmpty()) { Iterator it2 = brokerAddrs.keySet().iterator(); topicList.setBrokerAddr(brokerAddrs.get(it2.next())); break; From b421d48c476e74a8c7bb8129979df1dc0cb5a5a5 Mon Sep 17 00:00:00 2001 From: Zhanhui Li Date: Wed, 4 Jan 2017 15:20:29 +0800 Subject: [PATCH 04/21] Fix https://issues.apache.org/jira/browse/ROCKETMQ-25 --- .../org/apache/rocketmq/client/impl/MQAdminImpl.java | 11 ++++++++++- .../command/message/QueryMsgByKeySubCommand.java | 2 +- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index fb948b7796b..983e5157ea9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -23,6 +23,9 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -285,6 +288,7 @@ protected QueryResult queryMessage(String topic, String key, int maxNum, long be if (!brokerAddrs.isEmpty()) { final CountDownLatch countDownLatch = new CountDownLatch(brokerAddrs.size()); final List queryResultList = new LinkedList(); + final ReadWriteLock lock = new ReentrantReadWriteLock(false); for (String addr : brokerAddrs) { try { @@ -318,7 +322,12 @@ public void operationComplete(ResponseFuture responseFuture) { MessageDecoder.decodes(ByteBuffer.wrap(response.getBody()), true); QueryResult qr = new QueryResult(responseHeader.getIndexLastUpdateTimestamp(), wrappers); - queryResultList.add(qr); + try { + lock.writeLock().lock(); + queryResultList.add(qr); + } finally { + lock.writeLock().unlock(); + } break; } default: diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java index 77dc6c418f2..bdc5f5202a2 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java @@ -69,7 +69,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { } } - void queryByKey(final DefaultMQAdminExt admin, final String topic, final String key) + private void queryByKey(final DefaultMQAdminExt admin, final String topic, final String key) throws MQClientException, InterruptedException { admin.start(); From 920dd314d9e34cef01fdf2107b49b9d44336a69f Mon Sep 17 00:00:00 2001 From: Zhanhui Li Date: Wed, 4 Jan 2017 15:35:45 +0800 Subject: [PATCH 05/21] Fix hard-coded topic/subscription config file path. --- .../broker/subscription/SubscriptionGroupManager.java | 4 ++-- .../org/apache/rocketmq/broker/topic/TopicConfigManager.java | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java index 4b6072c9f75..a9c978ac54d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java @@ -142,8 +142,8 @@ public String encode() { @Override public String configFilePath() { - //return BrokerPathConfigHelper.getSubscriptionGroupPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); - return BrokerPathConfigHelper.getSubscriptionGroupPath(System.getProperty("user.home") + File.separator + "store"); + return BrokerPathConfigHelper.getSubscriptionGroupPath(this.brokerController.getMessageStoreConfig() + .getStorePathRootDir()); } @Override diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index d31ad4b8f85..5ebf1e667d9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -381,9 +381,8 @@ public String encode() { @Override public String configFilePath() { -// return BrokerPathConfigHelper.getTopicConfigPath(this.brokerController.getMessageStoreConfig() -// .getStorePathRootDir()); - return BrokerPathConfigHelper.getTopicConfigPath(System.getProperty("user.home") + File.separator + "store"); + return BrokerPathConfigHelper.getTopicConfigPath(this.brokerController.getMessageStoreConfig() + .getStorePathRootDir()); } @Override From 23e6c7ae52dedfbb0681803a0e1a7e25a6173d6f Mon Sep 17 00:00:00 2001 From: Zhanhui Li Date: Wed, 4 Jan 2017 15:53:31 +0800 Subject: [PATCH 06/21] Fix checkstyle. --- .../rocketmq/broker/subscription/SubscriptionGroupManager.java | 1 - .../org/apache/rocketmq/broker/topic/TopicConfigManager.java | 1 - 2 files changed, 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java index a9c978ac54d..92dc5e72b25 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.broker.subscription; -import java.io.File; import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 5ebf1e667d9..93a631ac360 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.broker.topic; -import java.io.File; import java.util.HashSet; import java.util.Iterator; import java.util.Map; From 0c0b730b258b84d2e1affb10ecf6a0ee7cae2f4a Mon Sep 17 00:00:00 2001 From: Zhanhui Li Date: Thu, 5 Jan 2017 11:59:11 +0800 Subject: [PATCH 07/21] Fix integer overflow. --- .../apache/rocketmq/common/BrokerConfig.java | 2 +- .../rocketmq/common/BrokerConfigTest.java | 30 +++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) create mode 100644 common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 6ddb9e11d03..f79f7267c8c 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -91,7 +91,7 @@ public class BrokerConfig { private boolean slaveReadEnable = false; private boolean disableConsumeIfConsumerReadSlowly = false; - private long consumerFallbehindThreshold = 1024 * 1024 * 1024 * 16; + private long consumerFallbehindThreshold = 1024L * 1024 * 1024 * 16; private long waitTimeMillsInSendQueue = 200; diff --git a/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java b/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java new file mode 100644 index 00000000000..c8cdaaf609e --- /dev/null +++ b/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common; + +import org.junit.Assert; +import org.junit.Test; + +public class BrokerConfigTest { + + @Test + public void testConsumerFallBehindThresholdOverflow() { + long expect = 1024L * 1024 * 1024 * 16; + Assert.assertEquals(expect, new BrokerConfig().getConsumerFallbehindThreshold()); + } + +} \ No newline at end of file From 55d73e18a4f59416c9d5820bc3bb989cf9dd2a55 Mon Sep 17 00:00:00 2001 From: Zhanhui Li Date: Fri, 6 Jan 2017 12:51:26 +0800 Subject: [PATCH 08/21] Fix release.xml to include files in sub-directories of conf --- release.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/release.xml b/release.xml index dcef3abf1ae..40a56df09f1 100644 --- a/release.xml +++ b/release.xml @@ -27,7 +27,7 @@ bin/* - conf/* + conf/** benchmark/* LICENSE NOTICE From 9d76ea923159ff36d462b1809104164e270fce96 Mon Sep 17 00:00:00 2001 From: qinliujie <> Date: Mon, 9 Jan 2017 11:26:07 +0800 Subject: [PATCH 09/21] MASTER [ROCKETMQ-33] Resolve cpu occupy 100% issue in GroupCommitService/HAService, closes apache/incubator-rocketmq#31 --- store/src/main/java/org/apache/rocketmq/store/CommitLog.java | 2 +- store/src/main/java/org/apache/rocketmq/store/ha/HAService.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 5ebab545137..b4bf298ba80 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -1045,7 +1045,7 @@ public void run() { while (!this.isStopped()) { try { - this.waitForRunning(0); + this.waitForRunning(10); this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java index d2e8cd9f8b3..762bdb6adca 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java @@ -297,7 +297,7 @@ public void run() { while (!this.isStopped()) { try { - this.waitForRunning(0); + this.waitForRunning(10); this.doWaitTransfer(); } catch (Exception e) { log.warn(this.getServiceName() + " service has exception. ", e); From 85467dfd34d8ff379d2ddfec0489d78dcba20c27 Mon Sep 17 00:00:00 2001 From: qinliujie <765152203@qq.com> Date: Mon, 9 Jan 2017 12:06:23 +0800 Subject: [PATCH 10/21] Fix-35 [ROCKETMQ-35] Reslove underlying NPE in ConsumeRequest, closes apache/incubator-rocketmq#32 --- .../impl/consumer/ConsumeMessageConcurrentlyService.java | 6 +++++- .../client/impl/consumer/ConsumeMessageOrderlyService.java | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java index 5440522fb23..f566ed0fcca 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java @@ -439,7 +439,11 @@ public void run() { } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) { returnType = ConsumeReturnType.SUCCESS; } - consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); + + if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { + consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); + } + if (null == status) { log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", ConsumeMessageConcurrentlyService.this.consumerGroup, diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java index 6c92315fe2f..1fa474caa1d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java @@ -501,7 +501,11 @@ public void run() { } else if (ConsumeOrderlyStatus.SUCCESS == status) { returnType = ConsumeReturnType.SUCCESS; } - consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); + + if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) { + consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name()); + } + if (null == status) { status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } From 776911d458d45280de3a4c0f4d6b2bd2ee98d6b2 Mon Sep 17 00:00:00 2001 From: shroman Date: Mon, 9 Jan 2017 16:09:02 +0800 Subject: [PATCH 11/21] [ROCKETMQ-34] Potential NPE in NettyConnetManageHandler#connect, closes apache/incubator-rocketmq#30 --- .../remoting/netty/NettyRemotingClient.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 437641629d2..9fdaccf7681 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -73,12 +73,12 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti private final Bootstrap bootstrap = new Bootstrap(); private final EventLoopGroup eventLoopGroupWorker; private final Lock lockChannelTables = new ReentrantLock(); - private final ConcurrentHashMap channelTables = new ConcurrentHashMap(); + private final ConcurrentHashMap channelTables = new ConcurrentHashMap<>(); private final Timer timer = new Timer("ClientHouseKeepingService", true); - private final AtomicReference> namesrvAddrList = new AtomicReference>(); - private final AtomicReference namesrvAddrChoosed = new AtomicReference(); + private final AtomicReference> namesrvAddrList = new AtomicReference<>(); + private final AtomicReference namesrvAddrChoosed = new AtomicReference<>(); private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex()); private final Lock lockNamesrvChannel = new ReentrantLock(); @@ -155,7 +155,7 @@ public void initChannel(SocketChannel ch) throws Exception { new NettyEncoder(), new NettyDecoder(), new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), - new NettyConnetManageHandler(), + new NettyConnectManageHandler(), new NettyClientHandler()); } }); @@ -527,7 +527,7 @@ public void registerProcessor(int requestCode, NettyRequestProcessor processor, executorThis = this.publicExecutor; } - Pair pair = new Pair(processor, executorThis); + Pair pair = new Pair<>(processor, executorThis); this.processorTable.put(requestCode, pair); } @@ -596,17 +596,18 @@ protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) thro } } - class NettyConnetManageHandler extends ChannelDuplexHandler { + class NettyConnectManageHandler extends ChannelDuplexHandler { @Override - public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) - throws Exception { + public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, + ChannelPromise promise) throws Exception { final String local = localAddress == null ? "UNKNOW" : localAddress.toString(); final String remote = remoteAddress == null ? "UNKNOW" : remoteAddress.toString(); log.info("NETTY CLIENT PIPELINE: CONNECT {} => {}", local, remote); + super.connect(ctx, remoteAddress, localAddress, promise); if (NettyRemotingClient.this.channelEventListener != null) { - NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress.toString(), ctx.channel())); + NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remote, ctx.channel())); } } From f93605755f83b4a992b38538c33c1348517699a6 Mon Sep 17 00:00:00 2001 From: iskl Date: Mon, 9 Jan 2017 16:20:28 +0800 Subject: [PATCH 12/21] [ROCKETMQ-31] Fix 'No such file or directory' warning for /Users/zhouxinyu/rmq_bk_gc.log, closes apache/incubator-rocketmq#29 --- bin/mqbroker | 2 -- 1 file changed, 2 deletions(-) diff --git a/bin/mqbroker b/bin/mqbroker index b72310f9efd..6a79c392e8d 100644 --- a/bin/mqbroker +++ b/bin/mqbroker @@ -42,6 +42,4 @@ fi export ROCKETMQ_HOME -rm -f $HOME/rmq_bk_gc.log.bac -cp $HOME/rmq_bk_gc.log $HOME/rmq_bk_gc.log.bac sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup $@ From 1562bd0d11fbf4bbbca38921c6cc07ebe11ea15a Mon Sep 17 00:00:00 2001 From: shroman Date: Mon, 9 Jan 2017 16:28:10 +0800 Subject: [PATCH 13/21] [ROCKETMQ-30] Fixed method signature for Message Filter example and class loading from resources, closes apache/incubator-rocketmq#27 --- .../java/org/apache/rocketmq/example/filter/Consumer.java | 8 ++++++-- example/src/main/resources/MessageFilterImpl.java | 3 ++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java index 7b79b370e8e..d63435b5923 100644 --- a/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.example.filter; +import java.io.File; import java.util.List; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; @@ -30,8 +31,11 @@ public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4"); - String filterCode = MixAll.file2String("/home/admin/MessageFilterImpl.java"); - consumer.subscribe("TopicFilter7", "org.apache.rocketmq.example.filter.MessageFilterImpl", + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + File classFile = new File(classLoader.getResource("MessageFilterImpl.java").getFile()); + + String filterCode = MixAll.file2String(classFile); + consumer.subscribe("TopicTest", "org.apache.rocketmq.example.filter.MessageFilterImpl", filterCode); consumer.registerMessageListener(new MessageListenerConcurrently() { diff --git a/example/src/main/resources/MessageFilterImpl.java b/example/src/main/resources/MessageFilterImpl.java index 83ca00ef2cb..23e4a79b942 100644 --- a/example/src/main/resources/MessageFilterImpl.java +++ b/example/src/main/resources/MessageFilterImpl.java @@ -17,13 +17,14 @@ package org.apache.rocketmq.example.filter; +import org.apache.rocketmq.common.filter.FilterContext; import org.apache.rocketmq.common.filter.MessageFilter; import org.apache.rocketmq.common.message.MessageExt; public class MessageFilterImpl implements MessageFilter { @Override - public boolean match(MessageExt msg) { + public boolean match(MessageExt msg, FilterContext context) { String property = msg.getProperty("SequenceId"); if (property != null) { int id = Integer.parseInt(property); From 6e31d864e3f49b1296bad2e24955bee4d918d31d Mon Sep 17 00:00:00 2001 From: yukon Date: Mon, 9 Jan 2017 21:58:38 +0800 Subject: [PATCH 14/21] [ROCKETMQ-22] Resolve ClassCastException issue in printWaterMark. --- .../rocketmq/broker/BrokerController.java | 2 +- .../broker/latency/BrokerFastFailure.java | 4 +- .../longpolling/PullRequestHoldService.java | 4 +- .../processor/PullMessageProcessor.java | 6 +- .../broker/api/BrokerFastFailureTest.java | 61 +++++++++++++++++++ 5 files changed, 69 insertions(+), 8 deletions(-) create mode 100644 broker/src/test/java/org/apache/rocketmq/broker/api/BrokerFastFailureTest.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 9b89c85c143..af69001c3de 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -463,7 +463,7 @@ public long headSlowTimeMills(BlockingQueue q) { final Runnable peek = q.peek(); if (peek != null) { RequestTask rt = BrokerFastFailure.castRunnable(peek); - slowTimeMills = this.messageStore.now() - rt.getCreateTimestamp(); + slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp(); } if (slowTimeMills < 0) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java index d7d127699e5..a2a1aa095f7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java @@ -71,7 +71,7 @@ private void cleanExpiredRequest() { } else { break; } - } catch (Throwable e) { + } catch (Throwable ignored) { } } @@ -99,7 +99,7 @@ private void cleanExpiredRequest() { } else { break; } - } catch (Throwable e) { + } catch (Throwable ignored) { } } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java index ff068d26b30..fdba50daaf6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java @@ -130,7 +130,7 @@ public void notifyMessageArriving(final String topic, final int queueId, final l if (newestOffset > request.getPullFromThisOffset()) { if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tagsCode)) { try { - this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(), + this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); @@ -141,7 +141,7 @@ public void notifyMessageArriving(final String topic, final int queueId, final l if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) { try { - this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(), + this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 382030be478..7d158943534 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -50,6 +50,7 @@ import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.netty.RequestTask; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.MessageExtBrokerInner; @@ -481,7 +482,7 @@ private void generateOffsetMovedEvent(final OffsetMovedEvent event) { } } - public void excuteRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException { + public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException { Runnable run = new Runnable() { @Override public void run() { @@ -513,8 +514,7 @@ public void operationComplete(ChannelFuture future) throws Exception { } } }; - - this.brokerController.getPullMessageExecutor().submit(run); + this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request)); } public void registerConsumeMessageHook(List sendMessageHookList) { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/api/BrokerFastFailureTest.java b/broker/src/test/java/org/apache/rocketmq/broker/api/BrokerFastFailureTest.java new file mode 100644 index 00000000000..bec0af5a0d6 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/api/BrokerFastFailureTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.api; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.broker.BrokerTestHarness; +import org.apache.rocketmq.broker.latency.BrokerFastFailure; +import org.apache.rocketmq.broker.latency.FutureTaskExt; +import org.apache.rocketmq.remoting.netty.RequestTask; +import org.junit.Assert; +import org.junit.Test; + +public class BrokerFastFailureTest extends BrokerTestHarness { + + @Test + public void testHeadSlowTimeMills() throws InterruptedException { + BlockingQueue blockingQueue = new LinkedBlockingQueue<>(); + blockingQueue.add(new FutureTaskExt<>(new RequestTask(null, null, null), null)); + TimeUnit.MILLISECONDS.sleep(10); + Assert.assertTrue(this.brokerController.headSlowTimeMills(blockingQueue) > 0); + + blockingQueue.clear(); + blockingQueue.add(new Runnable() { + @Override public void run() { + + } + }); + Assert.assertTrue(this.brokerController.headSlowTimeMills(blockingQueue) == 0); + } + + @Test + public void testCastRunnable() { + Runnable runnable = new Runnable() { + @Override public void run() { + + } + }; + Assert.assertNull(BrokerFastFailure.castRunnable(runnable)); + + RequestTask requestTask = new RequestTask(null, null, null); + runnable = new FutureTaskExt<>(requestTask, null); + + Assert.assertEquals(requestTask, BrokerFastFailure.castRunnable(runnable)); + } +} From 6baa2ed599fb9a1ad0d860efd2c438a9078fc2c6 Mon Sep 17 00:00:00 2001 From: yukon Date: Tue, 10 Jan 2017 22:00:00 +0800 Subject: [PATCH 15/21] [ROCKETMQ-1] Update notice file. --- NOTICE | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/NOTICE b/NOTICE index fea5b4cc9cc..638868a5005 100644 --- a/NOTICE +++ b/NOTICE @@ -1,5 +1,5 @@ -RocketMQ -Copyright 2016 Alibaba Group. +Apache RocketMQ (incubating) +Copyright 2016 The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/). \ No newline at end of file From 7fcf2f1dec0943ea540551b73eb9bd13e3bec59d Mon Sep 17 00:00:00 2001 From: Zhanhui Li Date: Wed, 11 Jan 2017 12:12:15 +0800 Subject: [PATCH 16/21] Add javadoc to DefaultMQProducer --- .../client/producer/DefaultMQProducer.java | 198 ++++++++++++++++++ 1 file changed, 198 insertions(+) diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index c6773245473..a71a743d019 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -31,92 +31,290 @@ import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; +/** + * This class is the entry point for applications intending to send messages. + *

+ * + * It's fine to tune fields which exposes getter/setter methods, but keep in mind, all of them should work well out of + * box for most scenarios. + *

+ * + * This class aggregates various send methods to deliver messages to brokers. Each of them has pros and + * cons; you'd better understand strengths and weakness of them before actually coding. + *

+ * + *

+ * Thread Safety: After configuring and starting process, this class can be regarded as thread-safe + * and used among multiple threads context. + *

+ */ public class DefaultMQProducer extends ClientConfig implements MQProducer { + + /** + * Wrapping internal implementations for virtually all methods presented in this class. + */ protected final transient DefaultMQProducerImpl defaultMQProducerImpl; + + /** + * Producer group conceptually aggregates all producer instances of exactly same role, which is particularly + * important when transactional messages are involved. + *

+ * + * For non-transactional messages, it does not matter as long as it's unique per process. + *

+ * + * See {@linktourl http://rocketmq.incubator.apache.org/docs/core-concept/} for more discussion. + */ private String producerGroup; + /** * Just for testing or demo program */ private String createTopicKey = MixAll.DEFAULT_TOPIC; + + /** + * Number of queues to create per default topic. + */ private volatile int defaultTopicQueueNums = 4; + + /** + * Timeout for sending messages. + */ private int sendMsgTimeout = 3000; + + /** + * Compress message body threshold, namely, message body larger than 4k will be compressed on default. + */ private int compressMsgBodyOverHowmuch = 1024 * 4; + + /** + * Maximum number of retry to perform internally before claiming sending failure in synchronous mode. + *

+ * + * This may potentially cause message duplication which is up to application developers to resolve. + */ private int retryTimesWhenSendFailed = 2; + + /** + * Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. + *

+ * + * This may potentially cause message duplication which is up to application developers to resolve. + */ private int retryTimesWhenSendAsyncFailed = 2; + /** + * Indicate whether to retry another broker on sending failure internally. + */ private boolean retryAnotherBrokerWhenNotStoreOK = false; + + /** + * Maximum allowed message size in bytes. + */ private int maxMessageSize = 1024 * 1024 * 4; // 4M + /** + * Default constructor. + */ public DefaultMQProducer() { this(MixAll.DEFAULT_PRODUCER_GROUP, null); } + /** + * Constructor specifying both producer group and RPC hook. + * + * @param producerGroup Producer group, see the name-sake field. + * @param rpcHook RPC hook to execute per each remoting command execution. + */ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) { this.producerGroup = producerGroup; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); } + /** + * Constructor specifying producer group. + * @param producerGroup Producer group, see the name-sake field. + */ public DefaultMQProducer(final String producerGroup) { this(producerGroup, null); } + /** + * Constructor specifying the RPC hook. + * @param rpcHook RPC hook to execute per each remoting command execution. + */ public DefaultMQProducer(RPCHook rpcHook) { this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook); } + /** + * Start this producer instance. + *

+ * + * + * Much internal initializing procedures are carried out to make this instance prepared, thus, it's a must to invoke + * this method before sending or querying messages. + * + *

+ * + * @throws MQClientException if there is any unexpected error. + */ @Override public void start() throws MQClientException { this.defaultMQProducerImpl.start(); } + /** + * This method shuts down this producer instance and releases related resources. + */ @Override public void shutdown() { this.defaultMQProducerImpl.shutdown(); } + /** + * Fetch message queues of topic topic, to which we may send/publish messages. + * @param topic Topic to fetch. + * @return List of message queues readily to send messages to + * @throws MQClientException if there is any client error. + */ @Override public List fetchPublishMessageQueues(String topic) throws MQClientException { return this.defaultMQProducerImpl.fetchPublishMessageQueues(topic); } + /** + * Send message in synchronous mode. This method returns only when the sending procedure totally completes. + *

+ * + * Warn: this method has internal retry-mechanism, that is, internal implementation will retry + * {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially + * delivered to broker(s). It's up to the application developers to resolve potential duplication issue. + * + * @param msg Message to send. + * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, + * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws MQBrokerException if there is any error with broker. + * @throws InterruptedException if the sending thread is interrupted. + */ @Override public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg); } + /** + * Same to {@link #send(Message)} with send timeout specified in addition. + * @param msg Message to send. + * @param timeout send timeout. + * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, + * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws MQBrokerException if there is any error with broker. + * @throws InterruptedException if the sending thread is interrupted. + */ @Override public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg, timeout); } + /** + * Send message to broker asynchronously. + *

+ * + * This method returns immediately. On sending completion, sendCallback will be executed. + *

+ * + * Similar to {@link #send(Message)}, internal implementation would potentially retry up to + * {@link #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication + * and application developers are the one to resolve this potential issue. + * @param msg Message to send. + * @param sendCallback Callback to execute on sending completed, either successful or unsuccessful. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the sending thread is interrupted. + */ @Override public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.send(msg, sendCallback); } + /** + * Same to {@link #send(Message, SendCallback)} with send timeout specified in addition. + * @param msg message to send. + * @param sendCallback Callback to execute. + * @param timeout send timeout. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the sending thread is interrupted. + */ @Override public void send(Message msg, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.send(msg, sendCallback, timeout); } + /** + * Similar to UDP, this method won't wait for + * acknowledgement from broker before return. Obviously, it has maximums throughput yet potentials of message loss. + * @param msg Message to send. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the sending thread is interrupted. + */ @Override public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.sendOneway(msg); } + /** + * Same to {@link #send(Message)} with target message queue specified in addition. + * @param msg Message to send. + * @param mq Target message queue. + * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, + * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws MQBrokerException if there is any error with broker. + * @throws InterruptedException if the sending thread is interrupted. + */ @Override public SendResult send(Message msg, MessageQueue mq) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg, mq); } + /** + * Same to {@link #send(Message)} with target message queue and send timeout specified. + * + * @param msg Message to send. + * @param mq Target message queue. + * @param timeout send timeout. + * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, + * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws MQBrokerException if there is any error with broker. + * @throws InterruptedException if the sending thread is interrupted. + */ @Override public SendResult send(Message msg, MessageQueue mq, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg, mq, timeout); } + /** + * Same to {@link #send(Message, SendCallback)} with target message queue specified. + * + * @param msg Message to send. + * @param mq Target message queue. + * @param sendCallback Callback to execute on sending completed, either successful or unsuccessful. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the sending thread is interrupted. + */ @Override public void send(Message msg, MessageQueue mq, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { From de628a444b239093486b5141a3714f35f09e77ae Mon Sep 17 00:00:00 2001 From: Zhanhui Li Date: Wed, 11 Jan 2017 14:50:20 +0800 Subject: [PATCH 17/21] Add javadoc to DefaultMQProducer --- .../client/producer/DefaultMQProducer.java | 158 ++++++++++++++++++ 1 file changed, 158 insertions(+) diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index a71a743d019..3480c920e89 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -321,94 +321,252 @@ public void send(Message msg, MessageQueue mq, SendCallback sendCallback) this.defaultMQProducerImpl.send(msg, mq, sendCallback); } + /** + * Same to {@link #send(Message, SendCallback)} with target message queue and send timeout specified. + * @param msg Message to send. + * @param mq Target message queue. + * @param sendCallback Callback to execute on sending completed, either successful or unsuccessful. + * @param timeout Send timeout. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the sending thread is interrupted. + */ @Override public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.send(msg, mq, sendCallback, timeout); } + /** + * Same to {@link #sendOneway(Message)} with target message queue specified. + * @param msg Message to send. + * @param mq Target message queue. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the sending thread is interrupted. + */ @Override public void sendOneway(Message msg, MessageQueue mq) throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.sendOneway(msg, mq); } + /** + * Same to {@link #send(Message)} with message queue selector specified. + * + * @param msg Message to send. + * @param selector Message queue selector, through which we get target message queue to deliver message to. + * @param arg Argument to work along with message queue selector. + * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, + * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws MQBrokerException if there is any error with broker. + * @throws InterruptedException if the sending thread is interrupted. + */ @Override public SendResult send(Message msg, MessageQueueSelector selector, Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg, selector, arg); } + /** + * Same to {@link #send(Message, MessageQueueSelector, Object)} with send timeout specified. + * + * @param msg Message to send. + * @param selector Message queue selector, through which we get target message queue to deliver message to. + * @param arg Argument to work along with message queue selector. + * @param timeout Send timeout. + * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message, + * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws MQBrokerException if there is any error with broker. + * @throws InterruptedException if the sending thread is interrupted. + */ @Override public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(msg, selector, arg, timeout); } + /** + * Same to {@link #send(Message, SendCallback)} with message queue selector specified. + * + * @param msg Message to send. + * @param selector Message selector through which to get target message queue. + * @param arg Argument used along with message queue selector. + * @param sendCallback callback to execute on sending completion. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the sending thread is interrupted. + */ @Override public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback); } + /** + * Same to {@link #send(Message, MessageQueueSelector, Object, SendCallback)} with timeout specified. + * + * @param msg Message to send. + * @param selector Message selector through which to get target message queue. + * @param arg Argument used along with message queue selector. + * @param sendCallback callback to execute on sending completion. + * @param timeout Send timeout. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the sending thread is interrupted. + */ @Override public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout); } + /** + * Same to {@link #sendOneway(Message)} with message queue selector specified. + * @param msg Message to send. + * @param selector Message queue selector, through which to determine target message queue to deliver message + * @param arg Argument used along with message queue selector. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the sending thread is interrupted. + */ @Override public void sendOneway(Message msg, MessageQueueSelector selector, Object arg) throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.sendOneway(msg, selector, arg); } + /** + * This method is to send transactional messages. + * + * @param msg Transactional message to send. + * @param tranExecuter local transaction executor. + * @param arg Argument used along with local transaction executor. + * @return Transaction result. + * @throws MQClientException if there is any client error. + */ @Override public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException { throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class"); } + /** + * Create a topic on broker. + * @param key accesskey + * @param newTopic topic name + * @param queueNum topic's queue number + * @throws MQClientException if there is any client error. + */ @Override public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { createTopic(key, newTopic, queueNum, 0); } + /** + * Create a topic on broker. + * @param key accesskey + * @param newTopic topic name + * @param queueNum topic's queue number + * @param topicSysFlag topic system flag + * @throws MQClientException if there is any client error. + */ @Override public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { this.defaultMQProducerImpl.createTopic(key, newTopic, queueNum, topicSysFlag); } + /** + * Search consume queue offset of the given time stamp. + * @param mq Instance of MessageQueue + * @param timestamp from when in milliseconds. + * @return Consume queue offset. + * @throws MQClientException if there is any client error. + */ @Override public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { return this.defaultMQProducerImpl.searchOffset(mq, timestamp); } + /** + * Query maximum offset of the given message queue. + * + * @param mq Instance of MessageQueue + * @return maximum offset of the given consume queue. + * @throws MQClientException if there is any client error. + */ @Override public long maxOffset(MessageQueue mq) throws MQClientException { return this.defaultMQProducerImpl.maxOffset(mq); } + /** + * Query minimum offset of the given message queue. + * @param mq Instance of MessageQueue + * @return minimum offset of the given message queue. + * @throws MQClientException if there is any client error. + */ @Override public long minOffset(MessageQueue mq) throws MQClientException { return this.defaultMQProducerImpl.minOffset(mq); } + /** + * Query earliest message store time. + * @param mq Instance of MessageQueue + * @return earliest message store time. + * @throws MQClientException if there is any client error. + */ @Override public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { return this.defaultMQProducerImpl.earliestMsgStoreTime(mq); } + /** + * Query message of the given offset message ID. + * @param offsetMsgId message id + * @return Message specified. + * @throws MQBrokerException if there is any broker error. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the sending thread is interrupted. + */ @Override public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { return this.defaultMQProducerImpl.viewMessage(offsetMsgId); } + /** + * Query message by key. + * @param topic message topic + * @param key message key index word + * @param maxNum max message number + * @param begin from when + * @param end to when + * @return QueryResult instance contains matched messages. + * @throws MQClientException if there is any client error. + * @throws InterruptedException if the thread is interrupted. + */ @Override public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException, InterruptedException { return this.defaultMQProducerImpl.queryMessage(topic, key, maxNum, begin, end); } + /** + * Query message of the given message ID. + * + * @param topic Topic + * @param msgId Message ID + * @return Message specified. + * @throws MQBrokerException if there is any broker error. + * @throws MQClientException if there is any client error. + * @throws RemotingException if there is any network-tier error. + * @throws InterruptedException if the sending thread is interrupted. + */ @Override public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { try { From f32e0b9dc37b52e89ce75cf419e2933a8cccde06 Mon Sep 17 00:00:00 2001 From: Zhanhui Li Date: Fri, 13 Jan 2017 11:20:29 +0800 Subject: [PATCH 18/21] Add javadoc to DefaultMQPushConsumer --- .../consumer/DefaultMQPushConsumer.java | 187 +++++++++++++++++- 1 file changed, 178 insertions(+), 9 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 45f23a7ef0c..2cce03d3473 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -40,51 +40,116 @@ import org.apache.rocketmq.remoting.exception.RemotingException; /** - * Wrapped push consumer.in fact,it works as remarkable as the pull consumer + * In most scenarios, this is the mostly recommended class to consume messages. + *

+ * + * Technically speaking, this push client is virtually a wrapper of the underlying pull service. Specifically, on + * arrival of messages pulled from brokers, it roughly invokes the registered callback handler to feed the messages. + *

+ * + * See quickstart/Consumer in the example module for a typical usage. + *

+ * + *

+ * Thread Safety: After initialization, the instance can be regarded as thread-safe. + *

*/ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer { + + /** + * Internal implementation. Most of the functions herein are delegated to it. + */ protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; + /** - * Do the same thing for the same Group, the application must be set,and - * guarantee Globally unique + * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve + * load balance. It's required and needs to be globally unique. + *

+ * + * See here for further discussion. */ private String consumerGroup; + /** - * Consumption pattern,default is clustering + * Message model defines the way how messages are delivered to each consumer clients. + *

+ * + * RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients with + * the same {@link #consumerGroup} would only consume shards of the messages subscribed, which achieves load + * balances; Conversely, if the broadcasting is set, each consumer client will consume all subscribed messages + * separately. + *

+ * + * This field defaults to clustering. */ private MessageModel messageModel = MessageModel.CLUSTERING; + /** - * Consumption offset + * Consuming point on consumer booting. + *

+ * + * There are three consuming points: + *
    + *
  • + * CONSUME_FROM_LAST_OFFSET: consumer clients pick up where it stopped previously. + * If it were a newly booting up consumer client, according aging of the consumer group, there are two + * cases: + *
      + *
    1. + * if the consumer group is created so recently that the earliest message being subscribed has yet + * expired, which means the consumer group represents a lately launched business, consuming will + * start from the very beginning; + *
    2. + *
    3. + * if the earliest message being subscribed has expired, consuming will start from the latest + * messages, meaning messages born prior to the booting timestamp would be ignored. + *
    4. + *
    + *
  • + *
  • + * CONSUME_FROM_FIRST_OFFSET: Consumer client will start from earliest messages available. + *
  • + *
  • + * CONSUME_FROM_TIMESTAMP: Consumer client will start from specified timestamp, which means + * messages born prior to {@link #consumeTimestamp} will be ignored + *
  • + *
*/ private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET; + /** - * Backtracking consumption time with second precision.time format is + * Backtracking consumption time with second precision. Time format is * 20131223171201
* Implying Seventeen twelve and 01 seconds on December 23, 2013 year
- * Default backtracking consumption time Half an hour ago + * Default backtracking consumption time Half an hour ago. */ private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30)); + /** - * Queue allocation algorithm + * Queue allocation algorithm specifying how message queues are allocated to each consumer clients. */ private AllocateMessageQueueStrategy allocateMessageQueueStrategy; /** * Subscription relationship */ - private Map subscription = new HashMap(); + private Map subscription = new HashMap<>(); + /** * Message listener */ private MessageListener messageListener; + /** * Offset Storage */ private OffsetStore offsetStore; + /** * Minimum consumer thread number */ private int consumeThreadMin = 20; + /** * Max consumer thread number */ @@ -99,18 +164,22 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume * Concurrently max span offset.it has no effect on sequential consumption */ private int consumeConcurrentlyMaxSpan = 2000; + /** * Flow control threshold */ private int pullThresholdForQueue = 1000; + /** * Message pull Interval */ private long pullInterval = 0; + /** * Batch consumption size */ private int consumeMessageBatchMaxSize = 1; + /** * Batch pull size */ @@ -126,24 +195,56 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume */ private boolean unitMode = false; + /** + * Max re-consume times. -1 means 16 times. + *

+ * + * If messages are re-consumed more than {@link #maxReconsumeTimes} before success, it's be directed to a deletion + * queue waiting. + */ private int maxReconsumeTimes = -1; + + /** + * Suspending pulling time for cases requiring slow pulling like flow-control scenario. + */ private long suspendCurrentQueueTimeMillis = 1000; + + /** + * Maximum amount of time in minutes a message may block the consuming thread. + */ private long consumeTimeout = 15; + /** + * Default constructor. + */ public DefaultMQPushConsumer() { this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely()); } + /** + * Constructor specifying consumer group, RPC hook and message queue allocating algorithm. + * @param consumerGroup Consume queue. + * @param rpcHook RPC hook to execute before each remoting command. + * @param allocateMessageQueueStrategy message queue allocating algorithm. + */ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) { this.consumerGroup = consumerGroup; this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); } + /** + * Constructor specifying RPC hook. + * @param rpcHook RPC hook to execute before each remoting command. + */ public DefaultMQPushConsumer(RPCHook rpcHook) { this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely()); } + /** + * Constructor specifying consumer group. + * @param consumerGroup Consumer group. + */ public DefaultMQPushConsumer(final String consumerGroup) { this(consumerGroup, null, new AllocateMessageQueueAveragely()); } @@ -308,12 +409,33 @@ public void setSubscription(Map subscription) { this.subscription = subscription; } + /** + * Send message back to broker which will be re-delivered in future. + * @param msg Message to send back. + * @param delayLevel delay level. + * @throws RemotingException if there is any network-tier error. + * @throws MQBrokerException if there is any broker error. + * @throws InterruptedException if the thread is interrupted. + * @throws MQClientException if there is any client error. + */ @Override public void sendMessageBack(MessageExt msg, int delayLevel) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null); } + /** + * Send message back to the broker whose name is brokerName and the message will be re-delivered in + * future. + * + * @param msg Message to send back. + * @param delayLevel delay level. + * @param brokerName broker name. + * @throws RemotingException if there is any network-tier error. + * @throws MQBrokerException if there is any broker error. + * @throws InterruptedException if the thread is interrupted. + * @throws MQClientException if there is any client error. + */ @Override public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { @@ -325,11 +447,18 @@ public Set fetchSubscribeMessageQueues(String topic) throws MQClie return this.defaultMQPushConsumerImpl.fetchSubscribeMessageQueues(topic); } + /** + * This method gets internal infrastructure readily to serve. Instances must call this method after configuration. + * @throws MQClientException if there is any client error. + */ @Override public void start() throws MQClientException { this.defaultMQPushConsumerImpl.start(); } + /** + * Shut down this client and releasing underlying resources. + */ @Override public void shutdown() { this.defaultMQPushConsumerImpl.shutdown(); @@ -342,43 +471,83 @@ public void registerMessageListener(MessageListener messageListener) { this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); } + /** + * Register a callback to execute on message arrival for concurrent consuming. + * + * @param messageListener message handling callback. + */ @Override public void registerMessageListener(MessageListenerConcurrently messageListener) { this.messageListener = messageListener; this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); } + /** + * Register a callback to execute on message arrival for orderly consuming. + * + * @param messageListener message handling callback. + */ @Override public void registerMessageListener(MessageListenerOrderly messageListener) { this.messageListener = messageListener; this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); } + /** + * Subscribe a topic to consuming subscription. + * + * @param topic topic to subscribe. + * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3"
+ * if null or * expression,meaning subscribe all + * @throws MQClientException if there is any client error. + */ @Override public void subscribe(String topic, String subExpression) throws MQClientException { this.defaultMQPushConsumerImpl.subscribe(topic, subExpression); } + /** + * Subscribe a topic to consuming subscription. + * @param topic topic to consume. + * @param fullClassName full class name,must extend org.apache.rocketmq.common.filter. MessageFilter + * @param filterClassSource class source code,used UTF-8 file encoding,must be responsible for your code safety + * @throws MQClientException + */ @Override public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException { this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource); } + /** + * Un-subscribe the specified topic from subscription. + * @param topic message topic + */ @Override public void unsubscribe(String topic) { this.defaultMQPushConsumerImpl.unsubscribe(topic); } + /** + * Update the message consuming thread core pool size. + * + * @param corePoolSize new core pool size. + */ @Override public void updateCorePoolSize(int corePoolSize) { this.defaultMQPushConsumerImpl.updateCorePoolSize(corePoolSize); } + /** + * Suspend pulling new messages. + */ @Override public void suspend() { this.defaultMQPushConsumerImpl.suspend(); } + /** + * Resume pulling. + */ @Override public void resume() { this.defaultMQPushConsumerImpl.resume(); From 703745500eb505ce3801c03b1ef62457b84c13a9 Mon Sep 17 00:00:00 2001 From: yukon Date: Fri, 13 Jan 2017 17:33:52 +0800 Subject: [PATCH 19/21] Update code style. --- style/rmq_codeStyle.xml | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/style/rmq_codeStyle.xml b/style/rmq_codeStyle.xml index 9db075e3606..67044f0ff82 100644 --- a/style/rmq_codeStyle.xml +++ b/style/rmq_codeStyle.xml @@ -52,10 +52,7 @@