Skip to content

Commit

Permalink
[Schedule] Replace Timer with ScheduledExecutorService
Browse files Browse the repository at this point in the history
Signed-off-by: zhangyang21 <zhangyang21@xiaomi.com>
  • Loading branch information
Git-Yang committed Nov 3, 2021
1 parent 9a5ec90 commit bb01183
Showing 1 changed file with 29 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.topic.TopicValidator;
Expand Down Expand Up @@ -59,7 +61,8 @@ public class ScheduleMessageService extends ConfigManager {
new ConcurrentHashMap<Integer, Long>(32);
private final DefaultMessageStore defaultMessageStore;
private final AtomicBoolean started = new AtomicBoolean(false);
private Timer timer;
private ScheduledExecutorService deliverExecutorService;
private int deliverThreadPoolNums = Runtime.getRuntime().availableProcessors();
private MessageStore writeMessageStore;
private int maxDelayLevel;

Expand Down Expand Up @@ -113,7 +116,7 @@ public long computeDeliverTimestamp(final int delayLevel, final long storeTimest
public void start() {
if (started.compareAndSet(false, true)) {
super.load();
this.timer = new Timer("ScheduleMessageTimerThread", true);
this.deliverExecutorService = new ScheduledThreadPoolExecutor(deliverThreadPoolNums, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Expand All @@ -123,30 +126,30 @@ public void start() {
}

if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
}

this.timer.scheduleAtFixedRate(new TimerTask() {
this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
if (started.get()) ScheduleMessageService.this.persist();
if (started.get()) {
ScheduleMessageService.this.persist();
}
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
}
}

public void shutdown() {
if (this.started.compareAndSet(true, false)) {
if (null != this.timer)
this.timer.cancel();
if (this.started.compareAndSet(true, false) && null != this.deliverExecutorService) {
this.deliverExecutorService.shutdownNow();
}

}

public boolean isStarted() {
Expand All @@ -157,10 +160,12 @@ public int getMaxDelayLevel() {
return maxDelayLevel;
}

@Override
public String encode() {
return this.encode(false);
}

@Override
public boolean load() {
boolean result = super.load();
result = result && this.parseDelayLevel();
Expand All @@ -184,6 +189,7 @@ public void decode(String jsonString) {
}
}

@Override
public String encode(final boolean prettyFormat) {
DelayOffsetSerializeWrapper delayOffsetSerializeWrapper = new DelayOffsetSerializeWrapper();
delayOffsetSerializeWrapper.setOffsetTable(this.offsetTable);
Expand Down Expand Up @@ -222,7 +228,7 @@ public boolean parseDelayLevel() {
return true;
}

class DeliverDelayedMessageTimerTask extends TimerTask {
class DeliverDelayedMessageTimerTask implements Runnable {
private final int delayLevel;
private final long offset;

Expand All @@ -240,8 +246,8 @@ public void run() {
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeup exception", e);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, this.offset), DELAY_FOR_A_PERIOD, TimeUnit.MILLISECONDS);
}
}

Expand Down Expand Up @@ -329,19 +335,16 @@ public void executeOnTimeup() {
log.error(
"ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
msgExt.getTopic(), msgExt.getMsgId());
ScheduleMessageService.this.timer.schedule(
ScheduleMessageService.this.deliverExecutorService.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
nextOffset), DELAY_FOR_A_PERIOD);
nextOffset), DELAY_FOR_A_PERIOD, TimeUnit.MILLISECONDS);
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
}
} catch (Exception e) {
/*
* XXX: warn and notify me
*/
log.error(
"ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
Expand All @@ -350,17 +353,17 @@ public void executeOnTimeup() {
}
}
} else {
ScheduleMessageService.this.timer.schedule(
ScheduleMessageService.this.deliverExecutorService.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
countdown, TimeUnit.MILLISECONDS);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for

nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
Expand All @@ -379,8 +382,8 @@ public void executeOnTimeup() {
}
} // end of if (cq != null)

ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS);
}

private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
Expand Down

0 comments on commit bb01183

Please sign in to comment.