diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java index 8933d848159af..3d997449470fb 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java @@ -110,8 +110,8 @@ public class IoTDBMultiIDsWithAttributesTableIT { }; String[] expectedHeader; - String[] retArray; + String sql; @BeforeClass public static void setUp() throws Exception { @@ -605,6 +605,177 @@ public void countStarTest() { // TODO select count(*),count(t1) from (select avg(num+1) as t1 from table0) where time < 0 } + @Test + public void groupByAggregationTest() { + expectedHeader = + new String[] { + "device", + "level", + "count_num", + "count_star", + "count_device", + "count_date", + "count_attr1", + "count_attr2", + "count_time", + "sum_num" + }; + retArray = + new String[] { + "d1,l1,3,3,3,0,3,3,3,20.0,", + "d1,l2,3,3,3,0,3,3,3,24.0,", + "d1,l3,3,3,3,0,3,3,3,19.0,", + "d1,l4,3,3,3,0,0,0,3,27.0,", + "d1,l5,3,3,3,1,0,0,3,30.0,", + "d2,l1,3,3,3,0,3,3,3,20.0,", + "d2,l2,3,3,3,0,3,0,3,24.0,", + "d2,l3,3,3,3,0,0,0,3,19.0,", + "d2,l4,3,3,3,0,0,0,3,27.0,", + "d2,l5,3,3,3,1,0,0,3,30.0,", + }; + String sql = + "select device, level, " + + "count(num) as count_num, count(*) as count_star, count(device) as count_device, count(date) as count_date, " + + "count(attr1) as count_attr1, count(attr2) as count_attr2, count(time) as count_time, sum(num) as sum_num " + + "from table0 group by device,level order by device, level"; + tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + } + + @Test + public void groupByDateBinTest() { + expectedHeader = + new String[] { + "device", + "level", + "bin", + "count_num", + "count_star", + "count_device", + "count_date", + "count_attr1", + "count_attr2", + "count_time", + "avg_num" + }; + retArray = + new String[] { + "d1,l1,1970-01-01T00:00:00.000Z,1,1,1,0,1,1,1,3.0,", + "d1,l1,1971-01-01T00:00:00.000Z,2,2,2,0,2,2,2,8.5,", + "d1,l2,1970-01-01T00:00:00.000Z,1,1,1,0,1,1,1,2.0,", + "d1,l2,1971-01-01T00:00:00.000Z,2,2,2,0,2,2,2,11.0,", + "d1,l3,1970-01-01T00:00:00.000Z,1,1,1,0,1,1,1,1.0,", + "d1,l3,1971-01-01T00:00:00.000Z,2,2,2,0,2,2,2,9.0,", + "d1,l4,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,9.0,", + "d1,l4,1971-01-01T00:00:00.000Z,2,2,2,0,0,0,2,9.0,", + "d1,l5,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,8.0,", + "d1,l5,1971-01-01T00:00:00.000Z,2,2,2,1,0,0,2,11.0,", + "d2,l1,1970-01-01T00:00:00.000Z,1,1,1,0,1,1,1,3.0,", + "d2,l1,1971-01-01T00:00:00.000Z,2,2,2,0,2,2,2,8.5,", + "d2,l2,1970-01-01T00:00:00.000Z,1,1,1,0,1,0,1,2.0,", + "d2,l2,1971-01-01T00:00:00.000Z,2,2,2,0,2,0,2,11.0,", + "d2,l3,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,1.0,", + "d2,l3,1971-01-01T00:00:00.000Z,2,2,2,0,0,0,2,9.0,", + "d2,l4,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,9.0,", + "d2,l4,1971-01-01T00:00:00.000Z,2,2,2,0,0,0,2,9.0,", + "d2,l5,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,8.0,", + "d2,l5,1971-01-01T00:00:00.000Z,2,2,2,1,0,0,2,11.0,", + }; + sql = + "select device, level, date_bin(1y, time) as bin," + + "count(num) as count_num, count(*) as count_star, count(device) as count_device, count(date) as count_date, " + + "count(attr1) as count_attr1, count(attr2) as count_attr2, count(time) as count_time, avg(num) as avg_num " + + "from table0 group by 3, device, level order by device, level, bin"; + tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + + retArray = + new String[] { + "d1,l1,1970-01-01T00:00:00.000Z,1,1,1,0,1,1,1,3.0,", + "d1,l1,1971-01-01T00:00:00.000Z,2,2,2,0,2,2,2,8.5,", + "d1,l2,1970-01-01T00:00:00.000Z,1,1,1,0,1,1,1,2.0,", + "d1,l2,1971-01-01T00:00:00.000Z,1,1,1,0,1,1,1,10.0,", + "d1,l2,1971-04-26T00:00:00.000Z,1,1,1,0,1,1,1,12.0,", + "d1,l3,1970-01-01T00:00:00.000Z,1,1,1,0,1,1,1,1.0,", + "d1,l3,1971-01-01T00:00:00.000Z,1,1,1,0,1,1,1,4.0,", + "d1,l3,1971-04-26T00:00:00.000Z,1,1,1,0,1,1,1,14.0,", + "d1,l4,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,9.0,", + "d1,l4,1971-01-01T00:00:00.000Z,1,1,1,0,0,0,1,5.0,", + "d1,l4,1971-04-26T00:00:00.000Z,1,1,1,0,0,0,1,13.0,", + "d1,l5,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,8.0,", + "d1,l5,1971-01-01T00:00:00.000Z,1,1,1,0,0,0,1,7.0,", + "d1,l5,1971-08-20T00:00:00.000Z,1,1,1,1,0,0,1,15.0,", + "d2,l1,1970-01-01T00:00:00.000Z,1,1,1,0,1,1,1,3.0,", + "d2,l1,1971-01-01T00:00:00.000Z,2,2,2,0,2,2,2,8.5,", + "d2,l2,1970-01-01T00:00:00.000Z,1,1,1,0,1,0,1,2.0,", + "d2,l2,1971-01-01T00:00:00.000Z,1,1,1,0,1,0,1,10.0,", + "d2,l2,1971-04-26T00:00:00.000Z,1,1,1,0,1,0,1,12.0,", + "d2,l3,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,1.0,", + "d2,l3,1971-01-01T00:00:00.000Z,1,1,1,0,0,0,1,4.0,", + "d2,l3,1971-04-26T00:00:00.000Z,1,1,1,0,0,0,1,14.0,", + "d2,l4,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,9.0,", + "d2,l4,1971-01-01T00:00:00.000Z,1,1,1,0,0,0,1,5.0,", + "d2,l4,1971-04-26T00:00:00.000Z,1,1,1,0,0,0,1,13.0,", + "d2,l5,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,8.0,", + "d2,l5,1971-01-01T00:00:00.000Z,1,1,1,0,0,0,1,7.0,", + "d2,l5,1971-08-20T00:00:00.000Z,1,1,1,1,0,0,1,15.0,", + }; + sql = + "select device, level, date_bin(1d, time) as bin," + + "count(num) as count_num, count(*) as count_star, count(device) as count_device, count(date) as count_date, " + + "count(attr1) as count_attr1, count(attr2) as count_attr2, count(time) as count_time, avg(num) as avg_num " + + "from table0 group by 3, device, level order by device, level, bin"; + tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + + retArray = + new String[] { + "d1,l1,1970-01-01T00:00:00.000Z,1,1,1,0,1,1,1,3.0,", + "d1,l1,1971-01-01T00:00:00.000Z,1,1,1,0,1,1,1,6.0,", + "d1,l1,1971-01-01T00:01:40.000Z,1,1,1,0,1,1,1,11.0,", + "d1,l2,1970-01-01T00:00:00.000Z,1,1,1,0,1,1,1,2.0,", + "d1,l2,1971-01-01T00:00:00.000Z,1,1,1,0,1,1,1,10.0,", + "d1,l2,1971-04-26T17:46:40.000Z,1,1,1,0,1,1,1,12.0,", + "d1,l3,1970-01-01T00:00:00.000Z,1,1,1,0,1,1,1,1.0,", + "d1,l3,1971-01-01T00:00:00.000Z,1,1,1,0,1,1,1,4.0,", + "d1,l3,1971-04-26T17:46:40.000Z,1,1,1,0,1,1,1,14.0,", + "d1,l4,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,9.0,", + "d1,l4,1971-01-01T00:00:01.000Z,1,1,1,0,0,0,1,5.0,", + "d1,l4,1971-04-26T18:01:40.000Z,1,1,1,0,0,0,1,13.0,", + "d1,l5,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,8.0,", + "d1,l5,1971-01-01T00:00:10.000Z,1,1,1,0,0,0,1,7.0,", + "d1,l5,1971-08-20T11:33:20.000Z,1,1,1,1,0,0,1,15.0,", + "d2,l1,1970-01-01T00:00:00.000Z,1,1,1,0,1,1,1,3.0,", + "d2,l1,1971-01-01T00:00:00.000Z,1,1,1,0,1,1,1,6.0,", + "d2,l1,1971-01-01T00:01:40.000Z,1,1,1,0,1,1,1,11.0,", + "d2,l2,1970-01-01T00:00:00.000Z,1,1,1,0,1,0,1,2.0,", + "d2,l2,1971-01-01T00:00:00.000Z,1,1,1,0,1,0,1,10.0,", + "d2,l2,1971-04-26T17:46:40.000Z,1,1,1,0,1,0,1,12.0,", + "d2,l3,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,1.0,", + "d2,l3,1971-01-01T00:00:00.000Z,1,1,1,0,0,0,1,4.0,", + "d2,l3,1971-04-26T17:46:40.000Z,1,1,1,0,0,0,1,14.0,", + "d2,l4,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,9.0,", + "d2,l4,1971-01-01T00:00:01.000Z,1,1,1,0,0,0,1,5.0,", + "d2,l4,1971-04-26T18:01:40.000Z,1,1,1,0,0,0,1,13.0,", + "d2,l5,1970-01-01T00:00:00.000Z,1,1,1,0,0,0,1,8.0,", + "d2,l5,1971-01-01T00:00:10.000Z,1,1,1,0,0,0,1,7.0,", + "d2,l5,1971-08-20T11:33:20.000Z,1,1,1,1,0,0,1,15.0,", + }; + sql = + "select device, level, date_bin(1s, time) as bin," + + "count(num) as count_num, count(*) as count_star, count(device) as count_device, count(date) as count_date, " + + "count(attr1) as count_attr1, count(attr2) as count_attr2, count(time) as count_time, avg(num) as avg_num " + + "from table0 group by 3, device, level order by device, level, bin"; + tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + + // TODO(beyyes) test below + // sql = "select count(*) from (\n" + + // "\tselect device, level, date_bin(1d, time) as bin, \n" + + // "\tcount(num) as count_num, count(*) as count_star, count(device) as count_device, + // count(date) as count_date, count(attr1) as count_attr1, count(attr2) as count_attr2, + // count(time) as count_time, avg(num) as avg_num \n" + + // "\tfrom table0 \n" + + // "\tgroup by 3, device, level order by device, level, bin\n" + + // ")\n"; + } + // ============================ Join Test =========================== // no filter @Test diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/ITableTimeRangeIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/ITableTimeRangeIterator.java new file mode 100644 index 0000000000000..5fe0c0c61d73b --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/ITableTimeRangeIterator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator; + +import org.apache.tsfile.read.common.TimeRange; + +public interface ITableTimeRangeIterator { + + TimeIteratorType getType(); + + /** + * @return whether current iterator has next time range. + */ + boolean hasNextTimeRange(); + + boolean hasCachedTimeRange(); + + TimeRange getCurTimeRange(); + + boolean canFinishCurrentTimeRange(long startTime); + + void resetCurTimeRange(); + + void updateCurTimeRange(long startTime); + + void setFinished(); + + enum TimeIteratorType { + DATE_BIN_TIME_ITERATOR, + SINGLE_TIME_ITERATOR + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableDateBinTimeRangeIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableDateBinTimeRangeIterator.java new file mode 100644 index 0000000000000..d79fc2848043c --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableDateBinTimeRangeIterator.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator; + +import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.DateBinFunctionColumnTransformer; + +import org.apache.tsfile.read.common.TimeRange; + +public class TableDateBinTimeRangeIterator implements ITableTimeRangeIterator { + + private final DateBinFunctionColumnTransformer dateBinTransformer; + + private boolean finished = false; + + // left close, right close + private TimeRange curTimeRange; + + public TableDateBinTimeRangeIterator(DateBinFunctionColumnTransformer dateBinTransformer) { + this.dateBinTransformer = dateBinTransformer; + } + + @Override + public boolean canFinishCurrentTimeRange(long startTime) { + if (curTimeRange == null) { + return false; + } + + return startTime > curTimeRange.getMax(); + } + + @Override + public void updateCurTimeRange(long startTime) { + long[] timeArray = dateBinTransformer.dateBinStartEnd(startTime); + + if (curTimeRange != null) { + // meet new time range, remove old time range + if (timeArray[0] != curTimeRange.getMin()) { + this.curTimeRange = new TimeRange(timeArray[0], timeArray[1] - 1); + } + } else { + this.curTimeRange = new TimeRange(timeArray[0], timeArray[1] - 1); + } + } + + @Override + public void setFinished() { + this.curTimeRange = null; + this.finished = true; + } + + @Override + public TimeIteratorType getType() { + return TimeIteratorType.DATE_BIN_TIME_ITERATOR; + } + + @Override + public boolean hasNextTimeRange() { + return !finished; + } + + @Override + public boolean hasCachedTimeRange() { + return curTimeRange != null; + } + + public TimeRange getCurTimeRange() { + return this.curTimeRange; + } + + @Override + public void resetCurTimeRange() { + this.curTimeRange = null; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableSingleTimeWindowIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableSingleTimeWindowIterator.java new file mode 100644 index 0000000000000..b3b61dbabbc1a --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/timerangeiterator/TableSingleTimeWindowIterator.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator; + +import org.apache.tsfile.read.common.TimeRange; + +public class TableSingleTimeWindowIterator implements ITableTimeRangeIterator { + + // when all devices are consumed up, finished = true + boolean finished = false; + + private TimeRange curTimeRange; + + public TableSingleTimeWindowIterator(long startTime, long endTime) { + curTimeRange = new TimeRange(startTime, endTime); + } + + @Override + public TimeIteratorType getType() { + return TimeIteratorType.SINGLE_TIME_ITERATOR; + } + + @Override + public boolean hasNextTimeRange() { + return !finished; + } + + @Override + public boolean hasCachedTimeRange() { + return curTimeRange != null; + } + + @Override + public TimeRange getCurTimeRange() { + return curTimeRange; + } + + @Override + public boolean canFinishCurrentTimeRange(long startTime) { + return false; + } + + @Override + public void resetCurTimeRange() { + // do nothing + } + + @Override + public void updateCurTimeRange(long startTime) { + // not used + } + + @Override + public void setFinished() { + this.curTimeRange = null; + this.finished = true; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java index 1718f460e58d7..e0f71e62c0656 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java @@ -22,7 +22,7 @@ import org.apache.iotdb.commons.path.AlignedFullPath; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; -import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITimeRangeIterator; +import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractSeriesAggregationScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanUtil; @@ -67,6 +67,8 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregationScanOperator { + // TODO(beyyes) variable groupBy is no need in table model + private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(TableAggregationTableScanOperator.class); @@ -105,6 +107,8 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation private int currentDeviceIndex; + ITableTimeRangeIterator timeIterator; + public TableAggregationTableScanOperator( PlanNodeId sourceId, OperatorContext context, @@ -120,7 +124,7 @@ public TableAggregationTableScanOperator( List tableAggregators, List groupingKeySchemas, int[] groupingKeyIndex, - ITimeRangeIterator timeRangeIterator, + ITableTimeRangeIterator tableTimeRangeIterator, boolean ascending, GroupByTimeParameter groupByTimeParameter, long maxReturnSize, @@ -133,7 +137,7 @@ public TableAggregationTableScanOperator( null, subSensorSize, null, - timeRangeIterator, + null, ascending, false, groupByTimeParameter, @@ -158,6 +162,11 @@ public TableAggregationTableScanOperator( measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList()); this.currentDeviceIndex = 0; this.layoutArray = layoutArray; + this.timeIterator = tableTimeRangeIterator; + if (tableTimeRangeIterator.getType() + == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR) { + curTimeRange = new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE); + } this.maxReturnSize = maxReturnSize; this.maxTsBlockLineNum = maxTsBlockLineNum; @@ -165,9 +174,22 @@ public TableAggregationTableScanOperator( this.seriesScanUtil = constructAlignedSeriesScanUtil(deviceEntries.get(currentDeviceIndex)); } + @Override + public boolean isFinished() throws Exception { + if (!finished) { + finished = !hasNextWithTimer(); + } + return finished; + + // return (retainedTsBlock == null) + // && (currentDeviceIndex >= deviceCount || seriesScanOptions.limitConsumedUp()); + } + @Override public boolean hasNext() throws Exception { - return !isFinished(); + return timeIterator.hasCachedTimeRange() + || timeIterator.hasNextTimeRange() + || !resultTsBlockBuilder.isEmpty(); } @Override @@ -175,78 +197,64 @@ public TsBlock next() throws Exception { // optimize for sql: select count(*) from (select count(s1), sum(s1) from table) if (tableAggregators.isEmpty()) { - retainedTsBlock = null; + resultTsBlockBuilder.reset(); currentDeviceIndex = deviceCount; + timeIterator.setFinished(); Column[] valueColumns = new Column[0]; return new TsBlock(1, new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, 1), valueColumns); } // start stopwatch, reset leftRuntimeOfOneNextCall long start = System.nanoTime(); - leftRuntimeOfOneNextCall = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); + leftRuntimeOfOneNextCall = 1000 * operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); long maxRuntime = leftRuntimeOfOneNextCall; while (System.nanoTime() - start < maxRuntime - && (curTimeRange != null || timeRangeIterator.hasNextTimeRange()) + && (timeIterator.hasCachedTimeRange() || timeIterator.hasNextTimeRange()) && !resultTsBlockBuilder.isFull()) { - if (curTimeRange == null) { - // move to the next time window - curTimeRange = timeRangeIterator.nextTimeRange(); - // clear previous aggregation result - for (TableAggregator aggregator : tableAggregators) { - aggregator.reset(); - } - } + + // if (curTimeRange == null) { + // curTimeRange = tableTimeRangeIterator.nextTimeRange(); + // resetTableAggregators(); + // } // calculate aggregation result on current time window - // Keep curTimeRange if the calculation of this timeRange is not done + // return true if current time window is calc finished if (calculateAggregationResultForCurrentTimeRange()) { - // updateResultTsBlock(); - curTimeRange = null; + timeIterator.resetCurTimeRange(); + // curTimeRange = null; } } if (resultTsBlockBuilder.getPositionCount() > 0) { - int declaredPositions = resultTsBlockBuilder.getPositionCount(); - ColumnBuilder[] valueColumnBuilders = resultTsBlockBuilder.getValueColumnBuilders(); - Column[] valueColumns = new Column[valueColumnBuilders.length]; - for (int i = 0; i < valueColumns.length; i++) { - valueColumns[i] = valueColumnBuilders[i].build(); - if (valueColumns[i].getPositionCount() != declaredPositions) { - throw new IllegalStateException( - format( - "Declared positions (%s) does not match column %s's number of entries (%s)", - declaredPositions, i, valueColumns[i].getPositionCount())); - } - } - - this.resultTsBlock = - new TsBlock( - resultTsBlockBuilder.getPositionCount(), - new RunLengthEncodedColumn( - TIME_COLUMN_TEMPLATE, resultTsBlockBuilder.getPositionCount()), - valueColumns); - resultTsBlockBuilder.reset(); - return this.resultTsBlock; + return buildResultTsBlock(); } else { return null; } } - @Override - public boolean isFinished() throws Exception { - return (retainedTsBlock == null) - && (currentDeviceIndex >= deviceCount || seriesScanOptions.limitConsumedUp()); - } + private TsBlock buildResultTsBlock() { + int declaredPositions = resultTsBlockBuilder.getPositionCount(); + ColumnBuilder[] valueColumnBuilders = resultTsBlockBuilder.getValueColumnBuilders(); + Column[] valueColumns = new Column[valueColumnBuilders.length]; + for (int i = 0; i < valueColumns.length; i++) { + valueColumns[i] = valueColumnBuilders[i].build(); + if (valueColumns[i].getPositionCount() != declaredPositions) { + throw new IllegalStateException( + format( + "Declared positions (%s) does not match column %s's number of entries (%s)", + declaredPositions, i, valueColumns[i].getPositionCount())); + } + } - @Override - public long ramBytesUsed() { - return INSTANCE_SIZE - + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(seriesScanUtil) - + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) - + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId) - + (resultTsBlockBuilder == null ? 0 : resultTsBlockBuilder.getRetainedSizeInBytes()) - + RamUsageEstimator.sizeOfCollection(deviceEntries); + TsBlock resultTsBlock = + new TsBlock( + resultTsBlockBuilder.getPositionCount(), + new RunLengthEncodedColumn( + TIME_COLUMN_TEMPLATE, resultTsBlockBuilder.getPositionCount()), + valueColumns); + resultTsBlockBuilder.reset(); + return resultTsBlock; } private AlignedSeriesScanUtil constructAlignedSeriesScanUtil(DeviceEntry deviceEntry) { @@ -296,37 +304,35 @@ && readAndCalcFromFile()) { || seriesScanUtil.hasNextFile()) { return false; } else { + // all data of current device has been consumed updateResultTsBlock(); - for (TableAggregator aggregator : tableAggregators) { - aggregator.reset(); - } + timeIterator.resetCurTimeRange(); currentDeviceIndex++; } + if (currentDeviceIndex < deviceCount) { // construct AlignedSeriesScanUtil for next device this.seriesScanUtil = constructAlignedSeriesScanUtil(deviceEntries.get(currentDeviceIndex)); - - // reset QueryDataSource queryDataSource.reset(); this.seriesScanUtil.initQueryDataSource(queryDataSource); } - return currentDeviceIndex >= deviceCount; + + if (currentDeviceIndex >= deviceCount) { + // all devices have been consumed + timeIterator.setFinished(); + return true; + } else { + return false; + } } catch (IOException e) { throw new RuntimeException("Error while scanning the file", e); } } - @Override - public void initQueryDataSource(IQueryDataSource dataSource) { - this.queryDataSource = (QueryDataSource) dataSource; - this.seriesScanUtil.initQueryDataSource(queryDataSource); - this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes()); - this.resultTsBlockBuilder.setMaxTsBlockLineNumber(this.maxTsBlockLineNum); - } - - @Override protected void updateResultTsBlock() { appendAggregationResult(resultTsBlockBuilder, tableAggregators); + // after appendAggregationResult invoked, aggregators must be cleared + resetTableAggregators(); } protected boolean calcFromCachedData() { @@ -335,7 +341,7 @@ protected boolean calcFromCachedData() { private boolean calcFromRawData(TsBlock tsBlock) { Pair calcResult = - calculateAggregationFromRawData(tsBlock, tableAggregators, curTimeRange, ascending); + calculateAggregationFromRawData(tsBlock, tableAggregators, ascending); inputTsBlock = calcResult.getRight(); return calcResult.getLeft(); } @@ -347,14 +353,14 @@ private boolean calcFromRawData(TsBlock tsBlock) { * remaining tsBlock */ public Pair calculateAggregationFromRawData( - TsBlock inputTsBlock, - List aggregators, - TimeRange curTimeRange, - boolean ascending) { + TsBlock inputTsBlock, List aggregators, boolean ascending) { if (inputTsBlock == null || inputTsBlock.isEmpty()) { return new Pair<>(false, inputTsBlock); } + updateCurTimeRange(inputTsBlock.getStartTime()); + + TimeRange curTimeRange = timeIterator.getCurTimeRange(); // check if the tsBlock does not contain points in current interval if (satisfiedTimeRange(inputTsBlock, curTimeRange, ascending)) { // skip points that cannot be calculated @@ -390,6 +396,25 @@ private TsBlock process( } TsBlock inputRegion = inputTsBlock.getRegion(0, lastIndexToProcess + 1); + + // TODO(beyyes) add optimization: if only agg measurement columns, no need to transfer + Column[] valueColumns = new Column[tableAggregators.size()]; + for (int i = 0; i < tableAggregators.size(); i++) { + ColumnSchema columnSchema = columnSchemas.get(layoutArray[i]); + if (Streams.of( + TsTableColumnCategory.ID, TsTableColumnCategory.ATTRIBUTE, TsTableColumnCategory.TIME) + .anyMatch(columnSchema.getColumnCategory()::equals)) { + valueColumns[i] = inputRegion.getTimeColumn(); + } else { + valueColumns[i] = inputRegion.getColumn(columnsIndexArray[i]); + } + } + TsBlock tsBlock = + new TsBlock( + inputRegion.getPositionCount(), + new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, inputRegion.getPositionCount()), + valueColumns); + for (int i = 0; i < aggregators.size(); i++) { if (isNullIdOrAttribute(i)) { continue; @@ -401,7 +426,7 @@ private TsBlock process( continue; } - aggregator.processBlock(inputRegion); + aggregator.processBlock(tsBlock); } int lastReadRowIndex = lastIndexToProcess + 1; if (lastReadRowIndex >= inputTsBlock.getPositionCount()) { @@ -426,15 +451,6 @@ private boolean isNullIdOrAttribute(int i) { == null; } - public static boolean isAllAggregatorsHasFinalResult(List aggregators) { - for (TableAggregator aggregator : aggregators) { - if (!aggregator.hasFinalResult()) { - return false; - } - } - return true; - } - protected void calcFromStatistics(Statistics timeStatistics, Statistics[] valueStatistics) { for (int i = 0; i < tableAggregators.size(); i++) { if (isNullIdOrAttribute(i)) { @@ -459,7 +475,10 @@ protected boolean readAndCalcFromFile() throws IOException { while (System.nanoTime() - start < leftRuntimeOfOneNextCall && seriesScanUtil.hasNextFile()) { if (canUseStatistics && seriesScanUtil.canUseCurrentFileStatistics()) { Statistics fileTimeStatistics = seriesScanUtil.currentFileTimeStatistics(); - if (fileTimeStatistics.getStartTime() > curTimeRange.getMax()) { + + updateCurTimeRange(fileTimeStatistics.getStartTime()); + + if (fileTimeStatistics.getStartTime() > timeIterator.getCurTimeRange().getMax()) { if (ascending) { return true; } else { @@ -467,16 +486,18 @@ protected boolean readAndCalcFromFile() throws IOException { continue; } } + // calc from fileMetaData - if (curTimeRange.contains( - fileTimeStatistics.getStartTime(), fileTimeStatistics.getEndTime())) { + if (timeIterator + .getCurTimeRange() + .contains(fileTimeStatistics.getStartTime(), fileTimeStatistics.getEndTime())) { Statistics[] statisticsList = new Statistics[subSensorSize]; for (int i = 0; i < subSensorSize; i++) { statisticsList[i] = seriesScanUtil.currentFileStatistics(i); } calcFromStatistics(fileTimeStatistics, statisticsList); seriesScanUtil.skipCurrentFile(); - if (isAllAggregatorsHasFinalResult(tableAggregators) && !isGroupByQuery) { + if (isAllAggregatorsHasFinalResult(tableAggregators)) { return true; } else { continue; @@ -500,7 +521,10 @@ protected boolean readAndCalcFromChunk() throws IOException { while (System.nanoTime() - start < leftRuntimeOfOneNextCall && seriesScanUtil.hasNextChunk()) { if (canUseStatistics && seriesScanUtil.canUseCurrentChunkStatistics()) { Statistics chunkTimeStatistics = seriesScanUtil.currentChunkTimeStatistics(); - if (chunkTimeStatistics.getStartTime() > curTimeRange.getMax()) { + + updateCurTimeRange(chunkTimeStatistics.getStartTime()); + + if (chunkTimeStatistics.getStartTime() > timeIterator.getCurTimeRange().getMax()) { if (ascending) { return true; } else { @@ -508,9 +532,11 @@ protected boolean readAndCalcFromChunk() throws IOException { continue; } } + // calc from chunkMetaData - if (curTimeRange.contains( - chunkTimeStatistics.getStartTime(), chunkTimeStatistics.getEndTime())) { + if (timeIterator + .getCurTimeRange() + .contains(chunkTimeStatistics.getStartTime(), chunkTimeStatistics.getEndTime())) { // calc from chunkMetaData Statistics[] statisticsList = new Statistics[subSensorSize]; for (int i = 0; i < subSensorSize; i++) { @@ -518,7 +544,7 @@ protected boolean readAndCalcFromChunk() throws IOException { } calcFromStatistics(chunkTimeStatistics, statisticsList); seriesScanUtil.skipCurrentChunk(); - if (isAllAggregatorsHasFinalResult(tableAggregators) && !isGroupByQuery) { + if (isAllAggregatorsHasFinalResult(tableAggregators)) { return true; } else { continue; @@ -543,8 +569,12 @@ protected boolean readAndCalcFromPage() throws IOException { while (System.nanoTime() - start < leftRuntimeOfOneNextCall && seriesScanUtil.hasNextPage()) { if (canUseStatistics && seriesScanUtil.canUseCurrentPageStatistics()) { Statistics pageTimeStatistics = seriesScanUtil.currentPageTimeStatistics(); + + updateCurTimeRange(pageTimeStatistics.getStartTime()); + // There is no more eligible points in current time range - if (pageTimeStatistics.getStartTime() > curTimeRange.getMax()) { + // TODO(beyyes) will not appear in table model? + if (pageTimeStatistics.getStartTime() > timeIterator.getCurTimeRange().getMax()) { if (ascending) { return true; } else { @@ -552,16 +582,18 @@ protected boolean readAndCalcFromPage() throws IOException { continue; } } + // can use pageHeader - if (curTimeRange.contains( - pageTimeStatistics.getStartTime(), pageTimeStatistics.getEndTime())) { + if (timeIterator + .getCurTimeRange() + .contains(pageTimeStatistics.getStartTime(), pageTimeStatistics.getEndTime())) { Statistics[] statisticsList = new Statistics[subSensorSize]; for (int i = 0; i < subSensorSize; i++) { statisticsList[i] = seriesScanUtil.currentPageStatistics(i); } calcFromStatistics(pageTimeStatistics, statisticsList); seriesScanUtil.skipCurrentPage(); - if (isAllAggregatorsHasFinalResult(tableAggregators) && !isGroupByQuery) { + if (isAllAggregatorsHasFinalResult(tableAggregators)) { return true; } else { continue; @@ -574,57 +606,32 @@ protected boolean readAndCalcFromPage() throws IOException { if (originalTsBlock == null) { continue; } - // TODO(beyyes) add optimization: if only agg measurement columns, no need to transfer - Column[] valueColumns = new Column[tableAggregators.size()]; - for (int i = 0; i < tableAggregators.size(); i++) { - ColumnSchema columnSchema = columnSchemas.get(layoutArray[i]); - if (Streams.of( - TsTableColumnCategory.ID, - TsTableColumnCategory.ATTRIBUTE, - TsTableColumnCategory.TIME) - .anyMatch(columnSchema.getColumnCategory()::equals)) { - valueColumns[i] = originalTsBlock.getTimeColumn(); - } else { - valueColumns[i] = originalTsBlock.getColumn(i); - } - } - TsBlock tsBlock = - new TsBlock( - originalTsBlock.getPositionCount(), - new RunLengthEncodedColumn( - TIME_COLUMN_TEMPLATE, originalTsBlock.getPositionCount()), - valueColumns); - if (tsBlock.isEmpty()) { - continue; - } // calc from raw data - if (calcFromRawData(tsBlock)) { + if (calcFromRawData(originalTsBlock)) { return true; } } + return false; } finally { leftRuntimeOfOneNextCall -= (System.nanoTime() - start); } } - @Override - protected List getResultDataTypes() { - List resultDataTypes = - new ArrayList<>( - (groupingKeyIndex != null ? groupingKeyIndex.length : 0) + tableAggregators.size()); - if (groupingKeyIndex != null) { - for (int i = 0; i < groupingKeyIndex.length; i++) { - resultDataTypes.add(TSDataType.TEXT); - } + private void updateCurTimeRange(long startTime) { + if (timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR) { + return; } - for (TableAggregator aggregator : tableAggregators) { - resultDataTypes.add(aggregator.getType()); + if (!timeIterator.hasCachedTimeRange()) { + timeIterator.updateCurTimeRange(startTime); + } else if (timeIterator.canFinishCurrentTimeRange(startTime)) { + updateResultTsBlock(); + timeIterator.resetCurTimeRange(); + timeIterator.updateCurTimeRange(startTime); + resetTableAggregators(); } - - return resultDataTypes; } /** Append a row of aggregation results to the result tsBlock. */ @@ -632,12 +639,22 @@ public void appendAggregationResult( TsBlockBuilder tsBlockBuilder, List aggregators) { ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders(); + int groupKeySize = groupingKeySchemas == null ? 0 : groupingKeySchemas.size(); + int dateBinSize = + timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR + ? 1 + : 0; + if (groupingKeyIndex != null) { - for (int i = 0; i < groupingKeyIndex.length; i++) { + for (int i = 0; i < groupKeySize; i++) { if (TsTableColumnCategory.ID == groupingKeySchemas.get(i).getColumnCategory()) { columnBuilders[i].writeBinary( - (Binary) - deviceEntries.get(currentDeviceIndex).getNthSegment(groupingKeyIndex[i] + 1)); + new Binary( + ((String) + deviceEntries + .get(currentDeviceIndex) + .getNthSegment(groupingKeyIndex[i] + 1)) + .getBytes())); } else { columnBuilders[i].writeBinary( new Binary( @@ -650,12 +667,70 @@ public void appendAggregationResult( } } - int groupKeyLength = groupingKeyIndex == null ? 0 : groupingKeyIndex.length; + if (dateBinSize > 0) { + columnBuilders[groupKeySize].writeLong(timeIterator.getCurTimeRange().getMin()); + } for (int i = 0; i < aggregators.size(); i++) { - aggregators.get(groupKeyLength + i).evaluate(columnBuilders[groupKeyLength + i]); + aggregators.get(i).evaluate(columnBuilders[groupKeySize + dateBinSize + i]); } tsBlockBuilder.declarePosition(); } + + public static boolean isAllAggregatorsHasFinalResult(List aggregators) { + for (TableAggregator aggregator : aggregators) { + if (!aggregator.hasFinalResult()) { + return false; + } + } + return true; + } + + private void resetTableAggregators() { + tableAggregators.forEach(TableAggregator::reset); + } + + @Override + protected List getResultDataTypes() { + int groupingKeySize = groupingKeySchemas != null ? groupingKeySchemas.size() : 0; + int dateBinSize = + timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR + ? 1 + : 0; + List resultDataTypes = + new ArrayList<>(groupingKeySize + dateBinSize + tableAggregators.size()); + + if (groupingKeySchemas != null) { + for (int i = 0; i < groupingKeySchemas.size(); i++) { + resultDataTypes.add(TSDataType.STRING); + } + } + if (dateBinSize > 0) { + resultDataTypes.add(TSDataType.TIMESTAMP); + } + for (TableAggregator aggregator : tableAggregators) { + resultDataTypes.add(aggregator.getType()); + } + + return resultDataTypes; + } + + @Override + public void initQueryDataSource(IQueryDataSource dataSource) { + this.queryDataSource = (QueryDataSource) dataSource; + this.seriesScanUtil.initQueryDataSource(queryDataSource); + this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes()); + this.resultTsBlockBuilder.setMaxTsBlockLineNumber(this.maxTsBlockLineNum); + } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(seriesScanUtil) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId) + + (resultTsBlockBuilder == null ? 0 : resultTsBlockBuilder.getRetainedSizeInBytes()) + + RamUsageEstimator.sizeOfCollection(deviceEntries); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java index d3775df5f1273..b35f1f091ba07 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java @@ -60,6 +60,8 @@ public static TableAccumulator createBuiltinAccumulator( return new CountAccumulator(); case AVG: return new AvgAccumulator(inputDataTypes.get(0)); + case SUM: + return new SumAccumulator(inputDataTypes.get(0)); default: throw new IllegalArgumentException("Invalid Aggregation function: " + aggregationType); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AvgAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AvgAccumulator.java index 45d991d946e91..90dd7cb957aae 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AvgAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AvgAccumulator.java @@ -211,7 +211,7 @@ public void addStatistics(Statistics statistics) { @Override public void reset() { - initResult = false; + this.initResult = false; this.countValue = 0; this.sumValue = 0.0; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/SumAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/SumAccumulator.java new file mode 100644 index 0000000000000..e76ad72d551b9 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/SumAccumulator.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.statistics.IntegerStatistics; +import org.apache.tsfile.file.metadata.statistics.Statistics; +import org.apache.tsfile.utils.RamUsageEstimator; +import org.apache.tsfile.write.UnSupportedDataTypeException; + +import static com.google.common.base.Preconditions.checkArgument; + +public class SumAccumulator implements TableAccumulator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(SumAccumulator.class); + private final TSDataType argumentDataType; + private double sumValue = 0; + private boolean initResult = false; + + public SumAccumulator(TSDataType argumentDataType) { + this.argumentDataType = argumentDataType; + } + + @Override + public long getEstimatedSize() { + return INSTANCE_SIZE; + } + + @Override + public TableAccumulator copy() { + return new SumAccumulator(this.argumentDataType); + } + + @Override + public void addInput(Column[] arguments) { + checkArgument(arguments.length == 1, "argument of Sum should be one column"); + switch (argumentDataType) { + case INT32: + addIntInput(arguments[0]); + return; + case INT64: + addLongInput(arguments[0]); + return; + case FLOAT: + addFloatInput(arguments[0]); + return; + case DOUBLE: + addDoubleInput(arguments[0]); + return; + case TEXT: + case BLOB: + case STRING: + case BOOLEAN: + case DATE: + case TIMESTAMP: + default: + throw new UnSupportedDataTypeException( + String.format("Unsupported data type in aggregation Sum : %s", argumentDataType)); + } + } + + @Override + public void addIntermediate(Column argument) { + for (int i = 0; i < argument.getPositionCount(); i++) { + if (argument.isNull(i)) { + continue; + } + + initResult = true; + sumValue += argument.getDouble(i); + } + } + + @Override + public void evaluateIntermediate(ColumnBuilder columnBuilder) { + if (!initResult) { + columnBuilder.appendNull(); + } else { + columnBuilder.writeDouble(sumValue); + } + } + + @Override + public void evaluateFinal(ColumnBuilder columnBuilder) { + if (!initResult) { + columnBuilder.appendNull(); + } else { + columnBuilder.writeDouble(sumValue); + } + } + + @Override + public boolean hasFinalResult() { + return false; + } + + @Override + public void addStatistics(Statistics statistics) { + if (statistics == null) { + return; + } + initResult = true; + if (statistics instanceof IntegerStatistics) { + sumValue += statistics.getSumLongValue(); + } else { + sumValue += statistics.getSumDoubleValue(); + } + } + + @Override + public void reset() { + this.initResult = false; + this.sumValue = 0.0; + } + + private void addIntInput(Column column) { + int count = column.getPositionCount(); + for (int i = 0; i < count; i++) { + if (!column.isNull(i)) { + initResult = true; + sumValue += column.getInt(i); + } + } + } + + private void addLongInput(Column column) { + int count = column.getPositionCount(); + for (int i = 0; i < count; i++) { + if (!column.isNull(i)) { + initResult = true; + sumValue += column.getLong(i); + } + } + } + + private void addFloatInput(Column column) { + int count = column.getPositionCount(); + for (int i = 0; i < count; i++) { + if (!column.isNull(i)) { + initResult = true; + sumValue += column.getFloat(i); + } + } + } + + private void addDoubleInput(Column column) { + int count = column.getPositionCount(); + for (int i = 0; i < count; i++) { + if (!column.isNull(i)) { + initResult = true; + sumValue += column.getDouble(i); + } + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index ed8fafe8f4ac5..a1dcefb0bd401 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -24,8 +24,9 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.common.header.ColumnHeader; -import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITimeRangeIterator; -import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.SingleTimeWindowIterator; +import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator; +import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.TableDateBinTimeRangeIterator; +import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.TableSingleTimeWindowIterator; import org.apache.iotdb.db.queryengine.execution.driver.DataDriverContext; import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager; import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeService; @@ -122,9 +123,12 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValueFillNode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FunctionCall; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Literal; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LongLiteral; import org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer; import org.apache.iotdb.db.queryengine.transformation.dag.column.leaf.LeafColumnTransformer; +import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.DateBinFunctionColumnTransformer; import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.IDeviceSchemaInfo; import org.apache.iotdb.db.utils.datastructure.SortKey; @@ -177,6 +181,7 @@ import static org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType; import static org.apache.iotdb.db.utils.constant.SqlConstant.FIRST; import static org.apache.iotdb.db.utils.constant.SqlConstant.LAST; +import static org.apache.tsfile.read.common.type.TimestampType.TIMESTAMP; /** This Visitor is responsible for transferring Table PlanNode Tree to Table Operator Tree. */ public class TableOperatorGenerator extends PlanVisitor { @@ -1397,22 +1402,6 @@ public Operator visitAggregationTableScan( List measurementColumnNames = new ArrayList<>(); List measurementSchemas = new ArrayList<>(); - List groupingKeySchemas = null; - int[] groupingKeyIndex = null; - if (!node.getGroupingKeys().isEmpty()) { - groupingKeySchemas = new ArrayList<>(node.getGroupingKeys().size()); - groupingKeyIndex = new int[node.getGroupingKeys().size()]; - for (int i = 0; i < node.getGroupingKeys().size(); i++) { - Symbol groupingKey = node.getGroupingKeys().get(i); - - checkArgument( - idAndAttributeColumnsIndexMap.containsKey(groupingKey), - "grouping key must be ID or Attribute in AggregationTableScan"); - groupingKeySchemas.add(columnSchemaMap.get(groupingKey)); - groupingKeyIndex[i] = idAndAttributeColumnsIndexMap.get(groupingKey); - } - } - // TODO test aggregation function which has more than one arguements for (Map.Entry entry : node.getAggregations().entrySet()) { idx++; @@ -1489,6 +1478,44 @@ public Operator visitAggregationTableScan( } } + ITableTimeRangeIterator timeRangeIterator = null; + List groupingKeySchemas = null; + int[] groupingKeyIndex = null; + if (!node.getGroupingKeys().isEmpty()) { + groupingKeySchemas = new ArrayList<>(node.getGroupingKeys().size()); + groupingKeyIndex = new int[node.getGroupingKeys().size()]; + for (int i = 0; i < node.getGroupingKeys().size(); i++) { + Symbol groupingKey = node.getGroupingKeys().get(i); + + if (idAndAttributeColumnsIndexMap.containsKey(groupingKey)) { + groupingKeySchemas.add(columnSchemaMap.get(groupingKey)); + groupingKeyIndex[i] = idAndAttributeColumnsIndexMap.get(groupingKey); + } else { + if (node.getProjection() != null + && !node.getProjection().getMap().isEmpty() + && node.getProjection().contains(groupingKey)) { + FunctionCall dateBinFunc = (FunctionCall) node.getProjection().get(groupingKey); + List arguments = dateBinFunc.getArguments(); + DateBinFunctionColumnTransformer dateBinTransformer = + new DateBinFunctionColumnTransformer( + TIMESTAMP, + ((LongLiteral) arguments.get(0)).getParsedValue(), + ((LongLiteral) arguments.get(1)).getParsedValue(), + null, + ((LongLiteral) arguments.get(3)).getParsedValue(), + context.getZoneId()); + timeRangeIterator = new TableDateBinTimeRangeIterator(dateBinTransformer); + } else { + throw new IllegalStateException( + "grouping key must be ID or Attribute in AggregationTableScan"); + } + } + } + } + if (timeRangeIterator == null) { + timeRangeIterator = new TableSingleTimeWindowIterator(Long.MIN_VALUE, Long.MAX_VALUE); + } + final OperatorContext operatorContext = context .getDriverContext() @@ -1509,8 +1536,7 @@ public Operator visitAggregationTableScan( scanOptionsBuilder.withPushDownFilter( convertPredicateToFilter(pushDownPredicate, measurementColumnNames, columnSchemaMap)); } - ITimeRangeIterator timeRangeIterator = - new SingleTimeWindowIterator(Long.MIN_VALUE, Long.MAX_VALUE); + TableAggregationTableScanOperator aggTableScanOperator = new TableAggregationTableScanOperator( node.getPlanNodeId(), @@ -1528,7 +1554,7 @@ public Operator visitAggregationTableScan( groupingKeySchemas, groupingKeyIndex, timeRangeIterator, - false, + node.getScanOrder().isAscending(), null, calculateMaxAggregationResultSize(), canUseStatistic, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableBuiltinAggregationFunction.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableBuiltinAggregationFunction.java index a2b7fe8d35433..8ef20f7b91069 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableBuiltinAggregationFunction.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableBuiltinAggregationFunction.java @@ -111,6 +111,8 @@ public static boolean canUseStatistics(String name, boolean withTime) { public static List getIntermediateTypes(String name, List originalArgumentTypes) { if (COUNT.functionName.equalsIgnoreCase(name)) { return ImmutableList.of(INT64); + } else if (SUM.functionName.equalsIgnoreCase(name)) { + return ImmutableList.of(DOUBLE); } else if (AVG.functionName.equalsIgnoreCase(name)) { return ImmutableList.of(DOUBLE, INT64); } else { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/DateBinFunctionColumnTransformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/DateBinFunctionColumnTransformer.java index 304f89e2a37d4..eadfcd7e4c635 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/DateBinFunctionColumnTransformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/DateBinFunctionColumnTransformer.java @@ -144,6 +144,45 @@ public static long dateBin( return origin + (n * nonMonthDuration); } + public long[] dateBinStartEnd(long source) { + // return source if interval is 0 + if (monthDuration == 0 && nonMonthDuration == 0) { + return new long[] {source, source}; + } + + if (monthDuration != 0) { + // convert to LocalDateTime + LocalDateTime sourceDate = convertToLocalDateTime(source, zoneId); + LocalDateTime originDate = convertToLocalDateTime(origin, zoneId); + + // calculate the number of months between the origin and source + long monthsDiff = ChronoUnit.MONTHS.between(originDate, sourceDate); + // calculate the number of month cycles completed + long completedMonthCycles = monthsDiff / monthDuration; + + LocalDateTime binStart = + originDate + .plusNanos(getNanoTimeStamp(nonMonthDuration) * completedMonthCycles) + .plusMonths(completedMonthCycles * monthDuration); + + if (binStart.isAfter(sourceDate)) { + binStart = + binStart.minusMonths(monthDuration).minusNanos(getNanoTimeStamp(nonMonthDuration)); + } + + return new long[] { + convertToTimestamp(binStart, zoneId), + convertToTimestamp(binStart.plusMonths(monthDuration), zoneId) + }; + } + + long diff = source - origin; + long n = diff >= 0 ? diff / nonMonthDuration : (diff - nonMonthDuration + 1) / nonMonthDuration; + return new long[] { + origin + (n * nonMonthDuration), origin + (n * nonMonthDuration) + nonMonthDuration + }; + } + public static long nextDateBin(int monthDuration, ZoneId zoneId, long currentTime) { LocalDateTime currentDateTime = convertToLocalDateTime(currentTime, zoneId); LocalDateTime nextDateTime = currentDateTime.plusMonths(monthDuration);