diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java index 9eba9fbc2d815..ddf11d5b1420d 100644 --- a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java +++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java @@ -85,15 +85,13 @@ public static void main(String[] args) throws IOException { System.out .println("\t\tUncompressed page data size: " + pageHeader.getUncompressedSize()); PageReader reader1 = new PageReader(pageData, header.getDataType(), valueDecoder, - defaultTimeDecoder); - while (reader1.hasNextBatch()) { - BatchData batchData = reader1.nextBatch(); - while (batchData.hasCurrent()) { - System.out.println( - "\t\t\ttime, value: " + batchData.currentTime() + ", " + batchData - .currentValue()); - batchData.next(); - } + defaultTimeDecoder, null); + BatchData batchData = reader1.getAllSatisfiedPageData(); + while (batchData.hasCurrent()) { + System.out.println( + "\t\t\ttime, value: " + batchData.currentTime() + ", " + batchData + .currentValue()); + batchData.next(); } } break; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java index cecf6eee695dc..87f29c5cf2181 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java @@ -49,7 +49,6 @@ import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.read.common.Chunk; import org.apache.iotdb.tsfile.read.common.Path; -import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader; import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; @@ -401,9 +400,9 @@ private int writeRemainingUnseq(IChunkWriter chunkWriter, private int writeChunkWithUnseq(Chunk chunk, IChunkWriter chunkWriter, IPointReader unseqReader, long chunkLimitTime, int pathIdx) throws IOException { int cnt = 0; - AbstractChunkReader chunkReader = new ChunkReader(chunk, null); - while (chunkReader.hasNextBatch()) { - BatchData batchData = chunkReader.nextBatch(); + ChunkReader chunkReader = new ChunkReader(chunk, null); + while (chunkReader.hasNextSatisfiedPage()) { + BatchData batchData = chunkReader.nextPageData(); cnt += mergeWriteBatch(batchData, chunkWriter, unseqReader, pathIdx); } cnt += writeRemainingUnseq(chunkWriter, unseqReader, chunkLimitTime, pathIdx); diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java index bdca2e55912bf..822815d30b4cd 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java @@ -24,16 +24,16 @@ import org.apache.iotdb.db.utils.TimeValuePair; import org.apache.iotdb.db.utils.TimeValuePairUtils; import org.apache.iotdb.tsfile.read.common.BatchData; -import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; public class CachedDiskChunkReader implements IPointReader { - private AbstractChunkReader chunkReader; + private ChunkReader chunkReader; private BatchData data; private TimeValuePair prev; private TimeValuePair current; - public CachedDiskChunkReader(AbstractChunkReader chunkReader) { + public CachedDiskChunkReader(ChunkReader chunkReader) { this.chunkReader = chunkReader; this.prev = TimeValuePairUtils.getEmptyTimeValuePair(chunkReader.getChunkHeader().getDataType()); @@ -44,8 +44,8 @@ public boolean hasNext() throws IOException { if (data != null && data.hasCurrent()) { return true; } - while (chunkReader.hasNextBatch()) { - data = chunkReader.nextBatch(); + while (chunkReader.hasNextSatisfiedPage()) { + data = chunkReader.nextPageData(); if (data.hasCurrent()) { return true; } @@ -60,8 +60,8 @@ public TimeValuePair next() throws IOException { if (data.hasCurrent()) { TimeValuePairUtils.setCurrentTimeValuePair(data, current()); } else { - while (chunkReader.hasNextBatch()) { - data = chunkReader.nextBatch(); + while (chunkReader.hasNextSatisfiedPage()) { + data = chunkReader.nextPageData(); if (data.hasCurrent()) { TimeValuePairUtils.setCurrentTimeValuePair(data, current()); break; diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java index 2bb2bb6e9f96c..4dd57bc4b611e 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java @@ -26,7 +26,6 @@ import org.apache.iotdb.tsfile.read.common.Chunk; import org.apache.iotdb.tsfile.read.controller.IChunkLoader; import org.apache.iotdb.tsfile.read.filter.basic.Filter; -import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader; import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp; @@ -71,7 +70,7 @@ public ChunkReaderWrap(ReadOnlyMemChunk readOnlyMemChunk, Filter filter) { public IPointReader getIPointReader() throws IOException { if (type.equals(ChunkReaderType.DISK_CHUNK)) { Chunk chunk = chunkLoader.getChunk(chunkMetaData); - AbstractChunkReader chunkReader = new ChunkReader(chunk, filter); + ChunkReader chunkReader = new ChunkReader(chunk, filter); return new DiskChunkReader(chunkReader); } else { return new MemChunkReader(readOnlyMemChunk, filter); diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReader.java index 1e0064284ca0d..240867e3d1b88 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReader.java @@ -20,16 +20,15 @@ import java.io.IOException; import org.apache.iotdb.db.query.reader.IPointReader; -import org.apache.iotdb.db.query.reader.resourceRelated.NewUnseqResourceMergeReader; import org.apache.iotdb.db.utils.TimeValuePair; import org.apache.iotdb.db.utils.TimeValuePairUtils; import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.read.reader.IBatchReader; -import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; /** * To read chunk data on disk, this class implements an interface {@link IPointReader} based on the - * data reader {@link AbstractChunkReader}. + * data reader {@link ChunkReader}. *

* Note that ChunkReader is an abstract class with three concrete classes, two of which * are used here: ChunkReaderWithoutFilter and ChunkReaderWithFilter. @@ -37,10 +36,10 @@ */ public class DiskChunkReader implements IPointReader, IBatchReader { - private AbstractChunkReader chunkReader; + private ChunkReader chunkReader; private BatchData data; - public DiskChunkReader(AbstractChunkReader chunkReader) { + public DiskChunkReader(ChunkReader chunkReader) { this.chunkReader = chunkReader; } @@ -49,8 +48,8 @@ public boolean hasNext() throws IOException { if (data != null && data.hasCurrent()) { return true; } - while (chunkReader.hasNextBatch()) { - data = chunkReader.nextBatch(); + while (chunkReader.hasNextSatisfiedPage()) { + data = chunkReader.nextPageData(); if (data.hasCurrent()) { return true; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReaderByTimestamp.java index 81d98661127cc..521a4db9f16c3 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReaderByTimestamp.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReaderByTimestamp.java @@ -54,8 +54,8 @@ public Object getValueInTimestamp(long timestamp) throws IOException { return null; } else { chunkReaderByTimestamp.setCurrentTimestamp(timestamp); - if (chunkReaderByTimestamp.hasNextBatch()) { - data = chunkReaderByTimestamp.nextBatch(); + if (chunkReaderByTimestamp.hasNextSatisfiedPage()) { + data = chunkReaderByTimestamp.nextPageData(); } else { return null; } @@ -70,8 +70,8 @@ public boolean hasNext() throws IOException { if (data != null && data.hasCurrent()) { return true; } - if (chunkReaderByTimestamp != null && chunkReaderByTimestamp.hasNextBatch()) { - data = chunkReaderByTimestamp.nextBatch(); + if (chunkReaderByTimestamp != null && chunkReaderByTimestamp.hasNextSatisfiedPage()) { + data = chunkReaderByTimestamp.nextPageData(); return true; } return false; diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/MemChunkReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/MemChunkReader.java index 5c7aa47cc6e78..9d596d0a83fa4 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/MemChunkReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/MemChunkReader.java @@ -92,7 +92,7 @@ public boolean hasNextBatch() throws IOException { @Override public BatchData nextBatch() { - BatchData batchData = new BatchData(dataType, true); + BatchData batchData = new BatchData(dataType); if (hasCachedTimeValuePair) { hasCachedTimeValuePair = false; batchData.putTime(cachedTimeValuePair.getTimestamp()); diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/CachedUnseqResourceMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/CachedUnseqResourceMergeReader.java index ee0fdd8ee6a66..33c3462da762a 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/CachedUnseqResourceMergeReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/CachedUnseqResourceMergeReader.java @@ -25,7 +25,6 @@ import org.apache.iotdb.db.query.reader.universal.CachedPriorityMergeReader; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Chunk; -import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader; import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; public class CachedUnseqResourceMergeReader extends CachedPriorityMergeReader { @@ -35,7 +34,7 @@ public CachedUnseqResourceMergeReader(List chunks, TSDataType dataType) super(dataType); int priorityValue = 1; for (Chunk chunk : chunks) { - AbstractChunkReader chunkReader = new ChunkReader(chunk, null); + ChunkReader chunkReader = new ChunkReader(chunk, null); addReaderWithPriority(new CachedDiskChunkReader(chunkReader), priorityValue++); } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java index 61e876355d25a..d527272f194df 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java @@ -88,13 +88,15 @@ public NewUnseqResourceMergeReader(Path seriesPath, TSDataType dataType, if (tsFileResource.isClosed()) { // get chunk metadata list of current closed tsfile currentChunkMetaDataList = DeviceMetaDataCache.getInstance().get(tsFileResource, seriesPath); + + // get modifications and apply to chunk metadatas List pathModifications = context .getPathModifications(tsFileResource.getModFile(), seriesPath.getFullPath()); if (!pathModifications.isEmpty()) { QueryUtils.modifyChunkMetaData(currentChunkMetaDataList, pathModifications); } } else { - // metadata list of already flushed chunk groups + // metadata list of already flushed chunks in unsealed file, already applied modifications currentChunkMetaDataList = tsFileResource.getChunkMetaDataList(); } @@ -143,7 +145,7 @@ public boolean hasNextBatch() throws IOException { return true; } - batchData = new BatchData(dataType, true); + batchData = new BatchData(dataType); for (int rowCount = 0; rowCount < batchSize; rowCount++) { if (priorityMergeReader.hasNext()) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/SeqResourceIterateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/SeqResourceIterateReader.java index 1226331023bc1..e46a37acf1529 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/SeqResourceIterateReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/SeqResourceIterateReader.java @@ -132,27 +132,6 @@ public boolean constructNextReader(int idx) throws IOException { } } - /** - * Returns true if the start and end time of the series data in this sequence TsFile do not - * satisfy the filter condition. Returns false if satisfy. - *

- * This method is used to in constructNextReader to check whether this TsFile can be - * skipped. - * - * @param tsFile the TsFileResource corresponding to this TsFile - * @param filter filter condition. Null if no filter. - * @return True if the TsFile's start and end time do not satisfy the filter condition; False if - * satisfy. - */ - private boolean isTsFileNotSatisfied(TsFileResource tsFile, Filter filter) { - if (filter == null) { - return false; - } - long startTime = tsFile.getStartTimeMap().get(seriesPath.getDevice()); - long endTime = tsFile.getEndTimeMap().get(seriesPath.getDevice()); - return !filter.satisfyStartEndTime(startTime, endTime); - } - private IAggregateReader initSealedTsFileReader(TsFileResource sealedTsFile, Filter filter, QueryContext context) throws IOException { // prepare metaDataList diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java index ed71dc8c49ada..eb8503fc2f021 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java @@ -133,7 +133,7 @@ public BatchData nextBatch() throws IOException { if (hasNextInSeq() && hasNextInUnSeq()) { // if the count reaches batch data size int count = 0; - BatchData batchData = new BatchData(seqBatchData.getDataType(), true); + BatchData batchData = new BatchData(seqBatchData.getDataType()); while (count < batchSize && hasNextInSeq() && hasNextInUnSeq()) { long timeInSeq = seqBatchData.currentTime(); long timeInUnseq = unseqBatchData.currentTime(); diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java index a0d125375fa19..eb065d5efd008 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java @@ -37,7 +37,6 @@ import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.read.common.Chunk; import org.apache.iotdb.tsfile.read.common.Path; -import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader; import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; import org.slf4j.Logger; @@ -101,10 +100,10 @@ public static long collectFileSizes(List seqFiles, List> pair = processor.getWorkUnSequenceTsFileProcessor() + .query(deviceId, measurementId, TSDataType.INT32, Collections.emptyMap(), new QueryContext()); + + List timeValuePairs = pair.left.getSortedTimeValuePairList(); + + long time = 16; + for (TimeValuePair timeValuePair : timeValuePairs) { + Assert.assertEquals(time++, timeValuePair.getTimestamp()); + } + + Assert.assertEquals(0, pair.right.size()); + } + @Test public void testSequenceSyncClose() throws QueryProcessException { for (int j = 1; j <= 10; j++) { diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryTest.java new file mode 100644 index 0000000000000..e450bc900cb5f --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryTest.java @@ -0,0 +1,74 @@ +package org.apache.iotdb.db.integration; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import org.apache.iotdb.db.engine.version.SimpleFileVersionController; +import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.jdbc.Config; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class IoTDBSimpleQueryTest { + private IoTDB deamon; + + @Before + public void setUp() throws Exception { + deamon = IoTDB.getInstance(); + deamon.active(); + EnvironmentUtils.envSetUp(); + } + + @After + public void tearDown() throws Exception { + deamon.stop(); + EnvironmentUtils.cleanEnv(); + } + + @Test + public void testUnseqUnsealedDeleteQuery() throws SQLException, ClassNotFoundException { + Class.forName(Config.JDBC_DRIVER_NAME); + try(Connection connection = DriverManager + .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", + "root", "root"); + Statement statement = connection.createStatement()){ + statement.execute("SET STORAGE GROUP TO root.sg1"); + statement.execute("CREATE TIMESERIES root.sg1.d0.s0 WITH DATATYPE=INT32,ENCODING=PLAIN"); + + // seq data + statement.execute("INSERT INTO root.sg1.d0(timestamp, s0) VALUES (1000, 1)"); + statement.execute("flush"); + + for (int i = 1; i <= 10; i++) { + statement.execute( + String.format("INSERT INTO root.sg1.d0(timestamp, s0) VALUES (%d, %d)", i, i)); + } + + statement.execute("flush"); + + // unseq data + for (int i = 11; i <= 20; i++) { + statement.execute( + String.format("INSERT INTO root.sg1.d0(timestamp, s0) VALUES (%d, %d)", i, i)); + } + + statement.execute("delete from root.sg1.d0.s0 where time <= 15"); + + ResultSet resultSet = statement.executeQuery("select * from root"); + + long count = 0; + + while(resultSet.next()) { + count++; + } + + System.out.println(count); + + } + } + +} diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/FakedIBatchPoint.java b/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/FakedIBatchPoint.java index 54fe52b46e8f8..7ab6a35749017 100644 --- a/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/FakedIBatchPoint.java +++ b/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/FakedIBatchPoint.java @@ -90,7 +90,7 @@ private void constructBatchData() { if (!hasEmptyBatch) { num += 1; } - batchData = new BatchData(TSDataType.INT64, true); + batchData = new BatchData(TSDataType.INT64); while (num > 0 && iterator.hasNext()) { TimeValuePair timeValuePair = iterator.next(); batchData.putTime(timeValuePair.getTimestamp()); diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java index 45fa54c8309c2..8b88e8feba712 100644 --- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java +++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java @@ -46,7 +46,6 @@ import org.apache.iotdb.tsfile.read.controller.IChunkLoader; import org.apache.iotdb.tsfile.read.controller.IMetadataQuerier; import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl; -import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader; import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; import org.apache.iotdb.tsfile.write.TsFileWriter; import org.apache.iotdb.tsfile.write.record.TSRecord; @@ -165,7 +164,7 @@ public void test() throws StorageGroupProcessorException, IOException { int priorityValue = 1; for (ChunkMetaData chunkMetaData : metadataQuerier.getChunkMetaDataList(path)) { Chunk chunk = chunkLoader.getChunk(chunkMetaData); - AbstractChunkReader chunkReader = new ChunkReader(chunk, null); + ChunkReader chunkReader = new ChunkReader(chunk, null); unSeqMergeReader .addReaderWithPriority(new DiskChunkReader(chunkReader), priorityValue); priorityValue++; diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java index 811f8037ed071..43013ae9be88c 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java @@ -32,7 +32,12 @@ * BatchData is a self-defined data structure which is optimized for different type of * values. This class can be viewed as a collection which is more efficient than ArrayList. * - * We don't return empty batch data. If you get a batch data, you can iterate the data as the following codes: + * This class records a time list and a value list, which could be replaced by TVList in the future + * + * When you use BatchData in query process, it does not contain duplicated timestamps. The batch data + * may be empty. + * + * If you get a batch data, you can iterate the data as the following codes: * * while (batchData.hasCurrent()) { * long time = batchData.currentTime(); @@ -45,7 +50,6 @@ public class BatchData implements Serializable { private static final long serialVersionUID = -4620310601188394839L; private int timeCapacity = 16; private int valueCapacity = 16; - private int emptyTimeCapacity = 1; private int capacityThreshold = 1024; private TSDataType dataType; @@ -78,7 +82,6 @@ public class BatchData implements Serializable { private int valueLength; private ArrayList timeRet; - private ArrayList emptyTimeRet; private ArrayList booleanRet; private ArrayList intRet; private ArrayList longRet; @@ -90,22 +93,13 @@ public BatchData() { dataType = null; } - public BatchData(TSDataType type) { - dataType = type; - } - /** * BatchData Constructor. * * @param type Data type to record for this BatchData - * @param recordTime whether to record time value for this BatchData */ - public BatchData(TSDataType type, boolean recordTime) { - init(type, recordTime, false); - } - - public BatchData(TSDataType type, boolean recordTime, boolean hasEmptyTime) { - init(type, recordTime, hasEmptyTime); + public BatchData(TSDataType type) { + init(type); } public boolean isEmpty() { @@ -176,10 +170,8 @@ public TSDataType getDataType() { * initialize batch data. * * @param type TSDataType - * @param recordTime if record time - * @param hasEmptyTime if has empty time */ - public void init(TSDataType type, boolean recordTime, boolean hasEmptyTime) { + public void init(TSDataType type) { this.dataType = type; this.valueArrayIdx = 0; this.curValueIdx = 0; @@ -187,18 +179,11 @@ public void init(TSDataType type, boolean recordTime, boolean hasEmptyTime) { this.curIdx = 0; capacityThreshold = TSFileConfig.DYNAMIC_DATA_SIZE; - if (recordTime) { - timeRet = new ArrayList<>(); - timeRet.add(new long[timeCapacity]); - timeArrayIdx = 0; - curTimeIdx = 0; - count = 0; - } - - if (hasEmptyTime) { - emptyTimeRet = new ArrayList<>(); - emptyTimeRet.add(new long[emptyTimeCapacity]); - } + timeRet = new ArrayList<>(); + timeRet.add(new long[timeCapacity]); + timeArrayIdx = 0; + curTimeIdx = 0; + count = 0; switch (dataType) { case BOOLEAN: @@ -422,13 +407,6 @@ private void rangeCheckForTime(int idx) { } } - private void rangeCheckForEmptyTime(int idx) { - if (idx < 0) { - throw new IndexOutOfBoundsException( - "BatchData empty time range check, Index is negative: " + idx); - } - } - public boolean getBoolean() { rangeCheck(curIdx); return this.booleanRet.get(curIdx / timeCapacity)[curIdx % timeCapacity]; @@ -493,24 +471,6 @@ public void setTime(int idx, long v) { this.timeRet.get(idx / timeCapacity)[idx % timeCapacity] = v; } - public long getEmptyTime(int idx) { - rangeCheckForEmptyTime(idx); - return this.emptyTimeRet.get(idx / emptyTimeCapacity)[idx % emptyTimeCapacity]; - } - - /** - * get time as array in long[] structure. - * - * @return time array - */ - public long[] getTimeAsArray() { - long[] res = new long[count]; - for (int i = 0; i < count; i++) { - res[i] = timeRet.get(i / timeCapacity)[i % timeCapacity]; - } - return res; - } - /** * put an object. * @@ -541,45 +501,10 @@ public void putAnObject(Object v) { } } - /** - * set an object. - * - * @param idx object id - * @param v object value - */ - public void setAnObject(int idx, Comparable v) { - switch (dataType) { - case BOOLEAN: - setBoolean(idx, (Boolean) v); - break; - case DOUBLE: - setDouble(idx, (Double) v); - break; - case TEXT: - setBinary(idx, (Binary) v); - break; - case FLOAT: - setFloat(idx, (Float) v); - break; - case INT32: - setInt(idx, (Integer) v); - break; - case INT64: - setLong(idx, (Long) v); - break; - default: - throw new UnSupportedDataTypeException(String.valueOf(dataType)); - } - } - public int length() { return this.count; } - public int getCurIdx() { - return curIdx; - } - public long getTimeByIndex(int idx) { rangeCheckForTime(idx); return this.timeRet.get(idx / timeCapacity)[idx % timeCapacity]; @@ -615,25 +540,6 @@ public boolean getBooleanByIndex(int idx) { return booleanRet.get(idx / timeCapacity)[idx % timeCapacity]; } - public Object getValueByIndex(int idx) { - switch (dataType) { - case INT32: - return getIntByIndex(idx); - case INT64: - return getLongByIndex(idx); - case FLOAT: - return getFloatByIndex(idx); - case DOUBLE: - return getDoubleByIndex(idx); - case BOOLEAN: - return getBooleanByIndex(idx); - case TEXT: - return getBinaryByIndex(idx); - default: - return null; - } - } - public Object getValueInTimestamp(long time) { while (hasCurrent()) { if (currentTime() < time) { diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AbstractChunkReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AbstractChunkReader.java deleted file mode 100644 index 1758ec748af70..0000000000000 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AbstractChunkReader.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.tsfile.read.reader.chunk; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; -import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; -import org.apache.iotdb.tsfile.compress.IUnCompressor; -import org.apache.iotdb.tsfile.encoding.common.EndianType; -import org.apache.iotdb.tsfile.encoding.decoder.Decoder; -import org.apache.iotdb.tsfile.file.header.ChunkHeader; -import org.apache.iotdb.tsfile.file.header.PageHeader; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; -import org.apache.iotdb.tsfile.read.common.BatchData; -import org.apache.iotdb.tsfile.read.common.Chunk; -import org.apache.iotdb.tsfile.read.filter.basic.Filter; -import org.apache.iotdb.tsfile.read.reader.IBatchReader; -import org.apache.iotdb.tsfile.read.reader.page.PageReader; - -public abstract class AbstractChunkReader implements IBatchReader { - - private ChunkHeader chunkHeader; - private ByteBuffer chunkDataBuffer; - - private IUnCompressor unCompressor; - private Decoder valueDecoder; - private Decoder timeDecoder = Decoder.getDecoderByType( - TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), - TSDataType.INT64); - - protected Filter filter; - - private BatchData data; - - private PageHeader pageHeader; - private boolean hasCachedPageHeader; - - /** - * Data whose timestamp <= deletedAt should be considered deleted(not be returned). - */ - protected long deletedAt; - - /** - * constructor of ChunkReader. - * - * @param chunk input Chunk object - * @param filter filter - */ - public AbstractChunkReader(Chunk chunk, Filter filter) { - this.filter = filter; - this.chunkDataBuffer = chunk.getData(); - this.deletedAt = chunk.getDeletedAt(); - EndianType endianType = chunk.getEndianType(); - chunkHeader = chunk.getHeader(); - this.unCompressor = IUnCompressor.getUnCompressor(chunkHeader.getCompressionType()); - valueDecoder = Decoder - .getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType()); - valueDecoder.setEndianType(endianType); - data = new BatchData(chunkHeader.getDataType()); - hasCachedPageHeader = false; - } - - /** - * judge if has nextBatch. - */ - public boolean hasNextBatch() { - if (hasCachedPageHeader) { - return true; - } - // construct next satisfied page header - while (chunkDataBuffer.remaining() > 0) { - // deserialize a PageHeader from chunkDataBuffer - pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType()); - - // if the current page satisfies - if (pageSatisfied(pageHeader)) { - hasCachedPageHeader = true; - return true; - } else { - skipBytesInStreamByLength(pageHeader.getCompressedSize()); - } - } - return false; - } - - /** - * get next data batch. - * - * @return next data batch - * @throws IOException IOException - */ - public BatchData nextBatch() throws IOException { - PageReader pageReader = constructPageReaderForNextPage(pageHeader.getCompressedSize()); - hasCachedPageHeader = false; - if (pageReader.hasNextBatch()) { - data = pageReader.nextBatch(); - return data; - } - return data; - } - - - public PageHeader nextPageHeader() { - return pageHeader; - } - - public void skipPageData() { - skipBytesInStreamByLength(pageHeader.getCompressedSize()); - hasCachedPageHeader = false; - } - - private void skipBytesInStreamByLength(long length) { - chunkDataBuffer.position(chunkDataBuffer.position() + (int) length); - } - - public abstract boolean pageSatisfied(PageHeader pageHeader); - - private PageReader constructPageReaderForNextPage(int compressedPageBodyLength) - throws IOException { - byte[] compressedPageBody = new byte[compressedPageBodyLength]; - - // already in memory - if (compressedPageBodyLength > chunkDataBuffer.remaining()) { - throw new IOException( - "unexpected byte read length when read compressedPageBody. Expected:" - + Arrays.toString(compressedPageBody) + ". Actual:" + chunkDataBuffer - .remaining()); - } - chunkDataBuffer.get(compressedPageBody, 0, compressedPageBodyLength); - valueDecoder.reset(); - ByteBuffer pageData = ByteBuffer.wrap(unCompressor.uncompress(compressedPageBody)); - PageReader reader = new PageReader(pageData, chunkHeader.getDataType(), - valueDecoder, timeDecoder, filter); - reader.setDeletedAt(deletedAt); - return reader; - } - - public void close() { - } - - public ChunkHeader getChunkHeader() { - return chunkHeader; - } -} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java index cf831fce8eb5d..1d8abdf379bbe 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java @@ -16,28 +16,147 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.tsfile.read.reader.chunk; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; +import org.apache.iotdb.tsfile.compress.IUnCompressor; +import org.apache.iotdb.tsfile.encoding.common.EndianType; +import org.apache.iotdb.tsfile.encoding.decoder.Decoder; +import org.apache.iotdb.tsfile.file.header.ChunkHeader; import org.apache.iotdb.tsfile.file.header.PageHeader; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.read.common.Chunk; import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.read.reader.page.PageReader; + +public class ChunkReader { + + private ChunkHeader chunkHeader; + private ByteBuffer chunkDataBuffer; + + private IUnCompressor unCompressor; + private Decoder valueDecoder; + private Decoder timeDecoder = Decoder.getDecoderByType( + TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), + TSDataType.INT64); -public class ChunkReader extends AbstractChunkReader { + protected Filter filter; + private PageHeader pageHeader; + private boolean hasCachedPageHeader; + + /** + * Data whose timestamp <= deletedAt should be considered deleted(not be returned). + */ + protected long deletedAt; + + /** + * constructor of ChunkReader. + * + * @param chunk input Chunk object + * @param filter filter + */ public ChunkReader(Chunk chunk, Filter filter) { - super(chunk, filter); + this.filter = filter; + this.chunkDataBuffer = chunk.getData(); + this.deletedAt = chunk.getDeletedAt(); + EndianType endianType = chunk.getEndianType(); + chunkHeader = chunk.getHeader(); + this.unCompressor = IUnCompressor.getUnCompressor(chunkHeader.getCompressionType()); + valueDecoder = Decoder + .getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType()); + valueDecoder.setEndianType(endianType); + hasCachedPageHeader = false; + } + + /** + * judge if has next page whose page header satisfies the filter. + */ + public boolean hasNextSatisfiedPage() { + if (hasCachedPageHeader) { + return true; + } + // construct next satisfied page header + while (chunkDataBuffer.remaining() > 0) { + // deserialize a PageHeader from chunkDataBuffer + pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType()); + + // if the current page satisfies + if (pageSatisfied(pageHeader)) { + hasCachedPageHeader = true; + return true; + } else { + skipBytesInStreamByLength(pageHeader.getCompressedSize()); + } + } + return false; + } + + /** + * get next data batch. + * + * @return next data batch + * @throws IOException IOException + */ + public BatchData nextPageData() throws IOException { + if(hasCachedPageHeader || hasNextSatisfiedPage()) { + PageReader pageReader = constructPageReaderForNextPage(pageHeader); + hasCachedPageHeader = false; + return pageReader.getAllSatisfiedPageData(); + } else { + throw new IOException("no next page data"); + } + } + + public PageHeader nextPageHeader() { + return pageHeader; + } + + public void skipPageData() { + skipBytesInStreamByLength(pageHeader.getCompressedSize()); + hasCachedPageHeader = false; + } + + private void skipBytesInStreamByLength(long length) { + chunkDataBuffer.position(chunkDataBuffer.position() + (int) length); } - @Override public boolean pageSatisfied(PageHeader pageHeader) { - if (pageHeader.getEndTime() < deletedAt) { + if (pageHeader.getEndTime() <= deletedAt) { return false; } - if (filter == null ) { - return true; - } else { - return filter.satisfy(pageHeader.getStatistics()); + return filter == null || filter.satisfy(pageHeader.getStatistics()); + } + + private PageReader constructPageReaderForNextPage(PageHeader pageHeader) + throws IOException { + int compressedPageBodyLength = pageHeader.getCompressedSize(); + byte[] compressedPageBody = new byte[compressedPageBodyLength]; + + // doesn't has a complete page body + if (compressedPageBodyLength > chunkDataBuffer.remaining()) { + throw new IOException("do not has a complete page body. Expected:" + compressedPageBodyLength + + ". Actual:" + chunkDataBuffer.remaining()); } + + chunkDataBuffer.get(compressedPageBody); + valueDecoder.reset(); + ByteBuffer pageData = ByteBuffer.wrap(unCompressor.uncompress(compressedPageBody)); + PageReader reader = new PageReader(pageData, chunkHeader.getDataType(), + valueDecoder, timeDecoder, filter); + reader.setDeletedAt(deletedAt); + return reader; } + public void close() { + } + + public ChunkHeader getChunkHeader() { + return chunkHeader; + } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java index 1630a632a6bf3..1f487d0e42c4e 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java @@ -21,7 +21,7 @@ import org.apache.iotdb.tsfile.file.header.PageHeader; import org.apache.iotdb.tsfile.read.common.Chunk; -public class ChunkReaderByTimestamp extends AbstractChunkReader { +public class ChunkReaderByTimestamp extends ChunkReader { private long currentTimestamp; diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java index 7d113e9303d8f..9bf75aa3c2578 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java @@ -25,7 +25,9 @@ import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.filter.TimeFilter; import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils; @@ -45,32 +47,24 @@ public class PageReader { /** value column in memory */ private ByteBuffer valueBuffer; - private BatchData data = null; - - private Filter filter = null; + private Filter filter; + /** Data whose timestamp <= deletedAt should be considered deleted(not be returned). */ private long deletedAt = Long.MIN_VALUE; public PageReader(ByteBuffer pageData, TSDataType dataType, Decoder valueDecoder, - Decoder timeDecoder, - Filter filter) { - this(pageData, dataType, valueDecoder, timeDecoder); - this.filter = filter; - } - - public PageReader(ByteBuffer pageData, TSDataType dataType, Decoder valueDecoder, - Decoder timeDecoder) { + Decoder timeDecoder, Filter filter) { this.dataType = dataType; this.valueDecoder = valueDecoder; this.timeDecoder = timeDecoder; + this.filter = filter; splitDataToTimeStampAndValue(pageData); } /** * split pageContent into two stream: time and value * - * @param pageData - * uncompressed bytes size of time column, time column, value column + * @param pageData uncompressed bytes size of time column, time column, value column */ private void splitDataToTimeStampAndValue(ByteBuffer pageData) { int timeBufferLength = ReadWriteForEncodingUtils.readUnsignedVarInt(pageData); @@ -82,30 +76,12 @@ private void splitDataToTimeStampAndValue(ByteBuffer pageData) { valueBuffer.position(timeBufferLength); } - public boolean hasNextBatch() throws IOException { - return timeDecoder.hasNext(timeBuffer); - } - /** - * may return an empty BatchData + * @return the returned BatchData may be empty, but never be null */ - public BatchData nextBatch() throws IOException { - if (filter == null) { - data = getAllPageData(); - } else { - data = getAllPageDataWithFilter(); - } - - return data; - } - - public BatchData currentBatch() { - return data; - } - - private BatchData getAllPageData() throws IOException { + public BatchData getAllSatisfiedPageData() throws IOException { - BatchData pageData = new BatchData(dataType, true); + BatchData pageData = new BatchData(dataType); while (timeDecoder.hasNext(timeBuffer)) { long timestamp = timeDecoder.readLong(timeBuffer); @@ -113,42 +89,42 @@ private BatchData getAllPageData() throws IOException { switch (dataType) { case BOOLEAN: boolean aBoolean = valueDecoder.readBoolean(valueBuffer); - if (timestamp > deletedAt) { + if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aBoolean))) { pageData.putTime(timestamp); pageData.putBoolean(aBoolean); } break; case INT32: int anInt = valueDecoder.readInt(valueBuffer); - if (timestamp > deletedAt) { + if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, anInt))) { pageData.putTime(timestamp); pageData.putInt(anInt); } break; case INT64: long aLong = valueDecoder.readLong(valueBuffer); - if (timestamp > deletedAt) { + if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aLong))) { pageData.putTime(timestamp); pageData.putLong(aLong); } break; case FLOAT: float aFloat = valueDecoder.readFloat(valueBuffer); - if (timestamp > deletedAt) { + if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aFloat))) { pageData.putTime(timestamp); pageData.putFloat(aFloat); } break; case DOUBLE: double aDouble = valueDecoder.readDouble(valueBuffer); - if (timestamp > deletedAt) { + if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aDouble))) { pageData.putTime(timestamp); pageData.putDouble(aDouble); } break; case TEXT: Binary aBinary = valueDecoder.readBinary(valueBuffer); - if (timestamp > deletedAt) { + if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aBinary))) { pageData.putTime(timestamp); pageData.putBinary(aBinary); } @@ -160,86 +136,6 @@ private BatchData getAllPageData() throws IOException { return pageData; } - private BatchData getAllPageDataWithFilter() throws IOException { - BatchData pageData = new BatchData(dataType, true); - - while (timeDecoder.hasNext(timeBuffer)) { - long timestamp = timeDecoder.readLong(timeBuffer); - - switch (dataType) { - case BOOLEAN: - readBoolean(pageData, timestamp); - break; - case INT32: - readInt(pageData, timestamp); - break; - case INT64: - readLong(pageData, timestamp); - break; - case FLOAT: - readFloat(pageData, timestamp); - break; - case DOUBLE: - readDouble(pageData, timestamp); - break; - case TEXT: - readText(pageData, timestamp); - break; - default: - throw new UnSupportedDataTypeException(String.valueOf(dataType)); - } - } - - return pageData; - } - - private void readBoolean(BatchData pageData, long timestamp) { - boolean aBoolean = valueDecoder.readBoolean(valueBuffer); - if (timestamp > deletedAt && filter.satisfy(timestamp, aBoolean)) { - pageData.putTime(timestamp); - pageData.putBoolean(aBoolean); - } - } - - private void readInt(BatchData pageData, long timestamp) { - int anInt = valueDecoder.readInt(valueBuffer); - if (timestamp > deletedAt && filter.satisfy(timestamp, anInt)) { - pageData.putTime(timestamp); - pageData.putInt(anInt); - } - } - - private void readLong(BatchData pageData, long timestamp) { - long aLong = valueDecoder.readLong(valueBuffer); - if (timestamp > deletedAt && filter.satisfy(timestamp, aLong)) { - pageData.putTime(timestamp); - pageData.putLong(aLong); - } - } - - private void readFloat(BatchData pageData, long timestamp) { - float aFloat = valueDecoder.readFloat(valueBuffer); - if (timestamp > deletedAt && filter.satisfy(timestamp, aFloat)) { - pageData.putTime(timestamp); - pageData.putFloat(aFloat); - } - } - - private void readDouble(BatchData pageData, long timestamp) { - double aDouble = valueDecoder.readDouble(valueBuffer); - if (timestamp > deletedAt && filter.satisfy(timestamp, aDouble)) { - pageData.putTime(timestamp); - pageData.putDouble(aDouble); - } - } - - private void readText(BatchData pageData, long timestamp) { - Binary aBinary = valueDecoder.readBinary(valueBuffer); - if (timestamp > deletedAt && filter.satisfy(timestamp, aBinary)) { - pageData.putTime(timestamp); - pageData.putBinary(aBinary); - } - } public void close() { timeBuffer = null; diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/AbstractFileSeriesReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/AbstractFileSeriesReader.java index 069c4352aa1ad..14bb4be34dce4 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/AbstractFileSeriesReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/AbstractFileSeriesReader.java @@ -25,7 +25,7 @@ import org.apache.iotdb.tsfile.read.controller.IChunkLoader; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.reader.IAggregateReader; -import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; import java.io.IOException; import java.util.List; @@ -37,7 +37,7 @@ public abstract class AbstractFileSeriesReader implements IAggregateReader { protected IChunkLoader chunkLoader; protected List chunkMetaDataList; - protected AbstractChunkReader chunkReader; + protected ChunkReader chunkReader; private int chunkToRead; private BatchData data; @@ -63,7 +63,7 @@ public AbstractFileSeriesReader(IChunkLoader chunkLoader, List ch public boolean hasNextBatch() throws IOException { // current chunk has additional batch - if (chunkReader != null && chunkReader.hasNextBatch()) { + if (chunkReader != null && chunkReader.hasNextSatisfiedPage()) { return true; } @@ -75,7 +75,7 @@ public boolean hasNextBatch() throws IOException { // chunk metadata satisfy the condition initChunkReader(chunkMetaData); - if (chunkReader.hasNextBatch()) { + if (chunkReader.hasNextSatisfiedPage()) { return true; } } @@ -87,7 +87,7 @@ public boolean hasNextBatch() throws IOException { * get next batch data. */ public BatchData nextBatch() throws IOException { - data = chunkReader.nextBatch(); + data = chunkReader.nextPageData(); return data; } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderByTimestamp.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderByTimestamp.java index 13be71bbb4cd8..1843229fd0d73 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderByTimestamp.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderByTimestamp.java @@ -25,7 +25,7 @@ import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.read.common.Chunk; import org.apache.iotdb.tsfile.read.controller.IChunkLoader; -import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp; /** @@ -40,7 +40,7 @@ public class FileSeriesReaderByTimestamp { protected List chunkMetaDataList; private int currentChunkIndex = 0; - private AbstractChunkReader chunkReader; + private ChunkReader chunkReader; private long currentTimestamp; private BatchData data = null; // current batch data @@ -69,8 +69,8 @@ public Object getValueInTimestamp(long timestamp) throws IOException { return null; } - if (chunkReader.hasNextBatch()) { - data = chunkReader.nextBatch(); + if (chunkReader.hasNextSatisfiedPage()) { + data = chunkReader.nextPageData(); } else { return null; } @@ -93,8 +93,8 @@ public Object getValueInTimestamp(long timestamp) throws IOException { } return null; } else { - if (chunkReader.hasNextBatch()) { - data = chunkReader.nextBatch(); + if (chunkReader.hasNextSatisfiedPage()) { + data = chunkReader.nextPageData(); } else if (!constructNextSatisfiedChunkReader()) { return null; } @@ -115,16 +115,16 @@ public boolean hasNext() throws IOException { if (data != null && data.hasCurrent()) { return true; } - while (chunkReader.hasNextBatch()) { - data = chunkReader.nextBatch(); + while (chunkReader.hasNextSatisfiedPage()) { + data = chunkReader.nextPageData(); if (data != null && data.hasCurrent()) { return true; } } } while (constructNextSatisfiedChunkReader()) { - while (chunkReader.hasNextBatch()) { - data = chunkReader.nextBatch(); + while (chunkReader.hasNextSatisfiedPage()) { + data = chunkReader.nextPageData(); if (data != null && data.hasCurrent()) { return true; } diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/query/timegenerator/NodeTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/query/timegenerator/NodeTest.java index 9afc1ccc7d0d0..08a76c9201d92 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/query/timegenerator/NodeTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/query/timegenerator/NodeTest.java @@ -95,7 +95,7 @@ private static class FakedFileSeriesReader extends AbstractFileSeriesReader { public FakedFileSeriesReader(long[] timestamps) { super(null, null, null); - data = new BatchData(TSDataType.INT32, true); + data = new BatchData(TSDataType.INT32); for (long time : timestamps) { data.putTime(time); } diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/PageReaderTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/PageReaderTest.java index bcce85c52d0c0..3a9b4a8eab45a 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/PageReaderTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/PageReaderTest.java @@ -171,15 +171,12 @@ public void test(TSDataType dataType) { ByteBuffer page = ByteBuffer.wrap(pageWriter.getUncompressedBytes().array()); PageReader pageReader = new PageReader(page, dataType, decoder, - new DeltaBinaryDecoder.LongDeltaDecoder()); + new DeltaBinaryDecoder.LongDeltaDecoder(), null); int index = 0; long startTimestamp = System.currentTimeMillis(); - BatchData data = null; - if (pageReader.hasNextBatch()) { - data = pageReader.nextBatch(); - } - assert data != null; + BatchData data = pageReader.getAllSatisfiedPageData(); + Assert.assertNotNull(data); while (data.hasCurrent()) { Assert.assertEquals(Long.valueOf(index), (Long) data.currentTime());