Skip to content

Commit

Permalink
MB-48266 Improve previous fix
Browse files Browse the repository at this point in the history
There are other places in the code where we check for subset,
e.g. index conditions, etc. Improve the fix to cover more
cases.

Change-Id: Ib534c8460a73419b268d1843a6ad97bf5cd532b4
Reviewed-on: http://review.couchbase.org/c/query/+/160676
Reviewed-by: Sitaram Vemulapalli <sitaram.vemulapalli@couchbase.com>
Tested-by: Bingjie Miao <bingjie.miao@couchbase.com>
Reviewed-on: http://review.couchbase.org/c/query/+/160678
Well-Formed: Restriction Checker
  • Loading branch information
miaobingjie committed Sep 2, 2021
1 parent 8644cbb commit c9abaf2
Show file tree
Hide file tree
Showing 11 changed files with 56 additions and 38 deletions.
7 changes: 7 additions & 0 deletions planner/build_join_index.go
Expand Up @@ -154,6 +154,13 @@ func (this *builder) buildJoinScan(keyspace datastore.Keyspace, node *algebra.Ke
expression.NewFieldName("id", false)),
}

if len(this.context.NamedArgs()) > 0 || len(this.context.PositionalArgs()) > 0 {
subset, err = base.ReplaceParameters(subset, this.context.NamedArgs(), this.context.PositionalArgs())
if err != nil {
return nil, nil, nil, err
}
}

sargables, _, _, _, err := this.sargableIndexes(indexes, pred, subset, primaryKey, formalizer, nil, false)
if err != nil {
return nil, nil, nil, err
Expand Down
23 changes: 21 additions & 2 deletions planner/build_scan.go
Expand Up @@ -310,6 +310,14 @@ func (this *builder) buildTermScan(node *algebra.KeyspaceTerm,
pred = baseKeyspace.Onclause()
}

subset := pred
if len(this.context.NamedArgs()) > 0 || len(this.context.PositionalArgs()) > 0 {
subset, err = base.ReplaceParameters(subset, this.context.NamedArgs(), this.context.PositionalArgs())
if err != nil {
return
}
}

// collect UNNEST bindings when HINT indexes has FTS index
var ubs expression.Bindings
if this.hintIndexes && this.from != nil {
Expand All @@ -323,7 +331,7 @@ func (this *builder) buildTermScan(node *algebra.KeyspaceTerm,
}
}

sargables, all, arrays, flex, err := this.sargableIndexes(indexes, pred, pred, primaryKey,
sargables, all, arrays, flex, err := this.sargableIndexes(indexes, pred, subset, primaryKey,
formalizer, ubs, node.IsUnderNL())
if err != nil {
return nil, 0, err
Expand Down Expand Up @@ -395,7 +403,7 @@ func (this *builder) buildTermScan(node *algebra.KeyspaceTerm,
this.restoreIndexPushDowns(indexPushDowns, true)
hasDeltaKeyspace := this.context.HasDeltaKeyspace(baseKeyspace.Keyspace())

unnest, unnestSargLength, err := this.buildUnnestScan(node, this.from, pred, all, hasDeltaKeyspace)
unnest, unnestSargLength, err := this.buildUnnestScan(node, this.from, pred, subset, all, hasDeltaKeyspace)
if err != nil {
return nil, 0, err
}
Expand Down Expand Up @@ -777,6 +785,17 @@ func allIndexes(keyspace datastore.Keyspace, skip, virtualIndexes []datastore.In
return indexes, nil
}

func checkSubset(pred, cond expression.Expression, context *PrepareContext) bool {
if context != nil && (len(context.NamedArgs()) > 0 || len(context.PositionalArgs()) > 0) {
var err error
pred, err = base.ReplaceParameters(pred, context.NamedArgs(), context.PositionalArgs())
if err != nil {
return false
}
}
return base.SubsetOf(pred, cond)
}

var _INDEX_POOL = datastore.NewIndexPool(256)
var _SKIP_POOL = datastore.NewIndexBoolPool(32)
var _EMPTY_PLAN = plan.NewValueScan(algebra.Pairs{}, OPT_COST_NOT_AVAIL, OPT_CARD_NOT_AVAIL, OPT_SIZE_NOT_AVAIL, OPT_COST_NOT_AVAIL)
11 changes: 10 additions & 1 deletion planner/build_scan_dynamic.go
Expand Up @@ -44,9 +44,18 @@ func (this *builder) buildDynamicScan(node *algebra.KeyspaceTerm,
alias := expression.NewIdentifier(node.Alias())
alias.SetKeyspaceAlias(true)

subset := pred
if len(this.context.NamedArgs()) > 0 || len(this.context.PositionalArgs()) > 0 {
var err error
subset, err = base.ReplaceParameters(subset, this.context.NamedArgs(), this.context.PositionalArgs())
if err != nil {
return nil, 0, err
}
}

outer:
for i, e := range arrays {
if e.cond != nil && !base.SubsetOf(pred, e.cond) {
if e.cond != nil && !base.SubsetOf(subset, e.cond) {
continue
}

Expand Down
8 changes: 1 addition & 7 deletions planner/build_scan_secondary.go
Expand Up @@ -254,13 +254,7 @@ func (this *builder) sargableIndexes(indexes []datastore.Index, pred, subset exp

flexPred := pred
if len(this.context.NamedArgs()) > 0 || len(this.context.PositionalArgs()) > 0 {
namedArgs := this.context.NamedArgs()
positionalArgs := this.context.PositionalArgs()
subset, err = base.ReplaceParameters(subset, namedArgs, positionalArgs)
if err != nil {
return
}
flexPred, err = base.ReplaceParameters(flexPred, namedArgs, positionalArgs)
flexPred, err = base.ReplaceParameters(flexPred, this.context.NamedArgs(), this.context.PositionalArgs())
if err != nil {
return
}
Expand Down
18 changes: 9 additions & 9 deletions planner/build_scan_unnest.go
Expand Up @@ -56,7 +56,7 @@ dimension.
*/
func (this *builder) buildUnnestScan(node *algebra.KeyspaceTerm, from algebra.FromTerm,
pred expression.Expression, indexes map[datastore.Index]*indexEntry, hasDeltaKeyspace bool) (
pred, subset expression.Expression, indexes map[datastore.Index]*indexEntry, hasDeltaKeyspace bool) (
op plan.SecondaryScan, sargLength int, err error) {

if pred == nil {
Expand All @@ -83,15 +83,15 @@ func (this *builder) buildUnnestScan(node *algebra.KeyspaceTerm, from algebra.Fr
}

// Enumerate candidate array indexes
unnestIndexes, arrayKeys := collectUnnestIndexes(pred, indexes)
unnestIndexes, arrayKeys := collectUnnestIndexes(subset, indexes)
if nil != unnestIndexes {
defer _INDEX_POOL.Put(unnestIndexes)
}
if len(unnestIndexes) == 0 {
return nil, 0, nil
}

cop, sargLength, err := this.buildCoveringUnnestScan(node, pred, indexes, unnestIndexes, arrayKeys,
cop, sargLength, err := this.buildCoveringUnnestScan(node, pred, subset, indexes, unnestIndexes, arrayKeys,
unnests, hasDeltaKeyspace)
if cop != nil || err != nil {
return cop, sargLength, err
Expand All @@ -105,7 +105,7 @@ func (this *builder) buildUnnestScan(node *algebra.KeyspaceTerm, from algebra.Fr
for _, unnest := range primaryUnnests {
for _, index := range unnestIndexes {
arrayKey := arrayKeys[index]
op, _, _, n, err = this.matchUnnest(node, pred, unnest, indexes[index],
op, _, _, n, err = this.matchUnnest(node, pred, subset, unnest, indexes[index],
arrayKey, unnests, hasDeltaKeyspace)
if err != nil {
return nil, 0, err
Expand Down Expand Up @@ -258,7 +258,7 @@ func collectPrimaryUnnests(from algebra.FromTerm, unnests []*algebra.Unnest) []*
/*
Enumerate array indexes for UNNEST.
*/
func collectUnnestIndexes(pred expression.Expression, indexes map[datastore.Index]*indexEntry) (
func collectUnnestIndexes(subset expression.Expression, indexes map[datastore.Index]*indexEntry) (
[]datastore.Index, map[datastore.Index]*expression.All) {
var unnestIndexes []datastore.Index

Expand All @@ -276,7 +276,7 @@ func collectUnnestIndexes(pred expression.Expression, indexes map[datastore.Inde
}

if entry.cond != nil &&
!base.SubsetOf(pred, entry.cond) {
!base.SubsetOf(subset, entry.cond) {
continue
}

Expand All @@ -287,7 +287,7 @@ func collectUnnestIndexes(pred expression.Expression, indexes map[datastore.Inde
return unnestIndexes, arrayKeys
}

func (this *builder) matchUnnest(node *algebra.KeyspaceTerm, pred expression.Expression, unnest *algebra.Unnest,
func (this *builder) matchUnnest(node *algebra.KeyspaceTerm, pred, subset expression.Expression, unnest *algebra.Unnest,
entry *indexEntry, arrayKey *expression.All, unnests []*algebra.Unnest, hasDeltaKeyspace bool) (
plan.SecondaryScan, *algebra.Unnest, *expression.All, int, error) {

Expand Down Expand Up @@ -341,7 +341,7 @@ func (this *builder) matchUnnest(node *algebra.KeyspaceTerm, pred expression.Exp
}
}

if when != nil && !base.SubsetOf(pred, when) {
if when != nil && !base.SubsetOf(subset, when) {
return nil, nil, nil, 0, nil
}

Expand All @@ -353,7 +353,7 @@ func (this *builder) matchUnnest(node *algebra.KeyspaceTerm, pred expression.Exp
continue
}

op, un, nArrayKey, n, err := this.matchUnnest(node, pred, u, entry,
op, un, nArrayKey, n, err := this.matchUnnest(node, pred, subset, u, entry,
nestedArrayKey, unnests, hasDeltaKeyspace)
if err != nil {
return nil, nil, nil, 0, err
Expand Down
8 changes: 4 additions & 4 deletions planner/build_scan_unnest_cov.go
Expand Up @@ -16,7 +16,7 @@ import (
base "github.com/couchbase/query/plannerbase"
)

func (this *builder) buildCoveringUnnestScan(node *algebra.KeyspaceTerm, pred expression.Expression,
func (this *builder) buildCoveringUnnestScan(node *algebra.KeyspaceTerm, pred, subset expression.Expression,
indexes map[datastore.Index]*indexEntry, unnestIndexes []datastore.Index,
arrayKeys map[datastore.Index]*expression.All, unnests []*algebra.Unnest, hasDeltaKeyspace bool) (
plan.SecondaryScan, int, error) {
Expand All @@ -36,7 +36,7 @@ func (this *builder) buildCoveringUnnestScan(node *algebra.KeyspaceTerm, pred ex
this.restoreIndexPushDowns(indexPushDowns, true)

entry := indexes[index]
cop, cun, un, err := this.buildOneCoveringUnnestScan(node, pred, entry, arrayKeys[index],
cop, cun, un, err := this.buildOneCoveringUnnestScan(node, pred, subset, entry, arrayKeys[index],
unnests, hasDeltaKeyspace)
if err != nil {
return nil, 0, err
Expand Down Expand Up @@ -87,12 +87,12 @@ func (this *builder) buildCoveringUnnestScan(node *algebra.KeyspaceTerm, pred ex
return nil, 0, nil
}

func (this *builder) buildOneCoveringUnnestScan(node *algebra.KeyspaceTerm, pred expression.Expression,
func (this *builder) buildOneCoveringUnnestScan(node *algebra.KeyspaceTerm, pred, subset expression.Expression,
entry *indexEntry, arrKey *expression.All, unnests []*algebra.Unnest, hasDeltaKeyspace bool) (
plan.SecondaryScan, map[*algebra.Unnest]bool, *algebra.Unnest, error) {

// Sarg and populate spans
op, unnest, arrayKey, _, err := this.matchUnnest(node, pred, unnests[0], entry, arrKey, unnests, hasDeltaKeyspace)
op, unnest, arrayKey, _, err := this.matchUnnest(node, pred, subset, unnests[0], entry, arrKey, unnests, hasDeltaKeyspace)
if op == nil || err != nil {
return nil, nil, nil, err
}
Expand Down
11 changes: 0 additions & 11 deletions planner/sarg.go
Expand Up @@ -285,14 +285,3 @@ func getSargSpans(pred expression.Expression, sargKeys expression.Expressions, i

return sargSpans, exactSpan, nil
}

func sargCheckWhen(pred, when expression.Expression, context *PrepareContext) bool {
if context != nil && (len(context.NamedArgs()) > 0 || len(context.PositionalArgs()) > 0) {
var err error
pred, err = base.ReplaceParameters(pred, context.NamedArgs(), context.PositionalArgs())
if err != nil {
return false
}
}
return base.SubsetOf(pred, when)
}
2 changes: 1 addition & 1 deletion planner/sarg_any.go
Expand Up @@ -65,7 +65,7 @@ func (this *sarg) VisitAny(pred *expression.Any) (interface{}, error) {
return nil, err
}

if array.When() != nil && !sargCheckWhen(satisfies, array.When(), this.context) {
if array.When() != nil && !checkSubset(satisfies, array.When(), this.context) {
return sp, nil
}

Expand Down
2 changes: 1 addition & 1 deletion planner/sarg_any_every.go
Expand Up @@ -63,7 +63,7 @@ func (this *sarg) VisitAnyEvery(pred *expression.AnyEvery) (interface{}, error)
return nil, err
}

if array.When() != nil && !sargCheckWhen(satisfies, array.When(), this.context) {
if array.When() != nil && !checkSubset(satisfies, array.When(), this.context) {
return sp, nil
}

Expand Down
2 changes: 1 addition & 1 deletion planner/sargable_any.go
Expand Up @@ -41,7 +41,7 @@ func (this *sargable) VisitAny(pred *expression.Any) (interface{}, error) {
return nil, err
}

if array.When() != nil && !sargCheckWhen(satisfies, array.When(), this.context) {
if array.When() != nil && !checkSubset(satisfies, array.When(), this.context) {
return false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion planner/sargable_any_every.go
Expand Up @@ -41,7 +41,7 @@ func (this *sargable) VisitAnyEvery(pred *expression.AnyEvery) (interface{}, err
return nil, err
}

if array.When() != nil && !sargCheckWhen(satisfies, array.When(), this.context) {
if array.When() != nil && !checkSubset(satisfies, array.When(), this.context) {
return false, nil
}

Expand Down

0 comments on commit c9abaf2

Please sign in to comment.