
Commit

Merge 0687943 into f0f229d
Jialin Qiao committed Dec 26, 2019
2 parents f0f229d + 0687943 commit e56d433
Showing 23 changed files with 227 additions and 493 deletions.
@@ -85,15 +85,13 @@ public static void main(String[] args) throws IOException {
           System.out
               .println("\t\tUncompressed page data size: " + pageHeader.getUncompressedSize());
           PageReader reader1 = new PageReader(pageData, header.getDataType(), valueDecoder,
-              defaultTimeDecoder);
-          while (reader1.hasNextBatch()) {
-            BatchData batchData = reader1.nextBatch();
-            while (batchData.hasCurrent()) {
-              System.out.println(
-                  "\t\t\ttime, value: " + batchData.currentTime() + ", " + batchData
-                      .currentValue());
-              batchData.next();
-            }
-          }
+              defaultTimeDecoder, null);
+          BatchData batchData = reader1.getAllSatisfiedPageData();
+          while (batchData.hasCurrent()) {
+            System.out.println(
+                "\t\t\ttime, value: " + batchData.currentTime() + ", " + batchData
+                    .currentValue());
+            batchData.next();
+          }
           }
           break;
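The hunk above is the clearest illustration of the new page-reading API in this commit, so here is a minimal hedged sketch of the pattern on its own. It assumes the surrounding TsFileSequenceRead context (pageData, header, valueDecoder and defaultTimeDecoder stand in for the tool's locals); the only facts taken from the diff are the extra Filter argument (null meaning "no filter") and the single-call page read:

    // The PageReader constructor now takes a Filter as its fifth argument,
    // and getAllSatisfiedPageData() replaces the old hasNextBatch()/nextBatch()
    // loop, returning every satisfying point of the page in one BatchData.
    PageReader reader = new PageReader(pageData, header.getDataType(), valueDecoder,
        defaultTimeDecoder, null);
    BatchData page = reader.getAllSatisfiedPageData();
    while (page.hasCurrent()) {
      System.out.println(page.currentTime() + ", " + page.currentValue());
      page.next();
    }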
@@ -49,7 +49,6 @@
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -401,9 +400,9 @@ private int writeRemainingUnseq(IChunkWriter chunkWriter,
   private int writeChunkWithUnseq(Chunk chunk, IChunkWriter chunkWriter, IPointReader unseqReader,
       long chunkLimitTime, int pathIdx) throws IOException {
     int cnt = 0;
-    AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
-    while (chunkReader.hasNextBatch()) {
-      BatchData batchData = chunkReader.nextBatch();
+    ChunkReader chunkReader = new ChunkReader(chunk, null);
+    while (chunkReader.hasNextSatisfiedPage()) {
+      BatchData batchData = chunkReader.nextPageData();
       cnt += mergeWriteBatch(batchData, chunkWriter, unseqReader, pathIdx);
     }
     cnt += writeRemainingUnseq(chunkWriter, unseqReader, chunkLimitTime, pathIdx);
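This is the commit's core rename: AbstractChunkReader disappears, ChunkReader becomes the concrete type, and hasNextBatch()/nextBatch() become hasNextSatisfiedPage()/nextPageData(). Since the same substitution repeats in nearly every file below, one hedged sketch of the new iteration contract suffices (chunk is assumed to be a Chunk already loaded from a TsFile; null again means no filter):

    // One BatchData per page that satisfies the filter; drain each page
    // before asking for the next one.
    ChunkReader chunkReader = new ChunkReader(chunk, null);
    while (chunkReader.hasNextSatisfiedPage()) {
      BatchData batchData = chunkReader.nextPageData();
      // consume batchData, e.g. mergeWriteBatch(...) as in the hunk above
    }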
@@ -24,16 +24,16 @@
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.db.utils.TimeValuePairUtils;
 import org.apache.iotdb.tsfile.read.common.BatchData;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;

 public class CachedDiskChunkReader implements IPointReader {

-  private AbstractChunkReader chunkReader;
+  private ChunkReader chunkReader;
   private BatchData data;
   private TimeValuePair prev;
   private TimeValuePair current;

-  public CachedDiskChunkReader(AbstractChunkReader chunkReader) {
+  public CachedDiskChunkReader(ChunkReader chunkReader) {
     this.chunkReader = chunkReader;
     this.prev =
         TimeValuePairUtils.getEmptyTimeValuePair(chunkReader.getChunkHeader().getDataType());
@@ -44,8 +44,8 @@ public boolean hasNext() throws IOException {
     if (data != null && data.hasCurrent()) {
       return true;
     }
-    while (chunkReader.hasNextBatch()) {
-      data = chunkReader.nextBatch();
+    while (chunkReader.hasNextSatisfiedPage()) {
+      data = chunkReader.nextPageData();
       if (data.hasCurrent()) {
         return true;
       }
@@ -60,8 +60,8 @@ public TimeValuePair next() throws IOException {
     if (data.hasCurrent()) {
       TimeValuePairUtils.setCurrentTimeValuePair(data, current());
     } else {
-      while (chunkReader.hasNextBatch()) {
-        data = chunkReader.nextBatch();
+      while (chunkReader.hasNextSatisfiedPage()) {
+        data = chunkReader.nextPageData();
         if (data.hasCurrent()) {
           TimeValuePairUtils.setCurrentTimeValuePair(data, current());
           break;
@@ -26,7 +26,6 @@
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
@@ -71,7 +70,7 @@ public ChunkReaderWrap(ReadOnlyMemChunk readOnlyMemChunk, Filter filter) {
   public IPointReader getIPointReader() throws IOException {
     if (type.equals(ChunkReaderType.DISK_CHUNK)) {
       Chunk chunk = chunkLoader.getChunk(chunkMetaData);
-      AbstractChunkReader chunkReader = new ChunkReader(chunk, filter);
+      ChunkReader chunkReader = new ChunkReader(chunk, filter);
       return new DiskChunkReader(chunkReader);
     } else {
       return new MemChunkReader(readOnlyMemChunk, filter);
@@ -20,27 +20,26 @@
 import java.io.IOException;
 import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.query.reader.resourceRelated.NewUnseqResourceMergeReader;
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.db.utils.TimeValuePairUtils;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.reader.IBatchReader;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;

 /**
  * To read chunk data on disk, this class implements an interface {@link IPointReader} based on the
- * data reader {@link AbstractChunkReader}.
+ * data reader {@link ChunkReader}.
  * <p>
  * Note that <code>ChunkReader</code> is an abstract class with three concrete classes, two of which
  * are used here: <code>ChunkReaderWithoutFilter</code> and <code>ChunkReaderWithFilter</code>.
  * <p>
  */
 public class DiskChunkReader implements IPointReader, IBatchReader {

-  private AbstractChunkReader chunkReader;
+  private ChunkReader chunkReader;
   private BatchData data;

-  public DiskChunkReader(AbstractChunkReader chunkReader) {
+  public DiskChunkReader(ChunkReader chunkReader) {
     this.chunkReader = chunkReader;
   }
@@ -49,8 +48,8 @@ public boolean hasNext() throws IOException {
     if (data != null && data.hasCurrent()) {
       return true;
     }
-    while (chunkReader.hasNextBatch()) {
-      data = chunkReader.nextBatch();
+    while (chunkReader.hasNextSatisfiedPage()) {
+      data = chunkReader.nextPageData();
       if (data.hasCurrent()) {
         return true;
       }
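Taken together with ChunkReaderWrap above, the renamed classes compose like this. A hedged end-to-end sketch (chunkLoader, chunkMetaData and filter are assumed to come from the query context; only calls visible in this commit are used):

    // Load the chunk, wrap the page-based ChunkReader in the point-based
    // DiskChunkReader adapter, then drain it through the IPointReader contract.
    Chunk chunk = chunkLoader.getChunk(chunkMetaData);
    ChunkReader chunkReader = new ChunkReader(chunk, filter);
    IPointReader pointReader = new DiskChunkReader(chunkReader);
    while (pointReader.hasNext()) {
      TimeValuePair pair = pointReader.next();
      // process pair (timestamp + value) point by point
    }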
@@ -54,8 +54,8 @@ public Object getValueInTimestamp(long timestamp) throws IOException {
       return null;
     } else {
       chunkReaderByTimestamp.setCurrentTimestamp(timestamp);
-      if (chunkReaderByTimestamp.hasNextBatch()) {
-        data = chunkReaderByTimestamp.nextBatch();
+      if (chunkReaderByTimestamp.hasNextSatisfiedPage()) {
+        data = chunkReaderByTimestamp.nextPageData();
       } else {
         return null;
       }
@@ -70,8 +70,8 @@ public boolean hasNext() throws IOException {
     if (data != null && data.hasCurrent()) {
       return true;
     }
-    if (chunkReaderByTimestamp != null && chunkReaderByTimestamp.hasNextBatch()) {
-      data = chunkReaderByTimestamp.nextBatch();
+    if (chunkReaderByTimestamp != null && chunkReaderByTimestamp.hasNextSatisfiedPage()) {
+      data = chunkReaderByTimestamp.nextPageData();
       return true;
     }
     return false;
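The by-timestamp reader follows the same rename. A hedged sketch of the calling pattern, using only the calls shown in this hunk (the reader is positioned first, then pages are pulled under the new names):

    // setCurrentTimestamp(...) positions the reader at the query timestamp;
    // the page is then fetched with the renamed methods and scanned for the
    // value at exactly that time, as getValueInTimestamp(...) does above.
    chunkReaderByTimestamp.setCurrentTimestamp(timestamp);
    if (chunkReaderByTimestamp.hasNextSatisfiedPage()) {
      BatchData data = chunkReaderByTimestamp.nextPageData();
      // scan data for the point whose time equals timestamp, else null
    }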
@@ -92,7 +92,7 @@ public boolean hasNextBatch() throws IOException {

   @Override
   public BatchData nextBatch() {
-    BatchData batchData = new BatchData(dataType, true);
+    BatchData batchData = new BatchData(dataType);
     if (hasCachedTimeValuePair) {
       hasCachedTimeValuePair = false;
       batchData.putTime(cachedTimeValuePair.getTimestamp());
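The BatchData construction change recurs in several hunks below as well. A hedged one-liner for reference (dataType is the series' TSDataType; what the dropped boolean flag used to mean is not visible in this diff):

    // Old: BatchData batchData = new BatchData(dataType, true);
    BatchData batchData = new BatchData(dataType);
    batchData.putTime(cachedTimeValuePair.getTimestamp()); // as in nextBatch() above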
@@ -25,7 +25,6 @@
 import org.apache.iotdb.db.query.reader.universal.CachedPriorityMergeReader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;

 public class CachedUnseqResourceMergeReader extends CachedPriorityMergeReader {
@@ -35,7 +34,7 @@ public CachedUnseqResourceMergeReader(List<Chunk> chunks, TSDataType dataType)
     super(dataType);
     int priorityValue = 1;
     for (Chunk chunk : chunks) {
-      AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
+      ChunkReader chunkReader = new ChunkReader(chunk, null);
       addReaderWithPriority(new CachedDiskChunkReader(chunkReader), priorityValue++);
     }
   }
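From the caller's side, this constructor turns a list of unsequence chunks into one merged point stream. A hedged usage sketch; that CachedPriorityMergeReader exposes the IPointReader contract, and that later chunks win on overlapping timestamps, are assumptions inferred from the increasing priorityValue above rather than shown in this diff:

    // Each chunk gets its own CachedDiskChunkReader; higher priority values
    // go to later (presumably newer) chunks.
    IPointReader merged = new CachedUnseqResourceMergeReader(chunks, TSDataType.INT64);
    while (merged.hasNext()) {
      TimeValuePair pair = merged.next();
      // points arrive merged across chunks, one TimeValuePair at a time
    }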
@@ -88,16 +88,18 @@ public NewUnseqResourceMergeReader(Path seriesPath, TSDataType dataType,
       if (tsFileResource.isClosed()) {
         // get chunk metadata list of current closed tsfile
         currentChunkMetaDataList = DeviceMetaDataCache.getInstance().get(tsFileResource, seriesPath);
-        List<Modification> pathModifications = context
-            .getPathModifications(tsFileResource.getModFile(), seriesPath.getFullPath());
-        if (!pathModifications.isEmpty()) {
-          QueryUtils.modifyChunkMetaData(currentChunkMetaDataList, pathModifications);
-        }
       } else {
-        // metadata list of already flushed chunk groups
+        // metadata list of already flushed chunks
         currentChunkMetaDataList = tsFileResource.getChunkMetaDataList();
       }

+      // get modifications and apply to chunk metadatas
+      List<Modification> pathModifications = context
+          .getPathModifications(tsFileResource.getModFile(), seriesPath.getFullPath());
+      if (!pathModifications.isEmpty()) {
+        QueryUtils.modifyChunkMetaData(currentChunkMetaDataList, pathModifications);
+      }
+
       if (!currentChunkMetaDataList.isEmpty()) {
         TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
             .get(tsFileResource, tsFileResource.isClosed());
@@ -143,7 +145,7 @@ public boolean hasNextBatch() throws IOException {
       return true;
     }

-    batchData = new BatchData(dataType, true);
+    batchData = new BatchData(dataType);

     for (int rowCount = 0; rowCount < batchSize; rowCount++) {
       if (priorityMergeReader.hasNext()) {
@@ -132,26 +132,6 @@ public boolean constructNextReader(int idx) throws IOException {
     }
   }

-  /**
-   * Returns true if the start and end time of the series data in this sequence TsFile do not
-   * satisfy the filter condition. Returns false if satisfy.
-   * <p>
-   * This method is used in <code>constructNextReader</code> to check whether this TsFile can be
-   * skipped.
-   *
-   * @param tsFile the TsFileResource corresponding to this TsFile
-   * @param filter filter condition. Null if no filter.
-   * @return True if the TsFile's start and end time do not satisfy the filter condition; False if
-   * satisfy.
-   */
-  private boolean isTsFileNotSatisfied(TsFileResource tsFile, Filter filter) {
-    if (filter == null) {
-      return false;
-    }
-    long startTime = tsFile.getStartTimeMap().get(seriesPath.getDevice());
-    long endTime = tsFile.getEndTimeMap().get(seriesPath.getDevice());
-    return !filter.satisfyStartEndTime(startTime, endTime);
-  }

   private IAggregateReader initSealedTsFileReader(TsFileResource sealedTsFile, Filter filter,
       QueryContext context) throws IOException {
@@ -133,7 +133,7 @@ public BatchData nextBatch() throws IOException {
     if (hasNextInSeq() && hasNextInUnSeq()) {
       // if the count reaches batch data size
       int count = 0;
-      BatchData batchData = new BatchData(seqBatchData.getDataType(), true);
+      BatchData batchData = new BatchData(seqBatchData.getDataType());
       while (count < batchSize && hasNextInSeq() && hasNextInUnSeq()) {
         long timeInSeq = seqBatchData.currentTime();
         long timeInUnseq = unseqBatchData.currentTime();
@@ -37,7 +37,6 @@
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.slf4j.Logger;
@@ -101,10 +100,10 @@ public static long collectFileSizes(List<TsFileResource> seqFiles, List<TsFileRe
   }

   public static int writeChunkWithoutUnseq(Chunk chunk, IChunkWriter chunkWriter) throws IOException {
-    AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
+    ChunkReader chunkReader = new ChunkReader(chunk, null);
     int ptWritten = 0;
-    while (chunkReader.hasNextBatch()) {
-      BatchData batchData = chunkReader.nextBatch();
+    while (chunkReader.hasNextSatisfiedPage()) {
+      BatchData batchData = chunkReader.nextPageData();
       for (int i = 0; i < batchData.length(); i++) {
         writeBatchPoint(batchData, i, chunkWriter);
       }
@@ -90,7 +90,7 @@ private void constructBatchData() {
     if (!hasEmptyBatch) {
       num += 1;
     }
-    batchData = new BatchData(TSDataType.INT64, true);
+    batchData = new BatchData(TSDataType.INT64);
     while (num > 0 && iterator.hasNext()) {
       TimeValuePair timeValuePair = iterator.next();
       batchData.putTime(timeValuePair.getTimestamp());
@@ -46,7 +46,6 @@
 import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
 import org.apache.iotdb.tsfile.read.controller.IMetadataQuerier;
 import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
-import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
 import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.write.TsFileWriter;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
@@ -165,7 +164,7 @@ public void test() throws StorageGroupProcessorException, IOException {
     int priorityValue = 1;
     for (ChunkMetaData chunkMetaData : metadataQuerier.getChunkMetaDataList(path)) {
       Chunk chunk = chunkLoader.getChunk(chunkMetaData);
-      AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
+      ChunkReader chunkReader = new ChunkReader(chunk, null);
       unSeqMergeReader
           .addReaderWithPriority(new DiskChunkReader(chunkReader), priorityValue);
       priorityValue++;
