From 7f5715e720c33a3a1a06cd863f9fb02e79cdd69e Mon Sep 17 00:00:00 2001 From: zhangyang21 Date: Tue, 24 Aug 2021 18:10:26 +0800 Subject: [PATCH] [Schedule] Replace Timer with ScheduledExecutorService Signed-off-by: zhangyang21 --- .../schedule/ScheduleMessageService.java | 52 +++++++++++-------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index bacae1e80bcf..3fa8dcfe7fde 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@ -16,13 +16,15 @@ */ package org.apache.rocketmq.store.schedule; +import io.netty.util.concurrent.DefaultThreadFactory; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rocketmq.common.ConfigManager; import org.apache.rocketmq.common.TopicFilterType; @@ -59,7 +61,8 @@ public class ScheduleMessageService extends ConfigManager { new ConcurrentHashMap(32); private final DefaultMessageStore defaultMessageStore; private final AtomicBoolean started = new AtomicBoolean(false); - private Timer timer; + private ScheduledExecutorService scheduledExecutorService; + private int deliverThreadPoolNums = Runtime.getRuntime().availableProcessors(); private MessageStore writeMessageStore; private int maxDelayLevel; @@ -113,7 +116,7 @@ public long computeDeliverTimestamp(final int delayLevel, final long storeTimest public void start() { if (started.compareAndSet(false, true)) { super.load(); - this.timer = new Timer("ScheduleMessageTimerThread", true); + this.scheduledExecutorService = new ScheduledThreadPoolExecutor(deliverThreadPoolNums, new DefaultThreadFactory("ScheduleMessageTimerThread_")); for (Map.Entry entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); Long timeDelay = entry.getValue(); @@ -123,30 +126,30 @@ public void start() { } if (timeDelay != null) { - this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); + this.scheduledExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS); } } - this.timer.scheduleAtFixedRate(new TimerTask() { + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { - if (started.get()) ScheduleMessageService.this.persist(); + if (started.get()) { + ScheduleMessageService.this.persist(); + } } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } } - }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); + }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS); } } public void shutdown() { - if (this.started.compareAndSet(true, false)) { - if (null != this.timer) - this.timer.cancel(); + if (this.started.compareAndSet(true, false) && null != this.scheduledExecutorService) { + this.scheduledExecutorService.shutdownNow(); } - } public boolean isStarted() { @@ -157,10 +160,12 @@ public int getMaxDelayLevel() { return maxDelayLevel; } + @Override public String encode() { return this.encode(false); } + @Override public boolean load() { boolean result = super.load(); result = result && this.parseDelayLevel(); @@ -184,6 +189,7 @@ public void decode(String jsonString) { } } + @Override public String encode(final boolean prettyFormat) { DelayOffsetSerializeWrapper delayOffsetSerializeWrapper = new DelayOffsetSerializeWrapper(); delayOffsetSerializeWrapper.setOffsetTable(this.offsetTable); @@ -222,7 +228,7 @@ public boolean parseDelayLevel() { return true; } - class DeliverDelayedMessageTimerTask extends TimerTask { + class DeliverDelayedMessageTimerTask implements Runnable { private final int delayLevel; private final long offset; @@ -240,8 +246,8 @@ public void run() { } catch (Exception e) { // XXX: warn and notify me log.error("ScheduleMessageService, executeOnTimeup exception", e); - ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask( - this.delayLevel, this.offset), DELAY_FOR_A_PERIOD); + ScheduleMessageService.this.scheduledExecutorService.schedule(new DeliverDelayedMessageTimerTask( + this.delayLevel, this.offset), DELAY_FOR_A_PERIOD, TimeUnit.MILLISECONDS); } } @@ -329,9 +335,9 @@ public void executeOnTimeup() { log.error( "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", msgExt.getTopic(), msgExt.getMsgId()); - ScheduleMessageService.this.timer.schedule( + ScheduleMessageService.this.scheduledExecutorService.schedule( new DeliverDelayedMessageTimerTask(this.delayLevel, - nextOffset), DELAY_FOR_A_PERIOD); + nextOffset), DELAY_FOR_A_PERIOD, TimeUnit.MILLISECONDS); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; @@ -350,17 +356,17 @@ public void executeOnTimeup() { } } } else { - ScheduleMessageService.this.timer.schedule( + ScheduleMessageService.this.scheduledExecutorService.schedule( new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), - countdown); + countdown, TimeUnit.MILLISECONDS); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } } // end of for nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); - ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask( - this.delayLevel, nextOffset), DELAY_FOR_A_WHILE); + ScheduleMessageService.this.scheduledExecutorService.schedule(new DeliverDelayedMessageTimerTask( + this.delayLevel, nextOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } finally { @@ -379,8 +385,8 @@ public void executeOnTimeup() { } } // end of if (cq != null) - ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, - failScheduleOffset), DELAY_FOR_A_WHILE); + ScheduleMessageService.this.scheduledExecutorService.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, + failScheduleOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS); } private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {