Skip to content

Commit

Permalink
simplify GroupByExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
qiaojialin committed Mar 13, 2020
1 parent 24540dd commit 596e1c1
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 41 deletions.
Expand Up @@ -531,18 +531,17 @@ private String getSgByEngineFile(File file) {
*
* @return TsFiles (seq or unseq) grouped by their storage group and partition number.
*/
public Map<String, Map<Integer, List<TsFileResource>>> getAllClosedStorageGroupTsFile() {
Map<String, Map<Integer, List<TsFileResource>>> ret = new HashMap<>();
for (Entry<String, StorageGroupProcessor> entry : processorMap
.entrySet()) {
public Map<String, Map<Long, List<TsFileResource>>> getAllClosedStorageGroupTsFile() {
Map<String, Map<Long, List<TsFileResource>>> ret = new HashMap<>();
for (Entry<String, StorageGroupProcessor> entry : processorMap.entrySet()) {
List<TsFileResource> sequenceFiles = entry.getValue().getSequenceFileTreeSet();
for (TsFileResource sequenceFile : sequenceFiles) {
if (!sequenceFile.isClosed()) {
continue;
}
String[] fileSplits = FilePathUtils.splitTsFilePath(sequenceFile);
int partitionNum = Integer.parseInt(fileSplits[fileSplits.length - 2]);
Map<Integer, List<TsFileResource>> storageGroupFiles = ret.computeIfAbsent(entry.getKey()
long partitionNum = Long.parseLong(fileSplits[fileSplits.length - 2]);
Map<Long, List<TsFileResource>> storageGroupFiles = ret.computeIfAbsent(entry.getKey()
,n -> new HashMap<>());
storageGroupFiles.computeIfAbsent(partitionNum, n -> new ArrayList<>()).add(sequenceFile);
}
Expand Down
Expand Up @@ -24,6 +24,10 @@

public class AggregationPlan extends RawDataQueryPlan {

// e.g., for select count(s1), count(s1), count(s2), count(s2), sum (s1)
// aggregations are count, count, count, count, sum
// deduplicatedAggregations are count, count, sum

private List<String> aggregations = new ArrayList<>();
private List<String> deduplicatedAggregations = new ArrayList<>();

Expand Down
Expand Up @@ -25,10 +25,19 @@
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.tsfile.utils.Pair;


/**
* Each executor calculates results of all aggregations on this series
*/
public interface GroupByExecutor {
void addAggregateResult(AggregateResult aggrResult, int index);

void resetAggregateResults();
/**
* add reusable result cache in executor
*/
void addAggregateResult(AggregateResult aggrResult);

List<Pair<AggregateResult, Integer>> calcResult(long curStartTime, long curEndTime) throws IOException, QueryProcessException;
/**
* calculate result in [curStartTime, curEndTime)
*/
List<AggregateResult> calcResult(long curStartTime, long curEndTime) throws IOException, QueryProcessException;
}
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.query.dataset.groupby;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -48,6 +49,19 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet {

private Map<Path, GroupByExecutor> pathExecutors = new HashMap<>();

/**
* path -> result index for each aggregation
*
* e.g.,
*
* deduplicated paths : s1, s2, s1
* deduplicated aggregations : count, count, sum
*
* s1 -> 0, 2
* s2 -> 1
*/
private Map<Path, List<Integer>> resultIndexes = new HashMap<>();

public GroupByWithoutValueFilterDataSet() {
}

Expand All @@ -70,17 +84,19 @@ protected void initGroupBy(QueryContext context, GroupByPlan groupByPlan)
timeFilter = ((GlobalTimeExpression) expression).getFilter();
}

// init resultIndexes, group result indexes by path
for (int i = 0; i < paths.size(); i++) {
Path path = paths.get(i);
if (!pathExecutors.containsKey(path)) {
//init GroupByExecutor
pathExecutors.put(path,
getGroupByExecutor(path, dataTypes.get(i), context, timeFilter, null));
getGroupByExecutor(path, dataTypes.get(i), context, timeFilter, null));
resultIndexes.put(path, new ArrayList<>());
}
resultIndexes.get(path).add(i);
AggregateResult aggrResult = AggregateResultFactory
.getAggrResultByName(groupByPlan.getDeduplicatedAggregations().get(i),
dataTypes.get(i));
pathExecutors.get(path).addAggregateResult(aggrResult, i);
.getAggrResultByName(groupByPlan.getDeduplicatedAggregations().get(i), dataTypes.get(i));
pathExecutors.get(path).addAggregateResult(aggrResult);
}
}

Expand All @@ -96,12 +112,12 @@ protected RowRecord nextWithoutConstraint() throws IOException {
AggregateResult[] fields = new AggregateResult[paths.size()];

try {
for (Entry<Path, GroupByExecutor> pathGroupByExecutorEntry : pathExecutors.entrySet()) {
GroupByExecutor executor = pathGroupByExecutorEntry.getValue();
executor.resetAggregateResults();
List<Pair<AggregateResult, Integer>> aggregations = executor.calcResult(curStartTime, curEndTime);
for (Pair<AggregateResult, Integer> aggregation : aggregations) {
fields[aggregation.right] = aggregation.left;
for (Entry<Path, GroupByExecutor> pathToExecutorEntry : pathExecutors.entrySet()) {
GroupByExecutor executor = pathToExecutorEntry.getValue();
List<AggregateResult> aggregations = executor.calcResult(curStartTime, curEndTime);
for (int i = 0; i < aggregations.size(); i++) {
int resultIndex = resultIndexes.get(pathToExecutorEntry.getKey()).get(i);
fields[resultIndex] = aggregations.get(i);
}
}
} catch (QueryProcessException e) {
Expand Down
Expand Up @@ -43,8 +43,9 @@ public class LocalGroupByExecutor implements GroupByExecutor {

private IAggregateReader reader;
private BatchData preCachedData;
//<aggFunction - indexForRecord> of path
private List<Pair<AggregateResult, Integer>> results = new ArrayList<>();

// Aggregate result buffer of this path
private List<AggregateResult> results = new ArrayList<>();
private TimeRange timeRange;

public LocalGroupByExecutor(Path path, TSDataType dataType, QueryContext context, Filter timeFilter,
Expand All @@ -61,13 +62,13 @@ public LocalGroupByExecutor(Path path, TSDataType dataType, QueryContext context
}

@Override
public void addAggregateResult(AggregateResult aggrResult, int index) {
results.add(new Pair<>(aggrResult, index));
public void addAggregateResult(AggregateResult aggrResult) {
results.add(aggrResult);
}

private boolean isEndCalc() {
for (Pair<AggregateResult, Integer> result : results) {
if (!result.left.isCalculatedAggregationResult()) {
for (AggregateResult result : results) {
if (!result.isCalculatedAggregationResult()) {
return false;
}
}
Expand All @@ -90,9 +91,9 @@ private void calcFromBatch(BatchData batchData, long curStartTime, long curEndTi
return;
}

for (Pair<AggregateResult, Integer> result : results) {
for (AggregateResult result : results) {
//current agg method has been calculated
if (result.left.isCalculatedAggregationResult()) {
if (result.isCalculatedAggregationResult()) {
continue;
}
//lazy reset batch data for calculation
Expand All @@ -102,7 +103,7 @@ private void calcFromBatch(BatchData batchData, long curStartTime, long curEndTi
batchData.next();
}
if (batchData.hasCurrent()) {
result.left.updateResultFromPageData(batchData, curEndTime);
result.updateResultFromPageData(batchData, curEndTime);
}
}
//can calc for next interval
Expand All @@ -113,18 +114,24 @@ private void calcFromBatch(BatchData batchData, long curStartTime, long curEndTi

private void calcFromStatistics(Statistics pageStatistics)
throws QueryProcessException {
for (Pair<AggregateResult, Integer> result : results) {
for (AggregateResult result : results) {
//cacl is compile
if (result.left.isCalculatedAggregationResult()) {
if (result.isCalculatedAggregationResult()) {
continue;
}
result.left.updateResultFromStatistics(pageStatistics);
result.updateResultFromStatistics(pageStatistics);
}
}

@Override
public List<Pair<AggregateResult, Integer>> calcResult(long curStartTime, long curEndTime)
public List<AggregateResult> calcResult(long curStartTime, long curEndTime)
throws IOException, QueryProcessException {

// clear result cache
for (AggregateResult result : results) {
result.reset();
}

timeRange.set(curStartTime, curEndTime - 1);
if (calcFromCacheData(curStartTime, curEndTime)) {
return results;
Expand Down Expand Up @@ -155,15 +162,6 @@ public List<Pair<AggregateResult, Integer>> calcResult(long curStartTime, long c
return results;
}

// clear all results
@Override
public void resetAggregateResults() {
for (Pair<AggregateResult, Integer> result : results) {
result.left.reset();
}
}


private boolean readAndCalcFromPage(long curStartTime, long curEndTime) throws IOException,
QueryProcessException {
while (reader.hasNextPage()) {
Expand Down

0 comments on commit 596e1c1

Please sign in to comment.