diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java index 6f2e29a22402..993050d7898f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java @@ -23,11 +23,20 @@ import java.util.Comparator; import java.util.List; import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.FilterContext; import org.apache.pinot.common.request.context.OrderByExpressionContext; +import org.apache.pinot.common.request.context.predicate.EqPredicate; +import org.apache.pinot.common.request.context.predicate.NotEqPredicate; +import org.apache.pinot.common.request.context.predicate.Predicate; +import org.apache.pinot.common.request.context.predicate.RangePredicate; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.request.context.utils.QueryContextUtils; import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.datasource.DataSource; import org.apache.pinot.segment.spi.datasource.DataSourceMetadata; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; @@ -37,8 +46,14 @@ *
  • For selection query with LIMIT 0, keep 1 segment to create the data schema
  • *
  • For selection only query without filter, keep enough documents to fulfill the LIMIT requirement
  • *
  • - * For selection order-by query without filer, if the first order-by expression is an identifier (column), prune - * segments based on the column min/max value and keep enough documents to fulfill the LIMIT and OFFSET requirement. + * For selection order-by query, if the first order-by expression is an identifier (column), prune segments based on + * the column min/max value and keep enough documents to fulfill the LIMIT and OFFSET requirement. This works both + * without a filter and with a filter: with a filter, each segment contributes towards the LIMIT only the number of + * rows that provably match the filter based on min/max metadata (its total docs if it fully matches, 0 + * otherwise). Using this lower bound on matching rows keeps the boundary safe, so segments are never pruned when + * they might still hold a top-n matching row. The optimization is skipped when null handling is active for the + * order-by/predicate columns because nulls are stored as a default value that pollutes the min/max metadata + * (see #18685). *
  • * */ @@ -51,10 +66,24 @@ public void init(PinotConfiguration config) { @Override public boolean isApplicableTo(QueryContext query) { - // Only prune selection queries - // If LIMIT is not 0, only prune segments for selection queries without filter - return QueryContextUtils.isSelectionQuery(query) - && (query.getFilter() == null || query.getLimit() == 0); + if (!QueryContextUtils.isSelectionQuery(query)) { + return false; + } + // Without a filter (or for LIMIT 0, where we just keep one segment for the schema), pruning is always applicable. + if (query.getFilter() == null || query.getLimit() == 0) { + return true; + } + // With a filter, only the order-by-on-identifier path can prune safely (the selection-only path relies on exact + // doc counts, which a filter invalidates). Additionally, null handling must not be active for the order-by column, + // because nulls are stored as a default value that pollutes the column min/max metadata used for sorting and the + // boundary. + List orderByExpressions = query.getOrderByExpressions(); + if (orderByExpressions == null) { + return false; + } + ExpressionContext firstOrderByExpression = orderByExpressions.get(0).getExpression(); + return firstOrderByExpression.getType() == ExpressionContext.Type.IDENTIFIER + && !isNullHandlingActive(query, firstOrderByExpression.getIdentifier()); } @Override @@ -75,7 +104,9 @@ public List prune(List segments, QueryContext query) } if (query.getOrderByExpressions() == null) { - return pruneSelectionOnly(segments, query); + // Count-based selection-only pruning is only safe without a filter (total docs is an exact match count). With a + // filter present this path is not selected (see isApplicableTo); guard defensively in case it is reached. + return query.getFilter() == null ? pruneSelectionOnly(segments, query) : segments; } else { return pruneSelectionOrderBy(segments, query); } @@ -100,7 +131,7 @@ private List pruneSelectionOnly(List segments, Query } /** - * Helper method to prune segments for selection order-by queries without filter. + * Helper method to prune segments for selection order-by queries. *

    When the first order-by expression is an identifier (column), we can prune segments based on the column min/max * value: *

      @@ -108,6 +139,8 @@ private List pruneSelectionOnly(List segments, Query *
    • 2. Pick the top segments until we get enough documents to fulfill the LIMIT and OFFSET requirement
    • *
    • 3. Keep the segments that has value overlap with the selected ones; remove the others
    • *
    + *

    Each segment contributes towards the LIMIT only its {@link #guaranteedMatchingDocs} (a lower bound on its + * matching rows), so the optimization remains correct when a filter is present. */ private List pruneSelectionOrderBy(List segments, QueryContext query) { List orderByExpressions = query.getOrderByExpressions(); @@ -157,7 +190,7 @@ private List pruneSelectionOrderBy(List segments, Qu IndexSegment segment = segments.get(minMaxValue._index); if (remainingDocs > 0) { selectedSegments.add(segment); - remainingDocs -= segment.getSegmentMetadata().getTotalDocs(); + remainingDocs -= guaranteedMatchingDocs(segment, query); maxValue = minMaxValue._maxValue; } else { // After getting enough documents, prune all the segments with min value larger than the current max value, or @@ -184,7 +217,7 @@ private List pruneSelectionOrderBy(List segments, Qu IndexSegment segment = segments.get(minMaxValue._index); if (remainingDocs > 0) { selectedSegments.add(segment); - remainingDocs -= segment.getSegmentMetadata().getTotalDocs(); + remainingDocs -= guaranteedMatchingDocs(segment, query); minValue = minMaxValue._minValue; } else { // After getting enough documents, prune all the segments with max value smaller than the current min value, @@ -201,6 +234,192 @@ private List pruneSelectionOrderBy(List segments, Qu return selectedSegments; } + /** + * Returns a lower bound on the number of rows in the segment that match the query filter, used to decide how many + * leading segments must be kept to guarantee the LIMIT + OFFSET requirement. + *

      + *
    • Without a filter, every row matches, so this is the exact total doc count.
    • + *
    • With a filter, this is the total doc count if the segment provably matches the filter for all of its + * rows (based on min/max metadata), and 0 otherwise. Using 0 for segments that only partially (or not provably + * fully) match is a safe under-count: such segments are still kept (they overlap the boundary), but they never let + * the boundary advance past rows they might contain.
    • + *
    + */ + private long guaranteedMatchingDocs(IndexSegment segment, QueryContext query) { + int totalDocs = segment.getSegmentMetadata().getTotalDocs(); + FilterContext filter = query.getFilter(); + if (filter == null) { + return totalDocs; + } + return fullyMatches(segment, filter, query) ? totalDocs : 0; + } + + /** + * Returns {@code true} only if all rows of the segment provably satisfy the filter, based on min/max + * metadata. A {@code false} result never means "does not match"; it means "cannot prove that all rows match", which + * is always safe to treat as a 0 lower bound. NOT and unsupported predicates conservatively return {@code false}. + */ + private boolean fullyMatches(IndexSegment segment, FilterContext filter, QueryContext query) { + switch (filter.getType()) { + case AND: + assert filter.getChildren() != null; + for (FilterContext child : filter.getChildren()) { + if (!fullyMatches(segment, child, query)) { + return false; + } + } + return true; + case OR: + assert filter.getChildren() != null; + for (FilterContext child : filter.getChildren()) { + if (fullyMatches(segment, child, query)) { + return true; + } + } + return false; + case CONSTANT: + return filter.isConstantTrue(); + case PREDICATE: + return predicateFullyMatches(segment, filter.getPredicate(), query); + case NOT: + default: + return false; + } + } + + /** + * Returns {@code true} only if all rows of the segment provably satisfy the predicate, based on the predicate + * column's min/max metadata. Only identifier predicates on non-nullable columns of types {@code RANGE} (e.g. + * {@code >, >=, <, <=}), {@code EQ} ({@code =}) and {@code NOT_EQ} ({@code <>}) are supported; everything else + * conservatively returns {@code false}. + */ + private boolean predicateFullyMatches(IndexSegment segment, Predicate predicate, QueryContext query) { + ExpressionContext lhs = predicate.getLhs(); + if (lhs.getType() != ExpressionContext.Type.IDENTIFIER) { + return false; + } + String column = lhs.getIdentifier(); + // Nulls are stored as a default value that pollutes the column min/max metadata (and are excluded from comparisons + // under null handling), so full-match cannot be reasoned about when null handling is active for the column. + if (isNullHandlingActive(query, column)) { + return false; + } + DataSource dataSource = segment.getDataSource(column, query.getSchema()); + DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata(); + Comparable minValue = dataSourceMetadata.getMinValue(); + Comparable maxValue = dataSourceMetadata.getMaxValue(); + if (minValue == null || maxValue == null) { + return false; + } + // NaN (FLOAT/DOUBLE) breaks the ordering assumptions: Float/Double.compareTo treats NaN as the largest value, which + // does not match filter semantics (comparisons against NaN are never true). A NaN min/max could make a segment look + // fully-matching when its NaN rows actually do not match, so refuse to reason about full match in that case. + if (isNaN(minValue) || isNaN(maxValue)) { + return false; + } + DataType dataType = dataSourceMetadata.getDataType(); + try { + switch (predicate.getType()) { + case RANGE: + return rangeFullyMatches((RangePredicate) predicate, minValue, maxValue, dataType); + case EQ: { + Comparable value = convertValue(((EqPredicate) predicate).getValue(), dataType); + // All rows equal the value iff the whole segment collapses to that single value. + return minValue.compareTo(value) == 0 && maxValue.compareTo(value) == 0; + } + case NOT_EQ: { + Comparable value = convertValue(((NotEqPredicate) predicate).getValue(), dataType); + // All rows differ from the value iff the value lies outside [min, max]. + return value.compareTo(minValue) < 0 || value.compareTo(maxValue) > 0; + } + default: + return false; + } + } catch (Exception e) { + // Different data types / unparseable literal: cannot prove full match. + return false; + } + } + + /** + * Returns {@code true} if the segment's whole {@code [minValue, maxValue]} range provably satisfies the range + * predicate (i.e. it is fully contained within the predicate's bounds). + */ + private boolean rangeFullyMatches(RangePredicate predicate, Comparable minValue, Comparable maxValue, + DataType dataType) { + String lowerBound = predicate.getLowerBound(); + if (!lowerBound.equals(RangePredicate.UNBOUNDED)) { + Comparable lowerBoundValue = convertValue(lowerBound, dataType); + if (predicate.isLowerInclusive()) { + if (minValue.compareTo(lowerBoundValue) < 0) { + return false; + } + } else { + if (minValue.compareTo(lowerBoundValue) <= 0) { + return false; + } + } + } + String upperBound = predicate.getUpperBound(); + if (!upperBound.equals(RangePredicate.UNBOUNDED)) { + Comparable upperBoundValue = convertValue(upperBound, dataType); + if (predicate.isUpperInclusive()) { + if (maxValue.compareTo(upperBoundValue) > 0) { + return false; + } + } else { + if (maxValue.compareTo(upperBoundValue) >= 0) { + return false; + } + } + } + return true; + } + + /** + * Returns whether null handling is active for the column, in which case this optimization must be skipped. Pinot + * stores nulls as a default value that pollutes the column min/max metadata and the total doc count; only when null + * handling is enabled are those null rows excluded from comparisons, which is when the pollution becomes unsafe to + * reason about (with null handling disabled, nulls are simply the default value and the min/max stay accurate). This + * mirrors the null caution in {@code SelectionPlanNode#isSorted}. + *

    A column carries null semantics only when null handling is enabled for the query and the column is + * nullable. Nullability follows the same resolution used at segment build time (e.g. + * {@code BaseSegmentCreator#isNullable}): under column-based null handling the per-column + * {@link FieldSpec#isNullable} flag, otherwise (table/query-level null handling) all columns are nullable. The check + * is conservative: an unknown schema or column is treated as null-handling-active. + */ + private static boolean isNullHandlingActive(QueryContext query, String column) { + if (!query.isNullHandlingEnabled()) { + // Null semantics are off: nulls are just the default value, so min/max and doc counts stay accurate. + return false; + } + Schema schema = query.getSchema(); + if (schema == null) { + return true; + } + if (schema.isEnableColumnBasedNullHandling()) { + // Column-based null handling: only nullable columns carry nulls. + FieldSpec fieldSpec = schema.getFieldSpecFor(column); + return fieldSpec == null || fieldSpec.isNullable(); + } + // Table/query-level (legacy) null handling applies null semantics to all columns. + return true; + } + + private static boolean isNaN(Comparable value) { + return (value instanceof Double && ((Double) value).isNaN()) + || (value instanceof Float && ((Float) value).isNaN()); + } + + /** + * Converts a predicate literal to the column's stored type. Any parse failure propagates to the caller, which treats + * it as "cannot prove full match" (this pruner must stay conservative; an actually invalid query is rejected by the + * preceding {@link ColumnValueSegmentPruner} or by query execution). + */ + private static Comparable convertValue(String stringValue, DataType dataType) { + return dataType.convertInternal(stringValue); + } + private static class MinMaxValue { final int _index; final Comparable _minValue; diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java index 2f65ef11af85..fc1052325dba 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java @@ -29,6 +29,7 @@ import org.apache.pinot.segment.spi.datasource.DataSource; import org.apache.pinot.segment.spi.datasource.DataSourceMetadata; import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap; +import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.testng.annotations.Test; @@ -37,6 +38,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertSame; import static org.testng.Assert.assertTrue; @@ -44,6 +46,15 @@ public class SelectionQuerySegmentPrunerTest { public static final String ORDER_BY_COLUMN = "testColumn"; + public static final String FILTER_COLUMN = "foo"; + + // Schema with a LONG order-by column and a STRING column, used by the filter-aware tests. Null handling is off, so + // these columns are not treated as null-handling-active even though they are nullable by default. + private static final Schema SCHEMA = new Schema.SchemaBuilder() + .addSingleValueDimension(ORDER_BY_COLUMN, FieldSpec.DataType.LONG) + .addSingleValueDimension(FILTER_COLUMN, FieldSpec.DataType.STRING) + .build(); + private final SelectionQuerySegmentPruner _segmentPruner = new SelectionQuerySegmentPruner(); @Test @@ -211,12 +222,279 @@ public void testUpsertTable() { assertEquals(result.size(), 3); } + /** + * Range-partitioned, non-overlapping segments on {@code testColumn}: [0,9], [10,19], [20,29], [30,39], [40,49]. + */ + private List rangePartitionedSegments() { + return Arrays.asList( + getIndexSegment(0L, 9L, 10), // 0 + getIndexSegment(10L, 19L, 10), // 1 + getIndexSegment(20L, 29L, 10), // 2 + getIndexSegment(30L, 39L, 10), // 3 + getIndexSegment(40L, 49L, 10)); // 4 + } + + private List prune(List segments, String query) { + QueryContext queryContext = QueryContextConverterUtils.getQueryContext(query); + queryContext.setSchema(SCHEMA); + return _segmentPruner.prune(segments, queryContext); + } + + @Test + public void testSelectionOrderByWithFilterDesc() { + List segments = rangePartitionedSegments(); + // DESC, want largest 5 with col > 25: all live in [40,49]. Only the top fully-matching segment is kept. + List result = + prune(segments, "SELECT * FROM testTable WHERE testColumn > 25 ORDER BY testColumn DESC LIMIT 5"); + assertEquals(result, List.of(segments.get(4))); + + // >= behaves the same here (min 40 >= 30). + result = prune(segments, "SELECT * FROM testTable WHERE testColumn >= 30 ORDER BY testColumn DESC LIMIT 5"); + assertEquals(result, List.of(segments.get(4))); + } + + @Test + public void testSelectionOrderByWithFilterAsc() { + List segments = rangePartitionedSegments(); + // ASC, want smallest 15 with col < 25: live in [0,9] (0-9) and [10,19] (10-14). Higher segments are pruned. + List result = + prune(segments, "SELECT * FROM testTable WHERE testColumn < 25 ORDER BY testColumn LIMIT 15"); + assertEquals(result, List.of(segments.get(0), segments.get(1))); + + // <= behaves the same here (max 19 <= 24). + result = prune(segments, "SELECT * FROM testTable WHERE testColumn <= 24 ORDER BY testColumn LIMIT 15"); + assertEquals(result, List.of(segments.get(0), segments.get(1))); + } + + @Test + public void testFilterStraddlingSegmentIsKept() { + // Correctness counterexample for the lower-bound rule: DESC, col < 35, LIMIT 5. The answer (34..30) lives in the + // straddling segment [30,39], which is counted as 0 matching docs. A naive getTotalDocs() accumulation would let + // the boundary advance past it and prune it; the lower-bound rule must keep it. + List segments = rangePartitionedSegments(); + List result = + prune(segments, "SELECT * FROM testTable WHERE testColumn < 35 ORDER BY testColumn DESC LIMIT 5"); + assertTrue(result.contains(segments.get(3)), "straddling segment [30,39] holding the top-n must be kept"); + } + + @Test + public void testFilterEqAndNotEq() { + // EQ fully matches only a single-valued segment; here add one collapsed to 25. + List segments = Arrays.asList( + getIndexSegment(0L, 9L, 10), // 0 + getIndexSegment(25L, 25L, 10), // 1 (single value 25) + getIndexSegment(40L, 49L, 10)); // 2 + // col = 25 ORDER BY col DESC LIMIT 5 -> only [25,25] fully matches; [40,49] (max 49 >= 25 boundary) pruned. + List result = + prune(segments, "SELECT * FROM testTable WHERE testColumn = 25 ORDER BY testColumn DESC LIMIT 5"); + assertTrue(result.contains(segments.get(1))); + + // col <> 25 DESC LIMIT 5 -> largest != 25 live in [40,49]; that segment fully matches (25 outside [40,49]). + result = prune(segments, "SELECT * FROM testTable WHERE testColumn <> 25 ORDER BY testColumn DESC LIMIT 5"); + assertEquals(result, List.of(segments.get(2))); + } + + @Test + public void testAndConjunctOnNonProvableColumnDisablesPruning() { + // col > 25 is provably-full on the high segments, but the ANDed predicate on an unanalyzable column ('foo' is not + // in the schema) cannot be proven full for any segment -> 0 lower bound everywhere -> nothing is pruned (safe). + List segments = rangePartitionedSegments(); + List result = prune(segments, + "SELECT * FROM testTable WHERE testColumn > 25 AND foo = 'x' ORDER BY testColumn DESC LIMIT 5"); + assertEquals(result.size(), segments.size()); + } + + @Test + public void testOrFilterFullMatch() { + // LIMIT exceeds the total doc count (5 segments * 10 docs), so guaranteedMatchingDocs() -> fullyMatches() is + // evaluated for every segment: the OR is fully matched when ANY child is (seg [30,39], [40,49] via testColumn > 25) + // and not matched otherwise. No segment can be pruned (the limit needs them all), so the result is unchanged. + List segments = rangePartitionedSegments(); + List result = prune(segments, + "SELECT * FROM testTable WHERE testColumn > 25 OR testColumn > 1000 ORDER BY testColumn LIMIT 100"); + assertEquals(result.size(), segments.size()); + } + + @Test + public void testFilterNotProvablyFullCases() { + // Each of these filters cannot be proven full-match from min/max metadata, so every segment contributes 0 and + // (with a LIMIT larger than the data) nothing is pruned. They exercise the conservative fall-through branches. + List segments = rangePartitionedSegments(); + // Predicate on a non-identifier (function) expression. + assertEquals(prune(segments, + "SELECT * FROM testTable WHERE testColumn + 1 > 5 ORDER BY testColumn LIMIT 100").size(), segments.size()); + // Unsupported predicate type (IN) for the full-match analysis. + assertEquals(prune(segments, + "SELECT * FROM testTable WHERE testColumn IN (1, 2, 3) ORDER BY testColumn LIMIT 100").size(), segments.size()); + // NOT filter is never treated as full-match. + assertEquals(prune(segments, + "SELECT * FROM testTable WHERE NOT (testColumn IN (1, 2, 3)) ORDER BY testColumn LIMIT 100").size(), + segments.size()); + // Bound larger than any value: no segment is fully contained, so nothing is pruned. + assertEquals(prune(segments, + "SELECT * FROM testTable WHERE testColumn > 99999999999999999999999 ORDER BY testColumn LIMIT 100").size(), + segments.size()); + } + + @Test + public void testAndAllChildrenFullMatch() { + // AND where every child is provably full on the top segment -> AND returns true and the set is limit-pruned. + List segments = rangePartitionedSegments(); + List result = prune(segments, + "SELECT * FROM testTable WHERE testColumn > 25 AND testColumn < 1000 ORDER BY testColumn DESC LIMIT 5"); + assertEquals(result, List.of(segments.get(4))); + } + + @Test + public void testRangeInclusiveBoundsNotFull() { + // Inclusive bounds where some processed segments are not fully contained, exercising the inclusive "not full" + // branches. LIMIT exceeds the data so nothing is pruned. + List segments = rangePartitionedSegments(); + assertEquals(prune(segments, + "SELECT * FROM testTable WHERE testColumn >= 25 ORDER BY testColumn LIMIT 100").size(), segments.size()); + assertEquals(prune(segments, + "SELECT * FROM testTable WHERE testColumn <= 25 ORDER BY testColumn LIMIT 100").size(), segments.size()); + } + + @Test + public void testPruneCalledWithNullHandlingActive() { + // prune() invoked directly (bypassing isApplicableTo) with null handling on: the predicate column is + // null-handling-active, so it is never provably full and nothing is pruned. + List segments = rangePartitionedSegments(); + List result = prune(segments, "SET enableNullHandling=true; " + + "SELECT * FROM testTable WHERE testColumn > 25 ORDER BY testColumn DESC LIMIT 100"); + assertEquals(result.size(), segments.size()); + } + + @Test + public void testFilterPredicateColumnWithoutMinMax() { + // The filter column has no min/max metadata, so it cannot be proven full-match -> 0 contribution, nothing pruned. + List segments = Arrays.asList( + getIndexSegment(0L, 9L, 10, false, null, null), + getIndexSegment(10L, 19L, 10, false, null, null)); + List result = + prune(segments, "SELECT * FROM testTable WHERE foo >= 'a' ORDER BY testColumn DESC LIMIT 5"); + assertEquals(result.size(), segments.size()); + } + + @Test + public void testFilteredSelectionOnlyNotPruned() { + // Selection-only (no ORDER BY) with a filter: count-based pruning is unsafe, so prune() keeps every segment. + List segments = rangePartitionedSegments(); + List result = prune(segments, "SELECT * FROM testTable WHERE testColumn > 25 LIMIT 5"); + assertEquals(result.size(), segments.size()); + } + + @Test + public void testFilterWithOffset() { + List segments = rangePartitionedSegments(); + // DESC, col > 25, LIMIT 5 OFFSET 30. remainingDocs = 35; the two fully-matching segments [30,39] and [40,49] + // provide only 20 matching docs, so the boundary never advances enough to prune them; both are kept. + List result = + prune(segments, "SELECT * FROM testTable WHERE testColumn > 25 ORDER BY testColumn DESC LIMIT 5, 30"); + assertEquals(result.size(), segments.size()); + } + + @Test + public void testFloatNaNNotTreatedAsFullMatch() { + // Regression: an all-NaN DOUBLE segment must not be counted as fully matching 'col > 5'. NaN sorts as the largest + // value, so without the NaN guard the all-NaN segment would (a) be counted as 10 matching docs and (b) sort first + // (DESC), advancing the boundary and wrongly pruning the segment [10, 20] that actually holds the top-n. + Schema doubleSchema = + new Schema.SchemaBuilder().addSingleValueDimension(ORDER_BY_COLUMN, FieldSpec.DataType.DOUBLE).build(); + IndexSegment nanSegment = getDoubleSegment(Double.NaN, Double.NaN, 10); + IndexSegment realSegment = getDoubleSegment(10.0, 20.0, 10); + QueryContext queryContext = QueryContextConverterUtils.getQueryContext( + "SELECT * FROM testTable WHERE testColumn > 5 ORDER BY testColumn DESC LIMIT 5"); + queryContext.setSchema(doubleSchema); + List result = _segmentPruner.prune(Arrays.asList(nanSegment, realSegment), queryContext); + assertTrue(result.contains(realSegment), "segment [10, 20] holding the top-n must not be pruned"); + } + + private IndexSegment getDoubleSegment(Double minValue, Double maxValue, int totalDocs) { + IndexSegment indexSegment = mock(IndexSegment.class); + DataSource dataSource = mock(DataSource.class); + when(indexSegment.getDataSource(eq(ORDER_BY_COLUMN), any(Schema.class))).thenReturn(dataSource); + DataSourceMetadata dataSourceMetadata = mock(DataSourceMetadata.class); + when(dataSource.getDataSourceMetadata()).thenReturn(dataSourceMetadata); + when(dataSourceMetadata.getMinValue()).thenReturn(minValue); + when(dataSourceMetadata.getMaxValue()).thenReturn(maxValue); + when(dataSourceMetadata.getDataType()).thenReturn(FieldSpec.DataType.DOUBLE); + SegmentMetadata segmentMetadata = mock(SegmentMetadata.class); + when(indexSegment.getSegmentMetadata()).thenReturn(segmentMetadata); + when(segmentMetadata.getTotalDocs()).thenReturn(totalDocs); + return indexSegment; + } + + @Test + public void testIsApplicableTo() { + // No filter: applicable (existing behavior), with or without order by. + assertTrue(_segmentPruner.isApplicableTo(queryWithSchema("SELECT * FROM testTable LIMIT 5"))); + assertTrue(_segmentPruner.isApplicableTo(queryWithSchema("SELECT * FROM testTable ORDER BY testColumn LIMIT 5"))); + // LIMIT 0 with a filter: applicable (just keeps one segment for the schema). + assertTrue(_segmentPruner.isApplicableTo( + queryWithSchema("SELECT * FROM testTable WHERE testColumn > 5 ORDER BY testColumn LIMIT 0"))); + // Filtered order-by on a non-nullable identifier: applicable. + assertTrue(_segmentPruner.isApplicableTo( + queryWithSchema("SELECT * FROM testTable WHERE testColumn > 5 ORDER BY testColumn DESC LIMIT 5"))); + // Filtered selection-only (no order by): not applicable (count-based pruning unsafe with a filter). + assertFalse(_segmentPruner.isApplicableTo(queryWithSchema("SELECT * FROM testTable WHERE testColumn > 5 LIMIT 5"))); + // Filtered order-by on a non-identifier: not applicable. + assertFalse(_segmentPruner.isApplicableTo( + queryWithSchema("SELECT * FROM testTable WHERE testColumn > 5 ORDER BY testColumn + 1 DESC LIMIT 5"))); + // Filtered order-by but null handling enabled -> column treated as nullable -> not applicable. + assertFalse(_segmentPruner.isApplicableTo(queryWithSchema( + "SET enableNullHandling=true; SELECT * FROM testTable WHERE testColumn > 5 ORDER BY testColumn DESC LIMIT 5"))); + // Null handling on but no schema available -> conservatively treated as null-handling-active -> not applicable. + assertFalse(_segmentPruner.isApplicableTo(QueryContextConverterUtils.getQueryContext( + "SET enableNullHandling=true; SELECT * FROM testTable WHERE testColumn > 5 ORDER BY testColumn DESC LIMIT 5"))); + } + + @Test + public void testIsApplicableToColumnBasedNullHandling() { + String query = "SET enableNullHandling=true; " + + "SELECT * FROM testTable WHERE testColumn > 5 ORDER BY testColumn DESC LIMIT 5"; + + // Column-based null handling + non-nullable column: nulls cannot occur, so the optimization still applies even + // though null handling is enabled. + QueryContext queryContext = QueryContextConverterUtils.getQueryContext(query); + queryContext.setSchema(columnBasedNullHandlingSchema(false)); + assertTrue(_segmentPruner.isApplicableTo(queryContext)); + + // Column-based null handling + nullable column: skip (min/max may be polluted by nulls). + queryContext = QueryContextConverterUtils.getQueryContext(query); + queryContext.setSchema(columnBasedNullHandlingSchema(true)); + assertFalse(_segmentPruner.isApplicableTo(queryContext)); + } + + private static Schema columnBasedNullHandlingSchema(boolean orderByColumnNullable) { + Schema schema = new Schema.SchemaBuilder() + .addSingleValueDimension(ORDER_BY_COLUMN, FieldSpec.DataType.LONG) + .setEnableColumnBasedNullHandling(true) + .build(); + schema.getFieldSpecFor(ORDER_BY_COLUMN).setNullable(orderByColumnNullable); + return schema; + } + + private QueryContext queryWithSchema(String query) { + QueryContext queryContext = QueryContextConverterUtils.getQueryContext(query); + queryContext.setSchema(SCHEMA); + return queryContext; + } + private IndexSegment getIndexSegment(@Nullable Long minValue, @Nullable Long maxValue, int totalDocs) { return getIndexSegment(minValue, maxValue, totalDocs, false); } private IndexSegment getIndexSegment(@Nullable Long minValue, @Nullable Long maxValue, int totalDocs, boolean upsert) { + // A STRING filter column whose [min, max] = ["a", "z"] never collapses to a single value, so a predicate like + // 'foo = x' is never provably full-match -> exercises the "AND with a non-provable conjunct" path. + return getIndexSegment(minValue, maxValue, totalDocs, upsert, "a", "z"); + } + + private IndexSegment getIndexSegment(@Nullable Long minValue, @Nullable Long maxValue, int totalDocs, + boolean upsert, @Nullable String filterMinValue, @Nullable String filterMaxValue) { IndexSegment indexSegment = mock(IndexSegment.class); when(indexSegment.getColumnNames()).thenReturn(ImmutableSet.of("foo", "testColumn")); DataSource dataSource = mock(DataSource.class); @@ -225,6 +503,14 @@ private IndexSegment getIndexSegment(@Nullable Long minValue, @Nullable Long max when(dataSource.getDataSourceMetadata()).thenReturn(dataSourceMetadata); when(dataSourceMetadata.getMinValue()).thenReturn(minValue); when(dataSourceMetadata.getMaxValue()).thenReturn(maxValue); + when(dataSourceMetadata.getDataType()).thenReturn(FieldSpec.DataType.LONG); + DataSource filterDataSource = mock(DataSource.class); + when(indexSegment.getDataSource(eq(FILTER_COLUMN), any(Schema.class))).thenReturn(filterDataSource); + DataSourceMetadata filterMetadata = mock(DataSourceMetadata.class); + when(filterDataSource.getDataSourceMetadata()).thenReturn(filterMetadata); + when(filterMetadata.getMinValue()).thenReturn(filterMinValue); + when(filterMetadata.getMaxValue()).thenReturn(filterMaxValue); + when(filterMetadata.getDataType()).thenReturn(FieldSpec.DataType.STRING); SegmentMetadata segmentMetadata = mock(SegmentMetadata.class); when(indexSegment.getSegmentMetadata()).thenReturn(segmentMetadata); when(segmentMetadata.getTotalDocs()).thenReturn(totalDocs); diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkSelectionOrderByFilterPruning.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkSelectionOrderByFilterPruning.java new file mode 100644 index 000000000000..ca35f0554333 --- /dev/null +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkSelectionOrderByFilterPruning.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.perf; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.core.query.config.SegmentPrunerConfig; +import org.apache.pinot.core.query.pruner.SegmentPrunerService; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; +import org.apache.pinot.queries.BaseQueriesTest; +import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.ChainedOptionsBuilder; +import org.openjdk.jmh.runner.options.OptionsBuilder; + + +/** + * End-to-end benchmark for the min/max segment pruning of selection {@code ORDER BY LIMIT n} queries with a + * filter (issue apache/pinot#18685). + *

    The table is range-partitioned on a sorted, nullable {@code TS_COL}: segment {@code i} covers the contiguous range + * {@code [i * numRows, (i + 1) * numRows)}. Each invocation runs the segment pruners (the default chain, which respects + * {@link org.apache.pinot.core.query.pruner.SegmentPruner#isApplicableTo}) and then executes the surviving segments via + * the single-stage engine. + *

    The comparison is self-contained (no need to diff against a base commit): the optimization is gated off when null + * handling is active, so running the same filtered query with {@code enableNullHandling=true} reproduces the + * pre-fix behavior (the filtered order-by is not limit-pruned and every matching segment is engaged), while running it + * without null handling exercises the fix (only the top segment(s) survive). The {@code NO_FILTER} query is a + * reference point that is limit-pruned regardless. + */ +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Fork(1) +@Warmup(iterations = 5, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Benchmark) +public class BenchmarkSelectionOrderByFilterPruning extends BaseQueriesTest { + private static final File INDEX_DIR = + new File(FileUtils.getTempDirectory(), "BenchmarkSelectionOrderByFilterPruning"); + private static final String TABLE_NAME = "MyTable"; + private static final String SEGMENT_NAME_TEMPLATE = "testSegment%d"; + private static final String TS_COL = "TS_COL"; + private static final String VAL_COL = "VAL_COL"; + + private static final TableConfig TABLE_CONFIG = new TableConfigBuilder(TableType.OFFLINE) + .setTableName(TABLE_NAME) + .setSortedColumn(TS_COL) + .build(); + private static final Schema SCHEMA = new Schema.SchemaBuilder() + .setSchemaName(TABLE_NAME) + .addSingleValueDimension(TS_COL, FieldSpec.DataType.LONG) + .addSingleValueDimension(VAL_COL, FieldSpec.DataType.INT) + .build(); + + /** Which query shape to run; see {@link #buildQuery}. */ + public enum Mode { + /** Filtered order-by, optimization eligible (limit-pruned to the segments the LIMIT needs). */ + FILTER, + /** Same filtered query but with null handling on, which gates the optimization off (pre-fix behavior). */ + FILTER_NULL_HANDLING, + /** No filter: limit-pruned regardless of the fix (reference point). */ + NO_FILTER + } + + @Param({"50", "100", "200"}) + private int _numSegments; + // Small segments so a larger LIMIT spans several of them. + @Param({"20000"}) + private int _numRows; + @Param({"10", "200000"}) + private int _limit; + @Param({"FILTER", "FILTER_NULL_HANDLING", "NO_FILTER"}) + private Mode _mode; + + private String _query; + private List _allSegments; + private List _selectedSegments; + private SegmentPrunerService _segmentPrunerService; + + private static String buildQuery(Mode mode, int limit) { + String query = "SELECT * FROM MyTable " + + (mode == Mode.NO_FILTER ? "" : "WHERE TS_COL > 0 ") + + "ORDER BY TS_COL DESC LIMIT " + limit; + return mode == Mode.FILTER_NULL_HANDLING ? "SET enableNullHandling=true; " + query : query; + } + + public static void main(String[] args) + throws Exception { + ChainedOptionsBuilder opt = + new OptionsBuilder().include(BenchmarkSelectionOrderByFilterPruning.class.getSimpleName()); + new Runner(opt.build()).run(); + } + + @Setup + public void setUp() + throws Exception { + _query = buildQuery(_mode, _limit); + FileUtils.deleteQuietly(INDEX_DIR); + IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(TABLE_CONFIG, SCHEMA); + _allSegments = new ArrayList<>(_numSegments); + for (int i = 0; i < _numSegments; i++) { + String name = String.format(SEGMENT_NAME_TEMPLATE, i); + buildSegment(name, i); + _allSegments.add(ImmutableSegmentLoader.load(new File(INDEX_DIR, name), indexLoadingConfig)); + } + _selectedSegments = _allSegments; + _segmentPrunerService = new SegmentPrunerService(new SegmentPrunerConfig(new PinotConfiguration())); + } + + @TearDown + public void tearDown() { + for (IndexSegment indexSegment : _allSegments) { + indexSegment.destroy(); + } + FileUtils.deleteQuietly(INDEX_DIR); + EXECUTOR_SERVICE.shutdownNow(); + } + + private void buildSegment(String segmentName, int segmentIndex) + throws Exception { + long baseValue = (long) segmentIndex * _numRows; + List rows = new ArrayList<>(_numRows); + for (int i = 0; i < _numRows; i++) { + GenericRow row = new GenericRow(); + // Contiguous, non-overlapping, sorted range per segment. + row.putValue(TS_COL, baseValue + i); + row.putValue(VAL_COL, ThreadLocalRandom.current().nextInt()); + rows.add(row); + } + SegmentGeneratorConfig config = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA); + config.setOutDir(INDEX_DIR.getPath()); + config.setTableName(TABLE_NAME); + config.setSegmentName(segmentName); + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + try (RecordReader recordReader = new GenericRowRecordReader(rows)) { + driver.init(config, recordReader); + driver.build(); + } + } + + @Benchmark + public BrokerResponseNative query() { + // Prune first (this is what the fix affects), then execute the surviving segments through the single-stage engine. + QueryContext queryContext = QueryContextConverterUtils.getQueryContext(_query); + queryContext.setSchema(SCHEMA); + _selectedSegments = _segmentPrunerService.prune(_allSegments, queryContext); + return getBrokerResponse(_query); + } + + @Override + protected String getFilter() { + return ""; + } + + @Override + protected IndexSegment getIndexSegment() { + return _allSegments.get(0); + } + + @Override + protected List getIndexSegments() { + return _selectedSegments; + } +}