Skip to content
Permalink
Browse files
Add distribution plan logic for AlignedSeriesScan (#5941)
  • Loading branch information
xingtanzjr committed May 18, 2022
1 parent 05fc9f5 commit f78e90fdbdaaaae1ee5262955ccc861b91f8a785
Showing 2 changed files with 54 additions and 11 deletions.
@@ -43,8 +43,10 @@
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
@@ -234,16 +236,10 @@ public PlanNode visitSeriesScan(SeriesScanNode node, DistributionPlanContext con
return timeJoinNode;
}

@Override
public PlanNode visitSeriesAggregationScan(
SeriesAggregationScanNode node, DistributionPlanContext context) {
List<TRegionReplicaSet> dataDistribution =
analysis.getPartitionInfo(node.getSeriesPath(), node.getTimeFilter());
if (dataDistribution.size() == 1) {
node.setRegionReplicaSet(dataDistribution.get(0));
return node;
}

List<AggregationDescriptor> leafAggDescriptorList = new ArrayList<>();
node.getAggregationDescriptorList()
.forEach(
@@ -279,6 +275,26 @@ public PlanNode visitSeriesAggregationScan(
return aggregationNode;
}

@Override
public PlanNode visitAlignedSeriesScan(
AlignedSeriesScanNode node, DistributionPlanContext context) {
List<TRegionReplicaSet> dataDistribution =
analysis.getPartitionInfo(node.getAlignedPath(), node.getTimeFilter());
if (dataDistribution.size() == 1) {
node.setRegionReplicaSet(dataDistribution.get(0));
return node;
}
TimeJoinNode timeJoinNode =
new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder());
for (TRegionReplicaSet dataRegion : dataDistribution) {
AlignedSeriesScanNode split = (AlignedSeriesScanNode) node.clone();
split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
split.setRegionReplicaSet(dataRegion);
timeJoinNode.addChild(split);
}
return timeJoinNode;
}

@Override
public PlanNode visitSchemaFetchMerge(
SchemaFetchMergeNode node, DistributionPlanContext context) {
@@ -314,7 +330,7 @@ public PlanNode visitTimeJoin(TimeJoinNode node, DistributionPlanContext context

// Step 1: Get all source nodes. For the node which is not source, add it as the child of
// current TimeJoinNode
List<SeriesScanNode> sources = new ArrayList<>();
List<SourceNode> sources = new ArrayList<>();
for (PlanNode child : node.getChildren()) {
if (child instanceof SeriesScanNode) {
// If the child is SeriesScanNode, we need to check whether this node should be seperated
@@ -330,6 +346,18 @@ public PlanNode visitTimeJoin(TimeJoinNode node, DistributionPlanContext context
split.setRegionReplicaSet(dataRegion);
sources.add(split);
}
} else if (child instanceof AlignedSeriesScanNode) {
AlignedSeriesScanNode handle = (AlignedSeriesScanNode) child;
List<TRegionReplicaSet> dataDistribution =
analysis.getPartitionInfo(handle.getAlignedPath(), handle.getTimeFilter());
// If the size of dataDistribution is m, this SeriesScanNode should be seperated into m
// SeriesScanNode.
for (TRegionReplicaSet dataRegion : dataDistribution) {
AlignedSeriesScanNode split = (AlignedSeriesScanNode) handle.clone();
split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
split.setRegionReplicaSet(dataRegion);
sources.add(split);
}
} else if (child instanceof SeriesAggregationScanNode) {
// TODO: (xingtanzjr) We should do the same thing for SeriesAggregateScanNode. Consider to
// make SeriesAggregateScanNode
@@ -344,8 +372,8 @@ public PlanNode visitTimeJoin(TimeJoinNode node, DistributionPlanContext context
}

// Step 2: For the source nodes, group them by the DataRegion.
Map<TRegionReplicaSet, List<SeriesScanNode>> sourceGroup =
sources.stream().collect(Collectors.groupingBy(SeriesScanNode::getRegionReplicaSet));
Map<TRegionReplicaSet, List<SourceNode>> sourceGroup =
sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
// Step 3: For the source nodes which belong to same data region, add a TimeJoinNode for them
// and make the
// new TimeJoinNode as the child of current TimeJoinNode
@@ -481,6 +509,13 @@ public PlanNode visitSeriesScan(SeriesScanNode node, NodeGroupContext context) {
}

@Override
public PlanNode visitAlignedSeriesScan(AlignedSeriesScanNode node, NodeGroupContext context) {
context.putNodeDistribution(
node.getPlanNodeId(),
new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet()));
return node.clone();
}

public PlanNode visitSeriesAggregationScan(
SeriesAggregationScanNode node, NodeGroupContext context) {
context.putNodeDistribution(
@@ -129,11 +129,13 @@ public void open() throws Exception {}

@Override
public TRegionReplicaSet getRegionReplicaSet() {
return null;
return regionReplicaSet;
}

@Override
public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {}
public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
this.regionReplicaSet = regionReplicaSet;
}

@Override
public void close() throws Exception {}
@@ -265,4 +267,10 @@ public int hashCode() {
offset,
regionReplicaSet);
}

public String toString() {
return String.format(
"AlignedSeriesScanNode-%s:[SeriesPath: %s, DataRegion: %s]",
this.getPlanNodeId(), this.getAlignedPath(), this.getRegionReplicaSet());
}
}

0 comments on commit f78e90f

Please sign in to comment.