Skip to content

Commit

Permalink
[ISSUE #4522] Fix topic route info not found in some case (#4523)
Browse files Browse the repository at this point in the history
* fix topic route info not found in some case

* fix unit test about broker register

Co-authored-by: dengzhiwen1 <dengzhiwen1@xiaomi.com>
  • Loading branch information
cserwen and dengzhiwen1 committed Aug 10, 2022
1 parent 79d89f7 commit 9b87205
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 25 deletions.
Expand Up @@ -245,6 +245,13 @@ public RemotingCommand registerBroker(ChannelHandlerContext ctx,
ctx.channel()
);

if (result == null) {
// Register single topic route info should be after the broker completes the first registration.
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("register broker failed");
return response;
}

responseHeader.setHaServerAddr(result.getHaServerAddr());
responseHeader.setMasterAddr(result.getMasterAddr());

Expand Down
Expand Up @@ -286,6 +286,12 @@ public RegisterBrokerResult registerBroker(
}
}

if (!brokerAddrsMap.containsKey(brokerId) && topicConfigWrapper.getTopicConfigTable().size() == 1) {
log.warn("Can't register topicConfigWrapper={} because broker[{}]={} has not registered.",
topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr);
return null;
}

String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);
registerFirst = registerFirst || (StringUtils.isEmpty(oldAddr));

Expand Down
Expand Up @@ -27,10 +27,13 @@
import java.util.concurrent.ConcurrentHashMap;

import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader;
Expand Down Expand Up @@ -519,12 +522,23 @@ private RemotingCommand getRemotingCommand(int code) {
request.addExtField("clusterName", "cluster");
request.addExtField("haServerAddr", "10.10.2.1");
request.addExtField("brokerId", "2333");
request.addExtField("topic", "unit-test");
request.addExtField("topic", "unit-test0");
return request;
}

private static RemotingCommand genSampleRegisterCmd(boolean reg) {
RegisterBrokerRequestHeader header = new RegisterBrokerRequestHeader();
byte[] body = null;
if (reg) {
TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper();
topicConfigWrapper.getTopicConfigTable().put("unit-test1", new TopicConfig());
topicConfigWrapper.getTopicConfigTable().put("unit-test2", new TopicConfig());
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
body = requestBody.encode(false);
final int bodyCrc32 = UtilAll.crc32(body);
header.setBodyCrc32(bodyCrc32);
}
header.setBrokerName("broker");
RemotingCommand request = RemotingCommand.createRequestCommand(
reg ? RequestCode.REGISTER_BROKER : RequestCode.UNREGISTER_BROKER, header);
Expand All @@ -533,6 +547,7 @@ private static RemotingCommand genSampleRegisterCmd(boolean reg) {
request.addExtField("clusterName", "cluster");
request.addExtField("haServerAddr", "10.10.2.1");
request.addExtField("brokerId", "2333");
request.setBody(body);
return request;
}

Expand All @@ -547,13 +562,15 @@ private static void setFinalStatic(Field field, Object newValue) throws Exceptio
private void registerRouteInfoManager() {
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap = new ConcurrentHashMap<>();
TopicConfig topicConfig = new TopicConfig();
topicConfig.setWriteQueueNums(8);
topicConfig.setTopicName("unit-test");
topicConfig.setPerm(6);
topicConfig.setReadQueueNums(8);
topicConfig.setOrder(false);
topicConfigConcurrentHashMap.put("unit-test", topicConfig);
for (int i = 0; i < 2; i++) {
TopicConfig topicConfig = new TopicConfig();
topicConfig.setWriteQueueNums(8);
topicConfig.setTopicName("unit-test" + i);
topicConfig.setPerm(6);
topicConfig.setReadQueueNums(8);
topicConfig.setOrder(false);
topicConfigConcurrentHashMap.put(topicConfig.getTopicName(), topicConfig);
}
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap);
Channel channel = mock(Channel.class);
RegisterBrokerResult registerBrokerResult = routeInfoManager.registerBroker("default-cluster", "127.0.0.1:10911", "default-broker", 1234, "127.0.0.1:1001",
Expand Down
Expand Up @@ -82,13 +82,8 @@ public void testQueryBrokerTopicConfig() {
targetVersion.setTimestamp(200L);

ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap = new ConcurrentHashMap<>();
TopicConfig topicConfig = new TopicConfig();
topicConfig.setWriteQueueNums(8);
topicConfig.setTopicName("unit-test");
topicConfig.setPerm(6);
topicConfig.setReadQueueNums(8);
topicConfig.setOrder(false);
topicConfigConcurrentHashMap.put("unit-test-1", topicConfig);
topicConfigConcurrentHashMap.put("unit-test-0", new TopicConfig("unit-test-0"));
topicConfigConcurrentHashMap.put("unit-test-1", new TopicConfig("unit-test-1"));

TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setDataVersion(targetVersion);
Expand Down Expand Up @@ -127,13 +122,8 @@ public void testRegisterBroker() {
dataVersion.setTimestamp(100L);

ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap = new ConcurrentHashMap<>();
TopicConfig topicConfig = new TopicConfig();
topicConfig.setWriteQueueNums(8);
topicConfig.setTopicName("unit-test");
topicConfig.setPerm(6);
topicConfig.setReadQueueNums(8);
topicConfig.setOrder(false);
topicConfigConcurrentHashMap.put("unit-test", topicConfig);
topicConfigConcurrentHashMap.put("unit-test0", new TopicConfig("unit-test0"));
topicConfigConcurrentHashMap.put("unit-test1", new TopicConfig("unit-test1"));

TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setDataVersion(dataVersion);
Expand Down
Expand Up @@ -629,7 +629,8 @@ public void switchBrokerRole_ChannelDestroy() {

private RegisterBrokerResult registerBrokerWithNormalTopic(BrokerBasicInfo brokerInfo, String... topics) {
ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap = new ConcurrentHashMap<>();

TopicConfig baseTopic = new TopicConfig("baseTopic");
topicConfigConcurrentHashMap.put(baseTopic.getTopicName(), baseTopic);
for (final String topic : topics) {
TopicConfig topicConfig = new TopicConfig();
topicConfig.setWriteQueueNums(8);
Expand All @@ -646,6 +647,9 @@ private RegisterBrokerResult registerBrokerWithNormalTopic(BrokerBasicInfo broke
private RegisterBrokerResult registerBrokerWithOrderTopic(BrokerBasicInfo brokerBasicInfo, String... topics) {
ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap = new ConcurrentHashMap<>();

TopicConfig baseTopic = new TopicConfig("baseTopic");
baseTopic.setOrder(true);
topicConfigConcurrentHashMap.put(baseTopic.getTopicName(), baseTopic);
for (final String topic : topics) {
TopicConfig topicConfig = new TopicConfig();
topicConfig.setWriteQueueNums(8);
Expand All @@ -660,7 +664,9 @@ private RegisterBrokerResult registerBrokerWithOrderTopic(BrokerBasicInfo broker

private RegisterBrokerResult registerBrokerWithGlobalOrderTopic(BrokerBasicInfo brokerBasicInfo, String... topics) {
ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap = new ConcurrentHashMap<>();

TopicConfig baseTopic = new TopicConfig("baseTopic", 1, 1);
baseTopic.setOrder(true);
topicConfigConcurrentHashMap.put(baseTopic.getTopicName(), baseTopic);
for (final String topic : topics) {
TopicConfig topicConfig = new TopicConfig();
topicConfig.setWriteQueueNums(1);
Expand All @@ -678,7 +684,8 @@ private RegisterBrokerResult registerBroker(BrokerBasicInfo brokerInfo, Channel

if (topicConfigConcurrentHashMap == null) {
topicConfigConcurrentHashMap = new ConcurrentHashMap<>();

TopicConfig baseTopic = new TopicConfig("baseTopic");
topicConfigConcurrentHashMap.put(baseTopic.getTopicName(), baseTopic);
for (final String topic : topics) {
TopicConfig topicConfig = new TopicConfig();
topicConfig.setWriteQueueNums(8);
Expand Down

0 comments on commit 9b87205

Please sign in to comment.