Skip to content

Commit

Permalink
Merge pull request #3153 from chenzlalvin/logicqueue
Browse files Browse the repository at this point in the history
[RIP-21] RocketMQ Logic Queue
  • Loading branch information
duhenglucky committed Jul 20, 2021
2 parents aed47c3 + 297582b commit da5d30b
Show file tree
Hide file tree
Showing 44 changed files with 4,753 additions and 251 deletions.
104 changes: 91 additions & 13 deletions broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@
*/
package org.apache.rocketmq.broker;

import com.google.common.collect.Maps;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -33,6 +38,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
Expand All @@ -42,6 +49,7 @@
import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
import org.apache.rocketmq.broker.dledger.DLedgerRoleChangeHandler;
import org.apache.rocketmq.broker.domain.LogicalQueuesInfoInBroker;
import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
Expand Down Expand Up @@ -79,23 +87,30 @@
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
import org.apache.rocketmq.broker.util.ServiceProvider;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.Configuration;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.stats.MomentStatsItem;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
Expand Down Expand Up @@ -159,6 +174,7 @@ public class BrokerController {
private final BrokerStatsManager brokerStatsManager;
private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
private final ConcurrentMap<String, String> brokerName2AddrMap = Maps.newConcurrentMap();
private MessageStore messageStore;
private RemotingServer remotingServer;
private RemotingServer fastRemotingServer;
Expand Down Expand Up @@ -277,9 +293,9 @@ public boolean initialize() throws CloneNotSupportedException {

if (result) {
try {
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
DefaultMessageStore messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig);
messageStore.registerCleanFileHook(topicConfigManager.getLogicalQueueCleanHook());
this.messageStore = messageStore;
if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
Expand Down Expand Up @@ -467,6 +483,14 @@ public void run() {
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}

this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
BrokerController.this.refreshBrokerNameMapping();
} catch (Exception e) {
log.error("ScheduledTask examineBrokerClusterInfo exception", e);
}
}, 10, 10, TimeUnit.SECONDS);

if (!messageStoreConfig.isEnableDLegerCommitLog()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
Expand Down Expand Up @@ -593,6 +617,18 @@ private void initialRpcHooks() {
}
}

private void refreshBrokerNameMapping() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
ClusterInfo brokerClusterInfo = this.brokerOuterAPI.getBrokerClusterInfo();
brokerClusterInfo.getBrokerAddrTable().forEach((brokerName, data) -> {
String masterBrokerAddr = data.getBrokerAddrs().get(MixAll.MASTER_ID);
this.brokerName2AddrMap.put(brokerName, masterBrokerAddr);
});
}

public String getBrokerAddrByName(String brokerName) {
return this.brokerName2AddrMap.get(brokerName);
}

public void registerProcessor() {
/**
* SendMessageProcessor
Expand Down Expand Up @@ -1009,20 +1045,54 @@ public void run() {
}

public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
TopicConfig registerTopicConfig = topicConfig;
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
registerTopicConfig =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
this.registerIncrementBrokerData(Collections.singletonList(topicConfig), dataVersion);
}

public synchronized void registerIncrementBrokerData(List<TopicConfig> topicConfigList, DataVersion dataVersion) {
if (topicConfigList == null || topicConfigList.isEmpty()) {
return;
}

ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig);
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setDataVersion(dataVersion);

ConcurrentMap<String, TopicConfig> topicConfigTable = topicConfigList.stream()
.map(topicConfig -> {
TopicConfig registerTopicConfig;
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
registerTopicConfig =
new TopicConfig(topicConfig.getTopicName(),
topicConfig.getReadQueueNums(),
topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
} else {
registerTopicConfig = new TopicConfig(topicConfig);
}
return registerTopicConfig;
})
.collect(Collectors.toConcurrentMap(TopicConfig::getTopicName, Function.identity()));
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);

String brokerName = this.brokerConfig.getBrokerName();
Map<String, LogicalQueuesInfo> logicalQueuesInfoMap = topicConfigList.stream()
.map(TopicConfig::getTopicName)
.map(topicName -> Optional.ofNullable(this.topicConfigManager.selectLogicalQueuesInfo(topicName))
.map(info -> {
info.readLock().lock();
try {
return new AbstractMap.SimpleImmutableEntry<>(topicName, new LogicalQueuesInfoInBroker(info, data -> Objects.equals(data.getBrokerName(), brokerName)));
} finally {
info.readLock().unlock();
}
})
.orElse(null))
.filter(Objects::nonNull)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (!logicalQueuesInfoMap.isEmpty()) {
topicConfigSerializeWrapper.setLogicalQueuesInfoMap(logicalQueuesInfoMap);
}

doRegisterBrokerAll(true, false, topicConfigSerializeWrapper);
}

Expand All @@ -1032,13 +1102,21 @@ public synchronized void registerBrokerAll(final boolean checkOrderConfig, boole
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
Map<String, LogicalQueuesInfo> logicalQueuesInfoMap = Maps.newHashMapWithExpectedSize(topicConfigWrapper.getTopicConfigTable().size());
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
String topicName = topicConfig.getTopicName();
TopicConfig tmp =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
new TopicConfig(topicName, topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
topicConfigTable.put(topicName, tmp);
LogicalQueuesInfoInBroker logicalQueuesInfo = this.topicConfigManager.selectLogicalQueuesInfo(topicName);
if (logicalQueuesInfo != null) {
String brokerName = this.brokerConfig.getBrokerName();
logicalQueuesInfoMap.put(topicName, new LogicalQueuesInfoInBroker(logicalQueuesInfo, data -> Objects.equals(data.getBrokerName(), brokerName)));
}
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
topicConfigWrapper.setLogicalQueuesInfoMap(logicalQueuesInfoMap);
}

if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.broker.domain;

import com.alibaba.fastjson.parser.ParserConfig;
import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.rocketmq.common.fastjson.GenericMapSuperclassDeserializer;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.srvutil.ConcurrentHashMapUtil;

import static java.util.Optional.ofNullable;

public class LogicalQueuesInfoInBroker extends LogicalQueuesInfo {
private final ConcurrentMap<Integer, ConcurrentNavigableMap<Long, LogicalQueueRouteData>> queueId2LogicalQueueMap = Maps.newConcurrentMap();

public LogicalQueuesInfoInBroker() {
}

public LogicalQueuesInfoInBroker(LogicalQueuesInfoInBroker other) {
this(other, null);
}

// deep copy
public LogicalQueuesInfoInBroker(LogicalQueuesInfoInBroker other, Predicate<LogicalQueueRouteData> predicate) {
other.readLock().lock();
try {
for (Entry<Integer, List<LogicalQueueRouteData>> entry : other.entrySet()) {
Stream<LogicalQueueRouteData> stream = entry.getValue().stream();
if (predicate != null) {
stream = stream.filter(predicate);
}
this.put(entry.getKey(), stream.map(LogicalQueueRouteData::new).collect(Collectors.toList()));
}
} finally {
other.readLock().unlock();
}
}

public void updateQueueRouteDataByQueueId(int queueId, LogicalQueueRouteData queueRouteData) {
if (queueRouteData == null) {
return;
}
ConcurrentHashMapUtil.computeIfAbsent(queueId2LogicalQueueMap, queueId, k -> new ConcurrentSkipListMap<>()).put(queueRouteData.getOffsetDelta(), queueRouteData);
}

/**
* find logical queue route data for message queues owned by this broker
*/
public LogicalQueueRouteData queryQueueRouteDataByQueueId(int queueId, long offset) {
ConcurrentNavigableMap<Long, LogicalQueueRouteData> m = this.queueId2LogicalQueueMap.get(queueId);
if (m == null || m.isEmpty()) {
return null;
}
Entry<Long, LogicalQueueRouteData> entry = m.floorEntry(offset);
if (entry == null) {
return null;
}
return entry.getValue();
}

public void deleteQueueRouteData(LogicalQueueRouteData logicalQueueRouteData) {
ConcurrentNavigableMap<Long, LogicalQueueRouteData> m = this.queueId2LogicalQueueMap.get(logicalQueueRouteData.getQueueId());
if (m != null) {
m.remove(logicalQueueRouteData.getOffsetDelta(), logicalQueueRouteData);
}
}

public LogicalQueueRouteData nextAvailableLogicalRouteData(LogicalQueueRouteData queueRouteData,
Predicate<LogicalQueueRouteData> predicate) {
this.readLock().lock();
try {
List<LogicalQueueRouteData> queueRouteDataList = ofNullable(this.get(queueRouteData.getLogicalQueueIndex())).orElse(Collections.emptyList());
int idx = Collections.binarySearch(queueRouteDataList, queueRouteData);
if (idx >= 0) {
for (int i = idx + 1, size = queueRouteDataList.size(); i < size; i++) {
LogicalQueueRouteData tmp = queueRouteDataList.get(i);
if (predicate.test(tmp)) {
return tmp;
}
}
}
} finally {
this.readLock().unlock();
}
return null;
}

static {
// workaround https://github.com/alibaba/fastjson/issues/3730
ParserConfig.getGlobalInstance().putDeserializer(LogicalQueuesInfoInBroker.class, GenericMapSuperclassDeserializer.INSTANCE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
Expand All @@ -48,6 +49,7 @@
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
Expand Down Expand Up @@ -432,4 +434,23 @@ public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final

throw new MQBrokerException(response.getCode(), response.getRemark());
}

public ClusterInfo getBrokerClusterInfo() throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
RemotingCommand response = this.remotingClient.invokeSync(null, request, 3_000);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return ClusterInfo.decode(response.getBody(), ClusterInfo.class);
}
default:
break;
}

throw new MQBrokerException(response.getCode(), response.getRemark());
}

public void forwardRequest(String brokerAddr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException, RemotingTooMuchRequestException, RemotingConnectException {
this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, invokeCallback);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public long getMaxOffsetInQueue(String topic, int queueId) {
return next.getMaxOffsetInQueue(topic, queueId);
}

@Override
public long getMaxOffsetInQueue(String topic, int queueId, boolean committed) {
return next.getMaxOffsetInQueue(topic, queueId, committed);
}

@Override
public long getMinOffsetInQueue(String topic, int queueId) {
return next.getMinOffsetInQueue(topic, queueId);
Expand Down

0 comments on commit da5d30b

Please sign in to comment.