diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java index cdcf9e3daf40..994705f55e30 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java @@ -560,15 +560,20 @@ private boolean canDoLimitPushDown( return false; } - /** - * When limit push down is applied, the partial results would be sorted by the ordering specified by the - * limit/order spec (unlike non-push down case where the results always use the default natural ascending order), - * so when merging these partial result streams, the merge needs to use the same ordering to get correct results. - */ - private Ordering getRowOrderingForPushDown( - final boolean granular, - final DefaultLimitSpec limitSpec - ) + public Ordering getRowOrdering(final boolean granular) + { + return getOrderingAndDimensions(granular).getRowOrdering(); + } + + public List getDimensionNamesInOrder() + { + return getOrderingAndDimensions(false).getDimensions() + .stream() + .map(DimensionSpec::getOutputName) + .collect(Collectors.toList()); + } + + public OrderingAndDimensions getOrderingAndDimensions(final boolean granular) { final boolean sortByDimsFirst = getContextSortByDimsFirst(); @@ -577,18 +582,30 @@ private Ordering getRowOrderingForPushDown( final List needsReverseList = new ArrayList<>(); final List dimensionTypes = new ArrayList<>(); final List comparators = new ArrayList<>(); + final List dimensionsInOrder = new ArrayList<>(); - for (OrderByColumnSpec orderSpec : limitSpec.getColumns()) { - boolean needsReverse = orderSpec.getDirection() != OrderByColumnSpec.Direction.ASCENDING; - int dimIndex = OrderByColumnSpec.getDimIndexForOrderBy(orderSpec, dimensions); - if (dimIndex >= 0) { - DimensionSpec dim = dimensions.get(dimIndex); - orderedFieldNumbers.add(resultRowSignature.indexOf(dim.getOutputName())); - dimsInOrderBy.add(dimIndex); - needsReverseList.add(needsReverse); - final ColumnType type = dimensions.get(dimIndex).getOutputType(); - dimensionTypes.add(type); - comparators.add(orderSpec.getDimensionComparator()); + /* + * When limit push down is applied, the partial results would be sorted by the ordering specified by the + * limit/order spec (unlike non-push down case where the results always use the default natural ascending order), + * so when merging these partial result streams, the merge needs to use the same ordering to get correct results. + */ + if (isApplyLimitPushDown()) { + DefaultLimitSpec limitSpec1 = (DefaultLimitSpec) limitSpec; + if (!DefaultLimitSpec.sortingOrderHasNonGroupingFields(limitSpec1, dimensions)) { + for (OrderByColumnSpec orderSpec : ((DefaultLimitSpec) limitSpec).getColumns()) { + boolean needsReverse = orderSpec.getDirection() != OrderByColumnSpec.Direction.ASCENDING; + int dimIndex = OrderByColumnSpec.getDimIndexForOrderBy(orderSpec, dimensions); + if (dimIndex >= 0) { + DimensionSpec dim = dimensions.get(dimIndex); + orderedFieldNumbers.add(resultRowSignature.indexOf(dim.getOutputName())); + dimsInOrderBy.add(dimIndex); + needsReverseList.add(needsReverse); + final ColumnType type = dimensions.get(dimIndex).getOutputType(); + dimensionTypes.add(type); + comparators.add(orderSpec.getDimensionComparator()); + dimensionsInOrder.add(dim); + } + } } } @@ -599,14 +616,16 @@ private Ordering getRowOrderingForPushDown( final ColumnType type = dimensions.get(i).getOutputType(); dimensionTypes.add(type); comparators.add(StringComparators.NATURAL); + dimensionsInOrder.add(dimensions.get(i)); } } final Comparator timeComparator = getTimeComparator(granular); + Ordering ordering; if (timeComparator == null) { - return Ordering.from( - (lhs, rhs) -> compareDimsForLimitPushDown( + ordering = Ordering.from( + (lhs, rhs) -> compareDims( orderedFieldNumbers, needsReverseList, dimensionTypes, @@ -616,9 +635,9 @@ private Ordering getRowOrderingForPushDown( ) ); } else if (sortByDimsFirst) { - return Ordering.from( + ordering = Ordering.from( (lhs, rhs) -> { - final int cmp = compareDimsForLimitPushDown( + final int cmp = compareDims( orderedFieldNumbers, needsReverseList, dimensionTypes, @@ -634,7 +653,7 @@ private Ordering getRowOrderingForPushDown( } ); } else { - return Ordering.from( + ordering = Ordering.from( (lhs, rhs) -> { final int timeCompare = timeComparator.compare(lhs, rhs); @@ -642,7 +661,7 @@ private Ordering getRowOrderingForPushDown( return timeCompare; } - return compareDimsForLimitPushDown( + return compareDims( orderedFieldNumbers, needsReverseList, dimensionTypes, @@ -653,45 +672,8 @@ private Ordering getRowOrderingForPushDown( } ); } - } - - public Ordering getRowOrdering(final boolean granular) - { - if (isApplyLimitPushDown()) { - if (!DefaultLimitSpec.sortingOrderHasNonGroupingFields((DefaultLimitSpec) limitSpec, dimensions)) { - return getRowOrderingForPushDown(granular, (DefaultLimitSpec) limitSpec); - } - } - - final boolean sortByDimsFirst = getContextSortByDimsFirst(); - final Comparator timeComparator = getTimeComparator(granular); - - if (timeComparator == null) { - return Ordering.from((lhs, rhs) -> compareDims(dimensions, lhs, rhs)); - } else if (sortByDimsFirst) { - return Ordering.from( - (lhs, rhs) -> { - final int cmp = compareDims(dimensions, lhs, rhs); - if (cmp != 0) { - return cmp; - } - - return timeComparator.compare(lhs, rhs); - } - ); - } else { - return Ordering.from( - (lhs, rhs) -> { - final int timeCompare = timeComparator.compare(lhs, rhs); - if (timeCompare != 0) { - return timeCompare; - } - - return compareDims(dimensions, lhs, rhs); - } - ); - } + return new OrderingAndDimensions(ordering, dimensionsInOrder); } @Nullable @@ -716,25 +698,6 @@ private Comparator getTimeComparator(boolean granular) } } - private int compareDims(List dimensions, ResultRow lhs, ResultRow rhs) - { - final int dimensionStart = getResultRowDimensionStart(); - - for (int i = 0; i < dimensions.size(); i++) { - DimensionSpec dimension = dimensions.get(i); - final int dimCompare = DimensionHandlerUtils.compareObjectsAsType( - lhs.get(dimensionStart + i), - rhs.get(dimensionStart + i), - dimension.getOutputType() - ); - if (dimCompare != 0) { - return dimCompare; - } - } - - return 0; - } - /** * Computes the timestamp that will be returned by {@link #getUniversalTimestamp()}. */ @@ -760,12 +723,12 @@ private DateTime computeUniversalTimestamp() } /** - * Compares the dimensions for limit pushdown. + * Compares the dimensions. * * Due to legacy reason, the provided StringComparator for the arrays isn't applied and must be changed once we * get rid of the StringComparators for array types */ - private static int compareDimsForLimitPushDown( + private static int compareDims( final IntList fields, final List needsReverseList, final List dimensionTypes, @@ -924,6 +887,28 @@ private static void verifyOutputNames( } } + public static class OrderingAndDimensions + { + Ordering rowOrdering; + List dimensions; + + public OrderingAndDimensions(Ordering rowOrdering, List dimensions) + { + this.rowOrdering = rowOrdering; + this.dimensions = dimensions; + } + + public Ordering getRowOrdering() + { + return rowOrdering; + } + + public List getDimensions() + { + return dimensions; + } + } + public static class Builder { @Nullable diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java index 6451fb9b943d..ab1ee1052b4b 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java @@ -686,8 +686,7 @@ public Sequence processSubtotalsSpec( processingConfig.intermediateComputeSizeBytes() ); - List queryDimNames = baseSubtotalQuery.getDimensions().stream().map(DimensionSpec::getOutputName) - .collect(Collectors.toList()); + List queryDimNamesInOrder = baseSubtotalQuery.getDimensionNamesInOrder(); // Only needed to make LimitSpec.filterColumns(..) call later in case base query has a non default LimitSpec. Set aggsAndPostAggs = null; @@ -724,7 +723,7 @@ public Sequence processSubtotalsSpec( .withLimitSpec(subtotalQueryLimitSpec); final GroupByRowProcessor.ResultSupplier resultSupplierOneFinal = resultSupplierOne; - if (Utils.isPrefix(subtotalSpec, queryDimNames)) { + if (Utils.isPrefix(subtotalSpec, queryDimNamesInOrder)) { // Since subtotalSpec is a prefix of base query dimensions, so results from base query are also sorted // by subtotalSpec as needed by stream merging. subtotalsResults.add( diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 0b6dd10e3c9e..5447a090db38 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -13674,10 +13674,8 @@ public void testGroupingSetsWithLimitOrderByGran() .build() ), ImmutableList.builder().add( - new Object[]{"", null, 2L}, - new Object[]{"a", null, 1L}, - new Object[]{"", null, 1L}, - new Object[]{"a", null, 1L}, + new Object[]{"", null, 3L}, + new Object[]{"a", null, 2L}, new Object[]{"abc", null, 1L}, new Object[]{NULL_STRING, null, 6L}, new Object[]{"", timestamp("2000-01-01"), 2L}, @@ -16142,4 +16140,29 @@ public void testGroupingSetsWithAggregateCase() ) ).run(); } + + @SqlTestFrameworkConfig.NumMergeBuffers(3) + @Test + public void testGroupingSetsWithDifferentOrderLimitSpec() + { + msqIncompatible(); + testBuilder() + .sql( + "SELECT\n" + + " isNew, isRobot, COUNT(*) AS \"Cnt\"\n" + + "FROM \"wikipedia\"\n" + + "GROUP BY GROUPING SETS ((isRobot), (isNew))\n" + + "ORDER BY 2, 1\n" + + "limit 100" + ) + .expectedResults( + ResultMatchMode.RELAX_NULLS, + ImmutableList.of( + new Object[]{"false", null, 36966L}, + new Object[]{"true", null, 2278L}, + new Object[]{null, "false", 23824L}, + new Object[]{null, "true", 15420L} + ) + ).run(); + } }