Skip to content

Commit

Permalink
Merge #42198
Browse files Browse the repository at this point in the history
42198: colexec: clean up remapping of indexed vars for filter expressions r=yuzefovich a=yuzefovich

**colexec: fix handling of filters by joiners**

Previously, in order to handle ON expression of the joiners, we would
modify the expression itself (i.e. we would remap the IndexedVars inside
of the expression). However, this approach is error-prone and doesn't
work in all cases (consider an example when we have a filter like
"@1 = 'abc@2def'" - @2 is not an ordinal, but it would get treated as an
IndexedVar with index 1). Now we have enhanced the IndexedVarHelper to
handle the remapping internally when the IndexedVar is being bound to
a container. This way no modifications to the actual expressions are
needed, and such approach should work in all cases.

Fixes: #41407.
Fixes: #41944.
Fixes: #42100.

**sem/tree: remove unused field from IndexedVar**

This commit removes 'bindInPlace' field from IndexedVar struct since it
is not being used anywhere.

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
craig[bot] and yuzefovich committed Nov 6, 2019
2 parents cb07f2a + baa64b7 commit e9dceea
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 102 deletions.
101 changes: 32 additions & 69 deletions pkg/sql/colexec/execplan.go
Expand Up @@ -15,7 +15,6 @@ import (
"fmt"
"math"
"reflect"
"strings"

"github.com/cockroachdb/cockroach/pkg/col/coltypes"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror"
Expand Down Expand Up @@ -196,9 +195,12 @@ func createJoiner(

if !post.Filter.Empty() {
planningState.postFilterPlanning = makeFilterPlanningState(len(leftTypes), len(rightTypes))
leftOutCols, rightOutCols = planningState.postFilterPlanning.renderAllNeededCols(
leftOutCols, rightOutCols, err = planningState.postFilterPlanning.renderAllNeededCols(
post.Filter, leftOutCols, rightOutCols,
)
if err != nil {
return err
}
}

var (
Expand All @@ -215,8 +217,7 @@ func createJoiner(
result.setProjectedByJoinerColumnTypes(spec, leftOutCols, rightOutCols)

if onExpr != nil && joinType == sqlbase.JoinType_INNER {
remappedOnExpr := onExprPlanning.remapIVars(*onExpr)
err = result.planFilterExpr(flowCtx.NewEvalCtx(), remappedOnExpr)
err = result.planFilterExpr(flowCtx.NewEvalCtx(), *onExpr, onExprPlanning.indexVarMap)
onExprPlanning.projectOutExtraCols(result)
}
return err
Expand Down Expand Up @@ -434,9 +435,12 @@ func NewColOperator(
}
onExpr = &core.HashJoiner.OnExpr
onExprPlanning = makeFilterPlanningState(len(leftTypes), len(rightTypes))
leftOutCols, rightOutCols = onExprPlanning.renderAllNeededCols(
leftOutCols, rightOutCols, err = onExprPlanning.renderAllNeededCols(
*onExpr, leftOutCols, rightOutCols,
)
if err != nil {
return onExpr, onExprPlanning, leftOutCols, rightOutCols, err
}
}

result.Op, err = NewEqHashJoinerOp(
Expand Down Expand Up @@ -492,26 +496,29 @@ func NewColOperator(
onExprPlanning = makeFilterPlanningState(len(leftTypes), len(rightTypes))
switch core.MergeJoiner.Type {
case sqlbase.JoinType_INNER:
leftOutCols, rightOutCols = onExprPlanning.renderAllNeededCols(
leftOutCols, rightOutCols, err = onExprPlanning.renderAllNeededCols(
*onExpr, leftOutCols, rightOutCols,
)
case sqlbase.JoinType_LEFT_SEMI, sqlbase.JoinType_LEFT_ANTI:
filterOnlyOnLeft = onExprPlanning.isFilterOnlyOnLeft(*onExpr)
filterOnlyOnLeft, err = onExprPlanning.isFilterOnlyOnLeft(*onExpr)
filterConstructor = func(op Operator) (Operator, error) {
r := NewColOperatorResult{
Op: op,
ColumnTypes: append(spec.Input[0].ColumnTypes, spec.Input[1].ColumnTypes...),
}
// We don't need to remap the indexed vars in onExpr because the
// filter will be run alongside the merge joiner, and it will have
// access to all of the columns from both sides.
err := r.planFilterExpr(flowCtx.NewEvalCtx(), *onExpr)
// We don't need to specify indexVarMap because the filter will be
// run alongside the merge joiner, and it will have access to all
// of the columns from both sides.
err := r.planFilterExpr(flowCtx.NewEvalCtx(), *onExpr, nil /* indexVarMap */)
return r.Op, err
}
default:
return onExpr, onExprPlanning, leftOutCols, rightOutCols, errors.Errorf("can only plan INNER, LEFT SEMI, and LEFT ANTI merge joins with ON expressions")
}
}
if err != nil {
return onExpr, onExprPlanning, leftOutCols, rightOutCols, err
}

result.Op, err = NewMergeJoinOp(
core.MergeJoiner.Type,
Expand Down Expand Up @@ -692,8 +699,7 @@ func NewColOperator(
// legal for empty rows to be passed between processors).

if !post.Filter.Empty() {
filterExpr := planningState.postFilterPlanning.remapIVars(post.Filter)
if err = result.planFilterExpr(flowCtx.NewEvalCtx(), filterExpr); err != nil {
if err = result.planFilterExpr(flowCtx.NewEvalCtx(), post.Filter, planningState.postFilterPlanning.indexVarMap); err != nil {
return result, err
}
planningState.postFilterPlanning.projectOutExtraCols(&result)
Expand Down Expand Up @@ -773,12 +779,15 @@ func makeFilterPlanningState(numLeftInputCols, numRightInputCols int) filterPlan
// NOTE: projectOutExtraCols must be called after the filter has been run.
func (p *filterPlanningState) renderAllNeededCols(
filter execinfrapb.Expression, leftOutCols []uint32, rightOutCols []uint32,
) ([]uint32, []uint32) {
neededColumnsForFilter := findIVarsInRange(
) ([]uint32, []uint32, error) {
neededColumnsForFilter, err := findIVarsInRange(
filter,
0, /* start */
p.numLeftInputCols+p.numRightInputCols,
)
if err != nil {
return nil, nil, errors.Errorf("error parsing filter expression %q: %s", filter, err)
}
if len(neededColumnsForFilter) > 0 {
// Store the original out columns to be restored later.
p.originalLeftOutCols = leftOutCols
Expand Down Expand Up @@ -842,66 +851,20 @@ func (p *filterPlanningState) renderAllNeededCols(
}
}
}
return leftOutCols, rightOutCols
return leftOutCols, rightOutCols, nil
}

// isFilterOnlyOnLeft returns whether the filter expression doesn't use columns
// from the right side.
func (p *filterPlanningState) isFilterOnlyOnLeft(filter execinfrapb.Expression) bool {
func (p *filterPlanningState) isFilterOnlyOnLeft(filter execinfrapb.Expression) (bool, error) {
// Find all needed columns for filter only from the right side.
neededColumnsForFilter := findIVarsInRange(
neededColumnsForFilter, err := findIVarsInRange(
filter, p.numLeftInputCols, p.numLeftInputCols+p.numRightInputCols,
)
return len(neededColumnsForFilter) == 0
}

// remapIVars remaps tree.IndexedVars in expr using p.indexVarMap. Note that if
// the remapping is needed, then a new remapped expression is returned, but if
// the remapping is not needed (which is the case when all needed by the filter
// columns were part of the projection), then the same expression is returned.
func (p *filterPlanningState) remapIVars(expr execinfrapb.Expression) execinfrapb.Expression {
if p.indexVarMap == nil {
// If p.indexVarMap is nil, then there is no remapping to do.
return expr
}
ret := execinfrapb.Expression{}
if expr.LocalExpr != nil {
ret.LocalExpr = sqlbase.RemapIVarsInTypedExpr(expr.LocalExpr, p.indexVarMap)
} else {
ret.Expr = expr.Expr
// We iterate in the reverse order so that the multiple digit numbers are
// handled correctly (consider an expression like @1 AND @11).
//
// In order not to confuse the newly replaced ordinals with the original
// ones we first remap all ordinals using "custom" ordinal symbol first
// (namely, instead of using `@1` we will use `@#1`). Consider an example
// `@2 = @4` with p.idxVarMap = {-1, 0, -1, 1}. If we didn't do this custom
// ordinal remapping, then we would get into a situation of `@2 = @2` in
// which the first @2 is original and needs to be replaced whereas the
// second one should not be touched. After the first loop, we will have
// `@#1 = @#2`.
for idx := len(p.indexVarMap) - 1; idx >= 0; idx-- {
if p.indexVarMap[idx] != -1 {
// We need +1 below because the ordinals are counting from 1.
ret.Expr = strings.ReplaceAll(
ret.Expr,
fmt.Sprintf("@%d", idx+1),
fmt.Sprintf("@#%d", p.indexVarMap[idx]+1),
)
}
}
// Now we simply need to convert the "custom" ordinal symbol by removing
// the pound sign (in the example above, after this loop we will have
// `@1 = @2`).
for idx := len(p.indexVarMap); idx > 0; idx-- {
ret.Expr = strings.ReplaceAll(
ret.Expr,
fmt.Sprintf("@#%d", idx),
fmt.Sprintf("@%d", idx),
)
}
if err != nil {
return false, errors.Errorf("error parsing filter expression %q: %s", filter, err)
}
return ret
return len(neededColumnsForFilter) == 0, nil
}

// projectOutExtraCols, possibly, adds a projection to remove all the extra
Expand Down Expand Up @@ -947,13 +910,13 @@ func (r *NewColOperatorResult) setProjectedByJoinerColumnTypes(
}

func (r *NewColOperatorResult) planFilterExpr(
evalCtx *tree.EvalContext, filter execinfrapb.Expression,
evalCtx *tree.EvalContext, filter execinfrapb.Expression, indexVarMap []int,
) error {
var (
helper execinfra.ExprHelper
selectionMem int
)
err := helper.Init(filter, r.ColumnTypes, evalCtx)
err := helper.InitWithRemapping(filter, r.ColumnTypes, evalCtx, indexVarMap)
if err != nil {
return err
}
Expand Down
34 changes: 17 additions & 17 deletions pkg/sql/colexec/expr.go
Expand Up @@ -11,36 +11,36 @@
package colexec

import (
"fmt"
"strings"

"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)

// findIVarsInRange searches Expr for presence of tree.IndexedVars with indices
// in range [start, end). It returns a slice containing all such indices.
func findIVarsInRange(expr execinfrapb.Expression, start int, end int) []uint32 {
func findIVarsInRange(expr execinfrapb.Expression, start int, end int) ([]uint32, error) {
res := make([]uint32, 0)
if start >= end {
return res
return res, nil
}
var exprToWalk tree.Expr
if expr.LocalExpr != nil {
visitor := ivarExpressionVisitor{ivarSeen: make([]bool, end)}
_, _ = tree.WalkExpr(visitor, expr.LocalExpr)
for i := start; i < end; i++ {
if visitor.ivarSeen[i] {
res = append(res, uint32(i))
}
}
exprToWalk = expr.LocalExpr
} else {
for i := start; i < end; i++ {
if strings.Contains(expr.Expr, fmt.Sprintf("@%d", i+1)) {
res = append(res, uint32(i))
}
e, err := parser.ParseExpr(expr.Expr)
if err != nil {
return nil, err
}
exprToWalk = e
}
visitor := ivarExpressionVisitor{ivarSeen: make([]bool, end)}
_, _ = tree.WalkExpr(visitor, exprToWalk)
for i := start; i < end; i++ {
if visitor.ivarSeen[i] {
res = append(res, uint32(i))
}
}
return res
return res, nil
}

type ivarExpressionVisitor struct {
Expand Down
11 changes: 10 additions & 1 deletion pkg/sql/execinfra/expr.go
Expand Up @@ -142,13 +142,22 @@ func (eh *ExprHelper) IndexedVarNodeFormatter(idx int) tree.NodeFormatter {
// Init initializes the ExprHelper.
func (eh *ExprHelper) Init(
expr execinfrapb.Expression, types []types.T, evalCtx *tree.EvalContext,
) error {
return eh.InitWithRemapping(expr, types, evalCtx, nil /* indexVarMap */)
}

// InitWithRemapping initializes the ExprHelper.
// indexVarMap specifies an optional (i.e. it can be left nil) map that will be
// used to remap the indices of IndexedVars before binding them to a container.
func (eh *ExprHelper) InitWithRemapping(
expr execinfrapb.Expression, types []types.T, evalCtx *tree.EvalContext, indexVarMap []int,
) error {
if expr.Empty() {
return nil
}
eh.evalCtx = evalCtx
eh.Types = types
eh.Vars = tree.MakeIndexedVarHelper(eh, len(types))
eh.Vars = tree.MakeIndexedVarHelperWithRemapping(eh, len(types), indexVarMap)

if expr.LocalExpr != nil {
eh.Expr = expr.LocalExpr
Expand Down
31 changes: 31 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/exec_hash_join
Expand Up @@ -124,3 +124,34 @@ x y
NULL NULL
NULL 42
NULL 44

# Regression test for #41407.
statement ok
CREATE TABLE t41407 AS
SELECT
g AS _float8,
g % 0 = 0 AS _bool,
g AS _decimal,
g AS _string,
g AS _bytes
FROM
generate_series(NULL, NULL) AS g;

query TRTTRRBR
SELECT
tab_1688._bytes,
tab_1688._float8,
tab_1689._string,
tab_1689._string,
tab_1688._float8,
tab_1688._float8,
tab_1689._bool,
tab_1690._decimal
FROM
t41407 AS tab_1688
JOIN t41407 AS tab_1689
JOIN t41407 AS tab_1690 ON
tab_1689._bool = tab_1690._bool ON
tab_1688._float8 = tab_1690._float8
AND tab_1688._bool = tab_1689._bool;
----

0 comments on commit e9dceea

Please sign in to comment.