From 3fb340e1408cd6371426f3d58b1d2d4d74144b79 Mon Sep 17 00:00:00 2001 From: qiaojialin <646274302@qq.com> Date: Fri, 28 Feb 2020 17:30:07 +0800 Subject: [PATCH 1/2] parallel aggregation query --- .../query/executor/AggregationExecutor.java | 237 +++++++++++------- .../query/reader/series/IAggregateReader.java | 3 + .../reader/series/SeriesAggregateReader.java | 5 + .../db/query/reader/series/SeriesReader.java | 5 + .../db/integration/IoTDBAggregationIT.java | 10 +- .../IoTDBAggregationSmallDataIT.java | 2 +- 6 files changed, 161 insertions(+), 101 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java index c39a392c3c6a4..a5f56c0305a37 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java @@ -25,6 +25,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.StorageEngineException; @@ -36,6 +39,7 @@ import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.dataset.SingleDataSet; import org.apache.iotdb.db.query.factory.AggregateResultFactory; +import org.apache.iotdb.db.query.pool.QueryTaskPoolManager; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; import org.apache.iotdb.db.query.reader.series.IAggregateReader; import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader; @@ -50,6 +54,9 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; +import org.apache.iotdb.tsfile.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AggregationExecutor { @@ -58,6 +65,8 @@ public class AggregationExecutor { private List aggregations; private IExpression expression; + private static final QueryTaskPoolManager pool = QueryTaskPoolManager.getInstance(); + /** * aggregation batch calculation size. **/ @@ -72,131 +81,169 @@ public AggregationExecutor(AggregationPlan aggregationPlan) { } /** - * execute aggregate function with only time filter or no filter. - * - * @param context query context + * Aggregate one series */ - public QueryDataSet executeWithoutValueFilter(QueryContext context) - throws StorageEngineException, IOException, QueryProcessException { + class AggregationTask implements Callable>> { - Filter timeFilter = null; - if (expression != null) { - timeFilter = ((GlobalTimeExpression) expression).getFilter(); + // path to aggregation result indexes + private Map.Entry> pathToAggrIndexes; + private IAggregateReader reader; + + public AggregationTask(IAggregateReader reader, Map.Entry> pathToAggrIndexes) { + this.reader = reader; + this.pathToAggrIndexes = pathToAggrIndexes; } - // TODO use multi-thread - Map> pathToAggrIndexesMap = groupAggregationsBySeries(selectedSeries); - AggregateResult[] aggregateResultList = new AggregateResult[selectedSeries.size()]; - for (Map.Entry> entry : pathToAggrIndexesMap.entrySet()) { - List aggregateResults = aggregateOneSeries(entry, timeFilter, context); - int index = 0; - for (int i : entry.getValue()) { - aggregateResultList[i] = aggregateResults.get(index); - index++; - } + @Override + public Pair> call() throws QueryProcessException, IOException { + return aggregateOneSeries(); } - return constructDataSet(Arrays.asList(aggregateResultList)); - } + /** + * get aggregation result for one series + * + * @return AggregateResult list + */ + private Pair> aggregateOneSeries() + throws IOException, QueryProcessException { + List aggregateResultList = new ArrayList<>(); + List isCalculatedList = new ArrayList<>(); + Path seriesPath = pathToAggrIndexes.getKey(); - /** - * get aggregation result for one series - * - * @param pathToAggrIndexes entry of path to aggregation indexes map - * @param timeFilter time filter - * @param context query context - * @return AggregateResult list - */ - private List aggregateOneSeries( - Map.Entry> pathToAggrIndexes, - Filter timeFilter, QueryContext context) - throws IOException, QueryProcessException, StorageEngineException { - List aggregateResultList = new ArrayList<>(); - List isCalculatedList = new ArrayList<>(); - Path seriesPath = pathToAggrIndexes.getKey(); - TSDataType tsDataType = dataTypes.get(pathToAggrIndexes.getValue().get(0)); - - // construct series reader without value filter - QueryDataSource queryDataSource = QueryResourceManager.getInstance() - .getQueryDataSource(seriesPath, context, timeFilter); - // update filter by TTL - timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter); - - IAggregateReader seriesReader = new SeriesAggregateReader(pathToAggrIndexes.getKey(), - tsDataType, context, queryDataSource, timeFilter, null, null); - - for (int i : pathToAggrIndexes.getValue()) { - // construct AggregateResult - AggregateResult aggregateResult = AggregateResultFactory - .getAggrResultByName(aggregations.get(i), tsDataType); - aggregateResultList.add(aggregateResult); - isCalculatedList.add(false); - } - int remainingToCalculate = pathToAggrIndexes.getValue().size(); - - while (seriesReader.hasNextChunk()) { - // cal by chunk statistics - if (seriesReader.canUseCurrentChunkStatistics()) { - Statistics chunkStatistics = seriesReader.currentChunkStatistics(); - for (int i = 0; i < aggregateResultList.size(); i++) { - if (Boolean.FALSE.equals(isCalculatedList.get(i))) { - AggregateResult aggregateResult = aggregateResultList.get(i); - aggregateResult.updateResultFromStatistics(chunkStatistics); - if (aggregateResult.isCalculatedAggregationResult()) { - isCalculatedList.set(i, true); - remainingToCalculate--; - if (remainingToCalculate == 0) { - return aggregateResultList; - } - } - } - } - seriesReader.skipCurrentChunk(); - continue; + for (int i : pathToAggrIndexes.getValue()) { + // construct AggregateResult + AggregateResult aggregateResult = AggregateResultFactory + .getAggrResultByName(aggregations.get(i), reader.getSeriesDataType()); + aggregateResultList.add(aggregateResult); + isCalculatedList.add(false); } - while (seriesReader.hasNextPage()) { - //cal by page statistics - if (seriesReader.canUseCurrentPageStatistics()) { - Statistics pageStatistic = seriesReader.currentPageStatistics(); + int remainingToCalculate = pathToAggrIndexes.getValue().size(); + + while (reader.hasNextChunk()) { + // cal by chunk statistics + if (reader.canUseCurrentChunkStatistics()) { + Statistics chunkStatistics = reader.currentChunkStatistics(); for (int i = 0; i < aggregateResultList.size(); i++) { if (Boolean.FALSE.equals(isCalculatedList.get(i))) { AggregateResult aggregateResult = aggregateResultList.get(i); - aggregateResult.updateResultFromStatistics(pageStatistic); + aggregateResult.updateResultFromStatistics(chunkStatistics); if (aggregateResult.isCalculatedAggregationResult()) { isCalculatedList.set(i, true); remainingToCalculate--; if (remainingToCalculate == 0) { - return aggregateResultList; + return new Pair<>(seriesPath, aggregateResultList); } } } } - seriesReader.skipCurrentPage(); + reader.skipCurrentChunk(); continue; } - // cal by page data - while (seriesReader.hasNextOverlappedPage()) { - BatchData nextOverlappedPageData = seriesReader.nextOverlappedPage(); - for (int i = 0; i < aggregateResultList.size(); i++) { - if (Boolean.FALSE.equals(isCalculatedList.get(i))) { - AggregateResult aggregateResult = aggregateResultList.get(i); - aggregateResult.updateResultFromPageData(nextOverlappedPageData); - nextOverlappedPageData.resetBatchData(); - if (aggregateResult.isCalculatedAggregationResult()) { - isCalculatedList.set(i, true); - remainingToCalculate--; - if (remainingToCalculate == 0) { - return aggregateResultList; + while (reader.hasNextPage()) { + //cal by page statistics + if (reader.canUseCurrentPageStatistics()) { + Statistics pageStatistic = reader.currentPageStatistics(); + for (int i = 0; i < aggregateResultList.size(); i++) { + if (Boolean.FALSE.equals(isCalculatedList.get(i))) { + AggregateResult aggregateResult = aggregateResultList.get(i); + aggregateResult.updateResultFromStatistics(pageStatistic); + if (aggregateResult.isCalculatedAggregationResult()) { + isCalculatedList.set(i, true); + remainingToCalculate--; + if (remainingToCalculate == 0) { + return new Pair<>(seriesPath, aggregateResultList); + } + } + } + } + reader.skipCurrentPage(); + continue; + } + // cal by page data + while (reader.hasNextOverlappedPage()) { + BatchData nextOverlappedPageData = reader.nextOverlappedPage(); + for (int i = 0; i < aggregateResultList.size(); i++) { + if (Boolean.FALSE.equals(isCalculatedList.get(i))) { + AggregateResult aggregateResult = aggregateResultList.get(i); + aggregateResult.updateResultFromPageData(nextOverlappedPageData); + nextOverlappedPageData.resetBatchData(); + if (aggregateResult.isCalculatedAggregationResult()) { + isCalculatedList.set(i, true); + remainingToCalculate--; + if (remainingToCalculate == 0) { + return new Pair<>(seriesPath, aggregateResultList); + } } } } } } } + return new Pair<>(seriesPath, aggregateResultList); + } + } + + /** + * execute aggregate function with only time filter or no filter. + * + * @param context query context + */ + public QueryDataSet executeWithoutValueFilter(QueryContext context) + throws StorageEngineException, QueryProcessException { + + Filter timeFilter = null; + if (expression != null) { + timeFilter = ((GlobalTimeExpression) expression).getFilter(); + } + + AggregateResult[] finalAggregateResults = new AggregateResult[selectedSeries.size()]; + Map> pathToAggrIndexesMap = groupAggregationsBySeries(selectedSeries); + + /* + * submit AggregationTask for each series + */ + List>>> futureList = new ArrayList<>(); + for (Map.Entry> pathToAggrIndex : pathToAggrIndexesMap.entrySet()) { + + Path seriesPath = pathToAggrIndex.getKey(); + // construct series reader without value filter + QueryDataSource queryDataSource = QueryResourceManager.getInstance() + .getQueryDataSource(seriesPath, context, timeFilter); + // update filter by TTL + timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter); + + TSDataType tsDataType = dataTypes.get(pathToAggrIndex.getValue().get(0)); + + IAggregateReader seriesReader = new SeriesAggregateReader(pathToAggrIndex.getKey(), + tsDataType, context, queryDataSource, timeFilter, null, null); + + Future>> future = pool + .submit(new AggregationTask(seriesReader, pathToAggrIndex)); + futureList.add(future); + } + + /* + * get AggregateResults for each series and put to final finalAggregateResults + */ + for (Future>> future: futureList) { + try { + Pair> currentSeriesResults = future.get(); + // final result index of current series + List resultIndexList = pathToAggrIndexesMap.get(currentSeriesResults.left); + List resultList = currentSeriesResults.right; + + // put current series results to final finalAggregateResults + for (int i = 0; i < resultIndexList.size(); i++) { + finalAggregateResults[resultIndexList.get(i)] = resultList.get(i); + } + } catch (Exception e) { + throw new QueryProcessException(e.getMessage()); + } } - return aggregateResultList; + return constructDataSet(Arrays.asList(finalAggregateResults)); } + /** * execute aggregate function with value filter. * diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java index bd22f3eb984d7..58e6894ecc63c 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.query.reader.series; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.BatchData; @@ -50,4 +51,6 @@ public interface IAggregateReader { boolean hasNextOverlappedPage() throws IOException; BatchData nextOverlappedPage() throws IOException; + + TSDataType getSeriesDataType(); } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java index d711f02a6763a..7081c6e60b7e9 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReader.java @@ -93,6 +93,11 @@ public BatchData nextOverlappedPage() throws IOException { return seriesReader.nextOverlappedPage(); } + @Override + public TSDataType getSeriesDataType() { + return seriesReader.getSeriesDataType(); + } + private boolean containedByTimeFilter(Statistics statistics) { Filter timeFilter = seriesReader.getTimeFilter(); diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java index 8b1f795436342..c08fcdc6e7e1e 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java @@ -53,6 +53,7 @@ public class SeriesReader { private final Path seriesPath; + private final TSDataType dataType; private final QueryContext context; private final Filter timeFilter; @@ -456,6 +457,10 @@ public Filter getTimeFilter() { return timeFilter; } + public TSDataType getSeriesDataType() { + return dataType; + } + private class VersionPair { protected long version; diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java index da6d3bd5e6759..4d1d237e9abb5 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationIT.java @@ -463,7 +463,7 @@ public void avgSumTest() { } @Test - public void avgSumErrorTest() throws SQLException { + public void avgSumErrorTest() { try (Connection connection = DriverManager. getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { @@ -474,7 +474,7 @@ public void avgSumErrorTest() throws SQLException { resultSet.next(); fail(); } catch (Exception e) { - Assert.assertEquals("500: Unsupported data type in aggregation AVG : TEXT", e.getMessage()); + Assert.assertTrue(e.getMessage().contains("Unsupported data type in aggregation AVG : TEXT")); } try { statement.execute("SELECT sum(s3)" + @@ -483,7 +483,7 @@ public void avgSumErrorTest() throws SQLException { resultSet.next(); fail(); } catch (Exception e) { - Assert.assertEquals("500: Unsupported data type in aggregation SUM : TEXT", e.getMessage()); + Assert.assertTrue(e.getMessage().contains("Unsupported data type in aggregation SUM : TEXT")); } try { statement.execute("SELECT avg(s4)" + @@ -492,7 +492,7 @@ public void avgSumErrorTest() throws SQLException { resultSet.next(); fail(); } catch (Exception e) { - Assert.assertEquals("500: Unsupported data type in aggregation AVG : BOOLEAN", e.getMessage()); + Assert.assertTrue(e.getMessage().contains("Unsupported data type in aggregation AVG : BOOLEAN")); } try { statement.execute("SELECT sum(s4)" + @@ -501,7 +501,7 @@ public void avgSumErrorTest() throws SQLException { resultSet.next(); fail(); } catch (Exception e) { - Assert.assertEquals("500: Unsupported data type in aggregation SUM : BOOLEAN", e.getMessage()); + Assert.assertTrue(e.getMessage().contains("Unsupported data type in aggregation SUM : BOOLEAN")); } } catch (Exception e) { e.printStackTrace(); diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataIT.java index 05fbcb1057a95..59fef215aef68 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAggregationSmallDataIT.java @@ -205,7 +205,7 @@ public void functionsNoFilterTest() throws ClassNotFoundException, SQLException "SELECT max_value(d0.s0),max_value(d1.s1),max_value(d0.s3) FROM root.vehicle"); fail(); } catch (IoTDBSQLException e) { - Assert.assertEquals("500: Binary statistics does not support: max", e.getMessage()); + Assert.assertTrue(e.getMessage().contains("Binary statistics does not support: max")); } hasResultSet = statement.execute( From 40893c4d317cf9a1b7a23717241f0f322099df08 Mon Sep 17 00:00:00 2001 From: qiaojialin <646274302@qq.com> Date: Fri, 28 Feb 2020 21:09:00 +0800 Subject: [PATCH 2/2] fix bug --- .../apache/iotdb/tsfile/read/reader/BatchDataIterator.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/BatchDataIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/BatchDataIterator.java index 72e292ca6debd..384a1cbf0b5f0 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/BatchDataIterator.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/BatchDataIterator.java @@ -20,13 +20,14 @@ import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.common.BatchData; - import java.io.IOException; public class BatchDataIterator implements IPointReader { private BatchData batchData; + private TimeValuePair currentTimeValue; + public BatchDataIterator(BatchData batchData) { this.batchData = batchData; } @@ -39,13 +40,14 @@ public boolean hasNextTimeValuePair() { @Override public TimeValuePair nextTimeValuePair() { TimeValuePair timeValuePair = new TimeValuePair(batchData.currentTime(), batchData.currentTsPrimitiveType()); + currentTimeValue = timeValuePair; batchData.next(); return timeValuePair; } @Override public TimeValuePair currentTimeValuePair() { - return new TimeValuePair(batchData.currentTime(), batchData.currentTsPrimitiveType()); + return currentTimeValue; } @Override