Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,13 @@ private boolean mmapOperation() {
if (messageStore.isTransientStorePoolEnable()) {
try {
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getRunningFlags(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {
log.warn("Use default implementation.");
mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool(), writeWithoutMmap);
mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize(), messageStore.getRunningFlags(), messageStore.getTransientStorePool(), writeWithoutMmap);
}
} else {
mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize(), writeWithoutMmap);
mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize(), messageStore.getRunningFlags(), writeWithoutMmap);
}

long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime);
Expand Down
3 changes: 2 additions & 1 deletion store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,12 @@ public CommitLog(final DefaultMessageStore messageStore) {
if (storePath.contains(MixAll.MULTI_PATH_SPLITTER)) {
this.mappedFileQueue = new MultiPathMappedFileQueue(messageStore.getMessageStoreConfig(),
messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
messageStore.getAllocateMappedFileService(), this::getFullStorePaths);
messageStore.getAllocateMappedFileService(), this::getFullStorePaths, messageStore.getRunningFlags());
} else {
this.mappedFileQueue = new MappedFileQueue(storePath,
messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
messageStore.getAllocateMappedFileService(),
messageStore.getRunningFlags(),
messageStore.getMessageStoreConfig().isWriteWithoutMmap());
}

Expand Down
23 changes: 17 additions & 6 deletions store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,35 @@ public class MappedFileQueue implements Swappable {
protected long committedWhere = 0;

protected volatile long storeTimestamp = 0;


protected RunningFlags runningFlags;

/**
* Configuration flag to use RandomAccessFile instead of MappedByteBuffer for writing
*/
protected boolean writeWithoutMmap = false;

public MappedFileQueue(final String storePath, int mappedFileSize,
AllocateMappedFileService allocateMappedFileService) {
this.storePath = storePath;
this.mappedFileSize = mappedFileSize;
this.allocateMappedFileService = allocateMappedFileService;
this(storePath, mappedFileSize, allocateMappedFileService, null, false);
}

public MappedFileQueue(final String storePath, int mappedFileSize,
AllocateMappedFileService allocateMappedFileService, RunningFlags runningFlags) {
this(storePath, mappedFileSize, allocateMappedFileService, runningFlags, false);
}

public MappedFileQueue(final String storePath, int mappedFileSize,
AllocateMappedFileService allocateMappedFileService, boolean writeWithoutMmap) {
this(storePath, mappedFileSize, allocateMappedFileService, null, writeWithoutMmap);
}

public MappedFileQueue(final String storePath, int mappedFileSize,
AllocateMappedFileService allocateMappedFileService, RunningFlags runningFlags, boolean writeWithoutMmap) {
this.storePath = storePath;
this.mappedFileSize = mappedFileSize;
this.allocateMappedFileService = allocateMappedFileService;
this.runningFlags = runningFlags;
this.writeWithoutMmap = writeWithoutMmap;
}

Expand Down Expand Up @@ -279,7 +290,7 @@ public boolean doLoad(List<File> files) {
}

try {
MappedFile mappedFile = new DefaultMappedFile(file.getPath(), mappedFileSize, writeWithoutMmap);
MappedFile mappedFile = new DefaultMappedFile(file.getPath(), mappedFileSize, runningFlags, writeWithoutMmap);

mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
Expand Down Expand Up @@ -369,7 +380,7 @@ protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFile
nextNextFilePath, this.mappedFileSize);
} else {
try {
mappedFile = new DefaultMappedFile(nextFilePath, this.mappedFileSize, this.writeWithoutMmap);
mappedFile = new DefaultMappedFile(nextFilePath, this.mappedFileSize, runningFlags, this.writeWithoutMmap);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,15 @@ public class MultiPathMappedFileQueue extends MappedFileQueue {
private final MessageStoreConfig config;
private final Supplier<Set<String>> fullStorePathsSupplier;

public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int mappedFileSize,
AllocateMappedFileService allocateMappedFileService,
Supplier<Set<String>> fullStorePathsSupplier) {
this(messageStoreConfig, mappedFileSize, allocateMappedFileService, fullStorePathsSupplier, null);
}
public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int mappedFileSize,
AllocateMappedFileService allocateMappedFileService,
Supplier<Set<String>> fullStorePathsSupplier) {
super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, allocateMappedFileService,
Supplier<Set<String>> fullStorePathsSupplier, RunningFlags runningFlags) {
super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, allocateMappedFileService, runningFlags,
messageStoreConfig.isWriteWithoutMmap());
this.config = messageStoreConfig;
this.fullStorePathsSupplier = fullStorePathsSupplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.CompactionAppendMsgCallback;
import org.apache.rocketmq.store.PutMessageContext;
import org.apache.rocketmq.store.RunningFlags;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.TransientStorePool;
import org.apache.rocketmq.store.config.FlushDiskType;
Expand Down Expand Up @@ -121,6 +122,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
private static int maxSharedNum = 16;
private static final SharedByteBuffer[] SHARED_BYTE_BUFFER;

protected RunningFlags runningFlags;
static class SharedByteBuffer {
private final ReentrantLock lock;
private final ByteBuffer buffer;
Expand Down Expand Up @@ -173,24 +175,36 @@ public DefaultMappedFile() {
}

public DefaultMappedFile(final String fileName, final int fileSize) throws IOException {
init(fileName, fileSize);
this(fileName, fileSize, null);
}

public DefaultMappedFile(final String fileName, final int fileSize,
public DefaultMappedFile(final String fileName, final int fileSize, boolean writeWithoutMmap) throws IOException {
this(fileName, fileSize, null, null, writeWithoutMmap);
}

public DefaultMappedFile(final String fileName, final int fileSize, RunningFlags runningFlags) throws IOException {
this(fileName, fileSize, runningFlags, null, false);
}

public DefaultMappedFile(final String fileName, final int fileSize, final RunningFlags runningFlags,
final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize, transientStorePool);
this(fileName, fileSize, runningFlags, transientStorePool, false);
}

public DefaultMappedFile(final String fileName, final int fileSize,
public DefaultMappedFile(final String fileName, final int fileSize, final RunningFlags runningFlags,
final boolean writeWithoutMmap) throws IOException {
this.writeWithoutMmap = writeWithoutMmap;
init(fileName, fileSize);
this(fileName, fileSize, runningFlags, null, writeWithoutMmap);
}

public DefaultMappedFile(final String fileName, final int fileSize,
final TransientStorePool transientStorePool, final boolean writeWithoutMmap) throws IOException {
this(fileName, fileSize, null, transientStorePool, writeWithoutMmap);
}

public DefaultMappedFile(final String fileName, final int fileSize, final RunningFlags runningFlags,
final TransientStorePool transientStorePool, final boolean writeWithoutMmap) throws IOException {
this.writeWithoutMmap = writeWithoutMmap;
init(fileName, fileSize, transientStorePool);
init(fileName, fileSize, runningFlags, transientStorePool);
}

public static int getTotalMappedFiles() {
Expand All @@ -202,30 +216,30 @@ public static long getTotalMappedVirtualMemory() {
}

@Override
public void init(final String fileName, final int fileSize,
public void init(final String fileName, final int fileSize, final RunningFlags runningFlags,
final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize);
init(fileName, fileSize, runningFlags);
if (transientStorePool != null) {
this.writeBuffer = transientStorePool.borrowBuffer();
this.transientStorePool = transientStorePool;
}
}

private void init(final String fileName, final int fileSize) throws IOException {
private void init(final String fileName, final int fileSize, final RunningFlags runningFlags) throws IOException {
this.fileName = fileName;
this.fileSize = fileSize;
this.file = new File(fileName);
this.fileFromOffset = Long.parseLong(this.file.getName());
this.runningFlags = runningFlags;
boolean ok = false;

UtilAll.ensureDirOK(this.file.getParent());

try {
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.randomAccessFile = new RandomAccessFile(this.file, "rw");
this.fileChannel = this.randomAccessFile.getChannel();

if (writeWithoutMmap) {
// Use RandomAccessFile for writing instead of MappedByteBuffer
this.randomAccessFile = new RandomAccessFile(this.file, "rw");
// Still create MappedByteBuffer for reading operations
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_ONLY, 0, fileSize);
} else {
Expand Down Expand Up @@ -522,6 +536,10 @@ public int flush(final int flushLeastPages) {
if (this.hold()) {
int value = getReadPosition();

if (!isWriteable()) {
return this.getFlushedPosition();
}

try {
this.mappedByteBufferAccessCountSinceLastSwap++;

Expand All @@ -538,6 +556,9 @@ public int flush(final int flushLeastPages) {
}
this.lastFlushTime = System.currentTimeMillis();
} catch (Throwable e) {
if (e instanceof IOException) {
getAndMakeNotWriteable();
}
log.error("Error occurred when force data to disk.", e);
}

Expand Down Expand Up @@ -597,6 +618,20 @@ protected void commit0() {
}
}

public boolean getAndMakeNotWriteable() {
if (runningFlags == null) {
return false;
}
return runningFlags.getAndMakeNotWriteable();
}

public boolean isWriteable() {
if (runningFlags == null) {
return true;
}
return runningFlags.isWriteable();
}

private boolean isAbleToFlush(final int flushLeastPages) {
int flush = FLUSHED_POSITION_UPDATER.get(this);
int write = getReadPosition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.CompactionAppendMsgCallback;
import org.apache.rocketmq.store.PutMessageContext;
import org.apache.rocketmq.store.RunningFlags;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.TransientStorePool;
import org.apache.rocketmq.store.config.FlushDiskType;
Expand Down Expand Up @@ -368,7 +369,7 @@ public interface MappedFile {
* @param transientStorePool transient store pool
* @throws IOException
*/
void init(String fileName, int fileSize, TransientStorePool transientStorePool) throws IOException;
void init(String fileName, int fileSize, RunningFlags runningFlags, TransientStorePool transientStorePool) throws IOException;

Iterator<SelectMappedBufferResult> iterator(int pos);

Expand Down
Loading