Skip to content

Commit

Permalink
Revert "[IOTDB-520] Result of IBatchReader should not cross partition (
Browse files Browse the repository at this point in the history
…#845)"

This reverts commit 2b8b154.
  • Loading branch information
samperson1997 authored Feb 29, 2020
1 parent 9340de3 commit 797a2e3
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ BatchData nextBatch() throws IOException;
#### 一般使用流程

```
while (batchReader.hasNextBatch()) {
while (batchReader. hasNextBatch()) {
BatchData batchData = batchReader.nextBatch();
// use batchData to do some work
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
*/
package org.apache.iotdb.db.query.reader.universal;

import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.read.TimeValuePair;

import java.io.IOException;
import java.util.List;
import java.util.PriorityQueue;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.reader.IPointReader;

/**
* This class implements {@link IPointReader} for data sources with different priorities.
Expand All @@ -34,8 +33,6 @@ public class PriorityMergeReader implements IPointReader {
// largest end time of all added readers
private long currentLargestEndTime;

private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();

PriorityQueue<Element> heap = new PriorityQueue<>((o1, o2) -> {
int timeCompare = Long.compare(o1.timeValuePair.getTimestamp(),
o2.timeValuePair.getTimestamp());
Expand All @@ -61,24 +58,9 @@ public void addReader(IPointReader reader, long priority) throws IOException {
}

public void addReader(IPointReader reader, long priority, long endTime) throws IOException {
long partitionInterval = config.getPartitionInterval();
switch (config.getTimestampPrecision()) {
case "ns":
partitionInterval *= 1000_000_000L;
break;
case "us":
partitionInterval *= 1000_000L;
break;
default:
partitionInterval *= 1000;
break;
}
if (reader.hasNextTimeValuePair()) {
heap.add(new Element(reader, reader.nextTimeValuePair(), priority));
long partition = reader.currentTimeValuePair().getTimestamp() / partitionInterval;
// set end time before current partition ends
currentLargestEndTime = Math.min((partition + 1) * partitionInterval - 1,
Math.max(currentLargestEndTime, endTime));
currentLargestEndTime = Math.max(currentLargestEndTime, endTime);
} else {
reader.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,6 @@ public static void setUp() throws Exception {
tsFileConfig.setGroupSizeInByte(1024 * 1024 * 150);
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 16);

// test result of IBatchReader should not cross partition
IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(2);

EnvironmentUtils.envSetUp();

insertData();
Expand All @@ -99,7 +96,6 @@ public static void tearDown() throws Exception {
tsFileConfig.setPageSizeInByte(pageSizeInByte);
tsFileConfig.setGroupSizeInByte(groupSizeInByte);
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(groupSizeInByte);
IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(604800);

EnvironmentUtils.cleanEnv();
}
Expand Down

0 comments on commit 797a2e3

Please sign in to comment.