Skip to content

Commit

Permalink
[ISSUE #2149] Apache RocketMQ rebalancing architecture optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
tsunghanjacktsai committed Aug 14, 2020
1 parent fac30c3 commit 3549cac
Show file tree
Hide file tree
Showing 40 changed files with 1,594 additions and 52 deletions.
Expand Up @@ -73,6 +73,7 @@
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
import org.apache.rocketmq.broker.util.ServiceProvider;
import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.Configuration;
import org.apache.rocketmq.common.DataVersion;
Expand Down Expand Up @@ -143,6 +144,8 @@ public class BrokerController {
private final BrokerStatsManager brokerStatsManager;
private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
private final ConcurrentMap<String /* Consumer Group */, AllocateMessageQueueStrategy /* Strategy Object */> allocateMessageQueueStrategyTable
= new ConcurrentHashMap<String, AllocateMessageQueueStrategy>();
private MessageStore messageStore;
private RemotingServer remotingServer;
private RemotingServer fastRemotingServer;
Expand Down Expand Up @@ -605,10 +608,12 @@ public void registerProcessor() {
this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.ALLOCATE_MESSAGE_QUEUE, consumerManageProcessor, this.consumerManageExecutor);

this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.ALLOCATE_MESSAGE_QUEUE, consumerManageProcessor, this.consumerManageExecutor);

/**
* EndTransactionProcessor
Expand Down Expand Up @@ -741,6 +746,10 @@ public SubscriptionGroupManager getSubscriptionGroupManager() {
return subscriptionGroupManager;
}

public ConcurrentMap<String, AllocateMessageQueueStrategy> getAllocateMessageQueueStrategyTable() {
return allocateMessageQueueStrategyTable;
}

public void shutdown() {
if (this.brokerStatsManager != null) {
this.brokerStatsManager.shutdown();
Expand Down
Expand Up @@ -17,20 +17,35 @@
package org.apache.rocketmq.broker.processor;

import io.netty.channel.ChannelHandlerContext;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.AllocateMessageQueueRequestBody;
import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueRequestHeader;
import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueResponseBody;
import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragelyByCircle;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueByConfig;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueByMachineRoom;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueConsistentHash;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueSticky;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueStrategyConstants;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
Expand All @@ -57,6 +72,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
return this.updateConsumerOffset(ctx, request);
case RequestCode.QUERY_CONSUMER_OFFSET:
return this.queryConsumerOffset(ctx, request);
case RequestCode.ALLOCATE_MESSAGE_QUEUE:
return this.allocateMessageQueue(ctx, request);
default:
break;
}
Expand Down Expand Up @@ -152,4 +169,82 @@ private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingC

return response;
}

private synchronized RemotingCommand allocateMessageQueue(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(AllocateMessageQueueResponseHeader.class);
final AllocateMessageQueueRequestHeader requestHeader =
(AllocateMessageQueueRequestHeader) request.decodeCommandCustomHeader(AllocateMessageQueueRequestHeader.class);
final AllocateMessageQueueRequestBody requestBody = AllocateMessageQueueRequestBody.decode(request.getBody(),
AllocateMessageQueueRequestBody.class);

AllocateMessageQueueStrategy strategy = null;
String consumerGroup = requestHeader.getConsumerGroup();
String strategyName = requestHeader.getStrategyName();
Map<String, AllocateMessageQueueStrategy> strategyTable = this.brokerController.getAllocateMessageQueueStrategyTable();

if (strategyTable.containsKey(consumerGroup)) {
if (strategyName.equals(strategyTable.get(consumerGroup).getName())) {
strategy = strategyTable.get(consumerGroup);
} else {
response.setCode(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
response.setRemark("AllocateMessageQueueStrategy cannot be distinct under the same consumerGroup[" + consumerGroup + "]");
return response;
}
} else {
if (strategyName.startsWith(AllocateMessageQueueStrategyConstants.ALLOCATE_MACHINE_ROOM_NEARBY)) {
response.setCode(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
response.setRemark("AllocateMessageQueueStrategy[" + strategyName + "] is not supported by broker");
return response;
} else {
switch (strategyName) {
case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY:
strategy = new AllocateMessageQueueAveragely();
break;
case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY_BY_CIRCLE:
strategy = new AllocateMessageQueueAveragelyByCircle();
break;
case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_BY_CONFIG:
strategy = new AllocateMessageQueueByConfig();
break;
case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_BY_MACHINE_ROOM:
strategy = new AllocateMessageQueueByMachineRoom();
break;
case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_CONSISTENT_HASH:
strategy = new AllocateMessageQueueConsistentHash();
break;
case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_STICKY:
strategy = new AllocateMessageQueueSticky(new HashMap<String, List<MessageQueue>>());
default:
break;
}
}
strategyTable.put(consumerGroup, strategy);
}

ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(consumerGroup);
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
requestHeader.getConsumerGroup(),
requestHeader.getClientID(),
requestBody.getMqAll(),
consumerGroupInfo != null ? consumerGroupInfo.getAllClientId() : null);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}",
strategy.getName(), e);
response.setCode(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
response.setRemark(e.getMessage());
return response;
}

AllocateMessageQueueResponseBody body = new AllocateMessageQueueResponseBody();
body.setAllocateResult(allocateResult);
response.setBody(body.encode());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);

return response;
}
}
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.processor;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.AllocateMessageQueueRequestBody;
import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueRequestHeader;
import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueResponseBody;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueStrategyConstants;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;

import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
import static org.assertj.core.api.Assertions.assertThat;

@RunWith(MockitoJUnitRunner.class)
public class ConsumerManageProcessorTest {
private ConsumerManageProcessor consumerManageProcessor;
@Spy
private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig());
@Mock
private ChannelHandlerContext handlerContext;
@Mock
private Channel channel;

private ClientChannelInfo clientChannelInfo;
private String clientId = UUID.randomUUID().toString();
private String group = "FooBarGroup";
private String topic = "FooBar";
private List<MessageQueue> mqAll = new ArrayList<MessageQueue>();

@Before
public void init() {
consumerManageProcessor = new ConsumerManageProcessor(brokerController);
clientChannelInfo = new ClientChannelInfo(channel, clientId, LanguageCode.JAVA, 100);

mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 0));
mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 1));
mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 3));
mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 4));

ConsumerData consumerData = createConsumerData(group, topic);
brokerController.getConsumerManager().registerConsumer(
consumerData.getGroupName(),
clientChannelInfo,
consumerData.getConsumeType(),
consumerData.getMessageModel(),
consumerData.getConsumeFromWhere(),
consumerData.getSubscriptionDataSet(),
false);
}

@Test
public void testAllocateMessageQueue() throws RemotingCommandException {
String emptyClientId = "";
RemotingCommand request = buildAllocateMessageQueueRequest(emptyClientId, AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY);
RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
assertThat(response.getRemark()).isEqualTo("currentCID is empty");

request = buildAllocateMessageQueueRequest(clientId, AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY);
response = consumerManageProcessor.processRequest(handlerContext, request);
assertThat(response.getBody()).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}

@Test
public void testDistinctAllocateMessageQueueStrategyByGroup() throws RemotingCommandException {
RemotingCommand request = buildAllocateMessageQueueRequest(clientId, AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY);
RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
assertThat(response.getBody()).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);

request = buildAllocateMessageQueueRequest("CID-1", AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY_BY_CIRCLE);
response = consumerManageProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
assertThat(response.getRemark()).isEqualTo("AllocateMessageQueueStrategy cannot be distinct under the same consumerGroup[" + group + "]");
}

private RemotingCommand buildAllocateMessageQueueRequest(String clientId, String strategy) {
AllocateMessageQueueRequestHeader requestHeader = new AllocateMessageQueueRequestHeader();
requestHeader.setConsumerGroup(group);
requestHeader.setClientID(clientId);
requestHeader.setStrategyName(strategy);

AllocateMessageQueueRequestBody requestBody = new AllocateMessageQueueRequestBody();
requestBody.setMqAll(mqAll);

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ALLOCATE_MESSAGE_QUEUE, requestHeader);
request.setBody(requestBody.encode());
request.makeCustomHeaderToNet();
return request;
}
}
Expand Up @@ -208,7 +208,7 @@ private RemotingCommand createPullMsgCommand(int requestCode) {
return request;
}

static ConsumerData createConsumerData(String group, String topic) {
public static ConsumerData createConsumerData(String group, String topic) {
ConsumerData consumerData = new ConsumerData();
consumerData.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumerData.setConsumeType(ConsumeType.CONSUME_PASSIVELY);
Expand Down

0 comments on commit 3549cac

Please sign in to comment.