Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,20 @@
import java.util.Comparator;
import java.util.List;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.common.request.context.OrderByExpressionContext;
import org.apache.pinot.common.request.context.predicate.EqPredicate;
import org.apache.pinot.common.request.context.predicate.NotEqPredicate;
import org.apache.pinot.common.request.context.predicate.Predicate;
import org.apache.pinot.common.request.context.predicate.RangePredicate;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;


Expand All @@ -37,8 +46,14 @@
* <li>For selection query with LIMIT 0, keep 1 segment to create the data schema</li>
* <li>For selection only query without filter, keep enough documents to fulfill the LIMIT requirement</li>
* <li>
* For selection order-by query without filer, if the first order-by expression is an identifier (column), prune
* segments based on the column min/max value and keep enough documents to fulfill the LIMIT and OFFSET requirement.
* For selection order-by query, if the first order-by expression is an identifier (column), prune segments based on
* the column min/max value and keep enough documents to fulfill the LIMIT and OFFSET requirement. This works both
* without a filter and with a filter: with a filter, each segment contributes towards the LIMIT only the number of
* rows that <em>provably</em> match the filter based on min/max metadata (its total docs if it fully matches, 0
* otherwise). Using this lower bound on matching rows keeps the boundary safe, so segments are never pruned when
* they might still hold a top-n matching row. The optimization is skipped when null handling is active for the
* order-by/predicate columns because nulls are stored as a default value that pollutes the min/max metadata
* (see #18685).
* </li>
* </ul>
*/
Expand All @@ -51,10 +66,24 @@ public void init(PinotConfiguration config) {

@Override
public boolean isApplicableTo(QueryContext query) {
// Only prune selection queries
// If LIMIT is not 0, only prune segments for selection queries without filter
return QueryContextUtils.isSelectionQuery(query)
&& (query.getFilter() == null || query.getLimit() == 0);
if (!QueryContextUtils.isSelectionQuery(query)) {
return false;
}
// Without a filter (or for LIMIT 0, where we just keep one segment for the schema), pruning is always applicable.
if (query.getFilter() == null || query.getLimit() == 0) {
return true;
}
// With a filter, only the order-by-on-identifier path can prune safely (the selection-only path relies on exact
// doc counts, which a filter invalidates). Additionally, null handling must not be active for the order-by column,
// because nulls are stored as a default value that pollutes the column min/max metadata used for sorting and the
// boundary.
List<OrderByExpressionContext> orderByExpressions = query.getOrderByExpressions();
if (orderByExpressions == null) {
return false;
}
ExpressionContext firstOrderByExpression = orderByExpressions.get(0).getExpression();
return firstOrderByExpression.getType() == ExpressionContext.Type.IDENTIFIER
&& !isNullHandlingActive(query, firstOrderByExpression.getIdentifier());
}

@Override
Expand All @@ -75,7 +104,9 @@ public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query)
}

if (query.getOrderByExpressions() == null) {
return pruneSelectionOnly(segments, query);
// Count-based selection-only pruning is only safe without a filter (total docs is an exact match count). With a
// filter present this path is not selected (see isApplicableTo); guard defensively in case it is reached.
return query.getFilter() == null ? pruneSelectionOnly(segments, query) : segments;
} else {
return pruneSelectionOrderBy(segments, query);
}
Expand All @@ -100,14 +131,16 @@ private List<IndexSegment> pruneSelectionOnly(List<IndexSegment> segments, Query
}

/**
* Helper method to prune segments for selection order-by queries without filter.
* Helper method to prune segments for selection order-by queries.
* <p>When the first order-by expression is an identifier (column), we can prune segments based on the column min/max
* value:
* <ul>
* <li>1. Sort all the segments by the column min/max value</li>
* <li>2. Pick the top segments until we get enough documents to fulfill the LIMIT and OFFSET requirement</li>
* <li>3. Keep the segments that has value overlap with the selected ones; remove the others</li>
* </ul>
* <p>Each segment contributes towards the LIMIT only its {@link #guaranteedMatchingDocs} (a lower bound on its
* matching rows), so the optimization remains correct when a filter is present.
*/
private List<IndexSegment> pruneSelectionOrderBy(List<IndexSegment> segments, QueryContext query) {
List<OrderByExpressionContext> orderByExpressions = query.getOrderByExpressions();
Expand Down Expand Up @@ -157,7 +190,7 @@ private List<IndexSegment> pruneSelectionOrderBy(List<IndexSegment> segments, Qu
IndexSegment segment = segments.get(minMaxValue._index);
if (remainingDocs > 0) {
selectedSegments.add(segment);
remainingDocs -= segment.getSegmentMetadata().getTotalDocs();
remainingDocs -= guaranteedMatchingDocs(segment, query);
maxValue = minMaxValue._maxValue;
} else {
// After getting enough documents, prune all the segments with min value larger than the current max value, or
Expand All @@ -184,7 +217,7 @@ private List<IndexSegment> pruneSelectionOrderBy(List<IndexSegment> segments, Qu
IndexSegment segment = segments.get(minMaxValue._index);
if (remainingDocs > 0) {
selectedSegments.add(segment);
remainingDocs -= segment.getSegmentMetadata().getTotalDocs();
remainingDocs -= guaranteedMatchingDocs(segment, query);
minValue = minMaxValue._minValue;
} else {
// After getting enough documents, prune all the segments with max value smaller than the current min value,
Expand All @@ -201,6 +234,192 @@ private List<IndexSegment> pruneSelectionOrderBy(List<IndexSegment> segments, Qu
return selectedSegments;
}

/**
* Returns a lower bound on the number of rows in the segment that match the query filter, used to decide how many
* leading segments must be kept to guarantee the LIMIT + OFFSET requirement.
* <ul>
* <li>Without a filter, every row matches, so this is the exact total doc count.</li>
* <li>With a filter, this is the total doc count if the segment <em>provably</em> matches the filter for all of its
* rows (based on min/max metadata), and 0 otherwise. Using 0 for segments that only partially (or not provably
* fully) match is a safe under-count: such segments are still kept (they overlap the boundary), but they never let
* the boundary advance past rows they might contain.</li>
* </ul>
*/
private long guaranteedMatchingDocs(IndexSegment segment, QueryContext query) {
int totalDocs = segment.getSegmentMetadata().getTotalDocs();
FilterContext filter = query.getFilter();
if (filter == null) {
return totalDocs;
}
return fullyMatches(segment, filter, query) ? totalDocs : 0;
}

/**
* Returns {@code true} only if <em>all</em> rows of the segment provably satisfy the filter, based on min/max
* metadata. A {@code false} result never means "does not match"; it means "cannot prove that all rows match", which
* is always safe to treat as a 0 lower bound. NOT and unsupported predicates conservatively return {@code false}.
*/
private boolean fullyMatches(IndexSegment segment, FilterContext filter, QueryContext query) {
switch (filter.getType()) {
case AND:
assert filter.getChildren() != null;
for (FilterContext child : filter.getChildren()) {
if (!fullyMatches(segment, child, query)) {
return false;
}
}
return true;
case OR:
assert filter.getChildren() != null;
for (FilterContext child : filter.getChildren()) {
if (fullyMatches(segment, child, query)) {
return true;
}
}
return false;
case CONSTANT:
return filter.isConstantTrue();
case PREDICATE:
return predicateFullyMatches(segment, filter.getPredicate(), query);
case NOT:
default:
return false;
}
}

/**
* Returns {@code true} only if all rows of the segment provably satisfy the predicate, based on the predicate
* column's min/max metadata. Only identifier predicates on non-nullable columns of types {@code RANGE} (e.g.
* {@code >, >=, <, <=}), {@code EQ} ({@code =}) and {@code NOT_EQ} ({@code <>}) are supported; everything else
* conservatively returns {@code false}.
*/
private boolean predicateFullyMatches(IndexSegment segment, Predicate predicate, QueryContext query) {
ExpressionContext lhs = predicate.getLhs();
if (lhs.getType() != ExpressionContext.Type.IDENTIFIER) {
return false;
}
String column = lhs.getIdentifier();
// Nulls are stored as a default value that pollutes the column min/max metadata (and are excluded from comparisons
// under null handling), so full-match cannot be reasoned about when null handling is active for the column.
if (isNullHandlingActive(query, column)) {
return false;
}
DataSource dataSource = segment.getDataSource(column, query.getSchema());
DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
Comparable minValue = dataSourceMetadata.getMinValue();
Comparable maxValue = dataSourceMetadata.getMaxValue();
if (minValue == null || maxValue == null) {
return false;
}
// NaN (FLOAT/DOUBLE) breaks the ordering assumptions: Float/Double.compareTo treats NaN as the largest value, which
// does not match filter semantics (comparisons against NaN are never true). A NaN min/max could make a segment look
// fully-matching when its NaN rows actually do not match, so refuse to reason about full match in that case.
if (isNaN(minValue) || isNaN(maxValue)) {
return false;
}
DataType dataType = dataSourceMetadata.getDataType();
try {
switch (predicate.getType()) {
case RANGE:
return rangeFullyMatches((RangePredicate) predicate, minValue, maxValue, dataType);
case EQ: {
Comparable value = convertValue(((EqPredicate) predicate).getValue(), dataType);
// All rows equal the value iff the whole segment collapses to that single value.
return minValue.compareTo(value) == 0 && maxValue.compareTo(value) == 0;
}
case NOT_EQ: {
Comparable value = convertValue(((NotEqPredicate) predicate).getValue(), dataType);
// All rows differ from the value iff the value lies outside [min, max].
return value.compareTo(minValue) < 0 || value.compareTo(maxValue) > 0;
}
default:
return false;
}
} catch (Exception e) {
// Different data types / unparseable literal: cannot prove full match.
return false;
}
}

/**
* Returns {@code true} if the segment's whole {@code [minValue, maxValue]} range provably satisfies the range
* predicate (i.e. it is fully contained within the predicate's bounds).
*/
private boolean rangeFullyMatches(RangePredicate predicate, Comparable minValue, Comparable maxValue,
DataType dataType) {
String lowerBound = predicate.getLowerBound();
if (!lowerBound.equals(RangePredicate.UNBOUNDED)) {
Comparable lowerBoundValue = convertValue(lowerBound, dataType);
if (predicate.isLowerInclusive()) {
if (minValue.compareTo(lowerBoundValue) < 0) {
return false;
}
} else {
if (minValue.compareTo(lowerBoundValue) <= 0) {
return false;
}
}
}
String upperBound = predicate.getUpperBound();
if (!upperBound.equals(RangePredicate.UNBOUNDED)) {
Comparable upperBoundValue = convertValue(upperBound, dataType);
if (predicate.isUpperInclusive()) {
if (maxValue.compareTo(upperBoundValue) > 0) {
return false;
}
} else {
if (maxValue.compareTo(upperBoundValue) >= 0) {
return false;
}
}
}
return true;
}

/**
* Returns whether null handling is active for the column, in which case this optimization must be skipped. Pinot
* stores nulls as a default value that pollutes the column min/max metadata and the total doc count; only when null
* handling is enabled are those null rows excluded from comparisons, which is when the pollution becomes unsafe to
* reason about (with null handling disabled, nulls are simply the default value and the min/max stay accurate). This
* mirrors the null caution in {@code SelectionPlanNode#isSorted}.
* <p>A column carries null semantics only when null handling is enabled for the query <b>and</b> the column is
* nullable. Nullability follows the same resolution used at segment build time (e.g.
* {@code BaseSegmentCreator#isNullable}): under column-based null handling the per-column
* {@link FieldSpec#isNullable} flag, otherwise (table/query-level null handling) all columns are nullable. The check
* is conservative: an unknown schema or column is treated as null-handling-active.
*/
private static boolean isNullHandlingActive(QueryContext query, String column) {
if (!query.isNullHandlingEnabled()) {
// Null semantics are off: nulls are just the default value, so min/max and doc counts stay accurate.
return false;
}
Schema schema = query.getSchema();
if (schema == null) {
return true;
}
if (schema.isEnableColumnBasedNullHandling()) {
// Column-based null handling: only nullable columns carry nulls.
FieldSpec fieldSpec = schema.getFieldSpecFor(column);
return fieldSpec == null || fieldSpec.isNullable();
}
// Table/query-level (legacy) null handling applies null semantics to all columns.
return true;
}

private static boolean isNaN(Comparable value) {
return (value instanceof Double && ((Double) value).isNaN())
|| (value instanceof Float && ((Float) value).isNaN());
}

/**
* Converts a predicate literal to the column's stored type. Any parse failure propagates to the caller, which treats
* it as "cannot prove full match" (this pruner must stay conservative; an actually invalid query is rejected by the
* preceding {@link ColumnValueSegmentPruner} or by query execution).
*/
private static Comparable convertValue(String stringValue, DataType dataType) {
return dataType.convertInternal(stringValue);
}

private static class MinMaxValue {
final int _index;
final Comparable _minValue;
Expand Down
Loading
Loading