Skip to content

Commit

Permalink
fix tiered storage skip dispatch and destroy when broker role is Slave
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaoyuhan committed Apr 25, 2024
1 parent b37d283 commit 95e27b9
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public TieredMessageStore(MessageStorePluginContext context, MessageStore next)
this.metadataStore = this.getMetadataStore(this.storeConfig);
this.topicFilter = new MessageStoreTopicFilter(this.storeConfig);
this.storeExecutor = new MessageStoreExecutor();
this.flatFileStore = new FlatFileStore(this.storeConfig, this.metadataStore, this.storeExecutor);
this.flatFileStore = new FlatFileStore(this.storeConfig, this.metadataStore, this.storeExecutor, this.defaultStore);
this.indexService = new IndexStoreService(this.flatFileStore.getFlatFileFactory(),
MessageStoreUtil.getIndexFilePath(this.storeConfig.getBrokerName()));
this.fetcher = new MessageStoreFetcherImpl(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
Expand Down Expand Up @@ -292,7 +293,14 @@ public void constructIndexFile(long topicId, DispatchRequest request) {
public void run() {
log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
flatFileStore.deepCopyFlatFileToList().forEach(this::dispatchWithSemaphore);
if (defaultStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
log.info("Broker role is slave, skip dispatch");
} else if (defaultStore.getMessageStoreConfig().isEnableDLegerCommitLog() &&
defaultStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.ASYNC_MASTER) {
log.info("Dledger leader is not elected yet, skip dispatch");
} else {
flatFileStore.deepCopyFlatFileToList().forEach(this::dispatchWithSemaphore);
}
this.waitForRunning(Duration.ofSeconds(20).toMillis());
}
log.info("{} service shutdown", this.getServiceName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.metadata.MetadataStore;
Expand All @@ -45,13 +47,15 @@ public class FlatFileStore {
private final MessageStoreExecutor executor;
private final FlatFileFactory flatFileFactory;
private final ConcurrentMap<MessageQueue, FlatMessageFile> flatFileConcurrentMap;
private final MessageStore defaultStore;

public FlatFileStore(MessageStoreConfig storeConfig, MetadataStore metadataStore, MessageStoreExecutor executor) {
public FlatFileStore(MessageStoreConfig storeConfig, MetadataStore metadataStore, MessageStoreExecutor executor, MessageStore defaultStore) {
this.storeConfig = storeConfig;
this.metadataStore = metadataStore;
this.executor = executor;
this.flatFileFactory = new FlatFileFactory(metadataStore, storeConfig);
this.flatFileConcurrentMap = new ConcurrentHashMap<>();
this.defaultStore = defaultStore;
}

public boolean load() {
Expand All @@ -60,12 +64,19 @@ public boolean load() {
this.flatFileConcurrentMap.clear();
this.recover();
this.executor.commonExecutor.scheduleWithFixedDelay(() -> {
long expiredTimeStamp = System.currentTimeMillis() -
TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime());
for (FlatMessageFile flatFile : deepCopyFlatFileToList()) {
flatFile.destroyExpiredFile(expiredTimeStamp);
if (flatFile.consumeQueue.fileSegmentTable.isEmpty()) {
this.destroyFile(flatFile.getMessageQueue());
if (defaultStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
log.info("Broker role is slave, skip destroy");
} else if (defaultStore.getMessageStoreConfig().isEnableDLegerCommitLog() &&
defaultStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.ASYNC_MASTER) {
log.info("Dledger leader is not elected yet, skip destroy");
} else {
long expiredTimeStamp = System.currentTimeMillis() -
TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime());
for (FlatMessageFile flatFile : deepCopyFlatFileToList()) {
flatFile.destroyExpiredFile(expiredTimeStamp);
if (flatFile.consumeQueue.fileSegmentTable.isEmpty()) {
this.destroyFile(flatFile.getMessageQueue());
}
}
}
}, 60, 60, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public void init() {
mq = new MessageQueue("StoreTest", storeConfig.getBrokerName(), 1);
metadataStore = new DefaultMetadataStore(storeConfig);
executor = new MessageStoreExecutor();
fileStore = new FlatFileStore(storeConfig, metadataStore, executor);
}

@After
Expand All @@ -95,7 +94,9 @@ public void dispatchFromCommitLogTest() throws Exception {
MessageStore defaultStore = Mockito.mock(MessageStore.class);
Mockito.when(defaultStore.getMinOffsetInQueue(anyString(), anyInt())).thenReturn(100L);
Mockito.when(defaultStore.getMaxOffsetInQueue(anyString(), anyInt())).thenReturn(200L);
Mockito.when(defaultStore.getMessageStoreConfig()).thenReturn(new org.apache.rocketmq.store.config.MessageStoreConfig());

fileStore = new FlatFileStore(storeConfig, metadataStore, executor, defaultStore);
messageStore = Mockito.mock(TieredMessageStore.class);
IndexService indexService =
new IndexStoreService(new FlatFileFactory(metadataStore, storeConfig), storePath);
Expand Down Expand Up @@ -159,6 +160,9 @@ public void dispatchFromCommitLogTest() throws Exception {
@Test
public void dispatchServiceTest() {
MessageStore defaultStore = Mockito.mock(MessageStore.class);
Mockito.when(defaultStore.getMessageStoreConfig()).thenReturn(new org.apache.rocketmq.store.config.MessageStoreConfig());

fileStore = new FlatFileStore(storeConfig, metadataStore, executor, defaultStore);
messageStore = Mockito.mock(TieredMessageStore.class);
IndexService indexService =
new IndexStoreService(new FlatFileFactory(metadataStore, storeConfig), storePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode;
Expand All @@ -40,6 +42,7 @@ public class FlatFileStoreTest {
private final String storePath = MessageStoreUtilTest.getRandomStorePath();
private MessageStoreConfig storeConfig;
private MetadataStore metadataStore;
private MessageStore defaultStore;

@Before
public void init() {
Expand All @@ -48,6 +51,8 @@ public void init() {
storeConfig.setTieredBackendServiceProvider(PosixFileSegment.class.getName());
storeConfig.setBrokerName(storeConfig.getBrokerName());
metadataStore = new DefaultMetadataStore(storeConfig);
defaultStore = Mockito.mock(DefaultMessageStore.class);
Mockito.when(defaultStore.getMessageStoreConfig()).thenReturn(new org.apache.rocketmq.store.config.MessageStoreConfig());
}

@After
Expand All @@ -59,7 +64,7 @@ public void shutdown() throws IOException {
public void flatFileStoreTest() {
// Empty recover
MessageStoreExecutor executor = new MessageStoreExecutor();
FlatFileStore fileStore = new FlatFileStore(storeConfig, metadataStore, executor);
FlatFileStore fileStore = new FlatFileStore(storeConfig, metadataStore, executor, defaultStore);
Assert.assertTrue(fileStore.load());

Assert.assertEquals(storeConfig, fileStore.getStoreConfig());
Expand All @@ -75,7 +80,7 @@ public void flatFileStoreTest() {
Assert.assertEquals(4, fileStore.deepCopyFlatFileToList().size());
fileStore.shutdown();

fileStore = new FlatFileStore(storeConfig, metadataStore, executor);
fileStore = new FlatFileStore(storeConfig, metadataStore, executor, defaultStore);
Assert.assertTrue(fileStore.load());
Assert.assertEquals(4, fileStore.deepCopyFlatFileToList().size());

Expand Down

0 comments on commit 95e27b9

Please sign in to comment.