Skip to content

Commit

Permalink
Enable reordering of window operators (#16482)
Browse files Browse the repository at this point in the history
This commit aims to enable the re-ordering of window operators in order to optimise
the sort and partition operators.
Example:
```
SELECT m1, m2,
SUM(m1) OVER(PARTITION BY m2) as sum1,
SUM(m2) OVER() as sum2
from numFoo
GROUP BY m1,m2
```

To compute this query, we can first place the operators corresponding to sum2 and then the
operators corresponding to sum1. Compared to ordering the operators by sum1 and then sum2,
this helps us eliminate one sort operator.
  • Loading branch information
sreemanamala committed May 29, 2024
1 parent f7013e0 commit 27cfe12
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,11 @@ public void testWindowOnFooWithFirstWindowPartitionNextWindowEmpty(String contex
.add("w1", ColumnType.DOUBLE)
.build(),
ImmutableList.of(
new NaivePartitioningOperatorFactory(ImmutableList.of()),
new WindowOperatorFactory(proc1),
new NaiveSortOperatorFactory(ImmutableList.of(ColumnWithDirection.ascending("d0"))),
new NaivePartitioningOperatorFactory(ImmutableList.of("d0")),
new WindowOperatorFactory(proc),
new NaivePartitioningOperatorFactory(ImmutableList.of()),
new WindowOperatorFactory(proc1)
new WindowOperatorFactory(proc)
),
null
);
Expand All @@ -245,12 +245,12 @@ public void testWindowOnFooWithFirstWindowPartitionNextWindowEmpty(String contex
.build())
.setExpectedRowSignature(rowSignature)
.setExpectedResultRows(ImmutableList.of(
new Object[]{1.0f, 1.0, 1.0, 1.0},
new Object[]{2.0f, 2.0, 2.0, 2.0},
new Object[]{3.0f, 3.0, 3.0, 3.0},
new Object[]{4.0f, 4.0, 4.0, 4.0},
new Object[]{5.0f, 5.0, 5.0, 5.0},
new Object[]{6.0f, 6.0, 6.0, 6.0}
new Object[]{1.0f, 1.0, 1.0, 21.0},
new Object[]{2.0f, 2.0, 2.0, 21.0},
new Object[]{3.0f, 3.0, 3.0, 21.0},
new Object[]{4.0f, 4.0, 4.0, 21.0},
new Object[]{5.0f, 5.0, 5.0, 21.0},
new Object[]{6.0f, 6.0, 6.0, 21.0}
))
.setQueryContext(context)
.setExpectedCountersForStageWorkerChannel(
Expand Down
163 changes: 125 additions & 38 deletions sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,11 @@
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;

/**
* Maps onto a {@link org.apache.druid.query.operator.WindowOperatorQuery}.
Expand Down Expand Up @@ -123,45 +125,14 @@ public static Windowing fromCalciteStuff(
{
final Window window = Preconditions.checkNotNull(partialQuery.getWindow(), "window");

ArrayList<OperatorFactory> ops = new ArrayList<>();

final List<WindowComputationProcessor> windowGroupProcessors = new ArrayList<>();
final List<String> windowOutputColumns = new ArrayList<>(sourceRowSignature.getColumnNames());

final String outputNamePrefix = Calcites.findUnusedPrefixForDigits("w", sourceRowSignature.getColumnNames());
int outputNameCounter = 0;

// Track prior partition columns and sort columns group-to-group, so we only insert sorts and repartitions if
// we really need to.
List<String> priorPartitionColumns = null;
LinkedHashSet<ColumnWithDirection> priorSortColumns = new LinkedHashSet<>();

final RelCollation priorCollation = partialQuery.getScan().getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
if (priorCollation != null) {
// Populate initial priorSortColumns using collation of the input to the window operation. Allows us to skip
// the initial sort operator if the rows were already in the desired order.
priorSortColumns = computeSortColumnsFromRelCollation(priorCollation, sourceRowSignature);
}

for (int i = 0; i < window.groups.size(); ++i) {
final WindowGroup group = new WindowGroup(window, window.groups.get(i), sourceRowSignature);

final LinkedHashSet<ColumnWithDirection> sortColumns = new LinkedHashSet<>();
for (String partitionColumn : group.getPartitionColumns()) {
sortColumns.add(ColumnWithDirection.ascending(partitionColumn));
}
sortColumns.addAll(group.getOrdering());

// Add sorting and partitioning if needed.
if (!sortMatches(priorSortColumns, sortColumns)) {
// Sort order needs to change. Resort and repartition.
ops.add(new NaiveSortOperatorFactory(new ArrayList<>(sortColumns)));
ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns()));
priorSortColumns = sortColumns;
priorPartitionColumns = group.getPartitionColumns();
} else if (!group.getPartitionColumns().equals(priorPartitionColumns)) {
// Sort order doesn't need to change, but partitioning does. Only repartition.
ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns()));
priorPartitionColumns = group.getPartitionColumns();
}
for (Window.Group windowGroup : window.groups) {
final WindowGroup group = new WindowGroup(window, windowGroup, sourceRowSignature);

// Add aggregations.
final List<AggregateCall> aggregateCalls = group.getAggregateCalls();
Expand All @@ -184,7 +155,8 @@ public static Windowing fromCalciteStuff(
InputAccessor.buildFor(
window,
partialQuery.getSelectProject(),
sourceRowSignature),
sourceRowSignature
),
Collections.emptyList(),
aggName,
aggregateCall,
Expand Down Expand Up @@ -232,12 +204,14 @@ public static Windowing fromCalciteStuff(
throw new ISE("No processors from Window[%s], why was this code called?", window);
}

ops.add(new WindowOperatorFactory(
windowGroupProcessors.add(new WindowComputationProcessor(group, new WindowOperatorFactory(
processors.size() == 1 ?
processors.get(0) : new ComposingProcessor(processors.toArray(new Processor[0]))
));
)));
}

List<OperatorFactory> ops = computeWindowOperations(partialQuery, sourceRowSignature, windowGroupProcessors);

// Apply windowProject, if present.
if (partialQuery.getWindowProject() != null) {
// We know windowProject is a mapping due to the isMapping() check in DruidRules.
Expand Down Expand Up @@ -270,6 +244,119 @@ public static Windowing fromCalciteStuff(
}
}

/**
 * Builds the operator chain for the given window groups, visiting them in an optimised order.
 * <p>
 * The supplied list is sorted in place so that groups without partition columns come first. For each group, a sort
 * operator and/or a partitioning operator is emitted only when the sort order or partitioning tracked so far (seeded
 * from the collation of the scan, when one is present) does not already satisfy it. The group's own window operator
 * factory is appended afterwards.
 *
 * @param partialQuery          query whose scan collation seeds the initially known sort order
 * @param sourceRowSignature    row signature passed along when converting the scan collation into sort columns
 * @param windowGroupProcessors window groups paired with their operator factories; reordered by this method
 *
 * @return the operators, in the order in which they are to be applied
 */
private static List<OperatorFactory> computeWindowOperations(
    final PartialDruidQuery partialQuery,
    final RowSignature sourceRowSignature,
    List<WindowComputationProcessor> windowGroupProcessors
)
{
  // Remember the sort order and partitioning that are already in effect, so that a new sort or repartition is
  // only inserted between groups when it is actually required.
  final RelCollation scanCollation = partialQuery.getScan().getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
  LinkedHashSet<ColumnWithDirection> activeSort;
  if (scanCollation == null) {
    activeSort = new LinkedHashSet<>();
  } else {
    // The input to the window operation already carries a collation; starting from it lets us skip the initial
    // sort operator when the rows arrive in the desired order.
    activeSort = computeSortColumnsFromRelCollation(scanCollation, sourceRowSignature);
  }
  List<String> activePartitioning = null;

  // Reorder the groups to optimise the resulting operators. For now this only moves the groups that have no
  // partition columns to the front.
  windowGroupProcessors.sort(WindowComputationProcessor.MOVE_EMPTY_GROUPS_FIRST);

  final List<OperatorFactory> operators = new ArrayList<>();
  for (WindowComputationProcessor groupProcessor : windowGroupProcessors) {
    final WindowGroup group = groupProcessor.getGroup();
    final List<String> partitionColumns = group.getPartitionColumns();

    // Required order for this group: partition columns (ascending) followed by the group's own ordering.
    final LinkedHashSet<ColumnWithDirection> requiredSort = new LinkedHashSet<>();
    for (String column : partitionColumns) {
      requiredSort.add(ColumnWithDirection.ascending(column));
    }
    requiredSort.addAll(group.getOrdering());

    final boolean mustResort = !sortMatches(activeSort, requiredSort);
    if (mustResort) {
      operators.add(new NaiveSortOperatorFactory(new ArrayList<>(requiredSort)));
      activeSort = requiredSort;
    }

    // A resort is always followed by a repartition; otherwise repartition only when the partitioning changed.
    if (mustResort || !partitionColumns.equals(activePartitioning)) {
      operators.add(new NaivePartitioningOperatorFactory(partitionColumns));
      activePartitioning = partitionColumns;
    }

    operators.add(groupProcessor.getProcessorOperatorFactory());
  }

  return operators;
}

/**
 * Pairs a {@link WindowGroup} with the operator factory that computes its window processors, so that the groups
 * can be reordered before the sort and partitioning operators are laid out.
 */
private static class WindowComputationProcessor
{
  /**
   * Orders groups that have no partition columns ahead of groups that have some. Two groups that are both empty,
   * or both non-empty, compare as equal.
   */
  public static final Comparator<WindowComputationProcessor> MOVE_EMPTY_GROUPS_FIRST = (left, right) ->
      Boolean.compare(
          !left.getGroup().getPartitionColumns().isEmpty(),
          !right.getGroup().getPartitionColumns().isEmpty()
      );

  private final WindowGroup group;
  private final OperatorFactory processorOperatorFactory;

  public WindowComputationProcessor(WindowGroup group, OperatorFactory processorOperatorFactory)
  {
    this.group = group;
    this.processorOperatorFactory = processorOperatorFactory;
  }

  /**
   * @return the window group this entry was created for
   */
  public WindowGroup getGroup()
  {
    return group;
  }

  /**
   * @return the operator factory that was supplied together with the group
   */
  public OperatorFactory getProcessorOperatorFactory()
  {
    return processorOperatorFactory;
  }

  @Override
  public boolean equals(Object o)
  {
    if (o == this) {
      return true;
    }
    if (o == null || o.getClass() != getClass()) {
      return false;
    }
    final WindowComputationProcessor that = (WindowComputationProcessor) o;
    if (!Objects.equals(group, that.group)) {
      return false;
    }
    return Objects.equals(processorOperatorFactory, that.processorOperatorFactory);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(group, processorOperatorFactory);
  }
}

private final RowSignature signature;

public Windowing(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Operator-validation case with two window aggregations: the first one in the query is partitioned
# (PARTITION BY m2), the second one uses an empty window (OVER()).
type: "operatorValidation"

sql: |
SELECT
m1,
m2,
SUM(m1) OVER(PARTITION BY m2) as sum1,
SUM(m2) OVER() as sum2
from druid.numfoo
GROUP BY m1,m2

# The window with no partition columns (aggregation "w1") is expected first, ahead of the
# sort / partition / window sequence for the partitioned window (aggregation "w0"), even though it
# appears second in the SQL.
# NOTE(review): presumably "_d0"/"_d1" are the grouped m1/m2 columns and "w0"/"w1" back sum1/sum2 - confirm.
expectedOperators:
- type: "naivePartition"
partitionColumns: [ ]
- type: "window"
processor:
type: "framedAgg"
frame:
peerType: "ROWS"
lowUnbounded: true
lowOffset: 0
uppUnbounded: true
uppOffset: 0
orderBy: null
aggregations:
- { "type": "doubleSum", "name": "w1", "fieldName": "_d1" }
- type: "naiveSort"
columns:
- column: "_d1"
direction: "ASC"
- type: "naivePartition"
partitionColumns: [ "_d1" ]
- type: "window"
processor:
type: "framedAgg"
frame:
peerType: "ROWS"
lowUnbounded: true
lowOffset: 0
uppUnbounded: true
uppOffset: 0
orderBy: null
aggregations:
- { "type": "doubleSum", "name": "w0", "fieldName": "_d0" }
# The last column is 21.0 on every row, which equals the sum of the second column over all six rows.
expectedResults:
- [1.0, 1.0, 1.0, 21.0]
- [2.0, 2.0, 2.0, 21.0]
- [3.0, 3.0, 3.0, 21.0]
- [4.0, 4.0, 4.0, 21.0]
- [5.0, 5.0, 5.0, 21.0]
- [6.0, 6.0, 6.0, 21.0]

0 comments on commit 27cfe12

Please sign in to comment.