From f78e90fdbdaaaae1ee5262955ccc861b91f8a785 Mon Sep 17 00:00:00 2001 From: "Zhang.Jinrui" Date: Wed, 18 May 2022 17:49:49 +0800 Subject: [PATCH] Add distribution plan logic for AlignedSeriesScan (#5941) --- .../mpp/plan/planner/DistributionPlanner.java | 53 +++++++++++++++---- .../node/source/AlignedSeriesScanNode.java | 12 ++++- 2 files changed, 54 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java index f0eb1fb38736..ef0a8b96cd1e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java @@ -43,8 +43,10 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep; import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement; @@ -234,16 +236,10 @@ public PlanNode visitSeriesScan(SeriesScanNode node, DistributionPlanContext con return timeJoinNode; } - @Override public PlanNode visitSeriesAggregationScan( SeriesAggregationScanNode node, DistributionPlanContext context) { List dataDistribution = analysis.getPartitionInfo(node.getSeriesPath(), node.getTimeFilter()); - if (dataDistribution.size() == 1) { - node.setRegionReplicaSet(dataDistribution.get(0)); - return node; - } - List leafAggDescriptorList = new ArrayList<>(); node.getAggregationDescriptorList() .forEach( @@ -279,6 +275,26 @@ public PlanNode visitSeriesAggregationScan( return aggregationNode; } + @Override + public PlanNode visitAlignedSeriesScan( + AlignedSeriesScanNode node, DistributionPlanContext context) { + List dataDistribution = + analysis.getPartitionInfo(node.getAlignedPath(), node.getTimeFilter()); + if (dataDistribution.size() == 1) { + node.setRegionReplicaSet(dataDistribution.get(0)); + return node; + } + TimeJoinNode timeJoinNode = + new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder()); + for (TRegionReplicaSet dataRegion : dataDistribution) { + AlignedSeriesScanNode split = (AlignedSeriesScanNode) node.clone(); + split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId()); + split.setRegionReplicaSet(dataRegion); + timeJoinNode.addChild(split); + } + return timeJoinNode; + } + @Override public PlanNode visitSchemaFetchMerge( SchemaFetchMergeNode node, DistributionPlanContext context) { @@ -314,7 +330,7 @@ public PlanNode visitTimeJoin(TimeJoinNode node, DistributionPlanContext context // Step 1: Get all source nodes. For the node which is not source, add it as the child of // current TimeJoinNode - List sources = new ArrayList<>(); + List sources = new ArrayList<>(); for (PlanNode child : node.getChildren()) { if (child instanceof SeriesScanNode) { // If the child is SeriesScanNode, we need to check whether this node should be seperated @@ -330,6 +346,18 @@ public PlanNode visitTimeJoin(TimeJoinNode node, DistributionPlanContext context split.setRegionReplicaSet(dataRegion); sources.add(split); } + } else if (child instanceof AlignedSeriesScanNode) { + AlignedSeriesScanNode handle = (AlignedSeriesScanNode) child; + List dataDistribution = + analysis.getPartitionInfo(handle.getAlignedPath(), handle.getTimeFilter()); + // If the size of dataDistribution is m, this SeriesScanNode should be seperated into m + // SeriesScanNode. + for (TRegionReplicaSet dataRegion : dataDistribution) { + AlignedSeriesScanNode split = (AlignedSeriesScanNode) handle.clone(); + split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId()); + split.setRegionReplicaSet(dataRegion); + sources.add(split); + } } else if (child instanceof SeriesAggregationScanNode) { // TODO: (xingtanzjr) We should do the same thing for SeriesAggregateScanNode. Consider to // make SeriesAggregateScanNode @@ -344,8 +372,8 @@ public PlanNode visitTimeJoin(TimeJoinNode node, DistributionPlanContext context } // Step 2: For the source nodes, group them by the DataRegion. - Map> sourceGroup = - sources.stream().collect(Collectors.groupingBy(SeriesScanNode::getRegionReplicaSet)); + Map> sourceGroup = + sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet)); // Step 3: For the source nodes which belong to same data region, add a TimeJoinNode for them // and make the // new TimeJoinNode as the child of current TimeJoinNode @@ -481,6 +509,13 @@ public PlanNode visitSeriesScan(SeriesScanNode node, NodeGroupContext context) { } @Override + public PlanNode visitAlignedSeriesScan(AlignedSeriesScanNode node, NodeGroupContext context) { + context.putNodeDistribution( + node.getPlanNodeId(), + new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet())); + return node.clone(); + } + public PlanNode visitSeriesAggregationScan( SeriesAggregationScanNode node, NodeGroupContext context) { context.putNodeDistribution( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java index 749c653e0b25..72c2a2424483 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java @@ -129,11 +129,13 @@ public void open() throws Exception {} @Override public TRegionReplicaSet getRegionReplicaSet() { - return null; + return regionReplicaSet; } @Override - public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {} + public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) { + this.regionReplicaSet = regionReplicaSet; + } @Override public void close() throws Exception {} @@ -265,4 +267,10 @@ public int hashCode() { offset, regionReplicaSet); } + + public String toString() { + return String.format( + "AlignedSeriesScanNode-%s:[SeriesPath: %s, DataRegion: %s]", + this.getPlanNodeId(), this.getAlignedPath(), this.getRegionReplicaSet()); + } }