Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #3286] replace Timer with ScheduledExecutorService #3287

Merged
merged 1 commit into from
Nov 10, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.topic.TopicValidator;
Expand Down Expand Up @@ -60,7 +62,8 @@ public class ScheduleMessageService extends ConfigManager {
new ConcurrentHashMap<Integer, Long>(32);
private final DefaultMessageStore defaultMessageStore;
private final AtomicBoolean started = new AtomicBoolean(false);
private Timer timer;
private ScheduledExecutorService deliverExecutorService;
private int deliverThreadPoolNums = Runtime.getRuntime().availableProcessors();
private MessageStore writeMessageStore;
private int maxDelayLevel;

Expand Down Expand Up @@ -113,7 +116,7 @@ public long computeDeliverTimestamp(final int delayLevel, final long storeTimest
public void start() {
if (started.compareAndSet(false, true)) {
super.load();
this.timer = new Timer("ScheduleMessageTimerThread", true);
this.deliverExecutorService = new ScheduledThreadPoolExecutor(deliverThreadPoolNums, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Expand All @@ -123,11 +126,11 @@ public void start() {
}

if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
}

this.timer.scheduleAtFixedRate(new TimerTask() {
this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
Expand All @@ -139,16 +142,14 @@ public void run() {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
}
}

public void shutdown() {
if (this.started.compareAndSet(true, false)) {
if (null != this.timer)
this.timer.cancel();
if (this.started.compareAndSet(true, false) && null != this.deliverExecutorService) {
this.deliverExecutorService.shutdownNow();
}

}

public boolean isStarted() {
Expand All @@ -159,10 +160,12 @@ public int getMaxDelayLevel() {
return maxDelayLevel;
}

@Override
public String encode() {
return this.encode(false);
}

@Override
public boolean load() {
boolean result = super.load();
result = result && this.parseDelayLevel();
Expand Down Expand Up @@ -223,6 +226,7 @@ public void decode(String jsonString) {
}
}

@Override
public String encode(final boolean prettyFormat) {
DelayOffsetSerializeWrapper delayOffsetSerializeWrapper = new DelayOffsetSerializeWrapper();
delayOffsetSerializeWrapper.setOffsetTable(this.offsetTable);
Expand Down Expand Up @@ -261,7 +265,7 @@ public boolean parseDelayLevel() {
return true;
}

class DeliverDelayedMessageTimerTask extends TimerTask {
class DeliverDelayedMessageTimerTask implements Runnable {
private final int delayLevel;
private final long offset;

Expand All @@ -279,8 +283,8 @@ public void run() {
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeup exception", e);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, this.offset), DELAY_FOR_A_PERIOD, TimeUnit.MILLISECONDS);
}
}

Expand Down Expand Up @@ -372,9 +376,9 @@ public void executeOnTimeup() {
log.error(
"ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
msgExt.getTopic(), msgExt.getMsgId());
ScheduleMessageService.this.timer.schedule(
ScheduleMessageService.this.deliverExecutorService.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
nextOffset), DELAY_FOR_A_PERIOD);
nextOffset), DELAY_FOR_A_PERIOD, TimeUnit.MILLISECONDS);
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
Expand All @@ -388,17 +392,17 @@ public void executeOnTimeup() {
}
}
} else {
ScheduleMessageService.this.timer.schedule(
ScheduleMessageService.this.deliverExecutorService.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
countdown, TimeUnit.MILLISECONDS);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for

nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
Expand All @@ -424,8 +428,8 @@ public void executeOnTimeup() {
}
} // end of if (cq != null)

ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS);
}

private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
Expand Down