Skip to content

Commit

Permalink
[ISSUE #8203] Adjust the timing of delayed messages to avoid a concen…
Browse files Browse the repository at this point in the history
…trated distribution
  • Loading branch information
CLFutureX committed May 25, 2024
1 parent a7b82b6 commit 20e6191
Showing 1 changed file with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -725,15 +725,16 @@ public boolean doEnqueue(long offsetPy, int sizePy, long delayedTime, MessageExt
LOGGER.debug("Do enqueue [{}] [{}]", new Timestamp(delayedTime), messageExt);
//copy the value first, avoid concurrent problem
long tmpWriteTimeMs = currWriteTimeMs;
boolean needRoll = delayedTime - tmpWriteTimeMs >= (long) timerRollWindowSlots * precisionMs;
long intervalMs = timerRollWindowSlots * precisionMs;
long remainingMs = delayedTime - tmpWriteTimeMs;

int magic = MAGIC_DEFAULT;
if (needRoll) {
if (remainingMs >= intervalMs) {
magic = magic | MAGIC_ROLL;
if (delayedTime - tmpWriteTimeMs - (long) timerRollWindowSlots * precisionMs < (long) timerRollWindowSlots / 3 * precisionMs) {
//give enough time to next roll
delayedTime = tmpWriteTimeMs + (long) (timerRollWindowSlots / 2) * precisionMs;
if (remainingMs < intervalMs / 3) {
delayedTime = tmpWriteTimeMs + remainingMs % intervalMs + intervalMs / 3;
} else {
delayedTime = tmpWriteTimeMs + (long) timerRollWindowSlots * precisionMs;
delayedTime = tmpWriteTimeMs + (remainingMs % intervalMs + 9 * intervalMs) / 10;
}
}
boolean isDelete = messageExt.getProperty(TIMER_DELETE_UNIQUE_KEY) != null;
Expand Down

0 comments on commit 20e6191

Please sign in to comment.