Skip to content

Commit

Permalink
[IOTDB-520] Result of IBatchReader should not cross partition (#845)
Browse files Browse the repository at this point in the history
* [IOTDB-520] Result of IBatchReader should not cross partition
  • Loading branch information
samperson1997 committed Feb 26, 2020
1 parent 8fdf9f8 commit 2b8b154
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 5 deletions.
Expand Up @@ -78,7 +78,7 @@ BatchData nextBatch() throws IOException;
#### 一般使用流程

```
while (batchReader. hasNextBatch()) {
while (batchReader.hasNextBatch()) {
BatchData batchData = batchReader.nextBatch();
// use batchData to do some work
Expand Down
Expand Up @@ -18,12 +18,13 @@
*/
package org.apache.iotdb.db.query.reader.universal;

import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.read.TimeValuePair;

import java.io.IOException;
import java.util.List;
import java.util.PriorityQueue;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.reader.IPointReader;

/**
* This class implements {@link IPointReader} for data sources with different priorities.
Expand All @@ -33,6 +34,8 @@ public class PriorityMergeReader implements IPointReader {
// largest end time of all added readers
private long currentLargestEndTime;

private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();

PriorityQueue<Element> heap = new PriorityQueue<>((o1, o2) -> {
int timeCompare = Long.compare(o1.timeValuePair.getTimestamp(),
o2.timeValuePair.getTimestamp());
Expand All @@ -58,9 +61,24 @@ public void addReader(IPointReader reader, long priority) throws IOException {
}

public void addReader(IPointReader reader, long priority, long endTime) throws IOException {
long partitionInterval = config.getPartitionInterval();
switch (config.getTimestampPrecision()) {
case "ns":
partitionInterval *= 1000_000_000L;
break;
case "us":
partitionInterval *= 1000_000L;
break;
default:
partitionInterval *= 1000;
break;
}
if (reader.hasNextTimeValuePair()) {
heap.add(new Element(reader, reader.nextTimeValuePair(), priority));
currentLargestEndTime = Math.max(currentLargestEndTime, endTime);
long partition = reader.currentTimeValuePair().getTimestamp() / partitionInterval;
// set end time before current partition ends
currentLargestEndTime = Math.min((partition + 1) * partitionInterval - 1,
Math.max(currentLargestEndTime, endTime));
} else {
reader.close();
}
Expand Down
Expand Up @@ -80,6 +80,9 @@ public static void setUp() throws Exception {
tsFileConfig.setGroupSizeInByte(1024 * 1024 * 150);
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 16);

// test result of IBatchReader should not cross partition
IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(2);

EnvironmentUtils.envSetUp();

insertData();
Expand All @@ -96,6 +99,7 @@ public static void tearDown() throws Exception {
tsFileConfig.setPageSizeInByte(pageSizeInByte);
tsFileConfig.setGroupSizeInByte(groupSizeInByte);
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(groupSizeInByte);
IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(604800);

EnvironmentUtils.cleanEnv();
}
Expand Down

0 comments on commit 2b8b154

Please sign in to comment.