
Commit

Merge remote-tracking branch 'origin/optimize_series_reader' into optimize_series_reader
qiaojialin committed Mar 2, 2020
2 parents 80456bd + 6ba2969 commit 23d620a
Showing 2 changed files with 28 additions and 36 deletions.
@@ -25,8 +25,6 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -35,13 +33,11 @@
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
import org.apache.iotdb.db.query.reader.series.IAggregateReader;
import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.common.TimeRange;
@@ -81,15 +77,11 @@ private void initGroupBy(QueryContext context, GroupByPlan groupByPlan)

for (int i = 0; i < paths.size(); i++) {
Path path = paths.get(i);

QueryDataSource queryDataSource = QueryResourceManager.getInstance()
.getQueryDataSource(path, context, timeFilter);
// update filter by TTL
timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
//init reader
pathExecutors.putIfAbsent(path,
new GroupByExecutor(path, dataTypes.get(i), context, queryDataSource, timeFilter));

if (!pathExecutors.containsKey(path)) {
//init reader
pathExecutors.put(path,
new GroupByExecutor(path, dataTypes.get(i), context, timeFilter));
}
AggregateResult aggrResult = AggregateResultFactory
.getAggrResultByName(groupByPlan.getDeduplicatedAggregations().get(i),
dataTypes.get(i));
@@ -107,53 +99,47 @@ protected RowRecord nextWithoutConstraint() throws IOException {
RowRecord record = new RowRecord(curStartTime);
timeRange = new TimeRange(curStartTime, curEndTime - 1);

final AggregateResult[] fields = new AggregateResult[paths.size()];
final List<Future> asyncResult = new ArrayList(pathExecutors.size());

for (Entry<Path, GroupByExecutor> executorEntry : pathExecutors.entrySet()) {
asyncResult.add(QueryTaskPoolManager.getInstance().submit((Callable<?>) () -> {
GroupByExecutor executor = executorEntry.getValue();
AggregateResult[] fields = new AggregateResult[paths.size()];

try {
for (Entry<Path, GroupByExecutor> pathGroupByExecutorEntry : pathExecutors.entrySet()) {
GroupByExecutor executor = pathGroupByExecutorEntry.getValue();
executor.resetAggregateResults();
List<Pair<AggregateResult, Integer>> aggregations = executor.calcResult();
for (int i = 0; i < aggregations.size(); i++) {
fields[aggregations.get(i).right] = aggregations.get(i).left;
}
return null;
}));
}
//waiting for data
for (Future future : asyncResult) {
try {
future.get();
} catch (Exception e) {
logger.error("GroupByWithoutValueFilterDataSet execute has error,{}", e);
throw new IOException(e);
}
} catch (QueryProcessException e) {
logger.error("GroupByWithoutValueFilterDataSet execute has error,{}", e);
throw new IOException(e);
}

for (AggregateResult res : fields) {
if (res == null) {
record.addField(new Field(null));
record.addField(null);
continue;
}
record.addField(res.getResult(), res.getResultDataType());
}
return record;
}


private class GroupByExecutor {

private IAggregateReader reader;
private BatchData preCachedData;
//<aggFunction - indexForRecord> of path
private List<Pair<AggregateResult, Integer>> results = new ArrayList<>();

public GroupByExecutor(Path path, TSDataType dataType, QueryContext context,
QueryDataSource dataSource, Filter timeFilter) {
this.reader = new SeriesAggregateReader(path, dataType, context,
dataSource, timeFilter, null, null);
public GroupByExecutor(Path path, TSDataType dataType, QueryContext context, Filter timeFilter)
throws StorageEngineException {
QueryDataSource queryDataSource = QueryResourceManager.getInstance()
.getQueryDataSource(path, context, timeFilter);
// update filter by TTL
timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
this.reader = new SeriesAggregateReader(path, dataType, context, queryDataSource, timeFilter,
null, null);
this.preCachedData = null;
}

@@ -177,7 +163,8 @@ public boolean isEndCalc() {
public boolean calcFromCacheData() throws IOException {
calcFromBatch(preCachedData);
// The result is calculated from the cache
if ((preCachedData != null && preCachedData.getMaxTimestamp() >= curEndTime) || isEndCalc()) {
if ((preCachedData != null && preCachedData.getMaxTimestamp() >= curEndTime)
|| isEndCalc()) {
return true;
}
return false;
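The hunks above remove the Callable/Future submission to QueryTaskPoolManager: each window is now evaluated synchronously, one GroupByExecutor per path (the executor builds its own QueryDataSource and applies the TTL-adjusted time filter in its constructor), and every aggregation result is written into the record's field array at the index paired with it. The following is only a minimal, self-contained sketch of that scatter-by-index pattern; the types and names stand in for AggregateResult, GroupByExecutor and Pair and are not taken from the commit.

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch only: illustrative stand-ins, not IoTDB classes.
final class SyncGroupBySketch {

  // Stand-in for Pair<AggregateResult, Integer>: a value plus the output column it fills.
  record ResultWithIndex(String value, int index) {}

  // Stand-in for GroupByExecutor: results computed for one path in one window.
  interface PathExecutor {
    List<ResultWithIndex> calcResult();
  }

  // Iterate executors one after another (no thread pool) and scatter each
  // result into the shared field array by its recorded output index.
  static String[] collectWindow(Map<String, PathExecutor> pathExecutors, int columnCount) {
    String[] fields = new String[columnCount];
    for (PathExecutor executor : pathExecutors.values()) {
      for (ResultWithIndex r : executor.calcResult()) {
        fields[r.index()] = r.value();
      }
    }
    return fields;
  }

  public static void main(String[] args) {
    Map<String, PathExecutor> executors = new LinkedHashMap<>();
    executors.put("root.sg.d1.s1", () -> List.of(new ResultWithIndex("count=3", 0)));
    executors.put("root.sg.d1.s2", () -> List.of(new ResultWithIndex("avg=2.5", 1)));
    System.out.println(String.join(", ", collectWindow(executors, 2)));  // count=3, avg=2.5
  }
}
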
@@ -73,6 +73,11 @@ private boolean hasNext(long timestamp) throws IOException {
* consume chunk secondly
*/
while (seriesReader.hasNextChunk()) {
Statistics statistics = seriesReader.currentChunkStatistics();
if (!satisfyTimeFilter(statistics)) {
seriesReader.skipCurrentChunk();
continue;
}
if (readPageData(timestamp)) {
return true;
}
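The hunk above makes hasNext(long timestamp) consult the chunk-level Statistics before descending into page data, so a chunk whose time span cannot satisfy the filter is skipped with skipCurrentChunk(). The body of satisfyTimeFilter is not shown in this diff; the sketch below only illustrates the kind of interval test such a helper might perform, assuming a half-open window [curStartTime, curEndTime) and chunk start/end timestamps taken from the statistics.

// Sketch only: not code from this commit. A plausible shape for a
// satisfyTimeFilter-style check, expressed with plain longs.
final class ChunkSkipSketch {

  // Keep the chunk only if its [startTime, endTime] span intersects the
  // current group-by window; otherwise the reader can skip the whole chunk.
  static boolean overlapsWindow(long chunkStartTime, long chunkEndTime,
      long curStartTime, long curEndTime) {
    return chunkEndTime >= curStartTime && chunkStartTime < curEndTime;
  }

  public static void main(String[] args) {
    System.out.println(overlapsWindow(0, 99, 100, 200));    // false -> chunk skipped
    System.out.println(overlapsWindow(150, 300, 100, 200)); // true  -> chunk read
  }
}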
