diff --git a/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java b/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java index c67c39c645bc..3c9f48c0d48a 100644 --- a/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java +++ b/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java @@ -93,10 +93,9 @@ public ByteBuffer minKey() return index.minKey(); } - public ByteBuffer maxKey() - { - return index.maxKey(); - } + public ByteBuffer maxKey() { return index.maxKey(); } + + public long getEstimatedResultRows() { return index.getEstimatedResultRows(); } public RangeIterator search(Expression expression) { diff --git a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java index 4d43cd908430..e02ce30ba91f 100644 --- a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java +++ b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java @@ -22,6 +22,9 @@ import java.util.*; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.index.sasi.Term; import org.apache.cassandra.index.sasi.plan.Expression; @@ -46,6 +49,8 @@ public class OnDiskIndex implements Iterable, Closeable { + private static final Logger logger = LoggerFactory.getLogger(OnDiskIndex.class); + public enum IteratorOrder { DESC(1), ASC(-1); @@ -115,6 +120,8 @@ public int startAt(SearchResult found, boolean inclusive) protected final ByteBuffer minTerm, maxTerm, minKey, maxKey; + protected final long estimatedResultRows; + @SuppressWarnings("resource") public OnDiskIndex(File index, AbstractType cmp, Function keyReader) { @@ -147,17 +154,30 @@ public OnDiskIndex(File index, AbstractType cmp, Function // start of the levels indexFile.position(indexFile.getLong(indexSize - 8)); + logger.trace("Index: {}", index); int numLevels = indexFile.getInt(); + int pointBlockcount = 0; levels = new PointerLevel[numLevels]; for (int i = 0; i < levels.length; i++) { int blockCount = indexFile.getInt(); + logger.trace("PointerLevel: {} blocks", blockCount); levels[i] = new PointerLevel(indexFile.position(), blockCount); indexFile.position(indexFile.position() + blockCount * 8); + + pointBlockcount += blockCount; } int blockCount = indexFile.getInt(); dataLevel = new DataLevel(indexFile.position(), blockCount); + logger.trace("DataLevel: {} blocks, {} bytes per term, {} bytes total", + blockCount, termSize, indexSize); + + // This isn't really related to the number of tokens + // in this index but should still be a good indicator + // of the cardinality of the index. + // Instead we should add a field containing the number of tokens in this index. + estimatedResultRows = pointBlockcount + blockCount; } catch (IOException e) { @@ -194,10 +214,7 @@ public ByteBuffer minKey() return minKey; } - public ByteBuffer maxKey() - { - return maxKey; - } + public ByteBuffer maxKey() { return maxKey; } public DataTerm min() { @@ -210,6 +227,10 @@ public DataTerm max() return block.getTerm(block.termCount() - 1); } + public long getEstimatedResultRows() { + return estimatedResultRows; + } + /** * Search for rows which match all of the terms inside the given expression in the index file. * diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java index 22fca68f230b..7f49af4338fb 100644 --- a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java +++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java @@ -17,8 +17,12 @@ */ package org.apache.cassandra.index.sasi.plan; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.*; import java.util.concurrent.TimeUnit; +import static java.lang.Long.min; import com.google.common.collect.Sets; @@ -26,6 +30,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.index.Index; import org.apache.cassandra.index.sasi.SASIIndex; @@ -39,16 +44,22 @@ import org.apache.cassandra.index.sasi.utils.RangeIntersectionIterator; import org.apache.cassandra.index.sasi.utils.RangeIterator; import org.apache.cassandra.index.sasi.utils.RangeUnionIterator; -import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.Pair; + + public class QueryController { + private static final Logger logger = LoggerFactory.getLogger(QueryController.class); + private final long executionQuota; private final long executionStart; + private final long indexIgnoreTokenCountThreshold; + private final double indexIgnoreTokenRatioThreshold = 0.1d; + private final ColumnFamilyStore cfs; private final PartitionRangeReadCommand command; private final DataRange range; @@ -61,6 +72,10 @@ public QueryController(ColumnFamilyStore cfs, PartitionRangeReadCommand command, this.range = command.dataRange(); this.executionQuota = TimeUnit.MILLISECONDS.toNanos(timeQuotaMs); this.executionStart = System.nanoTime(); + if (command.limits().count() != DataLimits.NO_LIMIT) + this.indexIgnoreTokenCountThreshold = min(command.limits().count(), 10000); + else + this.indexIgnoreTokenCountThreshold = 10000; } public boolean isForThrift() @@ -133,22 +148,62 @@ public RangeIterator.Builder getIndexes(OperationType op, Collectio if (resources.containsKey(expressions)) throw new IllegalArgumentException("Can't process the same expressions multiple times."); - RangeIterator.Builder builder = op == OperationType.OR - ? RangeUnionIterator.builder() - : RangeIntersectionIterator.builder(); - - List> perIndexUnions = new ArrayList<>(); + List> indexes = new ArrayList<>(); + long minCount = Long.MAX_VALUE; - for (Map.Entry> e : getView(op, expressions).entrySet()) + // First execute the search on every index. + for (Pair> e : getWeightedViews(op, expressions)) { + logger.trace("Searching {}", e); + @SuppressWarnings("resource") // RangeIterators are closed by releaseIndexes - RangeIterator index = TermIterator.build(e.getKey(), e.getValue()); + RangeIterator index = TermIterator.build(e.left, e.right); + logger.trace("Found {} tokens", index.getCount()); + + // Keep track of the index that returned the least amount of results. + if (index.getCount() < minCount) + minCount = index.getCount(); + + indexes.add(index); + + // If we found an index that already returned less results than the user + // specified limit, it's probably faster to stop here because we already + // ordered the expressions by cardinality. + logger.trace("{} {} {}", e, command.limits(), index.getCount()); + if (op == OperationType.AND && + command.limits().count() != DataLimits.NO_LIMIT && + index.getCount() < command.limits().count()) { + logger.trace("We found less than {} tokens. Stopping.", + command.limits().count()); + break; + } + } - builder.add(index); - perIndexUnions.add(index); + RangeIterator.Builder builder = op == OperationType.OR + ? RangeUnionIterator.builder() + : RangeIntersectionIterator.builder(); + + + for (RangeIterator index : indexes) { + if (op == OperationType.AND) { + long tokenCount = index.getCount(); + float ratio = minCount == 0 ? 0 : (float) minCount / tokenCount; + // If we are doing intersections between indexes we can enable some + // additional optimizations. + + // See CASSANDRA-12915 for details. OnDiskIndexIterator can be rather + // inefficient and in some cases it is faster to rely on post-filtering. + logger.trace("count: {}, ratio: {}, threshold: {}", + tokenCount, ratio, indexIgnoreTokenCountThreshold); + if (tokenCount > indexIgnoreTokenCountThreshold && ratio < indexIgnoreTokenRatioThreshold) { + logger.trace("Skipping"); + continue; + } + } + builder.add(index); } - resources.put(expressions, perIndexUnions); + resources.put(expressions, indexes); return builder; } @@ -177,12 +232,55 @@ public void finish() resources.values().forEach(this::releaseIndexes); } + /** + * Returns a list of Pair> ordered by guessed cardinality. The first + * element of this list is likely to have a higher cardinality and be able to filter out more elements. + * @param op + * @param expressions + * @return A sorted list of expression and associated sstables. + */ + private List>> getWeightedViews(OperationType op, + Collection expressions) { + Map> views = getView(op, expressions); + + // Generate a list of expression sorted by score. + List> expressionScores = new ArrayList<>(); + for (Map.Entry> e : views.entrySet()) { + Expression expression = e.getKey(); + + long estimatedRowCount = 0; + + // Prioritize operations that are likely to make a good use of the index. + if (expression.getOp() == Expression.Op.EQ + || expression.getOp() == Expression.Op.PREFIX) { + for (SSTableIndex index : e.getValue()) + estimatedRowCount += index.getEstimatedResultRows(); + } + // Let's assume that prefix will return way more values so let's reduce its score. + if (expression.getOp() == Expression.Op.PREFIX) + estimatedRowCount = (long)Math.sqrt((double)estimatedRowCount); + + expressionScores.add(Pair.create(expression, estimatedRowCount)); + logger.trace("Estimated row count for {}: {}", expression.toString(), estimatedRowCount); + } + expressionScores.sort(((a, b) -> b.right.compareTo(a.right))); + + // Now move that back in something more useful to the caller. + List>> sortedViews = new ArrayList<>(); + for (Pair e : expressionScores) + sortedViews.add(Pair.create(e.left, views.get(e.left))); + + return sortedViews; + } + private Map> getView(OperationType op, Collection expressions) { // first let's determine the primary expression if op is AND Pair> primary = (op == OperationType.AND) ? calculatePrimary(expressions) : null; Map> indexes = new HashMap<>(); + + logger.trace("Expressions: {}, Op: {}", expressions.toString(), op); for (Expression e : expressions) { // NO_EQ and non-index column query should only act as FILTER BY for satisfiedBy(Row) method @@ -214,6 +312,7 @@ private Map> getView(OperationType op, Collection< indexes.put(e, readers); } + logger.trace("Final view: {}", indexes); return indexes; } diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java b/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java index 326ea0d47308..35c6109590c0 100644 --- a/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java +++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java @@ -17,6 +17,9 @@ */ package org.apache.cassandra.index.sasi.plan; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.*; import org.apache.cassandra.config.CFMetaData; @@ -32,6 +35,8 @@ public class QueryPlan { + private static final Logger logger = LoggerFactory.getLogger(QueryPlan.class); + private final QueryController controller; public QueryPlan(ColumnFamilyStore cfs, ReadCommand command, long executionQuotaMs) @@ -52,9 +57,13 @@ private Operation analyze() { try { + logger.trace("Analyzing {} {}", + controller.metadata(), controller.getExpressions()); Operation.Builder and = new Operation.Builder(OperationType.AND, controller); controller.getExpressions().forEach(and::add); - return and.complete(); + Operation op = and.complete(); + logger.trace("done"); + return op; } catch (Exception | Error e) {