diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java index a56fa461573..970e9b05ee1 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java +++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java @@ -176,13 +176,13 @@ private boolean mmapOperation() { if (messageStore.isTransientStorePoolEnable()) { try { mappedFile = ServiceLoader.load(MappedFile.class).iterator().next(); - mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool()); + mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getRunningFlags(), messageStore.getTransientStorePool()); } catch (RuntimeException e) { log.warn("Use default implementation."); - mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool(), writeWithoutMmap); + mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize(), messageStore.getRunningFlags(), messageStore.getTransientStorePool(), writeWithoutMmap); } } else { - mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize(), writeWithoutMmap); + mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize(), messageStore.getRunningFlags(), writeWithoutMmap); } long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime); diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 3b26afcc098..38894abc811 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -114,11 +114,12 @@ public CommitLog(final DefaultMessageStore messageStore) { if (storePath.contains(MixAll.MULTI_PATH_SPLITTER)) { this.mappedFileQueue = new MultiPathMappedFileQueue(messageStore.getMessageStoreConfig(), messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), - messageStore.getAllocateMappedFileService(), this::getFullStorePaths); + messageStore.getAllocateMappedFileService(), this::getFullStorePaths, messageStore.getRunningFlags()); } else { this.mappedFileQueue = new MappedFileQueue(storePath, messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), messageStore.getAllocateMappedFileService(), + messageStore.getRunningFlags(), messageStore.getMessageStoreConfig().isWriteWithoutMmap()); } diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java index 320e8421549..70cc65f8f60 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java @@ -53,7 +53,9 @@ public class MappedFileQueue implements Swappable { protected long committedWhere = 0; protected volatile long storeTimestamp = 0; - + + protected RunningFlags runningFlags; + /** * Configuration flag to use RandomAccessFile instead of MappedByteBuffer for writing */ @@ -61,16 +63,25 @@ public class MappedFileQueue implements Swappable { public MappedFileQueue(final String storePath, int mappedFileSize, AllocateMappedFileService allocateMappedFileService) { - this.storePath = storePath; - this.mappedFileSize = mappedFileSize; - this.allocateMappedFileService = allocateMappedFileService; + this(storePath, mappedFileSize, allocateMappedFileService, null, false); + } + + public MappedFileQueue(final String storePath, int mappedFileSize, + AllocateMappedFileService allocateMappedFileService, RunningFlags runningFlags) { + this(storePath, mappedFileSize, allocateMappedFileService, runningFlags, false); } public MappedFileQueue(final String storePath, int mappedFileSize, AllocateMappedFileService allocateMappedFileService, boolean writeWithoutMmap) { + this(storePath, mappedFileSize, allocateMappedFileService, null, writeWithoutMmap); + } + + public MappedFileQueue(final String storePath, int mappedFileSize, + AllocateMappedFileService allocateMappedFileService, RunningFlags runningFlags, boolean writeWithoutMmap) { this.storePath = storePath; this.mappedFileSize = mappedFileSize; this.allocateMappedFileService = allocateMappedFileService; + this.runningFlags = runningFlags; this.writeWithoutMmap = writeWithoutMmap; } @@ -279,7 +290,7 @@ public boolean doLoad(List files) { } try { - MappedFile mappedFile = new DefaultMappedFile(file.getPath(), mappedFileSize, writeWithoutMmap); + MappedFile mappedFile = new DefaultMappedFile(file.getPath(), mappedFileSize, runningFlags, writeWithoutMmap); mappedFile.setWrotePosition(this.mappedFileSize); mappedFile.setFlushedPosition(this.mappedFileSize); @@ -369,7 +380,7 @@ protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFile nextNextFilePath, this.mappedFileSize); } else { try { - mappedFile = new DefaultMappedFile(nextFilePath, this.mappedFileSize, this.writeWithoutMmap); + mappedFile = new DefaultMappedFile(nextFilePath, this.mappedFileSize, runningFlags, this.writeWithoutMmap); } catch (IOException e) { log.error("create mappedFile exception", e); } diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java index 72ec8820a6d..fcae4948c6c 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java @@ -36,10 +36,15 @@ public class MultiPathMappedFileQueue extends MappedFileQueue { private final MessageStoreConfig config; private final Supplier> fullStorePathsSupplier; + public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int mappedFileSize, + AllocateMappedFileService allocateMappedFileService, + Supplier> fullStorePathsSupplier) { + this(messageStoreConfig, mappedFileSize, allocateMappedFileService, fullStorePathsSupplier, null); + } public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int mappedFileSize, AllocateMappedFileService allocateMappedFileService, - Supplier> fullStorePathsSupplier) { - super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, allocateMappedFileService, + Supplier> fullStorePathsSupplier, RunningFlags runningFlags) { + super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, allocateMappedFileService, runningFlags, messageStoreConfig.isWriteWithoutMmap()); this.config = messageStoreConfig; this.fullStorePathsSupplier = fullStorePathsSupplier; diff --git a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java index b2d89108b4b..f2383993d4b 100644 --- a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java @@ -53,6 +53,7 @@ import org.apache.rocketmq.store.AppendMessageStatus; import org.apache.rocketmq.store.CompactionAppendMsgCallback; import org.apache.rocketmq.store.PutMessageContext; +import org.apache.rocketmq.store.RunningFlags; import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.TransientStorePool; import org.apache.rocketmq.store.config.FlushDiskType; @@ -121,6 +122,7 @@ public class DefaultMappedFile extends AbstractMappedFile { private static int maxSharedNum = 16; private static final SharedByteBuffer[] SHARED_BYTE_BUFFER; + protected RunningFlags runningFlags; static class SharedByteBuffer { private final ReentrantLock lock; private final ByteBuffer buffer; @@ -173,24 +175,36 @@ public DefaultMappedFile() { } public DefaultMappedFile(final String fileName, final int fileSize) throws IOException { - init(fileName, fileSize); + this(fileName, fileSize, null); } - public DefaultMappedFile(final String fileName, final int fileSize, + public DefaultMappedFile(final String fileName, final int fileSize, boolean writeWithoutMmap) throws IOException { + this(fileName, fileSize, null, null, writeWithoutMmap); + } + + public DefaultMappedFile(final String fileName, final int fileSize, RunningFlags runningFlags) throws IOException { + this(fileName, fileSize, runningFlags, null, false); + } + + public DefaultMappedFile(final String fileName, final int fileSize, final RunningFlags runningFlags, final TransientStorePool transientStorePool) throws IOException { - init(fileName, fileSize, transientStorePool); + this(fileName, fileSize, runningFlags, transientStorePool, false); } - public DefaultMappedFile(final String fileName, final int fileSize, + public DefaultMappedFile(final String fileName, final int fileSize, final RunningFlags runningFlags, final boolean writeWithoutMmap) throws IOException { - this.writeWithoutMmap = writeWithoutMmap; - init(fileName, fileSize); + this(fileName, fileSize, runningFlags, null, writeWithoutMmap); } public DefaultMappedFile(final String fileName, final int fileSize, + final TransientStorePool transientStorePool, final boolean writeWithoutMmap) throws IOException { + this(fileName, fileSize, null, transientStorePool, writeWithoutMmap); + } + + public DefaultMappedFile(final String fileName, final int fileSize, final RunningFlags runningFlags, final TransientStorePool transientStorePool, final boolean writeWithoutMmap) throws IOException { this.writeWithoutMmap = writeWithoutMmap; - init(fileName, fileSize, transientStorePool); + init(fileName, fileSize, runningFlags, transientStorePool); } public static int getTotalMappedFiles() { @@ -202,30 +216,30 @@ public static long getTotalMappedVirtualMemory() { } @Override - public void init(final String fileName, final int fileSize, + public void init(final String fileName, final int fileSize, final RunningFlags runningFlags, final TransientStorePool transientStorePool) throws IOException { - init(fileName, fileSize); + init(fileName, fileSize, runningFlags); if (transientStorePool != null) { this.writeBuffer = transientStorePool.borrowBuffer(); this.transientStorePool = transientStorePool; } } - private void init(final String fileName, final int fileSize) throws IOException { + private void init(final String fileName, final int fileSize, final RunningFlags runningFlags) throws IOException { this.fileName = fileName; this.fileSize = fileSize; this.file = new File(fileName); this.fileFromOffset = Long.parseLong(this.file.getName()); + this.runningFlags = runningFlags; boolean ok = false; UtilAll.ensureDirOK(this.file.getParent()); try { - this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel(); + this.randomAccessFile = new RandomAccessFile(this.file, "rw"); + this.fileChannel = this.randomAccessFile.getChannel(); if (writeWithoutMmap) { - // Use RandomAccessFile for writing instead of MappedByteBuffer - this.randomAccessFile = new RandomAccessFile(this.file, "rw"); // Still create MappedByteBuffer for reading operations this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_ONLY, 0, fileSize); } else { @@ -522,6 +536,10 @@ public int flush(final int flushLeastPages) { if (this.hold()) { int value = getReadPosition(); + if (!isWriteable()) { + return this.getFlushedPosition(); + } + try { this.mappedByteBufferAccessCountSinceLastSwap++; @@ -538,6 +556,9 @@ public int flush(final int flushLeastPages) { } this.lastFlushTime = System.currentTimeMillis(); } catch (Throwable e) { + if (e instanceof IOException) { + getAndMakeNotWriteable(); + } log.error("Error occurred when force data to disk.", e); } @@ -597,6 +618,20 @@ protected void commit0() { } } + public boolean getAndMakeNotWriteable() { + if (runningFlags == null) { + return false; + } + return runningFlags.getAndMakeNotWriteable(); + } + + public boolean isWriteable() { + if (runningFlags == null) { + return true; + } + return runningFlags.isWriteable(); + } + private boolean isAbleToFlush(final int flushLeastPages) { int flush = FLUSHED_POSITION_UPDATER.get(this); int write = getReadPosition(); diff --git a/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java index d1f11959aa6..0985ff1edce 100644 --- a/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java @@ -28,6 +28,7 @@ import org.apache.rocketmq.store.AppendMessageResult; import org.apache.rocketmq.store.CompactionAppendMsgCallback; import org.apache.rocketmq.store.PutMessageContext; +import org.apache.rocketmq.store.RunningFlags; import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.TransientStorePool; import org.apache.rocketmq.store.config.FlushDiskType; @@ -368,7 +369,7 @@ public interface MappedFile { * @param transientStorePool transient store pool * @throws IOException */ - void init(String fileName, int fileSize, TransientStorePool transientStorePool) throws IOException; + void init(String fileName, int fileSize, RunningFlags runningFlags, TransientStorePool transientStorePool) throws IOException; Iterator iterator(int pos);