Skip to content

Commit

Permalink
Merge pull request #3357 from Jason918/RIP-7
Browse files Browse the repository at this point in the history
RIP-7 Multiple Directories Storage Support
  • Loading branch information
francisoliverlee committed Sep 25, 2021
2 parents 538b6b7 + 14d50f9 commit 4dbdbf0
Show file tree
Hide file tree
Showing 9 changed files with 492 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;

Expand Down Expand Up @@ -186,7 +187,7 @@ private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandl
}
}

if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;
Expand Down Expand Up @@ -636,8 +637,12 @@ public SocketAddress getStoreHost() {
}

private String diskUtil() {
String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
double physicRatio = 100;
String storePath = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
for (String storePathPhysic : paths) {
physicRatio = Math.min(physicRatio, UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic));
}

String storePathLogis =
StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
Expand Down
5 changes: 5 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/UtilAll.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,11 @@ public static String timeMillisToHumanString3(final long t) {
cal.get(Calendar.SECOND));
}

public static boolean isPathExists(final String path) {
File file = new File(path);
return file.exists();
}

public static double getDiskPartitionSpaceUsedPercent(final String path) {
if (null == path || path.isEmpty()) {
log.error("Error when measuring disk space usage, path is null or empty, path : {}", path);
Expand Down
26 changes: 24 additions & 2 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

Expand All @@ -43,6 +45,7 @@
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.schedule.ScheduleMessageService;

Expand Down Expand Up @@ -71,9 +74,20 @@ public class CommitLog {

protected final PutMessageLock putMessageLock;

private volatile Set<String> fullStorePaths = Collections.emptySet();

public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
String storePath = defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog();
if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) {
this.mappedFileQueue = new MultiPathMappedFileQueue(defaultMessageStore.getMessageStoreConfig(),
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
defaultMessageStore.getAllocateMappedFileService(), this::getFullStorePaths);
} else {
this.mappedFileQueue = new MappedFileQueue(storePath,
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
defaultMessageStore.getAllocateMappedFileService());
}

this.defaultMessageStore = defaultMessageStore;

if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
Expand All @@ -95,6 +109,14 @@ protected PutMessageThreadLocal initialValue() {

}

public void setFullStorePaths(Set<String> fullStorePaths) {
this.fullStorePaths = fullStorePaths;
}

public Set<String> getFullStorePaths() {
return fullStorePaths;
}

public boolean load() {
boolean result = this.mappedFileQueue.load();
log.info("load commit log " + (result ? "OK" : "Failed"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.nio.channels.FileLock;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
Expand Down Expand Up @@ -784,15 +785,22 @@ public HashMap<String, String> getRuntimeInfo() {
HashMap<String, String> result = this.storeStatsService.getRuntimeInfo();

{
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic());
result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio));

double minPhysicsUsedRatio = Double.MAX_VALUE;
String commitLogStorePath = getStorePathPhysic();
String[] paths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
for (String clPath : paths) {
double physicRatio = UtilAll.isPathExists(clPath) ?
UtilAll.getDiskPartitionSpaceUsedPercent(clPath) : -1;
result.put(RunningStats.commitLogDiskRatio.name() + "_" + clPath, String.valueOf(physicRatio));
minPhysicsUsedRatio = Math.min(minPhysicsUsedRatio, physicRatio);
}
result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(minPhysicsUsedRatio));
}

{

String storePathLogics = StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
double logicsRatio = UtilAll.isPathExists(storePathLogics) ?
UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics) : -1;
result.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(logicsRatio));
}

Expand Down Expand Up @@ -1651,25 +1659,43 @@ private boolean isSpaceToDelete() {
cleanImmediately = false;

{
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic());
if (physicRatio > diskSpaceWarningLevelRatio) {
String commitLogStorePath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
String[] storePaths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
Set<String> fullStorePath = new HashSet<>();
double minPhysicRatio = 100;
String minStorePath = null;
for (String storePathPhysic : storePaths) {
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
if (minPhysicRatio > physicRatio) {
minPhysicRatio = physicRatio;
minStorePath = storePathPhysic;
}
if (physicRatio > diskSpaceCleanForciblyRatio) {
fullStorePath.add(storePathPhysic);
}
}
DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath);
if (minPhysicRatio > diskSpaceWarningLevelRatio) {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
if (diskok) {
DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
DefaultMessageStore.log.error("physic disk maybe full soon " + minPhysicRatio +
", so mark disk full, storePathPhysic=" + minStorePath);
}

cleanImmediately = true;
} else if (physicRatio > diskSpaceCleanForciblyRatio) {
} else if (minPhysicRatio > diskSpaceCleanForciblyRatio) {
cleanImmediately = true;
} else {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
if (!diskok) {
DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
DefaultMessageStore.log.info("physic disk space OK " + minPhysicRatio +
", so mark disk ok, storePathPhysic=" + minStorePath);
}
}

if (physicRatio < 0 || physicRatio > ratio) {
DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
if (minPhysicRatio < 0 || minPhysicRatio > ratio) {
DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, "
+ minPhysicRatio + ", storePathPhysic=" + minStorePath);
return true;
}
}
Expand Down Expand Up @@ -1710,8 +1736,27 @@ public int getManualDeleteFileSeveralTimes() {
public void setManualDeleteFileSeveralTimes(int manualDeleteFileSeveralTimes) {
this.manualDeleteFileSeveralTimes = manualDeleteFileSeveralTimes;
}

public double calcStorePathPhysicRatio() {
Set<String> fullStorePath = new HashSet<>();
String storePath = getStorePathPhysic();
String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
double minPhysicRatio = 100;
for (String path : paths) {
double physicRatio = UtilAll.isPathExists(path) ?
UtilAll.getDiskPartitionSpaceUsedPercent(path) : -1;
minPhysicRatio = Math.min(minPhysicRatio, physicRatio);
if (physicRatio > diskSpaceCleanForciblyRatio) {
fullStorePath.add(path);
}
}
DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath);
return minPhysicRatio;

}

public boolean isSpaceFull() {
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic());
double physicRatio = calcStorePathPhysicRatio();
double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
if (physicRatio > ratio) {
DefaultMessageStore.log.info("physic disk of commitLog used: " + physicRatio);
Expand Down
108 changes: 61 additions & 47 deletions store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
Expand All @@ -37,13 +38,13 @@ public class MappedFileQueue {

private final String storePath;

private final int mappedFileSize;
protected final int mappedFileSize;

private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
protected final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();

private final AllocateMappedFileService allocateMappedFileService;

private long flushedWhere = 0;
protected long flushedWhere = 0;
private long committedWhere = 0;

private volatile long storeTimestamp = 0;
Expand Down Expand Up @@ -144,35 +145,40 @@ void deleteExpiredFile(List<MappedFile> files) {
}
}


public boolean load() {
File dir = new File(this.storePath);
File[] files = dir.listFiles();
if (files != null) {
// ascending order
Arrays.sort(files);
for (File file : files) {

if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, please check it manually");
return false;
}
File[] ls = dir.listFiles();
if (ls != null) {
return doLoad(Arrays.asList(ls));
}
return true;
}

try {
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);

mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
public boolean doLoad(List<File> files) {
// ascending order
files.sort(Comparator.comparing(File::getName));

for (File file : files) {
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, ignore it");
return true;
}
}

try {
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);

mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
}
return true;
}

Expand Down Expand Up @@ -204,33 +210,41 @@ public MappedFile getLastMappedFile(final long startOffset, boolean needCreate)
}

if (createOffset != -1 && needCreate) {
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
return tryCreateMappedFile(createOffset);
}

if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
return mappedFileLast;
}

protected MappedFile tryCreateMappedFile(long createOffset) {
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset
+ this.mappedFileSize);
return doCreateMappedFile(nextFilePath, nextNextFilePath);
}

protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
MappedFile mappedFile = null;

if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}

if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);
}
this.mappedFiles.add(mappedFile);
if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);
}

return mappedFile;
this.mappedFiles.add(mappedFile);
}

return mappedFileLast;
return mappedFile;
}

public MappedFile getLastMappedFile(final long startOffset) {
Expand Down
Loading

0 comments on commit 4dbdbf0

Please sign in to comment.