Skip to content

Commit

Permalink
Add support for PK lookup if additional filters are combined by AND
Browse files Browse the repository at this point in the history
If filters on primary key columns are combined with additional filters
via AND operators, the optimized PK lookup plan is now used.

Closes #10298.
  • Loading branch information
seut committed Oct 29, 2020
1 parent 81f9108 commit 514fe79
Show file tree
Hide file tree
Showing 14 changed files with 193 additions and 35 deletions.
3 changes: 3 additions & 0 deletions docs/appendices/release-notes/unreleased.rst
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ None
Changes
=======

- Added support for using the optimized primary key lookup plan when filters on
primary key columns are combined with additional filters via ``AND`` operators.

- Improved the performance of queries on the ``sys.allocations`` table in cases
where there are filters restricting the result set or if only a sub-set of
the columns is selected.
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/io/crate/analyze/WhereClause.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public WhereClause(@Nullable Symbol normalizedQuery,
this.partitions = Objects.requireNonNullElse(partitions, List.of());
this.query = normalizedQuery;
if (normalizedQuery != null) {
validateVersioningColumnsUsage();
validateVersioningColumnsUsage(normalizedQuery);
}
}

Expand All @@ -93,7 +93,7 @@ public Symbol queryOrFallback() {
return query == null ? Literal.BOOLEAN_TRUE : query;
}

private void validateVersioningColumnsUsage() {
public static void validateVersioningColumnsUsage(Symbol query) {
if (Symbols.containsColumn(query, DocSysColumns.SEQ_NO)) {
if (!Symbols.containsColumn(query, DocSysColumns.PRIMARY_TERM)) {
throw VersioninigValidationException.seqNoAndPrimaryTermUsage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import com.google.common.collect.Sets;
import io.crate.expression.eval.EvaluatingNormalizer;
import io.crate.expression.operator.AndOperator;
import io.crate.expression.operator.EqOperator;
import io.crate.expression.operator.Operator;
import io.crate.expression.operator.Operators;
Expand Down Expand Up @@ -86,6 +87,8 @@ public List<List<Symbol>> extractParentMatches(List<ColumnIdent> columns, Symbol
* columns: [x]
* x = ? -> [[?]]
* x = 1 or x = 2 -> [[1], [2]]
* x = 1 and some_scalar(z, ...) = 2 -> [[1]]
* x = 1 or (x = 2 and z = 20) -> [[1], [2]]
*
* columns: [x, y]
* x = $1 and y = $2 -> [[$1, $2]]
Expand All @@ -95,6 +98,7 @@ public List<List<Symbol>> extractParentMatches(List<ColumnIdent> columns, Symbol
*
* columns: [x]
* x = 10 or y = 20 -> null (y not inside columns)
* x = 10 and match(z, ...) = 'foo' -> null (MATCH predicate can only be applied on lucene)
* </pre>
*/
@Nullable
Expand Down Expand Up @@ -359,6 +363,7 @@ static class Context {
private LinkedHashMap<ColumnIdent, Comparison> comparisons;
private boolean proxyBelow;
private boolean seenUnknown = false;
private boolean ignoreUnknown = false;
private final boolean exact;

private Context(Collection<ColumnIdent> references, boolean exact) {
Expand Down Expand Up @@ -428,6 +433,7 @@ public Symbol visitFunction(Function function, Context context) {
} else if (Operators.LOGICAL_OPERATORS.contains(functionName)) {
boolean proxyBelowPre = context.proxyBelow;
boolean proxyBelowPost = proxyBelowPre;
context.ignoreUnknown = context.ignoreUnknown || functionName.equals(AndOperator.NAME);
ArrayList<Symbol> newArgs = new ArrayList<>(arguments.size());
for (Symbol arg : arguments) {
context.proxyBelow = proxyBelowPre;
Expand All @@ -440,7 +446,10 @@ public Symbol visitFunction(Function function, Context context) {
}
return new Function(function.signature(), newArgs, function.valueType());
}
context.seenUnknown = true;
if (context.ignoreUnknown == false
|| functionName.equals(io.crate.expression.predicate.MatchPredicate.NAME)) {
context.seenUnknown = true;
}
return Literal.BOOLEAN_TRUE;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.crate.expression.symbol.Function;
import io.crate.expression.symbol.ScopedSymbol;
import io.crate.expression.symbol.Symbol;
import io.crate.expression.symbol.SymbolType;
import io.crate.expression.symbol.SymbolVisitor;
import io.crate.expression.symbol.WindowFunction;
import io.crate.metadata.FunctionType;
Expand Down Expand Up @@ -164,7 +165,7 @@ private static void validateSysReference(Context context, Set<String> requiredFu
}
assert function.arguments().size() == 2 : "function's number of arguments must be 2";
Symbol right = function.arguments().get(1);
if (!right.symbolType().isValueSymbol()) {
if (!right.symbolType().isValueSymbol() && right.symbolType() != SymbolType.PARAMETER) {
throw error.get();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.crate.expression.symbol.Symbol;
import io.crate.expression.symbol.SymbolVisitors;
import io.crate.expression.symbol.Symbols;
import io.crate.metadata.RowGranularity;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

Expand All @@ -41,6 +42,7 @@
public class EvalProjection extends Projection {

private final List<Symbol> outputs;
private RowGranularity requiredGranularity = RowGranularity.CLUSTER;

public EvalProjection(List<Symbol> outputs) {
assert outputs.stream().noneMatch(
Expand All @@ -58,6 +60,15 @@ public ProjectionType projectionType() {
return ProjectionType.EVAL;
}

/**
 * Returns the row granularity this projection requires. The value is
 * {@link RowGranularity#CLUSTER} unless it has been replaced through
 * {@link #requiredGranularity(RowGranularity)}.
 */
@Override
public RowGranularity requiredGranularity() {
    return this.requiredGranularity;
}

/**
 * Replaces the granularity reported by {@link #requiredGranularity()}.
 *
 * @param requiredRowGranularity the granularity this projection should report from now on
 */
public void requiredGranularity(RowGranularity requiredRowGranularity) {
this.requiredGranularity = requiredRowGranularity;
}

@Override
public <C, R> R accept(ProjectionVisitor<C, R> visitor, C context) {
return visitor.visitEvalProjection(this, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ public static DetailedQuery optimize(EvaluatingNormalizer normalizer,
if (!query.equals(queryGenColsProcessed)) {
query = normalizer.normalize(queryGenColsProcessed, txnCtx);
}
WhereClause.validateVersioningColumnsUsage(query);

boolean versionInQuery = Symbols.containsColumn(query, DocSysColumns.VERSION);
boolean sequenceVersioningInQuery = Symbols.containsColumn(query, DocSysColumns.SEQ_NO) &&
Expand Down Expand Up @@ -192,9 +193,7 @@ public static DetailedQuery optimize(EvaluatingNormalizer normalizer,
}
}
}
if (docKeys == null) {
WhereClauseValidator.validate(query);
}
WhereClauseValidator.validate(query);
return new DetailedQuery(query, docKeys, partitionValues, clusteredBy);
}

Expand Down
63 changes: 55 additions & 8 deletions server/src/main/java/io/crate/planner/operators/Get.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,19 @@
import io.crate.common.collections.Lists2;
import io.crate.data.Row;
import io.crate.execution.dsl.phases.PKLookupPhase;
import io.crate.execution.dsl.projection.EvalProjection;
import io.crate.execution.dsl.projection.builder.ProjectionBuilder;
import io.crate.execution.engine.pipeline.TopN;
import io.crate.expression.symbol.InputColumn;
import io.crate.expression.symbol.RefVisitor;
import io.crate.expression.symbol.SelectSymbol;
import io.crate.expression.symbol.Symbol;
import io.crate.metadata.IndexParts;
import io.crate.metadata.PartitionName;
import io.crate.metadata.Reference;
import io.crate.metadata.RelationName;
import io.crate.metadata.RowGranularity;
import io.crate.metadata.doc.DocSysColumns;
import io.crate.metadata.doc.DocTableInfo;
import io.crate.planner.ExecutionPlan;
import io.crate.planner.PlannerContext;
Expand All @@ -52,21 +58,29 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class Get implements LogicalPlan {

final DocTableRelation tableRelation;
final DocKeys docKeys;
final Symbol query;
final long estimatedSizePerRow;
private final List<Symbol> outputs;

public Get(DocTableRelation table, DocKeys docKeys, List<Symbol> outputs, long estimatedSizePerRow) {
this.outputs = outputs;
/**
 * Creates a primary key lookup plan for the given table.
 *
 * @param table               relation of the table the documents are looked up in
 * @param docKeys             keys identifying the documents to look up
 * @param query               query symbol; {@code build} binds it and uses it for the
 *                            additional filter projection
 * @param outputs             symbols this plan outputs
 * @param estimatedSizePerRow estimated size of a single row
 */
public Get(DocTableRelation table,
           DocKeys docKeys,
           Symbol query,
           List<Symbol> outputs,
           long estimatedSizePerRow) {
    // Plain field assignments, kept in parameter order.
    this.tableRelation = table;
    this.docKeys = docKeys;
    this.query = query;
    this.outputs = outputs;
    this.estimatedSizePerRow = estimatedSizePerRow;
}

@Override
Expand All @@ -85,8 +99,6 @@ public ExecutionPlan build(PlannerContext plannerContext,
SubQueryResults subQueryResults) {
HashMap<String, Map<ShardId, List<PKAndVersion>>> idsByShardByNode = new HashMap<>();
DocTableInfo docTableInfo = tableRelation.tableInfo();
List<Symbol> boundOutputs = Lists2.map(
outputs, s -> SubQueryAndParamBinder.convert(s, params, subQueryResults));
for (DocKeys.DocKey docKey : docKeys) {
String id = docKey.getId(plannerContext.transactionContext(), plannerContext.nodeContext(), params, subQueryResults);
if (id == null) {
Expand Down Expand Up @@ -135,20 +147,55 @@ public ExecutionPlan build(PlannerContext plannerContext,
.orElse(SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
pkAndVersions.add(new PKAndVersion(id, version, sequenceNumber, primaryTerm));
}
return new Collect(

var docKeyColumns = new ArrayList<>(docTableInfo.primaryKey());
docKeyColumns.addAll(docTableInfo.partitionedBy());
docKeyColumns.add(docTableInfo.clusteredBy());
docKeyColumns.add(DocSysColumns.VERSION);
docKeyColumns.add(DocSysColumns.SEQ_NO);
docKeyColumns.add(DocSysColumns.PRIMARY_TERM);

List<Symbol> boundOutputs = Lists2.map(
outputs, s -> SubQueryAndParamBinder.convert(s, params, subQueryResults));
var boundQuery = SubQueryAndParamBinder.convert(query, params, subQueryResults);

var toCollect = new LinkedHashSet<>(boundOutputs);
Consumer<Reference> addRefIfMatch = ref -> {
if (ref.ident().tableIdent().equals(tableRelation.relationName())
&& docKeyColumns.contains(ref.column()) == false) {
toCollect.add(ref);
}
};
RefVisitor.visitRefs(boundQuery, addRefIfMatch);

var collect = new Collect(
new PKLookupPhase(
plannerContext.jobId(),
plannerContext.nextExecutionPhaseId(),
docTableInfo.partitionedBy(),
boundOutputs,
List.copyOf(toCollect),
idsByShardByNode
),
TopN.NO_LIMIT,
0,
boundOutputs.size(),
toCollect.size(),
docKeys.size(),
null
);

var requiresAdditionalFilteringOnNonDocKeyColumns = toCollect.size() != boundOutputs.size();
if (requiresAdditionalFilteringOnNonDocKeyColumns) {
var filterProjection = ProjectionBuilder.filterProjection(toCollect, boundQuery);
filterProjection.requiredGranularity(RowGranularity.SHARD);
collect.addProjection(filterProjection);

// reduce outputs which have been added due to filtering
var evalProjection = new EvalProjection(InputColumn.mapToInputColumns(boundOutputs));
evalProjection.requiredGranularity(RowGranularity.SHARD);
collect.addProjection(evalProjection);
}

return collect;
}

@Override
Expand Down Expand Up @@ -184,7 +231,7 @@ public LogicalPlan pruneOutputsExcept(TableStats tableStats, Collection<Symbol>
}
}
if (excludedAny) {
return new Get(tableRelation, docKeys, newOutputs, estimatedSizePerRow);
return new Get(tableRelation, docKeys, query, newOutputs, estimatedSizePerRow);
}
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,13 @@ public LogicalPlan apply(Collect collect,
Optional<DocKeys> docKeys = detailedQuery.docKeys();
//noinspection OptionalIsPresent no capturing lambda allocation
if (docKeys.isPresent()) {
return new Get(relation, docKeys.get(), collect.outputs(), tableStats.estimatedSizePerRow(relation.relationName()));
return new Get(
relation,
docKeys.get(),
detailedQuery.query(),
collect.outputs(),
tableStats.estimatedSizePerRow(relation.relationName())
);
} else {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@
import io.crate.execution.engine.pipeline.TopN;
import io.crate.execution.support.OneRowActionListener;
import io.crate.expression.eval.EvaluatingNormalizer;
import io.crate.expression.symbol.DynamicReference;
import io.crate.expression.symbol.Function;
import io.crate.expression.symbol.InputColumn;
import io.crate.expression.symbol.Literal;
import io.crate.expression.symbol.Symbol;
import io.crate.expression.symbol.SymbolVisitor;
import io.crate.metadata.IndexParts;
import io.crate.metadata.Reference;
import io.crate.metadata.Routing;
Expand Down Expand Up @@ -78,6 +81,8 @@

public final class DeletePlanner {

private static final NonPartitionReferenceDetector NON_PARTITION_BY_DETECTOR = new NonPartitionReferenceDetector();

public static Plan planDelete(AnalyzedDeleteStatement delete,
SubqueryPlanner subqueryPlanner,
PlannerContext context) {
Expand All @@ -93,7 +98,12 @@ private static Plan planDelete(AnalyzedDeleteStatement delete, PlannerContext co
normalizer, delete.query(), table, context.transactionContext(), context.nodeContext());

if (!detailedQuery.partitions().isEmpty()) {
return new DeletePartitions(table.ident(), detailedQuery.partitions());
// deleting whole partitions is only valid if the query only contains filters based on partition-by cols
var hasNonPartitionReferences = detailedQuery.query()
.accept(NON_PARTITION_BY_DETECTOR, table.partitionedByColumns());
if (hasNonPartitionReferences == false) {
return new DeletePartitions(table.ident(), detailedQuery.partitions());
}
}
if (detailedQuery.docKeys().isPresent()) {
return new DeleteById(tableRel.tableInfo(), detailedQuery.docKeys().get());
Expand Down Expand Up @@ -193,4 +203,32 @@ private static ExecutionPlan deleteByQuery(DocTableRelation table, PlannerContex
Collect collect = new Collect(collectPhase, TopN.NO_LIMIT, 0, 1, 1, null);
return Merge.ensureOnHandler(collect, context, Collections.singletonList(MergeCountProjection.INSTANCE));
}

/**
 * Visitor that answers whether a symbol tree contains a reference which is not
 * part of the supplied partitioned-by columns. The visitor context is the list
 * of partitioned-by column references; the result is {@code true} as soon as
 * one such foreign reference is found.
 */
private static class NonPartitionReferenceDetector extends SymbolVisitor<List<Reference>, Boolean> {

    /**
     * A function is flagged when any of its arguments is flagged. Evaluation
     * stops at the first flagged argument.
     */
    @Override
    public Boolean visitFunction(Function symbol, List<Reference> partitionedByColumns) {
        return symbol.arguments()
            .stream()
            .anyMatch(argument -> argument.accept(this, partitionedByColumns));
    }

    /**
     * A reference is flagged when it is not one of the partitioned-by columns.
     */
    @Override
    public Boolean visitReference(Reference symbol, List<Reference> partitionedByColumns) {
        boolean isPartitionColumn = partitionedByColumns.contains(symbol);
        return isPartitionColumn == false;
    }

    /**
     * Dynamic references are treated exactly like regular references.
     */
    @Override
    public Boolean visitDynamicReference(DynamicReference symbol,
                                         List<Reference> context) {
        return visitReference(symbol, context);
    }

    /**
     * Any other kind of symbol is never flagged.
     */
    @Override
    protected Boolean visitSymbol(Symbol symbol, List<Reference> partitionedByColumns) {
        return Boolean.FALSE;
    }
}
}
Loading

0 comments on commit 514fe79

Please sign in to comment.