Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import org.apache.doris.nereids.jobs.JobContext;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
import org.apache.doris.nereids.rules.implementation.LogicalWindowToPhysicalWindow.WindowFrameGroup;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.OrderExpression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.DistributeType;
import org.apache.doris.nereids.trees.plans.Plan;
Expand All @@ -49,6 +51,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.JoinUtils;
import org.apache.doris.qe.ConnectContext;
Expand Down Expand Up @@ -372,6 +375,40 @@ public Void visitPhysicalFileSink(PhysicalFileSink<? extends Plan> fileSink, Pla
return null;
}

@Override
public Void visitPhysicalWindow(PhysicalWindow<? extends Plan> window, PlanContext context) {
// requiredProperties:
// Distribution: partitionKeys
// Order: requiredOrderKeys
WindowFrameGroup windowFrameGroup = window.getWindowFrameGroup();
// all keys that need to be sorted, which includes BOTH partitionKeys and orderKeys from this group
List<OrderKey> keysNeedToBeSorted = Lists.newArrayList();
if (!windowFrameGroup.getPartitionKeys().isEmpty()) {
keysNeedToBeSorted.addAll(windowFrameGroup.getPartitionKeys().stream().map(partitionKey -> {
// todo: haven't support isNullFirst, and its default value is false(see AnalyticPlanner,
// but in LogicalPlanBuilder, its default value is true)
return new OrderKey(partitionKey, true, false);
}).collect(Collectors.toList()));
}
if (!windowFrameGroup.getOrderKeys().isEmpty()) {
keysNeedToBeSorted.addAll(windowFrameGroup.getOrderKeys().stream()
.map(OrderExpression::getOrderKey)
.collect(Collectors.toList())
);
}

if (windowFrameGroup.getPartitionKeys().isEmpty() && windowFrameGroup.getOrderKeys().isEmpty()) {
addRequestPropertyToChildren(PhysicalProperties.GATHER);
} else if (windowFrameGroup.getPartitionKeys().isEmpty() && !windowFrameGroup.getOrderKeys().isEmpty()) {
addRequestPropertyToChildren(PhysicalProperties.GATHER.withOrderSpec(new OrderSpec(keysNeedToBeSorted)));
} else if (!windowFrameGroup.getPartitionKeys().isEmpty()) {
addRequestPropertyToChildren(PhysicalProperties.createHash(
windowFrameGroup.getPartitionKeys(), ShuffleType.REQUIRE)
.withOrderSpec(new OrderSpec(keysNeedToBeSorted)));
}
return null;
}

private List<PhysicalProperties> createHashRequestAccordingToParent(
SetOperation setOperation, DistributionSpecHash distributionRequestFromParent, PlanContext context) {
List<PhysicalProperties> requiredPropertyList =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@
package org.apache.doris.nereids.rules.implementation;

import org.apache.doris.nereids.annotation.DependsRules;
import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.properties.OrderSpec;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.properties.RequireProperties;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
Expand All @@ -37,6 +33,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalWindow;
import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
Expand All @@ -55,8 +52,6 @@
* step 2: sort PartitionKeyGroup with increasing order of tupleSize
* step 3: for every WindowFrameGroup of each SortGroup, generate one PhysicalWindow node, with common PartitionKeys,
* OrderKeys, unique WindowFrame and a function list.
* step 4: for each PhysicalWindow, generate RequiredProperties, including PartitionKey for DistributionSpec,
* and (PartitionKey + OrderKey) for OrderSpec.
*/
@DependsRules({
CheckAndStandardizeWindowFunctionAndFrame.class,
Expand Down Expand Up @@ -118,70 +113,23 @@ private Plan createPhysicalPlanNodeForWindowFrameGroup(Plan root, OrderKeyGroup
// Plan newRoot = createPhysicalSortNode(root, orderKeyGroup, ctx);
Plan newRoot = root;

// we will not add PhysicalSort in this step, but generate it if necessary with the ability of enforcer by
// setting RequiredProperties for PhysicalWindow
List<OrderKey> requiredOrderKeys = generateKeysNeedToBeSorted(orderKeyGroup);

// PhysicalWindow nodes for each different window frame, so at least one PhysicalWindow node will be added
for (WindowFrameGroup windowFrameGroup : orderKeyGroup.groups) {
newRoot = createPhysicalWindow(newRoot, windowFrameGroup, requiredOrderKeys);
newRoot = createPhysicalWindow(newRoot, windowFrameGroup);
}

return newRoot;
}

private List<OrderKey> generateKeysNeedToBeSorted(OrderKeyGroup orderKeyGroup) {
// all keys that need to be sorted, which includes BOTH partitionKeys and orderKeys from this group
List<OrderKey> keysNeedToBeSorted = Lists.newArrayList();

// used as SortNode.isAnalyticSort, but it is unnecessary to add it in LogicalSort
if (!orderKeyGroup.partitionKeys.isEmpty()) {
keysNeedToBeSorted.addAll(orderKeyGroup.partitionKeys.stream().map(partitionKey -> {
// todo: haven't support isNullFirst, and its default value is false(see AnalyticPlanner,
// but in LogicalPlanBuilder, its default value is true)
return new OrderKey(partitionKey, true, false);
}).collect(Collectors.toList()));
}

if (!orderKeyGroup.orderKeys.isEmpty()) {
keysNeedToBeSorted.addAll(orderKeyGroup.orderKeys.stream()
.map(OrderExpression::getOrderKey)
.collect(Collectors.toList())
);
}
return keysNeedToBeSorted;
}

private PhysicalWindow<Plan> createPhysicalWindow(Plan root, WindowFrameGroup windowFrameGroup,
List<OrderKey> requiredOrderKeys) {
// requiredProperties:
// Distribution: partitionKeys
// Order: requiredOrderKeys
private PhysicalWindow<Plan> createPhysicalWindow(Plan root, WindowFrameGroup windowFrameGroup) {
LogicalWindow<Plan> tempLogicalWindow = new LogicalWindow<>(windowFrameGroup.groups, root);
PhysicalWindow<Plan> physicalWindow = new PhysicalWindow<>(
windowFrameGroup,
RequireProperties.followParent(),
tempLogicalWindow.getWindowExpressions(),
tempLogicalWindow.getLogicalProperties(),
root);

if (windowFrameGroup.partitionKeys.isEmpty() && requiredOrderKeys.isEmpty()) {
return physicalWindow.withRequirePropertiesAndChild(RequireProperties.of(PhysicalProperties.GATHER), root);
}

// todo: WFGs in the same OKG only need same RequiredProperties
PhysicalProperties properties;
if (windowFrameGroup.partitionKeys.isEmpty()) {
properties = PhysicalProperties.GATHER.withOrderSpec(new OrderSpec(requiredOrderKeys));
} else {
properties = PhysicalProperties.createHash(
windowFrameGroup.partitionKeys, ShuffleType.REQUIRE);
// requiredOrderKeys contain partitionKeys, so there is no need to check if requiredOrderKeys.isEmpty()
properties = properties.withOrderSpec(new OrderSpec(requiredOrderKeys));
}

RequireProperties requireProperties = RequireProperties.of(properties);
return physicalWindow.withRequirePropertiesAndChild(requireProperties, root);
return (PhysicalWindow) physicalWindow.withChildren(ImmutableList.of(root));
}

/* ********************************************************************************************
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,19 @@
import org.apache.doris.nereids.memo.Group;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
import org.apache.doris.nereids.rules.implementation.LogicalWindowToPhysicalWindow.WindowFrameGroup;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.AssertNumRowsElement;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.OrderExpression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.WindowExpression;
import org.apache.doris.nereids.trees.expressions.WindowFrame;
import org.apache.doris.nereids.trees.expressions.WindowFrame.FrameBoundary;
import org.apache.doris.nereids.trees.expressions.WindowFrame.FrameUnitsType;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
import org.apache.doris.nereids.trees.expressions.functions.window.RowNumber;
import org.apache.doris.nereids.trees.plans.AggMode;
import org.apache.doris.nereids.trees.plans.AggPhase;
import org.apache.doris.nereids.trees.plans.DistributeType;
Expand All @@ -36,6 +45,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow;
import org.apache.doris.nereids.types.IntegerType;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.qe.ConnectContext;
Expand Down Expand Up @@ -252,4 +262,98 @@ void testAssertNumRows() {
expected.add(Lists.newArrayList(PhysicalProperties.GATHER));
Assertions.assertEquals(expected, actual);
}

@Test
void testWindowWithPartitionKeyAndOrderKey() {
SlotReference col1 = new SlotReference("col1", IntegerType.INSTANCE);
SlotReference col2 = new SlotReference("col2", IntegerType.INSTANCE);
Expression rowNumber = new RowNumber();
WindowExpression windowExpression = new WindowExpression(rowNumber, ImmutableList.of(col1),
ImmutableList.of(new OrderExpression(new OrderKey(col2, true, false))),
new WindowFrame(FrameUnitsType.RANGE,
FrameBoundary.newPrecedingBoundary(), FrameBoundary.newCurrentRowBoundary()));
Alias alias = new Alias(windowExpression);
WindowFrameGroup windowFrameGroup = new WindowFrameGroup(alias);
PhysicalWindow<GroupPlan> window = new PhysicalWindow<>(windowFrameGroup, null,
ImmutableList.of(alias), logicalProperties, groupPlan);
GroupExpression groupExpression = new GroupExpression(window);
new Group(null, groupExpression, null);
RequestPropertyDeriver requestPropertyDeriver = new RequestPropertyDeriver(null, jobContext);
List<List<PhysicalProperties>> actual
= requestPropertyDeriver.getRequestChildrenPropertyList(groupExpression);
List<List<PhysicalProperties>> expected = Lists.newArrayList();
expected.add(Lists.newArrayList(PhysicalProperties.createHash(ImmutableList.of(col1.getExprId()), ShuffleType.REQUIRE).withOrderSpec(
new OrderSpec(ImmutableList.of(new OrderKey(col1, true, false), new OrderKey(col2, true, false)))
)));
Assertions.assertEquals(expected, actual);
}

@Test
void testWindowWithPartitionKeyAndNoOrderKey() {
SlotReference col1 = new SlotReference("col1", IntegerType.INSTANCE);
Expression rowNumber = new RowNumber();
WindowExpression windowExpression = new WindowExpression(rowNumber, ImmutableList.of(col1),
ImmutableList.of(),
new WindowFrame(FrameUnitsType.RANGE,
FrameBoundary.newPrecedingBoundary(), FrameBoundary.newCurrentRowBoundary()));
Alias alias = new Alias(windowExpression);
WindowFrameGroup windowFrameGroup = new WindowFrameGroup(alias);
PhysicalWindow<GroupPlan> window = new PhysicalWindow<>(windowFrameGroup, null,
ImmutableList.of(alias), logicalProperties, groupPlan);
GroupExpression groupExpression = new GroupExpression(window);
new Group(null, groupExpression, null);
RequestPropertyDeriver requestPropertyDeriver = new RequestPropertyDeriver(null, jobContext);
List<List<PhysicalProperties>> actual
= requestPropertyDeriver.getRequestChildrenPropertyList(groupExpression);
List<List<PhysicalProperties>> expected = Lists.newArrayList();
expected.add(Lists.newArrayList(PhysicalProperties.createHash(ImmutableList.of(col1.getExprId()), ShuffleType.REQUIRE).withOrderSpec(
new OrderSpec(ImmutableList.of(new OrderKey(col1, true, false)))
)));
Assertions.assertEquals(expected, actual);
}

@Test
void testWindowWithNoPartitionKeyAndOrderKey() {
SlotReference col2 = new SlotReference("col2", IntegerType.INSTANCE);
Expression rowNumber = new RowNumber();
WindowExpression windowExpression = new WindowExpression(rowNumber, ImmutableList.of(),
ImmutableList.of(new OrderExpression(new OrderKey(col2, true, false))),
new WindowFrame(FrameUnitsType.RANGE,
FrameBoundary.newPrecedingBoundary(), FrameBoundary.newCurrentRowBoundary()));
Alias alias = new Alias(windowExpression);
WindowFrameGroup windowFrameGroup = new WindowFrameGroup(alias);
PhysicalWindow<GroupPlan> window = new PhysicalWindow<>(windowFrameGroup, null,
ImmutableList.of(alias), logicalProperties, groupPlan);
GroupExpression groupExpression = new GroupExpression(window);
new Group(null, groupExpression, null);
RequestPropertyDeriver requestPropertyDeriver = new RequestPropertyDeriver(null, jobContext);
List<List<PhysicalProperties>> actual
= requestPropertyDeriver.getRequestChildrenPropertyList(groupExpression);
List<List<PhysicalProperties>> expected = Lists.newArrayList();
expected.add(Lists.newArrayList(PhysicalProperties.GATHER.withOrderSpec(
new OrderSpec(ImmutableList.of(new OrderKey(col2, true, false)))
)));
Assertions.assertEquals(expected, actual);
}

@Test
void testWindowWithNoPartitionKeyAndNoOrderKey() {
Expression rowNumber = new RowNumber();
WindowExpression windowExpression = new WindowExpression(rowNumber, ImmutableList.of(),
ImmutableList.of(),
new WindowFrame(FrameUnitsType.RANGE,
FrameBoundary.newPrecedingBoundary(), FrameBoundary.newCurrentRowBoundary()));
Alias alias = new Alias(windowExpression);
WindowFrameGroup windowFrameGroup = new WindowFrameGroup(alias);
PhysicalWindow<GroupPlan> window = new PhysicalWindow<>(windowFrameGroup, null,
ImmutableList.of(alias), logicalProperties, groupPlan);
GroupExpression groupExpression = new GroupExpression(window);
new Group(null, groupExpression, null);
RequestPropertyDeriver requestPropertyDeriver = new RequestPropertyDeriver(null, jobContext);
List<List<PhysicalProperties>> actual
= requestPropertyDeriver.getRequestChildrenPropertyList(groupExpression);
List<List<PhysicalProperties>> expected = Lists.newArrayList();
expected.add(Lists.newArrayList(PhysicalProperties.GATHER));
Assertions.assertEquals(expected, actual);
}
}