diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java
index 20c672ce924b..a9791fab4d42 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java
@@ -71,6 +71,7 @@
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * Maps onto a {@link org.apache.druid.query.operator.WindowOperatorQuery}.
@@ -123,45 +124,14 @@ public static Windowing fromCalciteStuff(
   {
     final Window window = Preconditions.checkNotNull(partialQuery.getWindow(), "window");
 
-    ArrayList<OperatorFactory> ops = new ArrayList<>();
+    List<WindowGroupProcessorWrapper> wrapperObjs = new ArrayList<>();
 
     final List<String> windowOutputColumns = new ArrayList<>(sourceRowSignature.getColumnNames());
     final String outputNamePrefix = Calcites.findUnusedPrefixForDigits("w", sourceRowSignature.getColumnNames());
     int outputNameCounter = 0;
 
-    // Track prior partition columns and sort columns group-to-group, so we only insert sorts and repartitions if
-    // we really need to.
-    List<String> priorPartitionColumns = null;
-    LinkedHashSet<ColumnWithDirection> priorSortColumns = new LinkedHashSet<>();
-
-    final RelCollation priorCollation = partialQuery.getScan().getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
-    if (priorCollation != null) {
-      // Populate initial priorSortColumns using collation of the input to the window operation. Allows us to skip
-      // the initial sort operator if the rows were already in the desired order.
-      priorSortColumns = computeSortColumnsFromRelCollation(priorCollation, sourceRowSignature);
-    }
-
-    for (int i = 0; i < window.groups.size(); ++i) {
-      final WindowGroup group = new WindowGroup(window, window.groups.get(i), sourceRowSignature);
-
-      final LinkedHashSet<ColumnWithDirection> sortColumns = new LinkedHashSet<>();
-      for (String partitionColumn : group.getPartitionColumns()) {
-        sortColumns.add(ColumnWithDirection.ascending(partitionColumn));
-      }
-      sortColumns.addAll(group.getOrdering());
-
-      // Add sorting and partitioning if needed.
-      if (!sortMatches(priorSortColumns, sortColumns)) {
-        // Sort order needs to change. Resort and repartition.
-        ops.add(new NaiveSortOperatorFactory(new ArrayList<>(sortColumns)));
-        ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns()));
-        priorSortColumns = sortColumns;
-        priorPartitionColumns = group.getPartitionColumns();
-      } else if (!group.getPartitionColumns().equals(priorPartitionColumns)) {
-        // Sort order doesn't need to change, but partitioning does. Only repartition.
-        ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns()));
-        priorPartitionColumns = group.getPartitionColumns();
-      }
+    for (Window.Group windowGroup : window.groups) {
+      final WindowGroup group = new WindowGroup(window, windowGroup, sourceRowSignature);
 
       // Add aggregations.
       final List<AggregateCall> aggregateCalls = group.getAggregateCalls();
@@ -232,10 +202,48 @@ public static Windowing fromCalciteStuff(
         throw new ISE("No processors from Window[%s], why was this code called?", window);
       }
-      ops.add(new WindowOperatorFactory(
+      wrapperObjs.add(new WindowGroupProcessorWrapper(windowGroup, new WindowOperatorFactory(
          processors.size() == 1 ?
          processors.get(0) :
          new ComposingProcessor(processors.toArray(new Processor[0]))
-      ));
+      )));
+    }
+
+    // Track prior partition columns and sort columns group-to-group, so we only insert sorts and repartitions if
+    // we really need to.
+    List<String> priorPartitionColumns = null;
+    LinkedHashSet<ColumnWithDirection> priorSortColumns = new LinkedHashSet<>();
+
+    final RelCollation priorCollation = partialQuery.getScan().getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
+    if (priorCollation != null) {
+      // Populate initial priorSortColumns using collation of the input to the window operation. Allows us to skip
+      // the initial sort operator if the rows were already in the desired order.
+      priorSortColumns = computeSortColumnsFromRelCollation(priorCollation, sourceRowSignature);
+    }
+
+    Collections.sort(wrapperObjs);
+    ArrayList<OperatorFactory> ops = new ArrayList<>();
+    for (WindowGroupProcessorWrapper wrapperObj : wrapperObjs) {
+      final WindowGroup group = new WindowGroup(window, wrapperObj.getGroup(), sourceRowSignature);
+      final LinkedHashSet<ColumnWithDirection> sortColumns = new LinkedHashSet<>();
+      for (String partitionColumn : group.getPartitionColumns()) {
+        sortColumns.add(ColumnWithDirection.ascending(partitionColumn));
+      }
+      sortColumns.addAll(group.getOrdering());
+
+      // Add sorting and partitioning if needed.
+      if (!sortMatches(priorSortColumns, sortColumns)) {
+        // Sort order needs to change. Resort and repartition.
+        ops.add(new NaiveSortOperatorFactory(new ArrayList<>(sortColumns)));
+        ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns()));
+        priorSortColumns = sortColumns;
+        priorPartitionColumns = group.getPartitionColumns();
+      } else if (!group.getPartitionColumns().equals(priorPartitionColumns)) {
+        // Sort order doesn't need to change, but partitioning does. Only repartition.
+        ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns()));
+        priorPartitionColumns = group.getPartitionColumns();
+      }
+
+      ops.add(wrapperObj.getProcessorOperatorFactory());
     }
 
     // Apply windowProject, if present.
@@ -270,6 +278,74 @@ public static Windowing fromCalciteStuff(
     }
   }
 
+  /**
+   * A wrapper object which stores a {@link org.apache.calcite.rel.core.Window.Group}
+   * along with its computed {@link WindowOperatorFactory}.
+   *
+   * This allows us to sort the window groups in order to optimise the order of the operators we need to compute,
+   * without losing the aggregate column name information (which is part of the computed WindowOperatorFactory).
+   */
+  private static class WindowGroupProcessorWrapper implements Comparable<WindowGroupProcessorWrapper>
+  {
+    private final Window.Group group;
+    private final OperatorFactory processorOperatorFactory;
+
+    public WindowGroupProcessorWrapper(Window.Group group, OperatorFactory processorOperatorFactory)
+    {
+      this.group = group;
+      this.processorOperatorFactory = processorOperatorFactory;
+    }
+
+    public Window.Group getGroup()
+    {
+      return group;
+    }
+
+    public OperatorFactory getProcessorOperatorFactory()
+    {
+      return processorOperatorFactory;
+    }
+
+    @Override
+    public int compareTo(WindowGroupProcessorWrapper o)
+    {
+      // This ordering still needs work to fully optimise the processing order based on the partition columns;
+      // currently it just moves the non-partitioned (empty-key) window groups to the front.
+      if (this.group.keys.isEmpty() && o.group.keys.isEmpty()) {
+        return 0;
+      }
+      if (this.group.keys.isEmpty()) {
+        return -1;
+      }
+      if (o.group.keys.isEmpty()) {
+        return 1;
+      }
+      return 0;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      WindowGroupProcessorWrapper wrapper = (WindowGroupProcessorWrapper) o;
+      return Objects.equals(group, wrapper.group) && Objects.equals(
+          processorOperatorFactory,
+          wrapper.processorOperatorFactory
+      );
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(group, processorOperatorFactory);
+    }
+  }
+
   private final RowSignature signature;
 
   public Windowing(
diff --git a/sql/src/test/resources/calcite/tests/window/WindowOpReorder.sqlTest b/sql/src/test/resources/calcite/tests/window/WindowOpReorder.sqlTest
new file mode 100644
index 000000000000..0c9d88b5041f
--- /dev/null
+++ b/sql/src/test/resources/calcite/tests/window/WindowOpReorder.sqlTest
@@ -0,0 +1,51 @@
+type: "operatorValidation"
+
+sql: |
+  SELECT
+    m1,
+    m2,
+    SUM(m1) OVER(PARTITION BY m2) as sum1,
+    SUM(m2) OVER() as sum2
+  from druid.numfoo
+  GROUP BY m1,m2
+
+expectedOperators:
+  - type: "naivePartition"
+    partitionColumns: [ ]
+  - type: "window"
+    processor:
+      type: "framedAgg"
+      frame:
+        peerType: "ROWS"
+        lowUnbounded: true
+        lowOffset: 0
+        uppUnbounded: true
+        uppOffset: 0
+        orderBy: null
+      aggregations:
+        - { "type": "doubleSum", "name": "w1", "fieldName": "_d1" }
+  - type: "naiveSort"
+    columns:
+      - column: "_d1"
+        direction: "ASC"
+  - type: "naivePartition"
+    partitionColumns: [ "_d1" ]
+  - type: "window"
+    processor:
+      type: "framedAgg"
+      frame:
+        peerType: "ROWS"
+        lowUnbounded: true
+        lowOffset: 0
+        uppUnbounded: true
+        uppOffset: 0
+        orderBy: null
+      aggregations:
+        - { "type": "doubleSum", "name": "w0", "fieldName": "_d0" }
+expectedResults:
+  - [1.0, 1.0, 1.0, 21.0]
+  - [2.0, 2.0, 2.0, 21.0]
+  - [3.0, 3.0, 3.0, 21.0]
+  - [4.0, 4.0, 4.0, 21.0]
+  - [5.0, 5.0, 5.0, 21.0]
+  - [6.0, 6.0, 6.0, 21.0]
\ No newline at end of file