Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,10 @@ private IoTDBConstant() {}
// compaction mods of previous version (<0.13)
public static final String COMPACTION_MODIFICATION_FILE_NAME_FROM_OLD = "merge.mods";

// suffix that distinguishes a temporary settle file from the file it was derived from
public static final String SETTLE_SUFFIX = ".settle";
// suffix of the temporary file generated while a .mods file is being settled
public static final String MODS_SETTLE_FILE_SUFFIX = ".mods.settle";
// the empty string
public static final String BLANK = "";

// write ahead log
public static final String WAL_FILE_PREFIX = "_";
public static final String WAL_FILE_SUFFIX = ".wal";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,16 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import static org.apache.iotdb.commons.conf.IoTDBConstant.BLANK;
import static org.apache.iotdb.commons.conf.IoTDBConstant.MODS_SETTLE_FILE_SUFFIX;
import static org.apache.iotdb.commons.conf.IoTDBConstant.SETTLE_SUFFIX;
import static org.apache.iotdb.db.engine.compaction.execute.utils.log.CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD;
import static org.apache.iotdb.db.engine.compaction.execute.utils.log.CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX_FROM_OLD;

Expand Down Expand Up @@ -87,18 +94,51 @@ private void recoverCompaction(boolean isInnerSpace, boolean isLogSequence) {
|| !Pattern.compile("[0-9]*").matcher(timePartitionDir.getName()).matches()) {
continue;
}
File[] compactionLogs =
CompactionLogger.findCompactionLogs(isInnerSpace, timePartitionDir.getPath());
for (File compactionLog : compactionLogs) {
logger.info("Calling compaction recover task.");
new CompactionRecoverTask(
logicalStorageGroupName, dataRegionId, tsFileManager, compactionLog, isInnerSpace)
.doCompaction();
}
// recover temporary files generated during compacted
recoverCompaction(isInnerSpace, timePartitionDir);

// recover temporary files generated during .mods file settled
recoverModSettleFile(timePartitionDir.toPath());
}
}
}

/**
 * Cleans up temporary files left behind by an interrupted settle of .mods files.
 *
 * <p>Every direct child of {@code timePartitionDir} whose file name ends with {@code
 * MODS_SETTLE_FILE_SUFFIX} is handled independently; see {@link
 * #recoverSingleModSettleFile(Path)}. I/O failures are logged and never propagated, so one
 * broken file does not prevent the remaining files from being recovered.
 *
 * @param timePartitionDir the time partition directory to scan (not recursive)
 */
public void recoverModSettleFile(Path timePartitionDir) {
  try (Stream<Path> settlesStream = Files.list(timePartitionDir)) {
    settlesStream
        .filter(path -> path.getFileName().toString().endsWith(MODS_SETTLE_FILE_SUFFIX))
        .forEach(this::recoverSingleModSettleFile);
  } catch (IOException e) {
    logger.error("recover mods file error on list files:{}", timePartitionDir, e);
  }
}

/**
 * Recovers one temporary settle file: deletes it when the mods file it was derived from still
 * exists, otherwise renames it so that it takes the place of the missing mods file.
 *
 * @param modsSettle path of a file whose name ends with {@code MODS_SETTLE_FILE_SUFFIX}
 */
private void recoverSingleModSettleFile(Path modsSettle) {
  String settleName = modsSettle.getFileName().toString();
  // Strip only the trailing settle suffix. String.replace would also remove any earlier
  // occurrence of the suffix inside the name and thus resolve to the wrong file.
  String originName = settleName.substring(0, settleName.length() - SETTLE_SUFFIX.length());
  Path originModFile = modsSettle.resolveSibling(originName);
  try {
    if (Files.exists(originModFile)) {
      Files.deleteIfExists(modsSettle);
    } else {
      Files.move(modsSettle, originModFile);
    }
  } catch (IOException e) {
    logger.error("recover mods file error on delete or rename mods settle file:{}", modsSettle, e);
  }
}

/**
 * Runs a {@code CompactionRecoverTask} for every compaction log that {@code
 * CompactionLogger.findCompactionLogs} returns for the given time partition directory.
 *
 * @param isInnerSpace forwarded to the log lookup and to each recover task
 * @param timePartitionDir the time partition directory whose compaction logs are recovered
 */
public void recoverCompaction(boolean isInnerSpace, File timePartitionDir) {
  String partitionPath = timePartitionDir.getPath();
  File[] logFiles = CompactionLogger.findCompactionLogs(isInnerSpace, partitionPath);
  for (int i = 0; i < logFiles.length; i++) {
    logger.info("Calling compaction recover task.");
    CompactionRecoverTask recoverTask =
        new CompactionRecoverTask(
            logicalStorageGroupName, dataRegionId, tsFileManager, logFiles[i], isInnerSpace);
    recoverTask.doCompaction();
  }
}

/** Check whether there is old compaction log from previous version (<0.13) and recover it. */
private void recoverCompactionBefore013(boolean isInnerSpace) {
String oldLogName =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

public class InnerSpaceCompactionTask extends AbstractCompactionTask {
Expand All @@ -66,6 +67,8 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
protected boolean[] isHoldingReadLock;
protected boolean[] isHoldingWriteLock;

// largest mods file size seen among the resources examined by collectSelectedFilesInfo();
// reset to 0 at the start of every collection
protected long maxModsFileSize;

public InnerSpaceCompactionTask(
long timePartition,
TsFileManager tsFileManager,
Expand Down Expand Up @@ -339,6 +342,7 @@ private void collectSelectedFilesInfo() {
sumOfCompactionCount = 0;
maxFileVersion = -1L;
maxCompactionCount = -1;
maxModsFileSize = 0;
if (selectedTsFileResourceList == null) {
return;
}
Expand All @@ -354,6 +358,10 @@ private void collectSelectedFilesInfo() {
if (fileName.getVersion() > maxFileVersion) {
maxFileVersion = fileName.getVersion();
}
if (!Objects.isNull(resource.getModFile())) {
long modsFileSize = resource.getModFile().getSize();
maxModsFileSize = Math.max(maxModsFileSize, modsFileSize);
}
} catch (IOException e) {
LOGGER.warn("Fail to get the tsfile name of {}", resource.getTsFile(), e);
}
Expand All @@ -380,6 +388,10 @@ public long getMaxFileVersion() {
return maxFileVersion;
}

/**
 * Returns the largest mods file size recorded by {@code collectSelectedFilesInfo()}.
 *
 * @return the recorded maximum, or 0 when no resource with a mods file was recorded
 */
public long getMaxModsFileSize() {
  return this.maxModsFileSize;
}

@Override
public String toString() {
return storageGroupName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ public int compare(AbstractCompactionTask o1, AbstractCompactionTask o2) {

public int compareInnerSpaceCompactionTask(
InnerSpaceCompactionTask o1, InnerSpaceCompactionTask o2) {

// if max mods file size of o1 and o2 are different
// we prefer to execute task with greater mods file
if (o1.getMaxModsFileSize() != o2.getMaxModsFileSize()) {
return o2.getMaxModsFileSize() > o1.getMaxModsFileSize() ? 1 : -1;
}

// if the sum of compaction count of the selected files are different
// we prefer to execute task with smaller compaction count
// this can reduce write amplification
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,15 @@ private long calculateReadingUnseqFile(TsFileResource unseqResource) throws IOEx
return 0;
}
// it means the max size of a timeseries in this file when reading all of its chunk into memory.
return compressionRatio
* concurrentSeriesNum
* (unseqResource.getTsFileSize() * fileInfo.maxSeriesChunkNum / fileInfo.totalChunkNum);

long resourceFileSize =
compressionRatio
* concurrentSeriesNum
* (unseqResource.getTsFileSize() * fileInfo.maxSeriesChunkNum / fileInfo.totalChunkNum);

// add mod file size
long modFileSize = unseqResource.getModFile().getSize();
return resourceFileSize + modFileSize;
}

/**
Expand Down Expand Up @@ -162,6 +168,9 @@ private long calculateReadingSeqFiles(List<TsFileResource> seqResources) throws
cost += seqFileCost;
maxCostOfReadingSeqFile = seqFileCost;
}

// add mod file size
cost += seqResource.getModFile().getSize();
}
return cost;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.db.engine.compaction.schedule.comparator.ICompactionTaskComparator;
import org.apache.iotdb.db.engine.compaction.selector.IInnerSeqSpaceSelector;
import org.apache.iotdb.db.engine.compaction.selector.IInnerUnseqSpaceSelector;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
Expand All @@ -40,6 +41,7 @@
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;

/**
Expand All @@ -64,6 +66,7 @@ public class SizeTieredCompactionSelector
protected boolean sequence;
protected TsFileManager tsFileManager;
protected boolean hasNextTimePartition;
// a file whose mods file size exceeds this value (50 * 1024 * 1024) is selected preferentially
// NOTE(review): presumably the unit is bytes, i.e. 50 MiB; confirm against ModificationFile#getSize
private static final long MODS_FILE_SIZE_THRESHOLD = 1024 * 1024 * 50L;

public SizeTieredCompactionSelector(
String storageGroupName,
Expand All @@ -88,27 +91,23 @@ public SizeTieredCompactionSelector(
* longer search for higher layers), otherwise it will return true.
*
* @param level the level to be searched
* @param taskPriorityQueue it stores the batches of files to be compacted and the total size of
* each batch
* @return return whether to continue the search to higher levels
* @throws IOException
*/
private boolean selectLevelTask(
int level, PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue)
throws IOException {
boolean shouldContinueToSearch = true;
@SuppressWarnings({"squid:S3776", "squid:S135"})
private List<Pair<List<TsFileResource>, Long>> selectSingleLevel(int level) throws IOException {
List<TsFileResource> selectedFileList = new ArrayList<>();
long selectedFileSize = 0L;
long targetCompactionFileSize = config.getTargetCompactionFileSize();

List<Pair<List<TsFileResource>, Long>> taskList = new ArrayList<>();
for (TsFileResource currentFile : tsFileResources) {
TsFileNameGenerator.TsFileName currentName =
TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());
if (currentName.getInnerCompactionCnt() != level) {
// meet files of another level
if (selectedFileList.size() > 1) {
taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
shouldContinueToSearch = false;
taskList.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
}
selectedFileList = new ArrayList<>();
selectedFileSize = 0L;
Expand All @@ -132,8 +131,7 @@ private boolean selectLevelTask(
|| selectedFileList.size() >= config.getMaxInnerCompactionCandidateFileNum()) {
// submit the task
if (selectedFileList.size() > 1) {
taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
shouldContinueToSearch = false;
taskList.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
}
selectedFileList = new ArrayList<>();
selectedFileSize = 0L;
Expand All @@ -143,17 +141,19 @@ private boolean selectLevelTask(
// if next time partition exists
// submit a merge task even it does not meet the requirement for file num or file size
if (hasNextTimePartition && selectedFileList.size() > 1) {
taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
shouldContinueToSearch = false;
taskList.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
}
return shouldContinueToSearch;
return taskList;
}

/**
* This method searches for a batch of files to be compacted from layer 0 to the highest layer. If
* there are more than a batch of files to be merged on a certain layer, it does not search to
* higher layers. It creates a compaction thread for each batch of files and put it into the
* candidateCompactionTaskQueue of the {@link CompactionTaskManager}.
* This method is used to select batches of files to be merged. There are two ways to select
* files. If the first way selects suitable files, the second way is not executed. The first
* way is based on the mods file corresponding to each file: files whose mods file is larger
* than 50M are selected preferentially. The second way is based on the file layer and searches
* from layer 0 to the highest layer. As soon as at least one batch of files to be merged is
* found on a certain layer, it does not search higher layers. It creates a compaction thread
* for each batch of files and puts it into the candidateCompactionTaskQueue of the
* {@link CompactionTaskManager}.
*
* @return Returns whether the file was found and submits the merge task
*/
Expand All @@ -163,12 +163,14 @@ public List<List<TsFileResource>> selectInnerSpaceTask(List<TsFileResource> tsFi
PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue =
new PriorityQueue<>(new SizeTieredCompactionTaskComparator());
try {
int maxLevel = searchMaxFileLevel();
for (int currentLevel = 0; currentLevel <= maxLevel; currentLevel++) {
if (!selectLevelTask(currentLevel, taskPriorityQueue)) {
break;
}
// preferentially select files based on mods file size
taskPriorityQueue.addAll(selectMaxModsFileTask());

// if a suitable file is not selected in the first step, select the file at the tsfile level
if (taskPriorityQueue.isEmpty()) {
taskPriorityQueue.addAll(selectLevelTask());
}

List<List<TsFileResource>> taskList = new LinkedList<>();
while (taskPriorityQueue.size() > 0) {
List<TsFileResource> resources = taskPriorityQueue.poll().left;
Expand All @@ -181,6 +183,32 @@ public List<List<TsFileResource>> selectInnerSpaceTask(List<TsFileResource> tsFi
return Collections.emptyList();
}

/**
 * Searches the levels from 0 up to the value returned by {@code searchMaxFileLevel()} and
 * returns the batches of the first level for which {@code selectSingleLevel} yields any.
 *
 * @return a new list holding the batches of the first level that produced a result, or an empty
 *     list when no level produced one
 * @throws IOException if searching the levels fails
 */
private List<Pair<List<TsFileResource>, Long>> selectLevelTask() throws IOException {
  final int highestLevel = searchMaxFileLevel();
  int level = 0;
  while (level <= highestLevel) {
    List<Pair<List<TsFileResource>, Long>> candidates = selectSingleLevel(level);
    if (!candidates.isEmpty()) {
      // stop at the first level that yields something; higher levels are not searched
      return new ArrayList<>(candidates);
    }
    level++;
  }
  return new ArrayList<>();
}

/**
 * Selects every file in {@code tsFileResources} whose mods file is larger than {@code
 * MODS_FILE_SIZE_THRESHOLD}. Each selected file forms a batch of its own.
 *
 * @return one pair per selected file: left is a singleton list holding the file, right is the
 *     value of {@code getTsFileSize()} for that file; empty when no file qualifies
 */
private List<Pair<List<TsFileResource>, Long>> selectMaxModsFileTask() {
  List<Pair<List<TsFileResource>, Long>> taskList = new ArrayList<>();
  for (TsFileResource tsFileResource : tsFileResources) {
    ModificationFile modFile = tsFileResource.getModFile();
    if (Objects.isNull(modFile)) {
      continue;
    }
    // Read the size once so that the value compared and the value logged are the same and the
    // size is not queried a second time just to build a debug log argument.
    long modFileSize = modFile.getSize();
    if (modFileSize > MODS_FILE_SIZE_THRESHOLD) {
      taskList.add(
          new Pair<>(Collections.singletonList(tsFileResource), tsFileResource.getTsFileSize()));
      LOGGER.debug("select tsfile {},the mod file size is {}", tsFileResource, modFileSize);
    }
  }
  return taskList;
}

private int searchMaxFileLevel() throws IOException {
int maxLevel = -1;
for (TsFileResource currentFile : tsFileResources) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ public class Deletion extends Modification implements Cloneable {
public Deletion(PartialPath path, long fileOffset, long endTime) {
  // A deletion without an explicit start begins at Long.MIN_VALUE. The four-argument
  // constructor clears the left-close flag for that start value and clears the right-close
  // flag when endTime is Long.MAX_VALUE, which is exactly the state required here.
  this(path, fileOffset, Long.MIN_VALUE, endTime);
}

/**
Expand All @@ -56,6 +60,12 @@ public Deletion(PartialPath path, long fileOffset, long endTime) {
public Deletion(PartialPath path, long fileOffset, long startTime, long endTime) {
super(Type.DELETION, path, fileOffset);
this.timeRange = new TimeRange(startTime, endTime);
if (startTime == Long.MIN_VALUE) {
this.timeRange.setLeftClose(false);
}
if (endTime == Long.MAX_VALUE) {
this.timeRange.setRightClose(false);
}
}

public long getStartTime() {
Expand Down
Loading