Skip to content

Commit

Permalink
Merge 4d86120 into 777dd00
Browse files Browse the repository at this point in the history
  • Loading branch information
Jialin Qiao committed Dec 27, 2019
2 parents 777dd00 + 4d86120 commit ffcbbd0
Show file tree
Hide file tree
Showing 25 changed files with 334 additions and 489 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,13 @@ public static void main(String[] args) throws IOException {
System.out
.println("\t\tUncompressed page data size: " + pageHeader.getUncompressedSize());
PageReader reader1 = new PageReader(pageData, header.getDataType(), valueDecoder,
defaultTimeDecoder);
while (reader1.hasNextBatch()) {
BatchData batchData = reader1.nextBatch();
while (batchData.hasCurrent()) {
System.out.println(
"\t\t\ttime, value: " + batchData.currentTime() + ", " + batchData
.currentValue());
batchData.next();
}
defaultTimeDecoder, null);
BatchData batchData = reader1.getAllSatisfiedPageData();
while (batchData.hasCurrent()) {
System.out.println(
"\t\t\ttime, value: " + batchData.currentTime() + ", " + batchData
.currentValue());
batchData.next();
}
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
Expand Down Expand Up @@ -401,9 +400,9 @@ private int writeRemainingUnseq(IChunkWriter chunkWriter,
private int writeChunkWithUnseq(Chunk chunk, IChunkWriter chunkWriter, IPointReader unseqReader,
long chunkLimitTime, int pathIdx) throws IOException {
int cnt = 0;
AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
while (chunkReader.hasNextBatch()) {
BatchData batchData = chunkReader.nextBatch();
ChunkReader chunkReader = new ChunkReader(chunk, null);
while (chunkReader.hasNextSatisfiedPage()) {
BatchData batchData = chunkReader.nextPageData();
cnt += mergeWriteBatch(batchData, chunkWriter, unseqReader, pathIdx);
}
cnt += writeRemainingUnseq(chunkWriter, unseqReader, chunkLimitTime, pathIdx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TimeValuePairUtils;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;

public class CachedDiskChunkReader implements IPointReader {

private AbstractChunkReader chunkReader;
private ChunkReader chunkReader;
private BatchData data;
private TimeValuePair prev;
private TimeValuePair current;

public CachedDiskChunkReader(AbstractChunkReader chunkReader) {
public CachedDiskChunkReader(ChunkReader chunkReader) {
this.chunkReader = chunkReader;
this.prev =
TimeValuePairUtils.getEmptyTimeValuePair(chunkReader.getChunkHeader().getDataType());
Expand All @@ -44,8 +44,8 @@ public boolean hasNext() throws IOException {
if (data != null && data.hasCurrent()) {
return true;
}
while (chunkReader.hasNextBatch()) {
data = chunkReader.nextBatch();
while (chunkReader.hasNextSatisfiedPage()) {
data = chunkReader.nextPageData();
if (data.hasCurrent()) {
return true;
}
Expand All @@ -60,8 +60,8 @@ public TimeValuePair next() throws IOException {
if (data.hasCurrent()) {
TimeValuePairUtils.setCurrentTimeValuePair(data, current());
} else {
while (chunkReader.hasNextBatch()) {
data = chunkReader.nextBatch();
while (chunkReader.hasNextSatisfiedPage()) {
data = chunkReader.nextPageData();
if (data.hasCurrent()) {
TimeValuePairUtils.setCurrentTimeValuePair(data, current());
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;

Expand Down Expand Up @@ -71,7 +70,7 @@ public ChunkReaderWrap(ReadOnlyMemChunk readOnlyMemChunk, Filter filter) {
public IPointReader getIPointReader() throws IOException {
if (type.equals(ChunkReaderType.DISK_CHUNK)) {
Chunk chunk = chunkLoader.getChunk(chunkMetaData);
AbstractChunkReader chunkReader = new ChunkReader(chunk, filter);
ChunkReader chunkReader = new ChunkReader(chunk, filter);
return new DiskChunkReader(chunkReader);
} else {
return new MemChunkReader(readOnlyMemChunk, filter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,26 @@

import java.io.IOException;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.resourceRelated.NewUnseqResourceMergeReader;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.db.utils.TimeValuePairUtils;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;

/**
* To read chunk data on disk, this class implements an interface {@link IPointReader} based on the
* data reader {@link AbstractChunkReader}.
* data reader {@link ChunkReader}.
* <p>
* Note that <code>ChunkReader</code> is an abstract class with three concrete classes, two of which
* are used here: <code>ChunkReaderWithoutFilter</code> and <code>ChunkReaderWithFilter</code>.
* <p>
*/
public class DiskChunkReader implements IPointReader, IBatchReader {

private AbstractChunkReader chunkReader;
private ChunkReader chunkReader;
private BatchData data;

public DiskChunkReader(AbstractChunkReader chunkReader) {
public DiskChunkReader(ChunkReader chunkReader) {
this.chunkReader = chunkReader;
}

Expand All @@ -49,8 +48,8 @@ public boolean hasNext() throws IOException {
if (data != null && data.hasCurrent()) {
return true;
}
while (chunkReader.hasNextBatch()) {
data = chunkReader.nextBatch();
while (chunkReader.hasNextSatisfiedPage()) {
data = chunkReader.nextPageData();
if (data.hasCurrent()) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public Object getValueInTimestamp(long timestamp) throws IOException {
return null;
} else {
chunkReaderByTimestamp.setCurrentTimestamp(timestamp);
if (chunkReaderByTimestamp.hasNextBatch()) {
data = chunkReaderByTimestamp.nextBatch();
if (chunkReaderByTimestamp.hasNextSatisfiedPage()) {
data = chunkReaderByTimestamp.nextPageData();
} else {
return null;
}
Expand All @@ -70,8 +70,8 @@ public boolean hasNext() throws IOException {
if (data != null && data.hasCurrent()) {
return true;
}
if (chunkReaderByTimestamp != null && chunkReaderByTimestamp.hasNextBatch()) {
data = chunkReaderByTimestamp.nextBatch();
if (chunkReaderByTimestamp != null && chunkReaderByTimestamp.hasNextSatisfiedPage()) {
data = chunkReaderByTimestamp.nextPageData();
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public boolean hasNextBatch() throws IOException {

@Override
public BatchData nextBatch() {
BatchData batchData = new BatchData(dataType, true);
BatchData batchData = new BatchData(dataType);
if (hasCachedTimeValuePair) {
hasCachedTimeValuePair = false;
batchData.putTime(cachedTimeValuePair.getTimestamp());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.iotdb.db.query.reader.universal.CachedPriorityMergeReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;

public class CachedUnseqResourceMergeReader extends CachedPriorityMergeReader {
Expand All @@ -35,7 +34,7 @@ public CachedUnseqResourceMergeReader(List<Chunk> chunks, TSDataType dataType)
super(dataType);
int priorityValue = 1;
for (Chunk chunk : chunks) {
AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
ChunkReader chunkReader = new ChunkReader(chunk, null);
addReaderWithPriority(new CachedDiskChunkReader(chunkReader), priorityValue++);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,15 @@ public NewUnseqResourceMergeReader(Path seriesPath, TSDataType dataType,
if (tsFileResource.isClosed()) {
// get chunk metadata list of current closed tsfile
currentChunkMetaDataList = DeviceMetaDataCache.getInstance().get(tsFileResource, seriesPath);

// get modifications and apply to chunk metadatas
List<Modification> pathModifications = context
.getPathModifications(tsFileResource.getModFile(), seriesPath.getFullPath());
if (!pathModifications.isEmpty()) {
QueryUtils.modifyChunkMetaData(currentChunkMetaDataList, pathModifications);
}
} else {
// metadata list of already flushed chunk groups
// metadata list of already flushed chunks in unsealed file, already applied modifications
currentChunkMetaDataList = tsFileResource.getChunkMetaDataList();
}

Expand Down Expand Up @@ -143,7 +145,7 @@ public boolean hasNextBatch() throws IOException {
return true;
}

batchData = new BatchData(dataType, true);
batchData = new BatchData(dataType);

for (int rowCount = 0; rowCount < batchSize; rowCount++) {
if (priorityMergeReader.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,27 +132,6 @@ public boolean constructNextReader(int idx) throws IOException {
}
}

/**
* Returns true if the start and end time of the series data in this sequence TsFile do not
* satisfy the filter condition. Returns false if satisfy.
* <p>
* This method is used to in <code>constructNextReader</code> to check whether this TsFile can be
* skipped.
*
* @param tsFile the TsFileResource corresponding to this TsFile
* @param filter filter condition. Null if no filter.
* @return True if the TsFile's start and end time do not satisfy the filter condition; False if
* satisfy.
*/
private boolean isTsFileNotSatisfied(TsFileResource tsFile, Filter filter) {
if (filter == null) {
return false;
}
long startTime = tsFile.getStartTimeMap().get(seriesPath.getDevice());
long endTime = tsFile.getEndTimeMap().get(seriesPath.getDevice());
return !filter.satisfyStartEndTime(startTime, endTime);
}

private IAggregateReader initSealedTsFileReader(TsFileResource sealedTsFile, Filter filter,
QueryContext context) throws IOException {
// prepare metaDataList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public BatchData nextBatch() throws IOException {
if (hasNextInSeq() && hasNextInUnSeq()) {
// if the count reaches batch data size
int count = 0;
BatchData batchData = new BatchData(seqBatchData.getDataType(), true);
BatchData batchData = new BatchData(seqBatchData.getDataType());
while (count < batchSize && hasNextInSeq() && hasNextInUnSeq()) {
long timeInSeq = seqBatchData.currentTime();
long timeInUnseq = unseqBatchData.currentTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.slf4j.Logger;
Expand Down Expand Up @@ -101,10 +100,10 @@ public static long collectFileSizes(List<TsFileResource> seqFiles, List<TsFileRe
}

public static int writeChunkWithoutUnseq(Chunk chunk, IChunkWriter chunkWriter) throws IOException {
AbstractChunkReader chunkReader = new ChunkReader(chunk, null);
ChunkReader chunkReader = new ChunkReader(chunk, null);
int ptWritten = 0;
while (chunkReader.hasNextBatch()) {
BatchData batchData = chunkReader.nextBatch();
while (chunkReader.hasNextSatisfiedPage()) {
BatchData batchData = chunkReader.nextPageData();
for (int i = 0; i < batchData.length(); i++) {
writeBatchPoint(batchData, i, chunkWriter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,27 @@
import static org.junit.Assert.assertFalse;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.storageGroup.StorageGroupProcessorException;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.junit.After;
Expand Down Expand Up @@ -71,6 +77,43 @@ public void tearDown() throws Exception {
}


@Test
public void testUnseqUnsealedDelete() throws QueryProcessException, IOException {
TSRecord record = new TSRecord(10000, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
processor.insert(new InsertPlan(record));
processor.waitForAllCurrentTsFileProcessorsClosed();


for (int j = 1; j <= 10; j++) {
record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
processor.insert(new InsertPlan(record));
}

processor.getWorkUnSequenceTsFileProcessor().syncFlush();

for (int j = 11; j <= 20; j++) {
record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
processor.insert(new InsertPlan(record));
}

processor.delete(deviceId, measurementId, 15L);

Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = processor.getWorkUnSequenceTsFileProcessor()
.query(deviceId, measurementId, TSDataType.INT32, Collections.emptyMap(), new QueryContext());

List<TimeValuePair> timeValuePairs = pair.left.getSortedTimeValuePairList();

long time = 16;
for (TimeValuePair timeValuePair : timeValuePairs) {
Assert.assertEquals(time++, timeValuePair.getTimestamp());
}

Assert.assertEquals(0, pair.right.size());
}

@Test
public void testSequenceSyncClose() throws QueryProcessException {
for (int j = 1; j <= 10; j++) {
Expand Down

0 comments on commit ffcbbd0

Please sign in to comment.