From a089e4ba78a3d63c4e7ace4351a68fd26c7cb862 Mon Sep 17 00:00:00 2001 From: Siddharth Teotia Date: Wed, 20 May 2026 22:26:34 -0700 Subject: [PATCH 1/4] Push JSON-index GROUP BY COUNT(*) into a dictionary scan For shapes like SELECT jsonExtractIndex(col, '$.x', 'STRING'), COUNT(*) FROM t GROUP BY 1, GroupByPlanNode now routes to a new JsonIndexGroupByOperator that counts via per-value posting-list cardinality over the JSON-index dictionary, avoiding forward-index reads and JSON parsing on the matching docs. Same-path JSON_MATCH predicates are pushed into the index lookup; cross-column filters are applied as a residual bitmap. Parsing and same-path JSON_MATCH push-down helpers shared with JsonIndexDistinctOperator are extracted into JsonExtractIndexUtils. The DISTINCT operator's behavior is unchanged. A same-path JSON_MATCH that could match missing-path docs (IS_NULL) is not forwarded into the JSON-index lookup, so correctness does not depend on implementation-specific "returns empty map" behavior of the reader SPI. The new operator emits IntermediateRecord per (value, count) and lets the combine path sum across segments via CountAggregationFunction.merge. Out of scope for v1 and falling back to the existing GroupByOperator: COUNT(DISTINCT), MIN/MAX/SUM, HAVING, multiple group-by keys, mutable segments, and the multi-stage engine. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../core/data/table/IntermediateRecord.java | 2 +- .../operator/query/JsonExtractIndexUtils.java | 261 ++++++++++ .../query/JsonIndexDistinctOperator.java | 211 +------- .../query/JsonIndexGroupByOperator.java | 323 ++++++++++++ .../pinot/core/plan/GroupByPlanNode.java | 7 + .../query/JsonIndexGroupByOperatorTest.java | 482 ++++++++++++++++++ 6 files changed, 1085 insertions(+), 201 deletions(-) create mode 100644 pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonExtractIndexUtils.java create mode 100644 pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexGroupByOperator.java create mode 100644 pinot-core/src/test/java/org/apache/pinot/core/operator/query/JsonIndexGroupByOperatorTest.java diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java index 85288d49f4a6..8136293968fa 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java @@ -29,7 +29,7 @@ public class IntermediateRecord { public final Record _record; public final Comparable[] _values; - IntermediateRecord(Key key, Record record, Comparable[] values) { + public IntermediateRecord(Key key, Record record, Comparable[] values) { _key = key; _record = record; _values = values; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonExtractIndexUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonExtractIndexUtils.java new file mode 100644 index 000000000000..ea042c1593c4 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonExtractIndexUtils.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.query; + +import java.util.List; +import java.util.Optional; +import javax.annotation.Nullable; +import org.apache.pinot.common.function.JsonPathCache; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.FilterContext; +import org.apache.pinot.common.request.context.RequestContextUtils; +import org.apache.pinot.common.request.context.predicate.JsonMatchPredicate; +import org.apache.pinot.common.request.context.predicate.Predicate; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.segment.spi.index.IndexService; +import org.apache.pinot.segment.spi.index.IndexType; +import org.apache.pinot.segment.spi.index.reader.JsonIndexReader; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.sql.parsers.CalciteSqlParser; + + +/** + * Shared parsing and filter-pushdown helpers for index-aware operators that consume a scalar + * {@code jsonExtractIndex(column, path, type[, defaultValue])} expression and a JSON index on the column. 
+ */ +public final class JsonExtractIndexUtils { + private static final String FUNCTION_NAME_EXTRACT_INDEX = "jsonExtractIndex"; + + private JsonExtractIndexUtils() { + } + + /** + * Parsed view of a {@code jsonExtractIndex} call. + */ + public static final class ParsedJsonExtractIndex { + public final String _columnName; + public final String _jsonPathString; + public final FieldSpec.DataType _dataType; + @Nullable + public final String _defaultValueLiteral; + + public ParsedJsonExtractIndex(String columnName, String jsonPathString, FieldSpec.DataType dataType, + @Nullable String defaultValueLiteral) { + _columnName = columnName; + _jsonPathString = jsonPathString; + _dataType = dataType; + _defaultValueLiteral = defaultValueLiteral; + } + } + + /** + * Parses the given expression as a 3/4-arg scalar {@code jsonExtractIndex} call; no other function name is accepted. + * Returns {@code null} when the expression has a different shape (wrong arity, non-literal args, MV result type, + * unsupported scalar type, {@code [*]} wildcard, or malformed JSON path). + * + * <p>
This is a pure shape check; it does not consult segment metadata. + * Pair with {@link #canUseJsonIndex} to verify that the column has a usable JSON index on the path. + */ + @Nullable + public static ParsedJsonExtractIndex parseJsonExtractIndex(ExpressionContext expr) { + if (expr.getType() != ExpressionContext.Type.FUNCTION) { + return null; + } + String functionName = expr.getFunction().getFunctionName(); + if (!FUNCTION_NAME_EXTRACT_INDEX.equalsIgnoreCase(functionName)) { + return null; + } + List args = expr.getFunction().getArguments(); + if (args.size() != 3 && args.size() != 4) { + return null; + } + if (args.get(0).getType() != ExpressionContext.Type.IDENTIFIER) { + return null; + } + if (args.get(1).getType() != ExpressionContext.Type.LITERAL + || args.get(2).getType() != ExpressionContext.Type.LITERAL + || (args.size() == 4 && args.get(3).getType() != ExpressionContext.Type.LITERAL)) { + return null; + } + + String columnName = args.get(0).getIdentifier(); + String jsonPathString = args.get(1).getLiteral().getStringValue(); + String resultsType = args.get(2).getLiteral().getStringValue().toUpperCase(); + // Only single-value types are supported; MV (_ARRAY) would have incorrect flattened-to-real + // docId intersection since convertFlattenedDocIdsToDocIds is skipped for MV. 
+ if (resultsType.endsWith("_ARRAY")) { + return null; + } + if (jsonPathString.contains("[*]")) { + return null; + } + + FieldSpec.DataType dataType; + try { + dataType = FieldSpec.DataType.valueOf(resultsType); + } catch (IllegalArgumentException e) { + return null; + } + switch (dataType) { + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case BIG_DECIMAL: + case STRING: + break; + default: + return null; + } + + try { + JsonPathCache.INSTANCE.getOrCompute(jsonPathString); + } catch (Exception e) { + return null; + } + + String defaultValueLiteral = null; + if (args.size() == 4) { + defaultValueLiteral = args.get(3).getLiteral().getStringValue(); + try { + dataType.convert(defaultValueLiteral); + } catch (Exception e) { + return null; + } + } + + return new ParsedJsonExtractIndex(columnName, jsonPathString, dataType, defaultValueLiteral); + } + + /** + * Returns the JSON index reader for the column if present, falling back to a composite JSON index when available. + */ + @Nullable + public static JsonIndexReader getJsonIndexReader(DataSource dataSource) { + JsonIndexReader reader = dataSource.getJsonIndex(); + if (reader == null) { + Optional> compositeIndex = + IndexService.getInstance().getOptional("composite_json_index"); + if (compositeIndex.isPresent()) { + reader = (JsonIndexReader) dataSource.getIndex(compositeIndex.get()); + } + } + return reader; + } + + /** + * Returns {@code true} when the expression is a parseable 3/4-arg scalar {@code jsonExtractIndex} + * call and the referenced column has a usable JSON index covering the path.
+ */ + public static boolean canUseJsonIndex(IndexSegment indexSegment, ExpressionContext expr) { + ParsedJsonExtractIndex parsed = parseJsonExtractIndex(expr); + if (parsed == null) { + return false; + } + DataSource dataSource = indexSegment.getDataSourceNullable(parsed._columnName); + if (dataSource == null) { + return false; + } + JsonIndexReader reader = getJsonIndexReader(dataSource); + if (reader == null) { + return false; + } + return reader.isPathIndexed(parsed._jsonPathString); + } + + /** + * Walks the filter and returns the inner JSON-match string for a single same-column-same-path {@code JSON_MATCH} + * predicate that can be pushed into the JSON-index lookup. Returns {@code null} when no such predicate exists, + * or when more than one candidate is found and disambiguation would be unsafe. + */ + @Nullable + public static String extractSamePathJsonMatchFilter(ParsedJsonExtractIndex parsed, @Nullable FilterContext filter) { + if (filter == null) { + return null; + } + switch (filter.getType()) { + case PREDICATE: + return extractSamePathJsonMatchFilter(parsed, filter.getPredicate()); + case AND: + String matchingFilter = null; + for (FilterContext child : filter.getChildren()) { + String childFilter = extractSamePathJsonMatchFilter(parsed, child); + if (childFilter == null) { + continue; + } + if (matchingFilter != null) { + return null; + } + matchingFilter = childFilter; + } + return matchingFilter; + default: + return null; + } + } + + /** + * Returns {@code true} when the filter is exactly a single same-path JSON_MATCH predicate (no other clauses). 
+ */ + public static boolean isOnlySamePathJsonMatchFilter(ParsedJsonExtractIndex parsed, @Nullable FilterContext filter) { + if (filter == null || filter.getType() != FilterContext.Type.PREDICATE) { + return false; + } + return extractSamePathJsonMatchFilter(parsed, filter.getPredicate()) != null; + } + + /** + * Returns {@code true} when the filter string can match documents that lack the JSON path entirely (i.e. an + * {@code IS_NULL} on the path). Callers use this to decide whether they must still account for missing-path + * documents after running an index-side filter. + */ + public static boolean jsonMatchFilterCanMatchMissingPath(String filterJsonString) { + try { + FilterContext filter = RequestContextUtils.getFilter(CalciteSqlParser.compileToExpression(filterJsonString)); + return filter.getType() == FilterContext.Type.PREDICATE + && filter.getPredicate().getType() == Predicate.Type.IS_NULL; + } catch (Exception e) { + return false; + } + } + + @Nullable + private static String extractSamePathJsonMatchFilter(ParsedJsonExtractIndex parsed, Predicate predicate) { + if (!(predicate instanceof JsonMatchPredicate)) { + return null; + } + ExpressionContext lhs = predicate.getLhs(); + if (lhs.getType() != ExpressionContext.Type.IDENTIFIER + || !parsed._columnName.equals(lhs.getIdentifier())) { + return null; + } + String filterJsonString = ((JsonMatchPredicate) predicate).getValue(); + int start = filterJsonString.indexOf('"'); + if (start < 0) { + return null; + } + int end = filterJsonString.indexOf('"', start + 1); + if (end < 0) { + return null; + } + String filterPath = filterJsonString.substring(start + 1, end); + return parsed._jsonPathString.equals(filterPath) ? 
filterJsonString : null; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexDistinctOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexDistinctOperator.java index 0970cac305e3..d9c1c8d595ba 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexDistinctOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexDistinctOperator.java @@ -23,16 +23,10 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import javax.annotation.Nullable; -import org.apache.pinot.common.function.JsonPathCache; import org.apache.pinot.common.request.context.ExpressionContext; -import org.apache.pinot.common.request.context.FilterContext; import org.apache.pinot.common.request.context.OrderByExpressionContext; -import org.apache.pinot.common.request.context.RequestContextUtils; -import org.apache.pinot.common.request.context.predicate.JsonMatchPredicate; -import org.apache.pinot.common.request.context.predicate.Predicate; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.common.Operator; @@ -41,6 +35,7 @@ import org.apache.pinot.core.operator.ExplainAttributeBuilder; import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock; import org.apache.pinot.core.operator.filter.BaseFilterOperator; +import org.apache.pinot.core.operator.query.JsonExtractIndexUtils.ParsedJsonExtractIndex; import org.apache.pinot.core.query.distinct.table.BigDecimalDistinctTable; import org.apache.pinot.core.query.distinct.table.DistinctTable; import org.apache.pinot.core.query.distinct.table.DoubleDistinctTable; @@ -52,12 +47,9 @@ import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.SegmentContext; import org.apache.pinot.segment.spi.datasource.DataSource; -import 
org.apache.pinot.segment.spi.index.IndexService; -import org.apache.pinot.segment.spi.index.IndexType; import org.apache.pinot.segment.spi.index.reader.JsonIndexReader; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.query.QueryThreadContext; -import org.apache.pinot.sql.parsers.CalciteSqlParser; import org.roaringbitmap.RoaringBitmap; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; @@ -72,7 +64,6 @@ */ public class JsonIndexDistinctOperator extends BaseOperator { private static final String EXPLAIN_NAME = "DISTINCT_JSON_INDEX"; - private static final String FUNCTION_NAME = "jsonExtractIndex"; private final IndexSegment _indexSegment; private final SegmentContext _segmentContext; @@ -98,21 +89,22 @@ protected DistinctResultsBlock getNextBlock() { } ExpressionContext expr = expressions.get(0); - ParsedJsonExtractIndex parsed = parseJsonExtractIndex(expr); + ParsedJsonExtractIndex parsed = JsonExtractIndexUtils.parseJsonExtractIndex(expr); if (parsed == null) { throw new IllegalStateException("Expected 3/4-arg scalar jsonExtractIndex expression"); } DataSource dataSource = _indexSegment.getDataSource(parsed._columnName, _queryContext.getSchema()); - JsonIndexReader jsonIndexReader = getJsonIndexReader(dataSource); + JsonIndexReader jsonIndexReader = JsonExtractIndexUtils.getJsonIndexReader(dataSource); if (jsonIndexReader == null) { throw new IllegalStateException("Column " + parsed._columnName + " has no JSON index"); } - String pushedDownFilterJson = extractSamePathJsonMatchFilter(parsed, _queryContext.getFilter()); + String pushedDownFilterJson = JsonExtractIndexUtils.extractSamePathJsonMatchFilter(parsed, + _queryContext.getFilter()); boolean filterFullyPushedDown = pushedDownFilterJson != null - && isOnlySamePathJsonMatchFilter(parsed, _queryContext.getFilter()) - && !jsonMatchFilterCanMatchMissingPath(pushedDownFilterJson); + && JsonExtractIndexUtils.isOnlySamePathJsonMatchFilter(parsed, _queryContext.getFilter()) + && 
!JsonExtractIndexUtils.jsonMatchFilterCanMatchMissingPath(pushedDownFilterJson); // Fast path: when the filter is fully pushed down into the JSON index, we only need the distinct value strings. // This avoids reading posting lists, building per-value bitmaps, and converting flattened doc IDs. @@ -251,72 +243,6 @@ private void handleMissingDocs(DistinctTable distinctTable, ParsedJsonExtractInd } } - @Nullable - private static String extractSamePathJsonMatchFilter(ParsedJsonExtractIndex parsed, @Nullable FilterContext filter) { - if (filter == null) { - return null; - } - switch (filter.getType()) { - case PREDICATE: - return extractSamePathJsonMatchFilter(parsed, filter.getPredicate()); - case AND: - String matchingFilter = null; - for (FilterContext child : filter.getChildren()) { - String childFilter = extractSamePathJsonMatchFilter(parsed, child); - if (childFilter == null) { - continue; - } - if (matchingFilter != null) { - return null; - } - matchingFilter = childFilter; - } - return matchingFilter; - default: - return null; - } - } - - private static boolean isOnlySamePathJsonMatchFilter(ParsedJsonExtractIndex parsed, @Nullable FilterContext filter) { - if (filter == null || filter.getType() != FilterContext.Type.PREDICATE) { - return false; - } - return extractSamePathJsonMatchFilter(parsed, filter.getPredicate()) != null; - } - - private static boolean jsonMatchFilterCanMatchMissingPath(String filterJsonString) { - try { - FilterContext filter = RequestContextUtils.getFilter(CalciteSqlParser.compileToExpression(filterJsonString)); - return filter.getType() == FilterContext.Type.PREDICATE - && filter.getPredicate().getType() == Predicate.Type.IS_NULL; - } catch (Exception e) { - return false; - } - } - - @Nullable - private static String extractSamePathJsonMatchFilter(ParsedJsonExtractIndex parsed, Predicate predicate) { - if (!(predicate instanceof JsonMatchPredicate)) { - return null; - } - ExpressionContext lhs = predicate.getLhs(); - if (lhs.getType() != 
ExpressionContext.Type.IDENTIFIER - || !parsed._columnName.equals(lhs.getIdentifier())) { - return null; - } - String filterJsonString = ((JsonMatchPredicate) predicate).getValue(); - int start = filterJsonString.indexOf('"'); - if (start < 0) { - return null; - } - int end = filterJsonString.indexOf('"', start + 1); - if (end < 0) { - return null; - } - String filterPath = filterJsonString.substring(start + 1, end); - return parsed._jsonPathString.equals(filterPath) ? filterJsonString : null; - } - private DistinctTable createDistinctTable(DataSchema dataSchema, FieldSpec.DataType dataType, @Nullable OrderByExpressionContext orderByExpression) { int limit = _queryContext.getLimit(); @@ -449,19 +375,6 @@ private static boolean addToTable(StringDistinctTable table, String value, } } - @Nullable - private static JsonIndexReader getJsonIndexReader(DataSource dataSource) { - JsonIndexReader reader = dataSource.getJsonIndex(); - if (reader == null) { - Optional> compositeIndex = - IndexService.getInstance().getOptional("composite_json_index"); - if (compositeIndex.isPresent()) { - reader = (JsonIndexReader) dataSource.getIndex(compositeIndex.get()); - } - } - return reader; - } - @Nullable private RoaringBitmap buildFilteredDocIds() { BaseFilterOperator.FilteredDocIds filteredDocIds = _filterOperator.getFilteredDocIds(); @@ -470,93 +383,6 @@ private RoaringBitmap buildFilteredDocIds() { return docIds != null ? 
docIds.toRoaringBitmap() : null; } - @Nullable - private static ParsedJsonExtractIndex parseJsonExtractIndex(ExpressionContext expr) { - if (expr.getType() != ExpressionContext.Type.FUNCTION) { - return null; - } - if (!FUNCTION_NAME.equalsIgnoreCase(expr.getFunction().getFunctionName())) { - return null; - } - List args = expr.getFunction().getArguments(); - if (args.size() != 3 && args.size() != 4) { - return null; - } - if (args.get(0).getType() != ExpressionContext.Type.IDENTIFIER) { - return null; - } - if (args.get(1).getType() != ExpressionContext.Type.LITERAL - || args.get(2).getType() != ExpressionContext.Type.LITERAL - || (args.size() == 4 && args.get(3).getType() != ExpressionContext.Type.LITERAL)) { - return null; - } - - String columnName = args.get(0).getIdentifier(); - String jsonPathString = args.get(1).getLiteral().getStringValue(); - String resultsType = args.get(2).getLiteral().getStringValue().toUpperCase(); - // Only single-value types are supported; MV (_ARRAY) would have incorrect flattened-to-real - // docId intersection since convertFlattenedDocIdsToDocIds is skipped for MV. 
- if (resultsType.endsWith("_ARRAY")) { - return null; - } - if (jsonPathString.contains("[*]")) { - return null; - } - - FieldSpec.DataType dataType; - try { - dataType = FieldSpec.DataType.valueOf(resultsType); - } catch (IllegalArgumentException e) { - return null; - } - // Only types with a corresponding DistinctTable implementation are supported - switch (dataType) { - case INT: - case LONG: - case FLOAT: - case DOUBLE: - case BIG_DECIMAL: - case STRING: - break; - default: - return null; - } - - try { - JsonPathCache.INSTANCE.getOrCompute(jsonPathString); - } catch (Exception e) { - return null; - } - - String defaultValueLiteral = null; - if (args.size() == 4) { - defaultValueLiteral = args.get(3).getLiteral().getStringValue(); - try { - dataType.convert(defaultValueLiteral); - } catch (Exception e) { - return null; - } - } - - return new ParsedJsonExtractIndex(columnName, jsonPathString, dataType, defaultValueLiteral); - } - - private static final class ParsedJsonExtractIndex { - final String _columnName; - final String _jsonPathString; - final FieldSpec.DataType _dataType; - @Nullable - final String _defaultValueLiteral; - - ParsedJsonExtractIndex(String columnName, String jsonPathString, FieldSpec.DataType dataType, - @Nullable String defaultValueLiteral) { - _columnName = columnName; - _jsonPathString = jsonPathString; - _dataType = dataType; - _defaultValueLiteral = defaultValueLiteral; - } - } - @Override public List getChildOperators() { return Collections.singletonList(_filterOperator); @@ -597,26 +423,11 @@ protected void explainAttributes(ExplainAttributeBuilder attributeBuilder) { } /** - * Returns true if the expression is the 3/4-arg scalar jsonExtractIndex form on a column with JSON index and the - * path is indexed. For OSS JSON index all paths are indexed. For composite JSON index, only paths in - * invertedIndexConfigs are indexed per key. 
+ * Returns true if the expression is the 3/4-arg scalar {@code jsonExtractIndex} form on a column with a JSON index + * that covers the path. For OSS JSON index all paths are indexed; for the composite JSON index, only paths in + * {@code invertedIndexConfigs} are indexed per key. */ public static boolean canUseJsonIndexDistinct(IndexSegment indexSegment, ExpressionContext expr) { - ParsedJsonExtractIndex parsed = parseJsonExtractIndex(expr); - if (parsed == null) { - return false; - } - DataSource dataSource = indexSegment.getDataSourceNullable(parsed._columnName); - if (dataSource == null) { - return false; - } - JsonIndexReader reader = getJsonIndexReader(dataSource); - if (reader == null) { - return false; - } - if (!reader.isPathIndexed(parsed._jsonPathString)) { - return false; - } - return true; + return JsonExtractIndexUtils.canUseJsonIndex(indexSegment, expr); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexGroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexGroupByOperator.java new file mode 100644 index 000000000000..992a42c1eb32 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexGroupByOperator.java @@ -0,0 +1,323 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.query; + +import com.google.common.base.CaseFormat; +import com.google.common.base.Preconditions; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.data.table.IntermediateRecord; +import org.apache.pinot.core.data.table.Key; +import org.apache.pinot.core.data.table.Record; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.ExecutionStatistics; +import org.apache.pinot.core.operator.ExplainAttributeBuilder; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.operator.filter.BaseFilterOperator; +import org.apache.pinot.core.operator.query.JsonExtractIndexUtils.ParsedJsonExtractIndex; +import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.SegmentContext; +import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.segment.spi.index.reader.JsonIndexReader; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.query.QueryThreadContext; +import org.roaringbitmap.RoaringBitmap; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; + + +/** + * Group-by operator for {@code SELECT jsonExtractIndex(col, path, type[, default]), COUNT(*) ... 
GROUP BY 1} + * when {@code col} has a JSON index covering {@code path}. + * + * <p>
Execution flow: + * 1. Push a same-path {@code JSON_MATCH} predicate into the JSON-index lookup when it cannot match missing paths. + * 2. For each distinct value at the path, count the source documents via posting-list cardinality, intersecting + * with the residual WHERE bitmap when one is needed. + * 3. Track documents that have no value at the path; emit a missing-path group consistent with the + * {@link JsonIndexDistinctOperator} contract (default literal / null / throw). + * + * <p>
The output is a {@link GroupByResultsBlock} carrying one {@link IntermediateRecord} per emitted group, which the + * downstream combine operator merges across segments via {@code CountAggregationFunction.merge} (sums). + * + * <p>
Semantic note on array-traversing paths. The JSON index flattens JSON arrays into one row per element, + * so a document with {@code {"items": [{"name":"a"},{"name":"b"}]}} contributes to two distinct values at the + * path {@code $.items.name}. This operator therefore reports per-element counts ("multikey" semantics) — the same + * doc is counted once per distinct value present in its array. For non-array-traversing paths this matches the + * scalar {@code jsonExtractScalar} evaluation exactly; for array-traversing paths it diverges. This matches the + * behavior already established by {@link JsonIndexDistinctOperator}. + */ +public class JsonIndexGroupByOperator extends BaseOperator { + private static final String EXPLAIN_NAME = "GROUP_BY_JSON_INDEX"; + // Empty order-by-values array reused across IntermediateRecord instances. The combine path reads only key/record, + // not values, so a shared singleton is safe. + private static final Comparable[] EMPTY_ORDER_BY_VALUES = new Comparable[0]; + + private final IndexSegment _indexSegment; + private final QueryContext _queryContext; + private final BaseFilterOperator _filterOperator; + private final DataSchema _dataSchema; + private final ParsedJsonExtractIndex _parsed; + + private int _numEntriesExamined = 0; + private long _numEntriesScannedInFilter = 0; + + /** + * SegmentContext is accepted for parity with {@link JsonIndexDistinctOperator} and the regular + * {@code GroupByPlanNode} call site. It is intentionally not stored: the data source lookup goes through + * {@code IndexSegment#getDataSource}, and any upsert / valid-doc-id filtering flows through the + * {@code BaseFilterOperator} that {@code FilterPlanNode} produced for the caller. 
+ */ + public JsonIndexGroupByOperator(IndexSegment indexSegment, SegmentContext segmentContext, + QueryContext queryContext, BaseFilterOperator filterOperator) { + _indexSegment = indexSegment; + _queryContext = queryContext; + _filterOperator = filterOperator; + + ExpressionContext groupByExpression = queryContext.getGroupByExpressions().get(0); + _parsed = JsonExtractIndexUtils.parseJsonExtractIndex(groupByExpression); + Preconditions.checkState(_parsed != null, + "Expected a parseable jsonExtractIndex group-by expression"); + + AggregationFunction countFn = queryContext.getAggregationFunctions()[0]; + _dataSchema = new DataSchema( + new String[]{groupByExpression.toString(), countFn.getResultColumnName()}, + new ColumnDataType[]{ColumnDataType.fromDataTypeSV(_parsed._dataType), + countFn.getIntermediateResultColumnType()}); + } + + /** + * Returns {@code true} when the query is a single-group-by, single-{@code COUNT(*)} aggregation over a JSON-indexed + * path, with no {@code HAVING} clause and no filtered aggregations. All other shapes fall back to + * {@link GroupByOperator}. + */ + public static boolean canUse(IndexSegment indexSegment, QueryContext queryContext) { + List groupByExpressions = queryContext.getGroupByExpressions(); + if (groupByExpressions == null || groupByExpressions.size() != 1) { + return false; + } + if (!JsonExtractIndexUtils.canUseJsonIndex(indexSegment, groupByExpressions.get(0))) { + return false; + } + + AggregationFunction[] aggFns = queryContext.getAggregationFunctions(); + if (aggFns == null || aggFns.length != 1) { + return false; + } + if (aggFns[0].getType() != AggregationFunctionType.COUNT) { + return false; + } + // CountAggregationFunction.getInputExpressions() returns the empty list for plain COUNT(*) (when null handling + // is disabled). COUNT(col) or null-handling COUNT(*) returns a non-empty list — both rejected for v1. 
+ if (!aggFns[0].getInputExpressions().isEmpty()) { + return false; + } + if (queryContext.getHavingFilter() != null) { + return false; + } + if (queryContext.hasFilteredAggregations()) { + return false; + } + return true; + } + + @Override + protected GroupByResultsBlock getNextBlock() { + DataSource dataSource = _indexSegment.getDataSource(_parsed._columnName, _queryContext.getSchema()); + JsonIndexReader jsonIndexReader = JsonExtractIndexUtils.getJsonIndexReader(dataSource); + Preconditions.checkState(jsonIndexReader != null, + "Column %s has no JSON index", _parsed._columnName); + + // Decide which slice of the WHERE clause can be evaluated entirely inside the JSON-index lookup. + String pushedDownFilterJson = JsonExtractIndexUtils.extractSamePathJsonMatchFilter(_parsed, + _queryContext.getFilter()); + // Don't push a filter that can match missing-path docs into the JSON-index lookup. The JsonIndexReader SPI does + // not contractually guarantee how implementations represent missing-path matches in the returned map; relying on + // them to return an empty map for IS_NULL would tie correctness to an implementation detail. Fall back to the + // row-level filter, which evaluates IS_NULL correctly via the null-value vector / scan path. + if (pushedDownFilterJson != null + && JsonExtractIndexUtils.jsonMatchFilterCanMatchMissingPath(pushedDownFilterJson)) { + pushedDownFilterJson = null; + } + boolean filterFullyPushedDown = pushedDownFilterJson != null + && JsonExtractIndexUtils.isOnlySamePathJsonMatchFilter(_parsed, _queryContext.getFilter()); + + // When the filter is fully pushed down we trust the index-side filter and skip materializing the WHERE bitmap; + // otherwise we evaluate the full filter and intersect per-value posting lists with it. + RoaringBitmap fullFilterDocs = filterFullyPushedDown ? 
null : buildFilteredDocIds(); + if (fullFilterDocs != null && fullFilterDocs.isEmpty()) { + return new GroupByResultsBlock(_dataSchema, Collections.emptyList(), _queryContext); + } + + Map valueToDocs = + jsonIndexReader.getMatchingFlattenedDocsMap(_parsed._jsonPathString, pushedDownFilterJson); + jsonIndexReader.convertFlattenedDocIdsToDocIds(valueToDocs); + + // When the filter is fully pushed down, every doc that lacks the path was already excluded by it, so we don't + // need to track covered docs or emit a missing-path group. + RoaringBitmap coveredDocs = filterFullyPushedDown ? null : new RoaringBitmap(); + + List results = new ArrayList<>(valueToDocs.size()); + for (Map.Entry entry : valueToDocs.entrySet()) { + _numEntriesExamined++; + QueryThreadContext.checkTerminationAndSampleUsagePeriodically(_numEntriesExamined, EXPLAIN_NAME); + + String value = entry.getKey(); + RoaringBitmap docIds = entry.getValue(); + + long count; + if (fullFilterDocs == null) { + count = docIds.getLongCardinality(); + if (coveredDocs != null) { + coveredDocs.or(docIds); + } + } else { + count = RoaringBitmap.andCardinality(docIds, fullFilterDocs); + if (count == 0) { + continue; + } + coveredDocs.or(docIds); + } + if (count == 0) { + continue; + } + + Object groupKey = convertValue(value, _parsed._dataType); + results.add(buildRecord(groupKey, count)); + } + + // Account for documents that had no value at the path. When the filter was fully pushed down it can't match + // missing-path docs by construction (we required !jsonMatchFilterCanMatchMissingPath), so there are no missing + // docs to attribute. + if (!filterFullyPushedDown) { + long expectedTotal; + long coveredCard; + if (fullFilterDocs == null) { + expectedTotal = _indexSegment.getSegmentMetadata().getTotalDocs(); + coveredCard = coveredDocs.getLongCardinality(); + } else { + expectedTotal = fullFilterDocs.getLongCardinality(); + // Restrict the "covered" set to docs that survived the WHERE. 
+ coveredDocs.and(fullFilterDocs); + coveredCard = coveredDocs.getLongCardinality(); + } + long missing = expectedTotal - coveredCard; + if (missing > 0) { + results.add(buildMissingPathRecord(missing)); + } + } + + return new GroupByResultsBlock(_dataSchema, results, _queryContext); + } + + private IntermediateRecord buildMissingPathRecord(long missing) { + Object missingKey; + if (_parsed._defaultValueLiteral != null) { + missingKey = convertValue(_parsed._defaultValueLiteral, _parsed._dataType); + } else if (_queryContext.isNullHandlingEnabled()) { + missingKey = null; + } else { + throw new RuntimeException( + String.format("Illegal Json Path: [%s], for some docIds in segment [%s]", + _parsed._jsonPathString, _indexSegment.getSegmentName())); + } + return buildRecord(missingKey, missing); + } + + private static IntermediateRecord buildRecord(@Nullable Object groupKey, long count) { + Object[] keyValues = {groupKey}; + Object[] rowValues = {groupKey, count}; + return new IntermediateRecord(new Key(keyValues), new Record(rowValues), EMPTY_ORDER_BY_VALUES); + } + + private static Object convertValue(String stringValue, FieldSpec.DataType dataType) { + switch (dataType) { + case INT: + return Integer.parseInt(stringValue); + case LONG: + return Long.parseLong(stringValue); + case FLOAT: + return Float.parseFloat(stringValue); + case DOUBLE: + return Double.parseDouble(stringValue); + case BIG_DECIMAL: + return new BigDecimal(stringValue); + case STRING: + return stringValue; + default: + throw new IllegalStateException("Unsupported data type for JSON index group-by: " + dataType); + } + } + + @Nullable + private RoaringBitmap buildFilteredDocIds() { + BaseFilterOperator.FilteredDocIds filteredDocIds = _filterOperator.getFilteredDocIds(); + _numEntriesScannedInFilter = filteredDocIds.getNumEntriesScannedInFilter(); + ImmutableRoaringBitmap docIds = filteredDocIds.getDocIds(); + return docIds != null ? 
docIds.toRoaringBitmap() : null; + } + + @Override + public List getChildOperators() { + return Collections.singletonList(_filterOperator); + } + + @Override + public IndexSegment getIndexSegment() { + return _indexSegment; + } + + @Override + public ExecutionStatistics getExecutionStatistics() { + int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs(); + return new ExecutionStatistics(0, _numEntriesScannedInFilter, 0, numTotalDocs); + } + + @Override + public String toExplainString() { + StringBuilder sb = new StringBuilder(EXPLAIN_NAME).append("(groupKey:"); + sb.append(_queryContext.getGroupByExpressions().get(0).toString()); + sb.append(", aggregations:").append(_queryContext.getAggregationFunctions()[0].toExplainString()); + return sb.append(')').toString(); + } + + @Override + protected String getExplainName() { + return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, EXPLAIN_NAME); + } + + @Override + protected void explainAttributes(ExplainAttributeBuilder attributeBuilder) { + super.explainAttributes(attributeBuilder); + attributeBuilder.putStringList("groupKeys", + List.of(_queryContext.getGroupByExpressions().get(0).toString())); + attributeBuilder.putStringList("aggregations", + List.of(_queryContext.getAggregationFunctions()[0].toExplainString())); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/GroupByPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/GroupByPlanNode.java index b44b8dc8ee4f..1d52a3d56bc9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/GroupByPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/GroupByPlanNode.java @@ -23,6 +23,7 @@ import org.apache.pinot.core.operator.filter.BaseFilterOperator; import org.apache.pinot.core.operator.query.FilteredGroupByOperator; import org.apache.pinot.core.operator.query.GroupByOperator; +import org.apache.pinot.core.operator.query.JsonIndexGroupByOperator; import 
org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.segment.spi.IndexSegment; @@ -46,6 +47,12 @@ public GroupByPlanNode(SegmentContext segmentContext, QueryContext queryContext) @Override public Operator run() { assert _queryContext.getAggregationFunctions() != null && _queryContext.getGroupByExpressions() != null; + // Prefer the index-based GROUP BY when the shape is GROUP BY jsonExtract...(col, path, type[, default]) + COUNT(*) + // on a column whose JSON index covers the path. canUse() already excludes filtered aggregations and HAVING. + if (JsonIndexGroupByOperator.canUse(_indexSegment, _queryContext)) { + BaseFilterOperator filterOperator = new FilterPlanNode(_segmentContext, _queryContext).run(); + return new JsonIndexGroupByOperator(_indexSegment, _segmentContext, _queryContext, filterOperator); + } return _queryContext.hasFilteredAggregations() ? buildFilteredGroupByPlan() : buildNonFilteredGroupByPlan(); } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/query/JsonIndexGroupByOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/JsonIndexGroupByOperatorTest.java new file mode 100644 index 000000000000..67ce7e3d69b5 --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/JsonIndexGroupByOperatorTest.java @@ -0,0 +1,482 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.query; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.data.table.IntermediateRecord; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.operator.filter.BaseFilterOperator; +import org.apache.pinot.core.operator.filter.BitmapCollection; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.SegmentContext; +import org.apache.pinot.segment.spi.SegmentMetadata; +import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.segment.spi.index.reader.JsonIndexReader; +import org.roaringbitmap.RoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; +import org.testng.annotations.Test; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.expectThrows; + + +/** + * Unit tests for {@link JsonIndexGroupByOperator}. 
+ */ +public class JsonIndexGroupByOperatorTest { + private static final String EXTRACT_INDEX = "JSON_EXTRACT_INDEX(tags, '$.instance', 'STRING')"; + private static final String EXTRACT_INDEX_WITH_DEFAULT = + "JSON_EXTRACT_INDEX(tags, '$.instance', 'STRING', 'missing')"; + private static final String SAME_PATH_FILTER = "REGEXP_LIKE(\"$.instance\", '.*test.*')"; + private static final String CROSS_PATH_FILTER = "REGEXP_LIKE(\"$.env\", 'prod.*')"; + private static final String SAME_PATH_IS_NULL_FILTER = "\"$.instance\" IS NULL"; + + @Test + public void testGroupByWithoutFilterCountsAllDocs() { + QueryContext queryContext = groupByQuery(EXTRACT_INDEX, null); + + JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class); + Map flattenedDocsByValue = new HashMap<>(); + flattenedDocsByValue.put("east", bitmap(100, 101)); + flattenedDocsByValue.put("west", bitmap(200)); + when(jsonIndexReader.getMatchingFlattenedDocsMap("$.instance", null)).thenReturn(flattenedDocsByValue); + stubConvertedDocIds(jsonIndexReader, Map.of( + "east", bitmap(0, 1), + "west", bitmap(2))); + + GroupByResultsBlock block = buildOperatorMatchAll(queryContext, jsonIndexReader, 3).nextBlock(); + + assertEquals(extractCounts(block), Map.of("east", 2L, "west", 1L)); + } + + @Test + public void testSamePathJsonMatchPushedDownNoMissingPathGroup() { + QueryContext queryContext = groupByQuery(EXTRACT_INDEX, SAME_PATH_FILTER); + + JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class); + // The pushed-down filter restricts which flattened docs come back. 
+ Map flattenedDocsByValue = new HashMap<>(); + flattenedDocsByValue.put("test-east", bitmap(10)); + flattenedDocsByValue.put("test-west", bitmap(20, 21)); + when(jsonIndexReader.getMatchingFlattenedDocsMap("$.instance", SAME_PATH_FILTER)) + .thenReturn(flattenedDocsByValue); + stubConvertedDocIds(jsonIndexReader, Map.of( + "test-east", bitmap(0), + "test-west", bitmap(1, 2))); + + GroupByResultsBlock block = buildOperator(queryContext, jsonIndexReader, bufferBitmap(0, 1, 2), 5).nextBlock(); + + // Even though only 3 of 5 docs are covered, no missing-path group is emitted because the same-path JSON_MATCH + // filter cannot match docs that lack the path. + assertEquals(extractCounts(block), Map.of("test-east", 1L, "test-west", 2L)); + } + + @Test + public void testCrossPathFilterAppliedAsResidual() { + QueryContext queryContext = groupByQuery(EXTRACT_INDEX, CROSS_PATH_FILTER); + + JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class); + Map flattenedDocsByValue = new HashMap<>(); + flattenedDocsByValue.put("prod-a", bitmap(100)); + flattenedDocsByValue.put("prod-b", bitmap(200)); + flattenedDocsByValue.put("other", bitmap(300)); + // The cross-path filter is NOT pushed down so getMatchingFlattenedDocsMap is called with null. + when(jsonIndexReader.getMatchingFlattenedDocsMap("$.instance", null)).thenReturn(flattenedDocsByValue); + stubConvertedDocIds(jsonIndexReader, Map.of( + "prod-a", bitmap(0), + "prod-b", bitmap(1), + "other", bitmap(2))); + + // Filter operator says docs 0 and 1 pass the cross-path filter. + GroupByResultsBlock block = buildOperator(queryContext, jsonIndexReader, bufferBitmap(0, 1), 3).nextBlock(); + + // "other" is filtered out; both surviving docs have a value at the path so no missing-path group. 
+ assertEquals(extractCounts(block), Map.of("prod-a", 1L, "prod-b", 1L)); + verify(jsonIndexReader).getMatchingFlattenedDocsMap("$.instance", null); + } + + @Test + public void testMissingPathDocsUseDefaultLiteral() { + QueryContext queryContext = groupByQuery(EXTRACT_INDEX_WITH_DEFAULT, null); + + JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class); + Map flattenedDocsByValue = new HashMap<>(); + flattenedDocsByValue.put("east", bitmap(0)); + when(jsonIndexReader.getMatchingFlattenedDocsMap("$.instance", null)).thenReturn(flattenedDocsByValue); + stubConvertedDocIds(jsonIndexReader, Map.of("east", bitmap(0))); + + // 3 total docs, only doc 0 has a value at the path; docs 1 and 2 should land in the "missing" group. + GroupByResultsBlock block = buildOperatorMatchAll(queryContext, jsonIndexReader, 3).nextBlock(); + + assertEquals(extractCounts(block), Map.of("east", 1L, "missing", 2L)); + } + + @Test + public void testMissingPathDocsUseNullWhenNullHandlingEnabled() { + QueryContext queryContext = groupByQuery(EXTRACT_INDEX, null, true); + + JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class); + Map flattenedDocsByValue = new HashMap<>(); + flattenedDocsByValue.put("east", bitmap(0)); + when(jsonIndexReader.getMatchingFlattenedDocsMap("$.instance", null)).thenReturn(flattenedDocsByValue); + stubConvertedDocIds(jsonIndexReader, Map.of("east", bitmap(0))); + + GroupByResultsBlock block = buildOperatorMatchAll(queryContext, jsonIndexReader, 3).nextBlock(); + + Map counts = extractCountsAllowingNull(block); + assertEquals(counts.get("east"), Long.valueOf(1L)); + assertEquals(counts.get(null), Long.valueOf(2L)); + assertEquals(counts.size(), 2); + } + + @Test + public void testMissingPathDocsThrowWhenNoDefaultAndNullHandlingDisabled() { + QueryContext queryContext = groupByQuery(EXTRACT_INDEX, null); + + JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class); + Map flattenedDocsByValue = new HashMap<>(); + flattenedDocsByValue.put("east", 
bitmap(0)); + when(jsonIndexReader.getMatchingFlattenedDocsMap("$.instance", null)).thenReturn(flattenedDocsByValue); + stubConvertedDocIds(jsonIndexReader, Map.of("east", bitmap(0))); + + RuntimeException ex = expectThrows(RuntimeException.class, + () -> buildOperatorMatchAll(queryContext, jsonIndexReader, 3).nextBlock()); + assertTrue(ex.getMessage().contains("Illegal Json Path")); + } + + @Test + public void testSamePathIsNullEmitsOnlyMissingPathGroup() { + QueryContext queryContext = groupByQuery(EXTRACT_INDEX_WITH_DEFAULT, SAME_PATH_IS_NULL_FILTER); + + JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class); + // The operator does NOT push the IS_NULL filter into the index (it would rely on undocumented "returns empty" + // SPI behavior). Instead, it asks for all flattened docs at the path and lets the row-side filter operator + // produce the IS_NULL bitmap. + Map flattenedDocsByValue = new HashMap<>(); + flattenedDocsByValue.put("east", bitmap(100)); + when(jsonIndexReader.getMatchingFlattenedDocsMap("$.instance", null)).thenReturn(flattenedDocsByValue); + stubConvertedDocIds(jsonIndexReader, Map.of("east", bitmap(0))); + + // Filter operator returns docs 1 and 2 (docs without a value at the path), satisfying the IS_NULL filter. + GroupByResultsBlock block = buildOperator(queryContext, jsonIndexReader, bufferBitmap(1, 2), 3).nextBlock(); + + // Doc 0 has the "east" value but is filtered out by IS_NULL. Docs 1 and 2 have no value and fall into "missing". + assertEquals(extractCounts(block), Map.of("missing", 2L)); + verify(jsonIndexReader).getMatchingFlattenedDocsMap("$.instance", null); + verify(jsonIndexReader, never()).getMatchingFlattenedDocsMap("$.instance", SAME_PATH_IS_NULL_FILTER); + } + + @Test + public void testMultikeyArrayPathSplitsCountsAcrossElements() { + // A document with an implicit-array-traversal path lands in multiple value bitmaps after flattening. 
+ // The operator reports per-element counts ("multikey" semantics), diverging from scalar jsonExtractScalar + // evaluation. This test locks in the documented behavior. + QueryContext queryContext = groupByQuery(EXTRACT_INDEX, null); + + JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class); + Map flattenedDocsByValue = new HashMap<>(); + flattenedDocsByValue.put("east", bitmap(10, 11)); + flattenedDocsByValue.put("west", bitmap(20)); + when(jsonIndexReader.getMatchingFlattenedDocsMap("$.instance", null)).thenReturn(flattenedDocsByValue); + // Doc 0 has BOTH values "east" and "west" (array traversal), doc 1 has only "east". + stubConvertedDocIds(jsonIndexReader, Map.of( + "east", bitmap(0, 1), + "west", bitmap(0))); + + GroupByResultsBlock block = buildOperatorMatchAll(queryContext, jsonIndexReader, 2).nextBlock(); + + // Multikey: doc 0 contributes to both groups → counts sum to 3, exceeding total docs (2). + // The missing-path bucket is correctly 0 because both docs appear in the covered set (union). + assertEquals(extractCounts(block), Map.of("east", 2L, "west", 1L)); + } + + @Test + public void testEmptyFilterShortCircuitsToEmptyBlock() { + QueryContext queryContext = groupByQuery(EXTRACT_INDEX, CROSS_PATH_FILTER); + + JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class); + + // Filter operator returns no docs. + GroupByResultsBlock block = buildOperator(queryContext, jsonIndexReader, bufferBitmap(), 3).nextBlock(); + + assertEquals(extractCounts(block), Map.of()); + // We never need to consult the index when the filter rules out every doc. 
+ verify(jsonIndexReader, never()).getMatchingFlattenedDocsMap(any(), any()); + } + + @Test + public void testCanUseAcceptsThreeAndFourArgForms() { + JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class); + when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(true); + IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader); + + assertTrue(JsonIndexGroupByOperator.canUse(indexSegment, groupByQuery(EXTRACT_INDEX, null))); + assertTrue(JsonIndexGroupByOperator.canUse(indexSegment, groupByQuery(EXTRACT_INDEX_WITH_DEFAULT, null))); + } + + @Test + public void testCanUseRejectsMultipleAggregations() { + JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class); + when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(true); + IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader); + + QueryContext queryContext = QueryContextConverterUtils.getQueryContext( + "SELECT " + EXTRACT_INDEX + " AS k, COUNT(*), MAX(rating) FROM myTable GROUP BY k"); + assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, queryContext)); + } + + @Test + public void testCanUseRejectsNonCountAggregation() { + JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class); + when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(true); + IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader); + + QueryContext queryContext = QueryContextConverterUtils.getQueryContext( + "SELECT " + EXTRACT_INDEX + " AS k, SUM(rating) FROM myTable GROUP BY k"); + assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, queryContext)); + } + + @Test + public void testCanUseRejectsCountColumn() { + JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class); + when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(true); + IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader); + + QueryContext queryContext = QueryContextConverterUtils.getQueryContext( + "SELECT " + EXTRACT_INDEX + " AS k, COUNT(rating) FROM myTable GROUP 
BY k " + + "OPTION(enableNullHandling=true)"); + assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, queryContext)); + } + + @Test + public void testCanUseRejectsHaving() { + JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class); + when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(true); + IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader); + + QueryContext queryContext = QueryContextConverterUtils.getQueryContext( + "SELECT " + EXTRACT_INDEX + " AS k, COUNT(*) AS c FROM myTable GROUP BY k HAVING COUNT(*) > 10"); + assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, queryContext)); + } + + @Test + public void testCanUseRejectsMultipleGroupByKeys() { + JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class); + when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(true); + IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader); + + QueryContext queryContext = QueryContextConverterUtils.getQueryContext( + "SELECT " + EXTRACT_INDEX + " AS k, env, COUNT(*) FROM myTable GROUP BY k, env"); + assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, queryContext)); + } + + @Test + public void testCanUseRejectsWhenPathNotIndexed() { + JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class); + when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(false); + IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader); + + assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, groupByQuery(EXTRACT_INDEX, null))); + } + + private static QueryContext groupByQuery(String groupKeyExpression, String jsonMatchFilter) { + return groupByQuery(groupKeyExpression, jsonMatchFilter, false); + } + + private static QueryContext groupByQuery(String groupKeyExpression, String jsonMatchFilter, + boolean enableNullHandling) { + StringBuilder sql = new StringBuilder("SELECT "); + sql.append(groupKeyExpression).append(" AS tag_value, COUNT(*) FROM myTable"); + if (jsonMatchFilter != null) { + 
sql.append(" WHERE JSON_MATCH(tags, '").append(jsonMatchFilter.replace("'", "''")).append("')"); + } + sql.append(" GROUP BY tag_value"); + if (enableNullHandling) { + sql.append(" OPTION(enableNullHandling=true)"); + } + return QueryContextConverterUtils.getQueryContext(sql.toString()); + } + + private static void stubConvertedDocIds(JsonIndexReader jsonIndexReader, + Map convertedDocIds) { + doAnswer(invocation -> { + @SuppressWarnings("unchecked") + Map docsByValue = (Map) invocation.getArgument(0); + docsByValue.clear(); + docsByValue.putAll(convertedDocIds); + return null; + }).when(jsonIndexReader).convertFlattenedDocIdsToDocIds(any()); + } + + private static IndexSegment buildCanUseIndexSegment(JsonIndexReader jsonIndexReader) { + DataSource dataSource = mock(DataSource.class); + when(dataSource.getJsonIndex()).thenReturn(jsonIndexReader); + + IndexSegment indexSegment = mock(IndexSegment.class); + when(indexSegment.getDataSourceNullable("tags")).thenReturn(dataSource); + return indexSegment; + } + + private static JsonIndexGroupByOperator buildOperator(QueryContext queryContext, JsonIndexReader jsonIndexReader, + MutableRoaringBitmap filterBitmap, int numDocs) { + return buildOperator(queryContext, jsonIndexReader, + new StaticBitmapFilterOperator(numDocs, filterBitmap), numDocs); + } + + /** + * Used by tests that simulate a query with no WHERE clause — the FilterPlanNode would produce a match-all operator. 
+ */ + private static JsonIndexGroupByOperator buildOperatorMatchAll(QueryContext queryContext, + JsonIndexReader jsonIndexReader, int numDocs) { + return buildOperator(queryContext, jsonIndexReader, new MatchAllFilterOperator(numDocs), numDocs); + } + + private static JsonIndexGroupByOperator buildOperator(QueryContext queryContext, JsonIndexReader jsonIndexReader, + BaseFilterOperator filterOperator, int numDocs) { + SegmentMetadata segmentMetadata = mock(SegmentMetadata.class); + when(segmentMetadata.getTotalDocs()).thenReturn(numDocs); + + DataSource dataSource = mock(DataSource.class); + when(dataSource.getJsonIndex()).thenReturn(jsonIndexReader); + + IndexSegment indexSegment = mock(IndexSegment.class); + when(indexSegment.getSegmentMetadata()).thenReturn(segmentMetadata); + when(indexSegment.getSegmentName()).thenReturn("testSegment"); + when(indexSegment.getDataSource(eq("tags"), any())).thenReturn(dataSource); + when(indexSegment.getDataSourceNullable("tags")).thenReturn(dataSource); + + return new JsonIndexGroupByOperator(indexSegment, new SegmentContext(indexSegment), queryContext, filterOperator); + } + + private static RoaringBitmap bitmap(int... docIds) { + RoaringBitmap bitmap = new RoaringBitmap(); + for (int docId : docIds) { + bitmap.add(docId); + } + return bitmap; + } + + private static MutableRoaringBitmap bufferBitmap(int... 
docIds) { + MutableRoaringBitmap bitmap = new MutableRoaringBitmap(); + for (int docId : docIds) { + bitmap.add(docId); + } + return bitmap; + } + + private static Map extractCounts(GroupByResultsBlock block) { + Map result = new HashMap<>(); + List records = block.getIntermediateRecords(); + if (records == null) { + return result; + } + for (IntermediateRecord record : records) { + Object[] values = record._record.getValues(); + result.put((String) values[0], (Long) values[1]); + } + return result; + } + + private static Map extractCountsAllowingNull(GroupByResultsBlock block) { + Map result = new HashMap<>(); + List records = block.getIntermediateRecords(); + if (records == null) { + return result; + } + for (IntermediateRecord record : records) { + Object[] values = record._record.getValues(); + result.put(values[0], (Long) values[1]); + } + return result; + } + + private static final class MatchAllFilterOperator extends BaseFilterOperator { + MatchAllFilterOperator(int numDocs) { + super(numDocs, false); + } + + @Override + public boolean isResultMatchingAll() { + return true; + } + + @Override + public List getChildOperators() { + return List.of(); + } + + @Override + protected org.apache.pinot.core.common.BlockDocIdSet getTrues() { + throw new UnsupportedOperationException("Match-all should never reach getTrues"); + } + + @Override + public String toExplainString() { + return "MATCH_ALL"; + } + } + + private static final class StaticBitmapFilterOperator extends BaseFilterOperator { + private final MutableRoaringBitmap _bitmap; + + StaticBitmapFilterOperator(int numDocs, MutableRoaringBitmap bitmap) { + super(numDocs, false); + _bitmap = bitmap; + } + + @Override + public boolean canProduceBitmaps() { + return true; + } + + @Override + public BitmapCollection getBitmaps() { + return new BitmapCollection(_numDocs, false, _bitmap); + } + + @Override + public List getChildOperators() { + return List.of(); + } + + @Override + protected 
org.apache.pinot.core.common.BlockDocIdSet getTrues() { + throw new UnsupportedOperationException("Bitmap path only"); + } + + @Override + public String toExplainString() { + return "STATIC_BITMAP_FILTER"; + } + } +} From 6d1cdd21026eed720f83afe4eb0fbddd44141432 Mon Sep 17 00:00:00 2001 From: Siddharth Teotia Date: Thu, 21 May 2026 15:32:31 -0700 Subject: [PATCH 2/4] Add selectivity gate to JsonIndexGroupByOperator and JMH benchmark to tune the threshold --- .../query/JsonIndexGroupByOperator.java | 61 ++++- .../pinot/core/plan/GroupByPlanNode.java | 20 +- .../query/JsonIndexGroupByOperatorTest.java | 87 +++++- .../perf/BenchmarkJsonIndexGroupByCount.java | 253 ++++++++++++++++++ .../impl/json/MutableJsonIndexImpl.java | 20 ++ .../json/ImmutableJsonIndexReader.java | 24 ++ .../spi/index/reader/JsonIndexReader.java | 13 + 7 files changed, 459 insertions(+), 19 deletions(-) create mode 100644 pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkJsonIndexGroupByCount.java diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexGroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexGroupByOperator.java index 992a42c1eb32..6ac6182e252d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexGroupByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexGroupByOperator.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.operator.query; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.CaseFormat; import com.google.common.base.Preconditions; import java.math.BigDecimal; @@ -112,12 +113,26 @@ public JsonIndexGroupByOperator(IndexSegment indexSegment, SegmentContext segmen countFn.getIntermediateResultColumnType()}); } + /** + * Selectivity threshold for routing to the dictionary-scan operator. 
The index path wins when path cardinality + * (D = distinct values seen at the path) is small relative to matched-doc count (M). When + * {@code D > SELECTIVITY_THRESHOLD * M} the dictionary scan can be slower than the row-by-row baseline, so the + * query falls back to {@link GroupByOperator}. + * + * <p>
The current value is a conservative placeholder. The final value is being established by a JMH benchmark + * (see {@code BenchmarkJsonIndexGroupByCount}); update this constant once the empirical crossover lands. + */ + @VisibleForTesting + static final double SELECTIVITY_THRESHOLD = 2.0; + /** * Returns {@code true} when the query is a single-group-by, single-{@code COUNT(*)} aggregation over a JSON-indexed - * path, with no {@code HAVING} clause and no filtered aggregations. All other shapes fall back to + * path, with no {@code HAVING} clause and no filtered aggregations, AND the path's distinct-value cardinality is + * small enough that the dictionary scan is expected to beat the row-by-row scan. All other shapes fall back to * {@link GroupByOperator}. */ - public static boolean canUse(IndexSegment indexSegment, QueryContext queryContext) { + public static boolean canUse(IndexSegment indexSegment, QueryContext queryContext, + BaseFilterOperator filterOperator) { List groupByExpressions = queryContext.getGroupByExpressions(); if (groupByExpressions == null || groupByExpressions.size() != 1) { return false; @@ -144,7 +159,47 @@ public static boolean canUse(IndexSegment indexSegment, QueryContext queryContex if (queryContext.hasFilteredAggregations()) { return false; } - return true; + return passesSelectivityGate(indexSegment, queryContext, filterOperator, groupByExpressions.get(0)); + } + + /** + * Compares path cardinality {@code D} against matched-doc count {@code M}; routes to the dictionary scan only when + * {@code D <= SELECTIVITY_THRESHOLD * M}. A null WHERE bitmap from the filter operator means "match all", in which + * case {@code M = totalDocs}. 
+ */ + private static boolean passesSelectivityGate(IndexSegment indexSegment, QueryContext queryContext, + BaseFilterOperator filterOperator, ExpressionContext groupByExpression) { + ParsedJsonExtractIndex parsed = JsonExtractIndexUtils.parseJsonExtractIndex(groupByExpression); + if (parsed == null) { + return false; + } + DataSource dataSource = indexSegment.getDataSourceNullable(parsed._columnName); + if (dataSource == null) { + return false; + } + JsonIndexReader reader = JsonExtractIndexUtils.getJsonIndexReader(dataSource); + if (reader == null) { + return false; + } + long d = reader.getDistinctValueCountForPath(parsed._jsonPathString); + if (d <= 0) { + // No values at this path — index path is trivially cheaper than parsing 0 docs. + return true; + } + long m; + BaseFilterOperator.FilteredDocIds filteredDocIds = filterOperator.getFilteredDocIds(); + ImmutableRoaringBitmap whereBitmap = filteredDocIds.getDocIds(); + if (whereBitmap == null) { + // Match-all (no WHERE or a trivially true predicate): every doc would be visited by the baseline. + m = indexSegment.getSegmentMetadata().getTotalDocs(); + } else { + m = whereBitmap.getLongCardinality(); + } + if (m == 0) { + // No matching docs — baseline does zero parses. Skip the dictionary scan too. 
+ return false; + } + return d <= (long) Math.ceil(SELECTIVITY_THRESHOLD * m); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/GroupByPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/GroupByPlanNode.java index 1d52a3d56bc9..348e3ffe6bb4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/GroupByPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/GroupByPlanNode.java @@ -47,13 +47,18 @@ public GroupByPlanNode(SegmentContext segmentContext, QueryContext queryContext) @Override public Operator run() { assert _queryContext.getAggregationFunctions() != null && _queryContext.getGroupByExpressions() != null; - // Prefer the index-based GROUP BY when the shape is GROUP BY jsonExtract...(col, path, type[, default]) + COUNT(*) - // on a column whose JSON index covers the path. canUse() already excludes filtered aggregations and HAVING. - if (JsonIndexGroupByOperator.canUse(_indexSegment, _queryContext)) { - BaseFilterOperator filterOperator = new FilterPlanNode(_segmentContext, _queryContext).run(); + if (_queryContext.hasFilteredAggregations()) { + return buildFilteredGroupByPlan(); + } + // Build the filter operator once and reuse it for either the index-based fast path or the default plan. + // JsonIndexGroupByOperator.canUse(...) inspects the filter operator's WHERE bitmap to apply a selectivity + // gate (skip the index path when path cardinality is too high relative to matched-doc count). + FilterPlanNode filterPlanNode = new FilterPlanNode(_segmentContext, _queryContext); + BaseFilterOperator filterOperator = filterPlanNode.run(); + if (JsonIndexGroupByOperator.canUse(_indexSegment, _queryContext, filterOperator)) { return new JsonIndexGroupByOperator(_indexSegment, _segmentContext, _queryContext, filterOperator); } - return _queryContext.hasFilteredAggregations() ? 
buildFilteredGroupByPlan() : buildNonFilteredGroupByPlan(); + return buildNonFilteredGroupByPlan(filterPlanNode, filterOperator); } private FilteredGroupByOperator buildFilteredGroupByPlan() { @@ -62,9 +67,8 @@ private FilteredGroupByOperator buildFilteredGroupByPlan() { _indexSegment.getSegmentMetadata().getTotalDocs()); } - private GroupByOperator buildNonFilteredGroupByPlan() { - FilterPlanNode filterPlanNode = new FilterPlanNode(_segmentContext, _queryContext); - BaseFilterOperator filterOperator = filterPlanNode.run(); + private GroupByOperator buildNonFilteredGroupByPlan(FilterPlanNode filterPlanNode, + BaseFilterOperator filterOperator) { AggregationFunctionUtils.AggregationInfo aggregationInfo = AggregationFunctionUtils.buildAggregationInfo(_segmentContext, _queryContext, _queryContext.getAggregationFunctions(), _queryContext.getFilter(), filterOperator, diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/query/JsonIndexGroupByOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/JsonIndexGroupByOperatorTest.java index 67ce7e3d69b5..c2c973575394 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/operator/query/JsonIndexGroupByOperatorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/JsonIndexGroupByOperatorTest.java @@ -240,8 +240,9 @@ public void testCanUseAcceptsThreeAndFourArgForms() { when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(true); IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader); - assertTrue(JsonIndexGroupByOperator.canUse(indexSegment, groupByQuery(EXTRACT_INDEX, null))); - assertTrue(JsonIndexGroupByOperator.canUse(indexSegment, groupByQuery(EXTRACT_INDEX_WITH_DEFAULT, null))); + assertTrue(JsonIndexGroupByOperator.canUse(indexSegment, groupByQuery(EXTRACT_INDEX, null), matchAll(1000))); + assertTrue(JsonIndexGroupByOperator.canUse(indexSegment, groupByQuery(EXTRACT_INDEX_WITH_DEFAULT, null), + matchAll(1000))); } @Test @@ -252,7 
+253,7 @@ public void testCanUseRejectsMultipleAggregations() { QueryContext queryContext = QueryContextConverterUtils.getQueryContext( "SELECT " + EXTRACT_INDEX + " AS k, COUNT(*), MAX(rating) FROM myTable GROUP BY k"); - assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, queryContext)); + assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, queryContext, matchAll(1000))); } @Test @@ -263,7 +264,7 @@ public void testCanUseRejectsNonCountAggregation() { QueryContext queryContext = QueryContextConverterUtils.getQueryContext( "SELECT " + EXTRACT_INDEX + " AS k, SUM(rating) FROM myTable GROUP BY k"); - assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, queryContext)); + assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, queryContext, matchAll(1000))); } @Test @@ -275,7 +276,7 @@ public void testCanUseRejectsCountColumn() { QueryContext queryContext = QueryContextConverterUtils.getQueryContext( "SELECT " + EXTRACT_INDEX + " AS k, COUNT(rating) FROM myTable GROUP BY k " + "OPTION(enableNullHandling=true)"); - assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, queryContext)); + assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, queryContext, matchAll(1000))); } @Test @@ -286,7 +287,7 @@ public void testCanUseRejectsHaving() { QueryContext queryContext = QueryContextConverterUtils.getQueryContext( "SELECT " + EXTRACT_INDEX + " AS k, COUNT(*) AS c FROM myTable GROUP BY k HAVING COUNT(*) > 10"); - assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, queryContext)); + assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, queryContext, matchAll(1000))); } @Test @@ -297,7 +298,7 @@ public void testCanUseRejectsMultipleGroupByKeys() { QueryContext queryContext = QueryContextConverterUtils.getQueryContext( "SELECT " + EXTRACT_INDEX + " AS k, env, COUNT(*) FROM myTable GROUP BY k, env"); - assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, queryContext)); + assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, 
queryContext, matchAll(1000))); } @Test @@ -306,7 +307,65 @@ public void testCanUseRejectsWhenPathNotIndexed() { when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(false); IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader); - assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, groupByQuery(EXTRACT_INDEX, null))); + assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, groupByQuery(EXTRACT_INDEX, null), matchAll(1000))); + } + + /// Selectivity-gate tests below. SELECTIVITY_THRESHOLD = 2.0 (placeholder until JMH bench lands). + + @Test + public void testCanUseRejectsHighCardinalityRelativeToMatchedDocs() { + JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class); + when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(true); + // Path cardinality 1000, segment total 100 → D >> SELECTIVITY_THRESHOLD * M, bail. + when(jsonIndexReader.getDistinctValueCountForPath("$.instance")).thenReturn(1000L); + IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader, 100); + + assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, groupByQuery(EXTRACT_INDEX, null), matchAll(100))); + } + + @Test + public void testCanUseAcceptsAtSelectivityBoundary() { + JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class); + when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(true); + // D = ceil(SELECTIVITY_THRESHOLD * M) = 2 * 100 = 200 → exactly at the boundary, route to the index. 
+ when(jsonIndexReader.getDistinctValueCountForPath("$.instance")).thenReturn(200L); + IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader, 100); + + assertTrue(JsonIndexGroupByOperator.canUse(indexSegment, groupByQuery(EXTRACT_INDEX, null), matchAll(100))); + } + + @Test + public void testCanUseRejectsJustAboveSelectivityBoundary() { + JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class); + when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(true); + // D = SELECTIVITY_THRESHOLD * M + 1 → just above the boundary, bail. + when(jsonIndexReader.getDistinctValueCountForPath("$.instance")).thenReturn(201L); + IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader, 100); + + assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, groupByQuery(EXTRACT_INDEX, null), matchAll(100))); + } + + @Test + public void testCanUseRejectsZeroMatchingDocs() { + JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class); + when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(true); + when(jsonIndexReader.getDistinctValueCountForPath("$.instance")).thenReturn(5L); + IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader, 100); + + // Empty WHERE bitmap → M = 0 → no docs to count; the default operator handles this trivially. + assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, groupByQuery(EXTRACT_INDEX, null), + new StaticBitmapFilterOperator(100, new MutableRoaringBitmap()))); + } + + @Test + public void testCanUseAcceptsWhenPathHasNoValues() { + JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class); + when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(true); + // D = 0 (path not present in the index) → still cheaper than parsing every matched doc. 
+ when(jsonIndexReader.getDistinctValueCountForPath("$.instance")).thenReturn(0L); + IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader, 100); + + assertTrue(JsonIndexGroupByOperator.canUse(indexSegment, groupByQuery(EXTRACT_INDEX, null), matchAll(100))); } private static QueryContext groupByQuery(String groupKeyExpression, String jsonMatchFilter) { @@ -339,14 +398,26 @@ private static void stubConvertedDocIds(JsonIndexReader jsonIndexReader, } private static IndexSegment buildCanUseIndexSegment(JsonIndexReader jsonIndexReader) { + return buildCanUseIndexSegment(jsonIndexReader, 1000); + } + + private static IndexSegment buildCanUseIndexSegment(JsonIndexReader jsonIndexReader, int totalDocs) { DataSource dataSource = mock(DataSource.class); when(dataSource.getJsonIndex()).thenReturn(jsonIndexReader); + SegmentMetadata segmentMetadata = mock(SegmentMetadata.class); + when(segmentMetadata.getTotalDocs()).thenReturn(totalDocs); + IndexSegment indexSegment = mock(IndexSegment.class); when(indexSegment.getDataSourceNullable("tags")).thenReturn(dataSource); + when(indexSegment.getSegmentMetadata()).thenReturn(segmentMetadata); return indexSegment; } + private static BaseFilterOperator matchAll(int numDocs) { + return new MatchAllFilterOperator(numDocs); + } + private static JsonIndexGroupByOperator buildOperator(QueryContext queryContext, JsonIndexReader jsonIndexReader, MutableRoaringBitmap filterBitmap, int numDocs) { return buildOperator(queryContext, jsonIndexReader, diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkJsonIndexGroupByCount.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkJsonIndexGroupByCount.java new file mode 100644 index 000000000000..3e28fd9632cb --- /dev/null +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkJsonIndexGroupByCount.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.perf; + +import com.google.common.base.Preconditions; +import java.io.File; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.operator.filter.BaseFilterOperator; +import org.apache.pinot.core.operator.query.GroupByOperator; +import org.apache.pinot.core.operator.query.JsonIndexGroupByOperator; +import org.apache.pinot.core.plan.FilterPlanNode; +import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; +import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.SegmentContext; +import 
org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.sql.parsers.CalciteSqlParser; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.OptionsBuilder; + + +/** + * JMH benchmark for {@code GROUP BY jsonExtractIndex(...) + COUNT(*)} comparing the dictionary-scan path + * ({@link JsonIndexGroupByOperator}) against the row-by-row baseline ({@link GroupByOperator}). + * + *

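<p>The benchmark exists to settle the {@code SELECTIVITY_THRESHOLD} constant in + * {@link JsonIndexGroupByOperator}. We sweep two dimensions: + *
<ul> + *   <li>{@code _pathCardinality} (D): the number of distinct values at the GROUP BY path;</li> + *   <li>{@code _matchedFraction}: the fraction of rows selected by the WHERE filter, which fixes the matched-doc count M.</li> + * </ul>
+ * For each combination we time both operators, with no planner gating, and report the crossover point. + * + *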

<p>Both operators are constructed directly (not via {@code GroupByPlanNode}) so the selectivity gate does not + * influence the measurement — we want the raw cost of each path at every (D, M) cell. + */ +@State(Scope.Benchmark) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Fork(1) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +public class BenchmarkJsonIndexGroupByCount { + private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "BenchmarkJsonIndexGroupByCount"); + private static final String TABLE_NAME = "myTable"; + private static final String SEGMENT_NAME = "jsonIndexGroupByCountSegment"; + private static final String ID_COLUMN = "id"; + private static final String TAGS_COLUMN = "tags"; + // Same-column filter that selects a subset of docs based on the synthetic "matchKey" field embedded in the JSON. + // We use a cross-path filter so the index lookup for the GROUP BY path runs unfiltered, isolating the cost + // model of "dictionary scan of D entries vs row-by-row scan of M docs". + private static final String FILTER_CLAUSE = + "WHERE JSON_MATCH(tags, '\"$.matchKey\" = ''yes''')"; + private static final String BASE_QUERY = + "SELECT JSON_EXTRACT_INDEX(tags, '$.instance', 'STRING') AS k, COUNT(*) FROM " + TABLE_NAME + + " " + FILTER_CLAUSE + " GROUP BY k LIMIT 1000000"; + + private static final TableConfig TABLE_CONFIG = new TableConfigBuilder(TableType.OFFLINE) + .setTableName(TABLE_NAME) + .setJsonIndexColumns(Collections.singletonList(TAGS_COLUMN)) + .build(); + + private static final Schema SCHEMA = new Schema.SchemaBuilder() + .setSchemaName(TABLE_NAME) + .addSingleValueDimension(ID_COLUMN, FieldSpec.DataType.INT) + .addSingleValueDimension(TAGS_COLUMN, FieldSpec.DataType.JSON) + .build(); + + @Param({"500000"}) + int _numRows; + + // Sweep path cardinality across nearly five orders of magnitude (10 to 500,000). 
The crossover where index path stops winning sits + // somewhere in this range for typical JSON sizes. + @Param({"10", "100", "1000", "10000", "100000", "500000"}) + int _pathCardinality; + + // Matched-doc fraction: 1% (very selective), 10%, 50%, 100% (no filter). + @Param({"0.01", "0.1", "0.5", "1.0"}) + double _matchedFraction; + + private IndexSegment _indexSegment; + private SegmentContext _segmentContext; + private QueryContext _queryContext; + private BaseFilterOperator _filterOperator; + private AggregationFunctionUtils.AggregationInfo _aggregationInfo; + private int _expectedDistinctGroupCount; + + @Setup(Level.Trial) + public void setup() + throws Exception { + Preconditions.checkState(_pathCardinality <= _numRows, + "pathCardinality (%s) must not exceed numRows (%s)", _pathCardinality, _numRows); + + FileUtils.deleteQuietly(INDEX_DIR); + buildSegment(); + IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(TABLE_CONFIG, SCHEMA); + _indexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), indexLoadingConfig); + Preconditions.checkState(_indexSegment.getDataSource(TAGS_COLUMN, SCHEMA).getJsonIndex() != null, + "Loaded segment must expose JSON index on %s", TAGS_COLUMN); + _segmentContext = new SegmentContext(_indexSegment); + + PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery(BASE_QUERY); + _queryContext = QueryContextConverterUtils.getQueryContext(pinotQuery); + _queryContext.setSchema(SCHEMA); + + FilterPlanNode filterPlanNode = new FilterPlanNode(_segmentContext, _queryContext); + _filterOperator = filterPlanNode.run(); + _aggregationInfo = AggregationFunctionUtils.buildAggregationInfo(_segmentContext, _queryContext, + _queryContext.getAggregationFunctions(), _queryContext.getFilter(), _filterOperator, + filterPlanNode.getPredicateEvaluators()); + + // Cross-validate the two operators on a sanity run before measuring. 
+ int baselineGroups = numGroupsFromBaseline(); + int indexGroups = numGroupsFromIndex(); + Preconditions.checkState(baselineGroups == indexGroups, + "Baseline and index operators disagreed on group count. pathCardinality=%s matchedFraction=%s " + + "baseline=%s index=%s", _pathCardinality, _matchedFraction, baselineGroups, indexGroups); + _expectedDistinctGroupCount = baselineGroups; + } + + @Benchmark + public int baselineGroupByOperator() { + return numGroupsFromBaseline(); + } + + @Benchmark + public int jsonIndexGroupByOperator() { + return numGroupsFromIndex(); + } + + @TearDown(Level.Trial) + public void tearDown() { + if (_indexSegment != null) { + _indexSegment.destroy(); + } + FileUtils.deleteQuietly(INDEX_DIR); + } + + private int numGroupsFromBaseline() { + GroupByOperator operator = new GroupByOperator(_queryContext, _aggregationInfo, + _indexSegment.getSegmentMetadata().getTotalDocs()); + return countGroups(operator); + } + + private int numGroupsFromIndex() { + JsonIndexGroupByOperator operator = new JsonIndexGroupByOperator(_indexSegment, _segmentContext, _queryContext, + _filterOperator); + return countGroups(operator); + } + + private static int countGroups(Operator operator) { + GroupByResultsBlock block = operator.nextBlock(); + if (block.getIntermediateRecords() != null) { + return block.getIntermediateRecords().size(); + } + if (block.getAggregationGroupByResult() != null) { + return block.getAggregationGroupByResult().getNumGroups(); + } + return 0; + } + + private void buildSegment() + throws Exception { + SegmentGeneratorConfig config = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA); + config.setOutDir(INDEX_DIR.getPath()); + config.setTableName(TABLE_NAME); + config.setSegmentName(SEGMENT_NAME); + + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + try (RecordReader recordReader = new GeneratedDataRecordReader(createData())) { + driver.init(config, recordReader); + driver.build(); + } + } + + private 
LazyDataGenerator createData() { + // Round the matched-doc count so that an integer number of docs match. The "matchKey" field flips between + // "yes" and "no" based on whether the row index falls below the matched-doc threshold. + int matchedDocs = (int) Math.round(_numRows * _matchedFraction); + return new LazyDataGenerator() { + @Override + public int size() { + return _numRows; + } + + @Override + public GenericRow next(GenericRow row, int index) { + int instance = index % _pathCardinality; + String matchValue = index < matchedDocs ? "yes" : "no"; + row.putValue(ID_COLUMN, index); + row.putValue(TAGS_COLUMN, + "{\"instance\":\"instance-" + instance + "\",\"matchKey\":\"" + matchValue + "\"}"); + return row; + } + + @Override + public void rewind() { + } + }; + } + + public static void main(String[] args) + throws Exception { + new Runner(new OptionsBuilder() + .include(BenchmarkJsonIndexGroupByCount.class.getSimpleName()) + .build()).run(); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java index 9e0583ab0bf7..b65d5f1ee417 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java @@ -676,6 +676,26 @@ private Map getMatchingKeysMap(String key) { key + JsonIndexCreator.KEY_VALUE_SEPARATOR_NEXT_CHAR, false); } + @Override + public long getDistinctValueCountForPath(String jsonPathKey) { + if (jsonPathKey.startsWith("$")) { + jsonPathKey = jsonPathKey.substring(1); + } else { + jsonPathKey = JsonUtils.KEY_SEPARATOR + jsonPathKey; + } + _readLock.lock(); + try { + Pair result = getKeyAndFlattenedDocIds(jsonPathKey); + // Array-index paths require posting-list intersection — defer to the materializing 
default. + if (result.getRight() != null) { + return MutableJsonIndex.super.getDistinctValueCountForPath(jsonPathKey); + } + return getMatchingKeysMap(result.getLeft()).size(); + } finally { + _readLock.unlock(); + } + } + @Override public String[][] getValuesMV(int[] docIds, int length, Map valueToMatchingFlattenedDocs) { String[][] result = new String[length][]; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java index fc03e03c614e..e7354f1b80b2 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java @@ -814,6 +814,30 @@ public String[] getValuesSV(int[] docIds, int length, Map return values; } + @Override + public long getDistinctValueCountForPath(String jsonPathKey) { + // Normalize the path key (same logic as getMatchingFlattenedDocsMap) + if (_version == BaseJsonIndexCreator.VERSION_2) { + if (jsonPathKey.startsWith("$")) { + jsonPathKey = jsonPathKey.substring(1); + } else { + jsonPathKey = JsonUtils.KEY_SEPARATOR + jsonPathKey; + } + } else if (jsonPathKey.startsWith("$.")) { + jsonPathKey = jsonPathKey.substring(2); + } + Pair pair = getKeyAndFlattenedDocIds(jsonPathKey); + // Array-index paths require posting-list intersection — fall back to materialization for correctness. 
+ if (pair.getRight() != null) { + return JsonIndexReader.super.getDistinctValueCountForPath(jsonPathKey); + } + int[] dictIds = getDictIdRangeForKey(pair.getLeft()); + if (dictIds[0] < 0) { + return 0; + } + return (long) dictIds[1] - dictIds[0]; + } + /** * For a JSON key path, returns an int array of the range [min, max] spanning all values for the JSON key path */ diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java index 06a095b4e206..251e5a4e30a0 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java @@ -101,4 +101,17 @@ default Set getMatchingDistinctValues(String key, @Nullable String filte default boolean isPathIndexed(String jsonPath) { return true; } + + /** + * Returns the number of distinct values seen at the given JSON path across all documents in the segment, or a + * non-negative upper bound that the caller can use as a cardinality estimate. The OSS immutable implementation + * answers this in O(log N) via the sorted dictionary range; the default implementation materializes the value set + * to preserve correctness for third-party readers and is intentionally slow on high-cardinality paths. + * + *

Callers use this to decide whether a dictionary-scan execution plan is cheaper than the row-by-row scan + * baseline. A path that does not appear in the index returns 0. + */ + default long getDistinctValueCountForPath(String jsonPath) { + return getMatchingDistinctValues(jsonPath, null).size(); + } } From 90228df7332467c869ddaa6cc05e9d42f7c413bd Mon Sep 17 00:00:00 2001 From: Siddharth Teotia Date: Thu, 21 May 2026 16:39:20 -0700 Subject: [PATCH 3/4] Fix double-normalization in getDistinctValueCountForPath fallback; fix JMH bench per-invocation state --- .../operator/query/JsonExtractIndexUtils.java | 6 +- .../perf/BenchmarkJsonIndexGroupByCount.java | 29 +++++---- .../impl/json/MutableJsonIndexImpl.java | 14 +++- .../json/ImmutableJsonIndexReader.java | 7 +- .../local/segment/index/JsonIndexTest.java | 64 ++++++++++++++++++- .../spi/index/reader/JsonIndexReader.java | 20 ++++-- 6 files changed, 115 insertions(+), 25 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonExtractIndexUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonExtractIndexUtils.java index ea042c1593c4..245264c6eb5f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonExtractIndexUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonExtractIndexUtils.java @@ -66,7 +66,7 @@ public ParsedJsonExtractIndex(String columnName, String jsonPathString, FieldSpe } /** - * Parses the given expression as a 3/4-arg scalar {@code jsonExtractIndex} or {@code jsonExtractScalar} call. + * Parses the given expression as a 3/4-arg scalar {@code jsonExtractIndex} call. * Returns {@code null} when the expression has a different shape (wrong arity, non-literal args, MV result type, * unsupported scalar type, {@code [*]} wildcard, or malformed JSON path). 
* @@ -161,8 +161,8 @@ public static JsonIndexReader getJsonIndexReader(DataSource dataSource) { } /** - * Returns {@code true} when the expression is a parseable {@code jsonExtractIndex}/{@code jsonExtractScalar} - * call and the referenced column has a usable JSON index covering the path. + * Returns {@code true} when the expression is a parseable {@code jsonExtractIndex} call and the referenced column + * has a usable JSON index covering the path. */ public static boolean canUseJsonIndex(IndexSegment indexSegment, ExpressionContext expr) { ParsedJsonExtractIndex parsed = parseJsonExtractIndex(expr); diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkJsonIndexGroupByCount.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkJsonIndexGroupByCount.java index 3e28fd9632cb..aeb1e1fb8cde 100644 --- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkJsonIndexGroupByCount.java +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkJsonIndexGroupByCount.java @@ -126,8 +126,6 @@ public class BenchmarkJsonIndexGroupByCount { private IndexSegment _indexSegment; private SegmentContext _segmentContext; private QueryContext _queryContext; - private BaseFilterOperator _filterOperator; - private AggregationFunctionUtils.AggregationInfo _aggregationInfo; private int _expectedDistinctGroupCount; @Setup(Level.Trial) @@ -148,13 +146,8 @@ public void setup() _queryContext = QueryContextConverterUtils.getQueryContext(pinotQuery); _queryContext.setSchema(SCHEMA); - FilterPlanNode filterPlanNode = new FilterPlanNode(_segmentContext, _queryContext); - _filterOperator = filterPlanNode.run(); - _aggregationInfo = AggregationFunctionUtils.buildAggregationInfo(_segmentContext, _queryContext, - _queryContext.getAggregationFunctions(), _queryContext.getFilter(), _filterOperator, - filterPlanNode.getPredicateEvaluators()); - - // Cross-validate the two operators on a sanity run before measuring. 
+ // Cross-validate the two operators on a sanity run before measuring. Each call constructs fresh per-invocation + // operator state, identical to what the @Benchmark methods do. int baselineGroups = numGroupsFromBaseline(); int indexGroups = numGroupsFromIndex(); Preconditions.checkState(baselineGroups == indexGroups, @@ -182,14 +175,28 @@ public void tearDown() { } private int numGroupsFromBaseline() { - GroupByOperator operator = new GroupByOperator(_queryContext, _aggregationInfo, + // Build fresh per-invocation state. The baseline GroupByOperator consumes its DocIdSetOperator to exhaustion on + // the first nextBlock call; reusing the AggregationInfo / project operator across iterations would have every + // subsequent invocation see an empty doc stream and return in ~1us, making the bench meaningless. + FilterPlanNode filterPlanNode = new FilterPlanNode(_segmentContext, _queryContext); + BaseFilterOperator filterOperator = filterPlanNode.run(); + AggregationFunctionUtils.AggregationInfo aggregationInfo = + AggregationFunctionUtils.buildAggregationInfo(_segmentContext, _queryContext, + _queryContext.getAggregationFunctions(), _queryContext.getFilter(), filterOperator, + filterPlanNode.getPredicateEvaluators()); + GroupByOperator operator = new GroupByOperator(_queryContext, aggregationInfo, _indexSegment.getSegmentMetadata().getTotalDocs()); return countGroups(operator); } private int numGroupsFromIndex() { + // Build fresh per-invocation state, matching the baseline path's per-invocation cost. JsonIndexGroupByOperator + // does not consume the filter operator across iterations (it uses cached getFilteredDocIds), but building both + // paths identically keeps the comparison fair. 
+ FilterPlanNode filterPlanNode = new FilterPlanNode(_segmentContext, _queryContext); + BaseFilterOperator filterOperator = filterPlanNode.run(); JsonIndexGroupByOperator operator = new JsonIndexGroupByOperator(_indexSegment, _segmentContext, _queryContext, - _filterOperator); + filterOperator); return countGroups(operator); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java index b65d5f1ee417..db9587a0acf8 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java @@ -678,6 +678,9 @@ private Map getMatchingKeysMap(String key) { @Override public long getDistinctValueCountForPath(String jsonPathKey) { + // Hold on to the caller-supplied path; the fallback delegates to getMatchingDistinctValues, which does its own + // normalization. Forwarding the already-normalized key would double-prefix and miss every entry. + String originalPathKey = jsonPathKey; if (jsonPathKey.startsWith("$")) { jsonPathKey = jsonPathKey.substring(1); } else { @@ -686,9 +689,16 @@ public long getDistinctValueCountForPath(String jsonPathKey) { _readLock.lock(); try { Pair result = getKeyAndFlattenedDocIds(jsonPathKey); - // Array-index paths require posting-list intersection — defer to the materializing default. + // Array-index paths require posting-list intersection — defer to the materializing default. Release the read + // lock first because the default path re-enters this class via getMatchingFlattenedDocsMap, which acquires + // the read lock itself (the lock is reentrant but releasing keeps the lock-discipline obvious for readers). 
if (result.getRight() != null) { - return MutableJsonIndex.super.getDistinctValueCountForPath(jsonPathKey); + _readLock.unlock(); + try { + return MutableJsonIndex.super.getDistinctValueCountForPath(originalPathKey); + } finally { + _readLock.lock(); + } } return getMatchingKeysMap(result.getLeft()).size(); } finally { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java index e7354f1b80b2..f53faf5802fe 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java @@ -816,7 +816,10 @@ public String[] getValuesSV(int[] docIds, int length, Map @Override public long getDistinctValueCountForPath(String jsonPathKey) { - // Normalize the path key (same logic as getMatchingFlattenedDocsMap) + // Hold on to the caller-supplied path; fallback paths re-enter this class via getMatchingDistinctValues, which + // does its own normalization. Forwarding the already-normalized key would double-prefix the KEY_SEPARATOR and + // miss every entry. + String originalPathKey = jsonPathKey; if (_version == BaseJsonIndexCreator.VERSION_2) { if (jsonPathKey.startsWith("$")) { jsonPathKey = jsonPathKey.substring(1); @@ -829,7 +832,7 @@ public long getDistinctValueCountForPath(String jsonPathKey) { Pair pair = getKeyAndFlattenedDocIds(jsonPathKey); // Array-index paths require posting-list intersection — fall back to materialization for correctness. 
if (pair.getRight() != null) { - return JsonIndexReader.super.getDistinctValueCountForPath(jsonPathKey); + return JsonIndexReader.super.getDistinctValueCountForPath(originalPathKey); } int[] dictIds = getDictIdRangeForKey(pair.getLeft()); if (dictIds[0] < 0) { diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java index b576a4439426..2ed336970b49 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java @@ -277,6 +277,67 @@ private void assertDocIds(JsonIndexReader indexReader, String filter, int[] expe } } + @Test + public void testGetDistinctValueCountForPath() + throws Exception { + // @formatter:off + // CHECKSTYLE:OFF + String[] records = new String[]{ + "{\"name\":\"adam\"," + + "\"addresses\":[{\"country\":\"us\"},{\"country\":\"ca\"}]," + + "\"skills\":[\"english\",\"programming\"]}", + "{\"name\":\"bob\"," + + "\"addresses\":[{\"country\":\"ca\"},{\"country\":\"uk\"}]," + + "\"skills\":[\"english\",\"math\"]}", + "{\"name\":\"carl\"," + + "\"addresses\":[{\"country\":\"jp\"},{\"country\":\"kr\"}]," + + "\"skills\":[\"japanese\"]}" + }; + // CHECKSTYLE:ON + // @formatter:on + JsonIndexConfig jsonIndexConfig = getIndexConfig(); + + createIndex(true, jsonIndexConfig, records); + File onHeapIndexFile = new File(INDEX_DIR, ON_HEAP_COLUMN_NAME + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION); + createIndex(false, jsonIndexConfig, records); + File offHeapIndexFile = new File(INDEX_DIR, OFF_HEAP_COLUMN_NAME + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION); + + try (PinotDataBuffer onHeapBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(onHeapIndexFile); + PinotDataBuffer offHeapBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(offHeapIndexFile); + 
JsonIndexReader onHeapReader = new ImmutableJsonIndexReader(onHeapBuffer, records.length); + JsonIndexReader offHeapReader = new ImmutableJsonIndexReader(offHeapBuffer, records.length); + MutableJsonIndexImpl mutableJsonIndex = new MutableJsonIndexImpl(jsonIndexConfig, "table__0__1", "col")) { + for (String record : records) { + mutableJsonIndex.add(record); + } + JsonIndexReader[] indexReaders = new JsonIndexReader[]{onHeapReader, offHeapReader, mutableJsonIndex}; + for (JsonIndexReader reader : indexReaders) { + String label = reader.getClass().getSimpleName(); + // Scalar path: exercises the cheap dict-range / TreeMap-subrange fast path. Exact count. + assertEquals(reader.getDistinctValueCountForPath("$.name"), 3L, + label + " distinct count at $.name"); + // Wildcard array path normalizes to a value-only key with no bitmap constraint, so it also takes the fast + // path and returns the exact count. + assertEquals(reader.getDistinctValueCountForPath("$.skills[*]"), 4L, + label + " distinct count at $.skills[*]"); + // Array-index paths require posting-list intersection and exercise the fallback that previously returned 0 + // for both reader implementations due to double-normalization of the path key. + // The SPI contract allows an upper bound (see Javadoc); collectValuesFromFlattenedDocsMap currently drops + // the array-index bitmap and returns the union across all indices (5 distinct countries here, not the 3 at + // index 0/1). We assert >= the true value so this test (a) fails on the previous 0-return bug, and (b) + // continues to pass if collectValuesFromFlattenedDocsMap is later tightened to the exact 3. 
+ assertTrue(reader.getDistinctValueCountForPath("$.addresses[0].country") >= 3L, + label + " distinct count at $.addresses[0].country (upper bound)"); + assertTrue(reader.getDistinctValueCountForPath("$.addresses[1].country") >= 3L, + label + " distinct count at $.addresses[1].country (upper bound)"); + // Path absent from the index returns 0 via the fast path; this is the "trust the index, skip scan" signal + // used by JsonIndexGroupByOperator's selectivity gate. + assertEquals(reader.getDistinctValueCountForPath("$.does_not_exist"), 0L, + label + " distinct count at non-existent path"); + } + } + } + @Test public void testLargeIndex() throws Exception { @@ -408,7 +469,8 @@ private void createIndex(boolean createOnHeap, JsonIndexConfig jsonIndexConfig, * @param continueOnError whether continueOnError should be enabled or disabled * @throws IOException on error */ - private void createIndex(boolean createOnHeap, JsonIndexConfig jsonIndexConfig, String[] records, boolean continueOnError) + private void createIndex(boolean createOnHeap, JsonIndexConfig jsonIndexConfig, String[] records, + boolean continueOnError) throws IOException { try (JsonIndexCreator indexCreator = createOnHeap ? 
new OnHeapJsonIndexCreator(INDEX_DIR, ON_HEAP_COLUMN_NAME, "myTable_OFFLINE", continueOnError, diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java index 251e5a4e30a0..7f9699d345d5 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java @@ -103,13 +103,21 @@ default boolean isPathIndexed(String jsonPath) { } /** - * Returns the number of distinct values seen at the given JSON path across all documents in the segment, or a - * non-negative upper bound that the caller can use as a cardinality estimate. The OSS immutable implementation - * answers this in O(log N) via the sorted dictionary range; the default implementation materializes the value set - * to preserve correctness for third-party readers and is intentionally slow on high-cardinality paths. + * Returns a non-negative upper bound on the number of distinct values seen at the given JSON path across all + * documents in the segment. Implementations are free to return the exact count when it is cheap to compute; a + * looser upper bound is permitted (and required for some array-indexed paths, see below). * - *

<p>Callers use this to decide whether a dictionary-scan execution plan is cheaper than the row-by-row scan - * baseline. A path that does not appear in the index returns 0. + *

<p>The OSS immutable implementation answers scalar / wildcard paths in O(log N) via the sorted dictionary + * range. Array-indexed paths (for example {@code $.items[0].name}) fall back to the materializing default and + * return the union of distinct values across all indices, which is a strict upper bound on the per-index count. + * The default implementation materializes the full value set via {@link #getMatchingDistinctValues}; it is + * correct but intentionally slow on high-cardinality paths. + * + *

Callers use this value as a cardinality estimate to decide whether a dictionary-scan execution plan is + * cheaper than a row-by-row scan baseline. Because the contract is an upper bound, an estimate that is too high + * is perf-conservative (it may pick the slower row-scan path when the dictionary scan would have won); an + * estimate that is too low would be unsafe, so implementations must never under-report. A path that does not + * appear in the index returns 0. */ default long getDistinctValueCountForPath(String jsonPath) { return getMatchingDistinctValues(jsonPath, null).size(); From 711e45b3eb63d2983b9b61fa47220861e9e76403 Mon Sep 17 00:00:00 2001 From: Siddharth Teotia Date: Thu, 21 May 2026 17:19:18 -0700 Subject: [PATCH 4/4] Drop selectivity gate; bench showed lopsided wins for the dictionary-scan path --- .../query/JsonIndexGroupByOperator.java | 69 +++------------ .../pinot/core/plan/GroupByPlanNode.java | 21 ++--- .../query/JsonIndexGroupByOperatorTest.java | 87 ++----------------- .../impl/json/MutableJsonIndexImpl.java | 30 ------- .../json/ImmutableJsonIndexReader.java | 27 ------ .../local/segment/index/JsonIndexTest.java | 61 ------------- .../spi/index/reader/JsonIndexReader.java | 21 ----- 7 files changed, 28 insertions(+), 288 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexGroupByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexGroupByOperator.java index 6ac6182e252d..8244747e28d7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexGroupByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexGroupByOperator.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.core.operator.query; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.CaseFormat; import com.google.common.base.Preconditions; import java.math.BigDecimal; @@ -113,26 +112,20 @@ public 
JsonIndexGroupByOperator(IndexSegment indexSegment, SegmentContext segmen countFn.getIntermediateResultColumnType()}); } - /** - * Selectivity threshold for routing to the dictionary-scan operator. The index path wins when path cardinality - * (D = distinct values seen at the path) is small relative to matched-doc count (M). Above - * {@code D > SELECTIVITY_THRESHOLD * M} the dictionary scan can be slower than the row-by-row baseline, so the - * query falls back to {@link GroupByOperator}. - * - *

<p>The current value is a conservative placeholder. The final value is being established by a JMH benchmark - * (see {@code BenchmarkJsonIndexGroupByCount}); update this constant once the empirical crossover lands. - */ - @VisibleForTesting - static final double SELECTIVITY_THRESHOLD = 2.0; - /** * Returns {@code true} when the query is a single-group-by, single-{@code COUNT(*)} aggregation over a JSON-indexed - * path, with no {@code HAVING} clause and no filtered aggregations, AND the path's distinct-value cardinality is - * small enough that the dictionary scan is expected to beat the row-by-row scan. All other shapes fall back to + * path, with no {@code HAVING} clause and no filtered aggregations. All other shapes fall back to * {@link GroupByOperator}. + * + *

No cost-model gate. Bench results (see + * {@code BenchmarkJsonIndexGroupByCount} and the reference memo on this branch) show the dictionary scan wins on + * most realistic workloads, with the only loss quadrant at very-selective WHERE clauses ({@code M ≲ 1% of segment}) + * combined with medium path cardinality ({@code D ∈ 1k..10k}), where it can be up to ~65% slower than the + * baseline. The wins (2-4× on loose WHEREs) dominate that worst case in aggregate, so the operator is always + * picked when the shape matches. Re-evaluate if production data shows the worst-case quadrant matters more than + * the bench suggested. */ - public static boolean canUse(IndexSegment indexSegment, QueryContext queryContext, - BaseFilterOperator filterOperator) { + public static boolean canUse(IndexSegment indexSegment, QueryContext queryContext) { List groupByExpressions = queryContext.getGroupByExpressions(); if (groupByExpressions == null || groupByExpressions.size() != 1) { return false; @@ -159,47 +152,7 @@ public static boolean canUse(IndexSegment indexSegment, QueryContext queryContex if (queryContext.hasFilteredAggregations()) { return false; } - return passesSelectivityGate(indexSegment, queryContext, filterOperator, groupByExpressions.get(0)); - } - - /** - * Compares path cardinality {@code D} against matched-doc count {@code M}; routes to the dictionary scan only when - * {@code D <= SELECTIVITY_THRESHOLD * M}. A null WHERE bitmap from the filter operator means "match all", in which - * case {@code M = totalDocs}. 
- */ - private static boolean passesSelectivityGate(IndexSegment indexSegment, QueryContext queryContext, - BaseFilterOperator filterOperator, ExpressionContext groupByExpression) { - ParsedJsonExtractIndex parsed = JsonExtractIndexUtils.parseJsonExtractIndex(groupByExpression); - if (parsed == null) { - return false; - } - DataSource dataSource = indexSegment.getDataSourceNullable(parsed._columnName); - if (dataSource == null) { - return false; - } - JsonIndexReader reader = JsonExtractIndexUtils.getJsonIndexReader(dataSource); - if (reader == null) { - return false; - } - long d = reader.getDistinctValueCountForPath(parsed._jsonPathString); - if (d <= 0) { - // No values at this path — index path is trivially cheaper than parsing 0 docs. - return true; - } - long m; - BaseFilterOperator.FilteredDocIds filteredDocIds = filterOperator.getFilteredDocIds(); - ImmutableRoaringBitmap whereBitmap = filteredDocIds.getDocIds(); - if (whereBitmap == null) { - // Match-all (no WHERE or a trivially true predicate): every doc would be visited by the baseline. - m = indexSegment.getSegmentMetadata().getTotalDocs(); - } else { - m = whereBitmap.getLongCardinality(); - } - if (m == 0) { - // No matching docs — baseline does zero parses. Skip the dictionary scan too. 
- return false; - } - return d <= (long) Math.ceil(SELECTIVITY_THRESHOLD * m); + return true; } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/GroupByPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/GroupByPlanNode.java index 348e3ffe6bb4..41dae5ef88c7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/GroupByPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/GroupByPlanNode.java @@ -47,18 +47,14 @@ public GroupByPlanNode(SegmentContext segmentContext, QueryContext queryContext) @Override public Operator run() { assert _queryContext.getAggregationFunctions() != null && _queryContext.getGroupByExpressions() != null; - if (_queryContext.hasFilteredAggregations()) { - return buildFilteredGroupByPlan(); - } - // Build the filter operator once and reuse it for either the index-based fast path or the default plan. - // JsonIndexGroupByOperator.canUse(...) inspects the filter operator's WHERE bitmap to apply a selectivity - // gate (skip the index path when path cardinality is too high relative to matched-doc count). - FilterPlanNode filterPlanNode = new FilterPlanNode(_segmentContext, _queryContext); - BaseFilterOperator filterOperator = filterPlanNode.run(); - if (JsonIndexGroupByOperator.canUse(_indexSegment, _queryContext, filterOperator)) { + // Prefer the index-based GROUP BY when the shape is GROUP BY jsonExtractIndex(col, path, type[, default]) + // + COUNT(*) on a column whose JSON index covers the path. canUse() already excludes filtered aggregations + // and HAVING. + if (JsonIndexGroupByOperator.canUse(_indexSegment, _queryContext)) { + BaseFilterOperator filterOperator = new FilterPlanNode(_segmentContext, _queryContext).run(); return new JsonIndexGroupByOperator(_indexSegment, _segmentContext, _queryContext, filterOperator); } - return buildNonFilteredGroupByPlan(filterPlanNode, filterOperator); + return _queryContext.hasFilteredAggregations() ? 
buildFilteredGroupByPlan() : buildNonFilteredGroupByPlan(); } private FilteredGroupByOperator buildFilteredGroupByPlan() { @@ -67,8 +63,9 @@ private FilteredGroupByOperator buildFilteredGroupByPlan() { _indexSegment.getSegmentMetadata().getTotalDocs()); } - private GroupByOperator buildNonFilteredGroupByPlan(FilterPlanNode filterPlanNode, - BaseFilterOperator filterOperator) { + private GroupByOperator buildNonFilteredGroupByPlan() { + FilterPlanNode filterPlanNode = new FilterPlanNode(_segmentContext, _queryContext); + BaseFilterOperator filterOperator = filterPlanNode.run(); AggregationFunctionUtils.AggregationInfo aggregationInfo = AggregationFunctionUtils.buildAggregationInfo(_segmentContext, _queryContext, _queryContext.getAggregationFunctions(), _queryContext.getFilter(), filterOperator, diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/query/JsonIndexGroupByOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/JsonIndexGroupByOperatorTest.java index c2c973575394..67ce7e3d69b5 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/operator/query/JsonIndexGroupByOperatorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/JsonIndexGroupByOperatorTest.java @@ -240,9 +240,8 @@ public void testCanUseAcceptsThreeAndFourArgForms() { when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(true); IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader); - assertTrue(JsonIndexGroupByOperator.canUse(indexSegment, groupByQuery(EXTRACT_INDEX, null), matchAll(1000))); - assertTrue(JsonIndexGroupByOperator.canUse(indexSegment, groupByQuery(EXTRACT_INDEX_WITH_DEFAULT, null), - matchAll(1000))); + assertTrue(JsonIndexGroupByOperator.canUse(indexSegment, groupByQuery(EXTRACT_INDEX, null))); + assertTrue(JsonIndexGroupByOperator.canUse(indexSegment, groupByQuery(EXTRACT_INDEX_WITH_DEFAULT, null))); } @Test @@ -253,7 +252,7 @@ public void testCanUseRejectsMultipleAggregations() { 
QueryContext queryContext = QueryContextConverterUtils.getQueryContext( "SELECT " + EXTRACT_INDEX + " AS k, COUNT(*), MAX(rating) FROM myTable GROUP BY k"); - assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, queryContext, matchAll(1000))); + assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, queryContext)); } @Test @@ -264,7 +263,7 @@ public void testCanUseRejectsNonCountAggregation() { QueryContext queryContext = QueryContextConverterUtils.getQueryContext( "SELECT " + EXTRACT_INDEX + " AS k, SUM(rating) FROM myTable GROUP BY k"); - assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, queryContext, matchAll(1000))); + assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, queryContext)); } @Test @@ -276,7 +275,7 @@ public void testCanUseRejectsCountColumn() { QueryContext queryContext = QueryContextConverterUtils.getQueryContext( "SELECT " + EXTRACT_INDEX + " AS k, COUNT(rating) FROM myTable GROUP BY k " + "OPTION(enableNullHandling=true)"); - assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, queryContext, matchAll(1000))); + assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, queryContext)); } @Test @@ -287,7 +286,7 @@ public void testCanUseRejectsHaving() { QueryContext queryContext = QueryContextConverterUtils.getQueryContext( "SELECT " + EXTRACT_INDEX + " AS k, COUNT(*) AS c FROM myTable GROUP BY k HAVING COUNT(*) > 10"); - assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, queryContext, matchAll(1000))); + assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, queryContext)); } @Test @@ -298,7 +297,7 @@ public void testCanUseRejectsMultipleGroupByKeys() { QueryContext queryContext = QueryContextConverterUtils.getQueryContext( "SELECT " + EXTRACT_INDEX + " AS k, env, COUNT(*) FROM myTable GROUP BY k, env"); - assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, queryContext, matchAll(1000))); + assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, queryContext)); } @Test @@ -307,65 +306,7 @@ public void 
testCanUseRejectsWhenPathNotIndexed() { when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(false); IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader); - assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, groupByQuery(EXTRACT_INDEX, null), matchAll(1000))); - } - - /// Selectivity-gate tests below. SELECTIVITY_THRESHOLD = 2.0 (placeholder until JMH bench lands). - - @Test - public void testCanUseRejectsHighCardinalityRelativeToMatchedDocs() { - JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class); - when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(true); - // Path cardinality 1000, segment total 100 → D >> SELECTIVITY_THRESHOLD * M, bail. - when(jsonIndexReader.getDistinctValueCountForPath("$.instance")).thenReturn(1000L); - IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader, 100); - - assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, groupByQuery(EXTRACT_INDEX, null), matchAll(100))); - } - - @Test - public void testCanUseAcceptsAtSelectivityBoundary() { - JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class); - when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(true); - // D = floor(SELECTIVITY_THRESHOLD * M) = 2 * 100 = 200 → exactly at the boundary, route to the index. - when(jsonIndexReader.getDistinctValueCountForPath("$.instance")).thenReturn(200L); - IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader, 100); - - assertTrue(JsonIndexGroupByOperator.canUse(indexSegment, groupByQuery(EXTRACT_INDEX, null), matchAll(100))); - } - - @Test - public void testCanUseRejectsJustAboveSelectivityBoundary() { - JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class); - when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(true); - // D = SELECTIVITY_THRESHOLD * M + 1 → just above the boundary, bail. 
- when(jsonIndexReader.getDistinctValueCountForPath("$.instance")).thenReturn(201L); - IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader, 100); - - assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, groupByQuery(EXTRACT_INDEX, null), matchAll(100))); - } - - @Test - public void testCanUseRejectsZeroMatchingDocs() { - JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class); - when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(true); - when(jsonIndexReader.getDistinctValueCountForPath("$.instance")).thenReturn(5L); - IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader, 100); - - // Empty WHERE bitmap → M = 0 → no docs to count; the default operator handles this trivially. - assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, groupByQuery(EXTRACT_INDEX, null), - new StaticBitmapFilterOperator(100, new MutableRoaringBitmap()))); - } - - @Test - public void testCanUseAcceptsWhenPathHasNoValues() { - JsonIndexReader jsonIndexReader = mock(JsonIndexReader.class); - when(jsonIndexReader.isPathIndexed("$.instance")).thenReturn(true); - // D = 0 (path not present in the index) → still cheaper than parsing every matched doc. 
- when(jsonIndexReader.getDistinctValueCountForPath("$.instance")).thenReturn(0L); - IndexSegment indexSegment = buildCanUseIndexSegment(jsonIndexReader, 100); - - assertTrue(JsonIndexGroupByOperator.canUse(indexSegment, groupByQuery(EXTRACT_INDEX, null), matchAll(100))); + assertFalse(JsonIndexGroupByOperator.canUse(indexSegment, groupByQuery(EXTRACT_INDEX, null))); } private static QueryContext groupByQuery(String groupKeyExpression, String jsonMatchFilter) { @@ -398,26 +339,14 @@ private static void stubConvertedDocIds(JsonIndexReader jsonIndexReader, } private static IndexSegment buildCanUseIndexSegment(JsonIndexReader jsonIndexReader) { - return buildCanUseIndexSegment(jsonIndexReader, 1000); - } - - private static IndexSegment buildCanUseIndexSegment(JsonIndexReader jsonIndexReader, int totalDocs) { DataSource dataSource = mock(DataSource.class); when(dataSource.getJsonIndex()).thenReturn(jsonIndexReader); - SegmentMetadata segmentMetadata = mock(SegmentMetadata.class); - when(segmentMetadata.getTotalDocs()).thenReturn(totalDocs); - IndexSegment indexSegment = mock(IndexSegment.class); when(indexSegment.getDataSourceNullable("tags")).thenReturn(dataSource); - when(indexSegment.getSegmentMetadata()).thenReturn(segmentMetadata); return indexSegment; } - private static BaseFilterOperator matchAll(int numDocs) { - return new MatchAllFilterOperator(numDocs); - } - private static JsonIndexGroupByOperator buildOperator(QueryContext queryContext, JsonIndexReader jsonIndexReader, MutableRoaringBitmap filterBitmap, int numDocs) { return buildOperator(queryContext, jsonIndexReader, diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java index db9587a0acf8..9e0583ab0bf7 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java +++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java @@ -676,36 +676,6 @@ private Map getMatchingKeysMap(String key) { key + JsonIndexCreator.KEY_VALUE_SEPARATOR_NEXT_CHAR, false); } - @Override - public long getDistinctValueCountForPath(String jsonPathKey) { - // Hold on to the caller-supplied path; the fallback delegates to getMatchingDistinctValues, which does its own - // normalization. Forwarding the already-normalized key would double-prefix and miss every entry. - String originalPathKey = jsonPathKey; - if (jsonPathKey.startsWith("$")) { - jsonPathKey = jsonPathKey.substring(1); - } else { - jsonPathKey = JsonUtils.KEY_SEPARATOR + jsonPathKey; - } - _readLock.lock(); - try { - Pair result = getKeyAndFlattenedDocIds(jsonPathKey); - // Array-index paths require posting-list intersection — defer to the materializing default. Release the read - // lock first because the default path re-enters this class via getMatchingFlattenedDocsMap, which acquires - // the read lock itself (the lock is reentrant but releasing keeps the lock-discipline obvious for readers). 
- if (result.getRight() != null) { - _readLock.unlock(); - try { - return MutableJsonIndex.super.getDistinctValueCountForPath(originalPathKey); - } finally { - _readLock.lock(); - } - } - return getMatchingKeysMap(result.getLeft()).size(); - } finally { - _readLock.unlock(); - } - } - @Override public String[][] getValuesMV(int[] docIds, int length, Map valueToMatchingFlattenedDocs) { String[][] result = new String[length][]; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java index f53faf5802fe..fc03e03c614e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/json/ImmutableJsonIndexReader.java @@ -814,33 +814,6 @@ public String[] getValuesSV(int[] docIds, int length, Map return values; } - @Override - public long getDistinctValueCountForPath(String jsonPathKey) { - // Hold on to the caller-supplied path; fallback paths re-enter this class via getMatchingDistinctValues, which - // does its own normalization. Forwarding the already-normalized key would double-prefix the KEY_SEPARATOR and - // miss every entry. - String originalPathKey = jsonPathKey; - if (_version == BaseJsonIndexCreator.VERSION_2) { - if (jsonPathKey.startsWith("$")) { - jsonPathKey = jsonPathKey.substring(1); - } else { - jsonPathKey = JsonUtils.KEY_SEPARATOR + jsonPathKey; - } - } else if (jsonPathKey.startsWith("$.")) { - jsonPathKey = jsonPathKey.substring(2); - } - Pair pair = getKeyAndFlattenedDocIds(jsonPathKey); - // Array-index paths require posting-list intersection — fall back to materialization for correctness. 
- if (pair.getRight() != null) { - return JsonIndexReader.super.getDistinctValueCountForPath(originalPathKey); - } - int[] dictIds = getDictIdRangeForKey(pair.getLeft()); - if (dictIds[0] < 0) { - return 0; - } - return (long) dictIds[1] - dictIds[0]; - } - /** * For a JSON key path, returns an int array of the range [min, max] spanning all values for the JSON key path */ diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java index 2ed336970b49..670ed60baaf3 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java @@ -277,67 +277,6 @@ private void assertDocIds(JsonIndexReader indexReader, String filter, int[] expe } } - @Test - public void testGetDistinctValueCountForPath() - throws Exception { - // @formatter:off - // CHECKSTYLE:OFF - String[] records = new String[]{ - "{\"name\":\"adam\"," - + "\"addresses\":[{\"country\":\"us\"},{\"country\":\"ca\"}]," - + "\"skills\":[\"english\",\"programming\"]}", - "{\"name\":\"bob\"," - + "\"addresses\":[{\"country\":\"ca\"},{\"country\":\"uk\"}]," - + "\"skills\":[\"english\",\"math\"]}", - "{\"name\":\"carl\"," - + "\"addresses\":[{\"country\":\"jp\"},{\"country\":\"kr\"}]," - + "\"skills\":[\"japanese\"]}" - }; - // CHECKSTYLE:ON - // @formatter:on - JsonIndexConfig jsonIndexConfig = getIndexConfig(); - - createIndex(true, jsonIndexConfig, records); - File onHeapIndexFile = new File(INDEX_DIR, ON_HEAP_COLUMN_NAME + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION); - createIndex(false, jsonIndexConfig, records); - File offHeapIndexFile = new File(INDEX_DIR, OFF_HEAP_COLUMN_NAME + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION); - - try (PinotDataBuffer onHeapBuffer = 
PinotDataBuffer.mapReadOnlyBigEndianFile(onHeapIndexFile); - PinotDataBuffer offHeapBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(offHeapIndexFile); - JsonIndexReader onHeapReader = new ImmutableJsonIndexReader(onHeapBuffer, records.length); - JsonIndexReader offHeapReader = new ImmutableJsonIndexReader(offHeapBuffer, records.length); - MutableJsonIndexImpl mutableJsonIndex = new MutableJsonIndexImpl(jsonIndexConfig, "table__0__1", "col")) { - for (String record : records) { - mutableJsonIndex.add(record); - } - JsonIndexReader[] indexReaders = new JsonIndexReader[]{onHeapReader, offHeapReader, mutableJsonIndex}; - for (JsonIndexReader reader : indexReaders) { - String label = reader.getClass().getSimpleName(); - // Scalar path: exercises the cheap dict-range / TreeMap-subrange fast path. Exact count. - assertEquals(reader.getDistinctValueCountForPath("$.name"), 3L, - label + " distinct count at $.name"); - // Wildcard array path normalizes to a value-only key with no bitmap constraint, so it also takes the fast - // path and returns the exact count. - assertEquals(reader.getDistinctValueCountForPath("$.skills[*]"), 4L, - label + " distinct count at $.skills[*]"); - // Array-index paths require posting-list intersection and exercise the fallback that previously returned 0 - // for both reader implementations due to double-normalization of the path key. - // The SPI contract allows an upper bound (see Javadoc); collectValuesFromFlattenedDocsMap currently drops - // the array-index bitmap and returns the union across all indices (5 distinct countries here, not the 3 at - // index 0/1). We assert >= the true value so this test (a) fails on the previous 0-return bug, and (b) - // continues to pass if collectValuesFromFlattenedDocsMap is later tightened to the exact 3. 
- assertTrue(reader.getDistinctValueCountForPath("$.addresses[0].country") >= 3L, - label + " distinct count at $.addresses[0].country (upper bound)"); - assertTrue(reader.getDistinctValueCountForPath("$.addresses[1].country") >= 3L, - label + " distinct count at $.addresses[1].country (upper bound)"); - // Path absent from the index returns 0 via the fast path; this is the "trust the index, skip scan" signal - // used by JsonIndexGroupByOperator's selectivity gate. - assertEquals(reader.getDistinctValueCountForPath("$.does_not_exist"), 0L, - label + " distinct count at non-existent path"); - } - } - } - @Test public void testLargeIndex() throws Exception { diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java index 7f9699d345d5..06a095b4e206 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/JsonIndexReader.java @@ -101,25 +101,4 @@ default Set getMatchingDistinctValues(String key, @Nullable String filte default boolean isPathIndexed(String jsonPath) { return true; } - - /** - * Returns a non-negative upper bound on the number of distinct values seen at the given JSON path across all - * documents in the segment. Implementations are free to return the exact count when it is cheap to compute; a - * looser upper bound is permitted (and required for some array-indexed paths, see below). - * - *

<p>The OSS immutable implementation answers scalar / wildcard paths in O(log N) via the sorted dictionary - * range. Array-indexed paths (for example {@code $.items[0].name}) fall back to the materializing default and - * return the union of distinct values across all indices, which is a strict upper bound on the per-index count. - * The default implementation materializes the full value set via {@link #getMatchingDistinctValues}; it is - * correct but intentionally slow on high-cardinality paths. - * - *

<p>Callers use this value as a cardinality estimate to decide whether a dictionary-scan execution plan is - * cheaper than a row-by-row scan baseline. Because the contract is an upper bound, an estimate that is too high - * is perf-conservative (it may pick the slower row-scan path when the dictionary scan would have won); an - * estimate that is too low would be unsafe, so implementations must never under-report. A path that does not - * appear in the index returns 0. - */ - default long getDistinctValueCountForPath(String jsonPath) { - return getMatchingDistinctValues(jsonPath, null).size(); - } }