Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public boolean unsealed() {
}

private void prepareDeviceInfos() throws IOException {
boolean canCacheDeviceInfo = resource.getStatus() != TsFileResourceStatus.UNCLOSED;
if (deviceInfoMap == null && compactionScheduleContext != null) {
// get device info from cache
deviceInfoMap = compactionScheduleContext.getResourceDeviceInfo(this.resource);
Expand Down Expand Up @@ -107,7 +108,7 @@ private void prepareDeviceInfos() throws IOException {
}
}
hasDetailedDeviceInfo = true;
if (compactionScheduleContext != null) {
if (compactionScheduleContext != null && canCacheDeviceInfo) {
compactionScheduleContext.addResourceDeviceTimeIndex(this.resource, deviceInfoMap);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InsertionCrossSpaceCompactionTask;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduler;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl.RewriteCrossSpaceCompactionSelector;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.CrossSpaceCompactionCandidate;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.InsertionCrossCompactionTaskResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
Expand Down Expand Up @@ -99,6 +102,79 @@ public void testSimpleInsertionCompaction() throws IOException, MergeException {
Assert.assertEquals(seqResource2, result.nextSeqFile);
}

@Test
public void testInsertionCompactionWithCachedDeviceInfoAndUnclosedResource()
throws InterruptedException, IOException {
CompactionScheduleContext context = new CompactionScheduleContext();

IDeviceID d1 = new PlainDeviceID("root.testsg.d1");
IDeviceID d2 = new PlainDeviceID("root.testsg.d2");
IDeviceID d3 = new PlainDeviceID("root.testsg.d3");

TsFileResource seqResource1 = createTsFileResource("1-1-0-0.tsfile", true);
seqResource1.getTsFile().createNewFile();
seqResource1.updateStartTime(d1, 10);
seqResource1.updateEndTime(d1, 20);
seqResource1.serialize();

TsFileResource seqResource2 = createTsFileResource("3-3-0-0.tsfile", true);
seqResource2.getTsFile().createNewFile();
seqResource2.updateStartTime(d1, 40);
seqResource2.updateEndTime(d1, 50);
seqResource2.serialize();

// unclosed
TsFileResource seqResource3 = createTsFileResource("6-6-0-0.tsfile", true);
seqResource3.getTsFile().createNewFile();
seqResource3.setStatusForTest(TsFileResourceStatus.UNCLOSED);
seqResource3.updateStartTime(d1, 70);

seqResources.add(seqResource1);
seqResources.add(seqResource2);
seqResources.add(seqResource3);
TsFileResource unseqResource1 = createTsFileResource("5-5-1-0.tsfile", false);
unseqResource1.getTsFile().createNewFile();
unseqResource1.updateStartTime(d1, 30);
unseqResource1.updateEndTime(d1, 35);
unseqResource1.updateStartTime(d3, 10);
unseqResource1.updateEndTime(d3, 20);
unseqResource1.serialize();
unseqResources.add(unseqResource1);

tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);

Phaser phaser = new Phaser(1);
int submitTaskNum =
CompactionScheduler.scheduleInsertionCompaction(tsFileManager, 0, phaser, context);
Assert.assertEquals(1, submitTaskNum);
// perform insertion compaction
phaser.awaitAdvanceInterruptibly(phaser.arrive());

// unclosed file has sealed
seqResource3.updateEndTime(d1, 80);
seqResource3.updateStartTime(d2, 10);
seqResource3.updateEndTime(d2, 20);
seqResource3.setStatusForTest(TsFileResourceStatus.NORMAL);

TsFileResource unseqResource2 = createTsFileResource("7-7-1-0.tsfile", false);
unseqResource2.getTsFile().createNewFile();
unseqResource2.updateStartTime(d2, 10);
unseqResource2.updateEndTime(d2, 20);
unseqResource2.serialize();
tsFileManager.keepOrderInsert(unseqResource2, false);
// Should not select unseq resource2
// The unclosed resource should not be cached. Otherwise, the results here will be incorrect.
// seq resource3: d1[70, 80] d2[10, 20]
// unseq resource2 d2[10, 20]

submitTaskNum =
CompactionScheduler.scheduleInsertionCompaction(tsFileManager, 0, phaser, context);
Assert.assertEquals(0, submitTaskNum);
Assert.assertTrue(
TsFileResourceUtils.validateTsFileResourcesHasNoOverlap(tsFileManager.getTsFileList(true)));
}

@Test
public void testSimpleInsertionCompactionWithMultiUnseqFiles()
throws IOException, MergeException {
Expand Down