Commit

Merge 8389530 into 3404293
samperson1997 committed Feb 6, 2020
2 parents 3404293 + 8389530 commit ec572f5
Showing 7 changed files with 575 additions and 262 deletions.
@@ -19,6 +19,11 @@

package org.apache.iotdb.db.query.dataset.groupby;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
@@ -28,19 +33,29 @@
import org.apache.iotdb.db.query.factory.AggreResultFactory;
import org.apache.iotdb.db.query.reader.series.IAggregateReader;
import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.*;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {

private List<IAggregateReader> seriesReaders;
/**
 * Merges identical series into one map entry. For example, given paths s1, s2, s3, s1 and
 * aggregations count, sum, count, sum, the map is: s1 -> 0, 3; s2 -> 1; s3 -> 2
 */
private Map<Path, List<Integer>> pathToAggrIndexesMap;

/**
 * Maps each path to its aggregate reader.
 */
private Map<Path, IAggregateReader> aggregateReaders;
private List<BatchData> cachedBatchDataList;
private Filter timeFilter;
private GroupByPlan groupByPlan;
@@ -52,7 +67,8 @@ public GroupByWithoutValueFilterDataSet(QueryContext context, GroupByPlan groupB
throws StorageEngineException {
super(context, groupByPlan);

this.seriesReaders = new ArrayList<>();
this.pathToAggrIndexesMap = new HashMap<>();
this.aggregateReaders = new HashMap<>();
this.timeFilter = null;
this.cachedBatchDataList = new ArrayList<>();
for (int i = 0; i < paths.size(); i++) {
@@ -76,10 +92,15 @@ private void initGroupBy(QueryContext context, GroupByPlan groupByPlan)

for (int i = 0; i < paths.size(); i++) {
Path path = paths.get(i);
IAggregateReader seriesReader = new SeriesAggregateReader(path, dataTypes.get(i), context,
QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter),
timeFilter, null);
seriesReaders.add(seriesReader);
List<Integer> indexList = pathToAggrIndexesMap
.computeIfAbsent(path, key -> new ArrayList<>());
indexList.add(i);
if (!aggregateReaders.containsKey(path)) {
IAggregateReader seriesReader = new SeriesAggregateReader(path, dataTypes.get(i), context,
QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter),
timeFilter, null);
aggregateReaders.put(path, seriesReader);
}
}
}
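The grouping that initGroupBy builds here, and that the pathToAggrIndexesMap javadoc describes, can be reproduced in isolation. A minimal standalone sketch using plain Strings in place of IoTDB Path objects (the class name and sample series are hypothetical):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PathGroupingSketch {

  public static void main(String[] args) {
    // Deduplicated query columns: the same series may appear several times,
    // once per requested aggregation (count(s1), sum(s2), count(s3), sum(s1)).
    List<String> paths = List.of("s1", "s2", "s3", "s1");

    // Group column indexes by series, preserving first-seen order.
    Map<String, List<Integer>> pathToAggrIndexesMap = new LinkedHashMap<>();
    for (int i = 0; i < paths.size(); i++) {
      pathToAggrIndexesMap.computeIfAbsent(paths.get(i), k -> new ArrayList<>()).add(i);
    }

    // Prints: {s1=[0, 3], s2=[1], s3=[2]}
    System.out.println(pathToAggrIndexesMap);
  }
}
```

Each series is then read only once, and the per-index results are written back into the row record in their original column order by nextWithoutConstraint below.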

@@ -91,83 +112,140 @@ protected RowRecord nextWithoutConstraint() throws IOException {
}
hasCachedTimeInterval = false;
RowRecord record = new RowRecord(curStartTime);
for (int i = 0; i < paths.size(); i++) {
AggregateResult res;
AggregateResult[] aggregateResultList = new AggregateResult[paths.size()];
for (Map.Entry<Path, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
List<AggregateResult> aggregateResults;
try {
res = nextIntervalAggregation(i);
aggregateResults = nextIntervalAggregation(entry);
} catch (QueryProcessException e) {
throw new IOException(e);
}
if (res == null) {
record.addField(new Field(null));
} else {
int index = 0;
for (int i : entry.getValue()) {
aggregateResultList[i] = aggregateResults.get(index);
index++;
}
}
if (aggregateResultList.length == 0) {
record.addField(new Field(null));
} else {
for (AggregateResult res : aggregateResultList) {
record.addField(res.getResult(), res.getDataType());
}
}
return record;
}

/**
* calculate the group by result of the series indexed by idx.
* calculate the group by result of one series
*
* @param idx series id
* @param pathToAggrIndexes entry of path to aggregation indexes map
*/
private AggregateResult nextIntervalAggregation(int idx)
throws IOException, QueryProcessException {
IAggregateReader reader = seriesReaders.get(idx);
AggregateResult result = AggreResultFactory
.getAggrResultByName(groupByPlan.getDeduplicatedAggregations().get(idx),
groupByPlan.getDeduplicatedDataTypes().get(idx));
private List<AggregateResult> nextIntervalAggregation(Map.Entry<Path,
List<Integer>> pathToAggrIndexes) throws IOException, QueryProcessException {
List<AggregateResult> aggregateResultList = new ArrayList<>();
List<BatchData> batchDataList = new ArrayList<>();
List<Boolean> isCalculatedList = new ArrayList<>();
List<Integer> indexList = pathToAggrIndexes.getValue();

TimeRange timeRange = new TimeRange(curStartTime, curEndTime - 1);
int remainingToCalculate = indexList.size();
TSDataType tsDataType = groupByPlan.getDeduplicatedDataTypes().get(indexList.get(0));

BatchData lastBatch = cachedBatchDataList.get(idx);
calcBatchData(result, lastBatch);
if (isEndCalc(result, lastBatch)) {
return result;
for (int index : indexList) {
AggregateResult result = AggreResultFactory
.getAggrResultByName(groupByPlan.getDeduplicatedAggregations().get(index), tsDataType);
aggregateResultList.add(result);

BatchData lastBatch = cachedBatchDataList.get(index);
batchDataList.add(lastBatch);

calcBatchData(result, lastBatch);
if (isEndCalc(result, lastBatch)) {
isCalculatedList.add(true);
remainingToCalculate--;
if (remainingToCalculate == 0) {
return aggregateResultList;
}
} else {
isCalculatedList.add(false);
}
}
TimeRange timeRange = new TimeRange(curStartTime, curEndTime - 1);
IAggregateReader reader = aggregateReaders.get(pathToAggrIndexes.getKey());

while (reader.hasNextChunk()) {
// cal by chunk statistics
Statistics chunkStatistics = reader.currentChunkStatistics();
if (chunkStatistics.getStartTime() >= curEndTime) {
return result;
return aggregateResultList;
}
if (reader.canUseCurrentChunkStatistics() && timeRange.contains(
new TimeRange(chunkStatistics.getStartTime(), chunkStatistics.getEndTime()))) {
result.updateResultFromStatistics(chunkStatistics);
if (result.isCalculatedAggregationResult()) {
return result;
for (int i = 0; i < aggregateResultList.size(); i++) {
if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
AggregateResult result = aggregateResultList.get(i);
result.updateResultFromStatistics(chunkStatistics);
if (result.isCalculatedAggregationResult()) {
isCalculatedList.set(i, true);
remainingToCalculate--;
if (remainingToCalculate == 0) {
return aggregateResultList;
}
}
}
}
reader.skipCurrentChunk();
continue;
}

while (reader.hasNextPage()) {
//cal by page statistics
Statistics pageStatistics = reader.currentPageStatistics();
if (pageStatistics.getStartTime() >= curEndTime) {
return result;
return aggregateResultList;
}
if (reader.canUseCurrentPageStatistics() && timeRange.contains(
new TimeRange(pageStatistics.getStartTime(), pageStatistics.getEndTime()))) {
result.updateResultFromStatistics(pageStatistics);
if (result.isCalculatedAggregationResult()) {
return result;
for (int i = 0; i < aggregateResultList.size(); i++) {
if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
AggregateResult result = aggregateResultList.get(i);
result.updateResultFromStatistics(pageStatistics);
if (result.isCalculatedAggregationResult()) {
isCalculatedList.set(i, true);
remainingToCalculate--;
if (remainingToCalculate == 0) {
return aggregateResultList;
}
}
}
}
reader.skipCurrentPage();
continue;
}
while (reader.hasNextOverlappedPage()) {
// cal by page data
BatchData batchData = reader.nextOverlappedPage();
calcBatchData(result, batchData);
if (batchData.hasCurrent()) {
cachedBatchDataList.set(idx, batchData);
}
if (isEndCalc(result, lastBatch)) {
break;
for (int i = 0; i < aggregateResultList.size(); i++) {
if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
AggregateResult result = aggregateResultList.get(i);
calcBatchData(result, batchData);
int idx = pathToAggrIndexes.getValue().get(i);
if (batchData.hasCurrent()) {
cachedBatchDataList.set(idx, batchData);
}
if (isEndCalc(result, batchDataList.get(i))) {
isCalculatedList.set(i, true);
remainingToCalculate--;
if (remainingToCalculate == 0) {
break;
}
}
}
}
}
}
}
return result;
return aggregateResultList;
}
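The bookkeeping in nextIntervalAggregation, a flag per aggregation plus a remainingToCalculate counter that allows an early return once every result is final, can be shown in condensed form. A sketch with a hypothetical Accumulator interface standing in for AggregateResult and isFinal() for isCalculatedAggregationResult():

```java
import java.util.ArrayList;
import java.util.List;

public class EarlyTerminationSketch {

  /** Hypothetical stand-in for AggregateResult: accumulates values and may become final early. */
  interface Accumulator {
    void update(long value);
    boolean isFinal();
  }

  /** Example accumulator that is final after one value (roughly analogous to a first-value aggregation). */
  static class FirstValue implements Accumulator {
    Long first;
    public void update(long value) { if (first == null) { first = value; } }
    public boolean isFinal() { return first != null; }
  }

  /**
   * Feeds values to every accumulator that is not yet final and stops as soon as
   * all of them are, mirroring the isCalculatedList/remainingToCalculate pattern.
   */
  static void consume(List<Accumulator> accumulators, long[] values) {
    List<Boolean> isCalculatedList = new ArrayList<>();
    for (int i = 0; i < accumulators.size(); i++) {
      isCalculatedList.add(false);
    }
    int remainingToCalculate = accumulators.size();

    for (long value : values) {
      for (int i = 0; i < accumulators.size(); i++) {
        if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
          Accumulator acc = accumulators.get(i);
          acc.update(value);
          if (acc.isFinal()) {
            isCalculatedList.set(i, true);
            remainingToCalculate--;
            if (remainingToCalculate == 0) {
              return; // every aggregation is settled; skip the remaining input
            }
          }
        }
      }
    }
  }

  public static void main(String[] args) {
    List<Accumulator> accs = List.of(new FirstValue(), new FirstValue());
    consume(accs, new long[]{7, 8, 9}); // returns after the first value
    System.out.println(((FirstValue) accs.get(0)).first); // 7
  }
}
```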

private boolean isEndCalc(AggregateResult function, BatchData lastBatch) {
@@ -178,8 +256,7 @@ private boolean isEndCalc(AggregateResult function, BatchData lastBatch) {
/**
* this batchData >= curEndTime
*/
private void calcBatchData(AggregateResult result, BatchData batchData)
throws IOException {
private void calcBatchData(AggregateResult result, BatchData batchData) throws IOException {
if (batchData == null || !batchData.hasCurrent()) {
return;
}
Expand All @@ -188,6 +265,8 @@ private void calcBatchData(AggregateResult result, BatchData batchData)
}
if (batchData.hasCurrent()) {
result.updateResultFromPageData(batchData, curEndTime);
// reset batch data for next calculation
batchData.resetBatchData();
}
}
}
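The resetBatchData() call in calcBatchData above, like the nextOverlappedPageData.resetBatchData() call in AggregationExecutor below, lets one fetched batch be consumed by several aggregate results in turn. A minimal sketch of that reuse pattern, assuming resetBatchData() rewinds the read cursor, with a hypothetical SimpleBatch in place of BatchData:

```java
public class BatchReuseSketch {

  /** Hypothetical stand-in for BatchData: a sequence of values with a read cursor. */
  static class SimpleBatch {
    private final long[] values;
    private int cursor = 0;

    SimpleBatch(long... values) { this.values = values; }

    boolean hasCurrent() { return cursor < values.length; }
    long currentValue() { return values[cursor]; }
    void next() { cursor++; }

    /** Rewind the cursor so the same data can be read again. */
    void resetBatchData() { cursor = 0; }
  }

  public static void main(String[] args) {
    SimpleBatch page = new SimpleBatch(1, 2, 3);

    // First pending result consumes the page: count.
    long count = 0;
    while (page.hasCurrent()) { count++; page.next(); }
    page.resetBatchData(); // without this, the next result would see an exhausted batch

    // Second pending result re-reads the very same page: sum.
    long sum = 0;
    while (page.hasCurrent()) { sum += page.currentValue(); page.next(); }
    page.resetBatchData();

    System.out.println("count=" + count + ", sum=" + sum); // count=3, sum=6
  }
}
```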
@@ -19,6 +19,12 @@

package org.apache.iotdb.db.query.executor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.path.PathException;
@@ -36,16 +42,14 @@
import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;

import java.io.IOException;
import java.util.*;

public class AggregationExecutor {

private List<Path> selectedSeries;
@@ -80,9 +84,9 @@ public QueryDataSet executeWithoutValueFilter(QueryContext context)
}

//TODO use multi-thread
Map<Path, List<Integer>> seriesMap = mergeSameSeries(selectedSeries);
Map<Path, List<Integer>> pathToAggrIndexesMap = mergeSameSeries(selectedSeries);
AggregateResult[] aggregateResultList = new AggregateResult[selectedSeries.size()];
for (Map.Entry<Path, List<Integer>> entry : seriesMap.entrySet()) {
for (Map.Entry<Path, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
List<AggregateResult> aggregateResults = groupAggregationsBySeries(entry, timeFilter,
context);
int index = 0;
@@ -98,33 +102,35 @@ private List<AggregateResult> groupAggregationsBySeries(Map.Entry<Path, List<Int
/**
* get aggregation result for one series
*
* @param series series map
* @param pathToAggrIndexes entry of path to aggregation indexes map
* @param timeFilter time filter
* @param context query context
* @param context query context
* @return AggregateResult list
*/
private List<AggregateResult> groupAggregationsBySeries(Map.Entry<Path, List<Integer>> series,
private List<AggregateResult> groupAggregationsBySeries(
Map.Entry<Path, List<Integer>> pathToAggrIndexes,
Filter timeFilter, QueryContext context)
throws IOException, QueryProcessException, StorageEngineException {
List<AggregateResult> aggregateResultList = new ArrayList<>();
List<Boolean> isCalculatedList = new ArrayList<>();
Path seriesPath = series.getKey();
TSDataType tsDataType = dataTypes.get(series.getValue().get(0));
Path seriesPath = pathToAggrIndexes.getKey();
TSDataType tsDataType = dataTypes.get(pathToAggrIndexes.getValue().get(0));
// construct series reader without value filter
IAggregateReader seriesReader = new SeriesAggregateReader(
series.getKey(), tsDataType, context, QueryResourceManager.getInstance()
pathToAggrIndexes.getKey(), tsDataType, context, QueryResourceManager.getInstance()
.getQueryDataSource(seriesPath, context, timeFilter), timeFilter, null);

for (int i : series.getValue()) {
for (int i : pathToAggrIndexes.getValue()) {
// construct AggregateResult
AggregateResult aggregateResult = AggreResultFactory
.getAggrResultByName(aggregations.get(i), tsDataType);
aggregateResultList.add(aggregateResult);
isCalculatedList.add(false);
}
int remainingToCalculate = series.getValue().size();
int remainingToCalculate = pathToAggrIndexes.getValue().size();

while (seriesReader.hasNextChunk()) {
// cal by chunk statistics
if (seriesReader.canUseCurrentChunkStatistics()) {
Statistics chunkStatistics = seriesReader.currentChunkStatistics();
for (int i = 0; i < aggregateResultList.size(); i++) {
@@ -144,7 +150,7 @@ private List<AggregateResult> groupAggregationsBySeries(Map.Entry<Path, List<Int
continue;
}
while (seriesReader.hasNextPage()) {
//cal by pageheader
//cal by page statistics
if (seriesReader.canUseCurrentPageStatistics()) {
Statistics pageStatistic = seriesReader.currentPageStatistics();
for (int i = 0; i < aggregateResultList.size(); i++) {
@@ -163,18 +169,20 @@ private List<AggregateResult> groupAggregationsBySeries(Map.Entry<Path, List<Int
seriesReader.skipCurrentPage();
continue;
}
//cal by pagedata
// cal by page data
while (seriesReader.hasNextOverlappedPage()) {
BatchData nextOverlappedPageData = seriesReader.nextOverlappedPage();
for (int i = 0; i < aggregateResultList.size(); i++) {
if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
AggregateResult aggregateResult = aggregateResultList.get(i);
aggregateResult.updateResultFromPageData(seriesReader.nextOverlappedPage());
aggregateResult.updateResultFromPageData(nextOverlappedPageData);
nextOverlappedPageData.resetBatchData();
if (aggregateResult.isCalculatedAggregationResult()) {
isCalculatedList.set(i, true);
remainingToCalculate--;
}
if (remainingToCalculate == 0) {
return aggregateResultList;
if (remainingToCalculate == 0) {
return aggregateResultList;
}
}
}
}
