diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java index e184ce2777d7e5..72e254fe632a35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java @@ -24,9 +24,11 @@ import org.apache.doris.nereids.jobs.JobContext; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType; +import org.apache.doris.nereids.rules.implementation.LogicalWindowToPhysicalWindow.WindowFrameGroup; import org.apache.doris.nereids.trees.expressions.Alias; import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.OrderExpression; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.plans.DistributeType; import org.apache.doris.nereids.trees.plans.Plan; @@ -49,6 +51,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation; import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion; +import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.JoinUtils; import org.apache.doris.qe.ConnectContext; @@ -372,6 +375,40 @@ public Void visitPhysicalFileSink(PhysicalFileSink fileSink, Pla return null; } + @Override + public Void visitPhysicalWindow(PhysicalWindow window, PlanContext context) { + // requiredProperties: + // Distribution: partitionKeys + // Order: requiredOrderKeys + WindowFrameGroup windowFrameGroup = window.getWindowFrameGroup(); + // all keys that need to be sorted, which includes BOTH partitionKeys and orderKeys from this group + List keysNeedToBeSorted = Lists.newArrayList(); + if (!windowFrameGroup.getPartitionKeys().isEmpty()) { + keysNeedToBeSorted.addAll(windowFrameGroup.getPartitionKeys().stream().map(partitionKey -> { + // todo: haven't support isNullFirst, and its default value is false(see AnalyticPlanner, + // but in LogicalPlanBuilder, its default value is true) + return new OrderKey(partitionKey, true, false); + }).collect(Collectors.toList())); + } + if (!windowFrameGroup.getOrderKeys().isEmpty()) { + keysNeedToBeSorted.addAll(windowFrameGroup.getOrderKeys().stream() + .map(OrderExpression::getOrderKey) + .collect(Collectors.toList()) + ); + } + + if (windowFrameGroup.getPartitionKeys().isEmpty() && windowFrameGroup.getOrderKeys().isEmpty()) { + addRequestPropertyToChildren(PhysicalProperties.GATHER); + } else if (windowFrameGroup.getPartitionKeys().isEmpty() && !windowFrameGroup.getOrderKeys().isEmpty()) { + addRequestPropertyToChildren(PhysicalProperties.GATHER.withOrderSpec(new OrderSpec(keysNeedToBeSorted))); + } else if (!windowFrameGroup.getPartitionKeys().isEmpty()) { + addRequestPropertyToChildren(PhysicalProperties.createHash( + windowFrameGroup.getPartitionKeys(), ShuffleType.REQUIRE) + .withOrderSpec(new OrderSpec(keysNeedToBeSorted))); + } + return null; + } + private List createHashRequestAccordingToParent( SetOperation setOperation, DistributionSpecHash distributionRequestFromParent, PlanContext context) { List requiredPropertyList = diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalWindowToPhysicalWindow.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalWindowToPhysicalWindow.java index 8e607bee6e5def..41fb44ec6d0258 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalWindowToPhysicalWindow.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalWindowToPhysicalWindow.java @@ -18,10 +18,6 @@ package org.apache.doris.nereids.rules.implementation; import org.apache.doris.nereids.annotation.DependsRules; -import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType; -import org.apache.doris.nereids.properties.OrderKey; -import org.apache.doris.nereids.properties.OrderSpec; -import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.properties.RequireProperties; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; @@ -37,6 +33,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalWindow; import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import org.apache.commons.collections.CollectionUtils; @@ -55,8 +52,6 @@ * step 2: sort PartitionKeyGroup with increasing order of tupleSize * step 3: for every WindowFrameGroup of each SortGroup, generate one PhysicalWindow node, with common PartitionKeys, * OrderKeys, unique WindowFrame and a function list. - * step 4: for each PhysicalWindow, generate RequiredProperties, including PartitionKey for DistributionSpec, - * and (PartitionKey + OrderKey) for OrderSpec. */ @DependsRules({ CheckAndStandardizeWindowFunctionAndFrame.class, @@ -118,45 +113,15 @@ private Plan createPhysicalPlanNodeForWindowFrameGroup(Plan root, OrderKeyGroup // Plan newRoot = createPhysicalSortNode(root, orderKeyGroup, ctx); Plan newRoot = root; - // we will not add PhysicalSort in this step, but generate it if necessary with the ability of enforcer by - // setting RequiredProperties for PhysicalWindow - List requiredOrderKeys = generateKeysNeedToBeSorted(orderKeyGroup); - // PhysicalWindow nodes for each different window frame, so at least one PhysicalWindow node will be added for (WindowFrameGroup windowFrameGroup : orderKeyGroup.groups) { - newRoot = createPhysicalWindow(newRoot, windowFrameGroup, requiredOrderKeys); + newRoot = createPhysicalWindow(newRoot, windowFrameGroup); } return newRoot; } - private List generateKeysNeedToBeSorted(OrderKeyGroup orderKeyGroup) { - // all keys that need to be sorted, which includes BOTH partitionKeys and orderKeys from this group - List keysNeedToBeSorted = Lists.newArrayList(); - - // used as SortNode.isAnalyticSort, but it is unnecessary to add it in LogicalSort - if (!orderKeyGroup.partitionKeys.isEmpty()) { - keysNeedToBeSorted.addAll(orderKeyGroup.partitionKeys.stream().map(partitionKey -> { - // todo: haven't support isNullFirst, and its default value is false(see AnalyticPlanner, - // but in LogicalPlanBuilder, its default value is true) - return new OrderKey(partitionKey, true, false); - }).collect(Collectors.toList())); - } - - if (!orderKeyGroup.orderKeys.isEmpty()) { - keysNeedToBeSorted.addAll(orderKeyGroup.orderKeys.stream() - .map(OrderExpression::getOrderKey) - .collect(Collectors.toList()) - ); - } - return keysNeedToBeSorted; - } - - private PhysicalWindow createPhysicalWindow(Plan root, WindowFrameGroup windowFrameGroup, - List requiredOrderKeys) { - // requiredProperties: - // Distribution: partitionKeys - // Order: requiredOrderKeys + private PhysicalWindow createPhysicalWindow(Plan root, WindowFrameGroup windowFrameGroup) { LogicalWindow tempLogicalWindow = new LogicalWindow<>(windowFrameGroup.groups, root); PhysicalWindow physicalWindow = new PhysicalWindow<>( windowFrameGroup, @@ -164,24 +129,7 @@ private PhysicalWindow createPhysicalWindow(Plan root, WindowFrameGroup wi tempLogicalWindow.getWindowExpressions(), tempLogicalWindow.getLogicalProperties(), root); - - if (windowFrameGroup.partitionKeys.isEmpty() && requiredOrderKeys.isEmpty()) { - return physicalWindow.withRequirePropertiesAndChild(RequireProperties.of(PhysicalProperties.GATHER), root); - } - - // todo: WFGs in the same OKG only need same RequiredProperties - PhysicalProperties properties; - if (windowFrameGroup.partitionKeys.isEmpty()) { - properties = PhysicalProperties.GATHER.withOrderSpec(new OrderSpec(requiredOrderKeys)); - } else { - properties = PhysicalProperties.createHash( - windowFrameGroup.partitionKeys, ShuffleType.REQUIRE); - // requiredOrderKeys contain partitionKeys, so there is no need to check if requiredOrderKeys.isEmpty() - properties = properties.withOrderSpec(new OrderSpec(requiredOrderKeys)); - } - - RequireProperties requireProperties = RequireProperties.of(properties); - return physicalWindow.withRequirePropertiesAndChild(requireProperties, root); + return (PhysicalWindow) physicalWindow.withChildren(ImmutableList.of(root)); } /* ******************************************************************************************** diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/RequestPropertyDeriverTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/RequestPropertyDeriverTest.java index fa10f16c58c0f2..a0c1d19d2fbeff 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/RequestPropertyDeriverTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/RequestPropertyDeriverTest.java @@ -23,10 +23,19 @@ import org.apache.doris.nereids.memo.Group; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType; +import org.apache.doris.nereids.rules.implementation.LogicalWindowToPhysicalWindow.WindowFrameGroup; +import org.apache.doris.nereids.trees.expressions.Alias; import org.apache.doris.nereids.trees.expressions.AssertNumRowsElement; import org.apache.doris.nereids.trees.expressions.ExprId; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.OrderExpression; import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.WindowExpression; +import org.apache.doris.nereids.trees.expressions.WindowFrame; +import org.apache.doris.nereids.trees.expressions.WindowFrame.FrameBoundary; +import org.apache.doris.nereids.trees.expressions.WindowFrame.FrameUnitsType; import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam; +import org.apache.doris.nereids.trees.expressions.functions.window.RowNumber; import org.apache.doris.nereids.trees.plans.AggMode; import org.apache.doris.nereids.trees.plans.AggPhase; import org.apache.doris.nereids.trees.plans.DistributeType; @@ -36,6 +45,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; +import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow; import org.apache.doris.nereids.types.IntegerType; import org.apache.doris.nereids.util.ExpressionUtils; import org.apache.doris.qe.ConnectContext; @@ -252,4 +262,98 @@ void testAssertNumRows() { expected.add(Lists.newArrayList(PhysicalProperties.GATHER)); Assertions.assertEquals(expected, actual); } + + @Test + void testWindowWithPartitionKeyAndOrderKey() { + SlotReference col1 = new SlotReference("col1", IntegerType.INSTANCE); + SlotReference col2 = new SlotReference("col2", IntegerType.INSTANCE); + Expression rowNumber = new RowNumber(); + WindowExpression windowExpression = new WindowExpression(rowNumber, ImmutableList.of(col1), + ImmutableList.of(new OrderExpression(new OrderKey(col2, true, false))), + new WindowFrame(FrameUnitsType.RANGE, + FrameBoundary.newPrecedingBoundary(), FrameBoundary.newCurrentRowBoundary())); + Alias alias = new Alias(windowExpression); + WindowFrameGroup windowFrameGroup = new WindowFrameGroup(alias); + PhysicalWindow window = new PhysicalWindow<>(windowFrameGroup, null, + ImmutableList.of(alias), logicalProperties, groupPlan); + GroupExpression groupExpression = new GroupExpression(window); + new Group(null, groupExpression, null); + RequestPropertyDeriver requestPropertyDeriver = new RequestPropertyDeriver(null, jobContext); + List> actual + = requestPropertyDeriver.getRequestChildrenPropertyList(groupExpression); + List> expected = Lists.newArrayList(); + expected.add(Lists.newArrayList(PhysicalProperties.createHash(ImmutableList.of(col1.getExprId()), ShuffleType.REQUIRE).withOrderSpec( + new OrderSpec(ImmutableList.of(new OrderKey(col1, true, false), new OrderKey(col2, true, false))) + ))); + Assertions.assertEquals(expected, actual); + } + + @Test + void testWindowWithPartitionKeyAndNoOrderKey() { + SlotReference col1 = new SlotReference("col1", IntegerType.INSTANCE); + Expression rowNumber = new RowNumber(); + WindowExpression windowExpression = new WindowExpression(rowNumber, ImmutableList.of(col1), + ImmutableList.of(), + new WindowFrame(FrameUnitsType.RANGE, + FrameBoundary.newPrecedingBoundary(), FrameBoundary.newCurrentRowBoundary())); + Alias alias = new Alias(windowExpression); + WindowFrameGroup windowFrameGroup = new WindowFrameGroup(alias); + PhysicalWindow window = new PhysicalWindow<>(windowFrameGroup, null, + ImmutableList.of(alias), logicalProperties, groupPlan); + GroupExpression groupExpression = new GroupExpression(window); + new Group(null, groupExpression, null); + RequestPropertyDeriver requestPropertyDeriver = new RequestPropertyDeriver(null, jobContext); + List> actual + = requestPropertyDeriver.getRequestChildrenPropertyList(groupExpression); + List> expected = Lists.newArrayList(); + expected.add(Lists.newArrayList(PhysicalProperties.createHash(ImmutableList.of(col1.getExprId()), ShuffleType.REQUIRE).withOrderSpec( + new OrderSpec(ImmutableList.of(new OrderKey(col1, true, false))) + ))); + Assertions.assertEquals(expected, actual); + } + + @Test + void testWindowWithNoPartitionKeyAndOrderKey() { + SlotReference col2 = new SlotReference("col2", IntegerType.INSTANCE); + Expression rowNumber = new RowNumber(); + WindowExpression windowExpression = new WindowExpression(rowNumber, ImmutableList.of(), + ImmutableList.of(new OrderExpression(new OrderKey(col2, true, false))), + new WindowFrame(FrameUnitsType.RANGE, + FrameBoundary.newPrecedingBoundary(), FrameBoundary.newCurrentRowBoundary())); + Alias alias = new Alias(windowExpression); + WindowFrameGroup windowFrameGroup = new WindowFrameGroup(alias); + PhysicalWindow window = new PhysicalWindow<>(windowFrameGroup, null, + ImmutableList.of(alias), logicalProperties, groupPlan); + GroupExpression groupExpression = new GroupExpression(window); + new Group(null, groupExpression, null); + RequestPropertyDeriver requestPropertyDeriver = new RequestPropertyDeriver(null, jobContext); + List> actual + = requestPropertyDeriver.getRequestChildrenPropertyList(groupExpression); + List> expected = Lists.newArrayList(); + expected.add(Lists.newArrayList(PhysicalProperties.GATHER.withOrderSpec( + new OrderSpec(ImmutableList.of(new OrderKey(col2, true, false))) + ))); + Assertions.assertEquals(expected, actual); + } + + @Test + void testWindowWithNoPartitionKeyAndNoOrderKey() { + Expression rowNumber = new RowNumber(); + WindowExpression windowExpression = new WindowExpression(rowNumber, ImmutableList.of(), + ImmutableList.of(), + new WindowFrame(FrameUnitsType.RANGE, + FrameBoundary.newPrecedingBoundary(), FrameBoundary.newCurrentRowBoundary())); + Alias alias = new Alias(windowExpression); + WindowFrameGroup windowFrameGroup = new WindowFrameGroup(alias); + PhysicalWindow window = new PhysicalWindow<>(windowFrameGroup, null, + ImmutableList.of(alias), logicalProperties, groupPlan); + GroupExpression groupExpression = new GroupExpression(window); + new Group(null, groupExpression, null); + RequestPropertyDeriver requestPropertyDeriver = new RequestPropertyDeriver(null, jobContext); + List> actual + = requestPropertyDeriver.getRequestChildrenPropertyList(groupExpression); + List> expected = Lists.newArrayList(); + expected.add(Lists.newArrayList(PhysicalProperties.GATHER)); + Assertions.assertEquals(expected, actual); + } }