Skip to content

Commit

Permalink
[ISSUE #2149] Apache RocketMQ rebalancing architecture optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
tsunghanjacktsai committed Aug 10, 2020
1 parent fac30c3 commit 96b7a00
Show file tree
Hide file tree
Showing 39 changed files with 1,225 additions and 51 deletions.
Expand Up @@ -73,6 +73,7 @@
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
import org.apache.rocketmq.broker.util.ServiceProvider;
import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.Configuration;
import org.apache.rocketmq.common.DataVersion;
Expand Down Expand Up @@ -143,6 +144,8 @@ public class BrokerController {
private final BrokerStatsManager brokerStatsManager;
private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
private final ConcurrentMap<String /* Consumer Group */, AllocateMessageQueueStrategy /* Strategy Object */> allocateMessageQueueStrategyTable
= new ConcurrentHashMap<String, AllocateMessageQueueStrategy>();
private MessageStore messageStore;
private RemotingServer remotingServer;
private RemotingServer fastRemotingServer;
Expand Down Expand Up @@ -605,10 +608,12 @@ public void registerProcessor() {
this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.ALLOCATE_MESSAGE_QUEUE, consumerManageProcessor, this.consumerManageExecutor);

this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.ALLOCATE_MESSAGE_QUEUE, consumerManageProcessor, this.consumerManageExecutor);

/**
* EndTransactionProcessor
Expand Down Expand Up @@ -741,6 +746,10 @@ public SubscriptionGroupManager getSubscriptionGroupManager() {
return subscriptionGroupManager;
}

public ConcurrentMap<String, AllocateMessageQueueStrategy> getAllocateMessageQueueStrategyTable() {
return allocateMessageQueueStrategyTable;
}

public void shutdown() {
if (this.brokerStatsManager != null) {
this.brokerStatsManager.shutdown();
Expand Down
Expand Up @@ -17,20 +17,34 @@
package org.apache.rocketmq.broker.processor;

import io.netty.channel.ChannelHandlerContext;
import java.util.HashMap;
import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.AllocateMessageQueueRequestBody;
import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueRequestHeader;
import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueResponseBody;
import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragelyByCircle;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueByConfig;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueByMachineRoom;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueConsistentHash;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueSticky;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueStrategyConstants;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
Expand All @@ -57,6 +71,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
return this.updateConsumerOffset(ctx, request);
case RequestCode.QUERY_CONSUMER_OFFSET:
return this.queryConsumerOffset(ctx, request);
case RequestCode.ALLOCATE_MESSAGE_QUEUE:
return this.allocateMessageQueue(ctx, request);
default:
break;
}
Expand Down Expand Up @@ -152,4 +168,75 @@ private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingC

return response;
}

private RemotingCommand allocateMessageQueue(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(AllocateMessageQueueResponseHeader.class);
final AllocateMessageQueueRequestHeader requestHeader =
(AllocateMessageQueueRequestHeader) request.decodeCommandCustomHeader(AllocateMessageQueueRequestHeader.class);
final AllocateMessageQueueRequestBody requestBody = AllocateMessageQueueRequestBody.decode(request.getBody(),
AllocateMessageQueueRequestBody.class);

AllocateMessageQueueStrategy strategy = null;
String consumerGroup = requestHeader.getConsumerGroup();
String strategyName = requestHeader.getStrategyName();

if (this.brokerController.getAllocateMessageQueueStrategyTable().containsKey(consumerGroup)) {
strategy = this.brokerController.getAllocateMessageQueueStrategyTable().get(consumerGroup);
} else {
if (strategyName.startsWith(AllocateMessageQueueStrategyConstants.ALLOCATE_MACHINE_ROOM_NEARBY)) {
response.setCode(ResponseCode.ALLOCATE_MESSAGE_QUEUE_STRATEGY_NOT_SUPPORTED);
response.setRemark("The broker does not support message queue strategy " + strategyName);
return response;
} else {
switch (strategyName) {
case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY:
strategy = new AllocateMessageQueueAveragely();
break;
case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY_BY_CIRCLE:
strategy = new AllocateMessageQueueAveragelyByCircle();
break;
case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_BY_CONFIG:
strategy = new AllocateMessageQueueByConfig();
break;
case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_BY_MACHINE_ROOM:
strategy = new AllocateMessageQueueByMachineRoom();
break;
case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_CONSISTENT_HASH:
strategy = new AllocateMessageQueueConsistentHash();
break;
case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_STICKY:
strategy = new AllocateMessageQueueSticky(new HashMap<String, List<MessageQueue>>());
default:
break;
}
}
this.brokerController.getAllocateMessageQueueStrategyTable().put(consumerGroup, strategy);
}

List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
requestHeader.getConsumerGroup(),
requestHeader.getClientID(),
requestBody.getMqAll(),
this.brokerController.getConsumerManager().getConsumerGroupInfo(
requestHeader.getConsumerGroup()).getAllClientId());
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}",
strategy.getName(), e);
response.setCode(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
response.setRemark(e.getMessage());
return response;
}

AllocateMessageQueueResponseBody body = new AllocateMessageQueueResponseBody();
body.setAllocateResult(allocateResult);
response.setBody(body.encode());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);

return response;
}
}
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.processor;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.AllocateMessageQueueRequestBody;
import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueStrategyConstants;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;

import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
public class ConsumerManageProcessorTest {
private ConsumerManageProcessor consumerManageProcessor;
@Spy
private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig());
@Mock
private ChannelHandlerContext handlerContext;
@Mock
private Channel channel;

private ClientChannelInfo clientChannelInfo;
private String clientId = UUID.randomUUID().toString();
private String group = "FooBarGroup";
private String topic = "FooBar";
private List<MessageQueue> mqAll = new ArrayList<MessageQueue>();

@Before
public void init() {
consumerManageProcessor = new ConsumerManageProcessor(brokerController);
clientChannelInfo = new ClientChannelInfo(channel, clientId, LanguageCode.JAVA, 100);
brokerController.getProducerManager().registerProducer(group, clientChannelInfo);

mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 0));
mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 1));
mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 3));
mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 4));

ConsumerData consumerData = createConsumerData(group, topic);
brokerController.getConsumerManager().registerConsumer(
consumerData.getGroupName(),
clientChannelInfo,
consumerData.getConsumeType(),
consumerData.getMessageModel(),
consumerData.getConsumeFromWhere(),
consumerData.getSubscriptionDataSet(),
false);
}

@Test
public void testAllocateMessageQueue() throws RemotingCommandException {
String emptyClientId = "";
RemotingCommand request = buildAllocateMessageQueueRequest(emptyClientId);
RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
assertThat(response.getRemark()).isEqualTo("currentCID is empty");

request = buildAllocateMessageQueueRequest(clientId);
response = consumerManageProcessor.processRequest(handlerContext, request);
assertThat(response.getBody()).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}

private RemotingCommand buildAllocateMessageQueueRequest(String clientId) {
AllocateMessageQueueRequestHeader requestHeader = new AllocateMessageQueueRequestHeader();
requestHeader.setConsumerGroup(group);
requestHeader.setClientID(clientId);
requestHeader.setStrategyName(AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY);

AllocateMessageQueueRequestBody requestBody = new AllocateMessageQueueRequestBody();
requestBody.setMqAll(mqAll);

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ALLOCATE_MESSAGE_QUEUE, requestHeader);
request.setBody(requestBody.encode());
request.makeCustomHeaderToNet();
return request;
}
}
Expand Up @@ -19,17 +19,18 @@
import java.util.Collection;
import java.util.List;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.remoting.RPCHook;

public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
Expand Down Expand Up @@ -66,6 +67,11 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
* Consumption pattern,default is clustering
*/
private MessageModel messageModel = MessageModel.CLUSTERING;

/**
* The switch for applying the rebalancing calculation task at the broker side
*/
private boolean rebalanceByBroker = false;
/**
* Message queue listener
*/
Expand Down Expand Up @@ -409,6 +415,14 @@ public void setMessageModel(MessageModel messageModel) {
this.messageModel = messageModel;
}

public boolean isRebalanceByBroker() {
return rebalanceByBroker;
}

public void setRebalanceByBroker(boolean rebalanceByBroker) {
this.rebalanceByBroker = rebalanceByBroker;
}

public String getConsumerGroup() {
return consumerGroup;
}
Expand Down
Expand Up @@ -20,17 +20,18 @@
import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;

Expand Down Expand Up @@ -65,6 +66,10 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
* Consumption pattern,default is clustering
*/
private MessageModel messageModel = MessageModel.CLUSTERING;
/**
* The switch for applying the rebalancing calculation task at the broker side
*/
private boolean rebalanceByBroker = false;
/**
* Message queue listener
*/
Expand Down Expand Up @@ -245,6 +250,14 @@ public void setMessageModel(MessageModel messageModel) {
this.messageModel = messageModel;
}

public boolean isRebalanceByBroker() {
return rebalanceByBroker;
}

public void setRebalanceByBroker(boolean rebalanceByBroker) {
this.rebalanceByBroker = rebalanceByBroker;
}

public MessageQueueListener getMessageQueueListener() {
return messageQueueListener;
}
Expand Down

0 comments on commit 96b7a00

Please sign in to comment.