Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public AbstractCrossSpaceCompactionTask getCompactionTask(
String logicalStorageGroupName,
String virtualStorageGroupName,
long timePartitionId,
String storageGroupDir,
TsFileManager tsFileManager,
List<TsFileResource> selectedSeqTsFileResourceList,
List<TsFileResource> selectedUnSeqTsFileResourceList) {
Expand All @@ -53,7 +52,6 @@ public AbstractCrossSpaceCompactionTask getCompactionTask(
logicalStorageGroupName,
virtualStorageGroupName,
timePartitionId,
storageGroupDir,
tsFileManager,
selectedSeqTsFileResourceList,
selectedUnSeqTsFileResourceList,
Expand All @@ -65,7 +63,6 @@ public AbstractCrossSpaceCompactionTask getCompactionRecoverTask(
String logicalStorageGroupName,
String virtualStorageGroupName,
long timePartitionId,
String storageGroupDir,
File logFile,
TsFileManager tsFileManager) {
switch (this) {
Expand All @@ -75,7 +72,6 @@ public AbstractCrossSpaceCompactionTask getCompactionRecoverTask(
logicalStorageGroupName,
virtualStorageGroupName,
timePartitionId,
storageGroupDir,
logFile,
CompactionTaskManager.currentTaskNum,
tsFileManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;

import java.util.List;

Expand All @@ -33,10 +32,7 @@ public AbstractCompactionTask createTask(
String logicalStorageGroupName,
String virtualStorageGroupName,
long timePartitionId,
String storageGroupDir,
TsFileManager tsFileManager,
TsFileResourceList seqTsFileResourceList,
TsFileResourceList unseqTsFileResourceList,
List<TsFileResource> selectedSeqTsFileResourceList,
List<TsFileResource> selectedUnSeqTsFileResourceList) {
return IoTDBDescriptor.getInstance()
Expand All @@ -46,7 +42,6 @@ public AbstractCompactionTask createTask(
logicalStorageGroupName,
virtualStorageGroupName,
timePartitionId,
storageGroupDir,
tsFileManager,
selectedSeqTsFileResourceList,
selectedUnSeqTsFileResourceList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,7 @@ public void selectAndSubmit() {
logicalStorageGroupName,
virtualGroupId,
timePartition,
storageGroupDir,
tsFileManager,
sequenceFileList,
unsequenceFileList,
mergeFiles[0],
mergeFiles[1]);
CompactionTaskManager.getInstance().addTaskToWaitingQueue(compactionTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,29 +45,23 @@ public class RewriteCrossCompactionRecoverTask extends RewriteCrossSpaceCompacti
private static final Logger LOGGER =
LoggerFactory.getLogger(RewriteCrossCompactionRecoverTask.class);
private File compactionLogFile;
private String logicalStorageGroupName;
private String virtualStorageGroup;

public RewriteCrossCompactionRecoverTask(
String logicalStorageGroupName,
String virtualStorageGroupName,
long timePartitionId,
String storageGroupDir,
File logFile,
AtomicInteger currentTaskNum,
TsFileManager tsFileManager) {
super(
logicalStorageGroupName,
virtualStorageGroupName,
timePartitionId,
storageGroupDir,
tsFileManager,
null,
null,
currentTaskNum);
this.compactionLogFile = logFile;
this.logicalStorageGroupName = logicalStorageGroupName;
this.virtualStorageGroup = virtualStorageGroupName;
}

@Override
Expand All @@ -80,9 +74,8 @@ public void doCompaction() {
try {
if (compactionLogFile.exists()) {
LOGGER.info(
"{}-{} [Compaction][Recover] cross space compaction log file {} exists, start to recover it",
logicalStorageGroupName,
virtualStorageGroup,
"{} [Compaction][Recover] cross space compaction log file {} exists, start to recover it",
fullStorageGroupName,
compactionLogFile);
RewriteCrossSpaceCompactionLogAnalyzer logAnalyzer =
new RewriteCrossSpaceCompactionLogAnalyzer(compactionLogFile);
Expand All @@ -93,9 +86,7 @@ public void doCompaction() {
// compaction log file is incomplete
if (targetFileIdentifiers.isEmpty() || sourceFileIdentifiers.isEmpty()) {
LOGGER.info(
"{}-{} [Compaction][Recover] incomplete log file, abort recover",
logicalStorageGroupName,
virtualStorageGroup);
"{} [Compaction][Recover] incomplete log file, abort recover", fullStorageGroupName);
return;
}

Expand Down Expand Up @@ -216,17 +207,15 @@ private boolean handleWithoutAllSourceFilesExist(List<TsFileIdentifier> sourceFi
getFileFromDataDirs(sourceFileIdentifier.getFilePath() + ModificationFile.FILE_SUFFIX);
if (compactionModFile != null && !compactionModFile.delete()) {
LOGGER.error(
"{}-{} [Compaction][Recover] fail to delete target file {}, this may cause data incorrectness",
logicalStorageGroupName,
virtualStorageGroup,
"{} [Compaction][Recover] fail to delete target file {}, this may cause data incorrectness",
fullStorageGroupName,
compactionModFile);
handleSuccess = false;
}
if (modFile != null && !modFile.delete()) {
LOGGER.error(
"{}-{} [Compaction][Recover] fail to delete target file {}, this may cause data incorrectness",
logicalStorageGroupName,
virtualStorageGroup,
"{} [Compaction][Recover] fail to delete target file {}, this may cause data incorrectness",
fullStorageGroupName,
modFile);
handleSuccess = false;
}
Expand All @@ -235,9 +224,7 @@ private boolean handleWithoutAllSourceFilesExist(List<TsFileIdentifier> sourceFi
if (!InnerSpaceCompactionUtils.deleteTsFilesInDisk(
remainSourceTsFileResources, fullStorageGroupName)) {
LOGGER.error(
"{}-{} [Compaction][Recover] fail to delete remaining source files.",
logicalStorageGroupName,
virtualStorageGroup);
"{} [Compaction][Recover] fail to delete remaining source files.", fullStorageGroupName);
handleSuccess = false;
}
return handleSuccess;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,8 @@
public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactionTask {

private static final Logger logger = LoggerFactory.getLogger("COMPACTION");
protected String storageGroupDir;
protected List<TsFileResource> selectedSeqTsFileResourceList;
protected List<TsFileResource> selectedUnSeqTsFileResourceList;
protected String logicalStorageGroupName;
protected String virtualStorageGroupName;
protected TsFileManager tsFileManager;
private File logFile;

Expand All @@ -69,13 +66,10 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio
private final long ACQUIRE_WRITE_LOCK_TIMEOUT =
IoTDBDescriptor.getInstance().getConfig().getCompactionAcquireWriteLockTimeout();

String storageGroupName;

public RewriteCrossSpaceCompactionTask(
String logicalStorageGroupName,
String virtualStorageGroupName,
long timePartitionId,
String storageGroupDir,
TsFileManager tsFileManager,
List<TsFileResource> selectedSeqTsFileResourceList,
List<TsFileResource> selectedUnSeqTsFileResourceList,
Expand All @@ -86,9 +80,6 @@ public RewriteCrossSpaceCompactionTask(
currentTaskNum,
selectedSeqTsFileResourceList,
selectedUnSeqTsFileResourceList);
this.logicalStorageGroupName = logicalStorageGroupName;
this.virtualStorageGroupName = virtualStorageGroupName;
this.storageGroupDir = storageGroupDir;
this.selectedSeqTsFileResourceList = selectedSeqTsFileResourceList;
this.selectedUnSeqTsFileResourceList = selectedUnSeqTsFileResourceList;
this.tsFileManager = tsFileManager;
Expand All @@ -101,7 +92,7 @@ protected void doCompaction() throws Exception {
} catch (Throwable throwable) {
// catch throwable instead of exception to handle OOM errors
CrossSpaceCompactionExceptionHandler.handleException(
storageGroupName,
fullStorageGroupName,
logFile,
targetTsfileResourceList,
selectedSeqTsFileResourceList,
Expand All @@ -128,13 +119,14 @@ private void executeCompaction()
|| selectedSeqTsFileResourceList.isEmpty()
|| selectedUnSeqTsFileResourceList.isEmpty()) {
logger.info(
"{} [Compaction] Cross space compaction file list is empty, end it", storageGroupName);
"{} [Compaction] Cross space compaction file list is empty, end it",
fullStorageGroupName);
return;
}

logger.info(
"{} [Compaction] CrossSpaceCompactionTask start. Sequence files : {}, unsequence files : {}",
storageGroupName,
fullStorageGroupName,
selectedSeqTsFileResourceList,
selectedUnSeqTsFileResourceList);
logFile =
Expand All @@ -154,7 +146,7 @@ private void executeCompaction()
CompactionUtils.compact(
selectedSeqTsFileResourceList, selectedUnSeqTsFileResourceList, targetTsfileResourceList);

CompactionUtils.moveTargetFile(targetTsfileResourceList, false, storageGroupName);
CompactionUtils.moveTargetFile(targetTsfileResourceList, false, fullStorageGroupName);

// indicates that the cross compaction is complete and the result can be reused during a
// restart recovery
Expand Down Expand Up @@ -193,7 +185,7 @@ private void executeCompaction()
}
logger.info(
"{} [Compaction] CrossSpaceCompactionTask Costs {} s",
storageGroupName,
fullStorageGroupName,
(System.currentTimeMillis() - startTime) / 1000);
}
}
Expand Down Expand Up @@ -266,7 +258,7 @@ void deleteOldFiles(List<TsFileResource> tsFileResourceList) throws IOException
}

public String getStorageGroupName() {
return storageGroupName;
return fullStorageGroupName;
}

private void removeCompactionModification() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ private void recoverCrossCompaction() throws Exception {
logicalStorageGroupName,
virtualStorageGroupId,
timePartition,
storageGroupDir,
compactionLog,
tsFileManager)
.call();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,13 +429,10 @@ public void testOneSeqFileAndSixUnseqFile() throws Exception {
COMPACTION_TEST_SG,
"0",
0,
"target",
new TsFileManager(
"root.compactionTest",
"0",
"target\\data\\sequence\\test\\root.compactionTest\\0\\0\\"),
seqTsFileResourceList,
unseqTsFileResourceList,
mergeResource.getSeqFiles(),
mergeResource.getUnseqFiles());
compactionTask.call();
Expand Down Expand Up @@ -735,13 +732,10 @@ public void testFiveSeqFileAndOneUnseqFileWithSomeDeviceNotInSeqFiles() throws E
COMPACTION_TEST_SG,
"0",
0,
"target",
new TsFileManager(
"root.compactionTest",
"0",
"target\\data\\sequence\\test\\root.compactionTest\\0\\0\\"),
seqTsFileResourceList,
unseqTsFileResourceList,
mergeResource.getSeqFiles(),
mergeResource.getUnseqFiles());
compactionTask.call();
Expand Down Expand Up @@ -1040,13 +1034,10 @@ public void testFiveSeqFileAndOneUnseqFile() throws Exception {
COMPACTION_TEST_SG,
"0",
0,
"target",
new TsFileManager(
"root.compactionTest",
"0",
"target\\data\\sequence\\test\\root.compactionTest\\0\\0\\"),
seqTsFileResourceList,
unseqTsFileResourceList,
mergeResource.getSeqFiles(),
mergeResource.getUnseqFiles());
compactionTask.call();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ public void testRecoverWithAllSourceFilesExisted() throws Exception {
COMPACTION_LOG_NAME,
"0",
0,
SEQ_DIRS.getPath(),
compactionLogFile,
CompactionTaskManager.currentTaskNum,
tsFileManager)
Expand Down Expand Up @@ -171,7 +170,6 @@ public void testRecoverWithAllSourceFilesExistedAndTargetFilesMoved() throws Exc
COMPACTION_LOG_NAME,
"0",
0,
SEQ_DIRS.getPath(),
compactionLogFile,
CompactionTaskManager.currentTaskNum,
tsFileManager)
Expand Down Expand Up @@ -243,7 +241,6 @@ public void testRecoverWithSomeSourceFilesExisted() throws Exception {
COMPACTION_LOG_NAME,
"0",
0,
SEQ_DIRS.getPath(),
compactionLogFile,
CompactionTaskManager.currentTaskNum,
tsFileManager)
Expand Down Expand Up @@ -334,7 +331,6 @@ public void testRecoverWithoutAllSourceFilesAndModFilesExist() throws Exception
COMPACTION_LOG_NAME,
"0",
0,
SEQ_DIRS.getPath(),
compactionLogFile,
CompactionTaskManager.currentTaskNum,
tsFileManager)
Expand Down Expand Up @@ -438,7 +434,6 @@ public void testRecoverWithAllSourcesFileAndCompactonModFileExist() throws Excep
COMPACTION_LOG_NAME,
"0",
0,
SEQ_DIRS.getPath(),
compactionLogFile,
CompactionTaskManager.currentTaskNum,
tsFileManager)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ public void testAlignedCrossSpaceCompactionWithAllDataDeletedInTimeseries() thro
COMPACTION_TEST_SG,
"0",
0,
STORAGE_GROUP_DIR.getPath(),
tsFileManager,
seqResources,
unseqResources,
Expand Down Expand Up @@ -462,7 +461,6 @@ public void testAlignedCrossSpaceCompactionWithAllDataDeletedInOneTargetFile() t
COMPACTION_TEST_SG,
"0",
0,
STORAGE_GROUP_DIR.getPath(),
tsFileManager,
seqResources,
unseqResources,
Expand Down Expand Up @@ -610,7 +608,6 @@ public void testOneDeletionDuringCompaction() throws Exception {
COMPACTION_TEST_SG,
"0",
0,
STORAGE_GROUP_DIR.getPath(),
vsgp.getTsFileResourceManager(),
seqResources,
unseqResources,
Expand Down Expand Up @@ -720,7 +717,6 @@ public void testSeveralDeletionsDuringCompaction() throws Exception {
COMPACTION_TEST_SG,
"0",
0,
STORAGE_GROUP_DIR.getPath(),
vsgp.getTsFileResourceManager(),
seqResources,
unseqResources,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,13 @@ public FakedCrossSpaceCompactionTask(
String logicalStorageGroupName,
String virtualStorageGroupName,
long timePartitionId,
String storageGroupDir,
TsFileManager tsFileManager,
List<TsFileResource> selectedSeqTsFileResourceList,
List<TsFileResource> selectedUnSeqTsFileResourceList) {
super(
logicalStorageGroupName,
virtualStorageGroupName,
timePartitionId,
storageGroupDir,
tsFileManager,
selectedSeqTsFileResourceList,
selectedUnSeqTsFileResourceList,
Expand Down
Loading