Skip to content

Conversation

@GenerousMan
Copy link
Contributor

Make sure set the target branch to develop

What is the purpose of the change

Currently, RocketMQ's delay message feature only supports delayed delivery for specific time levels. Such delay message feature(only support specific levels of delay time) is no longer enough to support the flexible usage of rocketmq. Therefore, we need a delay message feature that supports arbitrary delay time.

I have written my proposal on the links below:
Google Doc: https://docs.google.com/document/d/1D6XWwY39p531c2aVi5HQll9iwzTUNT1haUFHqMoRkT0/edit?usp=sharing
Shimo:https://shimo.im/docs/gXqme9PKKpIeD7qo/

To optimize the commit history, this is a new PR.
The old PR is :#4558

Brief changelog

  1. Add classes to support the feature, such as TimerMessageStore, TimerWheel, TimerLog, and Slot , etc.
  2. Modify the logic of CommitLog.AsyncPutMessage(). Add two judgement(isRolledTimerMessage(msg) / checkIfTimerMessage(msg)) for timing messages. If the message is timing message, its topic and related properties will be converted.
  3. Modify the logic of BrokerController.start()/shutdown()/initialize(), etc. Add the TimerMessageStore into these processes.
  4. Add some functions in related files(DefaultMessageStore, CommitLog, MappedFileQueue, MessageConst, etc).
  5. Add unit tests for the feature.

Follow this checklist to help us incorporate your contribution quickly and easily. Notice, it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR.

  • [✅] Make sure there is a Github issue filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue.
  • [✅] Format the pull request title like [ISSUE #123] Fix UnknownException when host config not exist. Each commit in the pull request should have a meaningful subject line and body.
  • [✅] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
  • [✅] Write necessary unit-test(over 80% coverage) to verify your logic correction, more mock a little better when cross module dependency exist. If the new feature or significant change is committed, please remember to add integration-test in test module.
  • [✅] Run mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle to make sure basic checks pass. Run mvn clean install -DskipITs to make sure unit-test pass. Run mvn clean test-compile failsafe:integration-test to make sure integration-test pass.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

@coveralls
Copy link

coveralls commented Jul 20, 2022

Coverage Status

Coverage increased (+0.5%) to 48.994% when pulling d538133 on GenerousMan:RIP-43 into 8ccffa7 on apache:develop.

Copy link
Contributor

@RongtongJin RongtongJin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. There are benchmarks in the current example module, but I think it would be better to add the most straightforward example to tell users how to use timing messages with arbitrary time delay. In addition, adding relevant documents is also a good choice
  2. It would be better to add an IT test.
  3. It is necessary to replace the origin delay level message with new timer message in the POP consume. It can also be completed in the subsequent pull requests.
  4. The current implementation needs to adapt to the slaveActingMaster mode. If no one is willing to do it, I am willing to do this in the subsequent pull requests later.

example/pom.xml Outdated
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.4</version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete this line, we can use the parent pom file dependency.

store/pom.xml Outdated
<dependency>
<groupId>com.conversantmedia</groupId>
<artifactId>disruptor</artifactId>
<version>1.2.10</version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better if we can put the version to the parent pom file.

public static final String ONS_HTTP_PROXY_GROUP = "CID_ONS-HTTP-PROXY";
public static final String CID_ONSAPI_PERMISSION_GROUP = "CID_ONSAPI_PERMISSION";
public static final String CID_ONSAPI_OWNER_GROUP = "CID_ONSAPI_OWNER";
public static final String SYSTEM_TOPIC_PREFIX = "RMQ_SYS_";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an same constant in the TopicValidator class (org.apache.rocketmq.common.topic.TopicValidator#SYSTEM_TOPIC_PREFIX). I don't think it's necessary to re create one here

@fuyou001
Copy link
Contributor

Conflicting files @GenerousMan

long deliverMs;
try {
if (msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null) {
deliverMs = System.currentTimeMillis() + Integer.parseInt(msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC)) * 1000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when set setDelayTimeSec(24 * 3600 * 3), integer overflows

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx, it has been resolved.

fuyou001 and others added 3 commits July 21, 2022 15:19
# Conflicts:
#	broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
#	store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
Comment on lines +169 to +171
TimerMessageStore getTimerMessageStore();

void setTimerMessageStore(TimerMessageStore timerMessageStore);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better. to add some comments for the interface.

@duhenglucky duhenglucky merged commit 8336b49 into apache:develop Jul 21, 2022
@duhenglucky duhenglucky added this to the 5.0.0 milestone Jul 21, 2022
public long checkPhyPos(long timeStartMs, long maxOffset) {
long minFirst = Long.MAX_VALUE;
int firstSlotIndex = getSlotIndex(timeStartMs);
for (int i = 0; i < slotsTotal * 2; i++) {
Copy link
Contributor

@AuroraTwinkle AuroraTwinkle Jul 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that slotsTotal * 2 appears multiple times. Why do we need to multiply it by 2? I don't understand, could you please help me? Thanks. @GenerousMan

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants