@@ -27,6 +27,7 @@
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.FileElement;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.PageElement;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.AbstractCompactionWriter;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.LazyChunkLoader;
import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader;
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -249,7 +250,7 @@ void deserializeChunkIntoPageQueue(ChunkMetadataElement chunkMetadataElement) th
List<List<ByteBuffer>> compressedValuePageDatas = new ArrayList<>();

// deserialize time chunk
Chunk timeChunk = chunkMetadataElement.chunk;
Chunk timeChunk = chunkMetadataElement.timeChunkLoader.loadChunk();

ChunkReader chunkReader = new ChunkReader(timeChunk);
ByteBuffer chunkDataBuffer = timeChunk.getData();
@@ -268,15 +269,19 @@
}

// deserialize value chunks
List<Chunk> valueChunks = chunkMetadataElement.valueChunks;
for (int i = 0; i < valueChunks.size(); i++) {
Chunk valueChunk = valueChunks.get(i);
if (valueChunk == null) {
List<LazyChunkLoader> readValueChunks = chunkMetadataElement.valueChunkLoaders;
List<Chunk> valueChunks = new ArrayList<>(readValueChunks.size());
for (int i = 0; i < readValueChunks.size(); i++) {
LazyChunkLoader readValueChunk = readValueChunks.get(i);
if (readValueChunk == null) {
// value chunk has been deleted completely
valueChunks.add(null);
valuePageHeaders.add(null);
compressedValuePageDatas.add(null);
continue;
}
Chunk valueChunk = readValueChunk.loadChunk();
valueChunks.add(valueChunk);
chunkReader = new ChunkReader(valueChunk);
chunkDataBuffer = valueChunk.getData();
chunkHeader = valueChunk.getHeader();
@@ -335,39 +340,39 @@ void readChunk(ChunkMetadataElement chunkMetadataElement) throws IOException {
updateSummary(chunkMetadataElement, ChunkStatus.READ_IN);
AlignedChunkMetadata alignedChunkMetadata =
(AlignedChunkMetadata) chunkMetadataElement.chunkMetadata;
chunkMetadataElement.chunk =
readerCacheMap
.get(chunkMetadataElement.fileElement.resource)
.readMemChunk((ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata());
List<Chunk> valueChunks = new ArrayList<>();
LazyChunkLoader timeChunkLoader =
new LazyChunkLoader(
readerCacheMap.get(chunkMetadataElement.fileElement.resource),
(ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata());
List<LazyChunkLoader> valueChunkLoaders =
new ArrayList<>(alignedChunkMetadata.getValueChunkMetadataList().size());
for (IChunkMetadata valueChunkMetadata : alignedChunkMetadata.getValueChunkMetadataList()) {
if (valueChunkMetadata == null || valueChunkMetadata.getStatistics().getCount() == 0) {
// value chunk has been deleted completely or is empty value chunk
valueChunks.add(null);
valueChunkLoaders.add(null);
continue;
}
valueChunks.add(
readerCacheMap
.get(chunkMetadataElement.fileElement.resource)
.readMemChunk((ChunkMetadata) valueChunkMetadata));
valueChunkLoaders.add(
new LazyChunkLoader(
readerCacheMap.get(chunkMetadataElement.fileElement.resource),
(ChunkMetadata) valueChunkMetadata));
}
chunkMetadataElement.valueChunks = valueChunks;
chunkMetadataElement.timeChunkLoader = timeChunkLoader;
chunkMetadataElement.valueChunkLoaders = valueChunkLoaders;
setForceDecoding(chunkMetadataElement);
}

void setForceDecoding(ChunkMetadataElement chunkMetadataElement) {
if (timeColumnMeasurementSchema.getCompressor()
!= chunkMetadataElement.chunk.getHeader().getCompressionType()
|| timeColumnMeasurementSchema.getEncodingType()
!= chunkMetadataElement.chunk.getHeader().getEncodingType()) {
void setForceDecoding(ChunkMetadataElement chunkMetadataElement) throws IOException {
ChunkHeader timeChunkHeader = chunkMetadataElement.timeChunkLoader.loadChunkHeader();
if (timeColumnMeasurementSchema.getCompressor() != timeChunkHeader.getCompressionType()
|| timeColumnMeasurementSchema.getEncodingType() != timeChunkHeader.getEncodingType()) {
chunkMetadataElement.needForceDecoding = true;
return;
}
for (Chunk chunk : chunkMetadataElement.valueChunks) {
if (chunk == null) {
for (LazyChunkLoader valueChunkLoader : chunkMetadataElement.valueChunkLoaders) {
if (valueChunkLoader == null) {
continue;
}
ChunkHeader header = chunk.getHeader();
ChunkHeader header = valueChunkLoader.loadChunkHeader();
String measurementId = header.getMeasurementID();
IMeasurementSchema measurementSchema = measurementSchemaMap.get(measurementId);
if (measurementSchema == null) {
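For context, a hedged sketch of the call-pattern change made in readChunk(...) above: the eager readMemChunk calls are replaced by LazyChunkLoader construction, so the chunk body is not read from disk until a consumer later calls loadChunk(). The wrapper class, method names, and variables below are illustrative only and are not part of this diff.

import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.LazyChunkLoader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;

import java.io.IOException;

// Illustrative sketch, not part of this diff.
class LazyReadSketch {

  // Before this PR: the chunk body was read as soon as its metadata was selected.
  Chunk eagerRead(TsFileSequenceReader reader, ChunkMetadata chunkMetadata) throws IOException {
    return reader.readMemChunk(chunkMetadata);
  }

  // After this PR: only the reader and metadata are captured here; disk I/O happens later,
  // when the page deserializer or the compaction writer calls loadChunkHeader()/loadChunk().
  LazyChunkLoader lazyRead(TsFileSequenceReader reader, ChunkMetadata chunkMetadata) {
    return new LazyChunkLoader(reader, chunkMetadata);
  }
}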
@@ -183,9 +183,9 @@ private void compactWithNonOverlapChunk(ChunkMetadataElement chunkMetadataElemen
if (isAligned) {
success =
compactionWriter.flushAlignedChunk(
chunkMetadataElement.chunk,
chunkMetadataElement.timeChunkLoader,
((AlignedChunkMetadata) chunkMetadataElement.chunkMetadata).getTimeChunkMetadata(),
chunkMetadataElement.valueChunks,
chunkMetadataElement.valueChunkLoaders,
((AlignedChunkMetadata) chunkMetadataElement.chunkMetadata)
.getValueChunkMetadataList(),
subTaskId);
@@ -19,6 +19,7 @@

package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element;

import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.LazyChunkLoader;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.read.common.Chunk;

@@ -36,9 +37,10 @@ public class ChunkMetadataElement {

public FileElement fileElement;

public Chunk chunk;
public LazyChunkLoader timeChunkLoader;
public List<LazyChunkLoader> valueChunkLoaders;

public List<Chunk> valueChunks;
public Chunk chunk;

public boolean needForceDecoding;

@@ -53,6 +55,7 @@ public ChunkMetadataElement(

public void clearChunks() {
chunk = null;
valueChunks = null;
timeChunkLoader = null;
valueChunkLoaders = null;
}
}
@@ -176,9 +176,9 @@ public abstract boolean flushNonAlignedChunk(
Chunk chunk, ChunkMetadata chunkMetadata, int subTaskId) throws IOException;

public abstract boolean flushAlignedChunk(
Chunk timeChunk,
LazyChunkLoader timeChunkLoader,
IChunkMetadata timeChunkMetadata,
List<Chunk> valueChunks,
List<LazyChunkLoader> valueChunkLoaders,
List<IChunkMetadata> valueChunkMetadatas,
int subTaskId)
throws IOException;
@@ -198,9 +198,9 @@ protected void flushNonAlignedChunkToFileWriter(
@SuppressWarnings("squid:S2445")
protected void flushAlignedChunkToFileWriter(
CompactionTsFileWriter targetWriter,
Chunk timeChunk,
LazyChunkLoader timeChunkLoader,
IChunkMetadata timeChunkMetadata,
List<Chunk> valueChunks,
List<LazyChunkLoader> valueChunkLoaders,
List<IChunkMetadata> valueChunkMetadatas,
int subTaskId)
throws IOException {
@@ -213,12 +213,14 @@ protected void flushAlignedChunkToFileWriter(
targetWriter.markStartingWritingAligned();

// flush time chunk

Chunk timeChunk = timeChunkLoader.loadChunk();
targetWriter.writeChunk(timeChunk, (ChunkMetadata) timeChunkMetadata);

// flush value chunks
for (int i = 0; i < valueChunks.size(); i++) {
Chunk valueChunk = valueChunks.get(i);
if (valueChunk == null) {
for (int i = 0; i < valueChunkLoaders.size(); i++) {
LazyChunkLoader readValueChunk = valueChunkLoaders.get(i);
if (readValueChunk == null) {
// sub sensor does not exist in current file or value chunk has been deleted completely
ValueChunkWriter valueChunkWriter = alignedChunkWriter.getValueChunkWriterByIndex(i);
targetWriter.writeEmptyValueChunk(
@@ -229,6 +231,7 @@
Statistics.getStatsByType(valueChunkWriter.getDataType()));
continue;
}
Chunk valueChunk = readValueChunk.loadChunk();
targetWriter.writeChunk(valueChunk, (ChunkMetadata) valueChunkMetadatas.get(i));
}

@@ -93,9 +93,9 @@ public boolean flushNonAlignedChunk(Chunk chunk, ChunkMetadata chunkMetadata, in
*/
@Override
public boolean flushAlignedChunk(
Chunk timeChunk,
LazyChunkLoader timeChunkLoader,
IChunkMetadata timeChunkMetadata,
List<Chunk> valueChunks,
List<LazyChunkLoader> valueChunkLoaders,
List<IChunkMetadata> valueChunkMetadatas,
int subTaskId)
throws IOException {
@@ -109,9 +109,9 @@ public boolean flushAlignedChunk(

flushAlignedChunkToFileWriter(
targetFileWriters.get(fileIndex),
timeChunk,
timeChunkLoader,
timeChunkMetadata,
valueChunks,
valueChunkLoaders,
valueChunkMetadatas,
subTaskId);

@@ -21,6 +21,7 @@

import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.tsfile.exception.write.PageException;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
@@ -81,9 +82,9 @@ public boolean flushNonAlignedChunk(Chunk chunk, ChunkMetadata chunkMetadata, in
*/
@Override
public boolean flushAlignedChunk(
Chunk timeChunk,
LazyChunkLoader timeChunkLoader,
IChunkMetadata timeChunkMetadata,
List<Chunk> valueChunks,
List<LazyChunkLoader> valueChunkLoaders,
List<IChunkMetadata> valueChunkMetadatas,
int subTaskId)
throws IOException {
@@ -94,12 +95,17 @@ public boolean flushAlignedChunk(
sealChunk(fileWriter, chunkWriters[subTaskId], subTaskId);
}
if (chunkPointNumArray[subTaskId] != 0
|| !checkIsAlignedChunkLargeEnough(timeChunk, valueChunks)) {
|| !checkIsAlignedChunkLargeEnough(timeChunkLoader, valueChunkLoaders)) {
// if there is unsealed chunk or current chunk is not large enough, then deserialize the chunk
return false;
}
flushAlignedChunkToFileWriter(
fileWriter, timeChunk, timeChunkMetadata, valueChunks, valueChunkMetadatas, subTaskId);
fileWriter,
timeChunkLoader,
timeChunkMetadata,
valueChunkLoaders,
valueChunkMetadatas,
subTaskId);

isEmptyFile = false;
lastTime[subTaskId] = timeChunkMetadata.getEndTime();
@@ -177,15 +183,16 @@ public boolean flushNonAlignedPage(
return true;
}

private boolean checkIsAlignedChunkLargeEnough(Chunk timeChunk, List<Chunk> valueChunks) {
if (checkIsChunkLargeEnough(timeChunk)) {
private boolean checkIsAlignedChunkLargeEnough(
LazyChunkLoader timeChunk, List<LazyChunkLoader> valueChunks) throws IOException {
if (checkIsChunkLargeEnough(timeChunk.getChunkMetadata(), timeChunk.loadChunkHeader())) {
return true;
}
for (Chunk valueChunk : valueChunks) {
for (LazyChunkLoader valueChunk : valueChunks) {
if (valueChunk == null) {
continue;
}
if (checkIsChunkLargeEnough(valueChunk)) {
if (checkIsChunkLargeEnough(valueChunk.getChunkMetadata(), valueChunk.loadChunkHeader())) {
return true;
}
}
@@ -197,6 +204,11 @@ private boolean checkIsChunkLargeEnough(Chunk chunk) {
|| getChunkSize(chunk) >= targetChunkSize;
}

private boolean checkIsChunkLargeEnough(ChunkMetadata chunkMetadata, ChunkHeader chunkHeader) {
return chunkMetadata.getStatistics().getCount() >= targetChunkPointNum
|| chunkHeader.getSerializedSize() + chunkHeader.getDataSize() >= targetChunkSize;
}

private boolean checkIsAlignedPageLargeEnough(
PageHeader timePageHeader, List<PageHeader> valuePageHeaders) {
if (checkIsPageLargeEnough(timePageHeader)) {
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer;

import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;

import java.io.IOException;
import java.nio.ByteBuffer;

public class LazyChunkLoader {

private final TsFileSequenceReader reader;
private final ChunkMetadata chunkMetadata;
private ChunkHeader chunkHeaderCache;

public LazyChunkLoader(TsFileSequenceReader reader, ChunkMetadata chunkMetadata) {
this.reader = reader;
this.chunkMetadata = chunkMetadata;
}

public ChunkHeader loadChunkHeader() throws IOException {
if (chunkHeaderCache != null) {
return chunkHeaderCache;
}
int chunkHeaderSize = ChunkHeader.getSerializedSize(chunkMetadata.getMeasurementUid());
chunkHeaderCache =
reader.readChunkHeader(chunkMetadata.getOffsetOfChunkHeader(), chunkHeaderSize);
return chunkHeaderCache;
}

public Chunk loadChunk() throws IOException {
if (chunkHeaderCache == null) {
return reader.readMemChunk(chunkMetadata);
}
ByteBuffer buffer =
reader.readChunk(
chunkMetadata.getOffsetOfChunkHeader() + chunkHeaderCache.getSerializedSize(),
chunkHeaderCache.getDataSize());
return new Chunk(
chunkHeaderCache,
buffer,
chunkMetadata.getDeleteIntervalList(),
chunkMetadata.getStatistics());
}

public ChunkMetadata getChunkMetadata() {
return chunkMetadata;
}
}
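A minimal usage sketch for the loader defined above, assuming the caller lives in the same compaction writer package: from the header alone, a caller can decide whether a chunk is large enough to be flushed as-is, and only then pay for reading the chunk body. The helper class, method name, and threshold parameter are illustrative and not part of this diff.

import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;

import java.io.IOException;

// Illustrative sketch, not part of this diff.
class LazyChunkLoaderUsageSketch {

  // Returns the fully loaded chunk only if its on-disk size reaches the target;
  // otherwise returns null without ever reading the chunk body.
  Chunk loadIfLargeEnough(
      TsFileSequenceReader reader, ChunkMetadata chunkMetadata, long targetChunkSize)
      throws IOException {
    LazyChunkLoader loader = new LazyChunkLoader(reader, chunkMetadata);
    // Header-only read: enough to estimate the serialized size of the chunk.
    ChunkHeader header = loader.loadChunkHeader();
    long estimatedSize = (long) header.getSerializedSize() + header.getDataSize();
    if (estimatedSize < targetChunkSize) {
      return null;
    }
    // Only now is the chunk body read from disk; the cached header is reused.
    return loader.loadChunk();
  }
}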
@@ -76,9 +76,9 @@ public boolean flushNonAlignedChunk(Chunk chunk, ChunkMetadata chunkMetadata, in

@Override
public boolean flushAlignedChunk(
Chunk timeChunk,
LazyChunkLoader timeChunkLoader,
IChunkMetadata timeChunkMetadata,
List<Chunk> valueChunks,
List<LazyChunkLoader> valueChunkLoaders,
List<IChunkMetadata> valueChunkMetadatas,
int subTaskId) {
throw new RuntimeException("Does not support this method in ReadPointCrossCompactionWriter");
@@ -54,9 +54,9 @@ public boolean flushNonAlignedChunk(Chunk chunk, ChunkMetadata chunkMetadata, in

@Override
public boolean flushAlignedChunk(
Chunk timeChunk,
LazyChunkLoader timeChunkLoader,
IChunkMetadata timeChunkMetadata,
List<Chunk> valueChunks,
List<LazyChunkLoader> valueChunkLoaders,
List<IChunkMetadata> valueChunkMetadatas,
int subTaskId) {
throw new RuntimeException("Does not support this method in ReadPointInnerCompactionWriter");
@@ -1267,7 +1267,7 @@ public ChunkHeader readChunkHeader(byte chunkType) throws IOException {
* @param position the file offset of this chunk's header
* @param chunkHeaderSize the size of chunk's header
*/
private ChunkHeader readChunkHeader(long position, int chunkHeaderSize) throws IOException {
public ChunkHeader readChunkHeader(long position, int chunkHeaderSize) throws IOException {
try {
return ChunkHeader.deserializeFrom(tsFileInput, position, chunkHeaderSize);
} catch (Throwable t) {