Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RIP-28] light message queue(LMQ) #3694

Merged
merged 16 commits into from
Jan 13, 2022
Merged

Conversation

tianliuliu
Copy link
Contributor

@tianliuliu tianliuliu commented Jan 4, 2022

Brief changelog

#3666 Support light message queue (LMQ) multi-consumeQueue atomic dispatch, at the same time support LMQ metaData and offset management

Some messaging scenarios require light message queue, Such as MQTT multi-level topic or AMQP lightweight queue can be set at will by users when sending and subscription message. Therefore, the implementation of LMQ's queue model is required

  1. Implement multi-dimensional dispatch of a message based on ConsumeQueue, and a message can support multiple lightweight queue consumption.
  2. Implement multi-level topic management, so topic metadata also needs adaptation management, such as topic verification, data cleaning and so on.
  3. Implement lightweight queue consumption offset management

Follow this checklist to help us incorporate your contribution quickly and easily. Notice, it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR.

  • Make sure there is a Github issue filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue.
  • Format the pull request title like [ISSUE #123] Fix UnknownException when host config not exist. Each commit in the pull request should have a meaningful subject line and body.
  • Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
  • Write necessary unit-test(over 80% coverage) to verify your logic correction, more mock a little better when cross module dependency exist. If the new feature or significant change is committed, please remember to add integration-test in test module.
  • Run mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle to make sure basic checks pass. Run mvn clean install -DskipITs to make sure unit-test pass. Run mvn clean test-compile failsafe:integration-test to make sure integration-test pass.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

@duhenglucky duhenglucky added this to the 4.9.3 milestone Jan 4, 2022
@coveralls
Copy link

coveralls commented Jan 4, 2022

Coverage Status

Coverage decreased (-2.1%) to 51.032% when pulling f4dc125 on tianliuliu:support_mqtt into a7b5903 on apache:develop.

@codecov-commenter
Copy link

codecov-commenter commented Jan 4, 2022

Codecov Report

Merging #3694 (f4dc125) into develop (a7b5903) will decrease coverage by 0.20%.
The diff coverage is 34.07%.

Impacted file tree graph

@@              Coverage Diff              @@
##             develop    #3694      +/-   ##
=============================================
- Coverage      47.19%   46.98%   -0.21%     
+ Complexity      5032     4857     -175     
=============================================
  Files            628      636       +8     
  Lines          41400    42244     +844     
  Branches        5377     5521     +144     
=============================================
+ Hits           19540    19850     +310     
- Misses         19437    19909     +472     
- Partials        2423     2485      +62     
Impacted Files Coverage Δ
...a/org/apache/rocketmq/broker/BrokerController.java 46.54% <0.00%> (-0.89%) ⬇️
.../broker/longpolling/LmqPullRequestHoldService.java 0.00% <0.00%> (ø)
...e/rocketmq/broker/longpolling/ManyPullRequest.java 0.00% <0.00%> (ø)
...etmq/broker/plugin/AbstractPluginMessageStore.java 0.00% <0.00%> (ø)
...ocketmq/broker/processor/AdminBrokerProcessor.java 34.38% <0.00%> (-0.19%) ⬇️
...ocketmq/broker/processor/SendMessageProcessor.java 39.59% <0.00%> (-0.30%) ⬇️
...n/java/org/apache/rocketmq/store/MessageStore.java 0.00% <ø> (ø)
...he/rocketmq/store/stats/LmqBrokerStatsManager.java 0.00% <0.00%> (ø)
...n/java/org/apache/rocketmq/test/util/StatUtil.java 16.19% <16.19%> (ø)
...cketmq/broker/offset/LmqConsumerOffsetManager.java 44.44% <44.44%> (ø)
... and 38 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update a7b5903...f4dc125. Read the comment docs.

@yuz10
Copy link
Member

yuz10 commented Jan 6, 2022

Is there documentation about how to use the light message queue?

@tianliuliu
Copy link
Contributor Author

Is there documentation about how to use the light message queue?

There is no document yet, you can refer to the demo in PR: test/src/test/java/org/apache/rocketmq/test/lmq/TestBenchLmqStore.java

@tianliuliu
Copy link
Contributor Author

Is there documentation about how to use the light message queue?

https://github.com/tianliuliu/rocketmq/blob/support_mqtt/docs/cn/Example_LMQ.md

duhenglucky
duhenglucky previously approved these changes Jan 7, 2022

import org.junit.Test;

import static org.junit.Assert.*;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't use * to replace all imports.

log.warn("[BUG]put commit log position info to " + queueName + ":" + queueId + " " + request.getCommitLogOffset()
+ " failed, retry " + j + " times");

try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you like to reduce the nesting level?

logger.error(" ", e);
}
}
} catch (Exception e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same with the last comment,nesting level should be reduced and error log may be can more clear

@duhenglucky duhenglucky dismissed their stale review January 7, 2022 04:01

some details should be polish.

@ShannonDing
Copy link
Member

LMQ的消息通过客户端暴露出来的userProperty参数区分,在服务端动态获取LMQ并创建逻辑队列。 这里建议对客户端传递的参数和服务端的逻辑队列数做下校验, 防止客户端传递大量非法值,引起服务端逻辑队列创建过多引发资源耗尽。
建议在服务端broker.conf里增加max_lmq_size保护门限, 可根据实际情况调整LMQ最大服务的数量。


Long offset = defaultMQPullConsumer.maxOffset(mq);

defaultMQPullConsumer.pullBlockIfNotFound(
Copy link
Member

@yuz10 yuz10 Jan 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, I tested the doc and found some problems with the example:

  1. producer: queueId is illegal
caused by: org.apache.rocketmq.client.exception.MQBrokerException: CODE: 1  DESC: request queueId[3] is illegal, TopicConfig [topicName=TopicTest, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false] Producer: 172.18.40.26:50188 BROKER: 172.18.40.26:10911
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:662)
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:508)
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:490)
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:434)
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:838)
	at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:581)
	... 4 more
  1. consumer: The broker[broker-a] not exist
Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException: The broker[broker-a] not exist
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
	at org.apache.rocketmq.client.impl.MQAdminImpl.maxOffset(MQAdminImpl.java:219)
	at org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.maxOffset(DefaultMQPullConsumerImpl.java:170)
	at org.apache.rocketmq.client.consumer.DefaultMQPullConsumer.maxOffset(DefaultMQPullConsumer.java:155)
	at org.apache.rocketmq.example.simple.PullConsumer.main(PullConsumer.java:49)

Add Thead.sleep(30000) will solve it, because the lmq topic is not in nameserver, should wait until the timer fetch broker names.

and I enhanced the consumer, to make it consume continously:

        DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer();
        defaultMQPullConsumer.setNamesrvAddr("localhost:9876");
        defaultMQPullConsumer.setVipChannelEnabled(false);
        defaultMQPullConsumer.setConsumerGroup("CID_RMQ_SYS_LMQ_TEST");
        defaultMQPullConsumer.setInstanceName("CID_RMQ_SYS_LMQ_TEST");
        defaultMQPullConsumer.setRegisterTopics(new HashSet<>(Collections.singletonList("TopicTest")));

        defaultMQPullConsumer.start();
        Thread.sleep(30000);

        String brokerName = "broker-a";
        MessageQueue mq = new MessageQueue("%LMQ%123", brokerName, 0);

        long offset = defaultMQPullConsumer.fetchConsumeOffset(mq, false);
        while (true) {
            PullResult result = defaultMQPullConsumer.pullBlockIfNotFound(mq, "*", offset, 32);
            if (result.getMsgFoundList() == null || result.getMsgFoundList().isEmpty()) {
                continue;
            }
            for (MessageExt messageExt : result.getMsgFoundList()) {
                System.out.printf("%s%n", messageExt);
            }
            offset = result.getNextBeginOffset();
            defaultMQPullConsumer.updateConsumeOffset(mq, offset);
        }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. request queueId[3] is illegal, TopicConfig [topicName=TopicTest, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false] Producer: 172.18.40.26:50188 BROKER: 172.18.40.26:10911
    For more information, please visit the url, http://rocketmq.apache.org/docs/faq/

TopicTest 多创建几个队列就可以了 这个只有一个队列 所以队列queueId 3拿不到
2. broker-a 不存在问题 就是你部署的mq集群中,没有这个broker

Copy link
Member

@yuz10 yuz10 Jan 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. TopicTest有3个队列,因为light队列中将队列数改为1导致判断有问题。
  2. broker-a 是存在的,defaultMQPullConsumer.maxOffset中代码
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

updateTopicRouteInfoFromNameServer是通过topic去nameserver更新broker列表,nameserver中不存在这个topic(%LMQ%123),所以无法获取broker。sleep 30秒等待定时器同步broker列表之后就能在缓存中拿到。

放出来的示例我试验的结果是这样的,不知道你有没有试过

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. TopicTest有3个队列,因为light队列中将队列数改为1导致判断有问题。
  2. broker-a 是存在的,defaultMQPullConsumer.maxOffset中代码
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

updateTopicRouteInfoFromNameServer是通过topic去nameserver更新broker列表,nameserver中不存在这个topic(%LMQ%123),所以无法获取broker。

%LMQ%这种是不会上报nameserver的,根据那个需要分发的普通topic来获取,TopicTest是普通的,不能当作lmq, lmq只能内部分发,lmq topic只支持命名格式%LMQ%开头的name

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. TopicTest有3个队列,因为light队列中将队列数改为1导致判断有问题。
  2. broker-a 是存在的,defaultMQPullConsumer.maxOffset中代码
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

updateTopicRouteInfoFromNameServer是通过topic去nameserver更新broker列表,nameserver中不存在这个topic(%LMQ%123),所以无法获取broker。sleep 30秒等待定时器同步broker列表之后就能在缓存中拿到。

放出来的示例我试验的结果是这样的,不知道你有没有试过

这个地方brokerName 我不是从nameserver动态获取的,我知道lmq会分发在我的demo broker-a上,直接写死的,你的lmq能分发到哪个broker 就从哪个broker上拉取消息 这个lmq不会上报nameServer,只能根据普通topic来获取brokername列表,还是需要根据普通topic更新到本地缓存的,之前写的demo只是伪逻辑哈

@RongtongJin RongtongJin merged commit 14b63ee into apache:develop Jan 13, 2022
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.PermName;

public class LmqTopicConfigManager extends TopicConfigManager {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does LMQ need a special TopicConfigManager? The broker should have unified topic management ability.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LMQ topic will not register nameServer. As a lightweight resource , TopicConfigManager is only used for broker verification and logical processing. LMQ topic resource will be managed at the proxy layer.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

9 participants