Skip to content

Commit

Permalink
Merge 40893c4 into 9340de3
Browse files Browse the repository at this point in the history
  • Loading branch information
Jialin Qiao committed Feb 28, 2020
2 parents 9340de3 + 40893c4 commit d274bbd
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 103 deletions.
Expand Up @@ -25,6 +25,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
Expand All @@ -36,6 +39,7 @@
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.SingleDataSet;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.IAggregateReader;
import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
Expand All @@ -50,6 +54,9 @@
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AggregationExecutor {

Expand All @@ -58,6 +65,8 @@ public class AggregationExecutor {
private List<String> aggregations;
private IExpression expression;

private static final QueryTaskPoolManager pool = QueryTaskPoolManager.getInstance();

/**
* aggregation batch calculation size.
**/
Expand All @@ -72,131 +81,169 @@ public AggregationExecutor(AggregationPlan aggregationPlan) {
}

/**
* execute aggregate function with only time filter or no filter.
*
* @param context query context
* Aggregate one series
*/
public QueryDataSet executeWithoutValueFilter(QueryContext context)
throws StorageEngineException, IOException, QueryProcessException {
class AggregationTask implements Callable<Pair<Path, List<AggregateResult>>> {

Filter timeFilter = null;
if (expression != null) {
timeFilter = ((GlobalTimeExpression) expression).getFilter();
// path to aggregation result indexes
private Map.Entry<Path, List<Integer>> pathToAggrIndexes;
private IAggregateReader reader;

public AggregationTask(IAggregateReader reader, Map.Entry<Path, List<Integer>> pathToAggrIndexes) {
this.reader = reader;
this.pathToAggrIndexes = pathToAggrIndexes;
}

// TODO use multi-thread
Map<Path, List<Integer>> pathToAggrIndexesMap = groupAggregationsBySeries(selectedSeries);
AggregateResult[] aggregateResultList = new AggregateResult[selectedSeries.size()];
for (Map.Entry<Path, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) {
List<AggregateResult> aggregateResults = aggregateOneSeries(entry, timeFilter, context);
int index = 0;
for (int i : entry.getValue()) {
aggregateResultList[i] = aggregateResults.get(index);
index++;
}
@Override
public Pair<Path, List<AggregateResult>> call() throws QueryProcessException, IOException {
return aggregateOneSeries();
}

return constructDataSet(Arrays.asList(aggregateResultList));
}
/**
* get aggregation result for one series
*
* @return AggregateResult list
*/
private Pair<Path, List<AggregateResult>> aggregateOneSeries()
throws IOException, QueryProcessException {
List<AggregateResult> aggregateResultList = new ArrayList<>();
List<Boolean> isCalculatedList = new ArrayList<>();
Path seriesPath = pathToAggrIndexes.getKey();

/**
* get aggregation result for one series
*
* @param pathToAggrIndexes entry of path to aggregation indexes map
* @param timeFilter time filter
* @param context query context
* @return AggregateResult list
*/
private List<AggregateResult> aggregateOneSeries(
Map.Entry<Path, List<Integer>> pathToAggrIndexes,
Filter timeFilter, QueryContext context)
throws IOException, QueryProcessException, StorageEngineException {
List<AggregateResult> aggregateResultList = new ArrayList<>();
List<Boolean> isCalculatedList = new ArrayList<>();
Path seriesPath = pathToAggrIndexes.getKey();
TSDataType tsDataType = dataTypes.get(pathToAggrIndexes.getValue().get(0));

// construct series reader without value filter
QueryDataSource queryDataSource = QueryResourceManager.getInstance()
.getQueryDataSource(seriesPath, context, timeFilter);
// update filter by TTL
timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);

IAggregateReader seriesReader = new SeriesAggregateReader(pathToAggrIndexes.getKey(),
tsDataType, context, queryDataSource, timeFilter, null, null);

for (int i : pathToAggrIndexes.getValue()) {
// construct AggregateResult
AggregateResult aggregateResult = AggregateResultFactory
.getAggrResultByName(aggregations.get(i), tsDataType);
aggregateResultList.add(aggregateResult);
isCalculatedList.add(false);
}
int remainingToCalculate = pathToAggrIndexes.getValue().size();

while (seriesReader.hasNextChunk()) {
// cal by chunk statistics
if (seriesReader.canUseCurrentChunkStatistics()) {
Statistics chunkStatistics = seriesReader.currentChunkStatistics();
for (int i = 0; i < aggregateResultList.size(); i++) {
if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
AggregateResult aggregateResult = aggregateResultList.get(i);
aggregateResult.updateResultFromStatistics(chunkStatistics);
if (aggregateResult.isCalculatedAggregationResult()) {
isCalculatedList.set(i, true);
remainingToCalculate--;
if (remainingToCalculate == 0) {
return aggregateResultList;
}
}
}
}
seriesReader.skipCurrentChunk();
continue;
for (int i : pathToAggrIndexes.getValue()) {
// construct AggregateResult
AggregateResult aggregateResult = AggregateResultFactory
.getAggrResultByName(aggregations.get(i), reader.getSeriesDataType());
aggregateResultList.add(aggregateResult);
isCalculatedList.add(false);
}
while (seriesReader.hasNextPage()) {
//cal by page statistics
if (seriesReader.canUseCurrentPageStatistics()) {
Statistics pageStatistic = seriesReader.currentPageStatistics();
int remainingToCalculate = pathToAggrIndexes.getValue().size();

while (reader.hasNextChunk()) {
// cal by chunk statistics
if (reader.canUseCurrentChunkStatistics()) {
Statistics chunkStatistics = reader.currentChunkStatistics();
for (int i = 0; i < aggregateResultList.size(); i++) {
if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
AggregateResult aggregateResult = aggregateResultList.get(i);
aggregateResult.updateResultFromStatistics(pageStatistic);
aggregateResult.updateResultFromStatistics(chunkStatistics);
if (aggregateResult.isCalculatedAggregationResult()) {
isCalculatedList.set(i, true);
remainingToCalculate--;
if (remainingToCalculate == 0) {
return aggregateResultList;
return new Pair<>(seriesPath, aggregateResultList);
}
}
}
}
seriesReader.skipCurrentPage();
reader.skipCurrentChunk();
continue;
}
// cal by page data
while (seriesReader.hasNextOverlappedPage()) {
BatchData nextOverlappedPageData = seriesReader.nextOverlappedPage();
for (int i = 0; i < aggregateResultList.size(); i++) {
if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
AggregateResult aggregateResult = aggregateResultList.get(i);
aggregateResult.updateResultFromPageData(nextOverlappedPageData);
nextOverlappedPageData.resetBatchData();
if (aggregateResult.isCalculatedAggregationResult()) {
isCalculatedList.set(i, true);
remainingToCalculate--;
if (remainingToCalculate == 0) {
return aggregateResultList;
while (reader.hasNextPage()) {
//cal by page statistics
if (reader.canUseCurrentPageStatistics()) {
Statistics pageStatistic = reader.currentPageStatistics();
for (int i = 0; i < aggregateResultList.size(); i++) {
if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
AggregateResult aggregateResult = aggregateResultList.get(i);
aggregateResult.updateResultFromStatistics(pageStatistic);
if (aggregateResult.isCalculatedAggregationResult()) {
isCalculatedList.set(i, true);
remainingToCalculate--;
if (remainingToCalculate == 0) {
return new Pair<>(seriesPath, aggregateResultList);
}
}
}
}
reader.skipCurrentPage();
continue;
}
// cal by page data
while (reader.hasNextOverlappedPage()) {
BatchData nextOverlappedPageData = reader.nextOverlappedPage();
for (int i = 0; i < aggregateResultList.size(); i++) {
if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
AggregateResult aggregateResult = aggregateResultList.get(i);
aggregateResult.updateResultFromPageData(nextOverlappedPageData);
nextOverlappedPageData.resetBatchData();
if (aggregateResult.isCalculatedAggregationResult()) {
isCalculatedList.set(i, true);
remainingToCalculate--;
if (remainingToCalculate == 0) {
return new Pair<>(seriesPath, aggregateResultList);
}
}
}
}
}
}
}
return new Pair<>(seriesPath, aggregateResultList);
}
}

/**
* execute aggregate function with only time filter or no filter.
*
* @param context query context
*/
public QueryDataSet executeWithoutValueFilter(QueryContext context)
throws StorageEngineException, QueryProcessException {

Filter timeFilter = null;
if (expression != null) {
timeFilter = ((GlobalTimeExpression) expression).getFilter();
}

AggregateResult[] finalAggregateResults = new AggregateResult[selectedSeries.size()];
Map<Path, List<Integer>> pathToAggrIndexesMap = groupAggregationsBySeries(selectedSeries);

/*
* submit AggregationTask for each series
*/
List<Future<Pair<Path, List<AggregateResult>>>> futureList = new ArrayList<>();
for (Map.Entry<Path, List<Integer>> pathToAggrIndex : pathToAggrIndexesMap.entrySet()) {

Path seriesPath = pathToAggrIndex.getKey();
// construct series reader without value filter
QueryDataSource queryDataSource = QueryResourceManager.getInstance()
.getQueryDataSource(seriesPath, context, timeFilter);
// update filter by TTL
timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);

TSDataType tsDataType = dataTypes.get(pathToAggrIndex.getValue().get(0));

IAggregateReader seriesReader = new SeriesAggregateReader(pathToAggrIndex.getKey(),
tsDataType, context, queryDataSource, timeFilter, null, null);

Future<Pair<Path, List<AggregateResult>>> future = pool
.submit(new AggregationTask(seriesReader, pathToAggrIndex));
futureList.add(future);
}

/*
* get AggregateResults for each series and put to final finalAggregateResults
*/
for (Future<Pair<Path, List<AggregateResult>>> future: futureList) {
try {
Pair<Path, List<AggregateResult>> currentSeriesResults = future.get();
// final result index of current series
List<Integer> resultIndexList = pathToAggrIndexesMap.get(currentSeriesResults.left);
List<AggregateResult> resultList = currentSeriesResults.right;

// put current series results to final finalAggregateResults
for (int i = 0; i < resultIndexList.size(); i++) {
finalAggregateResults[resultIndexList.get(i)] = resultList.get(i);
}
} catch (Exception e) {
throw new QueryProcessException(e.getMessage());
}
}
return aggregateResultList;
return constructDataSet(Arrays.asList(finalAggregateResults));
}


/**
* execute aggregate function with value filter.
*
Expand Down
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.query.reader.series;

import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;

Expand Down Expand Up @@ -50,4 +51,6 @@ public interface IAggregateReader {
boolean hasNextOverlappedPage() throws IOException;

BatchData nextOverlappedPage() throws IOException;

TSDataType getSeriesDataType();
}
Expand Up @@ -93,6 +93,11 @@ public BatchData nextOverlappedPage() throws IOException {
return seriesReader.nextOverlappedPage();
}

@Override
public TSDataType getSeriesDataType() {
return seriesReader.getSeriesDataType();
}


private boolean containedByTimeFilter(Statistics statistics) {
Filter timeFilter = seriesReader.getTimeFilter();
Expand Down
Expand Up @@ -53,6 +53,7 @@
public class SeriesReader {

private final Path seriesPath;

private final TSDataType dataType;
private final QueryContext context;
private final Filter timeFilter;
Expand Down Expand Up @@ -456,6 +457,10 @@ public Filter getTimeFilter() {
return timeFilter;
}

public TSDataType getSeriesDataType() {
return dataType;
}

private class VersionPair<T> {

protected long version;
Expand Down
Expand Up @@ -463,7 +463,7 @@ public void avgSumTest() {
}

@Test
public void avgSumErrorTest() throws SQLException {
public void avgSumErrorTest() {
try (Connection connection = DriverManager.
getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
Expand All @@ -474,7 +474,7 @@ public void avgSumErrorTest() throws SQLException {
resultSet.next();
fail();
} catch (Exception e) {
Assert.assertEquals("500: Unsupported data type in aggregation AVG : TEXT", e.getMessage());
Assert.assertTrue(e.getMessage().contains("Unsupported data type in aggregation AVG : TEXT"));
}
try {
statement.execute("SELECT sum(s3)" +
Expand All @@ -483,7 +483,7 @@ public void avgSumErrorTest() throws SQLException {
resultSet.next();
fail();
} catch (Exception e) {
Assert.assertEquals("500: Unsupported data type in aggregation SUM : TEXT", e.getMessage());
Assert.assertTrue(e.getMessage().contains("Unsupported data type in aggregation SUM : TEXT"));
}
try {
statement.execute("SELECT avg(s4)" +
Expand All @@ -492,7 +492,7 @@ public void avgSumErrorTest() throws SQLException {
resultSet.next();
fail();
} catch (Exception e) {
Assert.assertEquals("500: Unsupported data type in aggregation AVG : BOOLEAN", e.getMessage());
Assert.assertTrue(e.getMessage().contains("Unsupported data type in aggregation AVG : BOOLEAN"));
}
try {
statement.execute("SELECT sum(s4)" +
Expand All @@ -501,7 +501,7 @@ public void avgSumErrorTest() throws SQLException {
resultSet.next();
fail();
} catch (Exception e) {
Assert.assertEquals("500: Unsupported data type in aggregation SUM : BOOLEAN", e.getMessage());
Assert.assertTrue(e.getMessage().contains("Unsupported data type in aggregation SUM : BOOLEAN"));
}
} catch (Exception e) {
e.printStackTrace();
Expand Down

0 comments on commit d274bbd

Please sign in to comment.