Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RIP-7 Multiple Directories Storage Support #3357

Merged
merged 9 commits into from
Sep 25, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;

Expand Down Expand Up @@ -186,7 +187,7 @@ private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandl
}
}

if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;
Expand Down Expand Up @@ -635,8 +636,12 @@ public SocketAddress getStoreHost() {
}

private String diskUtil() {
String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
double physicRatio = 100;
String storePath = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
for (String storePathPhysic : paths) {
physicRatio = Math.min(physicRatio, UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic));
}

String storePathLogis =
StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
Expand Down
5 changes: 5 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/UtilAll.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,11 @@ public static String timeMillisToHumanString3(final long t) {
cal.get(Calendar.SECOND));
}

public static boolean isPathExists(final String path) {
File file = new File(path);
return file.exists();
}

public static double getDiskPartitionSpaceUsedPercent(final String path) {
if (null == path || path.isEmpty()) {
log.error("Error when measuring disk space usage, path is null or empty, path : {}", path);
Expand Down
26 changes: 24 additions & 2 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

Expand All @@ -43,6 +45,7 @@
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.schedule.ScheduleMessageService;

Expand Down Expand Up @@ -71,9 +74,20 @@ public class CommitLog {

protected final PutMessageLock putMessageLock;

private volatile Set<String> fullStorePaths = Collections.emptySet();

public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
String storePath = defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog();
if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) {
this.mappedFileQueue = new MultiPathMappedFileQueue(defaultMessageStore.getMessageStoreConfig(),
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
defaultMessageStore.getAllocateMappedFileService(), this::getFullStorePaths);
} else {
this.mappedFileQueue = new MappedFileQueue(storePath,
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
defaultMessageStore.getAllocateMappedFileService());
}

this.defaultMessageStore = defaultMessageStore;

if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
Expand All @@ -95,6 +109,14 @@ protected PutMessageThreadLocal initialValue() {

}

public void setFullStorePaths(Set<String> fullStorePaths) {
this.fullStorePaths = fullStorePaths;
}

public Set<String> getFullStorePaths() {
return fullStorePaths;
}

public boolean load() {
boolean result = this.mappedFileQueue.load();
log.info("load commit log " + (result ? "OK" : "Failed"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.nio.channels.FileLock;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
Expand Down Expand Up @@ -783,15 +784,22 @@ public HashMap<String, String> getRuntimeInfo() {
HashMap<String, String> result = this.storeStatsService.getRuntimeInfo();

{
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic());
result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio));

double minPhysicsUsedRatio = Double.MAX_VALUE;
String commitLogStorePath = getStorePathPhysic();
String[] paths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
for (String clPath : paths) {
double physicRatio = UtilAll.isPathExists(clPath) ?
UtilAll.getDiskPartitionSpaceUsedPercent(clPath) : -1;
result.put(RunningStats.commitLogDiskRatio.name() + "_" + clPath, String.valueOf(physicRatio));
minPhysicsUsedRatio = Math.min(minPhysicsUsedRatio, physicRatio);
}
result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(minPhysicsUsedRatio));
}

{

String storePathLogics = StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
double logicsRatio = UtilAll.isPathExists(storePathLogics) ?
UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics) : -1;
result.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(logicsRatio));
}

Expand Down Expand Up @@ -1650,25 +1658,43 @@ private boolean isSpaceToDelete() {
cleanImmediately = false;

{
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic());
if (physicRatio > diskSpaceWarningLevelRatio) {
String commitLogStorePath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
String[] storePaths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
Set<String> fullStorePath = new HashSet<>();
double minPhysicRatio = 100;
String minStorePath = null;
for (String storePathPhysic : storePaths) {
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
if (minPhysicRatio > physicRatio) {
minPhysicRatio = physicRatio;
minStorePath = storePathPhysic;
}
if (physicRatio > diskSpaceCleanForciblyRatio) {
fullStorePath.add(storePathPhysic);
}
}
DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath);
if (minPhysicRatio > diskSpaceWarningLevelRatio) {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
if (diskok) {
DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
DefaultMessageStore.log.error("physic disk maybe full soon " + minPhysicRatio +
", so mark disk full, storePathPhysic=" + minStorePath);
}

cleanImmediately = true;
} else if (physicRatio > diskSpaceCleanForciblyRatio) {
} else if (minPhysicRatio > diskSpaceCleanForciblyRatio) {
cleanImmediately = true;
} else {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
if (!diskok) {
DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
DefaultMessageStore.log.info("physic disk space OK " + minPhysicRatio +
", so mark disk ok, storePathPhysic=" + minStorePath);
}
}

if (physicRatio < 0 || physicRatio > ratio) {
DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
if (minPhysicRatio < 0 || minPhysicRatio > ratio) {
DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, "
+ minPhysicRatio + ", storePathPhysic=" + minStorePath);
return true;
}
}
Expand Down Expand Up @@ -1709,8 +1735,27 @@ public int getManualDeleteFileSeveralTimes() {
public void setManualDeleteFileSeveralTimes(int manualDeleteFileSeveralTimes) {
this.manualDeleteFileSeveralTimes = manualDeleteFileSeveralTimes;
}

public double calcStorePathPhysicRatio() {
Set<String> fullStorePath = new HashSet<>();
String storePath = getStorePathPhysic();
String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
double minPhysicRatio = 100;
for (String path : paths) {
double physicRatio = UtilAll.isPathExists(path) ?
UtilAll.getDiskPartitionSpaceUsedPercent(path) : -1;
minPhysicRatio = Math.min(minPhysicRatio, physicRatio);
if (physicRatio > diskSpaceCleanForciblyRatio) {
fullStorePath.add(path);
}
}
DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath);
return minPhysicRatio;

}

public boolean isSpaceFull() {
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic());
double physicRatio = calcStorePathPhysicRatio();
double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
if (physicRatio > ratio) {
DefaultMessageStore.log.info("physic disk of commitLog used: " + physicRatio);
Expand Down
108 changes: 61 additions & 47 deletions store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
Expand All @@ -37,13 +38,13 @@ public class MappedFileQueue {

private final String storePath;

private final int mappedFileSize;
protected final int mappedFileSize;

private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
protected final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();

private final AllocateMappedFileService allocateMappedFileService;

private long flushedWhere = 0;
protected long flushedWhere = 0;
private long committedWhere = 0;

private volatile long storeTimestamp = 0;
Expand Down Expand Up @@ -144,35 +145,40 @@ void deleteExpiredFile(List<MappedFile> files) {
}
}


public boolean load() {
File dir = new File(this.storePath);
File[] files = dir.listFiles();
if (files != null) {
// ascending order
Arrays.sort(files);
for (File file : files) {

if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, please check it manually");
return false;
}
File[] ls = dir.listFiles();
if (ls != null) {
return doLoad(Arrays.asList(ls));
}
return true;
}

try {
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);

mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
public boolean doLoad(List<File> files) {
// ascending order
files.sort(Comparator.comparing(File::getName));

for (File file : files) {
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, ignore it");
return true;
}
}

try {
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);

mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
}
return true;
}

Expand Down Expand Up @@ -204,33 +210,41 @@ public MappedFile getLastMappedFile(final long startOffset, boolean needCreate)
}

if (createOffset != -1 && needCreate) {
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
return tryCreateMappedFile(createOffset);
}

if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
return mappedFileLast;
}

protected MappedFile tryCreateMappedFile(long createOffset) {
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset
+ this.mappedFileSize);
return doCreateMappedFile(nextFilePath, nextNextFilePath);
}

protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
MappedFile mappedFile = null;

if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}

if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);
}
this.mappedFiles.add(mappedFile);
if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);
}

return mappedFile;
this.mappedFiles.add(mappedFile);
}

return mappedFileLast;
return mappedFile;
}

public MappedFile getLastMappedFile(final long startOffset) {
Expand Down
Loading