Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Grouping Engine fix when a limit spec with different order by columns is applied #16534

Merged
merged 9 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -560,15 +560,20 @@ private boolean canDoLimitPushDown(
return false;
}

/**
* When limit push down is applied, the partial results would be sorted by the ordering specified by the
* limit/order spec (unlike non-push down case where the results always use the default natural ascending order),
* so when merging these partial result streams, the merge needs to use the same ordering to get correct results.
*/
private Ordering<ResultRow> getRowOrderingForPushDown(
final boolean granular,
final DefaultLimitSpec limitSpec
)
public Ordering<ResultRow> getRowOrdering(final boolean granular)
{
return getOrderingAndDimensions(granular).getRowOrdering();
}

public List<String> getDimensionNamesInOrder()
{
return getOrderingAndDimensions(false).getDimensions()
.stream()
.map(DimensionSpec::getOutputName)
.collect(Collectors.toList());
}

public OrderingAndDimensions getOrderingAndDimensions(final boolean granular)
{
final boolean sortByDimsFirst = getContextSortByDimsFirst();

Expand All @@ -577,18 +582,30 @@ private Ordering<ResultRow> getRowOrderingForPushDown(
final List<Boolean> needsReverseList = new ArrayList<>();
final List<ColumnType> dimensionTypes = new ArrayList<>();
final List<StringComparator> comparators = new ArrayList<>();
final List<DimensionSpec> dimensionsInOrder = new ArrayList<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this double-book-keeping is a pretty wierd way to supply this detail - but it works...

I think it would be more beneficial

  • identify the differences between getRowOrderingForPushDown and getRowOrdering
  • merge the differing behaviour into the getRowOrderingForPushDown method
    • isn't the main difference is that getRowOrdering ignores the limitSpec regardless its set?
  • possibly also eat-up the RowBasedGrouperHelper which is another copy of the process of creating the comparators - I think correctness of the execution depends on knowing on which columns we ordered and in what order....having just one source of truth could reduce the amount of issues we face

all of the above are refactors - which might delay the fix of this bug....


for (OrderByColumnSpec orderSpec : limitSpec.getColumns()) {
boolean needsReverse = orderSpec.getDirection() != OrderByColumnSpec.Direction.ASCENDING;
int dimIndex = OrderByColumnSpec.getDimIndexForOrderBy(orderSpec, dimensions);
if (dimIndex >= 0) {
DimensionSpec dim = dimensions.get(dimIndex);
orderedFieldNumbers.add(resultRowSignature.indexOf(dim.getOutputName()));
dimsInOrderBy.add(dimIndex);
needsReverseList.add(needsReverse);
final ColumnType type = dimensions.get(dimIndex).getOutputType();
dimensionTypes.add(type);
comparators.add(orderSpec.getDimensionComparator());
/*
* When limit push down is applied, the partial results would be sorted by the ordering specified by the
* limit/order spec (unlike non-push down case where the results always use the default natural ascending order),
* so when merging these partial result streams, the merge needs to use the same ordering to get correct results.
*/
if (isApplyLimitPushDown()) {
DefaultLimitSpec limitSpec1 = (DefaultLimitSpec) limitSpec;
if (!DefaultLimitSpec.sortingOrderHasNonGroupingFields(limitSpec1, dimensions)) {
for (OrderByColumnSpec orderSpec : ((DefaultLimitSpec) limitSpec).getColumns()) {
boolean needsReverse = orderSpec.getDirection() != OrderByColumnSpec.Direction.ASCENDING;
int dimIndex = OrderByColumnSpec.getDimIndexForOrderBy(orderSpec, dimensions);
if (dimIndex >= 0) {
DimensionSpec dim = dimensions.get(dimIndex);
orderedFieldNumbers.add(resultRowSignature.indexOf(dim.getOutputName()));
dimsInOrderBy.add(dimIndex);
needsReverseList.add(needsReverse);
final ColumnType type = dimensions.get(dimIndex).getOutputType();
dimensionTypes.add(type);
comparators.add(orderSpec.getDimensionComparator());
dimensionsInOrder.add(dim);
}
}
}
}

Expand All @@ -599,14 +616,16 @@ private Ordering<ResultRow> getRowOrderingForPushDown(
final ColumnType type = dimensions.get(i).getOutputType();
dimensionTypes.add(type);
comparators.add(StringComparators.NATURAL);
dimensionsInOrder.add(dimensions.get(i));
}
}

final Comparator<ResultRow> timeComparator = getTimeComparator(granular);
Ordering<ResultRow> ordering;

if (timeComparator == null) {
return Ordering.from(
(lhs, rhs) -> compareDimsForLimitPushDown(
ordering = Ordering.from(
(lhs, rhs) -> compareDims(
orderedFieldNumbers,
needsReverseList,
dimensionTypes,
Expand All @@ -616,9 +635,9 @@ private Ordering<ResultRow> getRowOrderingForPushDown(
)
);
} else if (sortByDimsFirst) {
return Ordering.from(
ordering = Ordering.from(
(lhs, rhs) -> {
final int cmp = compareDimsForLimitPushDown(
final int cmp = compareDims(
orderedFieldNumbers,
needsReverseList,
dimensionTypes,
Expand All @@ -634,15 +653,15 @@ private Ordering<ResultRow> getRowOrderingForPushDown(
}
);
} else {
return Ordering.from(
ordering = Ordering.from(
(lhs, rhs) -> {
final int timeCompare = timeComparator.compare(lhs, rhs);

if (timeCompare != 0) {
return timeCompare;
}

return compareDimsForLimitPushDown(
return compareDims(
orderedFieldNumbers,
needsReverseList,
dimensionTypes,
Expand All @@ -653,45 +672,8 @@ private Ordering<ResultRow> getRowOrderingForPushDown(
}
);
}
}

public Ordering<ResultRow> getRowOrdering(final boolean granular)
{
if (isApplyLimitPushDown()) {
if (!DefaultLimitSpec.sortingOrderHasNonGroupingFields((DefaultLimitSpec) limitSpec, dimensions)) {
return getRowOrderingForPushDown(granular, (DefaultLimitSpec) limitSpec);
}
}

final boolean sortByDimsFirst = getContextSortByDimsFirst();
final Comparator<ResultRow> timeComparator = getTimeComparator(granular);

if (timeComparator == null) {
return Ordering.from((lhs, rhs) -> compareDims(dimensions, lhs, rhs));
} else if (sortByDimsFirst) {
return Ordering.from(
(lhs, rhs) -> {
final int cmp = compareDims(dimensions, lhs, rhs);
if (cmp != 0) {
return cmp;
}

return timeComparator.compare(lhs, rhs);
}
);
} else {
return Ordering.from(
(lhs, rhs) -> {
final int timeCompare = timeComparator.compare(lhs, rhs);

if (timeCompare != 0) {
return timeCompare;
}

return compareDims(dimensions, lhs, rhs);
}
);
}
return new OrderingAndDimensions(ordering, dimensionsInOrder);
}

@Nullable
Expand All @@ -716,25 +698,6 @@ private Comparator<ResultRow> getTimeComparator(boolean granular)
}
}

private int compareDims(List<DimensionSpec> dimensions, ResultRow lhs, ResultRow rhs)
{
final int dimensionStart = getResultRowDimensionStart();

for (int i = 0; i < dimensions.size(); i++) {
DimensionSpec dimension = dimensions.get(i);
final int dimCompare = DimensionHandlerUtils.compareObjectsAsType(
lhs.get(dimensionStart + i),
rhs.get(dimensionStart + i),
dimension.getOutputType()
);
if (dimCompare != 0) {
return dimCompare;
}
}

return 0;
}

/**
* Computes the timestamp that will be returned by {@link #getUniversalTimestamp()}.
*/
Expand All @@ -760,12 +723,12 @@ private DateTime computeUniversalTimestamp()
}

/**
* Compares the dimensions for limit pushdown.
* Compares the dimensions.
*
* Due to legacy reason, the provided StringComparator for the arrays isn't applied and must be changed once we
* get rid of the StringComparators for array types
*/
private static int compareDimsForLimitPushDown(
private static int compareDims(
final IntList fields,
final List<Boolean> needsReverseList,
final List<ColumnType> dimensionTypes,
Expand Down Expand Up @@ -924,6 +887,28 @@ private static void verifyOutputNames(
}
}

public static class OrderingAndDimensions
{
Ordering<ResultRow> rowOrdering;
List<DimensionSpec> dimensions;

public OrderingAndDimensions(Ordering<ResultRow> rowOrdering, List<DimensionSpec> dimensions)
{
this.rowOrdering = rowOrdering;
this.dimensions = dimensions;
}

public Ordering<ResultRow> getRowOrdering()
{
return rowOrdering;
}

public List<DimensionSpec> getDimensions()
{
return dimensions;
}
}

public static class Builder
{
@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -686,8 +686,7 @@ public Sequence<ResultRow> processSubtotalsSpec(
processingConfig.intermediateComputeSizeBytes()
);

List<String> queryDimNames = baseSubtotalQuery.getDimensions().stream().map(DimensionSpec::getOutputName)
.collect(Collectors.toList());
List<String> queryDimNamesInOrder = baseSubtotalQuery.getDimensionNamesInOrder();

// Only needed to make LimitSpec.filterColumns(..) call later in case base query has a non default LimitSpec.
Set<String> aggsAndPostAggs = null;
Expand Down Expand Up @@ -724,7 +723,7 @@ public Sequence<ResultRow> processSubtotalsSpec(
.withLimitSpec(subtotalQueryLimitSpec);

final GroupByRowProcessor.ResultSupplier resultSupplierOneFinal = resultSupplierOne;
if (Utils.isPrefix(subtotalSpec, queryDimNames)) {
if (Utils.isPrefix(subtotalSpec, queryDimNamesInOrder)) {
// Since subtotalSpec is a prefix of base query dimensions, so results from base query are also sorted
// by subtotalSpec as needed by stream merging.
subtotalsResults.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13674,10 +13674,8 @@ public void testGroupingSetsWithLimitOrderByGran()
.build()
),
ImmutableList.<Object[]>builder().add(
new Object[]{"", null, 2L},
new Object[]{"a", null, 1L},
new Object[]{"", null, 1L},
new Object[]{"a", null, 1L},
new Object[]{"", null, 3L},
new Object[]{"a", null, 2L},
new Object[]{"abc", null, 1L},
new Object[]{NULL_STRING, null, 6L},
new Object[]{"", timestamp("2000-01-01"), 2L},
Expand Down Expand Up @@ -16142,4 +16140,29 @@ public void testGroupingSetsWithAggregateCase()
)
).run();
}

@SqlTestFrameworkConfig.NumMergeBuffers(3)
@Test
public void testGroupingSetsWithDifferentOrderLimitSpec()
{
msqIncompatible();
testBuilder()
.sql(
"SELECT\n"
+ " isNew, isRobot, COUNT(*) AS \"Cnt\"\n"
+ "FROM \"wikipedia\"\n"
+ "GROUP BY GROUPING SETS ((isRobot), (isNew))\n"
+ "ORDER BY 2, 1\n"
+ "limit 100"
)
.expectedResults(
ResultMatchMode.RELAX_NULLS,
ImmutableList.of(
new Object[]{"false", null, 36966L},
new Object[]{"true", null, 2278L},
new Object[]{null, "false", 23824L},
new Object[]{null, "true", 15420L}
)
).run();
}
}
Loading