refactor chunk reader (#680)
* optimize PageReader and ChunkReader & refactor BatchData
Jialin Qiao committed Dec 27, 2019
1 parent 777dd00 commit ba76c4b
Showing 26 changed files with 372 additions and 489 deletions.
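The core of the refactor is visible across the hunks below: AbstractChunkReader disappears, callers hold the concrete ChunkReader, and the chunk-level iteration pair hasNextBatch()/nextBatch() is renamed to hasNextSatisfiedPage()/nextPageData(), making the page granularity explicit. A minimal sketch of the new idiom, assembled only from calls that appear in this diff; the ChunkScanSketch wrapper and its println body are illustrative, not part of the commit:

    import java.io.IOException;
    import org.apache.iotdb.tsfile.read.common.BatchData;
    import org.apache.iotdb.tsfile.read.common.Chunk;
    import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;

    class ChunkScanSketch {
      // Iterate a chunk page by page; a null filter keeps every row.
      static void dump(Chunk chunk) throws IOException {
        ChunkReader chunkReader = new ChunkReader(chunk, null);
        while (chunkReader.hasNextSatisfiedPage()) {        // was: hasNextBatch()
          BatchData batchData = chunkReader.nextPageData(); // was: nextBatch()
          while (batchData.hasCurrent()) {
            System.out.println(batchData.currentTime() + ", " + batchData.currentValue());
            batchData.next();
          }
        }
      }
    }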
21 changes: 21 additions & 0 deletions docs/Development-Chinese.md
@@ -1,3 +1,24 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+        http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
 # 1. Workflow

 ## Main Links

@@ -85,15 +85,13 @@ public static void main(String[] args) throws IOException {
       System.out
           .println("\t\tUncompressed page data size: " + pageHeader.getUncompressedSize());
       PageReader reader1 = new PageReader(pageData, header.getDataType(), valueDecoder,
-          defaultTimeDecoder);
-      while (reader1.hasNextBatch()) {
-        BatchData batchData = reader1.nextBatch();
-        while (batchData.hasCurrent()) {
-          System.out.println(
-              "\t\t\ttime, value: " + batchData.currentTime() + ", " + batchData
-                  .currentValue());
-          batchData.next();
-        }
+          defaultTimeDecoder, null);
+      BatchData batchData = reader1.getAllSatisfiedPageData();
+      while (batchData.hasCurrent()) {
+        System.out.println(
+            "\t\t\ttime, value: " + batchData.currentTime() + ", " + batchData
+                .currentValue());
+        batchData.next();
       }
     }
     break;
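At page level the change is analogous: PageReader takes a Filter as a new fifth constructor argument, and the old batch loop collapses into a single call that returns the whole decoded, filtered page. A condensed sketch of the new call sequence, reusing pageData, header, valueDecoder, and defaultTimeDecoder from the example above (a fragment, not a standalone program):

    // The fifth argument is the new value filter; null means keep all rows.
    PageReader reader = new PageReader(pageData, header.getDataType(), valueDecoder,
        defaultTimeDecoder, null);
    BatchData page = reader.getAllSatisfiedPageData(); // the whole page at once
    while (page.hasCurrent()) {
      long time = page.currentTime();
      Object value = page.currentValue(); // typed according to header.getDataType()
      page.next();
    }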

@@ -49,7 +49,6 @@
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -401,9 +400,9 @@ private int writeRemainingUnseq(IChunkWriter chunkWriter,
   private int writeChunkWithUnseq(Chunk chunk, IChunkWriter chunkWriter, IPointReader unseqReader,
       long chunkLimitTime, int pathIdx) throws IOException {
     int cnt = 0;
-    AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
-    while (chunkReader.hasNextBatch()) {
-      BatchData batchData = chunkReader.nextBatch();
+    ChunkReader chunkReader = new ChunkReader(chunk, null);
+    while (chunkReader.hasNextSatisfiedPage()) {
+      BatchData batchData = chunkReader.nextPageData();
       cnt += mergeWriteBatch(batchData, chunkWriter, unseqReader, pathIdx);
     }
     cnt += writeRemainingUnseq(chunkWriter, unseqReader, chunkLimitTime, pathIdx);

@@ -24,16 +24,16 @@
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.db.utils.TimeValuePairUtils;
 import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;

 public class CachedDiskChunkReader implements IPointReader {

-  private AbstractChunkReader chunkReader;
+  private ChunkReader chunkReader;
   private BatchData data;
   private TimeValuePair prev;
   private TimeValuePair current;

-  public CachedDiskChunkReader(AbstractChunkReader chunkReader) {
+  public CachedDiskChunkReader(ChunkReader chunkReader) {
     this.chunkReader = chunkReader;
     this.prev =
         TimeValuePairUtils.getEmptyTimeValuePair(chunkReader.getChunkHeader().getDataType());
@@ -44,8 +44,8 @@ public boolean hasNext() throws IOException {
     if (data != null && data.hasCurrent()) {
       return true;
     }
-    while (chunkReader.hasNextBatch()) {
-      data = chunkReader.nextBatch();
+    while (chunkReader.hasNextSatisfiedPage()) {
+      data = chunkReader.nextPageData();
       if (data.hasCurrent()) {
         return true;
       }
@@ -60,8 +60,8 @@ public TimeValuePair next() throws IOException {
     if (data.hasCurrent()) {
       TimeValuePairUtils.setCurrentTimeValuePair(data, current());
     } else {
-      while (chunkReader.hasNextBatch()) {
-        data = chunkReader.nextBatch();
+      while (chunkReader.hasNextSatisfiedPage()) {
+        data = chunkReader.nextPageData();
         if (data.hasCurrent()) {
           TimeValuePairUtils.setCurrentTimeValuePair(data, current());
           break;

@@ -26,7 +26,6 @@
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;

@@ -71,7 +70,7 @@ public ChunkReaderWrap(ReadOnlyMemChunk readOnlyMemChunk, Filter filter) {
   public IPointReader getIPointReader() throws IOException {
     if (type.equals(ChunkReaderType.DISK_CHUNK)) {
       Chunk chunk = chunkLoader.getChunk(chunkMetaData);
-      AbstractChunkReader chunkReader = new ChunkReader(chunk, filter);
+      ChunkReader chunkReader = new ChunkReader(chunk, filter);
       return new DiskChunkReader(chunkReader);
     } else {
       return new MemChunkReader(readOnlyMemChunk, filter);

@@ -20,27 +20,26 @@

 import java.io.IOException;
 import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.query.reader.resourceRelated.NewUnseqResourceMergeReader;
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.db.utils.TimeValuePairUtils;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.reader.IBatchReader;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;

 /**
  * To read chunk data on disk, this class implements an interface {@link IPointReader} based on the
- * data reader {@link AbstractChunkReader}.
+ * data reader {@link ChunkReader}.
  * <p>
  * Note that <code>ChunkReader</code> is an abstract class with three concrete classes, two of which
  * are used here: <code>ChunkReaderWithoutFilter</code> and <code>ChunkReaderWithFilter</code>.
  * <p>
  */
 public class DiskChunkReader implements IPointReader, IBatchReader {

-  private AbstractChunkReader chunkReader;
+  private ChunkReader chunkReader;
   private BatchData data;

-  public DiskChunkReader(AbstractChunkReader chunkReader) {
+  public DiskChunkReader(ChunkReader chunkReader) {
     this.chunkReader = chunkReader;
   }
@@ -49,8 +48,8 @@ public boolean hasNext() throws IOException {
     if (data != null && data.hasCurrent()) {
       return true;
     }
-    while (chunkReader.hasNextBatch()) {
-      data = chunkReader.nextBatch();
+    while (chunkReader.hasNextSatisfiedPage()) {
+      data = chunkReader.nextPageData();
       if (data.hasCurrent()) {
         return true;
       }
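For orientation before the next file: DiskChunkReader remains the point-level adapter over the page-level ChunkReader, and construction is still a plain wrap, as ChunkReaderWrap.getIPointReader() above shows. A small usage sketch under that assumption (chunk and filter are supplied by the caller; filter may be null):

    ChunkReader chunkReader = new ChunkReader(chunk, filter);
    IPointReader pointReader = new DiskChunkReader(chunkReader);
    while (pointReader.hasNext()) {
      TimeValuePair pair = pointReader.next(); // pages are pulled lazily as points drain
      // consume pair.getTimestamp() and its value here
    }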

@@ -54,8 +54,8 @@ public Object getValueInTimestamp(long timestamp) throws IOException {
       return null;
     } else {
       chunkReaderByTimestamp.setCurrentTimestamp(timestamp);
-      if (chunkReaderByTimestamp.hasNextBatch()) {
-        data = chunkReaderByTimestamp.nextBatch();
+      if (chunkReaderByTimestamp.hasNextSatisfiedPage()) {
+        data = chunkReaderByTimestamp.nextPageData();
       } else {
         return null;
       }
@@ -70,8 +70,8 @@ public boolean hasNext() throws IOException {
     if (data != null && data.hasCurrent()) {
       return true;
     }
-    if (chunkReaderByTimestamp != null && chunkReaderByTimestamp.hasNextBatch()) {
-      data = chunkReaderByTimestamp.nextBatch();
+    if (chunkReaderByTimestamp != null && chunkReaderByTimestamp.hasNextSatisfiedPage()) {
+      data = chunkReaderByTimestamp.nextPageData();
       return true;
     }
     return false;

@@ -92,7 +92,7 @@ public boolean hasNextBatch() throws IOException {

   @Override
   public BatchData nextBatch() {
-    BatchData batchData = new BatchData(dataType, true);
+    BatchData batchData = new BatchData(dataType);
     if (hasCachedTimeValuePair) {
       hasCachedTimeValuePair = false;
       batchData.putTime(cachedTimeValuePair.getTimestamp());

@@ -25,7 +25,6 @@
 import org.apache.iotdb.db.query.reader.universal.CachedPriorityMergeReader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;

 public class CachedUnseqResourceMergeReader extends CachedPriorityMergeReader {
@@ -35,7 +34,7 @@ public CachedUnseqResourceMergeReader(List<Chunk> chunks, TSDataType dataType)
     super(dataType);
     int priorityValue = 1;
     for (Chunk chunk : chunks) {
-      AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
+      ChunkReader chunkReader = new ChunkReader(chunk, null);
       addReaderWithPriority(new CachedDiskChunkReader(chunkReader), priorityValue++);
     }
   }

@@ -88,13 +88,15 @@ public NewUnseqResourceMergeReader(Path seriesPath, TSDataType dataType,
       if (tsFileResource.isClosed()) {
         // get chunk metadata list of current closed tsfile
         currentChunkMetaDataList = DeviceMetaDataCache.getInstance().get(tsFileResource, seriesPath);
+
+        // get modifications and apply to chunk metadatas
         List<Modification> pathModifications = context
             .getPathModifications(tsFileResource.getModFile(), seriesPath.getFullPath());
         if (!pathModifications.isEmpty()) {
           QueryUtils.modifyChunkMetaData(currentChunkMetaDataList, pathModifications);
         }
       } else {
-        // metadata list of already flushed chunk groups
+        // metadata list of already flushed chunks in unsealed file, already applied modifications
        currentChunkMetaDataList = tsFileResource.getChunkMetaDataList();
       }

@@ -143,7 +145,7 @@ public boolean hasNextBatch() throws IOException {
       return true;
     }

-    batchData = new BatchData(dataType, true);
+    batchData = new BatchData(dataType);

     for (int rowCount = 0; rowCount < batchSize; rowCount++) {
       if (priorityMergeReader.hasNext()) {
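The rewritten comments in the constructor hunk above pin down an invariant worth spelling out: for a closed TsFile the reader loads chunk metadata from the cache and must apply deletions (Modifications) itself, while an unsealed file's getChunkMetaDataList() already reflects them. The resulting branch, condensed from the hunk using only the calls shown there:

    if (tsFileResource.isClosed()) {
      // closed file: cached chunk metadata + deletions from the mod file
      currentChunkMetaDataList = DeviceMetaDataCache.getInstance().get(tsFileResource, seriesPath);
      List<Modification> pathModifications = context
          .getPathModifications(tsFileResource.getModFile(), seriesPath.getFullPath());
      if (!pathModifications.isEmpty()) {
        QueryUtils.modifyChunkMetaData(currentChunkMetaDataList, pathModifications);
      }
    } else {
      // unsealed file: metadata of flushed chunks, modifications already applied
      currentChunkMetaDataList = tsFileResource.getChunkMetaDataList();
    }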

@@ -132,27 +132,6 @@ public boolean constructNextReader(int idx) throws IOException {
       }
     }

-  /**
-   * Returns true if the start and end time of the series data in this sequence TsFile do not
-   * satisfy the filter condition. Returns false if they satisfy it.
-   * <p>
-   * This method is used in <code>constructNextReader</code> to check whether this TsFile can be
-   * skipped.
-   *
-   * @param tsFile the TsFileResource corresponding to this TsFile
-   * @param filter filter condition. Null if no filter.
-   * @return True if the TsFile's start and end time do not satisfy the filter condition; False if
-   * they satisfy it.
-   */
-  private boolean isTsFileNotSatisfied(TsFileResource tsFile, Filter filter) {
-    if (filter == null) {
-      return false;
-    }
-    long startTime = tsFile.getStartTimeMap().get(seriesPath.getDevice());
-    long endTime = tsFile.getEndTimeMap().get(seriesPath.getDevice());
-    return !filter.satisfyStartEndTime(startTime, endTime);
-  }
-
   private IAggregateReader initSealedTsFileReader(TsFileResource sealedTsFile, Filter filter,
       QueryContext context) throws IOException {
     // prepare metaDataList

@@ -133,7 +133,7 @@ public BatchData nextBatch() throws IOException {
     if (hasNextInSeq() && hasNextInUnSeq()) {
       // if the count reaches batch data size
       int count = 0;
-      BatchData batchData = new BatchData(seqBatchData.getDataType(), true);
+      BatchData batchData = new BatchData(seqBatchData.getDataType());
       while (count < batchSize && hasNextInSeq() && hasNextInUnSeq()) {
         long timeInSeq = seqBatchData.currentTime();
         long timeInUnseq = unseqBatchData.currentTime();

@@ -37,7 +37,6 @@
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.slf4j.Logger;
@@ -101,10 +100,10 @@ public static long collectFileSizes(List<TsFileResource> seqFiles, List<TsFileRe
   }

   public static int writeChunkWithoutUnseq(Chunk chunk, IChunkWriter chunkWriter) throws IOException {
-    AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
+    ChunkReader chunkReader = new ChunkReader(chunk, null);
     int ptWritten = 0;
-    while (chunkReader.hasNextBatch()) {
-      BatchData batchData = chunkReader.nextBatch();
+    while (chunkReader.hasNextSatisfiedPage()) {
+      BatchData batchData = chunkReader.nextPageData();
       for (int i = 0; i < batchData.length(); i++) {
         writeBatchPoint(batchData, i, chunkWriter);
       }

@@ -21,21 +21,27 @@
 import static org.junit.Assert.assertFalse;

 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.MetadataManagerHelper;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.storageGroup.StorageGroupProcessorException;
 import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
 import org.junit.After;
@@ -71,6 +77,43 @@ public void tearDown() throws Exception {
   }

+  @Test
+  public void testUnseqUnsealedDelete() throws QueryProcessException, IOException {
+    TSRecord record = new TSRecord(10000, deviceId);
+    record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
+    processor.insert(new InsertPlan(record));
+    processor.waitForAllCurrentTsFileProcessorsClosed();
+
+    for (int j = 1; j <= 10; j++) {
+      record = new TSRecord(j, deviceId);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+      processor.insert(new InsertPlan(record));
+    }
+
+    processor.getWorkUnSequenceTsFileProcessor().syncFlush();
+
+    for (int j = 11; j <= 20; j++) {
+      record = new TSRecord(j, deviceId);
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+      processor.insert(new InsertPlan(record));
+    }
+
+    processor.delete(deviceId, measurementId, 15L);
+
+    Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = processor.getWorkUnSequenceTsFileProcessor()
+        .query(deviceId, measurementId, TSDataType.INT32, Collections.emptyMap(), new QueryContext());
+
+    List<TimeValuePair> timeValuePairs = pair.left.getSortedTimeValuePairList();
+
+    long time = 16;
+    for (TimeValuePair timeValuePair : timeValuePairs) {
+      Assert.assertEquals(time++, timeValuePair.getTimestamp());
+    }
+
+    Assert.assertEquals(0, pair.right.size());
+  }
+
   @Test
   public void testSequenceSyncClose() throws QueryProcessException {
     for (int j = 1; j <= 10; j++) {