Merged
38 changes: 29 additions & 9 deletions docs/UserGuide/Operation Manual/DML Data Manipulation Language.md
@@ -419,21 +419,41 @@ It costs 0.003s
```


Assuming another timeseries is added, called "root.ln.wf02.wt01.status".
To query the number of "status" points of both two paths "root.ln.wf01" and "root.ln.wf02".
Suppose we add two more timeseries, "root.ln.wf01.wt01.temperature" and "root.ln.wf02.wt01.temperature".
To query the count and the sum of "temperature" under the path "root.ln.*.*",
aggregating at level=2, use the following statement:

```
select count(status) from root.ln.*.* group by level=2
select count(temperature), sum(temperature) from root.ln.*.* group by level=2
```
Result:

```
+----------------------------+----------------------------+
|COUNT(root.ln.wf01.*.status)|COUNT(root.ln.wf02.*.status)|
+----------------------------+----------------------------+
| 10080| 10082|
+----------------------------+----------------------------+
+---------------------------------+---------------------------------+-------------------------------+-------------------------------+
|count(root.ln.wf02.*.temperature)|count(root.ln.wf01.*.temperature)|sum(root.ln.wf02.*.temperature)|sum(root.ln.wf01.*.temperature)|
+---------------------------------+---------------------------------+-------------------------------+-------------------------------+
| 8| 4| 228.0| 91.83000183105469|
+---------------------------------+---------------------------------+-------------------------------+-------------------------------+
Total line number = 1
It costs 0.003s
It costs 0.013s
```

To query the count and the sum of path "root.ln.\*.\*.temperature" aggregating at the "root.ln" level,
simply set level=1:

```
select count(temperature), sum(temperature) from root.ln.*.* group by level=1
```
Result:

```
+------------------------------+----------------------------+
|count(root.ln.*.*.temperature)|sum(root.ln.*.*.temperature)|
+------------------------------+----------------------------+
| 12| 319.8300018310547|
+------------------------------+----------------------------+
Total line number = 1
It costs 0.013s
```

All supported aggregation functions are: count, sum, avg, last_value, first_value, min_time, max_time, min_value, max_value.
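Under the hood, `group by level` truncates each full series path at the given level and masks the remaining device nodes with `*`, so series sharing a prefix collapse into one result column. A rough sketch of that transformation in Java (a simplified stand-in for IoTDB's `FilePathUtils.generatePartialPathByLevel`; the exact rules may differ):

```java
public class LevelPathSketch {
  // Keep `level` nodes after "root", mask the remaining device nodes with '*',
  // and keep the measurement name at the end.
  static String truncateToLevel(String fullPath, int level) {
    String[] nodes = fullPath.split("\\.");
    StringBuilder sb = new StringBuilder(nodes[0]); // "root"
    for (int i = 1; i <= level && i < nodes.length - 1; i++) {
      sb.append('.').append(nodes[i]);
    }
    for (int i = level + 1; i < nodes.length - 1; i++) {
      sb.append(".*");
    }
    return sb.append('.').append(nodes[nodes.length - 1]).toString();
  }

  public static void main(String[] args) {
    // root.ln.wf01.wt01.temperature at level=2 -> root.ln.wf01.*.temperature
    System.out.println(truncateToLevel("root.ln.wf01.wt01.temperature", 2));
    System.out.println(truncateToLevel("root.ln.wf01.wt01.temperature", 1));
  }
}
```

With this rule, both "root.ln.wf01.wt01.temperature" and "root.ln.wf01.wt02.temperature" map to the level-2 column "root.ln.wf01.*.temperature", matching the result headers shown above.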
@@ -469,7 +469,7 @@ select count(status) from root.ln.wf01.* group by level=2

```
+----------------------------+
|COUNT(root.ln.wf01.*.status)|
|count(root.ln.wf01.*.status)|
+----------------------------+
| 10080|
+----------------------------+
@@ -478,21 +478,37 @@ It costs 0.003s
```


Suppose a child series named wf02 is added under "root.ln", such as "root.ln.wf02.wt01.status".
To query the number of status points under both "root.ln.wf01" and "root.ln.wf02", use the query
Suppose we add two timeseries, "root.ln.wf01.wt01.temperature" and "root.ln.wf02.wt01.temperature".
To query the count and the sum of "root.ln.\*.\*.temperature" aggregated at the second level, use the following statement:
```
select count(status) from root.ln.*.* group by level=2
select count(temperature), sum(temperature) from root.ln.*.* group by level=2
```
Result:

```
+----------------------------+----------------------------+
|COUNT(root.ln.wf01.*.status)|COUNT(root.ln.wf02.*.status)|
+----------------------------+----------------------------+
| 10080| 10082|
+----------------------------+----------------------------+
+---------------------------------+---------------------------------+-------------------------------+-------------------------------+
|count(root.ln.wf02.*.temperature)|count(root.ln.wf01.*.temperature)|sum(root.ln.wf02.*.temperature)|sum(root.ln.wf01.*.temperature)|
+---------------------------------+---------------------------------+-------------------------------+-------------------------------+
| 8| 4| 228.0| 91.83000183105469|
+---------------------------------+---------------------------------+-------------------------------+-------------------------------+
Total line number = 1
It costs 0.003s
It costs 0.013s
```

To query the count and the sum under "root.ln.\*.\*" aggregated at the first level, simply set level=1:
```
select count(temperature), sum(temperature) from root.ln.*.* group by level=1
```
Result:

```
+------------------------------+----------------------------+
|count(root.ln.*.*.temperature)|sum(root.ln.*.*.temperature)|
+------------------------------+----------------------------+
| 12| 319.8300018310547|
+------------------------------+----------------------------+
Total line number = 1
It costs 0.013s
```

Group-by-level queries can also be used with other aggregation functions. The currently supported aggregation functions are: count, sum, avg, last_value, first_value, min_time, max_time, min_value, max_value.
@@ -18,10 +18,19 @@
*/
package org.apache.iotdb.db.qp.physical.crud;

import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AggregationPlan extends RawDataQueryPlan {

@@ -33,6 +42,8 @@ public class AggregationPlan extends RawDataQueryPlan {
private List<String> deduplicatedAggregations = new ArrayList<>();

private int level = -1;
// group-by-level aggregation results, keyed by output column name,
// e.g. count(root.ln.wf01.*.temperature)
private final Map<String, AggregateResult> levelAggPaths = new LinkedHashMap<>();

public AggregationPlan() {
super();
@@ -67,4 +78,28 @@ public int getLevel() {
public void setLevel(int level) {
this.level = level;
}

public Map<String, AggregateResult> getAggPathByLevel() throws QueryProcessException {
if (!levelAggPaths.isEmpty()) {
return levelAggPaths;
}
List<PartialPath> seriesPaths = getPaths();
List<TSDataType> dataTypes = getDataTypes();
try {
for (int i = 0; i < seriesPaths.size(); i++) {
String transformedPath =
FilePathUtils.generatePartialPathByLevel(seriesPaths.get(i).getFullPath(), getLevel());
String key = getAggregations().get(i) + "(" + transformedPath + ")";
if (!levelAggPaths.containsKey(key)) {
AggregateResult aggRet =
AggregateResultFactory.getAggrResultByName(
getAggregations().get(i), dataTypes.get(i));
levelAggPaths.put(key, aggRet);
}
}
} catch (IllegalPathException e) {
throw new QueryProcessException(e.getMessage());
}
return levelAggPaths;
}
}
@@ -114,6 +114,7 @@
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.udf.core.context.UDFContext;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.expression.IExpression;
@@ -876,6 +877,16 @@ private void deduplicate(QueryPlan queryPlan, int fetchSize)
.getContext()
.getColumnName()
: columnForReader;
if (queryPlan instanceof AggregationPlan && ((AggregationPlan) queryPlan).getLevel() >= 0) {
String aggregatePath =
originalPath.isMeasurementAliasExists()
? FilePathUtils.generatePartialPathByLevel(
originalPath.getFullPathWithAlias(), ((AggregationPlan) queryPlan).getLevel())
: FilePathUtils.generatePartialPathByLevel(
originalPath.toString(), ((AggregationPlan) queryPlan).getLevel());
Comment on lines +881 to +886

Contributor: In line 850, check the level and don't change the value of columnForReader. In this way, you can use the columnForReader directly here.

Contributor (author): The modified value of columnForReader is used between lines 858-871. Considering the complexity of deduplicate(), I tried not to make many changes here, to keep correctness. Perhaps refactoring deduplicate() and moving it into QueryPlan would be a choice. What do you think?

Contributor: Okay...but maybe we have to rewrite this method later...it's so long

columnForDisplay =
queryPlan.getAggregations().get(originalIndex) + "(" + aggregatePath + ")";
}
if (!columnForDisplaySet.contains(columnForDisplay)) {
queryPlan.addPathToIndex(columnForDisplay, queryPlan.getPathToIndex().size());
if (queryPlan instanceof UDTFPlan) {
@@ -88,9 +88,6 @@ public Operator transform(Operator operator, int maxDeduplicatedPathNum)
}

checkAggrOfSelectOperator(select);
if (((QueryOperator) operator).isGroupByLevel()) {
checkAggrOfGroupByLevel(select);
}

boolean isAlignByDevice = false;
if (operator instanceof QueryOperator) {
@@ -165,14 +162,6 @@ private void checkAggrOfSelectOperator(SelectOperator selectOperator)
}
}

private void checkAggrOfGroupByLevel(SelectOperator selectOperator)
throws LogicalOptimizeException {
if (selectOperator.getAggregations().size() != 1) {
throw new LogicalOptimizeException(
"Aggregation function is restricted to one if group by level clause exists");
}
}

private void extendListSafely(List<String> source, int index, List<String> target) {
if (source != null && !source.isEmpty()) {
target.add(source.get(index));
@@ -135,6 +135,23 @@ private void updateAvg(TSDataType type, Object sumVal) throws UnSupportedDataTyp
cnt++;
}

public void setAvgResult(TSDataType type, Object val) throws UnSupportedDataTypeException {
cnt = 1;
switch (type) {
case INT32:
case INT64:
case FLOAT:
case DOUBLE:
avg = (double) val;
break;
case TEXT:
case BOOLEAN:
default:
throw new UnSupportedDataTypeException(
String.format("Unsupported data type in aggregation AVG : %s", type));
}
}
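The new `setAvgResult` above overwrites the stored average directly and resets `cnt` to 1, which suggests the group-by-level merge computes the combined average externally before storing it. Combining two partial averages correctly requires weighting each by its point count; a minimal sketch of that arithmetic (the method name `mergeAvg` and the sample numbers are illustrative, not from IoTDB):

```java
public class AvgMergeSketch {
  // Combine two partial averages, weighting each by its point count.
  static double mergeAvg(double avgA, long cntA, double avgB, long cntB) {
    return (avgA * cntA + avgB * cntB) / (cntA + cntB);
  }

  public static void main(String[] args) {
    // e.g. one subtree holds 4 points averaging 22.9575,
    // another holds 8 points averaging 28.5
    System.out.println(mergeAvg(22.9575, 4, 28.5, 8));
  }
}
```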

@Override
public boolean hasFinalResult() {
return false;
@@ -33,7 +33,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@@ -61,8 +60,7 @@ public GroupByTimeDataSet(
logger.debug("paths " + this.paths + " level:" + plan.getLevel());
}

Map<Integer, String> pathIndex = new HashMap<>();
Map<String, AggregateResult> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex);
Map<String, AggregateResult> finalPaths = plan.getAggPathByLevel();

// get all records from GroupByDataSet, then we merge every record
if (logger.isDebugEnabled()) {
@@ -72,7 +70,7 @@
RowRecord rawRecord = dataSet.nextWithoutConstraint();
RowRecord curRecord = new RowRecord(rawRecord.getTimestamp());
List<AggregateResult> mergedAggResults =
FilePathUtils.mergeRecordByPath(plan, rawRecord, finalPaths, pathIndex);
FilePathUtils.mergeRecordByPath(plan, rawRecord, finalPaths);
for (AggregateResult resultData : mergedAggResults) {
TSDataType dataType = resultData.getResultDataType();
curRecord.addField(resultData.getResult(), dataType);
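The loop above hands each raw record to `FilePathUtils.mergeRecordByPath`, which folds per-series values into the per-level columns built by `getAggPathByLevel`. A standalone sketch of that fold for `count` (the keys and values are illustrative; the real method also dispatches on aggregation type and data type):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MergeSketch {
  // Fold per-series aggregate values into per-level columns,
  // preserving first-seen column order.
  static Map<String, Long> mergeByLevelKey(String[] levelKeys, long[] rawValues) {
    Map<String, Long> merged = new LinkedHashMap<>();
    for (int i = 0; i < rawValues.length; i++) {
      merged.merge(levelKeys[i], rawValues[i], Long::sum);
    }
    return merged;
  }

  public static void main(String[] args) {
    // Counts read from one raw record: wf01.wt01, wf01.wt02, wf02.wt01
    String[] keys = {
        "count(root.ln.wf01.*.temperature)",
        "count(root.ln.wf01.*.temperature)", // wt01 and wt02 share a level-2 key
        "count(root.ln.wf02.*.temperature)"};
    long[] values = {2L, 2L, 8L};
    System.out.println(mergeByLevelKey(keys, values));
  }
}
```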
@@ -334,7 +334,7 @@ private static int aggregatePages(
*
* @param context query context.
*/
public QueryDataSet executeWithValueFilter(QueryContext context, RawDataQueryPlan queryPlan)
public QueryDataSet executeWithValueFilter(QueryContext context, AggregationPlan queryPlan)
throws StorageEngineException, IOException, QueryProcessException {
int index = 0;
for (; index < aggregations.size(); index++) {
@@ -426,7 +426,7 @@ private void aggregateWithValueFilter(
* @param aggregateResultList aggregate result list
*/
private QueryDataSet constructDataSet(
List<AggregateResult> aggregateResultList, RawDataQueryPlan plan)
List<AggregateResult> aggregateResultList, AggregationPlan plan)
throws QueryProcessException {
RowRecord record = new RowRecord(0);
for (AggregateResult resultData : aggregateResultList) {
@@ -435,13 +435,11 @@ }
}

SingleDataSet dataSet;
if (((AggregationPlan) plan).getLevel() >= 0) {
Map<Integer, String> pathIndex = new HashMap<>();
Map<String, AggregateResult> finalPaths =
FilePathUtils.getPathByLevel((AggregationPlan) plan, pathIndex);
if (plan.getLevel() >= 0) {
Map<String, AggregateResult> finalPaths = plan.getAggPathByLevel();

List<AggregateResult> mergedAggResults =
FilePathUtils.mergeRecordByPath(aggregateResultList, finalPaths, pathIndex);
FilePathUtils.mergeRecordByPath(plan, aggregateResultList, finalPaths);

List<PartialPath> paths = new ArrayList<>();
List<TSDataType> dataTypes = new ArrayList<>();
@@ -81,7 +81,6 @@
import org.apache.iotdb.db.query.dataset.UDTFDataSet;
import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.rpc.RpcUtils;
@@ -139,7 +138,6 @@
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -767,11 +765,9 @@ private TSExecuteStatementResp getQueryColumnHeaders(PhysicalPlan physicalPlan,
// same.
return StaticResps.LAST_RESP.deepCopy();
} else if (plan instanceof AggregationPlan && ((AggregationPlan) plan).getLevel() >= 0) {
Map<Integer, String> pathIndex = new HashMap<>();
Map<String, AggregateResult> finalPaths =
FilePathUtils.getPathByLevel((AggregationPlan) plan, pathIndex);
Map<String, AggregateResult> finalPaths = ((AggregationPlan) plan).getAggPathByLevel();
for (Map.Entry<String, AggregateResult> entry : finalPaths.entrySet()) {
respColumns.add(entry.getValue().getAggregationType() + "(" + entry.getKey() + ")");
respColumns.add(entry.getKey());
columnsTypes.add(entry.getValue().getResultDataType().toString());
}
} else {