From c0c85cd3bb60e186213373f64c2d6c9cee788d68 Mon Sep 17 00:00:00 2001
From: qiaojialin <646274302@qq.com>
Date: Thu, 26 Dec 2019 17:12:13 +0800
Subject: [PATCH 1/3] optimize PageReader and ChunkReader
---
.../iotdb/tsfile/TsFileSequenceRead.java | 16 +-
.../merge/task/MergeMultiChunkTask.java | 7 +-
.../chunkRelated/CachedDiskChunkReader.java | 14 +-
.../reader/chunkRelated/ChunkReaderWrap.java | 3 +-
.../reader/chunkRelated/DiskChunkReader.java | 13 +-
.../DiskChunkReaderByTimestamp.java | 8 +-
.../CachedUnseqResourceMergeReader.java | 3 +-
.../NewUnseqResourceMergeReader.java | 14 +-
.../org/apache/iotdb/db/utils/MergeUtils.java | 7 +-
.../recover/UnseqTsFileRecoverTest.java | 3 +-
.../reader/chunk/AbstractChunkReader.java | 163 ------------------
.../tsfile/read/reader/chunk/ChunkReader.java | 135 ++++++++++++++-
.../reader/chunk/ChunkReaderByTimestamp.java | 2 +-
.../tsfile/read/reader/page/PageReader.java | 135 ++-------------
.../series/AbstractFileSeriesReader.java | 10 +-
.../series/FileSeriesReaderByTimestamp.java | 20 +--
.../tsfile/read/reader/PageReaderTest.java | 7 +-
17 files changed, 200 insertions(+), 360 deletions(-)
delete mode 100644 tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AbstractChunkReader.java
diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
index 9eba9fbc2d815..ddf11d5b1420d 100644
--- a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
+++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
@@ -85,15 +85,13 @@ public static void main(String[] args) throws IOException {
System.out
.println("\t\tUncompressed page data size: " + pageHeader.getUncompressedSize());
PageReader reader1 = new PageReader(pageData, header.getDataType(), valueDecoder,
- defaultTimeDecoder);
- while (reader1.hasNextBatch()) {
- BatchData batchData = reader1.nextBatch();
- while (batchData.hasCurrent()) {
- System.out.println(
- "\t\t\ttime, value: " + batchData.currentTime() + ", " + batchData
- .currentValue());
- batchData.next();
- }
+ defaultTimeDecoder, null);
+ BatchData batchData = reader1.getAllSatisfiedPageData();
+ while (batchData.hasCurrent()) {
+ System.out.println(
+ "\t\t\ttime, value: " + batchData.currentTime() + ", " + batchData
+ .currentValue());
+ batchData.next();
}
}
break;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
index cecf6eee695dc..87f29c5cf2181 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
@@ -49,7 +49,6 @@
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -401,9 +400,9 @@ private int writeRemainingUnseq(IChunkWriter chunkWriter,
private int writeChunkWithUnseq(Chunk chunk, IChunkWriter chunkWriter, IPointReader unseqReader,
long chunkLimitTime, int pathIdx) throws IOException {
int cnt = 0;
- AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
- while (chunkReader.hasNextBatch()) {
- BatchData batchData = chunkReader.nextBatch();
+ ChunkReader chunkReader = new ChunkReader(chunk, null);
+ while (chunkReader.hasNextSatisfiedPage()) {
+ BatchData batchData = chunkReader.nextPageData();
cnt += mergeWriteBatch(batchData, chunkWriter, unseqReader, pathIdx);
}
cnt += writeRemainingUnseq(chunkWriter, unseqReader, chunkLimitTime, pathIdx);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java
index bdca2e55912bf..822815d30b4cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/CachedDiskChunkReader.java
@@ -24,16 +24,16 @@
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TimeValuePairUtils;
import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
public class CachedDiskChunkReader implements IPointReader {
- private AbstractChunkReader chunkReader;
+ private ChunkReader chunkReader;
private BatchData data;
private TimeValuePair prev;
private TimeValuePair current;
- public CachedDiskChunkReader(AbstractChunkReader chunkReader) {
+ public CachedDiskChunkReader(ChunkReader chunkReader) {
this.chunkReader = chunkReader;
this.prev =
TimeValuePairUtils.getEmptyTimeValuePair(chunkReader.getChunkHeader().getDataType());
@@ -44,8 +44,8 @@ public boolean hasNext() throws IOException {
if (data != null && data.hasCurrent()) {
return true;
}
- while (chunkReader.hasNextBatch()) {
- data = chunkReader.nextBatch();
+ while (chunkReader.hasNextSatisfiedPage()) {
+ data = chunkReader.nextPageData();
if (data.hasCurrent()) {
return true;
}
@@ -60,8 +60,8 @@ public TimeValuePair next() throws IOException {
if (data.hasCurrent()) {
TimeValuePairUtils.setCurrentTimeValuePair(data, current());
} else {
- while (chunkReader.hasNextBatch()) {
- data = chunkReader.nextBatch();
+ while (chunkReader.hasNextSatisfiedPage()) {
+ data = chunkReader.nextPageData();
if (data.hasCurrent()) {
TimeValuePairUtils.setCurrentTimeValuePair(data, current());
break;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java
index 2bb2bb6e9f96c..4dd57bc4b611e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java
@@ -26,7 +26,6 @@
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
@@ -71,7 +70,7 @@ public ChunkReaderWrap(ReadOnlyMemChunk readOnlyMemChunk, Filter filter) {
public IPointReader getIPointReader() throws IOException {
if (type.equals(ChunkReaderType.DISK_CHUNK)) {
Chunk chunk = chunkLoader.getChunk(chunkMetaData);
- AbstractChunkReader chunkReader = new ChunkReader(chunk, filter);
+ ChunkReader chunkReader = new ChunkReader(chunk, filter);
return new DiskChunkReader(chunkReader);
} else {
return new MemChunkReader(readOnlyMemChunk, filter);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReader.java
index 1e0064284ca0d..240867e3d1b88 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReader.java
@@ -20,16 +20,15 @@
import java.io.IOException;
import org.apache.iotdb.db.query.reader.IPointReader;
-import org.apache.iotdb.db.query.reader.resourceRelated.NewUnseqResourceMergeReader;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TimeValuePairUtils;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
/**
* To read chunk data on disk, this class implements an interface {@link IPointReader} based on the
- * data reader {@link AbstractChunkReader}.
+ * data reader {@link ChunkReader}.
*
* Note that ChunkReader
is an abstract class with three concrete classes, two of which
* are used here: ChunkReaderWithoutFilter
and ChunkReaderWithFilter
.
@@ -37,10 +36,10 @@
*/
public class DiskChunkReader implements IPointReader, IBatchReader {
- private AbstractChunkReader chunkReader;
+ private ChunkReader chunkReader;
private BatchData data;
- public DiskChunkReader(AbstractChunkReader chunkReader) {
+ public DiskChunkReader(ChunkReader chunkReader) {
this.chunkReader = chunkReader;
}
@@ -49,8 +48,8 @@ public boolean hasNext() throws IOException {
if (data != null && data.hasCurrent()) {
return true;
}
- while (chunkReader.hasNextBatch()) {
- data = chunkReader.nextBatch();
+ while (chunkReader.hasNextSatisfiedPage()) {
+ data = chunkReader.nextPageData();
if (data.hasCurrent()) {
return true;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReaderByTimestamp.java
index 81d98661127cc..521a4db9f16c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReaderByTimestamp.java
@@ -54,8 +54,8 @@ public Object getValueInTimestamp(long timestamp) throws IOException {
return null;
} else {
chunkReaderByTimestamp.setCurrentTimestamp(timestamp);
- if (chunkReaderByTimestamp.hasNextBatch()) {
- data = chunkReaderByTimestamp.nextBatch();
+ if (chunkReaderByTimestamp.hasNextSatisfiedPage()) {
+ data = chunkReaderByTimestamp.nextPageData();
} else {
return null;
}
@@ -70,8 +70,8 @@ public boolean hasNext() throws IOException {
if (data != null && data.hasCurrent()) {
return true;
}
- if (chunkReaderByTimestamp != null && chunkReaderByTimestamp.hasNextBatch()) {
- data = chunkReaderByTimestamp.nextBatch();
+ if (chunkReaderByTimestamp != null && chunkReaderByTimestamp.hasNextSatisfiedPage()) {
+ data = chunkReaderByTimestamp.nextPageData();
return true;
}
return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/CachedUnseqResourceMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/CachedUnseqResourceMergeReader.java
index ee0fdd8ee6a66..33c3462da762a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/CachedUnseqResourceMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/CachedUnseqResourceMergeReader.java
@@ -25,7 +25,6 @@
import org.apache.iotdb.db.query.reader.universal.CachedPriorityMergeReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
public class CachedUnseqResourceMergeReader extends CachedPriorityMergeReader {
@@ -35,7 +34,7 @@ public CachedUnseqResourceMergeReader(List chunks, TSDataType dataType)
super(dataType);
int priorityValue = 1;
for (Chunk chunk : chunks) {
- AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
+ ChunkReader chunkReader = new ChunkReader(chunk, null);
addReaderWithPriority(new CachedDiskChunkReader(chunkReader), priorityValue++);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java
index 61e876355d25a..ce2d379f4bae2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java
@@ -88,16 +88,18 @@ public NewUnseqResourceMergeReader(Path seriesPath, TSDataType dataType,
if (tsFileResource.isClosed()) {
// get chunk metadata list of current closed tsfile
currentChunkMetaDataList = DeviceMetaDataCache.getInstance().get(tsFileResource, seriesPath);
- List pathModifications = context
- .getPathModifications(tsFileResource.getModFile(), seriesPath.getFullPath());
- if (!pathModifications.isEmpty()) {
- QueryUtils.modifyChunkMetaData(currentChunkMetaDataList, pathModifications);
- }
} else {
- // metadata list of already flushed chunk groups
+ // metadata list of already flushed chunks
currentChunkMetaDataList = tsFileResource.getChunkMetaDataList();
}
+ // get modifications and apply to chunk metadatas
+ List pathModifications = context
+ .getPathModifications(tsFileResource.getModFile(), seriesPath.getFullPath());
+ if (!pathModifications.isEmpty()) {
+ QueryUtils.modifyChunkMetaData(currentChunkMetaDataList, pathModifications);
+ }
+
if (!currentChunkMetaDataList.isEmpty()) {
TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
.get(tsFileResource, tsFileResource.isClosed());
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
index a0d125375fa19..eb065d5efd008 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
@@ -37,7 +37,6 @@
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.slf4j.Logger;
@@ -101,10 +100,10 @@ public static long collectFileSizes(List seqFiles, List 0) {
- // deserialize a PageHeader from chunkDataBuffer
- pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType());
-
- // if the current page satisfies
- if (pageSatisfied(pageHeader)) {
- hasCachedPageHeader = true;
- return true;
- } else {
- skipBytesInStreamByLength(pageHeader.getCompressedSize());
- }
- }
- return false;
- }
-
- /**
- * get next data batch.
- *
- * @return next data batch
- * @throws IOException IOException
- */
- public BatchData nextBatch() throws IOException {
- PageReader pageReader = constructPageReaderForNextPage(pageHeader.getCompressedSize());
- hasCachedPageHeader = false;
- if (pageReader.hasNextBatch()) {
- data = pageReader.nextBatch();
- return data;
- }
- return data;
- }
-
-
- public PageHeader nextPageHeader() {
- return pageHeader;
- }
-
- public void skipPageData() {
- skipBytesInStreamByLength(pageHeader.getCompressedSize());
- hasCachedPageHeader = false;
- }
-
- private void skipBytesInStreamByLength(long length) {
- chunkDataBuffer.position(chunkDataBuffer.position() + (int) length);
- }
-
- public abstract boolean pageSatisfied(PageHeader pageHeader);
-
- private PageReader constructPageReaderForNextPage(int compressedPageBodyLength)
- throws IOException {
- byte[] compressedPageBody = new byte[compressedPageBodyLength];
-
- // already in memory
- if (compressedPageBodyLength > chunkDataBuffer.remaining()) {
- throw new IOException(
- "unexpected byte read length when read compressedPageBody. Expected:"
- + Arrays.toString(compressedPageBody) + ". Actual:" + chunkDataBuffer
- .remaining());
- }
- chunkDataBuffer.get(compressedPageBody, 0, compressedPageBodyLength);
- valueDecoder.reset();
- ByteBuffer pageData = ByteBuffer.wrap(unCompressor.uncompress(compressedPageBody));
- PageReader reader = new PageReader(pageData, chunkHeader.getDataType(),
- valueDecoder, timeDecoder, filter);
- reader.setDeletedAt(deletedAt);
- return reader;
- }
-
- public void close() {
- }
-
- public ChunkHeader getChunkHeader() {
- return chunkHeader;
- }
-}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
index cf831fce8eb5d..1d8abdf379bbe 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
@@ -16,28 +16,147 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.tsfile.read.reader.chunk;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.encoding.common.EndianType;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+
+public class ChunkReader {
+
+ private ChunkHeader chunkHeader;
+ private ByteBuffer chunkDataBuffer;
+
+ private IUnCompressor unCompressor;
+ private Decoder valueDecoder;
+ private Decoder timeDecoder = Decoder.getDecoderByType(
+ TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+ TSDataType.INT64);
-public class ChunkReader extends AbstractChunkReader {
+ protected Filter filter;
+ private PageHeader pageHeader;
+ private boolean hasCachedPageHeader;
+
+ /**
+ * Data whose timestamp <= deletedAt should be considered deleted(not be returned).
+ */
+ protected long deletedAt;
+
+ /**
+ * constructor of ChunkReader.
+ *
+ * @param chunk input Chunk object
+ * @param filter filter
+ */
public ChunkReader(Chunk chunk, Filter filter) {
- super(chunk, filter);
+ this.filter = filter;
+ this.chunkDataBuffer = chunk.getData();
+ this.deletedAt = chunk.getDeletedAt();
+ EndianType endianType = chunk.getEndianType();
+ chunkHeader = chunk.getHeader();
+ this.unCompressor = IUnCompressor.getUnCompressor(chunkHeader.getCompressionType());
+ valueDecoder = Decoder
+ .getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType());
+ valueDecoder.setEndianType(endianType);
+ hasCachedPageHeader = false;
+ }
+
+ /**
+ * judge if has next page whose page header satisfies the filter.
+ */
+ public boolean hasNextSatisfiedPage() {
+ if (hasCachedPageHeader) {
+ return true;
+ }
+ // construct next satisfied page header
+ while (chunkDataBuffer.remaining() > 0) {
+ // deserialize a PageHeader from chunkDataBuffer
+ pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType());
+
+ // if the current page satisfies
+ if (pageSatisfied(pageHeader)) {
+ hasCachedPageHeader = true;
+ return true;
+ } else {
+ skipBytesInStreamByLength(pageHeader.getCompressedSize());
+ }
+ }
+ return false;
+ }
+
+ /**
+ * get next data batch.
+ *
+ * @return next data batch
+ * @throws IOException IOException
+ */
+ public BatchData nextPageData() throws IOException {
+ if(hasCachedPageHeader || hasNextSatisfiedPage()) {
+ PageReader pageReader = constructPageReaderForNextPage(pageHeader);
+ hasCachedPageHeader = false;
+ return pageReader.getAllSatisfiedPageData();
+ } else {
+ throw new IOException("no next page data");
+ }
+ }
+
+ public PageHeader nextPageHeader() {
+ return pageHeader;
+ }
+
+ public void skipPageData() {
+ skipBytesInStreamByLength(pageHeader.getCompressedSize());
+ hasCachedPageHeader = false;
+ }
+
+ private void skipBytesInStreamByLength(long length) {
+ chunkDataBuffer.position(chunkDataBuffer.position() + (int) length);
}
- @Override
public boolean pageSatisfied(PageHeader pageHeader) {
- if (pageHeader.getEndTime() < deletedAt) {
+ if (pageHeader.getEndTime() <= deletedAt) {
return false;
}
- if (filter == null ) {
- return true;
- } else {
- return filter.satisfy(pageHeader.getStatistics());
+ return filter == null || filter.satisfy(pageHeader.getStatistics());
+ }
+
+ private PageReader constructPageReaderForNextPage(PageHeader pageHeader)
+ throws IOException {
+ int compressedPageBodyLength = pageHeader.getCompressedSize();
+ byte[] compressedPageBody = new byte[compressedPageBodyLength];
+
+ // doesn't has a complete page body
+ if (compressedPageBodyLength > chunkDataBuffer.remaining()) {
+ throw new IOException("do not has a complete page body. Expected:" + compressedPageBodyLength
+ + ". Actual:" + chunkDataBuffer.remaining());
}
+
+ chunkDataBuffer.get(compressedPageBody);
+ valueDecoder.reset();
+ ByteBuffer pageData = ByteBuffer.wrap(unCompressor.uncompress(compressedPageBody));
+ PageReader reader = new PageReader(pageData, chunkHeader.getDataType(),
+ valueDecoder, timeDecoder, filter);
+ reader.setDeletedAt(deletedAt);
+ return reader;
}
+ public void close() {
+ }
+
+ public ChunkHeader getChunkHeader() {
+ return chunkHeader;
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java
index 1630a632a6bf3..1f487d0e42c4e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java
@@ -21,7 +21,7 @@
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.read.common.Chunk;
-public class ChunkReaderByTimestamp extends AbstractChunkReader {
+public class ChunkReaderByTimestamp extends ChunkReader {
private long currentTimestamp;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index 7d113e9303d8f..2e43f77ac25e2 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -25,7 +25,9 @@
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
@@ -45,32 +47,24 @@ public class PageReader {
/** value column in memory */
private ByteBuffer valueBuffer;
- private BatchData data = null;
-
- private Filter filter = null;
+ private Filter filter;
+ /** Data whose timestamp <= deletedAt should be considered deleted(not be returned). */
private long deletedAt = Long.MIN_VALUE;
public PageReader(ByteBuffer pageData, TSDataType dataType, Decoder valueDecoder,
- Decoder timeDecoder,
- Filter filter) {
- this(pageData, dataType, valueDecoder, timeDecoder);
- this.filter = filter;
- }
-
- public PageReader(ByteBuffer pageData, TSDataType dataType, Decoder valueDecoder,
- Decoder timeDecoder) {
+ Decoder timeDecoder, Filter filter) {
this.dataType = dataType;
this.valueDecoder = valueDecoder;
this.timeDecoder = timeDecoder;
+ this.filter = filter;
splitDataToTimeStampAndValue(pageData);
}
/**
* split pageContent into two stream: time and value
*
- * @param pageData
- * uncompressed bytes size of time column, time column, value column
+ * @param pageData uncompressed bytes size of time column, time column, value column
*/
private void splitDataToTimeStampAndValue(ByteBuffer pageData) {
int timeBufferLength = ReadWriteForEncodingUtils.readUnsignedVarInt(pageData);
@@ -82,28 +76,7 @@ private void splitDataToTimeStampAndValue(ByteBuffer pageData) {
valueBuffer.position(timeBufferLength);
}
- public boolean hasNextBatch() throws IOException {
- return timeDecoder.hasNext(timeBuffer);
- }
-
- /**
- * may return an empty BatchData
- */
- public BatchData nextBatch() throws IOException {
- if (filter == null) {
- data = getAllPageData();
- } else {
- data = getAllPageDataWithFilter();
- }
-
- return data;
- }
-
- public BatchData currentBatch() {
- return data;
- }
-
- private BatchData getAllPageData() throws IOException {
+ public BatchData getAllSatisfiedPageData() throws IOException {
BatchData pageData = new BatchData(dataType, true);
@@ -113,42 +86,42 @@ private BatchData getAllPageData() throws IOException {
switch (dataType) {
case BOOLEAN:
boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
- if (timestamp > deletedAt) {
+ if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aBoolean))) {
pageData.putTime(timestamp);
pageData.putBoolean(aBoolean);
}
break;
case INT32:
int anInt = valueDecoder.readInt(valueBuffer);
- if (timestamp > deletedAt) {
+ if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, anInt))) {
pageData.putTime(timestamp);
pageData.putInt(anInt);
}
break;
case INT64:
long aLong = valueDecoder.readLong(valueBuffer);
- if (timestamp > deletedAt) {
+ if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aLong))) {
pageData.putTime(timestamp);
pageData.putLong(aLong);
}
break;
case FLOAT:
float aFloat = valueDecoder.readFloat(valueBuffer);
- if (timestamp > deletedAt) {
+ if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aFloat))) {
pageData.putTime(timestamp);
pageData.putFloat(aFloat);
}
break;
case DOUBLE:
double aDouble = valueDecoder.readDouble(valueBuffer);
- if (timestamp > deletedAt) {
+ if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aDouble))) {
pageData.putTime(timestamp);
pageData.putDouble(aDouble);
}
break;
case TEXT:
Binary aBinary = valueDecoder.readBinary(valueBuffer);
- if (timestamp > deletedAt) {
+ if (timestamp > deletedAt && (filter == null || filter.satisfy(timestamp, aBinary))) {
pageData.putTime(timestamp);
pageData.putBinary(aBinary);
}
@@ -160,86 +133,6 @@ private BatchData getAllPageData() throws IOException {
return pageData;
}
- private BatchData getAllPageDataWithFilter() throws IOException {
- BatchData pageData = new BatchData(dataType, true);
-
- while (timeDecoder.hasNext(timeBuffer)) {
- long timestamp = timeDecoder.readLong(timeBuffer);
-
- switch (dataType) {
- case BOOLEAN:
- readBoolean(pageData, timestamp);
- break;
- case INT32:
- readInt(pageData, timestamp);
- break;
- case INT64:
- readLong(pageData, timestamp);
- break;
- case FLOAT:
- readFloat(pageData, timestamp);
- break;
- case DOUBLE:
- readDouble(pageData, timestamp);
- break;
- case TEXT:
- readText(pageData, timestamp);
- break;
- default:
- throw new UnSupportedDataTypeException(String.valueOf(dataType));
- }
- }
-
- return pageData;
- }
-
- private void readBoolean(BatchData pageData, long timestamp) {
- boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
- if (timestamp > deletedAt && filter.satisfy(timestamp, aBoolean)) {
- pageData.putTime(timestamp);
- pageData.putBoolean(aBoolean);
- }
- }
-
- private void readInt(BatchData pageData, long timestamp) {
- int anInt = valueDecoder.readInt(valueBuffer);
- if (timestamp > deletedAt && filter.satisfy(timestamp, anInt)) {
- pageData.putTime(timestamp);
- pageData.putInt(anInt);
- }
- }
-
- private void readLong(BatchData pageData, long timestamp) {
- long aLong = valueDecoder.readLong(valueBuffer);
- if (timestamp > deletedAt && filter.satisfy(timestamp, aLong)) {
- pageData.putTime(timestamp);
- pageData.putLong(aLong);
- }
- }
-
- private void readFloat(BatchData pageData, long timestamp) {
- float aFloat = valueDecoder.readFloat(valueBuffer);
- if (timestamp > deletedAt && filter.satisfy(timestamp, aFloat)) {
- pageData.putTime(timestamp);
- pageData.putFloat(aFloat);
- }
- }
-
- private void readDouble(BatchData pageData, long timestamp) {
- double aDouble = valueDecoder.readDouble(valueBuffer);
- if (timestamp > deletedAt && filter.satisfy(timestamp, aDouble)) {
- pageData.putTime(timestamp);
- pageData.putDouble(aDouble);
- }
- }
-
- private void readText(BatchData pageData, long timestamp) {
- Binary aBinary = valueDecoder.readBinary(valueBuffer);
- if (timestamp > deletedAt && filter.satisfy(timestamp, aBinary)) {
- pageData.putTime(timestamp);
- pageData.putBinary(aBinary);
- }
- }
public void close() {
timeBuffer = null;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/AbstractFileSeriesReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/AbstractFileSeriesReader.java
index 069c4352aa1ad..14bb4be34dce4 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/AbstractFileSeriesReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/AbstractFileSeriesReader.java
@@ -25,7 +25,7 @@
import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.IAggregateReader;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import java.io.IOException;
import java.util.List;
@@ -37,7 +37,7 @@ public abstract class AbstractFileSeriesReader implements IAggregateReader {
protected IChunkLoader chunkLoader;
protected List chunkMetaDataList;
- protected AbstractChunkReader chunkReader;
+ protected ChunkReader chunkReader;
private int chunkToRead;
private BatchData data;
@@ -63,7 +63,7 @@ public AbstractFileSeriesReader(IChunkLoader chunkLoader, List ch
public boolean hasNextBatch() throws IOException {
// current chunk has additional batch
- if (chunkReader != null && chunkReader.hasNextBatch()) {
+ if (chunkReader != null && chunkReader.hasNextSatisfiedPage()) {
return true;
}
@@ -75,7 +75,7 @@ public boolean hasNextBatch() throws IOException {
// chunk metadata satisfy the condition
initChunkReader(chunkMetaData);
- if (chunkReader.hasNextBatch()) {
+ if (chunkReader.hasNextSatisfiedPage()) {
return true;
}
}
@@ -87,7 +87,7 @@ public boolean hasNextBatch() throws IOException {
* get next batch data.
*/
public BatchData nextBatch() throws IOException {
- data = chunkReader.nextBatch();
+ data = chunkReader.nextPageData();
return data;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderByTimestamp.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderByTimestamp.java
index 13be71bbb4cd8..1843229fd0d73 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderByTimestamp.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderByTimestamp.java
@@ -25,7 +25,7 @@
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
/**
@@ -40,7 +40,7 @@ public class FileSeriesReaderByTimestamp {
protected List chunkMetaDataList;
private int currentChunkIndex = 0;
- private AbstractChunkReader chunkReader;
+ private ChunkReader chunkReader;
private long currentTimestamp;
private BatchData data = null; // current batch data
@@ -69,8 +69,8 @@ public Object getValueInTimestamp(long timestamp) throws IOException {
return null;
}
- if (chunkReader.hasNextBatch()) {
- data = chunkReader.nextBatch();
+ if (chunkReader.hasNextSatisfiedPage()) {
+ data = chunkReader.nextPageData();
} else {
return null;
}
@@ -93,8 +93,8 @@ public Object getValueInTimestamp(long timestamp) throws IOException {
}
return null;
} else {
- if (chunkReader.hasNextBatch()) {
- data = chunkReader.nextBatch();
+ if (chunkReader.hasNextSatisfiedPage()) {
+ data = chunkReader.nextPageData();
} else if (!constructNextSatisfiedChunkReader()) {
return null;
}
@@ -115,16 +115,16 @@ public boolean hasNext() throws IOException {
if (data != null && data.hasCurrent()) {
return true;
}
- while (chunkReader.hasNextBatch()) {
- data = chunkReader.nextBatch();
+ while (chunkReader.hasNextSatisfiedPage()) {
+ data = chunkReader.nextPageData();
if (data != null && data.hasCurrent()) {
return true;
}
}
}
while (constructNextSatisfiedChunkReader()) {
- while (chunkReader.hasNextBatch()) {
- data = chunkReader.nextBatch();
+ while (chunkReader.hasNextSatisfiedPage()) {
+ data = chunkReader.nextPageData();
if (data != null && data.hasCurrent()) {
return true;
}
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/PageReaderTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/PageReaderTest.java
index bcce85c52d0c0..6a55bd175a663 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/PageReaderTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/reader/PageReaderTest.java
@@ -171,14 +171,11 @@ public void test(TSDataType dataType) {
ByteBuffer page = ByteBuffer.wrap(pageWriter.getUncompressedBytes().array());
PageReader pageReader = new PageReader(page, dataType, decoder,
- new DeltaBinaryDecoder.LongDeltaDecoder());
+ new DeltaBinaryDecoder.LongDeltaDecoder(), null);
int index = 0;
long startTimestamp = System.currentTimeMillis();
- BatchData data = null;
- if (pageReader.hasNextBatch()) {
- data = pageReader.nextBatch();
- }
+ BatchData data = pageReader.getAllSatisfiedPageData();
assert data != null;
while (data.hasCurrent()) {
From bcf05023fa1fe3f047e46d9ed7b8191242a391b9 Mon Sep 17 00:00:00 2001
From: qiaojialin <646274302@qq.com>
Date: Thu, 26 Dec 2019 17:30:36 +0800
Subject: [PATCH 2/3] refactor BatchData
---
.../reader/chunkRelated/MemChunkReader.java | 2 +-
.../NewUnseqResourceMergeReader.java | 2 +-
.../SeriesReaderWithoutValueFilter.java | 2 +-
.../seriesRelated/FakedIBatchPoint.java | 2 +-
.../iotdb/tsfile/read/common/BatchData.java | 122 ++----------------
.../tsfile/read/reader/page/PageReader.java | 5 +-
.../read/query/timegenerator/NodeTest.java | 2 +-
7 files changed, 23 insertions(+), 114 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/MemChunkReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/MemChunkReader.java
index 5c7aa47cc6e78..9d596d0a83fa4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/MemChunkReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/MemChunkReader.java
@@ -92,7 +92,7 @@ public boolean hasNextBatch() throws IOException {
@Override
public BatchData nextBatch() {
- BatchData batchData = new BatchData(dataType, true);
+ BatchData batchData = new BatchData(dataType);
if (hasCachedTimeValuePair) {
hasCachedTimeValuePair = false;
batchData.putTime(cachedTimeValuePair.getTimestamp());
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java
index ce2d379f4bae2..2349fb82ff3bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/NewUnseqResourceMergeReader.java
@@ -145,7 +145,7 @@ public boolean hasNextBatch() throws IOException {
return true;
}
- batchData = new BatchData(dataType, true);
+ batchData = new BatchData(dataType);
for (int rowCount = 0; rowCount < batchSize; rowCount++) {
if (priorityMergeReader.hasNext()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java
index ed71dc8c49ada..eb8503fc2f021 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java
@@ -133,7 +133,7 @@ public BatchData nextBatch() throws IOException {
if (hasNextInSeq() && hasNextInUnSeq()) {
// if the count reaches batch data size
int count = 0;
- BatchData batchData = new BatchData(seqBatchData.getDataType(), true);
+ BatchData batchData = new BatchData(seqBatchData.getDataType());
while (count < batchSize && hasNextInSeq() && hasNextInUnSeq()) {
long timeInSeq = seqBatchData.currentTime();
long timeInUnseq = unseqBatchData.currentTime();
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/FakedIBatchPoint.java b/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/FakedIBatchPoint.java
index 54fe52b46e8f8..7ab6a35749017 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/FakedIBatchPoint.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/FakedIBatchPoint.java
@@ -90,7 +90,7 @@ private void constructBatchData() {
if (!hasEmptyBatch) {
num += 1;
}
- batchData = new BatchData(TSDataType.INT64, true);
+ batchData = new BatchData(TSDataType.INT64);
while (num > 0 && iterator.hasNext()) {
TimeValuePair timeValuePair = iterator.next();
batchData.putTime(timeValuePair.getTimestamp());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
index 811f8037ed071..43013ae9be88c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java
@@ -32,7 +32,12 @@
* BatchData
is a self-defined data structure which is optimized for different type of
* values. This class can be viewed as a collection which is more efficient than ArrayList.
*
- * We don't return empty batch data. If you get a batch data, you can iterate the data as the following codes:
+ * This class records a time list and a value list, which could be replaced by TVList in the future
+ *
+ * When you use BatchData in query process, it does not contain duplicated timestamps. The batch data
+ * may be empty.
+ *
+ * If you get a batch data, you can iterate the data as the following codes:
*
* while (batchData.hasCurrent()) {
* long time = batchData.currentTime();
@@ -45,7 +50,6 @@ public class BatchData implements Serializable {
private static final long serialVersionUID = -4620310601188394839L;
private int timeCapacity = 16;
private int valueCapacity = 16;
- private int emptyTimeCapacity = 1;
private int capacityThreshold = 1024;
private TSDataType dataType;
@@ -78,7 +82,6 @@ public class BatchData implements Serializable {
private int valueLength;
private ArrayList timeRet;
- private ArrayList emptyTimeRet;
private ArrayList booleanRet;
private ArrayList intRet;
private ArrayList longRet;
@@ -90,22 +93,13 @@ public BatchData() {
dataType = null;
}
- public BatchData(TSDataType type) {
- dataType = type;
- }
-
/**
* BatchData Constructor.
*
* @param type Data type to record for this BatchData
- * @param recordTime whether to record time value for this BatchData
*/
- public BatchData(TSDataType type, boolean recordTime) {
- init(type, recordTime, false);
- }
-
- public BatchData(TSDataType type, boolean recordTime, boolean hasEmptyTime) {
- init(type, recordTime, hasEmptyTime);
+ public BatchData(TSDataType type) {
+ init(type);
}
public boolean isEmpty() {
@@ -176,10 +170,8 @@ public TSDataType getDataType() {
* initialize batch data.
*
* @param type TSDataType
- * @param recordTime if record time
- * @param hasEmptyTime if has empty time
*/
- public void init(TSDataType type, boolean recordTime, boolean hasEmptyTime) {
+ public void init(TSDataType type) {
this.dataType = type;
this.valueArrayIdx = 0;
this.curValueIdx = 0;
@@ -187,18 +179,11 @@ public void init(TSDataType type, boolean recordTime, boolean hasEmptyTime) {
this.curIdx = 0;
capacityThreshold = TSFileConfig.DYNAMIC_DATA_SIZE;
- if (recordTime) {
- timeRet = new ArrayList<>();
- timeRet.add(new long[timeCapacity]);
- timeArrayIdx = 0;
- curTimeIdx = 0;
- count = 0;
- }
-
- if (hasEmptyTime) {
- emptyTimeRet = new ArrayList<>();
- emptyTimeRet.add(new long[emptyTimeCapacity]);
- }
+ timeRet = new ArrayList<>();
+ timeRet.add(new long[timeCapacity]);
+ timeArrayIdx = 0;
+ curTimeIdx = 0;
+ count = 0;
switch (dataType) {
case BOOLEAN:
@@ -422,13 +407,6 @@ private void rangeCheckForTime(int idx) {
}
}
- private void rangeCheckForEmptyTime(int idx) {
- if (idx < 0) {
- throw new IndexOutOfBoundsException(
- "BatchData empty time range check, Index is negative: " + idx);
- }
- }
-
public boolean getBoolean() {
rangeCheck(curIdx);
return this.booleanRet.get(curIdx / timeCapacity)[curIdx % timeCapacity];
@@ -493,24 +471,6 @@ public void setTime(int idx, long v) {
this.timeRet.get(idx / timeCapacity)[idx % timeCapacity] = v;
}
- public long getEmptyTime(int idx) {
- rangeCheckForEmptyTime(idx);
- return this.emptyTimeRet.get(idx / emptyTimeCapacity)[idx % emptyTimeCapacity];
- }
-
- /**
- * get time as array in long[] structure.
- *
- * @return time array
- */
- public long[] getTimeAsArray() {
- long[] res = new long[count];
- for (int i = 0; i < count; i++) {
- res[i] = timeRet.get(i / timeCapacity)[i % timeCapacity];
- }
- return res;
- }
-
/**
* put an object.
*
@@ -541,45 +501,10 @@ public void putAnObject(Object v) {
}
}
- /**
- * set an object.
- *
- * @param idx object id
- * @param v object value
- */
- public void setAnObject(int idx, Comparable> v) {
- switch (dataType) {
- case BOOLEAN:
- setBoolean(idx, (Boolean) v);
- break;
- case DOUBLE:
- setDouble(idx, (Double) v);
- break;
- case TEXT:
- setBinary(idx, (Binary) v);
- break;
- case FLOAT:
- setFloat(idx, (Float) v);
- break;
- case INT32:
- setInt(idx, (Integer) v);
- break;
- case INT64:
- setLong(idx, (Long) v);
- break;
- default:
- throw new UnSupportedDataTypeException(String.valueOf(dataType));
- }
- }
-
public int length() {
return this.count;
}
- public int getCurIdx() {
- return curIdx;
- }
-
public long getTimeByIndex(int idx) {
rangeCheckForTime(idx);
return this.timeRet.get(idx / timeCapacity)[idx % timeCapacity];
@@ -615,25 +540,6 @@ public boolean getBooleanByIndex(int idx) {
return booleanRet.get(idx / timeCapacity)[idx % timeCapacity];
}
- public Object getValueByIndex(int idx) {
- switch (dataType) {
- case INT32:
- return getIntByIndex(idx);
- case INT64:
- return getLongByIndex(idx);
- case FLOAT:
- return getFloatByIndex(idx);
- case DOUBLE:
- return getDoubleByIndex(idx);
- case BOOLEAN:
- return getBooleanByIndex(idx);
- case TEXT:
- return getBinaryByIndex(idx);
- default:
- return null;
- }
- }
-
public Object getValueInTimestamp(long time) {
while (hasCurrent()) {
if (currentTime() < time) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index 2e43f77ac25e2..9bf75aa3c2578 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -76,9 +76,12 @@ private void splitDataToTimeStampAndValue(ByteBuffer pageData) {
valueBuffer.position(timeBufferLength);
}
+ /**
+ * @return the returned BatchData may be empty, but never be null
+ */
public BatchData getAllSatisfiedPageData() throws IOException {
- BatchData pageData = new BatchData(dataType, true);
+ BatchData pageData = new BatchData(dataType);
while (timeDecoder.hasNext(timeBuffer)) {
long timestamp = timeDecoder.readLong(timeBuffer);
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/query/timegenerator/NodeTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/query/timegenerator/NodeTest.java
index 9afc1ccc7d0d0..08a76c9201d92 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/query/timegenerator/NodeTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/query/timegenerator/NodeTest.java
@@ -95,7 +95,7 @@ private static class FakedFileSeriesReader extends AbstractFileSeriesReader {
public FakedFileSeriesReader(long[] timestamps) {
super(null, null, null);
- data = new BatchData(TSDataType.INT32, true);
+ data = new BatchData(TSDataType.INT32);
for (long time : timestamps) {
data.putTime(time);
}
From 072877cac14f81b09788502d4a5b336169cacb55 Mon Sep 17 00:00:00 2001
From: qiaojialin <646274302@qq.com>
Date: Thu, 26 Dec 2019 18:29:35 +0800
Subject: [PATCH 3/3] remove useless code
---
.../SeqResourceIterateReader.java | 21 -------------------
1 file changed, 21 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/SeqResourceIterateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/SeqResourceIterateReader.java
index 1226331023bc1..e46a37acf1529 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/SeqResourceIterateReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/SeqResourceIterateReader.java
@@ -132,27 +132,6 @@ public boolean constructNextReader(int idx) throws IOException {
}
}
- /**
- * Returns true if the start and end time of the series data in this sequence TsFile do not
- * satisfy the filter condition. Returns false if satisfy.
- *
- * This method is used to in constructNextReader
to check whether this TsFile can be
- * skipped.
- *
- * @param tsFile the TsFileResource corresponding to this TsFile
- * @param filter filter condition. Null if no filter.
- * @return True if the TsFile's start and end time do not satisfy the filter condition; False if
- * satisfy.
- */
- private boolean isTsFileNotSatisfied(TsFileResource tsFile, Filter filter) {
- if (filter == null) {
- return false;
- }
- long startTime = tsFile.getStartTimeMap().get(seriesPath.getDevice());
- long endTime = tsFile.getEndTimeMap().get(seriesPath.getDevice());
- return !filter.satisfyStartEndTime(startTime, endTime);
- }
-
private IAggregateReader initSealedTsFileReader(TsFileResource sealedTsFile, Filter filter,
QueryContext context) throws IOException {
// prepare metaDataList