Skip to content

Commit

Permalink
window operator reordering
Browse files Browse the repository at this point in the history
  • Loading branch information
sreemanamala committed May 21, 2024
1 parent 599586b commit a972698
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 36 deletions.
148 changes: 112 additions & 36 deletions sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;

/**
* Maps onto a {@link org.apache.druid.query.operator.WindowOperatorQuery}.
Expand Down Expand Up @@ -123,45 +124,14 @@ public static Windowing fromCalciteStuff(
{
final Window window = Preconditions.checkNotNull(partialQuery.getWindow(), "window");

ArrayList<OperatorFactory> ops = new ArrayList<>();
List<WindowGroupProcessorWrapper> wrapperObjs = new ArrayList<>();

final List<String> windowOutputColumns = new ArrayList<>(sourceRowSignature.getColumnNames());
final String outputNamePrefix = Calcites.findUnusedPrefixForDigits("w", sourceRowSignature.getColumnNames());
int outputNameCounter = 0;

// Track prior partition columns and sort columns group-to-group, so we only insert sorts and repartitions if
// we really need to.
List<String> priorPartitionColumns = null;
LinkedHashSet<ColumnWithDirection> priorSortColumns = new LinkedHashSet<>();

final RelCollation priorCollation = partialQuery.getScan().getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
if (priorCollation != null) {
// Populate initial priorSortColumns using collation of the input to the window operation. Allows us to skip
// the initial sort operator if the rows were already in the desired order.
priorSortColumns = computeSortColumnsFromRelCollation(priorCollation, sourceRowSignature);
}

for (int i = 0; i < window.groups.size(); ++i) {
final WindowGroup group = new WindowGroup(window, window.groups.get(i), sourceRowSignature);

final LinkedHashSet<ColumnWithDirection> sortColumns = new LinkedHashSet<>();
for (String partitionColumn : group.getPartitionColumns()) {
sortColumns.add(ColumnWithDirection.ascending(partitionColumn));
}
sortColumns.addAll(group.getOrdering());

// Add sorting and partitioning if needed.
if (!sortMatches(priorSortColumns, sortColumns)) {
// Sort order needs to change. Resort and repartition.
ops.add(new NaiveSortOperatorFactory(new ArrayList<>(sortColumns)));
ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns()));
priorSortColumns = sortColumns;
priorPartitionColumns = group.getPartitionColumns();
} else if (!group.getPartitionColumns().equals(priorPartitionColumns)) {
// Sort order doesn't need to change, but partitioning does. Only repartition.
ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns()));
priorPartitionColumns = group.getPartitionColumns();
}
for (Window.Group windowGroup : window.groups) {
final WindowGroup group = new WindowGroup(window, windowGroup, sourceRowSignature);

// Add aggregations.
final List<AggregateCall> aggregateCalls = group.getAggregateCalls();
Expand Down Expand Up @@ -232,10 +202,48 @@ public static Windowing fromCalciteStuff(
throw new ISE("No processors from Window[%s], why was this code called?", window);
}

ops.add(new WindowOperatorFactory(
wrapperObjs.add(new WindowGroupProcessorWrapper(windowGroup, new WindowOperatorFactory(
processors.size() == 1 ?
processors.get(0) : new ComposingProcessor(processors.toArray(new Processor[0]))
));
)));
}

// Track prior partition columns and sort columns group-to-group, so we only insert sorts and repartitions if
// we really need to.
List<String> priorPartitionColumns = null;
LinkedHashSet<ColumnWithDirection> priorSortColumns = new LinkedHashSet<>();

final RelCollation priorCollation = partialQuery.getScan().getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
if (priorCollation != null) {
// Populate initial priorSortColumns using collation of the input to the window operation. Allows us to skip
// the initial sort operator if the rows were already in the desired order.
priorSortColumns = computeSortColumnsFromRelCollation(priorCollation, sourceRowSignature);
}

Collections.sort(wrapperObjs);
ArrayList<OperatorFactory> ops = new ArrayList<>();
for (WindowGroupProcessorWrapper wrapperObj : wrapperObjs) {
final WindowGroup group = new WindowGroup(window, wrapperObj.getGroup(), sourceRowSignature);
final LinkedHashSet<ColumnWithDirection> sortColumns = new LinkedHashSet<>();
for (String partitionColumn : group.getPartitionColumns()) {
sortColumns.add(ColumnWithDirection.ascending(partitionColumn));
}
sortColumns.addAll(group.getOrdering());

// Add sorting and partitioning if needed.
if (!sortMatches(priorSortColumns, sortColumns)) {
// Sort order needs to change. Resort and repartition.
ops.add(new NaiveSortOperatorFactory(new ArrayList<>(sortColumns)));
ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns()));
priorSortColumns = sortColumns;
priorPartitionColumns = group.getPartitionColumns();
} else if (!group.getPartitionColumns().equals(priorPartitionColumns)) {
// Sort order doesn't need to change, but partitioning does. Only repartition.
ops.add(new NaivePartitioningOperatorFactory(group.getPartitionColumns()));
priorPartitionColumns = group.getPartitionColumns();
}

ops.add(wrapperObj.getProcessorOperatorFactory());
}

// Apply windowProject, if present.
Expand Down Expand Up @@ -270,6 +278,74 @@ public static Windowing fromCalciteStuff(
}
}

/**
 * Pairs a {@link org.apache.calcite.rel.core.Window.Group} with the {@link OperatorFactory} that was already
 * built for it (a {@link WindowOperatorFactory} at the visible call site).
 *
 * Keeping the two together lets the window groups be sorted into a cheaper processing order while each group
 * stays attached to the factory that carries its aggregate output column names.
 *
 * Note: the natural ordering is deliberately coarse and is NOT consistent with {@link #equals(Object)}; two
 * wrappers that compare as {@code 0} are generally not equal.
 */
private static class WindowGroupProcessorWrapper implements Comparable<WindowGroupProcessorWrapper>
{
  private final Window.Group group;
  private final OperatorFactory processorOperatorFactory;

  /**
   * @param group                    the Calcite window group being wrapped
   * @param processorOperatorFactory the operator factory previously computed for {@code group}
   */
  public WindowGroupProcessorWrapper(Window.Group group, OperatorFactory processorOperatorFactory)
  {
    this.group = group;
    this.processorOperatorFactory = processorOperatorFactory;
  }

  /**
   * @return the wrapped window group
   */
  public Window.Group getGroup()
  {
    return group;
  }

  /**
   * @return the operator factory computed for the wrapped window group
   */
  public OperatorFactory getProcessorOperatorFactory()
  {
    return processorOperatorFactory;
  }

  /**
   * Orders wrappers whose group has no keys ahead of wrappers whose group has keys. Any two wrappers that are
   * both keyed, or both un-keyed, compare as equal.
   *
   * TODO: refine this ordering so that the processing order is optimised based on the actual partitions; for now
   * it only moves the empty windows to the front.
   *
   * @return {@code -1}, {@code 0} or {@code 1}
   */
  @Override
  public int compareTo(WindowGroupProcessorWrapper o)
  {
    final boolean thisHasNoKeys = this.group.keys.isEmpty();
    final boolean otherHasNoKeys = o.group.keys.isEmpty();
    // Arguments are intentionally swapped: "no keys" must sort first, i.e. yield -1 when only this side is empty.
    return Boolean.compare(otherHasNoKeys, thisHasNoKeys);
  }

  /**
   * Two wrappers are equal when they are of the same class and both the wrapped group and the operator factory
   * are equal.
   */
  @Override
  public boolean equals(Object o)
  {
    if (o == this) {
      return true;
    }
    if (o == null || o.getClass() != getClass()) {
      return false;
    }
    final WindowGroupProcessorWrapper that = (WindowGroupProcessorWrapper) o;
    if (!Objects.equals(group, that.group)) {
      return false;
    }
    return Objects.equals(processorOperatorFactory, that.processorOperatorFactory);
  }

  /**
   * Hash over the same two fields that {@link #equals(Object)} compares.
   */
  @Override
  public int hashCode()
  {
    return Objects.hash(group, processorOperatorFactory);
  }
}

private final RowSignature signature;

public Windowing(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Operator-validation case for window operator reordering.
# The query has two window aggregations over the same grouped input: one partitioned by m2 and one over an
# empty (unpartitioned) window. The expected operator chain below places the unpartitioned window first,
# and only afterwards sorts and partitions for the partitioned window.
type: "operatorValidation"

sql: |
SELECT
m1,
m2,
SUM(m1) OVER(PARTITION BY m2) as sum1,
SUM(m2) OVER() as sum2
from druid.numfoo
GROUP BY m1,m2

expectedOperators:
# Step 1: unpartitioned window (empty partition column list), producing w1 from "_d1".
# NOTE(review): "_d0"/"_d1" appear to be the grouped m1/m2 columns and w1 to back sum2 - confirm.
- type: "naivePartition"
partitionColumns: [ ]
- type: "window"
processor:
type: "framedAgg"
frame:
peerType: "ROWS"
lowUnbounded: true
lowOffset: 0
uppUnbounded: true
uppOffset: 0
orderBy: null
aggregations:
- { "type": "doubleSum", "name": "w1", "fieldName": "_d1" }
# Step 2: sort ascending on "_d1" and partition by it, then run the partitioned window producing w0 from "_d0".
- type: "naiveSort"
columns:
- column: "_d1"
direction: "ASC"
- type: "naivePartition"
partitionColumns: [ "_d1" ]
- type: "window"
processor:
type: "framedAgg"
frame:
peerType: "ROWS"
lowUnbounded: true
lowOffset: 0
uppUnbounded: true
uppOffset: 0
orderBy: null
aggregations:
- { "type": "doubleSum", "name": "w0", "fieldName": "_d0" }
# Row layout follows the SELECT list: m1, m2, sum1, sum2. The last column is the same in every row (21.0).
expectedResults:
- [1.0, 1.0, 1.0, 21.0]
- [2.0, 2.0, 2.0, 21.0]
- [3.0, 3.0, 3.0, 21.0]
- [4.0, 4.0, 4.0, 21.0]
- [5.0, 5.0, 5.0, 21.0]
- [6.0, 6.0, 6.0, 21.0]

0 comments on commit a972698

Please sign in to comment.