From ef09bcc13fb9fee71255f78568e5c59fd252064b Mon Sep 17 00:00:00 2001 From: Zhanhui Li Date: Mon, 6 Mar 2017 17:46:53 +0800 Subject: [PATCH 1/3] Add doc --- .../apache/rocketmq/broker/BrokerStartup.java | 49 ++++++++++++++++++- .../apache/rocketmq/srvutil/ServerUtil.java | 5 +- 2 files changed, 49 insertions(+), 5 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java index 39ee8dd2e58..5a78ea72e2c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java @@ -42,16 +42,48 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + *

+ * This class is to read and parse command line arguments then start broker servers. + *

+ */ public class BrokerStartup { + + /** + * {@link Properties} instance to store key-value pairs of specified broker configuration file. + */ public static Properties properties = null; + + /** + * Command line argument parser. + */ public static CommandLine commandLine = null; + + /** + * Broker configuration file. + */ public static String configFile = null; + + /** + * Logger instance. + */ public static Logger log; + /** + * Main method, all things start here. + * + * @param args command line argument. + */ public static void main(String[] args) { start(createBrokerController(args)); } + /** + * Starts the {@link BrokerController} instance. + * + * @param controller BrokerController instance. + * @return The same broker controller instance if successful; null otherwise. + */ public static BrokerController start(BrokerController controller) { try { controller.start(); @@ -72,13 +104,21 @@ public static BrokerController start(BrokerController controller) { return null; } + /** + * This method creates {@link BrokerController} instance using command line arguments. + * + * @param args Command line arguments. + * @return Instance of {@link BrokerController}. + */ public static BrokerController createBrokerController(String[] args) { System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); + // Default send buffer size to 128KB if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) { NettySystemConfig.socketSndbufSize = 131072; } + // Default receive buffer size to 128KB if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) { NettySystemConfig.socketRcvbufSize = 131072; } @@ -95,15 +135,19 @@ public static BrokerController createBrokerController(String[] args) { final BrokerConfig brokerConfig = new BrokerConfig(); final NettyServerConfig nettyServerConfig = new NettyServerConfig(); final NettyClientConfig nettyClientConfig = new NettyClientConfig(); - nettyServerConfig.setListenPort(10911); + nettyServerConfig.setListenPort(10911); // Set default broker listen port final MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) { + + // Use a conservative value for slave brokers so that consumer groups won't frequently change targeting + // brokers when they are consuming messages at the boundary between being probably resident in physical + // memory and swapped out. int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10; messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio); } - if (commandLine.hasOption('p')) { + if (commandLine.hasOption('p')) { // print configurable options. MixAll.printObjectProperties(null, brokerConfig); MixAll.printObjectProperties(null, nettyServerConfig); MixAll.printObjectProperties(null, nettyClientConfig); @@ -117,6 +161,7 @@ public static BrokerController createBrokerController(String[] args) { System.exit(0); } + // Set broker via specified configuration file. if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file != null) { diff --git a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java index 066d36cedd9..d17b14b1f7c 100644 --- a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java +++ b/srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java @@ -31,9 +31,8 @@ public static Options buildCommandlineOptions(final Options options) { opt.setRequired(false); options.addOption(opt); - opt = - new Option("n", "namesrvAddr", true, - "Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876"); + opt = new Option("n", "namesrvAddr", true, + "Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876"); opt.setRequired(false); options.addOption(opt); From 678ea43fa77ebe213ea4f4c76fc3a3693dd44d2f Mon Sep 17 00:00:00 2001 From: Zhanhui Li Date: Fri, 10 Mar 2017 18:26:01 +0800 Subject: [PATCH 2/3] Add javadoc. --- .../rocketmq/broker/BrokerController.java | 10 +++ .../apache/rocketmq/broker/BrokerStartup.java | 69 ++++++++++++------- .../AbstractSendMessageProcessor.java | 13 ++-- .../processor/SendMessageProcessor.java | 30 +++++++- .../remoting/netty/NettyRemotingAbstract.java | 30 ++++++-- .../remoting/netty/NettyRemotingClient.java | 6 +- .../remoting/netty/NettyRemotingServer.java | 38 ++++++---- .../remoting/netty/NettyRequestProcessor.java | 26 ++++++- 8 files changed, 165 insertions(+), 57 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index b656870b9e1..984fcfa0d46 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -86,6 +86,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + *

+ * This class, as its name implies, is the core controller of the broker server, which manages various internal + * services and resources. + *

+ * + *

+ * Thread Safety: Technically, this class itself is not thread-safe. + *

+ */ public class BrokerController { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final Logger LOG_PROTECTION = LoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java index 5a78ea72e2c..f9f7d66bc3e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java @@ -19,6 +19,7 @@ import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.joran.JoranConfigurator; import java.io.BufferedInputStream; +import java.io.File; import java.io.FileInputStream; import java.io.InputStream; import java.util.Properties; @@ -44,7 +45,12 @@ /** *

- * This class is to read and parse command line arguments then start broker servers. + * This class is to read and parse command line arguments then start broker servers. It's the entry class of the + * broker servers. + *

+ * + *

+ * Thread Safety: This class is the bootstrap for broker servers, no thread-safety is required. *

*/ public class BrokerStartup { @@ -147,13 +153,14 @@ public static BrokerController createBrokerController(String[] args) { messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio); } - if (commandLine.hasOption('p')) { // print configurable options. + // FIXME: log should not be null + if (commandLine.hasOption('p')) { // print all configurables. MixAll.printObjectProperties(null, brokerConfig); MixAll.printObjectProperties(null, nettyServerConfig); MixAll.printObjectProperties(null, nettyClientConfig); MixAll.printObjectProperties(null, messageStoreConfig); System.exit(0); - } else if (commandLine.hasOption('m')) { + } else if (commandLine.hasOption('m')) { // print all important configurations MixAll.printObjectProperties(null, brokerConfig, true); MixAll.printObjectProperties(null, nettyServerConfig, true); MixAll.printObjectProperties(null, nettyClientConfig, true); @@ -161,31 +168,38 @@ public static BrokerController createBrokerController(String[] args) { System.exit(0); } - // Set broker via specified configuration file. + // Load and apply configurations through file. if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); - if (file != null) { - configFile = file; - InputStream in = new BufferedInputStream(new FileInputStream(file)); - properties = new Properties(); - properties.load(in); - - properties2SystemEnv(properties); - MixAll.properties2Object(properties, brokerConfig); - MixAll.properties2Object(properties, nettyServerConfig); - MixAll.properties2Object(properties, nettyClientConfig); - MixAll.properties2Object(properties, messageStoreConfig); - - BrokerPathConfigHelper.setBrokerConfigPath(file); - in.close(); + if (file != null && !file.trim().isEmpty()) { + configFile = file.trim(); + File f = new File(configFile); + if (f.exists() && f.canRead()) { + try (InputStream in = new BufferedInputStream(new FileInputStream(f))) { + properties = new Properties(); + properties.load(in); + + properties2SystemEnv(properties); + MixAll.properties2Object(properties, brokerConfig); + MixAll.properties2Object(properties, nettyServerConfig); + MixAll.properties2Object(properties, nettyClientConfig); + MixAll.properties2Object(properties, messageStoreConfig); + + BrokerPathConfigHelper.setBrokerConfigPath(file); + } + } else { + System.out.printf("%s does not exist, abort.", file); + System.exit(-2); + } } } MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig); + // Check ROCKETMQ_HOME environment variable. if (null == brokerConfig.getRocketmqHome()) { - System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV - + " variable in your environment to match the location of the RocketMQ installation"); + System.out.printf("Please set the %s variable in your environment to match the location of the " + + "RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); } @@ -197,8 +211,7 @@ public static BrokerController createBrokerController(String[] args) { RemotingUtil.string2SocketAddress(addr); } } catch (Exception e) { - System.out.printf( - "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n", + System.out.printf("The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n", namesrvAddr); System.exit(-3); } @@ -206,21 +219,26 @@ public static BrokerController createBrokerController(String[] args) { switch (messageStoreConfig.getBrokerRole()) { case ASYNC_MASTER: + // fall through on purpose. case SYNC_MASTER: brokerConfig.setBrokerId(MixAll.MASTER_ID); break; + case SLAVE: if (brokerConfig.getBrokerId() <= 0) { - System.out.printf("Slave's brokerId must be > 0"); + System.out.printf("brokerId of slave broker must be a positive integer"); System.exit(-3); } - break; + default: break; } + // Set HA port messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1); + + LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); @@ -238,15 +256,18 @@ public static BrokerController createBrokerController(String[] args) { nettyServerConfig, // nettyClientConfig, // messageStoreConfig); + // remember all configs to prevent discard controller.getConfiguration().registerConfig(properties); + // Initialize broker controller. boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } + // Register shutdown hook to shutdown various services. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { private volatile boolean hasShutdown = false; private AtomicInteger shutdownTimes = new AtomicInteger(0); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index 9f23bade96d..829cd2037b9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -273,21 +273,18 @@ public void executeSendMessageHookBefore(final ChannelHandlerContext ctx, final } } - protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request) - throws RemotingCommandException { + protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request) throws RemotingCommandException { SendMessageRequestHeaderV2 requestHeaderV2 = null; SendMessageRequestHeader requestHeader = null; switch (request.getCode()) { case RequestCode.SEND_MESSAGE_V2: - requestHeaderV2 = - (SendMessageRequestHeaderV2) request - .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class); + requestHeaderV2 = (SendMessageRequestHeaderV2) + request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class); case RequestCode.SEND_MESSAGE: if (null == requestHeaderV2) { - requestHeader = - (SendMessageRequestHeader) request - .decodeCommandCustomHeader(SendMessageRequestHeader.class); + requestHeader = (SendMessageRequestHeader) request + .decodeCommandCustomHeader(SendMessageRequestHeader.class); } else { requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index a4404621c0d..b1cc805a9bc 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -50,6 +50,13 @@ import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.apache.rocketmq.store.stats.BrokerStatsManager; +/** + * This processor handles all send message requests. + * + *

+ * Thread Safety: This class is thread safe. + *

+ */ public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor { private List consumeMessageHookList; @@ -58,12 +65,21 @@ public SendMessageProcessor(final BrokerController brokerController) { super(brokerController); } + /** + * Process various send message request per request code. + * + * @param ctx The channel handler context. + * @param request Incoming request. + * @return process result. + * @throws RemotingCommandException if there is any error. + */ @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { SendMessageContext mqtraceContext; switch (request.getCode()) { case RequestCode.CONSUMER_SEND_MSG_BACK: return this.consumerSendMsgBack(ctx, request); + default: SendMessageRequestHeader requestHeader = parseRequestHeader(request); if (requestHeader == null) { @@ -79,6 +95,11 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand } } + /** + * Reject the request when system is over-loaded. + * + * @return true if rejecting the request; false otherwise. + */ @Override public boolean rejectRequest() { return this.brokerController.getMessageStore().isOSPageCacheBusy() || @@ -246,6 +267,7 @@ private RemotingCommand sendMessage(final ChannelHandlerContext ctx, // final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader(); + // Set opaque value in case of returning prematurely response.setOpaque(request.getOpaque()); response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); @@ -255,13 +277,15 @@ private RemotingCommand sendMessage(final ChannelHandlerContext ctx, // log.debug("receive SendMessage request command, {}", request); } - final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); - if (this.brokerController.getMessageStore().now() < startTimstamp) { + // confirm the current broker is currently serving. + final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); + if (this.brokerController.getMessageStore().now() < startTimestamp) { response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp))); + response.setRemark(String.format("broker will not serve until %s", UtilAll.timeMillisToHumanString2(startTimestamp))); return response; } + // validate message response.setCode(-1); super.msgCheck(ctx, requestHeader, response); if (response.getCode() != -1) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 08b51db3103..76ab5c775c6 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -59,7 +59,7 @@ public abstract class NettyRemotingAbstract { protected final HashMap> processorTable = new HashMap>(64); - protected final NettyEventExecuter nettyEventExecuter = new NettyEventExecuter(); + protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor(); protected Pair defaultRequestProcessor; @@ -71,25 +71,47 @@ public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) { public abstract ChannelEventListener getChannelEventListener(); public void putNettyEvent(final NettyEvent event) { - this.nettyEventExecuter.putNettyEvent(event); + this.nettyEventExecutor.putNettyEvent(event); } public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { final RemotingCommand cmd = msg; if (cmd != null) { switch (cmd.getType()) { + // handle incoming requests, for example, new message arrival. case REQUEST_COMMAND: processRequestCommand(ctx, cmd); break; + + // handle response from name servers, consumers and producers. case RESPONSE_COMMAND: processResponseCommand(ctx, cmd); break; + default: break; } } } + /** + *

+ * This method handles incoming requests. + *

+ * + *
    + *
  1. + * Look up [processor, thread-pool] pair among registered processorTable by request code, + * if not found, default processor is used. + *
  2. + *
  3. + * Wrap the request into a task and then submit the task into the thread-pool. + *
  4. + *
+ * + * @param ctx The channel handler context. + * @param cmd The remoting command instance, containing all required data per protocol. + */ public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { final Pair matched = this.processorTable.get(cmd.getCode()); final Pair pair = null == matched ? this.defaultRequestProcessor : matched; @@ -389,7 +411,7 @@ public void operationComplete(ChannelFuture f) throws Exception { } } - class NettyEventExecuter extends ServiceThread { + class NettyEventExecutor extends ServiceThread { private final LinkedBlockingQueue eventQueue = new LinkedBlockingQueue(); private final int maxSize = 10000; @@ -439,7 +461,7 @@ public void run() { @Override public String getServiceName() { - return NettyEventExecuter.class.getSimpleName(); + return NettyEventExecutor.class.getSimpleName(); } } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 85f9244d2d0..a83e23ae47b 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -172,7 +172,7 @@ public void run() { }, 1000 * 3, 1000); if (this.channelEventListener != null) { - this.nettyEventExecuter.start(); + this.nettyEventExecutor.start(); } } @@ -189,8 +189,8 @@ public void shutdown() { this.eventLoopGroupWorker.shutdownGracefully(); - if (this.nettyEventExecuter != null) { - this.nettyEventExecuter.shutdown(); + if (this.nettyEventExecutor != null) { + this.nettyEventExecutor.shutdown(); } if (this.defaultEventExecutorGroup != null) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index 6a6df374b2e..207726c14e1 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -160,7 +160,7 @@ public void initChannel(SocketChannel ch) throws Exception { new NettyEncoder(), new NettyDecoder(), new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), - new NettyConnetManageHandler(), + new NettyConnectManageHandler(), new NettyServerHandler()); } }); @@ -178,7 +178,7 @@ public void initChannel(SocketChannel ch) throws Exception { } if (this.channelEventListener != null) { - this.nettyEventExecuter.start(); + this.nettyEventExecutor.start(); } this.timer.scheduleAtFixedRate(new TimerTask() { @@ -205,8 +205,8 @@ public void shutdown() { this.eventLoopGroupSelector.shutdownGracefully(); - if (this.nettyEventExecuter != null) { - this.nettyEventExecuter.shutdown(); + if (this.nettyEventExecutor != null) { + this.nettyEventExecutor.shutdown(); } if (this.defaultEventExecutorGroup != null) { @@ -289,15 +289,27 @@ public ExecutorService getCallbackExecutor() { return this.publicExecutor; } + /** + *

+ * This is the last inbound handler, which dispatches msg to corresponding processor according to + * RemotingCommand.code. + *

+ * + *

+ * Thread Safety: This class is thread safe. + *

+ */ class NettyServerHandler extends SimpleChannelInboundHandler { - @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processMessageReceived(ctx, msg); } } - class NettyConnetManageHandler extends ChannelDuplexHandler { + /** + * This handler manages connection tier event, aka, log event like connection establishment, closing, timeout etc. + */ + class NettyConnectManageHandler extends ChannelDuplexHandler { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); @@ -319,7 +331,8 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); if (NettyRemotingServer.this.channelEventListener != null) { - NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress.toString(), ctx.channel())); + NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress.toString(), + ctx.channel())); } } @@ -330,21 +343,22 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); if (NettyRemotingServer.this.channelEventListener != null) { - NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress.toString(), ctx.channel())); + NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress.toString(), + ctx.channel())); } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { - IdleStateEvent evnet = (IdleStateEvent) evt; - if (evnet.state().equals(IdleState.ALL_IDLE)) { + IdleStateEvent event = (IdleStateEvent) evt; + if (event.state().equals(IdleState.ALL_IDLE)) { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress); RemotingUtil.closeChannel(ctx.channel()); if (NettyRemotingServer.this.channelEventListener != null) { - NettyRemotingServer.this - .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress.toString(), ctx.channel())); + NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.IDLE, + remoteAddress.toString(), ctx.channel())); } } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java index 040f7684883..0b80469dbfc 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java @@ -20,11 +20,31 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; /** - * Common remoting command processor + *

+ * This interface defines a contract for request processor. + *

+ * + *

+ * Thread Safety: implementation of this interface MUST be thread-safe as the processor is normally + * executed concurrently. + *

*/ public interface NettyRequestProcessor { - RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) - throws Exception; + /** + * In the high level, this method would process the incoming request then generate a response + * accordingly. + * + * @param ctx The channel handler context. + * @param request Incoming request. + * @return Response command. + * @throws Exception if there is any error. + */ + RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception; + + /** + * Check if current request should be rejected. + * @return true if reject; false otherwise. + */ boolean rejectRequest(); } From 5662cd46a46d0e8c7dacb5ce4be3beecefca78f2 Mon Sep 17 00:00:00 2001 From: Li Zhanhui Date: Thu, 16 Mar 2017 11:24:27 +0800 Subject: [PATCH 3/3] Add javadoc to send message processor. --- .../processor/SendMessageProcessor.java | 50 ++++++++++++++++++- 1 file changed, 48 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index b1cc805a9bc..870b735de6e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -66,7 +66,7 @@ public SendMessageProcessor(final BrokerController brokerController) { } /** - * Process various send message request per request code. + * Process various send message requests per request code. * * @param ctx The channel handler context. * @param request Incoming request. @@ -106,9 +106,31 @@ public boolean rejectRequest() { this.brokerController.getMessageStore().isTransientStorePoolDeficient(); } + /** + *

+ * Handle messages sent back by consumer clients, which are assumed to be re-consumed later. + *

+ * + *

+ * If the retry times do not exceed maximum specified, the message will be store in a topic named after consumer + * group name, see {@link MixAll#getRetryTopic(String)}. These messages will then be delivered to consumers of + * this group after a delay. + *

+ * + *

+ * If the retry times have exceeded maximum times allowed, the message will be stored in topic named + * {@link MixAll#getDLQTopic(String)}, which functions as Dead-Letter-Queue. + *

+ * + * @param ctx The channel handler context. + * @param request Request from client. + * @return Response {@link RemotingCommand} instance. + * @throws RemotingCommandException If there is any error. + */ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setOpaque(request.getOpaque()); final ConsumerSendMsgBackRequestHeader requestHeader = (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class); @@ -124,6 +146,7 @@ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, fin this.executeConsumeMessageHookAfter(context); } + // Check existence of the consumer group being used. SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup()); if (null == subscriptionGroupConfig) { @@ -133,18 +156,21 @@ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, fin return response; } + // Make sure message store of the broker is writable. if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden"); return response; } + // TODO: we should warn client here because OP may carelessly get things wrong here. if (subscriptionGroupConfig.getRetryQueueNums() <= 0) { response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } + // Get the retry topic and the target queue ID. String newTopic = MixAll.getRetryTopic(requestHeader.getGroup()); int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums(); @@ -169,6 +195,7 @@ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, fin return response; } + // Look up the original message. MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset()); if (null == msgExt) { response.setCode(ResponseCode.SYSTEM_ERROR); @@ -184,11 +211,14 @@ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, fin int delayLevel = requestHeader.getDelayLevel(); + // Figure out maximum retry times. Note final call of latest release is up to the client while version prior to + // 3.4.9 is determined by consumer group configuration on broker. int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes(); if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) { maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); } + // Check if the message should be put into Dead-Letter-Queue if (msgExt.getReconsumeTimes() >= maxReconsumeTimes// || delayLevel < 0) { newTopic = MixAll.getDLQTopic(requestHeader.getGroup()); @@ -203,7 +233,7 @@ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, fin response.setRemark("topic[" + newTopic + "] not exist"); return response; } - } else { + } else { // Compute delay period, after which to re-deliver this message to consumer clients. if (0 == delayLevel) { delayLevel = 3 + msgExt.getReconsumeTimes(); } @@ -211,6 +241,7 @@ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, fin msgExt.setDelayTimeLevel(delayLevel); } + // Construct internal message representation MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setTopic(newTopic); msgInner.setBody(msgExt.getBody()); @@ -229,7 +260,9 @@ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, fin String originMsgId = MessageAccessor.getOriginMessageId(msgExt); MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId); + // Store the message. PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); + if (putMessageResult != null) { switch (putMessageResult.getPutMessageStatus()) { case PUT_OK: @@ -239,6 +272,7 @@ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, fin backTopic = correctTopic; } + // Update statistics this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic); response.setCode(ResponseCode.SUCCESS); @@ -259,6 +293,18 @@ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, fin return response; } + /** + *

+ * Handle send message request. + *

+ * + * @param ctx channel handler context. + * @param request The send message request. + * @param sendMessageContext Send message context + * @param requestHeader Header part of the send message request. + * @return Broker response + * @throws RemotingCommandException if there is any unexpected error. + */ private RemotingCommand sendMessage(final ChannelHandlerContext ctx, // final RemotingCommand request, // final SendMessageContext sendMessageContext, //