Skip to content

Commit

Permalink
Change to groupByDevice execution logic: processing filter by device … (
Browse files Browse the repository at this point in the history
#722)

* Change to groupByDevice execution logic: processing filter by device separately
  • Loading branch information
Alima777 authored and Jialin Qiao committed Jan 9, 2020
1 parent 2aa5e53 commit 28987b3
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class QueryPlan extends PhysicalPlan {
private List<String> measurements; // for group by device sql, e.g. temperature
private Map<String, Set<String>> measurementsGroupByDevice; // for group by device sql, e.g. root.ln.d1 -> temperature
private Map<String, TSDataType> dataTypeConsistencyChecker; // for group by device sql, e.g. root.ln.d1.temperature -> Float
private Map<String, IExpression> deviceToFilterMap; // for group by device sql
private Map<Path, TSDataType> dataTypeMapping = new HashMap<>(); // for group by device sql

public QueryPlan() {
Expand Down Expand Up @@ -183,4 +184,13 @@ public void setDeduplicatedDataTypes(
List<TSDataType> deduplicatedDataTypes) {
this.deduplicatedDataTypes = deduplicatedDataTypes;
}

public Map<String, IExpression> getDeviceToFilterMap() {
return deviceToFilterMap;
}

public void setDeviceToFilterMap(Map<String, IExpression> deviceToFilterMap) {
this.deviceToFilterMap = deviceToFilterMap;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.db.exception.query.LogicalOperatorException;
import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
import org.apache.iotdb.db.qp.logical.Operator;
Expand Down Expand Up @@ -313,27 +314,95 @@ private PhysicalPlan transformQuery(QueryOperator queryOperator)
queryPlan.setDataTypeConsistencyChecker(dataTypeConsistencyChecker);
queryPlan.setPaths(paths);
queryPlan.setDeduplicatedPaths(paths);

// get device to filter map
FilterOperator filterOperator = queryOperator.getFilterOperator();

if(filterOperator != null){
queryPlan.setDeviceToFilterMap(concatFilterByDivice(prefixPaths, filterOperator));
}
} else {
List<Path> paths = queryOperator.getSelectedPaths();
queryPlan.setPaths(paths);
}
generateDataTypes(queryPlan);
deduplicate(queryPlan);

// transform filter operator to expression
FilterOperator filterOperator = queryOperator.getFilterOperator();
// transform filter operator to expression
FilterOperator filterOperator = queryOperator.getFilterOperator();

if (filterOperator != null) {
IExpression expression = filterOperator.transformToExpression(executor);
queryPlan.setExpression(expression);
if (filterOperator != null) {
IExpression expression = filterOperator.transformToExpression(executor);
queryPlan.setExpression(expression);
}
}
generateDataTypes(queryPlan);
deduplicate(queryPlan);

queryPlan.setRowLimit(queryOperator.getRowLimit());
queryPlan.setRowOffset(queryOperator.getRowOffset());

return queryPlan;
}

// e.g. translate "select * from root.ln.d1, root.ln.d2 where s1 < 20 AND s2 > 10" to
// [root.ln.d1 -> root.ln.d1.s1 < 20 AND root.ln.d1.s2 > 10,
// root.ln.d2 -> root.ln.d2.s1 < 20 AND root.ln.d2.s2 > 10)]
private Map<String, IExpression> concatFilterByDivice(List<Path> fromPaths, FilterOperator operator)
throws QueryProcessException {
Map<String, IExpression> deviceToFilterMap = new HashMap<>();
// remove stars in fromPaths and get deviceId with deduplication
List<String> noStarDevices = removeStarsInDeviceWithUnique(fromPaths);
for (int i = 0; i < noStarDevices.size(); i++) {
FilterOperator newOperator = operator.clone();
newOperator = concatFilterPath(noStarDevices.get(i), newOperator);

deviceToFilterMap.put(noStarDevices.get(i), newOperator.transformToExpression(executor));
}

return deviceToFilterMap;
}

private List<String> removeStarsInDeviceWithUnique(List<Path> paths)
throws LogicalOptimizeException {
List<String> retDevices;
Set<String> deviceSet = new LinkedHashSet<>();
try {
for (Path path : paths) {
List<String> tempDS;
tempDS = MManager.getInstance().getDevices(path.getFullPath());

for (String subDevice : tempDS) {
if (!deviceSet.contains(subDevice)) {
deviceSet.add(subDevice);
}
}
}
retDevices = new ArrayList<>(deviceSet);
} catch (PathException e) {
throw new LogicalOptimizeException("error when remove star: " + e.getMessage());
}
return retDevices;
}

private FilterOperator concatFilterPath(String prefix, FilterOperator operator) {
if(!operator.isLeaf()){
for (FilterOperator child : operator.getChildren()) {
concatFilterPath(prefix, child);
}
return operator;
}
BasicFunctionOperator basicOperator = (BasicFunctionOperator) operator;
Path filterPath = basicOperator.getSinglePath();

// do nothing in the cases of "where time > 5" or "where root.d1.s1 > 5"
if (SQLConstant.isReservedPath(filterPath) || filterPath.startWith(SQLConstant.ROOT)) {
return operator;
}

Path concatPath = filterPath.addPrefixPath(filterPath, prefix);
basicOperator.setSinglePath(concatPath);

return basicOperator;
}

private void generateDataTypes(QueryPlan queryPlan) throws PathException {
List<Path> paths = queryPlan.getPaths();
List<TSDataType> dataTypes = new ArrayList<>(paths.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
package org.apache.iotdb.db.qp.strategy.optimizer;

import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.path.PathException;
import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
import org.apache.iotdb.db.exception.runtime.SQLParserException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
import org.apache.iotdb.db.qp.logical.Operator;
Expand Down Expand Up @@ -113,11 +111,10 @@ public Operator transform(Operator operator) throws LogicalOptimizeException {
if (filter == null) {
return operator;
}
if(isGroupByDevice){
sfwOperator.setFilterOperator(concatFilterByDivice(prefixPaths, filter));
} else {
if(!isGroupByDevice){
sfwOperator.setFilterOperator(concatFilter(prefixPaths, filter));
}
// GROUP_BY_DEVICE leaves the concatFilter to PhysicalGenerator to optimize filter without prefix first

return sfwOperator;
}
Expand Down Expand Up @@ -243,92 +240,6 @@ private FilterOperator concatFilter(List<Path> fromPaths, FilterOperator operato
}
}

// e.g. translate "select * from root.ln.d1, root.ln.d2 where s1 < 20 AND s2 > 10"
// to "select * from root.ln.d1, root.ln.d2 where
// (root.ln.d1.s1 < 20 AND root.ln.d1.s2 > 10) OR (root.ln.d2.s1 < 20 AND root.ln.d2.s2 > 10)"
private FilterOperator concatFilterByDivice(List<Path> fromPaths, FilterOperator operator)
throws LogicalOptimizeException {
// remove stars in fromPaths and get deviceId
List<String> noStarDevices = removeStarsInDeviceWithUnique(fromPaths);

FilterOperator filterBinaryTree = new FilterOperator(SQLConstant.KW_OR);
FilterOperator currentNode = filterBinaryTree;
FilterOperator parentNode = currentNode;
// to check whether duplicate
// e.g. SELECT * FROM root.ln.d1, root.ln.d2 where time < 10
// brings two 'time < 10' filter operator
Set<FilterOperator> operatorSet = new HashSet<>();
for (int i = 0; i < noStarDevices.size(); i++) {
FilterOperator newOperator = operator.clone();
newOperator = concatFilterPath(noStarDevices.get(i), newOperator);

if (!operatorSet.contains(newOperator)) {
if(currentNode.getChildren().size() > 0){
FilterOperator newInnerNode = new FilterOperator(SQLConstant.KW_OR);
currentNode.addChildOperator(newInnerNode);
parentNode = currentNode;
currentNode = newInnerNode;
}
currentNode.addChildOperator(newOperator);
operatorSet.add(newOperator);
}
}

// if 'OR' has no enough operands due to duplication
if(currentNode.getChildren().size() == 1){
if(parentNode == currentNode){
filterBinaryTree = currentNode.getChildren().get(0);
} else {
parentNode.getChildren().set(1, currentNode.getChildren().get(0));
}
}

return filterBinaryTree;
}

private List<String> removeStarsInDeviceWithUnique(List<Path> paths)
throws LogicalOptimizeException {
List<String> retDevices;
Set<String> deviceSet = new LinkedHashSet<>();
try {
for (Path path : paths) {
List<String> tempDS;
tempDS = MManager.getInstance().getDevices(path.getFullPath());

for (String subDevice : tempDS) {
if (!deviceSet.contains(subDevice)) {
deviceSet.add(subDevice);
}
}
}
retDevices = new ArrayList<>(deviceSet);
} catch (PathException e) {
throw new LogicalOptimizeException("error when remove star: " + e.getMessage());
}
return retDevices;
}

private FilterOperator concatFilterPath(String prefix, FilterOperator operator) {
if(!operator.isLeaf()){
for (FilterOperator child : operator.getChildren()) {
concatFilterPath(prefix, child);
}
return operator;
}
BasicFunctionOperator basicOperator = (BasicFunctionOperator) operator;
Path filterPath = basicOperator.getSinglePath();

// do nothing in the cases of "where time > 5" or "where root.d1.s1 > 5"
if (SQLConstant.isReservedPath(filterPath) || filterPath.startWith(SQLConstant.ROOT)) {
return operator;
}

Path concatPath = filterPath.addPrefixPath(filterPath, prefix);
basicOperator.setSinglePath(concatPath);

return basicOperator;
}

private FilterOperator constructBinaryFilterTreeWithAnd(List<Path> noStarPaths,
FilterOperator operator)
throws LogicalOptimizeException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@
import java.util.Map;
import java.util.Set;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.path.PathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
Expand Down Expand Up @@ -57,6 +55,7 @@ public class DeviceIterateDataSet extends QueryDataSet {

private List<String> deduplicatedMeasurementColumns;
private Map<String, Set<String>> measurementColumnsGroupByDevice;
private Map<String, IExpression> deviceToFilterMap;

// group-by-time parameters
private long unit;
Expand Down Expand Up @@ -85,20 +84,18 @@ public DeviceIterateDataSet(QueryPlan queryPlan, QueryContext context,
this.queryRouter = queryRouter;
this.context = context;
this.measurementColumnsGroupByDevice = queryPlan.getMeasurementsGroupByDevice();
this.deviceToFilterMap = queryPlan.getDeviceToFilterMap();

if (queryPlan instanceof GroupByPlan) {
this.dataSetType = DataSetType.GROUPBY;
// assign parameters
this.expression = queryPlan.getExpression();
this.unit = ((GroupByPlan) queryPlan).getUnit();
this.slidingStep = ((GroupByPlan) queryPlan).getSlidingStep();
this.startTime = ((GroupByPlan) queryPlan).getStartTime();
this.endTime = ((GroupByPlan) queryPlan).getEndTime();

} else if (queryPlan instanceof AggregationPlan) {
this.dataSetType = DataSetType.AGGREGATE;
// assign parameters
this.expression = queryPlan.getExpression();

} else if (queryPlan instanceof FillQueryPlan) {
this.dataSetType = DataSetType.FILL;
Expand All @@ -107,8 +104,6 @@ public DeviceIterateDataSet(QueryPlan queryPlan, QueryContext context,
this.fillType = ((FillQueryPlan) queryPlan).getFillType();
} else {
this.dataSetType = DataSetType.QUERY;
// assign parameters
this.expression = queryPlan.getExpression();
}

this.curDataSetInitialized = false;
Expand Down Expand Up @@ -166,6 +161,11 @@ protected boolean hasNextWithoutConstraint() throws IOException {
}
}

// get filter to execute for the current device
if(deviceToFilterMap != null){
this.expression = deviceToFilterMap.get(currentDevice);
}

try {
switch (dataSetType) {
case GROUPBY:
Expand Down

0 comments on commit 28987b3

Please sign in to comment.