sasi: intersection optimizations
In general:
- try to order expressions by cardinality

During intersections (when op is AND):
- stop searching expressions in indexes if we think we already found
  enough tokens (based on LIMIT).
- ignore other indexes if they return far more results (100x)
  than the smallest set of results (see the sketch below).

See CASSANDRA-12915, CASSANDRA-10765
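
The heuristics above boil down to three steps: order the expressions by estimated cardinality, stop searching once a single index already returns fewer tokens than the LIMIT, and drop indexes whose result sets dwarf the smallest one. The following is a minimal, self-contained sketch of that planning logic, not the committed SASI code: the Candidate class, planIntersection method, and the threshold constants are illustrative names, with values mirroring those used in QueryController in the diff below.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative sketch only: names and thresholds are hypothetical stand-ins
// for SASI's Expression/RangeIterator machinery.
final class IntersectionPlanSketch
{
    static final long COUNT_THRESHOLD = 10_000;  // never skip an index below this many tokens
    static final double RATIO_THRESHOLD = 0.1;   // skip when the smallest set is under 10% of this index

    static final class Candidate
    {
        final String expression;
        final long estimatedCardinality; // e.g. index block count, a proxy for selectivity
        final long tokenCount;           // tokens returned when the index is actually searched

        Candidate(String expression, long estimatedCardinality, long tokenCount)
        {
            this.expression = expression;
            this.estimatedCardinality = estimatedCardinality;
            this.tokenCount = tokenCount;
        }
    }

    static List<Candidate> planIntersection(List<Candidate> candidates, long limit)
    {
        // 1. Order expressions by estimated cardinality, highest first: a high-cardinality
        //    column is expected to be the most selective for an equality match.
        candidates.sort(Comparator.comparingLong((Candidate c) -> c.estimatedCardinality).reversed());

        // 2. Search the indexes in that order and stop as soon as one of them
        //    already returns fewer tokens than the query LIMIT.
        List<Candidate> searched = new ArrayList<>();
        long minCount = Long.MAX_VALUE;
        for (Candidate c : candidates)
        {
            searched.add(c);
            minCount = Math.min(minCount, c.tokenCount);
            if (c.tokenCount < limit)
                break;
        }

        // 3. Drop any searched index that returned vastly more tokens than the smallest
        //    set; post-filtering those rows is cheaper than intersecting the iterators.
        List<Candidate> kept = new ArrayList<>();
        for (Candidate c : searched)
        {
            double ratio = minCount == 0 ? 0 : (double) minCount / c.tokenCount;
            if (c.tokenCount > COUNT_THRESHOLD && ratio < RATIO_THRESHOLD)
                continue;
            kept.add(c);
        }
        return kept;
    }
}

In the actual patch these steps live in OnDiskIndex (the cardinality estimate) and QueryController.getWeightedViews/getIndexes (the ordering, early stop, and skip), as shown in the diff below.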
Corentin Chary committed Jun 8, 2017
1 parent 30412b0 commit edbc0a0
Showing 4 changed files with 148 additions and 20 deletions.
7 changes: 3 additions & 4 deletions src/java/org/apache/cassandra/index/sasi/SSTableIndex.java
@@ -93,10 +93,9 @@ public ByteBuffer minKey()
return index.minKey();
}

public ByteBuffer maxKey()
{
return index.maxKey();
}
public ByteBuffer maxKey() { return index.maxKey(); }

public long getEstimatedResultRows() { return index.getEstimatedResultRows(); }

public RangeIterator<Long, Token> search(Expression expression)
{
29 changes: 25 additions & 4 deletions src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
@@ -22,6 +22,9 @@
import java.util.*;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.index.sasi.Term;
import org.apache.cassandra.index.sasi.plan.Expression;
@@ -46,6 +49,8 @@

public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
{
private static final Logger logger = LoggerFactory.getLogger(OnDiskIndex.class);

public enum IteratorOrder
{
DESC(1), ASC(-1);
@@ -115,6 +120,8 @@ public int startAt(SearchResult<DataTerm> found, boolean inclusive)

protected final ByteBuffer minTerm, maxTerm, minKey, maxKey;

protected final long estimatedResultRows;

@SuppressWarnings("resource")
public OnDiskIndex(File index, AbstractType<?> cmp, Function<Long, DecoratedKey> keyReader)
{
@@ -147,17 +154,30 @@ public OnDiskIndex(File index, AbstractType<?> cmp, Function<Long, DecoratedKey>
// start of the levels
indexFile.position(indexFile.getLong(indexSize - 8));

logger.trace("Index: {}", index);
int numLevels = indexFile.getInt();
int pointBlockCount = 0;
levels = new PointerLevel[numLevels];
for (int i = 0; i < levels.length; i++)
{
int blockCount = indexFile.getInt();
logger.trace("PointerLevel: {} blocks", blockCount);
levels[i] = new PointerLevel(indexFile.position(), blockCount);
indexFile.position(indexFile.position() + blockCount * 8);

pointBlockCount += blockCount;
}

int blockCount = indexFile.getInt();
dataLevel = new DataLevel(indexFile.position(), blockCount);
logger.trace("DataLevel: {} blocks, {} bytes per term, {} bytes total",
blockCount, termSize, indexSize);

// The block count isn't actually the number of tokens in this index,
// but it should still be a reasonable indicator of the index's cardinality.
// Ideally we would store an explicit token count in the index instead.
estimatedResultRows = pointBlockCount + blockCount;
}
catch (IOException e)
{
@@ -194,10 +214,7 @@ public ByteBuffer minKey()
return minKey;
}

public ByteBuffer maxKey()
{
return maxKey;
}
public ByteBuffer maxKey() { return maxKey; }

public DataTerm min()
{
@@ -210,6 +227,10 @@ public DataTerm max()
return block.getTerm(block.termCount() - 1);
}

public long getEstimatedResultRows() {
return estimatedResultRows;
}

/**
* Search for rows which match all of the terms inside the given expression in the index file.
*
121 changes: 110 additions & 11 deletions src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
@@ -17,15 +17,20 @@
*/
package org.apache.cassandra.index.sasi.plan;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.TimeUnit;
import static java.lang.Long.min;

import com.google.common.collect.Sets;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.index.sasi.SASIIndex;
@@ -39,16 +44,22 @@
import org.apache.cassandra.index.sasi.utils.RangeIntersectionIterator;
import org.apache.cassandra.index.sasi.utils.RangeIterator;
import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.Pair;



public class QueryController
{
private static final Logger logger = LoggerFactory.getLogger(QueryController.class);

private final long executionQuota;
private final long executionStart;

private final long indexIgnoreTokenCountThreshold;
private final double indexIgnoreTokenRatioThreshold = 0.1d;

private final ColumnFamilyStore cfs;
private final PartitionRangeReadCommand command;
private final DataRange range;
@@ -61,6 +72,10 @@ public QueryController(ColumnFamilyStore cfs, PartitionRangeReadCommand command,
this.range = command.dataRange();
this.executionQuota = TimeUnit.MILLISECONDS.toNanos(timeQuotaMs);
this.executionStart = System.nanoTime();
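// Minimum token count before an index may be skipped during intersections:
// the query LIMIT when one is set, capped at 10000 tokens.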
if (command.limits().count() != DataLimits.NO_LIMIT)
this.indexIgnoreTokenCountThreshold = min(command.limits().count(), 10000);
else
this.indexIgnoreTokenCountThreshold = 10000;
}

public boolean isForThrift()
@@ -133,22 +148,62 @@ public RangeIterator.Builder<Long, Token> getIndexes(OperationType op, Collectio
if (resources.containsKey(expressions))
throw new IllegalArgumentException("Can't process the same expressions multiple times.");

RangeIterator.Builder<Long, Token> builder = op == OperationType.OR
? RangeUnionIterator.<Long, Token>builder()
: RangeIntersectionIterator.<Long, Token>builder();

List<RangeIterator<Long, Token>> perIndexUnions = new ArrayList<>();
List<RangeIterator<Long, Token>> indexes = new ArrayList<>();
long minCount = Long.MAX_VALUE;

for (Map.Entry<Expression, Set<SSTableIndex>> e : getView(op, expressions).entrySet())
// First execute the search on every index.
for (Pair<Expression, Set<SSTableIndex>> e : getWeightedViews(op, expressions))
{
logger.trace("Searching {}", e);

@SuppressWarnings("resource") // RangeIterators are closed by releaseIndexes
RangeIterator<Long, Token> index = TermIterator.build(e.getKey(), e.getValue());
RangeIterator<Long, Token> index = TermIterator.build(e.left, e.right);
logger.trace("Found {} tokens", index.getCount());

// Keep track of the index that returned the least amount of results.
if (index.getCount() < minCount)
minCount = index.getCount();

indexes.add(index);

// If we found an index that already returned fewer results than the
// user-specified limit, it is probably faster to stop here because we
// already ordered the expressions by cardinality.
logger.trace("{} {} {}", e, command.limits(), index.getCount());
if (op == OperationType.AND &&
command.limits().count() != DataLimits.NO_LIMIT &&
index.getCount() < command.limits().count()) {
logger.trace("We found less than {} tokens. Stopping.",
command.limits().count());
break;
}
}

builder.add(index);
perIndexUnions.add(index);
RangeIterator.Builder<Long, Token> builder = op == OperationType.OR
? RangeUnionIterator.<Long, Token>builder()
: RangeIntersectionIterator.<Long, Token>builder();


for (RangeIterator<Long, Token> index : indexes) {
if (op == OperationType.AND) {
long tokenCount = index.getCount();
float ratio = minCount == 0 ? 0 : (float) minCount / tokenCount;
// If we are doing intersections between indexes we can enable some
// additional optimizations.

// See CASSANDRA-12915 for details. OnDiskIndexIterator can be rather
// inefficient and in some cases it is faster to rely on post-filtering.
logger.trace("count: {}, ratio: {}, threshold: {}",
tokenCount, ratio, indexIgnoreTokenCountThreshold);
if (tokenCount > indexIgnoreTokenCountThreshold && ratio < indexIgnoreTokenRatioThreshold) {
logger.trace("Skipping");
continue;
}
}
builder.add(index);
}

resources.put(expressions, perIndexUnions);
resources.put(expressions, indexes);
return builder;
}

@@ -177,12 +232,55 @@ public void finish()
resources.values().forEach(this::releaseIndexes);
}

/**
* Returns a list of Pair<Expression, Set<SSTableIndex>> ordered by estimated cardinality. The first
* element of the list is expected to have the highest cardinality and therefore to filter out the most rows.
* @param op the operation type (AND or OR) of the query
* @param expressions the expressions to look up in the indexes
* @return A sorted list of expressions and their associated sstable indexes.
*/
private List<Pair<Expression, Set<SSTableIndex>>> getWeightedViews(OperationType op,
Collection<Expression> expressions) {
Map<Expression, Set<SSTableIndex>> views = getView(op, expressions);

// Generate a list of expressions sorted by score.
List<Pair<Expression, Long>> expressionScores = new ArrayList<>();
for (Map.Entry<Expression, Set<SSTableIndex>> e : views.entrySet()) {
Expression expression = e.getKey();

long estimatedRowCount = 0;

// Prioritize operations that are likely to make good use of the index.
if (expression.getOp() == Expression.Op.EQ
|| expression.getOp() == Expression.Op.PREFIX) {
for (SSTableIndex index : e.getValue())
estimatedRowCount += index.getEstimatedResultRows();
}
// Assume that PREFIX will return far more values, so reduce its score.
if (expression.getOp() == Expression.Op.PREFIX)
estimatedRowCount = (long)Math.sqrt((double)estimatedRowCount);

expressionScores.add(Pair.create(expression, estimatedRowCount));
logger.trace("Estimated row count for {}: {}", expression.toString(), estimatedRowCount);
}
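// Sort descending: a larger estimate means a larger, likely higher-cardinality index,
// so the expression expected to filter out the most rows comes first.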
expressionScores.sort(((a, b) -> b.right.compareTo(a.right)));

// Now map that back into something more useful to the caller.
List<Pair<Expression, Set<SSTableIndex>>> sortedViews = new ArrayList<>();
for (Pair<Expression, Long> e : expressionScores)
sortedViews.add(Pair.create(e.left, views.get(e.left)));

return sortedViews;
}

private Map<Expression, Set<SSTableIndex>> getView(OperationType op, Collection<Expression> expressions)
{
// first let's determine the primary expression if op is AND
Pair<Expression, Set<SSTableIndex>> primary = (op == OperationType.AND) ? calculatePrimary(expressions) : null;

Map<Expression, Set<SSTableIndex>> indexes = new HashMap<>();

logger.trace("Expressions: {}, Op: {}", expressions.toString(), op);
for (Expression e : expressions)
{
// NO_EQ and non-index column query should only act as FILTER BY for satisfiedBy(Row) method
@@ -214,6 +312,7 @@ private Map<Expression, Set<SSTableIndex>> getView(OperationType op, Collection<

indexes.put(e, readers);
}
logger.trace("Final view: {}", indexes);

return indexes;
}
11 changes: 10 additions & 1 deletion src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
@@ -17,6 +17,9 @@
*/
package org.apache.cassandra.index.sasi.plan;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

import org.apache.cassandra.config.CFMetaData;
@@ -32,6 +35,8 @@

public class QueryPlan
{
private static final Logger logger = LoggerFactory.getLogger(QueryPlan.class);

private final QueryController controller;

public QueryPlan(ColumnFamilyStore cfs, ReadCommand command, long executionQuotaMs)
@@ -52,9 +57,13 @@ private Operation analyze()
{
try
{
logger.trace("Analyzing {} {}",
controller.metadata(), controller.getExpressions());
Operation.Builder and = new Operation.Builder(OperationType.AND, controller);
controller.getExpressions().forEach(and::add);
return and.complete();
Operation op = and.complete();
logger.trace("done");
return op;
}
catch (Exception | Error e)
{
