From 8eb542898175a8b4d36e65d17e4909e080e831c9 Mon Sep 17 00:00:00 2001 From: Beyyes Date: Mon, 14 Oct 2024 10:10:53 +0800 Subject: [PATCH 1/5] add group by all id test --- .../IoTDBMultiIDsWithAttributesTableIT.java | 35 +++++++++++++++++++ .../TableAggregationTableScanOperator.java | 12 ++++--- 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java index 8933d848159af..b62ea9bbed848 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java @@ -605,6 +605,41 @@ public void countStarTest() { // TODO select count(*),count(t1) from (select avg(num+1) as t1 from table0) where time < 0 } + @Test + public void groupByAggregationTest() { + expectedHeader = + new String[] { + "device", + "level", + "count_num", + "count_star", + "count_device", + "count_date", + "count_attr1", + "count_attr2", + "count_time" + }; + retArray = + new String[] { + "d1,l1,3,3,3,0,3,3,3,", + "d1,l2,3,3,3,0,3,3,3,", + "d1,l3,3,3,3,0,3,3,3,", + "d1,l4,3,3,3,0,0,0,3,", + "d1,l5,3,3,3,1,0,0,3,", + "d2,l1,3,3,3,0,3,3,3,", + "d2,l2,3,3,3,0,3,0,3,", + "d2,l3,3,3,3,0,0,0,3,", + "d2,l4,3,3,3,0,0,0,3,", + "d2,l5,3,3,3,1,0,0,3,", + }; + String sql = + "select device, level, " + + "count(num) as count_num, count(*) as count_star, count(device) as count_device, count(date) as count_date, " + + "count(attr1) as count_attr1, count(attr2) as count_attr2, count(time) as count_time " + + "from table0 group by device,level order by device, level"; + tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + } + // ============================ Join Test =========================== // no filter @Test diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java index 1718f460e58d7..3b3700479d6f6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java @@ -616,7 +616,7 @@ protected List getResultDataTypes() { (groupingKeyIndex != null ? groupingKeyIndex.length : 0) + tableAggregators.size()); if (groupingKeyIndex != null) { for (int i = 0; i < groupingKeyIndex.length; i++) { - resultDataTypes.add(TSDataType.TEXT); + resultDataTypes.add(TSDataType.STRING); } } @@ -636,8 +636,12 @@ public void appendAggregationResult( for (int i = 0; i < groupingKeyIndex.length; i++) { if (TsTableColumnCategory.ID == groupingKeySchemas.get(i).getColumnCategory()) { columnBuilders[i].writeBinary( - (Binary) - deviceEntries.get(currentDeviceIndex).getNthSegment(groupingKeyIndex[i] + 1)); + new Binary( + ((String) + deviceEntries + .get(currentDeviceIndex) + .getNthSegment(groupingKeyIndex[i] + 1)) + .getBytes())); } else { columnBuilders[i].writeBinary( new Binary( @@ -653,7 +657,7 @@ public void appendAggregationResult( int groupKeyLength = groupingKeyIndex == null ? 0 : groupingKeyIndex.length; for (int i = 0; i < aggregators.size(); i++) { - aggregators.get(groupKeyLength + i).evaluate(columnBuilders[groupKeyLength + i]); + aggregators.get(i).evaluate(columnBuilders[groupKeyLength + i]); } tsBlockBuilder.declarePosition(); From 0319dc6c68b7b7ab5cacff9e4315918a2125f18d Mon Sep 17 00:00:00 2001 From: Beyyes Date: Mon, 14 Oct 2024 18:31:30 +0800 Subject: [PATCH 2/5] add group by datebin impl --- .../relational/DateBinTimeRangeIterator.java | 124 ++++++++++ .../TableAggregationTableScanOperator.java | 213 ++++++++++++------ .../plan/planner/TableOperatorGenerator.java | 70 ++++-- .../DateBinFunctionColumnTransformer.java | 38 ++++ 4 files changed, 358 insertions(+), 87 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DateBinTimeRangeIterator.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DateBinTimeRangeIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DateBinTimeRangeIterator.java new file mode 100644 index 0000000000000..484198d3f85ca --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DateBinTimeRangeIterator.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.source.relational; + +import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITimeRangeIterator; +import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.DateBinFunctionColumnTransformer; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.read.common.TimeRange; + +public class DateBinTimeRangeIterator implements ITimeRangeIterator { + + DateBinFunctionColumnTransformer dateBinTransformer; + + boolean finished = false; + + // total query [startTime, endTime) + private long startTime; + private long endTime; + + private TimeRange curTimeRange; + private boolean hasCachedTimeRange; + + public DateBinTimeRangeIterator(DateBinFunctionColumnTransformer dateBinTransformer) { + this.dateBinTransformer = dateBinTransformer; + } + + public void updateCurTimeRange(Column timeColumn) { + // long startTime = timeColumn.getLong(); + this.hasCachedTimeRange = true; + } + + public boolean overCurrentTimeRange(long startTime) { + if (!hasCachedTimeRange) { + return false; + } + + return dateBinTransformer.dateBin(startTime) > curTimeRange.getMin(); + } + + public TimeRange updateCurTimeRange(long startTime) { + long[] timeArray = dateBinTransformer.dateBinStartEnd(startTime); + + if (hasCachedTimeRange) { + // meet new time range, remove old time range + if (timeArray[0] != curTimeRange.getMin()) { + this.curTimeRange = new TimeRange(timeArray[0], timeArray[1] - 1); + } + } else { + this.curTimeRange = new TimeRange(timeArray[0], timeArray[1] - 1); + this.hasCachedTimeRange = true; + } + + return this.curTimeRange; + } + + public void setFinished() { + this.finished = true; + } + + @Override + public TimeRange getFirstTimeRange() { + throw new IllegalStateException( + "Method getFirstTimeRange is not used in DateBinTimeRangeIterator"); + } + + @Override + public boolean hasNextTimeRange() { + return !finished; + } + + public TimeRange getCurTimeRange() { + return this.curTimeRange; + } + + public void resetCurTimeRange() { + this.curTimeRange = null; + this.hasCachedTimeRange = false; + } + + @Override + public TimeRange nextTimeRange() { + if (hasCachedTimeRange) { + return curTimeRange; + } + // hasCachedTimeRange = false; + // curTimeRange = null; + return null; + } + + @Override + public boolean isAscending() { + return false; + } + + @Override + public long currentOutputTime() { + throw new IllegalStateException( + "Method currentOutputTime is not used in DateBinTimeRangeIterator"); + } + + @Override + public long getTotalIntervalNum() { + throw new IllegalStateException( + "Method getTotalIntervalNum is not used in DateBinTimeRangeIterator"); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java index 3b3700479d6f6..cfba24133fc39 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java @@ -35,6 +35,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; +import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.DateBinFunctionColumnTransformer; import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; @@ -67,6 +68,8 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregationScanOperator { + // TODO(beyyes) variable groupBy is no need in table model + private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(TableAggregationTableScanOperator.class); @@ -105,6 +108,10 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation private int currentDeviceIndex; + DateBinFunctionColumnTransformer dateBinTransformer; + int dateBinColumnIdx; + DateBinTimeRangeIterator dateBinTimeRangeIterator; + public TableAggregationTableScanOperator( PlanNodeId sourceId, OperatorContext context, @@ -125,7 +132,9 @@ public TableAggregationTableScanOperator( GroupByTimeParameter groupByTimeParameter, long maxReturnSize, boolean canUseStatistics, - int[] layoutArray) { + int[] layoutArray, + DateBinFunctionColumnTransformer dateBinTransformer, + int dateBinColumnIdx) { super( sourceId, @@ -158,6 +167,11 @@ public TableAggregationTableScanOperator( measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList()); this.currentDeviceIndex = 0; this.layoutArray = layoutArray; + this.dateBinTransformer = dateBinTransformer; + if (dateBinTransformer != null) { + this.dateBinColumnIdx = dateBinColumnIdx; + this.dateBinTimeRangeIterator = new DateBinTimeRangeIterator(dateBinTransformer); + } this.maxReturnSize = maxReturnSize; this.maxTsBlockLineNum = maxTsBlockLineNum; @@ -167,7 +181,7 @@ public TableAggregationTableScanOperator( @Override public boolean hasNext() throws Exception { - return !isFinished(); + return curTimeRange != null || dateBinTimeRangeIterator.hasNextTimeRange(); } @Override @@ -183,60 +197,67 @@ public TsBlock next() throws Exception { // start stopwatch, reset leftRuntimeOfOneNextCall long start = System.nanoTime(); - leftRuntimeOfOneNextCall = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); + leftRuntimeOfOneNextCall = 1000 * operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); long maxRuntime = leftRuntimeOfOneNextCall; while (System.nanoTime() - start < maxRuntime - && (curTimeRange != null || timeRangeIterator.hasNextTimeRange()) + && (curTimeRange != null || dateBinTimeRangeIterator.hasNextTimeRange()) && !resultTsBlockBuilder.isFull()) { + if (curTimeRange == null) { // move to the next time window - curTimeRange = timeRangeIterator.nextTimeRange(); - // clear previous aggregation result - for (TableAggregator aggregator : tableAggregators) { - aggregator.reset(); - } + curTimeRange = dateBinTimeRangeIterator.nextTimeRange(); + resetTableAggregators(); } // calculate aggregation result on current time window // Keep curTimeRange if the calculation of this timeRange is not done if (calculateAggregationResultForCurrentTimeRange()) { - // updateResultTsBlock(); + dateBinTimeRangeIterator.resetCurTimeRange(); curTimeRange = null; } } if (resultTsBlockBuilder.getPositionCount() > 0) { - int declaredPositions = resultTsBlockBuilder.getPositionCount(); - ColumnBuilder[] valueColumnBuilders = resultTsBlockBuilder.getValueColumnBuilders(); - Column[] valueColumns = new Column[valueColumnBuilders.length]; - for (int i = 0; i < valueColumns.length; i++) { - valueColumns[i] = valueColumnBuilders[i].build(); - if (valueColumns[i].getPositionCount() != declaredPositions) { - throw new IllegalStateException( - format( - "Declared positions (%s) does not match column %s's number of entries (%s)", - declaredPositions, i, valueColumns[i].getPositionCount())); - } - } - - this.resultTsBlock = - new TsBlock( - resultTsBlockBuilder.getPositionCount(), - new RunLengthEncodedColumn( - TIME_COLUMN_TEMPLATE, resultTsBlockBuilder.getPositionCount()), - valueColumns); - resultTsBlockBuilder.reset(); + buildResultTsBlock(); return this.resultTsBlock; } else { return null; } } + private void buildResultTsBlock() { + int declaredPositions = resultTsBlockBuilder.getPositionCount(); + ColumnBuilder[] valueColumnBuilders = resultTsBlockBuilder.getValueColumnBuilders(); + Column[] valueColumns = new Column[valueColumnBuilders.length]; + for (int i = 0; i < valueColumns.length; i++) { + valueColumns[i] = valueColumnBuilders[i].build(); + if (valueColumns[i].getPositionCount() != declaredPositions) { + throw new IllegalStateException( + format( + "Declared positions (%s) does not match column %s's number of entries (%s)", + declaredPositions, i, valueColumns[i].getPositionCount())); + } + } + + this.resultTsBlock = + new TsBlock( + resultTsBlockBuilder.getPositionCount(), + new RunLengthEncodedColumn( + TIME_COLUMN_TEMPLATE, resultTsBlockBuilder.getPositionCount()), + valueColumns); + resultTsBlockBuilder.reset(); + } + @Override public boolean isFinished() throws Exception { - return (retainedTsBlock == null) - && (currentDeviceIndex >= deviceCount || seriesScanOptions.limitConsumedUp()); + if (!finished) { + finished = !hasNextWithTimer(); + } + return finished; + + // return (retainedTsBlock == null) + // && (currentDeviceIndex >= deviceCount || seriesScanOptions.limitConsumedUp()); } @Override @@ -297,11 +318,10 @@ && readAndCalcFromFile()) { return false; } else { updateResultTsBlock(); - for (TableAggregator aggregator : tableAggregators) { - aggregator.reset(); - } + resetTableAggregators(); currentDeviceIndex++; } + if (currentDeviceIndex < deviceCount) { // construct AlignedSeriesScanUtil for next device this.seriesScanUtil = constructAlignedSeriesScanUtil(deviceEntries.get(currentDeviceIndex)); @@ -310,7 +330,14 @@ && readAndCalcFromFile()) { queryDataSource.reset(); this.seriesScanUtil.initQueryDataSource(queryDataSource); } - return currentDeviceIndex >= deviceCount; + + if (currentDeviceIndex >= deviceCount) { + // all devices have been consumed + dateBinTimeRangeIterator.setFinished(); + return true; + } else { + return false; + } } catch (IOException e) { throw new RuntimeException("Error while scanning the file", e); } @@ -355,6 +382,14 @@ public Pair calculateAggregationFromRawData( return new Pair<>(false, inputTsBlock); } + if (dateBinTimeRangeIterator.overCurrentTimeRange(inputTsBlock.getStartTime())) { + updateResultTsBlock(); + dateBinTimeRangeIterator.resetCurTimeRange(); + resetTableAggregators(); + } + curTimeRange = dateBinTimeRangeIterator.updateCurTimeRange(inputTsBlock.getStartTime()); + this.curTimeRange = curTimeRange; + // check if the tsBlock does not contain points in current interval if (satisfiedTimeRange(inputTsBlock, curTimeRange, ascending)) { // skip points that cannot be calculated @@ -390,6 +425,25 @@ private TsBlock process( } TsBlock inputRegion = inputTsBlock.getRegion(0, lastIndexToProcess + 1); + + // TODO(beyyes) add optimization: if only agg measurement columns, no need to transfer + Column[] valueColumns = new Column[tableAggregators.size()]; + for (int i = 0; i < tableAggregators.size(); i++) { + ColumnSchema columnSchema = columnSchemas.get(layoutArray[i]); + if (Streams.of( + TsTableColumnCategory.ID, TsTableColumnCategory.ATTRIBUTE, TsTableColumnCategory.TIME) + .anyMatch(columnSchema.getColumnCategory()::equals)) { + valueColumns[i] = inputRegion.getTimeColumn(); + } else { + valueColumns[i] = inputRegion.getColumn(i); + } + } + TsBlock tsBlock = + new TsBlock( + inputRegion.getPositionCount(), + new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, inputRegion.getPositionCount()), + valueColumns); + for (int i = 0; i < aggregators.size(); i++) { if (isNullIdOrAttribute(i)) { continue; @@ -401,7 +455,7 @@ private TsBlock process( continue; } - aggregator.processBlock(inputRegion); + aggregator.processBlock(tsBlock); } int lastReadRowIndex = lastIndexToProcess + 1; if (lastReadRowIndex >= inputTsBlock.getPositionCount()) { @@ -459,6 +513,15 @@ protected boolean readAndCalcFromFile() throws IOException { while (System.nanoTime() - start < leftRuntimeOfOneNextCall && seriesScanUtil.hasNextFile()) { if (canUseStatistics && seriesScanUtil.canUseCurrentFileStatistics()) { Statistics fileTimeStatistics = seriesScanUtil.currentFileTimeStatistics(); + + if (dateBinTimeRangeIterator.overCurrentTimeRange(fileTimeStatistics.getStartTime())) { + updateResultTsBlock(); + dateBinTimeRangeIterator.resetCurTimeRange(); + resetTableAggregators(); + } + curTimeRange = + dateBinTimeRangeIterator.updateCurTimeRange(fileTimeStatistics.getStartTime()); + if (fileTimeStatistics.getStartTime() > curTimeRange.getMax()) { if (ascending) { return true; @@ -467,6 +530,7 @@ protected boolean readAndCalcFromFile() throws IOException { continue; } } + // calc from fileMetaData if (curTimeRange.contains( fileTimeStatistics.getStartTime(), fileTimeStatistics.getEndTime())) { @@ -500,6 +564,15 @@ protected boolean readAndCalcFromChunk() throws IOException { while (System.nanoTime() - start < leftRuntimeOfOneNextCall && seriesScanUtil.hasNextChunk()) { if (canUseStatistics && seriesScanUtil.canUseCurrentChunkStatistics()) { Statistics chunkTimeStatistics = seriesScanUtil.currentChunkTimeStatistics(); + + if (dateBinTimeRangeIterator.overCurrentTimeRange(chunkTimeStatistics.getStartTime())) { + updateResultTsBlock(); + dateBinTimeRangeIterator.resetCurTimeRange(); + resetTableAggregators(); + } + curTimeRange = + dateBinTimeRangeIterator.updateCurTimeRange(chunkTimeStatistics.getStartTime()); + if (chunkTimeStatistics.getStartTime() > curTimeRange.getMax()) { if (ascending) { return true; @@ -508,6 +581,7 @@ protected boolean readAndCalcFromChunk() throws IOException { continue; } } + // calc from chunkMetaData if (curTimeRange.contains( chunkTimeStatistics.getStartTime(), chunkTimeStatistics.getEndTime())) { @@ -543,6 +617,15 @@ protected boolean readAndCalcFromPage() throws IOException { while (System.nanoTime() - start < leftRuntimeOfOneNextCall && seriesScanUtil.hasNextPage()) { if (canUseStatistics && seriesScanUtil.canUseCurrentPageStatistics()) { Statistics pageTimeStatistics = seriesScanUtil.currentPageTimeStatistics(); + + if (dateBinTimeRangeIterator.overCurrentTimeRange(pageTimeStatistics.getStartTime())) { + updateResultTsBlock(); + dateBinTimeRangeIterator.resetCurTimeRange(); + resetTableAggregators(); + } + curTimeRange = + dateBinTimeRangeIterator.updateCurTimeRange(pageTimeStatistics.getStartTime()); + // There is no more eligible points in current time range if (pageTimeStatistics.getStartTime() > curTimeRange.getMax()) { if (ascending) { @@ -552,6 +635,7 @@ protected boolean readAndCalcFromPage() throws IOException { continue; } } + // can use pageHeader if (curTimeRange.contains( pageTimeStatistics.getStartTime(), pageTimeStatistics.getEndTime())) { @@ -574,35 +658,13 @@ protected boolean readAndCalcFromPage() throws IOException { if (originalTsBlock == null) { continue; } - // TODO(beyyes) add optimization: if only agg measurement columns, no need to transfer - Column[] valueColumns = new Column[tableAggregators.size()]; - for (int i = 0; i < tableAggregators.size(); i++) { - ColumnSchema columnSchema = columnSchemas.get(layoutArray[i]); - if (Streams.of( - TsTableColumnCategory.ID, - TsTableColumnCategory.ATTRIBUTE, - TsTableColumnCategory.TIME) - .anyMatch(columnSchema.getColumnCategory()::equals)) { - valueColumns[i] = originalTsBlock.getTimeColumn(); - } else { - valueColumns[i] = originalTsBlock.getColumn(i); - } - } - TsBlock tsBlock = - new TsBlock( - originalTsBlock.getPositionCount(), - new RunLengthEncodedColumn( - TIME_COLUMN_TEMPLATE, originalTsBlock.getPositionCount()), - valueColumns); - if (tsBlock.isEmpty()) { - continue; - } // calc from raw data - if (calcFromRawData(tsBlock)) { + if (calcFromRawData(originalTsBlock)) { return true; } } + return false; } finally { leftRuntimeOfOneNextCall -= (System.nanoTime() - start); @@ -611,15 +673,19 @@ protected boolean readAndCalcFromPage() throws IOException { @Override protected List getResultDataTypes() { + int groupingKeySize = groupingKeySchemas != null ? groupingKeySchemas.size() : 0; + int dateBinSize = dateBinTimeRangeIterator != null ? 1 : 0; List resultDataTypes = - new ArrayList<>( - (groupingKeyIndex != null ? groupingKeyIndex.length : 0) + tableAggregators.size()); - if (groupingKeyIndex != null) { - for (int i = 0; i < groupingKeyIndex.length; i++) { + new ArrayList<>(groupingKeySize + dateBinSize + tableAggregators.size()); + + if (groupingKeySchemas != null) { + for (int i = 0; i < groupingKeySchemas.size(); i++) { resultDataTypes.add(TSDataType.STRING); } } - + if (dateBinTimeRangeIterator != null) { + resultDataTypes.add(TSDataType.TIMESTAMP); + } for (TableAggregator aggregator : tableAggregators) { resultDataTypes.add(aggregator.getType()); } @@ -627,13 +693,20 @@ protected List getResultDataTypes() { return resultDataTypes; } + private void resetTableAggregators() { + tableAggregators.forEach(TableAggregator::reset); + } + /** Append a row of aggregation results to the result tsBlock. */ public void appendAggregationResult( TsBlockBuilder tsBlockBuilder, List aggregators) { ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders(); + int groupKeySize = groupingKeySchemas == null ? 0 : groupingKeySchemas.size(); + int dateBinSize = dateBinTimeRangeIterator != null ? 1 : 0; + if (groupingKeyIndex != null) { - for (int i = 0; i < groupingKeyIndex.length; i++) { + for (int i = 0; i < groupKeySize; i++) { if (TsTableColumnCategory.ID == groupingKeySchemas.get(i).getColumnCategory()) { columnBuilders[i].writeBinary( new Binary( @@ -654,10 +727,12 @@ public void appendAggregationResult( } } - int groupKeyLength = groupingKeyIndex == null ? 0 : groupingKeyIndex.length; + if (dateBinSize > 0) { + columnBuilders[groupKeySize].writeLong(curTimeRange.getMin()); + } for (int i = 0; i < aggregators.size(); i++) { - aggregators.get(i).evaluate(columnBuilders[groupKeyLength + i]); + aggregators.get(i).evaluate(columnBuilders[groupKeySize + dateBinSize + i]); } tsBlockBuilder.declarePosition(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index 934a5b539e141..b86cef46622c2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -117,9 +117,13 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValueFillNode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FunctionCall; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Literal; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LongLiteral; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference; import org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer; import org.apache.iotdb.db.queryengine.transformation.dag.column.leaf.LeafColumnTransformer; +import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.DateBinFunctionColumnTransformer; import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.IDeviceSchemaInfo; import org.apache.iotdb.db.utils.datastructure.SortKey; @@ -172,6 +176,8 @@ import static org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType; import static org.apache.iotdb.db.utils.constant.SqlConstant.FIRST; import static org.apache.iotdb.db.utils.constant.SqlConstant.LAST; +import static org.apache.iotdb.db.utils.constant.SqlConstant.TABLE_TIME_COLUMN_NAME; +import static org.apache.tsfile.read.common.type.TimestampType.TIMESTAMP; /** This Visitor is responsible for transferring Table PlanNode Tree to Table Operator Tree. */ public class TableOperatorGenerator extends PlanVisitor { @@ -1281,22 +1287,6 @@ public Operator visitAggregationTableScan( List measurementColumnNames = new ArrayList<>(); List measurementSchemas = new ArrayList<>(); - List groupingKeySchemas = null; - int[] groupingKeyIndex = null; - if (!node.getGroupingKeys().isEmpty()) { - groupingKeySchemas = new ArrayList<>(node.getGroupingKeys().size()); - groupingKeyIndex = new int[node.getGroupingKeys().size()]; - for (int i = 0; i < node.getGroupingKeys().size(); i++) { - Symbol groupingKey = node.getGroupingKeys().get(i); - - checkArgument( - idAndAttributeColumnsIndexMap.containsKey(groupingKey), - "grouping key must be ID or Attribute in AggregationTableScan"); - groupingKeySchemas.add(columnSchemaMap.get(groupingKey)); - groupingKeyIndex[i] = idAndAttributeColumnsIndexMap.get(groupingKey); - } - } - // TODO test aggregation function which has more than one arguements for (Map.Entry entry : node.getAggregations().entrySet()) { idx++; @@ -1373,6 +1363,48 @@ public Operator visitAggregationTableScan( } } + DateBinFunctionColumnTransformer dateBinTransformer = null; + int dateBinColumnIdx = -2; + List groupingKeySchemas = null; + int[] groupingKeyIndex = null; + if (!node.getGroupingKeys().isEmpty()) { + groupingKeySchemas = new ArrayList<>(node.getGroupingKeys().size()); + groupingKeyIndex = new int[node.getGroupingKeys().size()]; + for (int i = 0; i < node.getGroupingKeys().size(); i++) { + Symbol groupingKey = node.getGroupingKeys().get(i); + + if (idAndAttributeColumnsIndexMap.containsKey(groupingKey)) { + groupingKeySchemas.add(columnSchemaMap.get(groupingKey)); + groupingKeyIndex[i] = idAndAttributeColumnsIndexMap.get(groupingKey); + } else { + if (node.getProjection() != null + && !node.getProjection().getMap().isEmpty() + && node.getProjection().contains(groupingKey)) { + FunctionCall dateBinFunc = (FunctionCall) node.getProjection().get(groupingKey); + List arguments = dateBinFunc.getArguments(); + dateBinTransformer = + new DateBinFunctionColumnTransformer( + TIMESTAMP, + ((LongLiteral) arguments.get(0)).getParsedValue(), + ((LongLiteral) arguments.get(1)).getParsedValue(), + null, + ((LongLiteral) arguments.get(3)).getParsedValue(), + context.getZoneId()); + SymbolReference dateBinColumn = (SymbolReference) arguments.get(2); + if (TABLE_TIME_COLUMN_NAME.equals(dateBinColumn.getName())) { + dateBinColumnIdx = -1; + } else { + dateBinColumnIdx = + columnsIndexArray[columnLayout.get(Symbol.of(dateBinColumn.getName()))]; + } + } else { + throw new IllegalStateException( + "grouping key must be ID or Attribute in AggregationTableScan"); + } + } + } + } + final OperatorContext operatorContext = context .getDriverContext() @@ -1412,11 +1444,13 @@ public Operator visitAggregationTableScan( groupingKeySchemas, groupingKeyIndex, timeRangeIterator, - false, + node.getScanOrder().isAscending(), null, calculateMaxAggregationResultSize(), canUseStatistic, - layoutArray); + layoutArray, + dateBinTransformer, + dateBinColumnIdx); ((DataDriverContext) context.getDriverContext()).addSourceOperator(aggTableScanOperator); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/DateBinFunctionColumnTransformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/DateBinFunctionColumnTransformer.java index 1c82260eb69b6..ffdceca7ceb1f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/DateBinFunctionColumnTransformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/DateBinFunctionColumnTransformer.java @@ -143,6 +143,44 @@ public long dateBin(long source) { return origin + (n * nonMonthDuration); } + public long[] dateBinStartEnd(long source) { + // return source if interval is 0 + if (monthDuration == 0 && nonMonthDuration == 0) { + return new long[] {source, source}; + } + + if (monthDuration != 0) { + // convert to LocalDateTime + LocalDateTime sourceDate = convertToLocalDateTime(source); + LocalDateTime originDate = convertToLocalDateTime(origin); + + // calculate the number of months between the origin and source + long monthsDiff = ChronoUnit.MONTHS.between(originDate, sourceDate); + // calculate the number of month cycles completed + long completedMonthCycles = monthsDiff / monthDuration; + + LocalDateTime binStart = + originDate + .plusNanos(getNanoTimeStamp(nonMonthDuration) * completedMonthCycles) + .plusMonths(completedMonthCycles * monthDuration); + + if (binStart.isAfter(sourceDate)) { + binStart = + binStart.minusMonths(monthDuration).minusNanos(getNanoTimeStamp(nonMonthDuration)); + } + + return new long[] { + convertToTimestamp(binStart), convertToTimestamp(binStart.plusMonths(monthDuration)) + }; + } + + long diff = source - origin; + long n = diff >= 0 ? diff / nonMonthDuration : (diff - nonMonthDuration + 1) / nonMonthDuration; + return new long[] { + origin + (n * nonMonthDuration), origin + (n * nonMonthDuration) + nonMonthDuration + }; + } + @Override protected void doTransform(Column column, ColumnBuilder columnBuilder) { for (int i = 0, n = column.getPositionCount(); i < n; i++) { From d01ad834d06d1c8d13fe9f953b490e826ca6e589 Mon Sep 17 00:00:00 2001 From: Beyyes Date: Mon, 14 Oct 2024 21:25:03 +0800 Subject: [PATCH 3/5] refactor --- .../org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +- .../ITableTimeRangeIterator.java | 47 ++++++++ .../TableDateBinTimeRangeIterator.java} | 48 +++------ .../TableSingleTimeWindowIterator.java | 85 +++++++++++++++ .../TableAggregationTableScanOperator.java | 102 +++++++----------- .../plan/planner/TableOperatorGenerator.java | 30 ++---- 6 files changed, 196 insertions(+), 118 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/ITableTimeRangeIterator.java rename iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/{operator/source/relational/DateBinTimeRangeIterator.java => aggregation/timerangeiterator/TableDateBinTimeRangeIterator.java} (64%) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableSingleTimeWindowIterator.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 055029b0c68c8..88cc634d388e9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -632,7 +632,7 @@ public class IoTDBConfig { private long cacheFileReaderClearPeriod = 100000; /** the max executing time of query in ms. Unit: millisecond */ - private long queryTimeoutThreshold = 60000; + private long queryTimeoutThreshold = 60000000; /** the max time to live of a session in ms. Unit: millisecond */ private int sessionTimeoutThreshold = 0; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/ITableTimeRangeIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/ITableTimeRangeIterator.java new file mode 100644 index 0000000000000..9d945b6e3f155 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/ITableTimeRangeIterator.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator; + +import org.apache.tsfile.read.common.TimeRange; + +public interface ITableTimeRangeIterator { + + TimeIteratorType getType(); + + /** + * @return whether current iterator has next time range. + */ + boolean hasNextTimeRange(); + + TimeRange nextTimeRange(); + + boolean canFinishCurrentTimeRange(long startTime); + + void resetCurTimeRange(); + + TimeRange updateCurTimeRange(long startTime); + + void setFinished(); + + enum TimeIteratorType { + DATE_BIN_TIME_ITERATOR, + SINGLE_TIME_ITERATOR + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DateBinTimeRangeIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableDateBinTimeRangeIterator.java similarity index 64% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DateBinTimeRangeIterator.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableDateBinTimeRangeIterator.java index 484198d3f85ca..f4f9725c2a88d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DateBinTimeRangeIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableDateBinTimeRangeIterator.java @@ -17,42 +17,32 @@ * under the License. */ -package org.apache.iotdb.db.queryengine.execution.operator.source.relational; +package org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator; -import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITimeRangeIterator; import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.DateBinFunctionColumnTransformer; -import org.apache.tsfile.block.column.Column; import org.apache.tsfile.read.common.TimeRange; -public class DateBinTimeRangeIterator implements ITimeRangeIterator { +public class TableDateBinTimeRangeIterator implements ITableTimeRangeIterator { DateBinFunctionColumnTransformer dateBinTransformer; boolean finished = false; - // total query [startTime, endTime) - private long startTime; - private long endTime; - + // left close, right open private TimeRange curTimeRange; private boolean hasCachedTimeRange; - public DateBinTimeRangeIterator(DateBinFunctionColumnTransformer dateBinTransformer) { + public TableDateBinTimeRangeIterator(DateBinFunctionColumnTransformer dateBinTransformer) { this.dateBinTransformer = dateBinTransformer; } - public void updateCurTimeRange(Column timeColumn) { - // long startTime = timeColumn.getLong(); - this.hasCachedTimeRange = true; - } - - public boolean overCurrentTimeRange(long startTime) { + public boolean canFinishCurrentTimeRange(long startTime) { if (!hasCachedTimeRange) { return false; } - return dateBinTransformer.dateBin(startTime) > curTimeRange.getMin(); + return startTime >= curTimeRange.getMax(); } public TimeRange updateCurTimeRange(long startTime) { @@ -76,9 +66,8 @@ public void setFinished() { } @Override - public TimeRange getFirstTimeRange() { - throw new IllegalStateException( - "Method getFirstTimeRange is not used in DateBinTimeRangeIterator"); + public TimeIteratorType getType() { + return TimeIteratorType.DATE_BIN_TIME_ITERATOR; } @Override @@ -90,6 +79,7 @@ public TimeRange getCurTimeRange() { return this.curTimeRange; } + @Override public void resetCurTimeRange() { this.curTimeRange = null; this.hasCachedTimeRange = false; @@ -105,20 +95,8 @@ public TimeRange nextTimeRange() { return null; } - @Override - public boolean isAscending() { - return false; - } - - @Override - public long currentOutputTime() { - throw new IllegalStateException( - "Method currentOutputTime is not used in DateBinTimeRangeIterator"); - } - - @Override - public long getTotalIntervalNum() { - throw new IllegalStateException( - "Method getTotalIntervalNum is not used in DateBinTimeRangeIterator"); - } + // @Override + // public boolean isAscending() { + // return false; + // } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableSingleTimeWindowIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableSingleTimeWindowIterator.java new file mode 100644 index 0000000000000..c7fb85fefebd1 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableSingleTimeWindowIterator.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator; + +import org.apache.tsfile.read.common.TimeRange; + +/** Used for aggregation with only one time window. i.e. Aggregation without group by. */ +public class TableSingleTimeWindowIterator implements ITableTimeRangeIterator { + + // when all devices are consumed up, finished = true + boolean finished = false; + + // total query [startTime, endTime) + private final long startTime; + private final long endTime; + + private final TimeRange curTimeRange; + private boolean hasCachedTimeRange; + + public TableSingleTimeWindowIterator(long startTime, long endTime) { + this.startTime = startTime; + this.endTime = endTime; + curTimeRange = new TimeRange(startTime, endTime); + } + + @Override + public TimeIteratorType getType() { + return TimeIteratorType.SINGLE_TIME_ITERATOR; + } + + @Override + public boolean hasNextTimeRange() { + return !finished; + } + + @Override + public TimeRange nextTimeRange() { + if (hasCachedTimeRange || hasNextTimeRange()) { + hasCachedTimeRange = false; + return curTimeRange; + } + return null; + } + + @Override + public boolean canFinishCurrentTimeRange(long startTime) { + return false; + } + + @Override + public void resetCurTimeRange() {} + + @Override + public TimeRange updateCurTimeRange(long startTime) { + // not used + return null; + } + + @Override + public void setFinished() { + this.finished = true; + } + + // @Override + // public boolean isAscending() { + // return false; + // } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java index cfba24133fc39..4518f2fdc6073 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java @@ -22,7 +22,7 @@ import org.apache.iotdb.commons.path.AlignedFullPath; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; -import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITimeRangeIterator; +import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractSeriesAggregationScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanUtil; @@ -35,7 +35,6 @@ import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; -import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.DateBinFunctionColumnTransformer; import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; @@ -108,9 +107,7 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation private int currentDeviceIndex; - DateBinFunctionColumnTransformer dateBinTransformer; - int dateBinColumnIdx; - DateBinTimeRangeIterator dateBinTimeRangeIterator; + ITableTimeRangeIterator tableTimeRangeIterator; public TableAggregationTableScanOperator( PlanNodeId sourceId, @@ -127,14 +124,12 @@ public TableAggregationTableScanOperator( List tableAggregators, List groupingKeySchemas, int[] groupingKeyIndex, - ITimeRangeIterator timeRangeIterator, + ITableTimeRangeIterator tableTimeRangeIterator, boolean ascending, GroupByTimeParameter groupByTimeParameter, long maxReturnSize, boolean canUseStatistics, - int[] layoutArray, - DateBinFunctionColumnTransformer dateBinTransformer, - int dateBinColumnIdx) { + int[] layoutArray) { super( sourceId, @@ -142,7 +137,7 @@ public TableAggregationTableScanOperator( null, subSensorSize, null, - timeRangeIterator, + null, ascending, false, groupByTimeParameter, @@ -167,11 +162,7 @@ public TableAggregationTableScanOperator( measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList()); this.currentDeviceIndex = 0; this.layoutArray = layoutArray; - this.dateBinTransformer = dateBinTransformer; - if (dateBinTransformer != null) { - this.dateBinColumnIdx = dateBinColumnIdx; - this.dateBinTimeRangeIterator = new DateBinTimeRangeIterator(dateBinTransformer); - } + this.tableTimeRangeIterator = tableTimeRangeIterator; this.maxReturnSize = maxReturnSize; this.maxTsBlockLineNum = maxTsBlockLineNum; @@ -181,7 +172,9 @@ public TableAggregationTableScanOperator( @Override public boolean hasNext() throws Exception { - return curTimeRange != null || dateBinTimeRangeIterator.hasNextTimeRange(); + return curTimeRange != null + || tableTimeRangeIterator.hasNextTimeRange() + || !resultTsBlockBuilder.isEmpty(); } @Override @@ -201,19 +194,17 @@ public TsBlock next() throws Exception { long maxRuntime = leftRuntimeOfOneNextCall; while (System.nanoTime() - start < maxRuntime - && (curTimeRange != null || dateBinTimeRangeIterator.hasNextTimeRange()) + && (curTimeRange != null || tableTimeRangeIterator.hasNextTimeRange()) && !resultTsBlockBuilder.isFull()) { if (curTimeRange == null) { - // move to the next time window - curTimeRange = dateBinTimeRangeIterator.nextTimeRange(); + curTimeRange = tableTimeRangeIterator.nextTimeRange(); resetTableAggregators(); } // calculate aggregation result on current time window - // Keep curTimeRange if the calculation of this timeRange is not done if (calculateAggregationResultForCurrentTimeRange()) { - dateBinTimeRangeIterator.resetCurTimeRange(); + tableTimeRangeIterator.resetCurTimeRange(); curTimeRange = null; } } @@ -325,15 +316,13 @@ && readAndCalcFromFile()) { if (currentDeviceIndex < deviceCount) { // construct AlignedSeriesScanUtil for next device this.seriesScanUtil = constructAlignedSeriesScanUtil(deviceEntries.get(currentDeviceIndex)); - - // reset QueryDataSource queryDataSource.reset(); this.seriesScanUtil.initQueryDataSource(queryDataSource); } if (currentDeviceIndex >= deviceCount) { // all devices have been consumed - dateBinTimeRangeIterator.setFinished(); + tableTimeRangeIterator.setFinished(); return true; } else { return false; @@ -362,7 +351,7 @@ protected boolean calcFromCachedData() { private boolean calcFromRawData(TsBlock tsBlock) { Pair calcResult = - calculateAggregationFromRawData(tsBlock, tableAggregators, curTimeRange, ascending); + calculateAggregationFromRawData(tsBlock, tableAggregators, ascending); inputTsBlock = calcResult.getRight(); return calcResult.getLeft(); } @@ -374,21 +363,12 @@ private boolean calcFromRawData(TsBlock tsBlock) { * remaining tsBlock */ public Pair calculateAggregationFromRawData( - TsBlock inputTsBlock, - List aggregators, - TimeRange curTimeRange, - boolean ascending) { + TsBlock inputTsBlock, List aggregators, boolean ascending) { if (inputTsBlock == null || inputTsBlock.isEmpty()) { return new Pair<>(false, inputTsBlock); } - if (dateBinTimeRangeIterator.overCurrentTimeRange(inputTsBlock.getStartTime())) { - updateResultTsBlock(); - dateBinTimeRangeIterator.resetCurTimeRange(); - resetTableAggregators(); - } - curTimeRange = dateBinTimeRangeIterator.updateCurTimeRange(inputTsBlock.getStartTime()); - this.curTimeRange = curTimeRange; + updateCurTimeRange(inputTsBlock.getStartTime()); // check if the tsBlock does not contain points in current interval if (satisfiedTimeRange(inputTsBlock, curTimeRange, ascending)) { @@ -514,13 +494,7 @@ protected boolean readAndCalcFromFile() throws IOException { if (canUseStatistics && seriesScanUtil.canUseCurrentFileStatistics()) { Statistics fileTimeStatistics = seriesScanUtil.currentFileTimeStatistics(); - if (dateBinTimeRangeIterator.overCurrentTimeRange(fileTimeStatistics.getStartTime())) { - updateResultTsBlock(); - dateBinTimeRangeIterator.resetCurTimeRange(); - resetTableAggregators(); - } - curTimeRange = - dateBinTimeRangeIterator.updateCurTimeRange(fileTimeStatistics.getStartTime()); + updateCurTimeRange(fileTimeStatistics.getStartTime()); if (fileTimeStatistics.getStartTime() > curTimeRange.getMax()) { if (ascending) { @@ -540,7 +514,7 @@ protected boolean readAndCalcFromFile() throws IOException { } calcFromStatistics(fileTimeStatistics, statisticsList); seriesScanUtil.skipCurrentFile(); - if (isAllAggregatorsHasFinalResult(tableAggregators) && !isGroupByQuery) { + if (isAllAggregatorsHasFinalResult(tableAggregators)) { return true; } else { continue; @@ -565,13 +539,7 @@ protected boolean readAndCalcFromChunk() throws IOException { if (canUseStatistics && seriesScanUtil.canUseCurrentChunkStatistics()) { Statistics chunkTimeStatistics = seriesScanUtil.currentChunkTimeStatistics(); - if (dateBinTimeRangeIterator.overCurrentTimeRange(chunkTimeStatistics.getStartTime())) { - updateResultTsBlock(); - dateBinTimeRangeIterator.resetCurTimeRange(); - resetTableAggregators(); - } - curTimeRange = - dateBinTimeRangeIterator.updateCurTimeRange(chunkTimeStatistics.getStartTime()); + updateCurTimeRange(chunkTimeStatistics.getStartTime()); if (chunkTimeStatistics.getStartTime() > curTimeRange.getMax()) { if (ascending) { @@ -592,7 +560,7 @@ protected boolean readAndCalcFromChunk() throws IOException { } calcFromStatistics(chunkTimeStatistics, statisticsList); seriesScanUtil.skipCurrentChunk(); - if (isAllAggregatorsHasFinalResult(tableAggregators) && !isGroupByQuery) { + if (isAllAggregatorsHasFinalResult(tableAggregators)) { return true; } else { continue; @@ -618,13 +586,7 @@ protected boolean readAndCalcFromPage() throws IOException { if (canUseStatistics && seriesScanUtil.canUseCurrentPageStatistics()) { Statistics pageTimeStatistics = seriesScanUtil.currentPageTimeStatistics(); - if (dateBinTimeRangeIterator.overCurrentTimeRange(pageTimeStatistics.getStartTime())) { - updateResultTsBlock(); - dateBinTimeRangeIterator.resetCurTimeRange(); - resetTableAggregators(); - } - curTimeRange = - dateBinTimeRangeIterator.updateCurTimeRange(pageTimeStatistics.getStartTime()); + updateCurTimeRange(pageTimeStatistics.getStartTime()); // There is no more eligible points in current time range if (pageTimeStatistics.getStartTime() > curTimeRange.getMax()) { @@ -645,7 +607,7 @@ protected boolean readAndCalcFromPage() throws IOException { } calcFromStatistics(pageTimeStatistics, statisticsList); seriesScanUtil.skipCurrentPage(); - if (isAllAggregatorsHasFinalResult(tableAggregators) && !isGroupByQuery) { + if (isAllAggregatorsHasFinalResult(tableAggregators)) { return true; } else { continue; @@ -671,10 +633,24 @@ protected boolean readAndCalcFromPage() throws IOException { } } + private void updateCurTimeRange(long startTime) { + if (tableTimeRangeIterator.getType() + == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR) { + return; + } + + if (tableTimeRangeIterator.canFinishCurrentTimeRange(startTime)) { + updateResultTsBlock(); + tableTimeRangeIterator.resetCurTimeRange(); + resetTableAggregators(); + } + curTimeRange = tableTimeRangeIterator.updateCurTimeRange(startTime); + } + @Override protected List getResultDataTypes() { int groupingKeySize = groupingKeySchemas != null ? groupingKeySchemas.size() : 0; - int dateBinSize = dateBinTimeRangeIterator != null ? 1 : 0; + int dateBinSize = tableTimeRangeIterator != null ? 1 : 0; List resultDataTypes = new ArrayList<>(groupingKeySize + dateBinSize + tableAggregators.size()); @@ -683,7 +659,7 @@ protected List getResultDataTypes() { resultDataTypes.add(TSDataType.STRING); } } - if (dateBinTimeRangeIterator != null) { + if (tableTimeRangeIterator != null) { resultDataTypes.add(TSDataType.TIMESTAMP); } for (TableAggregator aggregator : tableAggregators) { @@ -703,7 +679,7 @@ public void appendAggregationResult( ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders(); int groupKeySize = groupingKeySchemas == null ? 0 : groupingKeySchemas.size(); - int dateBinSize = dateBinTimeRangeIterator != null ? 1 : 0; + int dateBinSize = tableTimeRangeIterator != null ? 1 : 0; if (groupingKeyIndex != null) { for (int i = 0; i < groupKeySize; i++) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index b86cef46622c2..2a64cc2d9895c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -24,8 +24,9 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.common.header.ColumnHeader; -import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITimeRangeIterator; -import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.SingleTimeWindowIterator; +import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator; +import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.TableDateBinTimeRangeIterator; +import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.TableSingleTimeWindowIterator; import org.apache.iotdb.db.queryengine.execution.driver.DataDriverContext; import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager; import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeService; @@ -120,7 +121,6 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FunctionCall; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Literal; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LongLiteral; -import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference; import org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer; import org.apache.iotdb.db.queryengine.transformation.dag.column.leaf.LeafColumnTransformer; import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.DateBinFunctionColumnTransformer; @@ -176,7 +176,6 @@ import static org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType; import static org.apache.iotdb.db.utils.constant.SqlConstant.FIRST; import static org.apache.iotdb.db.utils.constant.SqlConstant.LAST; -import static org.apache.iotdb.db.utils.constant.SqlConstant.TABLE_TIME_COLUMN_NAME; import static org.apache.tsfile.read.common.type.TimestampType.TIMESTAMP; /** This Visitor is responsible for transferring Table PlanNode Tree to Table Operator Tree. */ @@ -1363,8 +1362,7 @@ public Operator visitAggregationTableScan( } } - DateBinFunctionColumnTransformer dateBinTransformer = null; - int dateBinColumnIdx = -2; + ITableTimeRangeIterator timeRangeIterator = null; List groupingKeySchemas = null; int[] groupingKeyIndex = null; if (!node.getGroupingKeys().isEmpty()) { @@ -1382,7 +1380,7 @@ public Operator visitAggregationTableScan( && node.getProjection().contains(groupingKey)) { FunctionCall dateBinFunc = (FunctionCall) node.getProjection().get(groupingKey); List arguments = dateBinFunc.getArguments(); - dateBinTransformer = + DateBinFunctionColumnTransformer dateBinTransformer = new DateBinFunctionColumnTransformer( TIMESTAMP, ((LongLiteral) arguments.get(0)).getParsedValue(), @@ -1390,13 +1388,7 @@ public Operator visitAggregationTableScan( null, ((LongLiteral) arguments.get(3)).getParsedValue(), context.getZoneId()); - SymbolReference dateBinColumn = (SymbolReference) arguments.get(2); - if (TABLE_TIME_COLUMN_NAME.equals(dateBinColumn.getName())) { - dateBinColumnIdx = -1; - } else { - dateBinColumnIdx = - columnsIndexArray[columnLayout.get(Symbol.of(dateBinColumn.getName()))]; - } + timeRangeIterator = new TableDateBinTimeRangeIterator(dateBinTransformer); } else { throw new IllegalStateException( "grouping key must be ID or Attribute in AggregationTableScan"); @@ -1404,6 +1396,9 @@ public Operator visitAggregationTableScan( } } } + if (timeRangeIterator == null) { + timeRangeIterator = new TableSingleTimeWindowIterator(Long.MIN_VALUE, Long.MAX_VALUE); + } final OperatorContext operatorContext = context @@ -1425,8 +1420,7 @@ public Operator visitAggregationTableScan( scanOptionsBuilder.withPushDownFilter( convertPredicateToFilter(pushDownPredicate, measurementColumnNames, columnSchemaMap)); } - ITimeRangeIterator timeRangeIterator = - new SingleTimeWindowIterator(Long.MIN_VALUE, Long.MAX_VALUE); + TableAggregationTableScanOperator aggTableScanOperator = new TableAggregationTableScanOperator( node.getPlanNodeId(), @@ -1448,9 +1442,7 @@ public Operator visitAggregationTableScan( null, calculateMaxAggregationResultSize(), canUseStatistic, - layoutArray, - dateBinTransformer, - dateBinColumnIdx); + layoutArray); ((DataDriverContext) context.getDriverContext()).addSourceOperator(aggTableScanOperator); From 9c8d570f86c64eb7624d4ac5ab99b351dbf95cd2 Mon Sep 17 00:00:00 2001 From: Beyyes Date: Tue, 15 Oct 2024 14:23:12 +0800 Subject: [PATCH 4/5] add it --- .../IoTDBMultiIDsWithAttributesTableIT.java | 127 +++++++++- .../ITableTimeRangeIterator.java | 8 +- .../TableDateBinTimeRangeIterator.java | 39 ++-- .../TableSingleTimeWindowIterator.java | 28 ++- .../TableAggregationTableScanOperator.java | 217 ++++++++++-------- .../aggregation/AccumulatorFactory.java | 2 + .../aggregation/AvgAccumulator.java | 2 +- .../aggregation/SumAccumulator.java | 173 ++++++++++++++ .../TableBuiltinAggregationFunction.java | 2 + 9 files changed, 468 insertions(+), 130 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/SumAccumulator.java diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java index b62ea9bbed848..cb26a2c42b85d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java @@ -110,8 +110,8 @@ public class IoTDBMultiIDsWithAttributesTableIT { }; String[] expectedHeader; - String[] retArray; + String sql; @BeforeClass public static void setUp() throws Exception { @@ -640,6 +640,131 @@ public void groupByAggregationTest() { tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); } + @Test + public void groupByDateBinTest() { + expectedHeader = + new String[] { + "device", + "level", + "bin", + "count_num", + "count_star", + "count_device", + "count_date", + "count_attr1", + "count_attr2", + "count_time", + "avg_num" + }; + retArray = + new String[] { + "d1,l1,1970-01-01T00:00:00.000Z,1,1,1,0,1,1,1,3.0,", + "d1,l1,1971-01-01T00:00:00.000Z,2,2,2,0,2,2,2,8.5,", + "d1,l2,1970-01-01T00:00:00.000Z,1,1,1,0,1,1,1,2.0,", + "d1,l2,1971-01-01T00:00:00.000Z,2,2,2,0,2,2,2,11.0,", + "d1,l3,1970-01-01T00:00:00.000Z,1,1,1,0,1,1,1,1.0,", + "d1,l3,1971-01-01T00:00:00.000Z,2,2,2,0,2,2,2,9.0,", + "d1,l4,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,9.0,", + "d1,l4,1971-01-01T00:00:00.000Z,2,2,2,0,0,0,2,9.0,", + "d1,l5,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,8.0,", + "d1,l5,1971-01-01T00:00:00.000Z,2,2,2,1,0,0,2,11.0,", + "d2,l1,1970-01-01T00:00:00.000Z,1,1,1,0,1,1,1,3.0,", + "d2,l1,1971-01-01T00:00:00.000Z,2,2,2,0,2,2,2,8.5,", + "d2,l2,1970-01-01T00:00:00.000Z,1,1,1,0,1,0,1,2.0,", + "d2,l2,1971-01-01T00:00:00.000Z,2,2,2,0,2,0,2,11.0,", + "d2,l3,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,1.0,", + "d2,l3,1971-01-01T00:00:00.000Z,2,2,2,0,0,0,2,9.0,", + "d2,l4,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,9.0,", + "d2,l4,1971-01-01T00:00:00.000Z,2,2,2,0,0,0,2,9.0,", + "d2,l5,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,8.0,", + "d2,l5,1971-01-01T00:00:00.000Z,2,2,2,1,0,0,2,11.0,", + }; + sql = + "select device, level, date_bin(1y, time) as bin," + + "count(num) as count_num, count(*) as count_star, count(device) as count_device, count(date) as count_date, " + + "count(attr1) as count_attr1, count(attr2) as count_attr2, count(time) as count_time, avg(num) as avg_num " + + "from table0 group by 3, device, level order by device, level, bin"; + tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + + retArray = + new String[] { + "d1,l1,1970-01-01T00:00:00.000Z,1,1,1,0,1,1,1,3.0,", + "d1,l1,1971-01-01T00:00:00.000Z,2,2,2,0,2,2,2,8.5,", + "d1,l2,1970-01-01T00:00:00.000Z,1,1,1,0,1,1,1,2.0,", + "d1,l2,1971-01-01T00:00:00.000Z,1,1,1,0,1,1,1,10.0,", + "d1,l2,1971-04-26T00:00:00.000Z,1,1,1,0,1,1,1,12.0,", + "d1,l3,1970-01-01T00:00:00.000Z,1,1,1,0,1,1,1,1.0,", + "d1,l3,1971-01-01T00:00:00.000Z,1,1,1,0,1,1,1,4.0,", + "d1,l3,1971-04-26T00:00:00.000Z,1,1,1,0,1,1,1,14.0,", + "d1,l4,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,9.0,", + "d1,l4,1971-01-01T00:00:00.000Z,1,1,1,0,0,0,1,5.0,", + "d1,l4,1971-04-26T00:00:00.000Z,1,1,1,0,0,0,1,13.0,", + "d1,l5,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,8.0,", + "d1,l5,1971-01-01T00:00:00.000Z,1,1,1,0,0,0,1,7.0,", + "d1,l5,1971-08-20T00:00:00.000Z,1,1,1,1,0,0,1,15.0,", + "d2,l1,1970-01-01T00:00:00.000Z,1,1,1,0,1,1,1,3.0,", + "d2,l1,1971-01-01T00:00:00.000Z,2,2,2,0,2,2,2,8.5,", + "d2,l2,1970-01-01T00:00:00.000Z,1,1,1,0,1,0,1,2.0,", + "d2,l2,1971-01-01T00:00:00.000Z,1,1,1,0,1,0,1,10.0,", + "d2,l2,1971-04-26T00:00:00.000Z,1,1,1,0,1,0,1,12.0,", + "d2,l3,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,1.0,", + "d2,l3,1971-01-01T00:00:00.000Z,1,1,1,0,0,0,1,4.0,", + "d2,l3,1971-04-26T00:00:00.000Z,1,1,1,0,0,0,1,14.0,", + "d2,l4,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,9.0,", + "d2,l4,1971-01-01T00:00:00.000Z,1,1,1,0,0,0,1,5.0,", + "d2,l4,1971-04-26T00:00:00.000Z,1,1,1,0,0,0,1,13.0,", + "d2,l5,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,8.0,", + "d2,l5,1971-01-01T00:00:00.000Z,1,1,1,0,0,0,1,7.0,", + "d2,l5,1971-08-20T00:00:00.000Z,1,1,1,1,0,0,1,15.0,", + }; + sql = + "select device, level, date_bin(1d, time) as bin," + + "count(num) as count_num, count(*) as count_star, count(device) as count_device, count(date) as count_date, " + + "count(attr1) as count_attr1, count(attr2) as count_attr2, count(time) as count_time, avg(num) as avg_num " + + "from table0 group by 3, device, level order by device, level, bin"; + tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + + retArray = + new String[] { + "d1,l1,1970-01-01T00:00:00.000Z,1,1,1,0,1,1,1,3.0,", + "d1,l1,1971-01-01T00:00:00.000Z,1,1,1,0,1,1,1,6.0,", + "d1,l1,1971-01-01T00:01:40.000Z,1,1,1,0,1,1,1,11.0,", + "d1,l2,1970-01-01T00:00:00.000Z,1,1,1,0,1,1,1,2.0,", + "d1,l2,1971-01-01T00:00:00.000Z,1,1,1,0,1,1,1,10.0,", + "d1,l2,1971-04-26T17:46:40.000Z,1,1,1,0,1,1,1,12.0,", + "d1,l3,1970-01-01T00:00:00.000Z,1,1,1,0,1,1,1,1.0,", + "d1,l3,1971-01-01T00:00:00.000Z,1,1,1,0,1,1,1,4.0,", + "d1,l3,1971-04-26T17:46:40.000Z,1,1,1,0,1,1,1,14.0,", + "d1,l4,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,9.0,", + "d1,l4,1971-01-01T00:00:01.000Z,1,1,1,0,0,0,1,5.0,", + "d1,l4,1971-04-26T18:01:40.000Z,1,1,1,0,0,0,1,13.0,", + "d1,l5,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,8.0,", + "d1,l5,1971-01-01T00:00:10.000Z,1,1,1,0,0,0,1,7.0,", + "d1,l5,1971-08-20T11:33:20.000Z,1,1,1,1,0,0,1,15.0,", + "d2,l1,1970-01-01T00:00:00.000Z,1,1,1,0,1,1,1,3.0,", + "d2,l1,1971-01-01T00:00:00.000Z,1,1,1,0,1,1,1,6.0,", + "d2,l1,1971-01-01T00:01:40.000Z,1,1,1,0,1,1,1,11.0,", + "d2,l2,1970-01-01T00:00:00.000Z,1,1,1,0,1,0,1,2.0,", + "d2,l2,1971-01-01T00:00:00.000Z,1,1,1,0,1,0,1,10.0,", + "d2,l2,1971-04-26T17:46:40.000Z,1,1,1,0,1,0,1,12.0,", + "d2,l3,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,1.0,", + "d2,l3,1971-01-01T00:00:00.000Z,1,1,1,0,0,0,1,4.0,", + "d2,l3,1971-04-26T17:46:40.000Z,1,1,1,0,0,0,1,14.0,", + "d2,l4,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,9.0,", + "d2,l4,1971-01-01T00:00:01.000Z,1,1,1,0,0,0,1,5.0,", + "d2,l4,1971-04-26T18:01:40.000Z,1,1,1,0,0,0,1,13.0,", + "d2,l5,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,8.0,", + "d2,l5,1971-01-01T00:00:10.000Z,1,1,1,0,0,0,1,7.0,", + "d2,l5,1971-08-20T11:33:20.000Z,1,1,1,1,0,0,1,15.0,", + }; + sql = + "select device, level, date_bin(1s, time) as bin," + + "count(num) as count_num, count(*) as count_star, count(device) as count_device, count(date) as count_date, " + + "count(attr1) as count_attr1, count(attr2) as count_attr2, count(time) as count_time, avg(num) as avg_num " + + "from table0 group by 3, device, level order by device, level, bin"; + tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + } + // ============================ Join Test =========================== // no filter @Test diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/ITableTimeRangeIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/ITableTimeRangeIterator.java index 9d945b6e3f155..60c82f280d479 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/ITableTimeRangeIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/ITableTimeRangeIterator.java @@ -30,13 +30,17 @@ public interface ITableTimeRangeIterator { */ boolean hasNextTimeRange(); - TimeRange nextTimeRange(); + boolean hasCachedTimeRange(); + + // TimeRange nextTimeRange(); + + TimeRange getCurTimeRange(); boolean canFinishCurrentTimeRange(long startTime); void resetCurTimeRange(); - TimeRange updateCurTimeRange(long startTime); + void updateCurTimeRange(long startTime); void setFinished(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableDateBinTimeRangeIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableDateBinTimeRangeIterator.java index f4f9725c2a88d..54286e7ea3b09 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableDateBinTimeRangeIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableDateBinTimeRangeIterator.java @@ -31,37 +31,36 @@ public class TableDateBinTimeRangeIterator implements ITableTimeRangeIterator { // left close, right open private TimeRange curTimeRange; - private boolean hasCachedTimeRange; + + // private boolean hasCachedTimeRange; public TableDateBinTimeRangeIterator(DateBinFunctionColumnTransformer dateBinTransformer) { this.dateBinTransformer = dateBinTransformer; } public boolean canFinishCurrentTimeRange(long startTime) { - if (!hasCachedTimeRange) { + if (curTimeRange == null) { return false; } - return startTime >= curTimeRange.getMax(); + return startTime > curTimeRange.getMax(); } - public TimeRange updateCurTimeRange(long startTime) { + public void updateCurTimeRange(long startTime) { long[] timeArray = dateBinTransformer.dateBinStartEnd(startTime); - if (hasCachedTimeRange) { + if (curTimeRange != null) { // meet new time range, remove old time range if (timeArray[0] != curTimeRange.getMin()) { this.curTimeRange = new TimeRange(timeArray[0], timeArray[1] - 1); } } else { this.curTimeRange = new TimeRange(timeArray[0], timeArray[1] - 1); - this.hasCachedTimeRange = true; } - - return this.curTimeRange; } public void setFinished() { + this.curTimeRange = null; this.finished = true; } @@ -75,6 +74,11 @@ public boolean hasNextTimeRange() { return !finished; } + @Override + public boolean hasCachedTimeRange() { + return curTimeRange != null; + } + public TimeRange getCurTimeRange() { return this.curTimeRange; } @@ -82,18 +86,17 @@ public TimeRange getCurTimeRange() { @Override public void resetCurTimeRange() { this.curTimeRange = null; - this.hasCachedTimeRange = false; } - @Override - public TimeRange nextTimeRange() { - if (hasCachedTimeRange) { - return curTimeRange; - } - // hasCachedTimeRange = false; - // curTimeRange = null; - return null; - } + // @Override + // public TimeRange nextTimeRange() { + // if (hasCachedTimeRange) { + // return curTimeRange; + // } + // // hasCachedTimeRange = false; + // // curTimeRange = null; + // return null; + // } // @Override // public boolean isAscending() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableSingleTimeWindowIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableSingleTimeWindowIterator.java index c7fb85fefebd1..8fee30556632e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableSingleTimeWindowIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableSingleTimeWindowIterator.java @@ -31,7 +31,7 @@ public class TableSingleTimeWindowIterator implements ITableTimeRangeIterator { private final long startTime; private final long endTime; - private final TimeRange curTimeRange; + private TimeRange curTimeRange; private boolean hasCachedTimeRange; public TableSingleTimeWindowIterator(long startTime, long endTime) { @@ -51,14 +51,24 @@ public boolean hasNextTimeRange() { } @Override - public TimeRange nextTimeRange() { - if (hasCachedTimeRange || hasNextTimeRange()) { - hasCachedTimeRange = false; - return curTimeRange; - } - return null; + public boolean hasCachedTimeRange() { + return curTimeRange != null; } + @Override + public TimeRange getCurTimeRange() { + return curTimeRange; + } + + // @Override + // public TimeRange nextTimeRange() { + // if (hasCachedTimeRange || hasNextTimeRange()) { + // hasCachedTimeRange = false; + // return curTimeRange; + // } + // return null; + // } + @Override public boolean canFinishCurrentTimeRange(long startTime) { return false; @@ -68,13 +78,13 @@ public boolean canFinishCurrentTimeRange(long startTime) { public void resetCurTimeRange() {} @Override - public TimeRange updateCurTimeRange(long startTime) { + public void updateCurTimeRange(long startTime) { // not used - return null; } @Override public void setFinished() { + this.curTimeRange = null; this.finished = true; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java index 4518f2fdc6073..b29ce52c9c79d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java @@ -107,7 +107,7 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation private int currentDeviceIndex; - ITableTimeRangeIterator tableTimeRangeIterator; + ITableTimeRangeIterator timeIterator; public TableAggregationTableScanOperator( PlanNodeId sourceId, @@ -162,7 +162,11 @@ public TableAggregationTableScanOperator( measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList()); this.currentDeviceIndex = 0; this.layoutArray = layoutArray; - this.tableTimeRangeIterator = tableTimeRangeIterator; + this.timeIterator = tableTimeRangeIterator; + if (tableTimeRangeIterator.getType() + == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR) { + curTimeRange = new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE); + } this.maxReturnSize = maxReturnSize; this.maxTsBlockLineNum = maxTsBlockLineNum; @@ -170,10 +174,21 @@ public TableAggregationTableScanOperator( this.seriesScanUtil = constructAlignedSeriesScanUtil(deviceEntries.get(currentDeviceIndex)); } + @Override + public boolean isFinished() throws Exception { + if (!finished) { + finished = !hasNextWithTimer(); + } + return finished; + + // return (retainedTsBlock == null) + // && (currentDeviceIndex >= deviceCount || seriesScanOptions.limitConsumedUp()); + } + @Override public boolean hasNext() throws Exception { - return curTimeRange != null - || tableTimeRangeIterator.hasNextTimeRange() + return timeIterator.hasCachedTimeRange() + || timeIterator.hasNextTimeRange() || !resultTsBlockBuilder.isEmpty(); } @@ -194,30 +209,30 @@ public TsBlock next() throws Exception { long maxRuntime = leftRuntimeOfOneNextCall; while (System.nanoTime() - start < maxRuntime - && (curTimeRange != null || tableTimeRangeIterator.hasNextTimeRange()) + && (timeIterator.hasCachedTimeRange() || timeIterator.hasNextTimeRange()) && !resultTsBlockBuilder.isFull()) { - if (curTimeRange == null) { - curTimeRange = tableTimeRangeIterator.nextTimeRange(); - resetTableAggregators(); - } + // if (curTimeRange == null) { + // curTimeRange = tableTimeRangeIterator.nextTimeRange(); + // resetTableAggregators(); + // } // calculate aggregation result on current time window + // return true if current time window is calc finished if (calculateAggregationResultForCurrentTimeRange()) { - tableTimeRangeIterator.resetCurTimeRange(); - curTimeRange = null; + timeIterator.resetCurTimeRange(); + // curTimeRange = null; } } if (resultTsBlockBuilder.getPositionCount() > 0) { - buildResultTsBlock(); - return this.resultTsBlock; + return buildResultTsBlock(); } else { return null; } } - private void buildResultTsBlock() { + private TsBlock buildResultTsBlock() { int declaredPositions = resultTsBlockBuilder.getPositionCount(); ColumnBuilder[] valueColumnBuilders = resultTsBlockBuilder.getValueColumnBuilders(); Column[] valueColumns = new Column[valueColumnBuilders.length]; @@ -231,34 +246,14 @@ private void buildResultTsBlock() { } } - this.resultTsBlock = + TsBlock resultTsBlock = new TsBlock( resultTsBlockBuilder.getPositionCount(), new RunLengthEncodedColumn( TIME_COLUMN_TEMPLATE, resultTsBlockBuilder.getPositionCount()), valueColumns); resultTsBlockBuilder.reset(); - } - - @Override - public boolean isFinished() throws Exception { - if (!finished) { - finished = !hasNextWithTimer(); - } - return finished; - - // return (retainedTsBlock == null) - // && (currentDeviceIndex >= deviceCount || seriesScanOptions.limitConsumedUp()); - } - - @Override - public long ramBytesUsed() { - return INSTANCE_SIZE - + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(seriesScanUtil) - + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) - + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId) - + (resultTsBlockBuilder == null ? 0 : resultTsBlockBuilder.getRetainedSizeInBytes()) - + RamUsageEstimator.sizeOfCollection(deviceEntries); + return resultTsBlock; } private AlignedSeriesScanUtil constructAlignedSeriesScanUtil(DeviceEntry deviceEntry) { @@ -308,8 +303,9 @@ && readAndCalcFromFile()) { || seriesScanUtil.hasNextFile()) { return false; } else { + // all data of current device has been consumed updateResultTsBlock(); - resetTableAggregators(); + timeIterator.resetCurTimeRange(); currentDeviceIndex++; } @@ -322,7 +318,7 @@ && readAndCalcFromFile()) { if (currentDeviceIndex >= deviceCount) { // all devices have been consumed - tableTimeRangeIterator.setFinished(); + timeIterator.setFinished(); return true; } else { return false; @@ -332,17 +328,10 @@ && readAndCalcFromFile()) { } } - @Override - public void initQueryDataSource(IQueryDataSource dataSource) { - this.queryDataSource = (QueryDataSource) dataSource; - this.seriesScanUtil.initQueryDataSource(queryDataSource); - this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes()); - this.resultTsBlockBuilder.setMaxTsBlockLineNumber(this.maxTsBlockLineNum); - } - - @Override protected void updateResultTsBlock() { appendAggregationResult(resultTsBlockBuilder, tableAggregators); + // after appendAggregationResult invoked, aggregators must be cleared + resetTableAggregators(); } protected boolean calcFromCachedData() { @@ -370,6 +359,7 @@ public Pair calculateAggregationFromRawData( updateCurTimeRange(inputTsBlock.getStartTime()); + TimeRange curTimeRange = timeIterator.getCurTimeRange(); // check if the tsBlock does not contain points in current interval if (satisfiedTimeRange(inputTsBlock, curTimeRange, ascending)) { // skip points that cannot be calculated @@ -415,7 +405,7 @@ private TsBlock process( .anyMatch(columnSchema.getColumnCategory()::equals)) { valueColumns[i] = inputRegion.getTimeColumn(); } else { - valueColumns[i] = inputRegion.getColumn(i); + valueColumns[i] = inputRegion.getColumn(columnsIndexArray[i]); } } TsBlock tsBlock = @@ -460,15 +450,6 @@ private boolean isNullIdOrAttribute(int i) { == null; } - public static boolean isAllAggregatorsHasFinalResult(List aggregators) { - for (TableAggregator aggregator : aggregators) { - if (!aggregator.hasFinalResult()) { - return false; - } - } - return true; - } - protected void calcFromStatistics(Statistics timeStatistics, Statistics[] valueStatistics) { for (int i = 0; i < tableAggregators.size(); i++) { if (isNullIdOrAttribute(i)) { @@ -496,7 +477,7 @@ protected boolean readAndCalcFromFile() throws IOException { updateCurTimeRange(fileTimeStatistics.getStartTime()); - if (fileTimeStatistics.getStartTime() > curTimeRange.getMax()) { + if (fileTimeStatistics.getStartTime() > timeIterator.getCurTimeRange().getMax()) { if (ascending) { return true; } else { @@ -506,8 +487,9 @@ protected boolean readAndCalcFromFile() throws IOException { } // calc from fileMetaData - if (curTimeRange.contains( - fileTimeStatistics.getStartTime(), fileTimeStatistics.getEndTime())) { + if (timeIterator + .getCurTimeRange() + .contains(fileTimeStatistics.getStartTime(), fileTimeStatistics.getEndTime())) { Statistics[] statisticsList = new Statistics[subSensorSize]; for (int i = 0; i < subSensorSize; i++) { statisticsList[i] = seriesScanUtil.currentFileStatistics(i); @@ -541,7 +523,7 @@ protected boolean readAndCalcFromChunk() throws IOException { updateCurTimeRange(chunkTimeStatistics.getStartTime()); - if (chunkTimeStatistics.getStartTime() > curTimeRange.getMax()) { + if (chunkTimeStatistics.getStartTime() > timeIterator.getCurTimeRange().getMax()) { if (ascending) { return true; } else { @@ -551,8 +533,9 @@ protected boolean readAndCalcFromChunk() throws IOException { } // calc from chunkMetaData - if (curTimeRange.contains( - chunkTimeStatistics.getStartTime(), chunkTimeStatistics.getEndTime())) { + if (timeIterator + .getCurTimeRange() + .contains(chunkTimeStatistics.getStartTime(), chunkTimeStatistics.getEndTime())) { // calc from chunkMetaData Statistics[] statisticsList = new Statistics[subSensorSize]; for (int i = 0; i < subSensorSize; i++) { @@ -589,7 +572,8 @@ protected boolean readAndCalcFromPage() throws IOException { updateCurTimeRange(pageTimeStatistics.getStartTime()); // There is no more eligible points in current time range - if (pageTimeStatistics.getStartTime() > curTimeRange.getMax()) { + // TODO(beyyes) will not appear in table model? + if (pageTimeStatistics.getStartTime() > timeIterator.getCurTimeRange().getMax()) { if (ascending) { return true; } else { @@ -599,8 +583,9 @@ protected boolean readAndCalcFromPage() throws IOException { } // can use pageHeader - if (curTimeRange.contains( - pageTimeStatistics.getStartTime(), pageTimeStatistics.getEndTime())) { + if (timeIterator + .getCurTimeRange() + .contains(pageTimeStatistics.getStartTime(), pageTimeStatistics.getEndTime())) { Statistics[] statisticsList = new Statistics[subSensorSize]; for (int i = 0; i < subSensorSize; i++) { statisticsList[i] = seriesScanUtil.currentPageStatistics(i); @@ -634,43 +619,18 @@ protected boolean readAndCalcFromPage() throws IOException { } private void updateCurTimeRange(long startTime) { - if (tableTimeRangeIterator.getType() - == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR) { + if (timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR) { return; } - if (tableTimeRangeIterator.canFinishCurrentTimeRange(startTime)) { + if (!timeIterator.hasCachedTimeRange()) { + timeIterator.updateCurTimeRange(startTime); + } else if (timeIterator.canFinishCurrentTimeRange(startTime)) { updateResultTsBlock(); - tableTimeRangeIterator.resetCurTimeRange(); + timeIterator.resetCurTimeRange(); + timeIterator.updateCurTimeRange(startTime); resetTableAggregators(); } - curTimeRange = tableTimeRangeIterator.updateCurTimeRange(startTime); - } - - @Override - protected List getResultDataTypes() { - int groupingKeySize = groupingKeySchemas != null ? groupingKeySchemas.size() : 0; - int dateBinSize = tableTimeRangeIterator != null ? 1 : 0; - List resultDataTypes = - new ArrayList<>(groupingKeySize + dateBinSize + tableAggregators.size()); - - if (groupingKeySchemas != null) { - for (int i = 0; i < groupingKeySchemas.size(); i++) { - resultDataTypes.add(TSDataType.STRING); - } - } - if (tableTimeRangeIterator != null) { - resultDataTypes.add(TSDataType.TIMESTAMP); - } - for (TableAggregator aggregator : tableAggregators) { - resultDataTypes.add(aggregator.getType()); - } - - return resultDataTypes; - } - - private void resetTableAggregators() { - tableAggregators.forEach(TableAggregator::reset); } /** Append a row of aggregation results to the result tsBlock. */ @@ -679,7 +639,10 @@ public void appendAggregationResult( ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders(); int groupKeySize = groupingKeySchemas == null ? 0 : groupingKeySchemas.size(); - int dateBinSize = tableTimeRangeIterator != null ? 1 : 0; + int dateBinSize = + timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR + ? 1 + : 0; if (groupingKeyIndex != null) { for (int i = 0; i < groupKeySize; i++) { @@ -704,7 +667,7 @@ public void appendAggregationResult( } if (dateBinSize > 0) { - columnBuilders[groupKeySize].writeLong(curTimeRange.getMin()); + columnBuilders[groupKeySize].writeLong(timeIterator.getCurTimeRange().getMin()); } for (int i = 0; i < aggregators.size(); i++) { @@ -713,4 +676,60 @@ public void appendAggregationResult( tsBlockBuilder.declarePosition(); } + + public static boolean isAllAggregatorsHasFinalResult(List aggregators) { + for (TableAggregator aggregator : aggregators) { + if (!aggregator.hasFinalResult()) { + return false; + } + } + return true; + } + + private void resetTableAggregators() { + tableAggregators.forEach(TableAggregator::reset); + } + + @Override + protected List getResultDataTypes() { + int groupingKeySize = groupingKeySchemas != null ? groupingKeySchemas.size() : 0; + int dateBinSize = + timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR + ? 1 + : 0; + List resultDataTypes = + new ArrayList<>(groupingKeySize + dateBinSize + tableAggregators.size()); + + if (groupingKeySchemas != null) { + for (int i = 0; i < groupingKeySchemas.size(); i++) { + resultDataTypes.add(TSDataType.STRING); + } + } + if (dateBinSize > 0) { + resultDataTypes.add(TSDataType.TIMESTAMP); + } + for (TableAggregator aggregator : tableAggregators) { + resultDataTypes.add(aggregator.getType()); + } + + return resultDataTypes; + } + + @Override + public void initQueryDataSource(IQueryDataSource dataSource) { + this.queryDataSource = (QueryDataSource) dataSource; + this.seriesScanUtil.initQueryDataSource(queryDataSource); + this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes()); + this.resultTsBlockBuilder.setMaxTsBlockLineNumber(this.maxTsBlockLineNum); + } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(seriesScanUtil) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId) + + (resultTsBlockBuilder == null ? 0 : resultTsBlockBuilder.getRetainedSizeInBytes()) + + RamUsageEstimator.sizeOfCollection(deviceEntries); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java index d3775df5f1273..b35f1f091ba07 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java @@ -60,6 +60,8 @@ public static TableAccumulator createBuiltinAccumulator( return new CountAccumulator(); case AVG: return new AvgAccumulator(inputDataTypes.get(0)); + case SUM: + return new SumAccumulator(inputDataTypes.get(0)); default: throw new IllegalArgumentException("Invalid Aggregation function: " + aggregationType); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AvgAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AvgAccumulator.java index 45d991d946e91..90dd7cb957aae 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AvgAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AvgAccumulator.java @@ -211,7 +211,7 @@ public void addStatistics(Statistics statistics) { @Override public void reset() { - initResult = false; + this.initResult = false; this.countValue = 0; this.sumValue = 0.0; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/SumAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/SumAccumulator.java new file mode 100644 index 0000000000000..e76ad72d551b9 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/SumAccumulator.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.statistics.IntegerStatistics; +import org.apache.tsfile.file.metadata.statistics.Statistics; +import org.apache.tsfile.utils.RamUsageEstimator; +import org.apache.tsfile.write.UnSupportedDataTypeException; + +import static com.google.common.base.Preconditions.checkArgument; + +public class SumAccumulator implements TableAccumulator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(SumAccumulator.class); + private final TSDataType argumentDataType; + private double sumValue = 0; + private boolean initResult = false; + + public SumAccumulator(TSDataType argumentDataType) { + this.argumentDataType = argumentDataType; + } + + @Override + public long getEstimatedSize() { + return INSTANCE_SIZE; + } + + @Override + public TableAccumulator copy() { + return new SumAccumulator(this.argumentDataType); + } + + @Override + public void addInput(Column[] arguments) { + checkArgument(arguments.length == 1, "argument of Sum should be one column"); + switch (argumentDataType) { + case INT32: + addIntInput(arguments[0]); + return; + case INT64: + addLongInput(arguments[0]); + return; + case FLOAT: + addFloatInput(arguments[0]); + return; + case DOUBLE: + addDoubleInput(arguments[0]); + return; + case TEXT: + case BLOB: + case STRING: + case BOOLEAN: + case DATE: + case TIMESTAMP: + default: + throw new UnSupportedDataTypeException( + String.format("Unsupported data type in aggregation Sum : %s", argumentDataType)); + } + } + + @Override + public void addIntermediate(Column argument) { + for (int i = 0; i < argument.getPositionCount(); i++) { + if (argument.isNull(i)) { + continue; + } + + initResult = true; + sumValue += argument.getDouble(i); + } + } + + @Override + public void evaluateIntermediate(ColumnBuilder columnBuilder) { + if (!initResult) { + columnBuilder.appendNull(); + } else { + columnBuilder.writeDouble(sumValue); + } + } + + @Override + public void evaluateFinal(ColumnBuilder columnBuilder) { + if (!initResult) { + columnBuilder.appendNull(); + } else { + columnBuilder.writeDouble(sumValue); + } + } + + @Override + public boolean hasFinalResult() { + return false; + } + + @Override + public void addStatistics(Statistics statistics) { + if (statistics == null) { + return; + } + initResult = true; + if (statistics instanceof IntegerStatistics) { + sumValue += statistics.getSumLongValue(); + } else { + sumValue += statistics.getSumDoubleValue(); + } + } + + @Override + public void reset() { + this.initResult = false; + this.sumValue = 0.0; + } + + private void addIntInput(Column column) { + int count = column.getPositionCount(); + for (int i = 0; i < count; i++) { + if (!column.isNull(i)) { + initResult = true; + sumValue += column.getInt(i); + } + } + } + + private void addLongInput(Column column) { + int count = column.getPositionCount(); + for (int i = 0; i < count; i++) { + if (!column.isNull(i)) { + initResult = true; + sumValue += column.getLong(i); + } + } + } + + private void addFloatInput(Column column) { + int count = column.getPositionCount(); + for (int i = 0; i < count; i++) { + if (!column.isNull(i)) { + initResult = true; + sumValue += column.getFloat(i); + } + } + } + + private void addDoubleInput(Column column) { + int count = column.getPositionCount(); + for (int i = 0; i < count; i++) { + if (!column.isNull(i)) { + initResult = true; + sumValue += column.getDouble(i); + } + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableBuiltinAggregationFunction.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableBuiltinAggregationFunction.java index a2b7fe8d35433..8ef20f7b91069 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableBuiltinAggregationFunction.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableBuiltinAggregationFunction.java @@ -111,6 +111,8 @@ public static boolean canUseStatistics(String name, boolean withTime) { public static List getIntermediateTypes(String name, List originalArgumentTypes) { if (COUNT.functionName.equalsIgnoreCase(name)) { return ImmutableList.of(INT64); + } else if (SUM.functionName.equalsIgnoreCase(name)) { + return ImmutableList.of(DOUBLE); } else if (AVG.functionName.equalsIgnoreCase(name)) { return ImmutableList.of(DOUBLE, INT64); } else { From e2f5a93b17a33290cf231ba761763bb9b069477b Mon Sep 17 00:00:00 2001 From: Beyyes Date: Tue, 15 Oct 2024 14:57:45 +0800 Subject: [PATCH 5/5] add more it --- .../IoTDBMultiIDsWithAttributesTableIT.java | 35 ++++++++++++------- .../org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +- .../ITableTimeRangeIterator.java | 2 -- .../TableDateBinTimeRangeIterator.java | 26 ++++---------- .../TableSingleTimeWindowIterator.java | 26 ++------------ .../TableAggregationTableScanOperator.java | 3 +- 6 files changed, 35 insertions(+), 59 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java index cb26a2c42b85d..3d997449470fb 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java @@ -617,25 +617,26 @@ public void groupByAggregationTest() { "count_date", "count_attr1", "count_attr2", - "count_time" + "count_time", + "sum_num" }; retArray = new String[] { - "d1,l1,3,3,3,0,3,3,3,", - "d1,l2,3,3,3,0,3,3,3,", - "d1,l3,3,3,3,0,3,3,3,", - "d1,l4,3,3,3,0,0,0,3,", - "d1,l5,3,3,3,1,0,0,3,", - "d2,l1,3,3,3,0,3,3,3,", - "d2,l2,3,3,3,0,3,0,3,", - "d2,l3,3,3,3,0,0,0,3,", - "d2,l4,3,3,3,0,0,0,3,", - "d2,l5,3,3,3,1,0,0,3,", + "d1,l1,3,3,3,0,3,3,3,20.0,", + "d1,l2,3,3,3,0,3,3,3,24.0,", + "d1,l3,3,3,3,0,3,3,3,19.0,", + "d1,l4,3,3,3,0,0,0,3,27.0,", + "d1,l5,3,3,3,1,0,0,3,30.0,", + "d2,l1,3,3,3,0,3,3,3,20.0,", + "d2,l2,3,3,3,0,3,0,3,24.0,", + "d2,l3,3,3,3,0,0,0,3,19.0,", + "d2,l4,3,3,3,0,0,0,3,27.0,", + "d2,l5,3,3,3,1,0,0,3,30.0,", }; String sql = "select device, level, " + "count(num) as count_num, count(*) as count_star, count(device) as count_device, count(date) as count_date, " - + "count(attr1) as count_attr1, count(attr2) as count_attr2, count(time) as count_time " + + "count(attr1) as count_attr1, count(attr2) as count_attr2, count(time) as count_time, sum(num) as sum_num " + "from table0 group by device,level order by device, level"; tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); } @@ -763,6 +764,16 @@ public void groupByDateBinTest() { + "count(attr1) as count_attr1, count(attr2) as count_attr2, count(time) as count_time, avg(num) as avg_num " + "from table0 group by 3, device, level order by device, level, bin"; tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + + // TODO(beyyes) test below + // sql = "select count(*) from (\n" + + // "\tselect device, level, date_bin(1d, time) as bin, \n" + + // "\tcount(num) as count_num, count(*) as count_star, count(device) as count_device, + // count(date) as count_date, count(attr1) as count_attr1, count(attr2) as count_attr2, + // count(time) as count_time, avg(num) as avg_num \n" + + // "\tfrom table0 \n" + + // "\tgroup by 3, device, level order by device, level, bin\n" + + // ")\n"; } // ============================ Join Test =========================== diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 88cc634d388e9..055029b0c68c8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -632,7 +632,7 @@ public class IoTDBConfig { private long cacheFileReaderClearPeriod = 100000; /** the max executing time of query in ms. Unit: millisecond */ - private long queryTimeoutThreshold = 60000000; + private long queryTimeoutThreshold = 60000; /** the max time to live of a session in ms. Unit: millisecond */ private int sessionTimeoutThreshold = 0; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/ITableTimeRangeIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/ITableTimeRangeIterator.java index 60c82f280d479..5fe0c0c61d73b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/ITableTimeRangeIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/ITableTimeRangeIterator.java @@ -32,8 +32,6 @@ public interface ITableTimeRangeIterator { boolean hasCachedTimeRange(); - // TimeRange nextTimeRange(); - TimeRange getCurTimeRange(); boolean canFinishCurrentTimeRange(long startTime); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableDateBinTimeRangeIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableDateBinTimeRangeIterator.java index 54286e7ea3b09..d79fc2848043c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableDateBinTimeRangeIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableDateBinTimeRangeIterator.java @@ -25,19 +25,18 @@ public class TableDateBinTimeRangeIterator implements ITableTimeRangeIterator { - DateBinFunctionColumnTransformer dateBinTransformer; + private final DateBinFunctionColumnTransformer dateBinTransformer; - boolean finished = false; + private boolean finished = false; - // left close, right open + // left close, right close private TimeRange curTimeRange; - // private boolean hasCachedTimeRange; - public TableDateBinTimeRangeIterator(DateBinFunctionColumnTransformer dateBinTransformer) { this.dateBinTransformer = dateBinTransformer; } + @Override public boolean canFinishCurrentTimeRange(long startTime) { if (curTimeRange == null) { return false; @@ -46,6 +45,7 @@ public boolean canFinishCurrentTimeRange(long startTime) { return startTime > curTimeRange.getMax(); } + @Override public void updateCurTimeRange(long startTime) { long[] timeArray = dateBinTransformer.dateBinStartEnd(startTime); @@ -59,6 +59,7 @@ public void updateCurTimeRange(long startTime) { } } + @Override public void setFinished() { this.curTimeRange = null; this.finished = true; @@ -87,19 +88,4 @@ public TimeRange getCurTimeRange() { public void resetCurTimeRange() { this.curTimeRange = null; } - - // @Override - // public TimeRange nextTimeRange() { - // if (hasCachedTimeRange) { - // return curTimeRange; - // } - // // hasCachedTimeRange = false; - // // curTimeRange = null; - // return null; - // } - - // @Override - // public boolean isAscending() { - // return false; - // } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableSingleTimeWindowIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableSingleTimeWindowIterator.java index 8fee30556632e..b3b61dbabbc1a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableSingleTimeWindowIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableSingleTimeWindowIterator.java @@ -21,22 +21,14 @@ import org.apache.tsfile.read.common.TimeRange; -/** Used for aggregation with only one time window. i.e. Aggregation without group by. */ public class TableSingleTimeWindowIterator implements ITableTimeRangeIterator { // when all devices are consumed up, finished = true boolean finished = false; - // total query [startTime, endTime) - private final long startTime; - private final long endTime; - private TimeRange curTimeRange; - private boolean hasCachedTimeRange; public TableSingleTimeWindowIterator(long startTime, long endTime) { - this.startTime = startTime; - this.endTime = endTime; curTimeRange = new TimeRange(startTime, endTime); } @@ -60,22 +52,15 @@ public TimeRange getCurTimeRange() { return curTimeRange; } - // @Override - // public TimeRange nextTimeRange() { - // if (hasCachedTimeRange || hasNextTimeRange()) { - // hasCachedTimeRange = false; - // return curTimeRange; - // } - // return null; - // } - @Override public boolean canFinishCurrentTimeRange(long startTime) { return false; } @Override - public void resetCurTimeRange() {} + public void resetCurTimeRange() { + // do nothing + } @Override public void updateCurTimeRange(long startTime) { @@ -87,9 +72,4 @@ public void setFinished() { this.curTimeRange = null; this.finished = true; } - - // @Override - // public boolean isAscending() { - // return false; - // } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java index b29ce52c9c79d..e0f71e62c0656 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java @@ -197,8 +197,9 @@ public TsBlock next() throws Exception { // optimize for sql: select count(*) from (select count(s1), sum(s1) from table) if (tableAggregators.isEmpty()) { - retainedTsBlock = null; + resultTsBlockBuilder.reset(); currentDeviceIndex = deviceCount; + timeIterator.setFinished(); Column[] valueColumns = new Column[0]; return new TsBlock(1, new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, 1), valueColumns); }