Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.BaseSourceRewriter;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
Expand All @@ -49,6 +50,7 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
Expand Down Expand Up @@ -95,6 +97,7 @@
import java.util.TreeSet;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.iotdb.commons.conf.IoTDBConstant.LAST_VALUE;
import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
Expand Down Expand Up @@ -242,13 +245,17 @@ public List<PlanNode> visitDeviceView(DeviceViewNode node, DistributionPlanConte
Map<Integer, List<Integer>> newMeasurementIdxMap = new HashMap<>();
List<String> newPartialOutputColumns = new ArrayList<>();
Set<Expression> deviceViewOutputExpressions = analysis.getDeviceViewOutputExpressions();
// Used to rewrite child ProjectNode if it exists
List<FunctionExpression> actualPartialAggregations = new ArrayList<>();

int i = 0, newIdxSum = 0;
for (Expression expression : deviceViewOutputExpressions) {
if (i == 0) {
newPartialOutputColumns.add(expression.getOutputSymbol());
i++;
newIdxSum++;
// just a placeholder, convenient for after process
actualPartialAggregations.add(null);
continue;
}
FunctionExpression aggExpression = (FunctionExpression) expression;
Expand All @@ -269,6 +276,7 @@ public List<PlanNode> visitDeviceView(DeviceViewNode node, DistributionPlanConte
.setTreeModelType(partialFunctionExpression.getOutputSymbol(), dataType);
}
newPartialOutputColumns.add(partialFunctionExpression.getOutputSymbol());
actualPartialAggregations.add(partialFunctionExpression);
}
newMeasurementIdxMap.put(
i++,
Expand All @@ -289,6 +297,38 @@ public List<PlanNode> visitDeviceView(DeviceViewNode node, DistributionPlanConte
DeviceViewNode deviceViewNode = (DeviceViewNode) planNode;
deviceViewNode.setOutputColumnNames(newPartialOutputColumns);
transferAggregatorsRecursively(planNode, context);

List<IDeviceID> devices = deviceViewNode.getDevices();
for (int j = 0; j < devices.size(); j++) {
if (deviceViewNode.getChildren().get(j) instanceof ProjectNode) {
IDeviceID device = devices.get(j);

// construct output column names for each child ProjectNode
List<Integer> newMeasurementIdxList =
deviceViewNode.getDeviceToMeasurementIndexesMap().get(device);
List<String> newProjectOutputs =
newMeasurementIdxList.stream()
.map(
// process each measurement
measurementIdx -> {
FunctionExpression aggExpression =
actualPartialAggregations.get(measurementIdx);

// construct new FunctionExpression with device for ProjectNode
List<Expression> withDeviceExpressions =
getWithDeviceExpressions(aggExpression, device.toString());
aggExpression =
new FunctionExpression(
aggExpression.getFunctionName(),
aggExpression.getFunctionAttributes(),
withDeviceExpressions);
return aggExpression.getExpressionString();
})
.collect(Collectors.toList());
((ProjectNode) deviceViewNode.getChildren().get(j))
.setOutputColumnNames(newProjectOutputs);
}
}
}

boolean hasGroupBy =
Expand All @@ -313,6 +353,22 @@ public List<PlanNode> visitDeviceView(DeviceViewNode node, DistributionPlanConte
}
}

private static List<Expression> getWithDeviceExpressions(
FunctionExpression aggExpression, String device) {
return aggExpression.getExpressions().stream()
.map(
// process each argument of FunctionExpression
argument -> {
checkArgument(
argument instanceof TimeSeriesOperand,
"Argument of AggregationFunction should be TimeSeriesOperand here");
return new TimeSeriesOperand(
new PartialPath(device, argument.getExpressionString(), false),
((TimeSeriesOperand) argument).getType());
})
.collect(Collectors.toList());
}

/**
* aggregation align by device, and aggregation is `count_if` or `diff`, or aggregation used with
* group by parameter (session, variation, count), use the old aggregation logic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public AggregationNode(
this.scanOrder = scanOrder;
}

// used by clone & deserialize
public AggregationNode(
PlanNodeId id,
List<AggregationDescriptor> aggregationDescriptorList,
Expand All @@ -105,7 +106,7 @@ public AggregationNode(
boolean outputEndTime,
Ordering scanOrder) {
super(id, new ArrayList<>());
this.aggregationDescriptorList = getDeduplicatedDescriptors(aggregationDescriptorList);
this.aggregationDescriptorList = aggregationDescriptorList;
this.groupByTimeParameter = groupByTimeParameter;
this.scanOrder = scanOrder;
this.groupByParameter = groupByParameter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public RawDataAggregationNode(
this.scanOrder = scanOrder;
}

// used by clone & deserialize
public RawDataAggregationNode(
PlanNodeId id,
List<AggregationDescriptor> aggregationDescriptorList,
Expand All @@ -98,7 +99,7 @@ public RawDataAggregationNode(
boolean outputEndTime,
Ordering scanOrder) {
super(id);
this.aggregationDescriptorList = getDeduplicatedDescriptors(aggregationDescriptorList);
this.aggregationDescriptorList = aggregationDescriptorList;
this.groupByTimeParameter = groupByTimeParameter;
this.scanOrder = scanOrder;
this.groupByParameter = groupByParameter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public AlignedSeriesAggregationScanNode(
this.regionReplicaSet = dataRegionReplicaSet;
}

// used by clone & deserialize
public AlignedSeriesAggregationScanNode(
PlanNodeId id,
AlignedPath alignedPath,
Expand All @@ -109,14 +110,12 @@ public AlignedSeriesAggregationScanNode(
@Nullable GroupByTimeParameter groupByTimeParameter,
TRegionReplicaSet dataRegionReplicaSet,
byte descriptorType) {
this(
id,
alignedPath,
aggregationDescriptorList,
scanOrder,
pushDownPredicate,
groupByTimeParameter,
dataRegionReplicaSet);
super(id, aggregationDescriptorList);
this.alignedPath = alignedPath;
this.scanOrder = scanOrder;
this.groupByTimeParameter = groupByTimeParameter;
this.pushDownPredicate = pushDownPredicate;
this.regionReplicaSet = dataRegionReplicaSet;
setOutputEndTime(outputEndTime);
setDescriptorType(descriptorType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public SeriesAggregationScanNode(
this.regionReplicaSet = dataRegionReplicaSet;
}

// used by clone & deserialize
public SeriesAggregationScanNode(
PlanNodeId id,
MeasurementPath seriesPath,
Expand All @@ -114,14 +115,12 @@ public SeriesAggregationScanNode(
@Nullable Expression pushDownPredicate,
@Nullable GroupByTimeParameter groupByTimeParameter,
TRegionReplicaSet dataRegionReplicaSet) {
this(
id,
seriesPath,
aggregationDescriptorList,
scanOrder,
pushDownPredicate,
groupByTimeParameter,
dataRegionReplicaSet);
super(id, aggregationDescriptorList);
this.seriesPath = seriesPath;
this.scanOrder = scanOrder;
this.groupByTimeParameter = groupByTimeParameter;
this.pushDownPredicate = pushDownPredicate;
this.regionReplicaSet = dataRegionReplicaSet;
setOutputEndTime(outputEndTime);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.HorizontallyConcatNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode;
Expand All @@ -39,6 +40,7 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;

import com.google.common.collect.ImmutableList;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -428,4 +430,40 @@ public void orderByTimeTest1() {
assertTrue(
firstFiTopNode.getChildren().get(0).getChildren().get(0) instanceof HorizontallyConcatNode);
}

@Test
public void crossRegionTest() {
// one aggregation measurement, two devices
sql = "select last_value(s1),last_value(s2)from root.sg.d1 align by device";
analysis = Util.analyze(sql, context);
logicalPlanNode = Util.genLogicalPlan(analysis, context);
planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, logicalPlanNode));
plan = planner.planFragments();
assertEquals(2, plan.getInstances().size());

firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree().getChildren().get(0);
assertTrue(firstFiRoot instanceof AggregationMergeSortNode);
assertTrue(firstFiRoot.getChildren().get(0) instanceof DeviceViewNode);
if (firstFiRoot.getChildren().get(0).getChildren().get(0) instanceof ProjectNode) {
assertEquals(
firstFiRoot.getChildren().get(0).getChildren().get(0).getOutputColumnNames(),
ImmutableList.of(
"last_value(root.sg.d1.s1)",
"max_time(root.sg.d1.s1)",
"last_value(root.sg.d1.s2)",
"max_time(root.sg.d1.s2)"));
}

secondFiRoot = plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0);
assertTrue(secondFiRoot instanceof DeviceViewNode);
if (secondFiRoot.getChildren().get(0) instanceof ProjectNode) {
assertEquals(
firstFiRoot.getChildren().get(0).getChildren().get(0).getOutputColumnNames(),
ImmutableList.of(
"last_value(root.sg.d1.s1)",
"max_time(root.sg.d1.s1)",
"last_value(root.sg.d1.s2)",
"max_time(root.sg.d1.s2)"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,18 @@ public PartialPath(final String path, final boolean needSplit) {
}
}

/**
* only use this method in following situations: 1. you are sure you do not want to split the
* path. 2. you are sure path is correct.
*
* @param needSplit whether to split path to nodes, needSplit can only be false.
*/
public PartialPath(String device, String measurement, boolean needSplit) {
Validate.isTrue(!needSplit);
String path = device + TsFileConstant.PATH_SEPARATOR + measurement;
this.nodes = new String[] {path};
}

public boolean hasWildcard() {
for (String node : nodes) {
// *, ** , d*, *d*
Expand Down
Loading