Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
09288cd
add enable unseq compaction
zhanglingzhe0820 Nov 6, 2020
49fab34
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Nov 7, 2020
2854f93
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Nov 7, 2020
9ba6816
Merge branches 'master' and 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Nov 8, 2020
9da6835
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Nov 10, 2020
f438705
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Nov 11, 2020
b5a17d5
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Nov 12, 2020
ae79d88
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Nov 12, 2020
97a2b13
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Nov 13, 2020
7255242
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Nov 16, 2020
b80249f
Merge branch 'master' of https://github.com/zhanglingzhe0820/incubato…
zhanglingzhe0820 Nov 18, 2020
e7a4e32
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Nov 18, 2020
3a99508
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Nov 19, 2020
095c5de
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Nov 19, 2020
658497c
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Nov 24, 2020
181682b
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Nov 29, 2020
1b1a09b
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Nov 30, 2020
ae6b811
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Dec 1, 2020
ad21302
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Dec 2, 2020
8f906cd
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Dec 8, 2020
0085fe6
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Dec 9, 2020
18d7c0c
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Dec 16, 2020
23ef328
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Dec 24, 2020
8715ed4
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Dec 24, 2020
d71cd0d
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Dec 24, 2020
096bc98
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Dec 24, 2020
e596a9b
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Dec 25, 2020
6262d1a
Merge branch 'master' of https://github.com/apache/iotdb
Dec 29, 2020
743c1fa
Merge branch 'master' of https://github.com/apache/iotdb
Dec 31, 2020
0ed59df
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Jan 4, 2021
392fb2a
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Jan 5, 2021
c2dd30c
Merge branch 'master' of https://github.com/zhanglingzhe0820/incubato…
zhanglingzhe0820 Jan 6, 2021
d7e20a1
Merge branch 'master' of https://github.com/apache/iotdb
zhanglingzhe0820 Jan 6, 2021
b800db0
Merge branch 'master' of https://github.com/apache/iotdb
Jan 8, 2021
ce30cb2
Merge branch 'master' of https://github.com/apache/iotdb
Jan 8, 2021
0263af1
Merge branch 'master' of https://github.com/apache/iotdb
Jan 11, 2021
6b13448
Merge branch 'master' of https://github.com/zhanglingzhe0820/incubato…
Jan 11, 2021
0eec294
Merge branch 'master' of https://github.com/apache/iotdb
Jan 13, 2021
7b257d4
Merge branch 'master' of https://github.com/zhanglingzhe0820/incubato…
Jan 21, 2021
3b17b15
Merge branches 'master' and 'master' of https://github.com/apache/iotdb
Jan 21, 2021
f664dd6
Merge branch 'master' of https://github.com/apache/iotdb
Jan 26, 2021
e71ab2a
Merge branch 'master' of https://github.com/apache/iotdb
Feb 9, 2021
23b3032
Merge branch 'master' of https://github.com/apache/iotdb
Feb 18, 2021
f462c01
Merge branch 'master' of https://github.com/apache/iotdb
Feb 21, 2021
d32071e
Merge branch 'master' of https://github.com/apache/iotdb
Feb 25, 2021
1f0a455
Merge branch 'master' of https://github.com/apache/iotdb
Mar 5, 2021
3f03760
open merge ci log for test
Mar 11, 2021
eaf2797
[ISSUE-2730] Add the number of unseq merge times in TsFile name.
Mar 12, 2021
7111ce4
[ISSUE-2730] Add the number of unseq merge times in TsFile name.
Mar 12, 2021
21f6d30
[ISSUE-2730] Add the number of unseq merge times in TsFile name.
Mar 12, 2021
e14a6da
set logger level
Mar 15, 2021
ee040f4
Merge branch 'change_ts_file_name_3' of https://github.com/WilliamSon…
Mar 15, 2021
bcca6a3
rollback unpassed ci
Mar 16, 2021
8a76f1b
fix so much bugs about file name change
Mar 16, 2021
8cdc7cf
Merge branch 'master' of https://github.com/apache/iotdb
Mar 23, 2021
d3ee9ef
Merge branch 'master' into change_ts_file_name
Mar 25, 2021
f6a99a2
fix format
Mar 25, 2021
971996e
fix windows ci
Mar 25, 2021
a09cf77
fix format
Mar 25, 2021
b0cfcef
fix format
Mar 25, 2021
5b1bf07
fix ci
Mar 25, 2021
eef60f6
fix ci
Mar 25, 2021
3b9e1c9
Merge branch 'master' of https://github.com/apache/iotdb
Mar 25, 2021
d217a31
merge
Mar 25, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/src/assembly/resources/conf/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@
<appender-ref ref="stdout"/>
</root>
<logger level="OFF" name="io.moquette.broker.metrics.MQTTMessageLogger"/>
<logger level="debug" name="org.apache.iotdb.db.integration.IoTDBRemovePartitionIT"/>
<logger level="info" name="org.apache.iotdb.db.service"/>
<logger level="info" name="org.apache.iotdb.db.conf"/>
<logger level="info" name="org.apache.iotdb.db.cost.statistic">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,12 @@ private IoTDBConstant() {}

// thrift
public static final int LEFT_SIZE_IN_REQUEST = 4 * 1024 * 1024;

// change tsFile name
public static final int FILE_NAME_SUFFIX_INDEX = 0;
public static final int FILE_NAME_SUFFIX_TIME_INDEX = 0;
public static final int FILE_NAME_SUFFIX_VERSION_INDEX = 1;
public static final int FILE_NAME_SUFFIX_MERGECNT_INDEX = 2;
public static final int FILE_NAME_SUFFIX_UNSEQMERGECNT_INDEX = 3;
public static final String FILE_NAME_SUFFIX_SEPARATOR = "\\.";
}
Original file line number Diff line number Diff line change
Expand Up @@ -392,20 +392,20 @@ public void mergeEndAction(
mergedFile.delete();
}
updateMergeModification(seqFile);
if (i == seqFiles.size() - 1) {
// FIXME if there is an exception, the the modification file will be not closed.
removeMergingModification();
isUnseqMerging = false;
Files.delete(mergeLog.toPath());
}
} catch (IOException e) {
logger.error(
"{} a merge task ends but cannot delete log {}", storageGroupName, mergeLog.toPath());
} finally {
doubleWriteUnlock(seqFile);
}
}

try {
removeMergingModification();
isUnseqMerging = false;
Files.delete(mergeLog.toPath());
} catch (IOException e) {
logger.error(
"{} a merge task ends but cannot delete log {}", storageGroupName, mergeLog.toPath());
}

logger.info("{} a merge task ends", storageGroupName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ private void merge(
}
isSeqMerging = true;
long startTimeMillis = System.currentTimeMillis();
CompactionLogger compactionLogger = null;
try {
logger.info("{} start to filter compaction condition", storageGroupName);
for (int i = 0; i < currMaxLevel - 1; i++) {
Expand All @@ -600,14 +601,13 @@ private void merge(
isSeqMerging = false;
merge(isForceFullMerge, getTsFileList(true), mergeResources.get(i), Long.MAX_VALUE);
} else {
CompactionLogger compactionLogger =
new CompactionLogger(storageGroupDir, storageGroupName);
compactionLogger = new CompactionLogger(storageGroupDir, storageGroupName);
// log source file list and target file for recover
for (TsFileResource mergeResource : mergeResources.get(i)) {
compactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile());
}
File newLevelFile =
createNewTsFileName(mergeResources.get(i).get(0).getTsFile(), i + 1);
TsFileResource.modifyTsFileNameMergeCnt(mergeResources.get(i).get(0).getTsFile());
compactionLogger.logSequence(sequence);
compactionLogger.logFile(TARGET_NAME, newLevelFile);
List<TsFileResource> toMergeTsFiles = mergeResources.get(i);
Expand Down Expand Up @@ -664,6 +664,13 @@ private void merge(
}
}
} catch (Exception e) {
if (compactionLogger != null) {
try {
compactionLogger.close();
} catch (IOException ioException) {
logger.error("{} Compaction log close fail", storageGroupName + COMPACTION_LOG_NAME);
}
}
restoreCompaction();
logger.error("Error occurred in Compaction Merge thread", e);
} finally {
Expand All @@ -677,13 +684,6 @@ private void merge(
}
}

/** if level < maxLevel-1, the file need compaction else, the file can be merged later */
private File createNewTsFileName(File sourceFile, int level) {
String path = sourceFile.getPath();
String prefixPath = path.substring(0, path.lastIndexOf(FILE_NAME_SEPARATOR) + 1);
return new File(prefixPath + level + TSFILE_SUFFIX);
}

private List<SortedSet<TsFileResource>> newSequenceTsFileResources(Long k) {
List<SortedSet<TsFileResource>> newSequenceTsFileResources = new CopyOnWriteArrayList<>();
for (int i = 0; i < seqLevelNum; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@
import java.util.Map;
import java.util.Map.Entry;

import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.modifyTsFileNameUnseqMergCnt;

/**
* MergeFileTask merges the merge temporary files with the seqFiles, either move the merged chunks
* in the temp files into the seqFiles or move the unmerged chunks into the merge temp files,
* depending on which one is the majority.
*/
class MergeFileTask {
public class MergeFileTask {

private static final Logger logger = LoggerFactory.getLogger(MergeFileTask.class);

Expand Down Expand Up @@ -196,11 +198,20 @@ private void moveMergedToOld(TsFileResource seqFile) throws IOException {
}
updateStartTimeAndEndTime(seqFile, oldFileWriter);
oldFileWriter.endFile();

updatePlanIndexes(seqFile);
seqFile.serialize();
mergeLogger.logFileMergeEnd();
logger.debug("{} moved merged chunks of {} to the old file", taskName, seqFile);

newFileWriter.getFile().delete();
// change tsFile name
File nextMergeVersionFile = modifyTsFileNameUnseqMergCnt(seqFile.getTsFile());
fsFactory.moveFile(seqFile.getTsFile(), nextMergeVersionFile);
fsFactory.moveFile(
fsFactory.getFile(seqFile.getTsFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX),
fsFactory.getFile(
nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
seqFile.setFile(nextMergeVersionFile);
} catch (Exception e) {
restoreOldFile(seqFile);
throw e;
Expand Down Expand Up @@ -327,20 +338,26 @@ private void moveUnmergedToNew(TsFileResource seqFile) throws IOException {
}
updateStartTimeAndEndTime(seqFile, fileWriter);
resource.removeFileReader(seqFile);
FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath());
fileWriter.endFile();

updatePlanIndexes(seqFile);
seqFile.serialize();
mergeLogger.logFileMergeEnd();
logger.debug("{} moved unmerged chunks of {} to the new file", taskName, seqFile);

seqFile.writeLock();
try {
FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath());
File newMergeFile = seqFile.getTsFile();
newMergeFile.delete();
fsFactory.moveFile(fileWriter.getFile(), newMergeFile);
seqFile.setFile(newMergeFile);
seqFile.serialize();

// change tsFile name
seqFile.getTsFile().delete();
File nextMergeVersionFile = modifyTsFileNameUnseqMergCnt(seqFile.getTsFile());
fsFactory.moveFile(fileWriter.getFile(), nextMergeVersionFile);
fsFactory.moveFile(
fsFactory.getFile(seqFile.getTsFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX),
fsFactory.getFile(
nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
seqFile.setFile(nextMergeVersionFile);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1239,7 +1239,7 @@ private String getNewTsFileName(long timePartitionId) {
}

private String getNewTsFileName(long time, long version, int mergeCnt) {
return time + FILE_NAME_SEPARATOR + version + FILE_NAME_SEPARATOR + mergeCnt + TSFILE_SUFFIX;
return TsFileResource.getNewTsFileName(System.currentTimeMillis(), version, 0, 0);
}

public void syncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
Expand Down Expand Up @@ -2466,7 +2466,8 @@ private String getFileNameForLoadingFile(
return tsfileName;
}

return getNewTsFileName(preTime + ((subsequenceTime - preTime) >> 1), subsequenceVersion, 0);
return TsFileResource.getNewTsFileName(
preTime + ((subsequenceTime - preTime) >> 1), subsequenceVersion, 0, 0);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@
import java.util.Random;
import java.util.Set;

import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SUFFIX_INDEX;
import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SUFFIX_MERGECNT_INDEX;
import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SUFFIX_SEPARATOR;
import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SUFFIX_TIME_INDEX;
import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SUFFIX_UNSEQMERGECNT_INDEX;
import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SUFFIX_VERSION_INDEX;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;

@SuppressWarnings("java:S1135") // ignore todos
public class TsFileResource {

Expand Down Expand Up @@ -754,4 +763,127 @@ public long getVersion() {
public void setTimeIndex(ITimeIndex timeIndex) {
this.timeIndex = timeIndex;
}

// change tsFile name

public static String getNewTsFileName(long time, long version, int mergeCnt, int unSeqMergeCnt) {
return time
+ FILE_NAME_SEPARATOR
+ version
+ FILE_NAME_SEPARATOR
+ mergeCnt
+ FILE_NAME_SEPARATOR
+ unSeqMergeCnt
+ TSFILE_SUFFIX;
}

public static TsFileName getTsFileName(String FileName) {
String[] fileName =
FileName.split(FILE_NAME_SUFFIX_SEPARATOR)[FILE_NAME_SUFFIX_INDEX].split(
FILE_NAME_SEPARATOR);
TsFileName tsFileName =
new TsFileName(
Long.parseLong(fileName[FILE_NAME_SUFFIX_TIME_INDEX]),
Long.parseLong(fileName[FILE_NAME_SUFFIX_VERSION_INDEX]),
Integer.parseInt(fileName[FILE_NAME_SUFFIX_MERGECNT_INDEX]),
Integer.parseInt(fileName[FILE_NAME_SUFFIX_UNSEQMERGECNT_INDEX]));
return tsFileName;
}

public static TsFileResource modifyTsFileNameUnseqMergCnt(TsFileResource tsFileResource) {
File tsFile = tsFileResource.getTsFile();
String path = tsFile.getParent();
TsFileName tsFileName = getTsFileName(tsFileResource.getTsFile().getName());
tsFileName.setUnSeqMergeCnt(tsFileName.getUnSeqMergeCnt() + 1);
tsFileResource.setFile(
new File(
path,
tsFileName.time
+ FILE_NAME_SEPARATOR
+ tsFileName.version
+ FILE_NAME_SEPARATOR
+ tsFileName.mergeCnt
+ FILE_NAME_SEPARATOR
+ tsFileName.unSeqMergeCnt
+ TSFILE_SUFFIX));
return tsFileResource;
}

public static File modifyTsFileNameUnseqMergCnt(File tsFile) {
String path = tsFile.getParent();
TsFileName tsFileName = getTsFileName(tsFile.getName());
tsFileName.setUnSeqMergeCnt(tsFileName.getUnSeqMergeCnt() + 1);
return new File(
path,
tsFileName.time
+ FILE_NAME_SEPARATOR
+ tsFileName.version
+ FILE_NAME_SEPARATOR
+ tsFileName.mergeCnt
+ FILE_NAME_SEPARATOR
+ tsFileName.unSeqMergeCnt
+ TSFILE_SUFFIX);
}

public static File modifyTsFileNameMergeCnt(File tsFile) {
String path = tsFile.getParent();
TsFileName tsFileName = getTsFileName(tsFile.getName());
tsFileName.setMergeCnt(tsFileName.getMergeCnt() + 1);
return new File(
path,
tsFileName.time
+ FILE_NAME_SEPARATOR
+ tsFileName.version
+ FILE_NAME_SEPARATOR
+ tsFileName.mergeCnt
+ FILE_NAME_SEPARATOR
+ tsFileName.unSeqMergeCnt
+ TSFILE_SUFFIX);
}

public static class TsFileName {
private long time;
private long version;
private int mergeCnt;
private int unSeqMergeCnt;

public TsFileName(long time, long version, int mergeCnt, int unSeqMergeCnt) {
this.time = time;
this.version = version;
this.mergeCnt = mergeCnt;
this.unSeqMergeCnt = unSeqMergeCnt;
}

public long getTime() {
return time;
}

public long getVersion() {
return version;
}

public int getMergeCnt() {
return mergeCnt;
}

public int getUnSeqMergeCnt() {
return unSeqMergeCnt;
}

public void setTime(long time) {
this.time = time;
}

public void setVersion(long version) {
this.version = version;
}

public void setMergeCnt(int mergeCnt) {
this.mergeCnt = mergeCnt;
}

public void setUnSeqMergeCnt(int unSeqMergeCnt) {
this.unSeqMergeCnt = unSeqMergeCnt;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ public static List<AggregateResult> mergeRecordByPath(

public static long splitAndGetTsFileVersion(String tsFileName) {
String[] names = tsFileName.split(FILE_NAME_SEPARATOR);
if (names.length != 3) {
if (names.length != 4) {
return 0;
}
return Long.parseLong(names[1]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public void testAppendMerge() throws IOException, IllegalPathException {
+ 0
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 1
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
+ ".tsfile"));
TsFileResource targetTsfileResource = new TsFileResource(file);
RateLimiter compactionWriteRateLimiter = MergeManager.getINSTANCE().getMergeWriteRateLimiter();
Expand Down Expand Up @@ -168,6 +170,8 @@ public void testDeserializeMerge() throws IOException, IllegalPathException {
+ 0
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 1
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
+ ".tsfile"));
TsFileResource targetTsfileResource = new TsFileResource(file);
RateLimiter compactionWriteRateLimiter = MergeManager.getINSTANCE().getMergeWriteRateLimiter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WritePro
+ i
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
+ ".tsfile"));
TsFileResource tsFileResource = new TsFileResource(file);
tsFileResource.setClosed(true);
Expand All @@ -117,6 +119,8 @@ void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WritePro
+ (10000 + i)
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
+ ".tsfile"));
TsFileResource tsFileResource = new TsFileResource(file);
tsFileResource.setClosed(true);
Expand All @@ -133,6 +137,8 @@ void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WritePro
+ unseqFileNum
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
+ ".tsfile"));
TsFileResource tsFileResource = new TsFileResource(file);
tsFileResource.setClosed(true);
Expand Down
Loading