Skip to content

Commit

Permalink
[Assignment] Fix the risk of memory overflow caused by excessive popS…
Browse files Browse the repository at this point in the history
…hareQueueNum.

Signed-off-by: zhangyang21 <zhangyang21@xiaomi.com>
  • Loading branch information
Git-Yang committed Jan 20, 2022
1 parent 1b09702 commit 5254805
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,33 +208,8 @@ private Set<MessageQueue> doLoadBalance(final String topic, final String consume
}

if (setMessageRequestModeRequestBody != null && setMessageRequestModeRequestBody.getMode() == MessageRequestMode.POP) {
if (setMessageRequestModeRequestBody.getPopShareQueueNum() <= 0) {
//each client pop all messagequeue
allocateResult = new ArrayList<>(mqAll.size());
for (MessageQueue mq : mqAll) {
//must create new MessageQueue in case of change cache in AssignmentManager
MessageQueue newMq = new MessageQueue(mq.getTopic(), mq.getBrokerName(), -1);
allocateResult.add(newMq);
}

} else {
if (cidAll.size() <= mqAll.size()) {
//consumer working in pop mode could share the MessageQueues assigned to the N (N = popWorkGroupSize) consumer following it in the cid list
allocateResult = allocateMessageQueueStrategy.allocate(consumerGroup, clientId, mqAll, cidAll);
int index = cidAll.indexOf(clientId);
if (index >= 0) {
for (int i = 1; i <= setMessageRequestModeRequestBody.getPopShareQueueNum(); i++) {
index++;
index = index % cidAll.size();
List<MessageQueue> tmp = allocateMessageQueueStrategy.allocate(consumerGroup, cidAll.get(index), mqAll, cidAll);
allocateResult.addAll(tmp);
}
}
} else {
//make sure each cid is assigned
allocateResult = allocate(consumerGroup, clientId, mqAll, cidAll);
}
}
allocateResult = allocate4Pop(allocateMessageQueueStrategy, consumerGroup, clientId, mqAll,
cidAll, setMessageRequestModeRequestBody.getPopShareQueueNum());

} else {
allocateResult = allocateMessageQueueStrategy.allocate(consumerGroup, clientId, mqAll, cidAll);
Expand All @@ -256,6 +231,42 @@ private Set<MessageQueue> doLoadBalance(final String topic, final String consume
return assignedQueueSet;
}

public List<MessageQueue> allocate4Pop(AllocateMessageQueueStrategy allocateMessageQueueStrategy,
final String consumerGroup, final String clientId, List<MessageQueue> mqAll, List<String> cidAll,
int popShareQueueNum) {

List<MessageQueue> allocateResult;
if (popShareQueueNum <= 0 || popShareQueueNum >= cidAll.size() - 1) {
//each client pop all messagequeue
allocateResult = new ArrayList<>(mqAll.size());
for (MessageQueue mq : mqAll) {
//must create new MessageQueue in case of change cache in AssignmentManager
MessageQueue newMq = new MessageQueue(mq.getTopic(), mq.getBrokerName(), -1);
allocateResult.add(newMq);
}

} else {
if (cidAll.size() <= mqAll.size()) {
//consumer working in pop mode could share the MessageQueues assigned to the N (N = popWorkGroupSize) consumer following it in the cid list
allocateResult = allocateMessageQueueStrategy.allocate(consumerGroup, clientId, mqAll, cidAll);
int index = cidAll.indexOf(clientId);
if (index >= 0) {
for (int i = 1; i <= popShareQueueNum; i++) {
index++;
index = index % cidAll.size();
List<MessageQueue> tmp = allocateMessageQueueStrategy.allocate(consumerGroup, cidAll.get(index), mqAll, cidAll);
allocateResult.addAll(tmp);
}
}
} else {
//make sure each cid is assigned
allocateResult = allocate(consumerGroup, clientId, mqAll, cidAll);
}
}

return allocateResult;
}

private List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,15 @@
import com.google.common.collect.ImmutableSet;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.loadbalance.AssignmentManager;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueConsistentHash;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
Expand All @@ -40,6 +46,7 @@
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -67,6 +74,7 @@ public class QueryAssignmentProcessorTest {
@Mock
private Channel channel;

private String broker = "defaultBroker";
private String topic = "FooBar";
private String group = "FooBarGroup";
private String clientId = "127.0.0.1";
Expand Down Expand Up @@ -118,6 +126,73 @@ public void testSetMessageRequestMode_RetryTopic() throws Exception {
assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
}


@Test
public void testAllocate4Pop() {
testAllocate4Pop(new AllocateMessageQueueAveragely());
testAllocate4Pop(new AllocateMessageQueueAveragelyByCircle());
testAllocate4Pop(new AllocateMessageQueueConsistentHash());
}

private void testAllocate4Pop(AllocateMessageQueueStrategy strategy) {
int testNum = 16;
List<MessageQueue> mqAll = new ArrayList<>();
for (int mqSize = 0; mqSize < testNum; mqSize++) {
mqAll.add(new MessageQueue(topic, broker, mqSize));

List<String> cidAll = new ArrayList<>();
for (int cidSize = 0; cidSize < testNum; cidSize++) {
String clientId = String.valueOf(cidSize);
cidAll.add(clientId);

for (int popShareQueueNum = 0; popShareQueueNum < testNum; popShareQueueNum++) {
List<MessageQueue> allocateResult =
queryAssignmentProcessor.allocate4Pop(strategy, group, clientId, mqAll, cidAll, popShareQueueNum);
Assert.assertTrue(checkAllocateResult(popShareQueueNum, mqAll.size(), cidAll.size(), allocateResult.size(), strategy));
}
}
}
}

private boolean checkAllocateResult(int popShareQueueNum, int mqSize, int cidSize, int allocateSize,
AllocateMessageQueueStrategy strategy) {

//The maximum size of allocations will not exceed mqSize.
if (allocateSize > mqSize) {
return false;
}

//It is not allowed that the client is not assigned to the consumeQueue.
if (allocateSize <= 0) {
return false;
}

if (popShareQueueNum <= 0 || popShareQueueNum >= cidSize - 1) {
return allocateSize == mqSize;
} else if (mqSize < cidSize) {
return allocateSize == 1;
}

if (strategy instanceof AllocateMessageQueueAveragely
|| strategy instanceof AllocateMessageQueueAveragelyByCircle) {

if (mqSize % cidSize == 0) {
return allocateSize == (mqSize / cidSize) * (popShareQueueNum + 1);
} else {
int avgSize = mqSize / cidSize;
return allocateSize >= avgSize * (popShareQueueNum + 1)
&& allocateSize <= (avgSize + 1) * (popShareQueueNum + 1);
}
}

if (strategy instanceof AllocateMessageQueueConsistentHash) {
//Just skip
return true;
}

return false;
}

private RemotingCommand createQueryAssignmentRequest() {
QueryAssignmentRequestBody requestBody = new QueryAssignmentRequestBody();
requestBody.setTopic(topic);
Expand Down

0 comments on commit 5254805

Please sign in to comment.