Skip to content

Commit

Permalink
changefeedccl: Correctly handle stable CDC query functions
Browse files Browse the repository at this point in the history
Custom CDC query functions rely on having "annotations" configured.
Prior to this change, these annotations were configured when the
CDC query was being evaluated (for each event).  However, CDC
query also needs to be evaluated when e.g. the changefeed is
being created.  In this case, the correct annotations were not
configured, resulting in failure to create changefeed
that use stable, custom CDC function.  At this point, there
is only one such function: `changefeed_creation_time`.

This PR refactors and cleans up how semantic and evalution
contexts are configured.  This now happens in a single place --
namely the `withPlanner` helper so that correct information
is configured at all times.

Fixes #115245

Release note (enterprise change): Fix CDC query to correctly
handle `changefeed_creation_time()` function.
  • Loading branch information
Yevgeniy Miretskiy committed Jan 8, 2024
1 parent 58502d1 commit 3ddb521
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 44 deletions.
43 changes: 21 additions & 22 deletions pkg/ccl/changefeedccl/cdceval/expr_eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,13 @@ type familyEvaluator struct {
rowCh chan tree.Datums
projection cdcevent.Projection

statementTS hlc.Timestamp
withDiff bool

// rowEvalCtx contains state necessary to evaluate expressions.
// updated for each row.
rowEvalCtx rowEvalContext
// Initialized during preparePlan().
rowEvalCtx *rowEvalContext
}

// NewEvaluator constructs new evaluator for changefeed expression.
Expand Down Expand Up @@ -113,10 +117,10 @@ func newFamilyEvaluator(
norm: &NormalizedSelectClause{
SelectClause: sc,
},
rowCh: make(chan tree.Datums, 1),
rowCh: make(chan tree.Datums, 1),
statementTS: statementTS,
withDiff: withDiff,
}
e.rowEvalCtx.startTime = statementTS
e.rowEvalCtx.withDiff = withDiff

// Arrange to be notified when event does not match predicate.
predicateAsProjection(e.norm)
Expand Down Expand Up @@ -282,18 +286,12 @@ func (e *familyEvaluator) preparePlan(
e.cleanup = nil
}

err = withPlanner(
ctx, e.execCfg, e.user, e.currDesc.SchemaTS, e.sessionData,
err = withPlanner(ctx, e.execCfg, e.statementTS, e.user, e.currDesc.SchemaTS, e.sessionData,
func(ctx context.Context, execCtx sql.JobExecContext, cleanup func()) error {
e.cleanup = cleanup
semaCtx := execCtx.SemaCtx()
semaCtx.FunctionResolver = newCDCFunctionResolver(semaCtx.FunctionResolver)
semaCtx.Properties.Require("cdc", rejectInvalidCDCExprs)
semaCtx.Annotations = tree.MakeAnnotations(cdcAnnotationAddr)

evalCtx := execCtx.ExtendedEvalContext().Context
evalCtx.Annotations = &semaCtx.Annotations
evalCtx.Annotations.Set(cdcAnnotationAddr, &e.rowEvalCtx)
e.rowEvalCtx = rowEvalContextFromEvalContext(&execCtx.ExtendedEvalContext().Context)
e.rowEvalCtx.withDiff = e.withDiff
e.rowEvalCtx.creationTime = e.statementTS

e.norm.desc = e.currDesc
requiresPrev := e.prevDesc != nil
Expand All @@ -310,8 +308,7 @@ func (e *familyEvaluator) preparePlan(

plan, err = sql.PlanCDCExpression(ctx, execCtx, e.norm.SelectStatementForFamily(), opts...)
return err
},
)
})
if err != nil {
return sql.CDCExpressionPlan{}, nil, err
}
Expand Down Expand Up @@ -506,11 +503,11 @@ func (e *familyEvaluator) closeErr() error {

// rowEvalContext represents the context needed to evaluate row expressions.
type rowEvalContext struct {
ctx context.Context
startTime hlc.Timestamp
withDiff bool
updatedRow cdcevent.Row
op tree.Datum
ctx context.Context
creationTime hlc.Timestamp
withDiff bool
updatedRow cdcevent.Row
op tree.Datum
}

// cdcAnnotationAddr is the address used to store relevant information
Expand All @@ -528,10 +525,12 @@ const rejectInvalidCDCExprs = tree.RejectAggregates | tree.RejectGenerators |

// configSemaForCDC configures existing semaCtx to be used for CDC expression
// evaluation; returns cleanup function which restores previous configuration.
func configSemaForCDC(semaCtx *tree.SemaContext) func() {
func configSemaForCDC(semaCtx *tree.SemaContext, statementTS hlc.Timestamp) func() {
origProps, origResolver := semaCtx.Properties, semaCtx.FunctionResolver
semaCtx.FunctionResolver = newCDCFunctionResolver(semaCtx.FunctionResolver)
semaCtx.Properties.Require("cdc", rejectInvalidCDCExprs)
semaCtx.Annotations = tree.MakeAnnotations(cdcAnnotationAddr)
semaCtx.Annotations.Set(cdcAnnotationAddr, &rowEvalContext{creationTime: statementTS})

return func() {
semaCtx.Properties.Restore(origProps)
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/cdceval/func_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -81,9 +82,8 @@ $$`)
for _, tc := range testCases {
t.Run(tc.testName, func(t *testing.T) {
var funcDef *tree.ResolvedFunctionDefinition
err := withPlanner(
context.Background(), &execCfg, username.RootUserName(),
s.Clock().Now(), defaultDBSessionData,
err := withPlanner(context.Background(), &execCfg, hlc.Timestamp{},
username.RootUserName(), s.Clock().Now(), defaultDBSessionData,
func(ctx context.Context, execCtx sql.JobExecContext, cleanup func()) (err error) {
defer cleanup()
semaCtx := execCtx.SemaCtx()
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdceval/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{
volatility.Stable,
types.Decimal,
func(rowEvalCtx *rowEvalContext) hlc.Timestamp {
return rowEvalCtx.startTime
return rowEvalCtx.creationTime
},
),
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/cdceval/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) {
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)

semaCtx := tree.MakeSemaContext()
defer configSemaForCDC(&semaCtx)()
defer configSemaForCDC(&semaCtx, hlc.Timestamp{})()

t.Run("time", func(t *testing.T) {
expectTSTZ := func(ts hlc.Timestamp) tree.Datum {
Expand Down Expand Up @@ -433,7 +433,7 @@ func newEvaluator(
return nil, err
}

defer configSemaForCDC(semaCtx)()
defer configSemaForCDC(semaCtx, hlc.Timestamp{})()
norm, err := normalizeSelectClause(context.Background(), semaCtx, sc, ed)
if err != nil {
return nil, err
Expand Down
27 changes: 15 additions & 12 deletions pkg/ccl/changefeedccl/cdceval/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,12 @@ func NormalizeExpression(
) (norm *NormalizedSelectClause, withDiff bool, _ error) {
// Even though we have a job exec context, we shouldn't muck with it.
// Make our own copy of the planner instead.
if err := withPlanner(
ctx, execCtx.ExecCfg(), execCtx.User(), schemaTS, execCtx.SessionData(),
if err := withPlanner(ctx, execCtx.ExecCfg(), schemaTS, execCtx.User(), schemaTS, execCtx.SessionData(),
func(ctx context.Context, execCtx sql.JobExecContext, cleanup func()) (err error) {
defer cleanup()
norm, withDiff, err = normalizeExpression(ctx, execCtx, descr, schemaTS, target, sc, splitFams)
return err
},
); err != nil {
}); err != nil {
return nil, false, withErrorHint(err, target.FamilyName, descr.NumFamilies() > 1)
}
return
Expand All @@ -73,8 +71,6 @@ func normalizeExpression(
return nil, false, changefeedbase.WithTerminalError(err)
}

defer configSemaForCDC(execCtx.SemaCtx())()

// Add cdc_prev column; we may or may not need it, but we'll check below.
prevCol, err := newPrevColumnForDesc(norm.desc)
if err != nil {
Expand Down Expand Up @@ -119,10 +115,9 @@ func SpansForExpression(
}

var plan sql.CDCExpressionPlan
if err := withPlanner(ctx, execCfg, user, schemaTS, sd,
if err := withPlanner(ctx, execCfg, hlc.Timestamp{}, user, schemaTS, sd,
func(ctx context.Context, execCtx sql.JobExecContext, cleanup func()) error {
defer cleanup()
defer configSemaForCDC(execCtx.SemaCtx())()
norm := &NormalizedSelectClause{SelectClause: sc, desc: d}

// Add cdc_prev column; we may or may not need it, add it just in case
Expand All @@ -136,8 +131,7 @@ func SpansForExpression(
norm.SelectStatementForFamily(), sql.WithExtraColumn(prevCol))
return err

},
); err != nil {
}); err != nil {
return nil, withErrorHint(err, d.FamilyName, d.HasOtherFamilies)
}

Expand All @@ -164,6 +158,7 @@ func withErrorHint(err error, targetFamily string, multiFamily bool) error {
func withPlanner(
ctx context.Context,
execCfg *sql.ExecutorConfig,
statementTS hlc.Timestamp,
user username.SQLUsername,
schemaTS hlc.Timestamp,
sd *sessiondata.SessionData,
Expand All @@ -177,14 +172,22 @@ func withPlanner(
// Current implementation relies on row-by-row evaluation;
// so, ensure vectorized engine is off.
sd.VectorizeMode = sessiondatapb.VectorizeOff
planner, cleanup := sql.NewInternalPlanner(
planner, plannerCleanup := sql.NewInternalPlanner(
"cdc-expr", txn.KV(),
user,
&sql.MemoryMetrics{}, // TODO(yevgeniy): Use appropriate metrics.
execCfg,
sd,
sql.WithDescCollection(col),
)
return fn(ctx, planner.(sql.JobExecContext), cleanup)

execCtx := planner.(sql.JobExecContext)
semaCleanup := configSemaForCDC(execCtx.SemaCtx(), statementTS)
cleanup := func() {
semaCleanup()
plannerCleanup()
}

return fn(ctx, execCtx, cleanup)
})
}
6 changes: 2 additions & 4 deletions pkg/ccl/changefeedccl/cdceval/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,9 @@ func normalizeAndPlan(
sc *tree.SelectClause,
splitFams bool,
) (norm *NormalizedSelectClause, withDiff bool, plan sql.CDCExpressionPlan, err error) {
if err := withPlanner(ctx, execCfg, user, schemaTS, sd,
if err := withPlanner(ctx, execCfg, schemaTS, user, schemaTS, sd,
func(ctx context.Context, execCtx sql.JobExecContext, cleanup func()) error {
defer cleanup()
defer configSemaForCDC(execCtx.SemaCtx())()

norm, withDiff, err = NormalizeExpression(ctx, execCtx, descr, schemaTS, target, sc, splitFams)
if err != nil {
Expand All @@ -338,8 +337,7 @@ func normalizeAndPlan(
plan, err = sql.PlanCDCExpression(ctx, execCtx,
norm.SelectStatementForFamily(), sql.WithExtraColumn(prevCol))
return err
},
); err != nil {
}); err != nil {
return nil, false, sql.CDCExpressionPlan{}, err
}
return norm, withDiff, plan, nil
Expand Down
16 changes: 16 additions & 0 deletions pkg/ccl/changefeedccl/cdceval/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package cdceval

import (
"context"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
Expand All @@ -18,9 +19,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -206,6 +209,19 @@ func TestNormalizeAndValidate(t *testing.T) {
stmt: "SELECT *, cdc_prev() FROM foo AS bar",
expectErr: `unknown function: cdc_prev()`,
},
{ // Regression for #115245
name: "changefeed_creation_timestamp",
desc: fooDesc,
stmt: "SELECT * FROM foo WHERE changefeed_creation_timestamp() != changefeed_creation_timestamp()",
expectErr: `does not match any rows`,
},
{ // Regression for #115245
name: "changefeed_creation_timestamp",
desc: fooDesc,
stmt: fmt.Sprintf("SELECT * FROM foo WHERE changefeed_creation_timestamp() = %s",
tree.AsStringWithFlags(eval.TimestampToDecimalDatum(hlc.Timestamp{WallTime: 42}), tree.FmtExport)),
expectErr: `does not match any rows`,
},
} {
t.Run(tc.name, func(t *testing.T) {
sc, err := ParseChangefeedExpression(tc.stmt)
Expand Down

0 comments on commit 3ddb521

Please sign in to comment.