From 95e27b9dd789805126ad513ca8995ae4c138c9d3 Mon Sep 17 00:00:00 2001 From: zhaoyuhan Date: Thu, 25 Apr 2024 15:24:05 +0800 Subject: [PATCH] fix tiered storage skip dispatch and destroy when broker role is Slave --- .../tieredstore/TieredMessageStore.java | 2 +- .../core/MessageStoreDispatcherImpl.java | 10 +++++++- .../tieredstore/file/FlatFileStore.java | 25 +++++++++++++------ .../core/MessageStoreDispatcherImplTest.java | 6 ++++- .../tieredstore/file/FlatFileStoreTest.java | 9 +++++-- 5 files changed, 40 insertions(+), 12 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java index 99d586ae236..6112b27c7d6 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java @@ -88,7 +88,7 @@ public TieredMessageStore(MessageStorePluginContext context, MessageStore next) this.metadataStore = this.getMetadataStore(this.storeConfig); this.topicFilter = new MessageStoreTopicFilter(this.storeConfig); this.storeExecutor = new MessageStoreExecutor(); - this.flatFileStore = new FlatFileStore(this.storeConfig, this.metadataStore, this.storeExecutor); + this.flatFileStore = new FlatFileStore(this.storeConfig, this.metadataStore, this.storeExecutor, this.defaultStore); this.indexService = new IndexStoreService(this.flatFileStore.getFlatFileFactory(), MessageStoreUtil.getIndexFilePath(this.storeConfig.getBrokerName())); this.fetcher = new MessageStoreFetcherImpl(this); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java index 330872ab9cd..fc7512168b5 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java @@ -34,6 +34,7 @@ import org.apache.rocketmq.store.DispatchRequest; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.SelectMappedBufferResult; +import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.queue.ConsumeQueueInterface; import org.apache.rocketmq.store.queue.CqUnit; import org.apache.rocketmq.tieredstore.MessageStoreConfig; @@ -292,7 +293,14 @@ public void constructIndexFile(long topicId, DispatchRequest request) { public void run() { log.info("{} service started", this.getServiceName()); while (!this.isStopped()) { - flatFileStore.deepCopyFlatFileToList().forEach(this::dispatchWithSemaphore); + if (defaultStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { + log.info("Broker role is slave, skip dispatch"); + } else if (defaultStore.getMessageStoreConfig().isEnableDLegerCommitLog() && + defaultStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.ASYNC_MASTER) { + log.info("Dledger leader is not elected yet, skip dispatch"); + } else { + flatFileStore.deepCopyFlatFileToList().forEach(this::dispatchWithSemaphore); + } this.waitForRunning(Duration.ofSeconds(20).toMillis()); } log.info("{} service shutdown", this.getServiceName()); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java index 0d7044a5447..be3a31436a2 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java @@ -28,6 +28,8 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.tieredstore.MessageStoreConfig; import org.apache.rocketmq.tieredstore.MessageStoreExecutor; import org.apache.rocketmq.tieredstore.metadata.MetadataStore; @@ -45,13 +47,15 @@ public class FlatFileStore { private final MessageStoreExecutor executor; private final FlatFileFactory flatFileFactory; private final ConcurrentMap flatFileConcurrentMap; + private final MessageStore defaultStore; - public FlatFileStore(MessageStoreConfig storeConfig, MetadataStore metadataStore, MessageStoreExecutor executor) { + public FlatFileStore(MessageStoreConfig storeConfig, MetadataStore metadataStore, MessageStoreExecutor executor, MessageStore defaultStore) { this.storeConfig = storeConfig; this.metadataStore = metadataStore; this.executor = executor; this.flatFileFactory = new FlatFileFactory(metadataStore, storeConfig); this.flatFileConcurrentMap = new ConcurrentHashMap<>(); + this.defaultStore = defaultStore; } public boolean load() { @@ -60,12 +64,19 @@ public boolean load() { this.flatFileConcurrentMap.clear(); this.recover(); this.executor.commonExecutor.scheduleWithFixedDelay(() -> { - long expiredTimeStamp = System.currentTimeMillis() - - TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime()); - for (FlatMessageFile flatFile : deepCopyFlatFileToList()) { - flatFile.destroyExpiredFile(expiredTimeStamp); - if (flatFile.consumeQueue.fileSegmentTable.isEmpty()) { - this.destroyFile(flatFile.getMessageQueue()); + if (defaultStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { + log.info("Broker role is slave, skip destroy"); + } else if (defaultStore.getMessageStoreConfig().isEnableDLegerCommitLog() && + defaultStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.ASYNC_MASTER) { + log.info("Dledger leader is not elected yet, skip destroy"); + } else { + long expiredTimeStamp = System.currentTimeMillis() - + TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime()); + for (FlatMessageFile flatFile : deepCopyFlatFileToList()) { + flatFile.destroyExpiredFile(expiredTimeStamp); + if (flatFile.consumeQueue.fileSegmentTable.isEmpty()) { + this.destroyFile(flatFile.getMessageQueue()); + } } } }, 60, 60, TimeUnit.SECONDS); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java index 8ac7e068a76..c3335319959 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java @@ -79,7 +79,6 @@ public void init() { mq = new MessageQueue("StoreTest", storeConfig.getBrokerName(), 1); metadataStore = new DefaultMetadataStore(storeConfig); executor = new MessageStoreExecutor(); - fileStore = new FlatFileStore(storeConfig, metadataStore, executor); } @After @@ -95,7 +94,9 @@ public void dispatchFromCommitLogTest() throws Exception { MessageStore defaultStore = Mockito.mock(MessageStore.class); Mockito.when(defaultStore.getMinOffsetInQueue(anyString(), anyInt())).thenReturn(100L); Mockito.when(defaultStore.getMaxOffsetInQueue(anyString(), anyInt())).thenReturn(200L); + Mockito.when(defaultStore.getMessageStoreConfig()).thenReturn(new org.apache.rocketmq.store.config.MessageStoreConfig()); + fileStore = new FlatFileStore(storeConfig, metadataStore, executor, defaultStore); messageStore = Mockito.mock(TieredMessageStore.class); IndexService indexService = new IndexStoreService(new FlatFileFactory(metadataStore, storeConfig), storePath); @@ -159,6 +160,9 @@ public void dispatchFromCommitLogTest() throws Exception { @Test public void dispatchServiceTest() { MessageStore defaultStore = Mockito.mock(MessageStore.class); + Mockito.when(defaultStore.getMessageStoreConfig()).thenReturn(new org.apache.rocketmq.store.config.MessageStoreConfig()); + + fileStore = new FlatFileStore(storeConfig, metadataStore, executor, defaultStore); messageStore = Mockito.mock(TieredMessageStore.class); IndexService indexService = new IndexStoreService(new FlatFileFactory(metadataStore, storeConfig), storePath); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java index 79647932dae..7e74b23656c 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java @@ -19,6 +19,8 @@ import java.io.IOException; import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.tieredstore.MessageStoreConfig; import org.apache.rocketmq.tieredstore.MessageStoreExecutor; import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode; @@ -40,6 +42,7 @@ public class FlatFileStoreTest { private final String storePath = MessageStoreUtilTest.getRandomStorePath(); private MessageStoreConfig storeConfig; private MetadataStore metadataStore; + private MessageStore defaultStore; @Before public void init() { @@ -48,6 +51,8 @@ public void init() { storeConfig.setTieredBackendServiceProvider(PosixFileSegment.class.getName()); storeConfig.setBrokerName(storeConfig.getBrokerName()); metadataStore = new DefaultMetadataStore(storeConfig); + defaultStore = Mockito.mock(DefaultMessageStore.class); + Mockito.when(defaultStore.getMessageStoreConfig()).thenReturn(new org.apache.rocketmq.store.config.MessageStoreConfig()); } @After @@ -59,7 +64,7 @@ public void shutdown() throws IOException { public void flatFileStoreTest() { // Empty recover MessageStoreExecutor executor = new MessageStoreExecutor(); - FlatFileStore fileStore = new FlatFileStore(storeConfig, metadataStore, executor); + FlatFileStore fileStore = new FlatFileStore(storeConfig, metadataStore, executor, defaultStore); Assert.assertTrue(fileStore.load()); Assert.assertEquals(storeConfig, fileStore.getStoreConfig()); @@ -75,7 +80,7 @@ public void flatFileStoreTest() { Assert.assertEquals(4, fileStore.deepCopyFlatFileToList().size()); fileStore.shutdown(); - fileStore = new FlatFileStore(storeConfig, metadataStore, executor); + fileStore = new FlatFileStore(storeConfig, metadataStore, executor, defaultStore); Assert.assertTrue(fileStore.load()); Assert.assertEquals(4, fileStore.deepCopyFlatFileToList().size());