From 27cfe12f4a43881f8e3023e9aed00a1abaedf821 Mon Sep 17 00:00:00 2001 From: Sree Charan Manamala <155449160+sreemanamala@users.noreply.github.com> Date: Wed, 29 May 2024 12:17:12 +0530 Subject: [PATCH] Enable reordering of window operators (#16482) This commit aims to enable the re-ordering of window operators in order to optimise the sort and partition operators. Example: ``` SELECT m1, m2, SUM(m1) OVER(PARTITION BY m2) as sum1, SUM(m2) OVER() as sum2 from numFoo GROUP BY m1,m2 ``` To compute this query, we can first place the operators corresponding to sum2 and then the operators corresponding to sum1, which saves one sort operator compared to ordering the operators by sum1 and then sum2. --- .../apache/druid/msq/exec/MSQWindowTest.java | 18 +- .../druid/sql/calcite/rel/Windowing.java | 163 ++++++++++++++---- .../tests/window/WindowOpReorder.sqlTest | 51 ++++++ 3 files changed, 185 insertions(+), 47 deletions(-) create mode 100644 sql/src/test/resources/calcite/tests/window/WindowOpReorder.sqlTest diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java index 1ffa89ab2471..b3f6ac20d53e 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java @@ -217,11 +217,11 @@ public void testWindowOnFooWithFirstWindowPartitionNextWindowEmpty(String contex .add("w1", ColumnType.DOUBLE) .build(), ImmutableList.of( + new NaivePartitioningOperatorFactory(ImmutableList.of()), + new WindowOperatorFactory(proc1), new NaiveSortOperatorFactory(ImmutableList.of(ColumnWithDirection.ascending("d0"))), new NaivePartitioningOperatorFactory(ImmutableList.of("d0")), - new WindowOperatorFactory(proc), - new 
NaivePartitioningOperatorFactory(ImmutableList.of()), - new WindowOperatorFactory(proc1) + new WindowOperatorFactory(proc) ), null ); @@ -245,12 +245,12 @@ public void testWindowOnFooWithFirstWindowPartitionNextWindowEmpty(String contex .build()) .setExpectedRowSignature(rowSignature) .setExpectedResultRows(ImmutableList.of( - new Object[]{1.0f, 1.0, 1.0, 1.0}, - new Object[]{2.0f, 2.0, 2.0, 2.0}, - new Object[]{3.0f, 3.0, 3.0, 3.0}, - new Object[]{4.0f, 4.0, 4.0, 4.0}, - new Object[]{5.0f, 5.0, 5.0, 5.0}, - new Object[]{6.0f, 6.0, 6.0, 6.0} + new Object[]{1.0f, 1.0, 1.0, 21.0}, + new Object[]{2.0f, 2.0, 2.0, 21.0}, + new Object[]{3.0f, 3.0, 3.0, 21.0}, + new Object[]{4.0f, 4.0, 4.0, 21.0}, + new Object[]{5.0f, 5.0, 5.0, 21.0}, + new Object[]{6.0f, 6.0, 6.0, 21.0} )) .setQueryContext(context) .setExpectedCountersForStageWorkerChannel( diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java index 20c672ce924b..4f0f0eda21b9 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java @@ -68,9 +68,11 @@ import javax.annotation.Nonnull; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; +import java.util.Objects; /** * Maps onto a {@link org.apache.druid.query.operator.WindowOperatorQuery}. 
@@ -123,45 +125,14 @@ public static Windowing fromCalciteStuff( { final Window window = Preconditions.checkNotNull(partialQuery.getWindow(), "window"); - ArrayList ops = new ArrayList<>(); - + final List windowGroupProcessors = new ArrayList<>(); final List windowOutputColumns = new ArrayList<>(sourceRowSignature.getColumnNames()); + final String outputNamePrefix = Calcites.findUnusedPrefixForDigits("w", sourceRowSignature.getColumnNames()); int outputNameCounter = 0; - // Track prior partition columns and sort columns group-to-group, so we only insert sorts and repartitions if - // we really need to. - List priorPartitionColumns = null; - LinkedHashSet priorSortColumns = new LinkedHashSet<>(); - - final RelCollation priorCollation = partialQuery.getScan().getTraitSet().getTrait(RelCollationTraitDef.INSTANCE); - if (priorCollation != null) { - // Populate initial priorSortColumns using collation of the input to the window operation. Allows us to skip - // the initial sort operator if the rows were already in the desired order. - priorSortColumns = computeSortColumnsFromRelCollation(priorCollation, sourceRowSignature); - } - - for (int i = 0; i < window.groups.size(); ++i) { - final WindowGroup group = new WindowGroup(window, window.groups.get(i), sourceRowSignature); - - final LinkedHashSet sortColumns = new LinkedHashSet<>(); - for (String partitionColumn : group.getPartitionColumns()) { - sortColumns.add(ColumnWithDirection.ascending(partitionColumn)); - } - sortColumns.addAll(group.getOrdering()); - - // Add sorting and partitioning if needed. - if (!sortMatches(priorSortColumns, sortColumns)) { - // Sort order needs to change. Resort and repartition. 
- ops.add(new NaiveSortOperatorFactory(new ArrayList<>(sortColumns))); - ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns())); - priorSortColumns = sortColumns; - priorPartitionColumns = group.getPartitionColumns(); - } else if (!group.getPartitionColumns().equals(priorPartitionColumns)) { - // Sort order doesn't need to change, but partitioning does. Only repartition. - ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns())); - priorPartitionColumns = group.getPartitionColumns(); - } + for (Window.Group windowGroup : window.groups) { + final WindowGroup group = new WindowGroup(window, windowGroup, sourceRowSignature); // Add aggregations. final List aggregateCalls = group.getAggregateCalls(); @@ -184,7 +155,8 @@ public static Windowing fromCalciteStuff( InputAccessor.buildFor( window, partialQuery.getSelectProject(), - sourceRowSignature), + sourceRowSignature + ), Collections.emptyList(), aggName, aggregateCall, @@ -232,12 +204,14 @@ public static Windowing fromCalciteStuff( throw new ISE("No processors from Window[%s], why was this code called?", window); } - ops.add(new WindowOperatorFactory( + windowGroupProcessors.add(new WindowComputationProcessor(group, new WindowOperatorFactory( processors.size() == 1 ? processors.get(0) : new ComposingProcessor(processors.toArray(new Processor[0])) - )); + ))); } + List ops = computeWindowOperations(partialQuery, sourceRowSignature, windowGroupProcessors); + // Apply windowProject, if present. if (partialQuery.getWindowProject() != null) { // We know windowProject is a mapping due to the isMapping() check in DruidRules. 
@@ -270,6 +244,119 @@ public static Windowing fromCalciteStuff( } } + /** + * Computes the list of operators that are to be applied in an optimised order + */ + private static List computeWindowOperations( + final PartialDruidQuery partialQuery, + final RowSignature sourceRowSignature, + List windowGroupProcessors + ) + { + final List ops = new ArrayList<>(); + // Track prior partition columns and sort columns group-to-group, so we only insert sorts and repartitions if + // we really need to. + List priorPartitionColumns = null; + LinkedHashSet priorSortColumns = new LinkedHashSet<>(); + + final RelCollation priorCollation = partialQuery.getScan().getTraitSet().getTrait(RelCollationTraitDef.INSTANCE); + if (priorCollation != null) { + // Populate initial priorSortColumns using collation of the input to the window operation. Allows us to skip + // the initial sort operator if the rows were already in the desired order. + priorSortColumns = computeSortColumnsFromRelCollation(priorCollation, sourceRowSignature); + } + + // sort the processors to optimise the order of window operators + // currently we are moving the empty groups to the front + windowGroupProcessors.sort(WindowComputationProcessor.MOVE_EMPTY_GROUPS_FIRST); + + for (WindowComputationProcessor windowComputationProcessor : windowGroupProcessors) { + final WindowGroup group = windowComputationProcessor.getGroup(); + final LinkedHashSet sortColumns = new LinkedHashSet<>(); + for (String partitionColumn : group.getPartitionColumns()) { + sortColumns.add(ColumnWithDirection.ascending(partitionColumn)); + } + sortColumns.addAll(group.getOrdering()); + + // Add sorting and partitioning if needed. + if (!sortMatches(priorSortColumns, sortColumns)) { + // Sort order needs to change. Resort and repartition. 
+ ops.add(new NaiveSortOperatorFactory(new ArrayList<>(sortColumns))); + ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns())); + priorSortColumns = sortColumns; + priorPartitionColumns = group.getPartitionColumns(); + } else if (!group.getPartitionColumns().equals(priorPartitionColumns)) { + // Sort order doesn't need to change, but partitioning does. Only repartition. + ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns())); + priorPartitionColumns = group.getPartitionColumns(); + } + + ops.add(windowComputationProcessor.getProcessorOperatorFactory()); + } + + return ops; + } + + private static class WindowComputationProcessor + { + private final WindowGroup group; + private final OperatorFactory processorOperatorFactory; + + public WindowComputationProcessor(WindowGroup group, OperatorFactory processorOperatorFactory) + { + this.group = group; + this.processorOperatorFactory = processorOperatorFactory; + } + + public WindowGroup getGroup() + { + return group; + } + + public OperatorFactory getProcessorOperatorFactory() + { + return processorOperatorFactory; + } + + /** + * Comparator to move the empty windows to the front + */ + public static final Comparator MOVE_EMPTY_GROUPS_FIRST = (o1, o2) -> { + if (o1.getGroup().getPartitionColumns().isEmpty() && o2.getGroup().getPartitionColumns().isEmpty()) { + return 0; + } + if (o1.getGroup().getPartitionColumns().isEmpty()) { + return -1; + } + if (o2.getGroup().getPartitionColumns().isEmpty()) { + return 1; + } + return 0; + }; + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + WindowComputationProcessor obj = (WindowComputationProcessor) o; + return Objects.equals(group, obj.group) && Objects.equals( + processorOperatorFactory, + obj.processorOperatorFactory + ); + } + + @Override + public int hashCode() + { + return Objects.hash(group, processorOperatorFactory); + } 
+ } + private final RowSignature signature; public Windowing( diff --git a/sql/src/test/resources/calcite/tests/window/WindowOpReorder.sqlTest b/sql/src/test/resources/calcite/tests/window/WindowOpReorder.sqlTest new file mode 100644 index 000000000000..0c9d88b5041f --- /dev/null +++ b/sql/src/test/resources/calcite/tests/window/WindowOpReorder.sqlTest @@ -0,0 +1,51 @@ +type: "operatorValidation" + +sql: | + SELECT + m1, + m2, + SUM(m1) OVER(PARTITION BY m2) as sum1, + SUM(m2) OVER() as sum2 + from druid.numfoo + GROUP BY m1,m2 + +expectedOperators: + - type: "naivePartition" + partitionColumns: [ ] + - type: "window" + processor: + type: "framedAgg" + frame: + peerType: "ROWS" + lowUnbounded: true + lowOffset: 0 + uppUnbounded: true + uppOffset: 0 + orderBy: null + aggregations: + - { "type": "doubleSum", "name": "w1", "fieldName": "_d1" } + - type: "naiveSort" + columns: + - column: "_d1" + direction: "ASC" + - type: "naivePartition" + partitionColumns: [ "_d1" ] + - type: "window" + processor: + type: "framedAgg" + frame: + peerType: "ROWS" + lowUnbounded: true + lowOffset: 0 + uppUnbounded: true + uppOffset: 0 + orderBy: null + aggregations: + - { "type": "doubleSum", "name": "w0", "fieldName": "_d0" } +expectedResults: + - [1.0, 1.0, 1.0, 21.0] + - [2.0, 2.0, 2.0, 21.0] + - [3.0, 3.0, 3.0, 21.0] + - [4.0, 4.0, 4.0, 21.0] + - [5.0, 5.0, 5.0, 21.0] + - [6.0, 6.0, 6.0, 21.0] \ No newline at end of file