Skip to content

Commit

Permalink
Merge c4fc9c3 into 9fef008
Browse files Browse the repository at this point in the history
  • Loading branch information
samperson1997 committed Feb 26, 2020
2 parents 9fef008 + c4fc9c3 commit b2fde97
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 7 deletions.
Expand Up @@ -78,7 +78,7 @@ BatchData nextBatch() throws IOException;
#### 一般使用流程

```
while (batchReader. hasNextBatch()) {
while (batchReader.hasNextBatch()) {
BatchData batchData = batchReader.nextBatch();
// use batchData to do some work
Expand Down
Expand Up @@ -143,8 +143,8 @@ public class StorageGroupProcessor {
// includes sealed and unsealed sequence TsFiles
private TreeSet<TsFileResource> sequenceFileTreeSet = new TreeSet<>(
(o1, o2) -> {
int rangeCompare = o1.getFile().getParentFile().getName()
.compareTo(o2.getFile().getParentFile().getName());
int rangeCompare = Long.compare(Long.parseLong(o1.getFile().getParentFile().getName()),
Long.parseLong(o2.getFile().getParentFile().getName()));
return rangeCompare == 0 ? compareFileName(o1.getFile(), o2.getFile()) : rangeCompare;
});

Expand Down
Expand Up @@ -18,12 +18,13 @@
*/
package org.apache.iotdb.db.query.reader.universal;

import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.read.TimeValuePair;

import java.io.IOException;
import java.util.List;
import java.util.PriorityQueue;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.reader.IPointReader;

/**
* This class implements {@link IPointReader} for data sources with different priorities.
Expand All @@ -33,6 +34,8 @@ public class PriorityMergeReader implements IPointReader {
// largest end time of all added readers
private long currentLargestEndTime;

private final static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();

PriorityQueue<Element> heap = new PriorityQueue<>((o1, o2) -> {
int timeCompare = Long.compare(o1.timeValuePair.getTimestamp(),
o2.timeValuePair.getTimestamp());
Expand All @@ -58,9 +61,24 @@ public void addReader(IPointReader reader, long priority) throws IOException {
}

public void addReader(IPointReader reader, long priority, long endTime) throws IOException {
long partitionInterval = config.getPartitionInterval();
switch (config.getTimestampPrecision()) {
case "ns":
partitionInterval *= 1000_000_000L;
break;
case "us":
partitionInterval *= 1000_000L;
break;
default:
partitionInterval *= 1000;
break;
}
if (reader.hasNextTimeValuePair()) {
heap.add(new Element(reader, reader.nextTimeValuePair(), priority));
currentLargestEndTime = Math.max(currentLargestEndTime, endTime);
int partition = Math.round(reader.currentTimeValuePair().getTimestamp() / partitionInterval);
// set end time before current partition ends
currentLargestEndTime = Math.min((partition + 1) * partitionInterval - 1,
Math.max(currentLargestEndTime, endTime));
} else {
reader.close();
}
Expand Down
Expand Up @@ -80,6 +80,9 @@ public static void setUp() throws Exception {
tsFileConfig.setGroupSizeInByte(1024 * 1024 * 150);
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 16);

// test result of IBatchReader should not cross partition
IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(6);

EnvironmentUtils.envSetUp();

insertData();
Expand All @@ -96,6 +99,7 @@ public static void tearDown() throws Exception {
tsFileConfig.setPageSizeInByte(pageSizeInByte);
tsFileConfig.setGroupSizeInByte(groupSizeInByte);
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(groupSizeInByte);
IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(604800);

EnvironmentUtils.cleanEnv();
}
Expand Down

0 comments on commit b2fde97

Please sign in to comment.