Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,14 @@ private void writeChunkIntoChunkWriter(Chunk chunk) throws IOException {
}

private void writeCachedChunkIntoChunkWriter() throws IOException {
cachedChunk.getData().flip();
if (cachedChunk.getData().position() != 0) {
// If the position of the cached chunk's data buffer is 0,
// it means that the cached chunk is the first chunk to be cached
// and it has not been merged with any other chunk yet.
// Flipping it would set both the position and the limit of the buffer to 0,
// which would lead to a loss of data.
cachedChunk.getData().flip();
}
writeChunkIntoChunkWriter(cachedChunk);
cachedChunk = null;
cachedChunkMetadata = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ public void testDeserializeCachedChunk() throws Exception {
}

@Test
public void testMixCompact() throws Exception {
public void testMixCompact1() throws Exception {
long testTargetChunkPointNum = 2000L;
long testChunkSizeLowerBound = 1024L;
long testChunkPointNumLowerBound = 100L;
Expand Down Expand Up @@ -834,4 +834,85 @@ public void testMixCompact() throws Exception {
.setChunkPointNumLowerBoundInCompaction(originChunkPointNumLowerBound);
}
}

@Test
public void testMixCompact2() throws Exception {
  // Verifies inner-space compaction when small and large chunks are interleaved:
  // chunks below the lower bounds are cached and merged with their neighbors,
  // chunks at/above the target point number are flushed as-is.
  long testTargetChunkPointNum = 2000L;
  long testChunkSizeLowerBound = 1024L;
  long testChunkPointNumLowerBound = 100L;
  // Save the original config values so they can be restored in the finally block,
  // keeping this test from leaking state into other tests.
  long originTargetChunkSize = IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
  long originTargetChunkPointNum =
      IoTDBDescriptor.getInstance().getConfig().getTargetChunkPointNum();
  IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(1024 * 1024);
  IoTDBDescriptor.getInstance().getConfig().setTargetChunkPointNum(testTargetChunkPointNum);
  long originChunkSizeLowerBound =
      IoTDBDescriptor.getInstance().getConfig().getChunkSizeLowerBoundInCompaction();
  IoTDBDescriptor.getInstance()
      .getConfig()
      .setChunkSizeLowerBoundInCompaction(testChunkSizeLowerBound);
  long originChunkPointNumLowerBound =
      IoTDBDescriptor.getInstance().getConfig().getChunkPointNumLowerBoundInCompaction();
  IoTDBDescriptor.getInstance()
      .getConfig()
      .setChunkPointNumLowerBoundInCompaction(testChunkPointNumLowerBound);
  try {
    // Use the diamond operator instead of a raw ArrayList (avoids unchecked warning).
    int fileNum = 12;
    List<TsFileResource> sourceFiles = new ArrayList<>(fileNum);
    long pointStep = 10L;
    // One single-page chunk per source file. Small chunks (50, 500, 1000) are below
    // the point-number lower bound or target and are expected to be merged with
    // neighboring chunks during compaction.
    long[] points = new long[] {1960, 50, 1960, 50, 2100, 50, 1960, 2300, 2500, 1000, 500, 500};
    for (int i = 0; i < fileNum; ++i) {
      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
      List<Long> pagePointsNum = new ArrayList<>();
      pagePointsNum.add(points[i]);
      chunkPagePointsNum.add(pagePointsNum);
      TsFileResource resource =
          new TsFileResource(new File(SEQ_DIRS, String.format("%d-%d-0-0.tsfile", i + 1, i + 1)));
      sourceFiles.add(resource);
      // Each file starts at a non-overlapping time offset so the data stays sequential.
      CompactionFileGeneratorUtils.writeTsFile(
          fullPathSet, chunkPagePointsNum, i * 2500L, resource);
    }

    // Snapshot the data before compaction to compare values afterwards.
    Map<PartialPath, List<TimeValuePair>> originData =
        CompactionCheckerUtils.getDataByQuery(paths, schemaList, sourceFiles, new ArrayList<>());
    TsFileNameGenerator.TsFileName tsFileName =
        TsFileNameGenerator.getTsFileName(sourceFiles.get(0).getTsFile().getName());
    // Target file keeps time/version of the first source file and bumps the
    // inner-compaction counter.
    TsFileResource targetResource =
        new TsFileResource(
            new File(
                SEQ_DIRS,
                String.format(
                    "%d-%d-%d-%d.tsfile",
                    tsFileName.getTime(),
                    tsFileName.getVersion(),
                    tsFileName.getInnerCompactionCnt() + 1,
                    tsFileName.getCrossCompactionCnt())));
    InnerSpaceCompactionUtils.compact(targetResource, sourceFiles);
    Map<String, List<List<Long>>> chunkPagePointsNumMerged = new HashMap<>();
    // Expected layout after compaction; outer list is a chunk, inner list is the
    // point count of each page within it:
    //   1960+50=2010, 1960+50=2010, 2100 (already >= target, flushed alone),
    //   50+1960=2010, 2300, 2500, and finally {1000, 500, 500} merged into one
    //   three-page chunk.
    for (String path : fullPathSet) {
      CompactionCheckerUtils.putOnePageChunk(chunkPagePointsNumMerged, path, 2010);
      CompactionCheckerUtils.putOnePageChunk(chunkPagePointsNumMerged, path, 2010);
      CompactionCheckerUtils.putOnePageChunk(chunkPagePointsNumMerged, path, 2100);
      CompactionCheckerUtils.putOnePageChunk(chunkPagePointsNumMerged, path, 2010);
      CompactionCheckerUtils.putOnePageChunk(chunkPagePointsNumMerged, path, 2300);
      CompactionCheckerUtils.putOnePageChunk(chunkPagePointsNumMerged, path, 2500);
      CompactionCheckerUtils.putChunk(
          chunkPagePointsNumMerged, path, new long[] {1000, 500, 500});
    }
    // Values must survive compaction unchanged, and the target file must have
    // exactly the expected chunk/page structure.
    Map<PartialPath, List<TimeValuePair>> compactedData =
        CompactionCheckerUtils.getDataByQuery(
            paths, schemaList, Collections.singletonList(targetResource), new ArrayList<>());
    CompactionCheckerUtils.validDataByValueList(originData, compactedData);
    CompactionCheckerUtils.checkChunkAndPage(chunkPagePointsNumMerged, targetResource);
  } finally {
    // Restore all modified config values regardless of test outcome.
    IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(originTargetChunkSize);
    IoTDBDescriptor.getInstance().getConfig().setTargetChunkPointNum(originTargetChunkPointNum);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setChunkSizeLowerBoundInCompaction(originChunkSizeLowerBound);
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setChunkPointNumLowerBoundInCompaction(originChunkPointNumLowerBound);
  }
}
}