From c6d078e01941e2ab6e38c57855403e961359ed82 Mon Sep 17 00:00:00 2001 From: Zhanhui Li Date: Thu, 5 Jan 2017 11:54:01 +0800 Subject: [PATCH 1/6] not yet persist commit log store path --- .../rocketmq/broker/BrokerController.java | 18 +-- .../apache/rocketmq/broker/BrokerStartup.java | 8 +- .../processor/AdminBrokerProcessor.java | 75 ++++++++----- .../rocketmq/client/impl/MQAdminImpl.java | 10 +- .../rocketmq/client/impl/MQClientAPIImpl.java | 65 ++++------- .../org/apache/rocketmq/common/UtilAll.java | 40 ++++++- .../rocketmq/common/protocol/RequestCode.java | 2 + .../AddCommitLogStorePathRequestHeader.java | 41 +++++++ .../CheckTransactionStateRequestHeader.java | 4 - .../org/apache/rocketmq/store/CommitLog.java | 4 + .../rocketmq/store/DefaultMessageStore.java | 4 + .../rocketmq/store/MappedFileQueue.java | 106 ++++++++++++++++-- .../apache/rocketmq/store/MessageStore.java | 2 + .../tools/admin/DefaultMQAdminExt.java | 14 +++ .../tools/admin/DefaultMQAdminExtImpl.java | 9 ++ .../rocketmq/tools/admin/MQAdminExt.java | 11 ++ .../AddCommitLogStorePathSubCommand.java | 88 +++++++++++++++ 17 files changed, 398 insertions(+), 103 deletions(-) create mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/header/AddCommitLogStorePathRequestHeader.java create mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/broker/AddCommitLogStorePathSubCommand.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 9b89c85c143..3fc34960410 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -158,7 +158,7 @@ public BrokerController(// if (this.brokerConfig.getNamesrvAddr() != null) { this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr()); - log.info("user specfied name server address: {}", this.brokerConfig.getNamesrvAddr()); + log.info("user specified name server address: {}", this.brokerConfig.getNamesrvAddr()); } this.slaveSynchronize = new SlaveSynchronize(this); @@ -369,7 +369,7 @@ public void run() { } public void registerProcessor() { - /** + /* * SendMessageProcessor */ SendMessageProcessor sendProcessor = new SendMessageProcessor(this); @@ -382,13 +382,13 @@ public void registerProcessor() { this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); - /** + /* * PullMessageProcessor */ this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor); this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList); - /** + /* * QueryMessageProcessor */ NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this); @@ -398,7 +398,7 @@ public void registerProcessor() { this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor); - /** + /* * ClientManageProcessor */ ClientManageProcessor clientProcessor = new ClientManageProcessor(this); @@ -408,7 +408,7 @@ public void registerProcessor() { this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor); - /** + /* * ConsumerManageProcessor */ ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this); @@ -420,13 +420,13 @@ public void registerProcessor() { this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); - /** + /* * EndTransactionProcessor */ this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor); - /** + /* * Default */ AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this); @@ -448,7 +448,7 @@ public void protectBroker() { while (it.hasNext()) { final Map.Entry next = it.next(); final long fallBehindBytes = next.getValue().getValue().get(); - if (fallBehindBytes > this.brokerConfig.getConsumerFallbehindThreshold()) { + if (fallBehindBytes > this.brokerConfig.getConsumerFallBehindThreshold()) { final String[] split = next.getValue().getStatsKey().split("@"); final String group = split[2]; LOG_PROTECTION.info("[PROTECT_BROKER] the consumer[{}] consume slowly, {} bytes, disable it", group, fallBehindBytes); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java index 5b15d7953c8..5ef9c11b572 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java @@ -125,7 +125,7 @@ public static BrokerController createBrokerController(String[] args) { properties = new Properties(); properties.load(in); - parsePropertie2SystemEnv(properties); + parseProperties2SystemEnv(properties); MixAll.properties2Object(properties, brokerConfig); MixAll.properties2Object(properties, nettyServerConfig); MixAll.properties2Object(properties, nettyClientConfig); @@ -212,9 +212,9 @@ public void run() { log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet()); if (!this.hasShutdown) { this.hasShutdown = true; - long begineTime = System.currentTimeMillis(); + long beginTime = System.currentTimeMillis(); controller.shutdown(); - long consumingTimeTotal = System.currentTimeMillis() - begineTime; + long consumingTimeTotal = System.currentTimeMillis() - beginTime; log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal); } } @@ -230,7 +230,7 @@ public void run() { return null; } - private static void parsePropertie2SystemEnv(Properties properties) { + private static void parseProperties2SystemEnv(Properties properties) { if (properties == null) { return; } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 8bf48acc4dd..4571a5027da 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -18,6 +18,8 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; + +import java.io.File; import java.io.UnsupportedEncodingException; import java.net.UnknownHostException; import java.util.ArrayList; @@ -61,33 +63,7 @@ import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody; -import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader; -import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader; -import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader; -import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader; -import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader; -import org.apache.rocketmq.common.protocol.header.GetAllTopicConfigResponseHeader; -import org.apache.rocketmq.common.protocol.header.GetBrokerConfigResponseHeader; -import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader; -import org.apache.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader; -import org.apache.rocketmq.common.protocol.header.GetConsumerConnectionListRequestHeader; -import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader; -import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader; -import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeRequestHeader; -import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader; -import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader; -import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader; -import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader; -import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader; -import org.apache.rocketmq.common.protocol.header.GetProducerConnectionListRequestHeader; -import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader; -import org.apache.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHeader; -import org.apache.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader; -import org.apache.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader; -import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader; -import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader; -import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader; -import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader; +import org.apache.rocketmq.common.protocol.header.*; import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader; import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; @@ -187,6 +163,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand return ViewBrokerStatsData(ctx, request); case RequestCode.GET_BROKER_CONSUME_STATS: return fetchAllConsumeStatsInBroker(ctx, request); + case RequestCode.ADD_COMMIT_LOG_STORE_PATH: + return addCommitLogStorePath(ctx, request); default: break; } @@ -1242,4 +1220,47 @@ private RemotingCommand callConsumer(// } } + private RemotingCommand addCommitLogStorePath(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + final AddCommitLogStorePathRequestHeader requestHeader = (AddCommitLogStorePathRequestHeader) + request.decodeCommandCustomHeader(AddCommitLogStorePathRequestHeader.class); + log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + String existingCommitLogPath = brokerController.getMessageStoreConfig().getStorePathCommitLog(); + if (existingCommitLogPath.contains(requestHeader.getCommitLogPath())) { + String errorMessage = "Store path being added is in use"; + log.warn(errorMessage); + + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(errorMessage); + return response; + } + + File dir = new File(requestHeader.getCommitLogPath()); + boolean ok = true; + if (dir.exists()) { + ok = ok && dir.isDirectory(); + ok = ok && dir.canWrite(); + ok = ok && dir.canRead(); + } else { + ok = ok && dir.mkdirs(); + } + + if (!ok) { + String errorMessage = "Directory denoted by specified commit log path is not OK, check permission please"; + log.warn(errorMessage); + + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(errorMessage); + return response; + } + + brokerController.getMessageStoreConfig().setStorePathCommitLog(existingCommitLogPath + "," + + requestHeader.getCommitLogPath()); + brokerController.getMessageStore().updateCommitLogStorePath(); + brokerController.flushAllConfig(); + + response.setCode(ResponseCode.SUCCESS); + return response; + } + } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index 983e5157ea9..c1d509e7c11 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -42,14 +42,14 @@ import org.apache.rocketmq.common.message.MessageId; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.header.AddCommitLogStorePathRequestHeader; import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.QueryMessageResponseHeader; import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.common.RemotingUtil; -import org.apache.rocketmq.remoting.exception.RemotingCommandException; -import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.exception.*; import org.apache.rocketmq.remoting.netty.ResponseFuture; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.slf4j.Logger; @@ -413,4 +413,10 @@ public void operationComplete(ResponseFuture responseFuture) { throw new MQClientException(ResponseCode.TOPIC_NOT_EXIST, "The topic[" + topic + "] not matched route info"); } + + public void addCommitLogStorePath(String brokerAddress, AddCommitLogStorePathRequestHeader requestHeader) + throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, + RemotingConnectException { + this.mQClientFactory.getMQClientAPIImpl().addCommitLogStorePath(brokerAddress, requestHeader); + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 12580c14977..284e215ecfc 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -77,47 +77,7 @@ import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody; -import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader; -import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader; -import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader; -import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader; -import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader; -import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader; -import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; -import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader; -import org.apache.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader; -import org.apache.rocketmq.common.protocol.header.GetConsumerConnectionListRequestHeader; -import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader; -import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody; -import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader; -import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader; -import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeRequestHeader; -import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader; -import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader; -import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader; -import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader; -import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader; -import org.apache.rocketmq.common.protocol.header.GetProducerConnectionListRequestHeader; -import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader; -import org.apache.rocketmq.common.protocol.header.GetTopicsByClusterRequestHeader; -import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; -import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader; -import org.apache.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHeader; -import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; -import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader; -import org.apache.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader; -import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader; -import org.apache.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader; -import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader; -import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader; -import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader; -import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; -import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; -import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; -import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader; -import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; -import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader; -import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader; +import org.apache.rocketmq.common.protocol.header.*; import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader; @@ -2008,4 +1968,27 @@ public Map getNameServerConfig(final List nameServer return configMap; } + public void addCommitLogStorePath(String brokerAddress, AddCommitLogStorePathRequestHeader requestHeader, long timeoutMills) + throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, + RemotingConnectException, MQBrokerException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ADD_COMMIT_LOG_STORE_PATH, + requestHeader); + RemotingCommand response = this.remotingClient.invokeSync(brokerAddress, request, timeoutMills); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: + return; + default: + break; + } + + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + + public void addCommitLogStorePath(String brokerAddress, AddCommitLogStorePathRequestHeader requestHeader) + throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, + RemotingConnectException, MQBrokerException { + addCommitLogStorePath(brokerAddress, requestHeader, 3000); + } + } \ No newline at end of file diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java index 016da0b44c5..3bfd449f68d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java @@ -28,11 +28,7 @@ import java.text.NumberFormat; import java.text.ParseException; import java.text.SimpleDateFormat; -import java.util.Calendar; -import java.util.Date; -import java.util.Enumeration; -import java.util.Iterator; -import java.util.Map; +import java.util.*; import java.util.zip.CRC32; import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; @@ -495,4 +491,38 @@ public static byte[] getIP() { throw new RuntimeException("Can not get local ip", e); } } + + + public static String selectCommitLogStorePath(String pathCSV) { + if (!pathCSV.contains(",")) { + return pathCSV; + } + + String[] paths = pathCSV.split(","); + + if (paths.length == 1) { + return paths[0]; + } + + List> pathList = new ArrayList>(); + for (String path : paths) { + + if (null == path || path.trim().length() == 0) { + continue; + } + + Pair pair = new Pair(path, new File(path).getFreeSpace()); + pathList.add(pair); + } + + Collections.sort(pathList, new Comparator>() { + @Override + public int compare(Pair lhs, Pair rhs) { + return lhs.getObject2() > rhs.getObject2() ? -1 : (lhs.getObject2().equals(rhs.getObject2()) ? lhs.getObject1().compareTo(rhs.getObject1()) : 1); + } + }); + + return pathList.get(0).getObject1(); + } + } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java index 217e8df9ac7..03a4be3b01f 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java @@ -159,4 +159,6 @@ public class RequestCode { * get config from name server */ public static final int GET_NAMESRV_CONFIG = 319; + + public static final int ADD_COMMIT_LOG_STORE_PATH = 320; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/AddCommitLogStorePathRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AddCommitLogStorePathRequestHeader.java new file mode 100644 index 00000000000..460e9959612 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AddCommitLogStorePathRequestHeader.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.protocol.header; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + +public class AddCommitLogStorePathRequestHeader implements CommandCustomHeader { + + @CFNotNull + private String commitLogPath; + + @Override + public void checkFields() throws RemotingCommandException { + + } + + public String getCommitLogPath() { + return commitLogPath; + } + + public void setCommitLogPath(String commitLogPath) { + this.commitLogPath = commitLogPath; + } +} + diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java index 76c7b42791f..db7516d5699 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java @@ -14,10 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -/** - * $Id: EndTransactionRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $ - */ package org.apache.rocketmq.common.protocol.header; import org.apache.rocketmq.remoting.CommandCustomHeader; diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 5ebab545137..7fb2f1968ad 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -821,6 +821,10 @@ private void releasePutMessageLock() { } } + public void updateCommitLogStorePath() { + mappedFileQueue.setStorePath(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog()); + } + public static class GroupCommitRequest { private final long nextOffset; private final CountDownLatch countDownLatch = new CountDownLatch(1); diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 3a43c21b76d..3194270ca5c 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -1306,6 +1306,10 @@ public void run() { }, 6, TimeUnit.SECONDS); } + public void updateCommitLogStorePath() { + this.commitLog.updateCommitLogStorePath(); + } + class CleanCommitLogService { private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20; diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java index f98b26a194b..5bab65bc071 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java @@ -17,13 +17,19 @@ package org.apache.rocketmq.store; import java.io.File; +import java.io.FilenameFilter; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.ListIterator; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.slf4j.Logger; @@ -35,7 +41,10 @@ public class MappedFileQueue { private static final int DELETE_FILES_BATCH_MAX = 10; - private final String storePath; + /** + * commit log store path in form of CSV, allowing multiple commit log paths. + */ + private String storePath; private final int mappedFileSize; @@ -48,6 +57,9 @@ public class MappedFileQueue { private volatile long storeTimestamp = 0; + private static final String MAPPED_FILE_NAME_PATTERN_STRING = "\\d+"; + private static final Pattern MAPPED_FILE_NAME_PATTERN = Pattern.compile(MAPPED_FILE_NAME_PATTERN_STRING); + public MappedFileQueue(final String storePath, int mappedFileSize, AllocateMappedFileService allocateMappedFileService) { this.storePath = storePath; @@ -145,17 +157,70 @@ private void deleteExpiredFile(List files) { } public boolean load() { - File dir = new File(this.storePath); - File[] files = dir.listFiles(); - if (files != null) { + File[] dirs; + if (!this.storePath.contains(",")) { + dirs = new File[]{new File(this.storePath)}; + } else { + String[] storePaths = storePath.split(","); + dirs = new File[storePaths.length]; + for (int i = 0; i < dirs.length; i++) { + dirs[i] = new File(storePaths[i]); + } + } + + List files = new ArrayList<>(); + for (File dir : dirs) { + File[] commitLogFiles = dir.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + Matcher matcher = MAPPED_FILE_NAME_PATTERN.matcher(name); + return matcher.matches(); + } + }); + if (null != commitLogFiles) { + files.addAll(Arrays.asList(commitLogFiles)); + } + } + + if (!files.isEmpty()) { // ascending order - Arrays.sort(files); + Collections.sort(files, new Comparator() { + @Override + public int compare(File lhs, File rhs) { + return lhs.getName().compareTo(rhs.getName()); + } + }); + + long prevOffset = 0; + long currentOffset; + boolean first = true; for (File file : files) { + // Validate length if (file.length() != this.mappedFileSize) { - log.warn(file + "\t" + file.length() - + " length not matched message store config value, ignore it"); - return true; + log.warn(file + "\t" + file.length() + " length does not match message store config value, ignore it"); + continue; + } + + currentOffset = Long.parseLong(file.getName()); + if (first) { + first = false; + prevOffset = currentOffset; + } else if (currentOffset - prevOffset != mappedFileSize) { + log.error("Mapped file queue is tampered. Previous offset: {}; current offset: {}", + prevOffset, currentOffset); + + // Debug info + log.warn("Commit log store paths are: {}", storePath); + log.warn("Files in order are:"); + for (File f : files) { + log.warn(f.getAbsolutePath()); + } + // End of debug + + return false; + } else { + prevOffset = currentOffset; } try { @@ -204,9 +269,10 @@ public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) } if (createOffset != -1 && needCreate) { - String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset); - String nextNextFilePath = this.storePath + File.separator - + UtilAll.offset2FileName(createOffset + this.mappedFileSize); + String selectedMappedFileStorePath = UtilAll.selectCommitLogStorePath(this.storePath); + String nextFilePath = resolveMappedFilePath(selectedMappedFileStorePath, createOffset); + String nextNextFilePath = resolveMappedFilePath(selectedMappedFileStorePath, createOffset + + this.mappedFileSize); MappedFile mappedFile = null; if (this.allocateMappedFileService != null) { @@ -233,6 +299,20 @@ public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) return mappedFileLast; } + private String resolveMappedFilePath(final String selectedMappedFileStorePath, final long startOffset) { + // The file might have been created previously. + String[] paths = this.storePath.split(","); + for (String path : paths) { + String filePath = path + File.separator + UtilAll.offset2FileName(startOffset); + File file = new File(filePath); + if (file.exists()) { + return filePath; + } + } + + return selectedMappedFileStorePath + File.separator + UtilAll.offset2FileName(startOffset); + } + public MappedFile getLastMappedFile(final long startOffset) { return getLastMappedFile(startOffset, true); } @@ -588,4 +668,8 @@ public long getCommittedWhere() { public void setCommittedWhere(final long committedWhere) { this.committedWhere = committedWhere; } + + public void setStorePath(String storePath) { + this.storePath = storePath; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 79e3a8fd1c8..9e3d1f6f112 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -102,4 +102,6 @@ QueryMessageResult queryMessage(final String topic, final String key, final int long lockTimeMills(); boolean isTransientStorePoolDeficient(); + + void updateCommitLogStorePath(); } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 19bff895b0a..ed74885290f 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -469,4 +469,18 @@ public Map getNameServerConfig(final List nameServer UnsupportedEncodingException { return this.defaultMQAdminExtImpl.getNameServerConfig(nameServers); } + + /** + * Add a new commit log store path. + * @param brokerAddress Broker address with port specified. + * @param storePathCommitLog new commit log path. + * @throws RemotingException if there is any error in remoting communication. + * @throws InterruptedException if the execution is interrupted. + * @throws MQBrokerException if broker encounters any error + */ + @Override + public void addCommitLogStorePath(String brokerAddress, String storePathCommitLog) + throws RemotingException, InterruptedException, MQBrokerException { + this.defaultMQAdminExtImpl.addCommitLogStorePath(brokerAddress, storePathCommitLog); + } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index a31b69d927e..81075c0feaf 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -67,6 +67,7 @@ import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicList; +import org.apache.rocketmq.common.protocol.header.AddCommitLogStorePathRequestHeader; import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.route.BrokerData; @@ -955,4 +956,12 @@ public Map getNameServerConfig(final List nameServer return this.mqClientInstance.getMQClientAPIImpl().getNameServerConfig(nameServers, timeoutMillis); } + @Override + public void addCommitLogStorePath(String brokerAddress, String storePathCommitLog) + throws RemotingException, InterruptedException, MQBrokerException { + AddCommitLogStorePathRequestHeader requestHeader = new AddCommitLogStorePathRequestHeader(); + requestHeader.setCommitLogPath(storePathCommitLog); + this.mqClientInstance.getMQAdminImpl().addCommitLogStorePath(brokerAddress, requestHeader); + + } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 493cf5400d9..7b4f556c83a 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -241,4 +241,15 @@ void updateNameServerConfig(final Properties properties, final List name Map getNameServerConfig(final List nameServers) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException, UnsupportedEncodingException; + + /** + * Add a new commit log store path. + * @param brokerAddress Broker address with port specified. + * @param storePathCommitLog new commit log path. + * @throws RemotingException if there is any error in remoting communication. + * @throws InterruptedException if the execution is interrupted. + * @throws MQBrokerException if broker encounters any error + */ + void addCommitLogStorePath(String brokerAddress, String storePathCommitLog) + throws RemotingException, InterruptedException, MQBrokerException; } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/AddCommitLogStorePathSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/AddCommitLogStorePathSubCommand.java new file mode 100644 index 00000000000..b9f28d5a04f --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/AddCommitLogStorePathSubCommand.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.broker; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.SubCommand; + +public class AddCommitLogStorePathSubCommand implements SubCommand { + + @Override + public String commandName() { + return "addCommitLogStorePath"; + } + + @Override + public String commandDesc() { + return "Add store path for commit log"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option option = new Option("p", "path", true, "Commit log path"); + option.setRequired(true); + options.addOption(option); + + Option brokerAddressOption = new Option("b", "brokerAddress", true, "Broker address"); + brokerAddressOption.setRequired(true); + options.addOption(brokerAddressOption); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQAdminExt adminExt = new DefaultMQAdminExt(String.valueOf(System.currentTimeMillis())); + + String brokerAddress = null; + + if (commandLine.hasOption("b")) { + brokerAddress = commandLine.getOptionValue("b"); + } else { + HelpFormatter helpFormatter = new HelpFormatter(); + helpFormatter.printHelp("Broker address missing", options); + return; + } + + String storePathCommitLog = null; + if (commandLine.hasOption("p")) { + storePathCommitLog = commandLine.getOptionValue("p"); + } else { + HelpFormatter helpFormatter = new HelpFormatter(); + helpFormatter.printHelp("storePathCommitLog missing", options); + return; + } + + try { + adminExt.start(); + adminExt.addCommitLogStorePath(brokerAddress, storePathCommitLog); + System.out.printf("Commit log store path added OK"); + } catch (MQClientException | InterruptedException | MQBrokerException | RemotingException e) { + e.printStackTrace(); + } finally { + adminExt.shutdown(); + } + } +} From 83c97849f7a06ff3c249824beb06bae543a7d6ec Mon Sep 17 00:00:00 2001 From: Zhanhui Li Date: Thu, 5 Jan 2017 12:57:20 +0800 Subject: [PATCH 2/6] Persist commit log store path after changing. --- .../processor/AdminBrokerProcessor.java | 40 +++++++++++++++++-- .../store/config/MessageStoreConfig.java | 14 +++++-- 2 files changed, 47 insertions(+), 7 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 4571a5027da..9a64e253082 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -63,7 +63,34 @@ import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody; -import org.apache.rocketmq.common.protocol.header.*; +import org.apache.rocketmq.common.protocol.header.AddCommitLogStorePathRequestHeader; +import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader; +import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader; +import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader; +import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetAllTopicConfigResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetBrokerConfigResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumerConnectionListRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetProducerConnectionListRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader; +import org.apache.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHeader; +import org.apache.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader; +import org.apache.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader; +import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader; +import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader; import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader; import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; @@ -1254,10 +1281,15 @@ private RemotingCommand addCommitLogStorePath(ChannelHandlerContext ctx, Remotin return response; } - brokerController.getMessageStoreConfig().setStorePathCommitLog(existingCommitLogPath + "," - + requestHeader.getCommitLogPath()); + String renewedCommitLogStorePath = existingCommitLogPath + "," + requestHeader.getCommitLogPath(); + + brokerController.getMessageStoreConfig().setStorePathCommitLog(renewedCommitLogStorePath); brokerController.getMessageStore().updateCommitLogStorePath(); - brokerController.flushAllConfig(); + + // Persist + Properties properties = new Properties(); + properties.put("storePathCommitLog", renewedCommitLogStorePath); + brokerController.getConfiguration().update(properties); response.setCode(ResponseCode.SUCCESS); return response; diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 9cfd1c34caa..db1f5bf4100 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -21,17 +21,25 @@ import org.apache.rocketmq.store.ConsumeQueue; public class MessageStoreConfig { - //The root directory in which the log data is kept + + /** + * The root directory in which the log data is kept + */ @ImportantField private String storePathRootDir = System.getProperty("user.home") + File.separator + "store"; - //The directory in which the commitlog is kept + /** + * The directory in which the commit log files are stored + */ @ImportantField private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store" + File.separator + "commitlog"; - // CommitLog file size,default is 1G + /** + * CommitLog file size,default is 1G + */ private int mapedFileSizeCommitLog = 1024 * 1024 * 1024; + // ConsumeQueue file size,default is 30W private int mapedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE; From 21accafca95e69eba372fc6ed85fd87d583e870f Mon Sep 17 00:00:00 2001 From: Zhanhui Li Date: Thu, 5 Jan 2017 14:32:48 +0800 Subject: [PATCH 3/6] Fix code style. --- .../main/java/org/apache/rocketmq/common/UtilAll.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java index 3bfd449f68d..52c69058e66 100644 --- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java @@ -28,7 +28,15 @@ import java.text.NumberFormat; import java.text.ParseException; import java.text.SimpleDateFormat; -import java.util.*; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.Enumeration; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.zip.CRC32; import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; From c06de73239d377c187bff4cd6f152227c96eb011 Mon Sep 17 00:00:00 2001 From: Zhanhui Li Date: Thu, 5 Jan 2017 14:37:32 +0800 Subject: [PATCH 4/6] Fix code style. --- .../rocketmq/client/impl/MQAdminImpl.java | 8 +++- .../rocketmq/client/impl/MQClientAPIImpl.java | 43 ++++++++++++++++++- 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index c1d509e7c11..68dc476d2c6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -49,7 +49,11 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.common.RemotingUtil; -import org.apache.rocketmq.remoting.exception.*; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.netty.ResponseFuture; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.slf4j.Logger; @@ -416,7 +420,7 @@ public void operationComplete(ResponseFuture responseFuture) { public void addCommitLogStorePath(String brokerAddress, AddCommitLogStorePathRequestHeader requestHeader) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, - RemotingConnectException { + RemotingConnectException { this.mQClientFactory.getMQClientAPIImpl().addCommitLogStorePath(brokerAddress, requestHeader); } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 284e215ecfc..3cb680a22b4 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -77,7 +77,48 @@ import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody; -import org.apache.rocketmq.common.protocol.header.*; +import org.apache.rocketmq.common.protocol.header.AddCommitLogStorePathRequestHeader; +import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader; +import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader; +import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader; +import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader; +import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader; +import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumerConnectionListRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody; +import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetProducerConnectionListRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetTopicsByClusterRequestHeader; +import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; +import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader; +import org.apache.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHeader; +import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader; +import org.apache.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader; +import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader; +import org.apache.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader; +import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader; +import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; +import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; +import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; +import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader; +import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader; +import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader; From aedfc0d8d62b9b39a49a4e5a14ae6dc77297fc3f Mon Sep 17 00:00:00 2001 From: Zhanhui Li Date: Thu, 5 Jan 2017 15:35:02 +0800 Subject: [PATCH 5/6] Add command to mqadmin --- .../java/org/apache/rocketmq/tools/command/MQAdminStartup.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index fdf8e219d6d..bcd5f6f50c6 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -29,6 +29,7 @@ import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.command.broker.AddCommitLogStorePathSubCommand; import org.apache.rocketmq.tools.command.broker.BrokerConsumeStatsSubCommad; import org.apache.rocketmq.tools.command.broker.BrokerStatusSubCommand; import org.apache.rocketmq.tools.command.broker.CleanExpiredCQSubCommand; @@ -189,6 +190,7 @@ public static void initCommand() { initCommand(new GetNamesrvConfigCommand()); initCommand(new UpdateNamesrvConfigCommand()); initCommand(new GetBrokerConfigCommand()); + initCommand(new AddCommitLogStorePathSubCommand()); } private static void initLogback() throws JoranException { From b542cb54b302ea3d2fdbe0d82960a8d7ffc90547 Mon Sep 17 00:00:00 2001 From: Zhanhui Li Date: Thu, 5 Jan 2017 18:10:22 +0800 Subject: [PATCH 6/6] Fix merge issue. --- .../main/java/org/apache/rocketmq/broker/BrokerController.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 3fc34960410..8bbfc55cbe4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -448,7 +448,7 @@ public void protectBroker() { while (it.hasNext()) { final Map.Entry next = it.next(); final long fallBehindBytes = next.getValue().getValue().get(); - if (fallBehindBytes > this.brokerConfig.getConsumerFallBehindThreshold()) { + if (fallBehindBytes > this.brokerConfig.getConsumerFallbehindThreshold()) { final String[] split = next.getValue().getStatsKey().split("@"); final String group = split[2]; LOG_PROTECTION.info("[PROTECT_BROKER] the consumer[{}] consume slowly, {} bytes, disable it", group, fallBehindBytes);