Skip to content

Commit

Permalink
[RIP-28] light message queue(LMQ) (apache#3694)
Browse files Browse the repository at this point in the history
  • Loading branch information
tianliuliu committed Jan 13, 2022
1 parent fd39f7a commit 6f1805c
Show file tree
Hide file tree
Showing 34 changed files with 2,466 additions and 283 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,13 @@
import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService;
import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener;
import org.apache.rocketmq.broker.longpolling.PullRequestHoldService;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.broker.plugin.MessageStoreFactory;
import org.apache.rocketmq.broker.plugin.MessageStorePluginContext;
Expand All @@ -64,7 +66,9 @@
import org.apache.rocketmq.broker.processor.ReplyMessageProcessor;
import org.apache.rocketmq.broker.processor.SendMessageProcessor;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.LmqTopicConfigManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
Expand Down Expand Up @@ -106,6 +110,7 @@
import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
import org.apache.rocketmq.store.stats.BrokerStats;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.store.stats.LmqBrokerStatsManager;

public class BrokerController {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
Expand Down Expand Up @@ -180,18 +185,18 @@ public BrokerController(
this.nettyServerConfig = nettyServerConfig;
this.nettyClientConfig = nettyClientConfig;
this.messageStoreConfig = messageStoreConfig;
this.consumerOffsetManager = new ConsumerOffsetManager(this);
this.topicConfigManager = new TopicConfigManager(this);
this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this);
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this);
this.pullMessageProcessor = new PullMessageProcessor(this);
this.pullRequestHoldService = new PullRequestHoldService(this);
this.pullRequestHoldService = messageStoreConfig.isEnableLmq() ? new LmqPullRequestHoldService(this) : new PullRequestHoldService(this);
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
this.consumerFilterManager = new ConsumerFilterManager(this);
this.producerManager = new ProducerManager();
this.clientHousekeepingService = new ClientHousekeepingService(this);
this.broker2Client = new Broker2Client(this);
this.subscriptionGroupManager = new SubscriptionGroupManager(this);
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this);
this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
this.filterServerManager = new FilterServerManager(this);

Expand All @@ -207,7 +212,8 @@ public BrokerController(
this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());

this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat());
this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat());

this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));

this.brokerFastFailure = new BrokerFastFailure(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public static String getConsumerOffsetPath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "consumerOffset.json";
}

public static String getLmqConsumerOffsetPath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "lmqConsumerOffset.json";
}

public static String getSubscriptionGroupPath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "subscriptionGroup.json";
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.longpolling;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;


public class LmqPullRequestHoldService extends PullRequestHoldService {
private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

public LmqPullRequestHoldService(BrokerController brokerController) {
super(brokerController);
}

@Override
public String getServiceName() {
return LmqPullRequestHoldService.class.getSimpleName();
}

@Override
public void checkHoldRequest() {
for (String key : pullRequestTable.keySet()) {
int idx = key.lastIndexOf(TOPIC_QUEUEID_SEPARATOR);
if (idx <= 0 || idx >= key.length() - 1) {
pullRequestTable.remove(key);
continue;
}
String topic = key.substring(0, idx);
int queueId = Integer.parseInt(key.substring(idx + 1));
final long offset = brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
try {
this.notifyMessageArriving(topic, queueId, offset);
} catch (Throwable e) {
LOGGER.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
}
if (MixAll.isLmq(topic)) {
ManyPullRequest mpr = pullRequestTable.get(key);
if (mpr == null || mpr.getPullRequestList() == null || mpr.getPullRequestList().isEmpty()) {
pullRequestTable.remove(key);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,8 @@ public synchronized List<PullRequest> cloneListAndClear() {

return null;
}

public ArrayList<PullRequest> getPullRequestList() {
return pullRequestList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@

public class PullRequestHoldService extends ServiceThread {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final String TOPIC_QUEUEID_SEPARATOR = "@";
private final BrokerController brokerController;
protected static final String TOPIC_QUEUEID_SEPARATOR = "@";
protected final BrokerController brokerController;
private final SystemClock systemClock = new SystemClock();
private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
protected ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
new ConcurrentHashMap<String, ManyPullRequest>(1024);

public PullRequestHoldService(final BrokerController brokerController) {
Expand Down Expand Up @@ -93,7 +93,7 @@ public String getServiceName() {
return PullRequestHoldService.class.getSimpleName();
}

private void checkHoldRequest() {
protected void checkHoldRequest() {
for (String key : this.pullRequestTable.keySet()) {
String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
if (2 == kArray.length) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@

public class ConsumerOffsetManager extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final String TOPIC_GROUP_SEPARATOR = "@";
protected static final String TOPIC_GROUP_SEPARATOR = "@";

private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
protected ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);

private transient BrokerController brokerController;
protected transient BrokerController brokerController;

public ConsumerOffsetManager() {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.offset;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;

public class LmqConsumerOffsetManager extends ConsumerOffsetManager {
private ConcurrentHashMap<String, Long> lmqOffsetTable = new ConcurrentHashMap<>(512);

public LmqConsumerOffsetManager(BrokerController brokerController) {
super(brokerController);
}

@Override
public long queryOffset(final String group, final String topic, final int queueId) {
if (!MixAll.isLmq(group)) {
return super.queryOffset(group, topic, queueId);
}
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
Long offset = lmqOffsetTable.get(key);
if (offset != null) {
return offset;
}
return -1;
}

@Override
public Map<Integer, Long> queryOffset(final String group, final String topic) {
if (!MixAll.isLmq(group)) {
return super.queryOffset(group, topic);
}
Map<Integer, Long> map = new HashMap<>();
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
Long offset = lmqOffsetTable.get(key);
if (offset != null) {
map.put(0, offset);
}
return map;
}

@Override
public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
final long offset) {
if (!MixAll.isLmq(group)) {
super.commitOffset(clientHost, group, topic, queueId, offset);
return;
}
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
lmqOffsetTable.put(key, offset);
}

@Override
public String encode() {
return this.encode(false);
}

@Override
public String configFilePath() {
return BrokerPathConfigHelper.getLmqConsumerOffsetPath(brokerController.getMessageStoreConfig().getStorePathRootDir());
}

@Override
public void decode(String jsonString) {
if (jsonString != null) {
LmqConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, LmqConsumerOffsetManager.class);
if (obj != null) {
super.offsetTable = obj.offsetTable;
this.lmqOffsetTable = obj.lmqOffsetTable;
}
}
}

@Override
public String encode(final boolean prettyFormat) {
return RemotingSerializable.toJson(this, prettyFormat);
}

public ConcurrentHashMap<String, Long> getLmqOffsetTable() {
return lmqOffsetTable;
}

public void setLmqOffsetTable(ConcurrentHashMap<String, Long> lmqOffsetTable) {
this.lmqOffsetTable = lmqOffsetTable;
}
}
Loading

0 comments on commit 6f1805c

Please sign in to comment.