diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 7d744d6711e7..2d32b70934f0 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -281,4 +281,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https:///#/debug/tracez trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 22.1-22 set the active cluster version in the format '.' +version version 22.1-24 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 65564aac86f5..afb103b53e7f 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -212,6 +212,6 @@ trace.opentelemetry.collectorstringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.span_registry.enabledbooleantrueif set, ongoing traces can be seen at https:///#/debug/tracez trace.zipkin.collectorstringthe address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -versionversion22.1-22set the active cluster version in the format '.' +versionversion22.1-24set the active cluster version in the format '.' 
diff --git a/docs/generated/sql/bnf/create_changefeed_stmt.bnf b/docs/generated/sql/bnf/create_changefeed_stmt.bnf index 3355080f1760..7727823eb58a 100644 --- a/docs/generated/sql/bnf/create_changefeed_stmt.bnf +++ b/docs/generated/sql/bnf/create_changefeed_stmt.bnf @@ -4,3 +4,8 @@ create_changefeed_stmt ::= | 'CREATE' 'CHANGEFEED' 'FOR' changefeed_target ( ( ',' changefeed_target ) )* 'INTO' sink 'WITH' option '=' value ( ( ',' ( option '=' value | option | option '=' value | option ) ) )* | 'CREATE' 'CHANGEFEED' 'FOR' changefeed_target ( ( ',' changefeed_target ) )* 'INTO' sink 'WITH' option ( ( ',' ( option '=' value | option | option '=' value | option ) ) )* | 'CREATE' 'CHANGEFEED' 'FOR' changefeed_target ( ( ',' changefeed_target ) )* 'INTO' sink + | 'CREATE' 'CHANGEFEED' 'INTO' sink 'WITH' option '=' value ( ( ',' ( option '=' value | option | option '=' value | option ) ) )* 'AS' 'SELECT' target_list 'FROM' changefeed_target_expr opt_where_clause + | 'CREATE' 'CHANGEFEED' 'INTO' sink 'WITH' option ( ( ',' ( option '=' value | option | option '=' value | option ) ) )* 'AS' 'SELECT' target_list 'FROM' changefeed_target_expr opt_where_clause + | 'CREATE' 'CHANGEFEED' 'INTO' sink 'WITH' option '=' value ( ( ',' ( option '=' value | option | option '=' value | option ) ) )* 'AS' 'SELECT' target_list 'FROM' changefeed_target_expr opt_where_clause + | 'CREATE' 'CHANGEFEED' 'INTO' sink 'WITH' option ( ( ',' ( option '=' value | option | option '=' value | option ) ) )* 'AS' 'SELECT' target_list 'FROM' changefeed_target_expr opt_where_clause + | 'CREATE' 'CHANGEFEED' 'INTO' sink 'AS' 'SELECT' target_list 'FROM' changefeed_target_expr opt_where_clause diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index f81c685cab94..46f1299fb6b6 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -537,6 +537,7 @@ create_schedule_for_backup_stmt ::= create_changefeed_stmt ::= 'CREATE' 
'CHANGEFEED' 'FOR' changefeed_targets opt_changefeed_sink opt_with_options + | 'CREATE' 'CHANGEFEED' opt_changefeed_sink opt_with_options 'AS' 'SELECT' target_list 'FROM' changefeed_target_expr opt_where_clause create_extension_stmt ::= 'CREATE' 'EXTENSION' 'IF' 'NOT' 'EXISTS' name @@ -1644,6 +1645,12 @@ changefeed_targets ::= opt_changefeed_sink ::= 'INTO' string_or_placeholder +target_list ::= + ( target_elem ) ( ( ',' target_elem ) )* + +changefeed_target_expr ::= + insert_target + with_clause ::= 'WITH' cte_list | 'WITH' 'RECURSIVE' cte_list @@ -1660,9 +1667,6 @@ limit_clause ::= | 'FETCH' first_or_next select_fetch_first_value row_or_rows 'ONLY' | 'FETCH' first_or_next row_or_rows 'ONLY' -target_list ::= - ( target_elem ) ( ( ',' target_elem ) )* - drop_database_stmt ::= 'DROP' 'DATABASE' database_name opt_drop_behavior | 'DROP' 'DATABASE' 'IF' 'EXISTS' database_name opt_drop_behavior @@ -2263,10 +2267,13 @@ opt_sequence_option_list ::= | changefeed_target ::= - 'TABLE' table_name - | table_name - | 'TABLE' table_name 'FAMILY' family_name - | table_name 'FAMILY' family_name + opt_table_prefix table_name opt_changefeed_family + +target_elem ::= + a_expr 'AS' target_name + | a_expr 'identifier' + | a_expr + | '*' cte_list ::= ( common_table_expr ) ( ( ',' common_table_expr ) )* @@ -2301,12 +2308,6 @@ row_or_rows ::= 'ROW' | 'ROWS' -target_elem ::= - a_expr 'AS' target_name - | a_expr 'identifier' - | a_expr - | '*' - table_index_name_list ::= ( table_index_name ) ( ( ',' table_index_name ) )* @@ -2787,8 +2788,16 @@ create_as_table_defs ::= enum_val_list ::= ( 'SCONST' ) ( ( ',' 'SCONST' ) )* -family_name ::= - name +opt_table_prefix ::= + 'TABLE' + | + +opt_changefeed_family ::= + 'FAMILY' family_name + | + +target_name ::= + unrestricted_name common_table_expr ::= table_alias_name opt_column_list 'AS' '(' preparable_stmt ')' @@ -2806,9 +2815,6 @@ only_signed_fconst ::= '+' 'FCONST' | '-' 'FCONST' -target_name ::= - unrestricted_name - scrub_option ::= 'INDEX' 
'ALL' | 'INDEX' '(' name_list ')' @@ -3151,6 +3157,9 @@ family_def ::= create_as_constraint_def ::= create_as_constraint_elem +family_name ::= + name + materialize_clause ::= 'MATERIALIZED' | 'NOT' 'MATERIALIZED' diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 7b4863dddd48..5fc6ccfaa90a 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -34,10 +34,10 @@ go_library( deps = [ "//pkg/base", "//pkg/ccl/backupccl/backupresolver", + "//pkg/ccl/changefeedccl/cdceval", "//pkg/ccl/changefeedccl/cdcevent", "//pkg/ccl/changefeedccl/cdcutils", "//pkg/ccl/changefeedccl/changefeedbase", - "//pkg/ccl/changefeedccl/changefeeddist", "//pkg/ccl/changefeedccl/changefeedvalidators", "//pkg/ccl/changefeedccl/kvevent", "//pkg/ccl/changefeedccl/kvfeed", @@ -77,6 +77,7 @@ go_library( "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", "//pkg/sql/pgwire/pgnotice", + "//pkg/sql/physicalplan", "//pkg/sql/privilege", "//pkg/sql/roleoption", "//pkg/sql/rowenc", @@ -157,10 +158,10 @@ go_test( deps = [ "//pkg/base", "//pkg/blobs", + "//pkg/ccl/changefeedccl/cdceval", "//pkg/ccl/changefeedccl/cdcevent", "//pkg/ccl/changefeedccl/cdctest", "//pkg/ccl/changefeedccl/changefeedbase", - "//pkg/ccl/changefeedccl/changefeeddist", "//pkg/ccl/changefeedccl/kvevent", "//pkg/ccl/changefeedccl/kvfeed", "//pkg/ccl/changefeedccl/schemafeed", @@ -193,6 +194,7 @@ go_test( "//pkg/settings/cluster", "//pkg/spanconfig", "//pkg/spanconfig/spanconfigptsreader", + "//pkg/sql", "//pkg/sql/catalog", "//pkg/sql/catalog/bootstrap", "//pkg/sql/catalog/colinfo", @@ -215,6 +217,7 @@ go_test( "//pkg/sql/sem/tree", "//pkg/sql/sem/volatility", "//pkg/sql/sessiondata", + "//pkg/sql/sessiondatapb", "//pkg/sql/sqlliveness", "//pkg/sql/sqlliveness/sqllivenesstestutils", "//pkg/sql/tests", diff --git a/pkg/ccl/changefeedccl/alter_changefeed_test.go b/pkg/ccl/changefeedccl/alter_changefeed_test.go index aa9e9ba1679f..3e6807b70318 100644 --- 
a/pkg/ccl/changefeedccl/alter_changefeed_test.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_test.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -1280,6 +1281,13 @@ func TestAlterChangefeedUpdateFilter(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + // Skip this test for now. It used to test alter changefeed with + // now deprecated and removed 'primary_key_filter' option. + // Since predicates and projections are no longer a "string" option, + // alter statement implementation (and grammar) needs to be updated, and + // this test modified and re-enabled. + skip.WithIssue(t, 82491) + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { sqlDB := sqlutils.MakeSQLRunner(s.DB) sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) diff --git a/pkg/ccl/changefeedccl/bench_test.go b/pkg/ccl/changefeedccl/bench_test.go index 53d97078dfca..1982c82c0710 100644 --- a/pkg/ccl/changefeedccl/bench_test.go +++ b/pkg/ccl/changefeedccl/bench_test.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/sql/distsql" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -247,8 +248,10 @@ func createBenchmarkChangefeed( return nil, nil, err } serverCfg := s.DistSQLServer().(*distsql.ServerImpl).ServerConfig - eventConsumer, err := newKVEventToRowConsumer(ctx, &serverCfg, sf, 
initialHighWater, - sink, encoder, makeChangefeedConfigFromJobDetails(details), TestingKnobs{}, nil) + eventConsumer, err := newKVEventToRowConsumer(ctx, &serverCfg, nil, sf, initialHighWater, + sink, encoder, makeChangefeedConfigFromJobDetails(details), + execinfrapb.Expression{}, TestingKnobs{}, nil) + if err != nil { return nil, nil, err } diff --git a/pkg/ccl/changefeedccl/cdceval/BUILD.bazel b/pkg/ccl/changefeedccl/cdceval/BUILD.bazel index dad2a2730ba2..b4aa76078235 100644 --- a/pkg/ccl/changefeedccl/cdceval/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdceval/BUILD.bazel @@ -3,16 +3,27 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "cdceval", srcs = [ + "constraint.go", "doc.go", "expr_eval.go", "functions.go", + "parse.go", + "validation.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval", visibility = ["//visibility:public"], deps = [ "//pkg/ccl/changefeedccl/cdcevent", + "//pkg/clusterversion", + "//pkg/jobs/jobspb", + "//pkg/keys", + "//pkg/roachpb", + "//pkg/sql", + "//pkg/sql/catalog", "//pkg/sql/catalog/colinfo", + "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/schemaexpr", + "//pkg/sql/parser", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", "//pkg/sql/sem/builtins", @@ -27,6 +38,7 @@ go_library( "//pkg/util/json", "//pkg/util/log", "@com_github_cockroachdb_errors//:errors", + "@com_github_lib_pq//oid", ], ) @@ -36,6 +48,7 @@ go_test( "expr_eval_test.go", "functions_test.go", "main_test.go", + "validation_test.go", ], embed = [":cdceval"], deps = [ @@ -43,13 +56,14 @@ go_test( "//pkg/ccl/changefeedccl/cdcevent", "//pkg/ccl/changefeedccl/cdctest", "//pkg/ccl/changefeedccl/changefeedbase", - "//pkg/ccl/utilccl", "//pkg/jobs/jobspb", "//pkg/roachpb", "//pkg/security/securityassets", "//pkg/security/securitytest", + "//pkg/security/username", "//pkg/server", "//pkg/settings/cluster", + "//pkg/sql", "//pkg/sql/catalog", "//pkg/sql/catalog/descpb", "//pkg/sql/distsql", @@ -58,6 
+72,7 @@ go_test( "//pkg/sql/rowenc", "//pkg/sql/sem/eval", "//pkg/sql/sem/tree", + "//pkg/sql/sessiondata", "//pkg/sql/sessiondatapb", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", diff --git a/pkg/ccl/changefeedccl/cdceval/constraint.go b/pkg/ccl/changefeedccl/cdceval/constraint.go new file mode 100644 index 000000000000..09773d1f0348 --- /dev/null +++ b/pkg/ccl/changefeedccl/cdceval/constraint.go @@ -0,0 +1,108 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package cdceval + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" +) + +// ConstrainPrimaryIndexSpanByFilter attempts to constrain table primary +// index span if changefeed expression (select clause) is specified. +// Returns possibly constrained spans, and a possibly modified (optimized) +// select clause. 
+func ConstrainPrimaryIndexSpanByFilter( + ctx context.Context, + execCtx sql.JobExecContext, + selectClause string, + descr catalog.TableDescriptor, + target jobspb.ChangefeedTargetSpecification, + includeVirtual bool, +) (_ []roachpb.Span, updatedSelect string, _ error) { + if selectClause == "" { + return nil, "", errors.AssertionFailedf("unexpected empty filter") + } + sc, err := ParseChangefeedExpression(selectClause) + if err != nil { + return nil, "", pgerror.Wrap(err, pgcode.InvalidParameterValue, + "could not parse changefeed expression") + } + + ed, err := newEventDescriptorForTarget(descr, target, schemaTS(execCtx), includeVirtual) + if err != nil { + return nil, "", err + } + + evalCtx := &execCtx.ExtendedEvalContext().Context + spans, remainingFilter, err := constrainSpansBySelectClause( + ctx, execCtx, evalCtx, execCtx.ExecCfg().Codec, sc, ed) + if err != nil { + return nil, "", err + } + + if remainingFilter != nil { + // non-nil remainingFilter implies we had sc.Where clause. + if remainingFilter == tree.DBoolTrue { + sc.Where = nil + } else { + sc.Where.Expr = remainingFilter + } + } + + return spans, AsStringUnredacted(sc), nil +} + +// constrainSpansBySelectClause is a helper that attempts to constrain primary +// index spans by the filter in the select clause. Returns constrained spans, +// along with the remaining filter. +func constrainSpansBySelectClause( + ctx context.Context, + sc sql.SpanConstrainer, + evalCtx *eval.Context, + codec keys.SQLCodec, + selectClause *tree.SelectClause, + ed *cdcevent.EventDescriptor, +) ([]roachpb.Span, tree.Expr, error) { + // Predicate changefeed currently works on a single table only. + // Verify this assumption. + if len(selectClause.From.Tables) != 1 { + return nil, nil, errors.AssertionFailedf( + "expected 1 table expression, found %d", len(selectClause.From.Tables)) + } + + if selectClause.Where == nil { + // Nothing to constrain. 
+ return []roachpb.Span{ed.TableDescriptor().PrimaryIndexSpan(codec)}, nil, nil + } + + tableName := tableNameOrAlias(ed.TableName, selectClause.From.Tables[0]) + semaCtx := newSemaCtx(ed) + return sc.ConstrainPrimaryIndexSpanByExpr( + ctx, sql.BestEffortConstrain, tableName, ed.TableDescriptor(), + evalCtx, semaCtx, selectClause.Where.Expr) +} + +func schemaTS(execCtx sql.JobExecContext) hlc.Timestamp { + if execCtx.Txn() != nil { + return execCtx.Txn().ReadTimestamp() + } + return execCtx.ExecCfg().Clock.Now() +} diff --git a/pkg/ccl/changefeedccl/cdceval/doc.go b/pkg/ccl/changefeedccl/cdceval/doc.go index cca6e6ea7b74..a909ae81c8c9 100644 --- a/pkg/ccl/changefeedccl/cdceval/doc.go +++ b/pkg/ccl/changefeedccl/cdceval/doc.go @@ -18,9 +18,9 @@ Namely, this package concerns itself with 3 things: Evaluator is the gateway into the evaluation logic; it has 3 methods matching the above use cases. Before filtering and projection can be used, Evaluator must -be configured with appropriate predicate and filtering expressions via ConfigureProjection. +be configured with appropriate predicate and filtering expressions via configureProjection. -If the Evaluator is not configured with ConfigureProjection, then each event is assumed +If the Evaluator is not configured with configureProjection, then each event is assumed to match filter by default, and projection operation is an identity operation returning input row. @@ -55,53 +55,3 @@ We also provide custom, CDC specific functions, such as cdc_prev() which returns a JSONB record. See functions.go for more details. ***/ - -// TODO(yevgeniy): Various notes/questions/issues and todos. -// 1. Options issues: -// * key_in_value: makes no sense; just "select *" -// * key_only: currently unsupported by this flavor; would be nice to support it though -// i.e. you only want the key, but you need "where" clause filtering. 
Not clear how to express in sql.y -// * VirtualColumnVisibility: null or omit -- both can be accomplished -// * null: currently emitting null, but you can choose to emit null via "select ... null as vcolumn" -// * omit: well, just don't select. -// * On the other hand, we can also support "emit" mode, where we can compute vcolumn expression. -// * updated and mvcc_timestamp options -- both supported via select -// * Wrapped option -- does it make sense here. -// 3. Probably need to add more custom functions. -// * Determine what to do with stable function overrides (now()) vs cdc_mvcc_timestamp. Keep both? drop one? -// 4. How to surface previous row -- it's an open question. -// * Option 1: provide cdc_prev() builtin which returns JSON encoding of previous row. -// One of the negatives is that we are adding an additional JSONB encoding cost, though, this may not -// be that horrible. One interesting thing we could do with this approach is to also have a function -// cdc_delta which reduces JSONB to contain only modified columns (cdc_delta(cdc_prev()). -// Of course you could do something like this with "prev" table, but you'd have to "(case ...)" select -// for each table column. -// And since composition is so cool, you could use cdc_delta to determine if an update is not actually -// and update, but an upsert event. -// * Option 2: provide "prev" table. Also has negatives. Name resolution becomes more complex. You could -// legitimately have "prev" table, so you'd always need to alias the "real prev" table. The prev table -// is not specified via sql.y, so that's confusing. -// * Regardless of the option, keep in mind that sometimes prev is not set -- e.g. w/out diff option -// (here, we can return an error), but also not set during initial scan. So, the query must handle -// nulls in prev value. Just something to keep in mind. -// 5. We must be able to return permanent errors from this code that cause changefeed to fail. -// If filtering fails for a row (e.g. 
"select ... where col_a/col_b > 0" results in divide by 0), -// this will fail forever, and so we must be able to return permanent error. -// 6. Related to 5, we must have poison message handling so we won't kill feed in cases like that. -// 7. Schema changes could cause permanent failures. -// 8. Multiple *'s are allowed. But should they? -// 9. It is interesting to consider what access to prev does when we then send that data to encoder. -// Right now, we hard code before/after datums; with predicates, we should probably change how things are encoded. -// I.e. no "before"/"after" fields in json/avro -- instead, you select what you want to select. -// 10. Multi family support -- sort of breaks down because you get datums only for 1 family at a time. Any expressions -// comparing columns across families will fail. -// 11. Span constraints -- arguably the "holy grail" -- something that depends on the optiizer, but perhaps we -// can find a way of using that w/out significant refactor to expose entirety of changefeed to opt. -// Basically, given the set of predicates over primary key span, try to narrow the span(s) to those that can -// satisfy predicates. -// 12. UI/Usability: Simple contradictions are detected -- but not all. Even w/out contradiction, the user -// may want to know which events match/not match, and how does the data look like. We might need a mode -// where the data always emitted, but it is marked somehow, indicating how the data will be handled. -// 13. We should walk expressions to determine if we need to turn on an option. E.g. if we know user wants to filter -// out deletes, we could push this predicate down to KV (once kv supports filtering). -// Another idea is potentially detect if cdc_prev() is used and if so, turn on with diff option. 
diff --git a/pkg/ccl/changefeedccl/cdceval/expr_eval.go b/pkg/ccl/changefeedccl/cdceval/expr_eval.go index 422aeb624bb9..3273eccbbd5d 100644 --- a/pkg/ccl/changefeedccl/cdceval/expr_eval.go +++ b/pkg/ccl/changefeedccl/cdceval/expr_eval.go @@ -27,11 +27,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" + "github.com/lib/pq/oid" ) // Evaluator is a responsible for evaluating expressions in CDC. type Evaluator struct { selectors []tree.SelectExpr + from tree.TableExpr where tree.Expr evalCtx *eval.Context @@ -40,38 +42,23 @@ type Evaluator struct { evaluator *exprEval } -// NewEvaluator returns new evaluator instance. -func NewEvaluator(evalCtx *eval.Context) Evaluator { - return Evaluator{evalCtx: evalCtx.Copy()} -} +// NewEvaluator returns evaluator configured to process specified +// select expression. +func NewEvaluator(evalCtx *eval.Context, sc *tree.SelectClause) (*Evaluator, error) { + e := &Evaluator{evalCtx: evalCtx.Copy()} -// ConfigureProjection configures this evaluator to evaluate projection -func (e *Evaluator) ConfigureProjection(exprs tree.SelectExprs) error { - if len(exprs) == 0 { - return pgerror.New(pgcode.InvalidParameterValue, "expected at least 1 projection") - } - e.selectors = exprs - for _, se := range e.selectors { - expr, err := validateExpressionForCDC(se.Expr) - if err != nil { - return err + if len(sc.From.Tables) > 0 { // 0 tables used only in tests. + if len(sc.From.Tables) != 1 { + return nil, errors.AssertionFailedf("expected 1 table, found %d", len(sc.From.Tables)) } - se.Expr = expr + e.from = sc.From.Tables[0] } - return nil -} -// ConfigureFilter configures this evaluator to match rows against filter expression. 
-func (e *Evaluator) ConfigureFilter(filter tree.Expr) error { - if filter == nil { - return nil - } - expr, err := validateExpressionForCDC(filter) - if err != nil { - return err + if err := e.initSelectClause(sc); err != nil { + return nil, err } - e.where = expr - return nil + + return e, nil } // ComputeVirtualColumns updates row with computed values for all virtual columns. @@ -112,13 +99,69 @@ func (e *Evaluator) Projection( return e.evaluator.evalProjection(ctx, updatedRow, mvccTS, prevRow) } +// initSelectClause configures this evaluator to evaluate specified select clause. +func (e *Evaluator) initSelectClause(sc *tree.SelectClause) error { + if len(sc.Exprs) == 0 { // Shouldn't happen, but be defensive. + return pgerror.New(pgcode.InvalidParameterValue, + "expected at least 1 projection") + } + + e.selectors = sc.Exprs + for _, se := range e.selectors { + expr, err := validateExpressionForCDC(se.Expr) + if err != nil { + return err + } + se.Expr = expr + } + + if sc.Where != nil { + expr, err := validateExpressionForCDC(sc.Where.Expr) + if err != nil { + return err + } + e.where = expr + } + + return nil +} + // initEval initializes evaluator for the specified event descriptor. func (e *Evaluator) initEval(ctx context.Context, d *cdcevent.EventDescriptor) error { - if e.evaluator != nil && d.Equals(e.evaluator.EventDescriptor) { - return nil + if e.evaluator != nil { + sameVersion, sameTypes := d.EqualsWithUDTCheck(e.evaluator.EventDescriptor) + if sameVersion && sameTypes { + // Event descriptor and UDT types are the same -- re-use the same evaluator. + return nil + } + + if sameVersion { + // Here, we know that even though descriptor versions are the same, the + // check for equality with UDT type check failed. Thus, we know some user + // defined types have changed. + // The previously parsed select & filter expressions have type annotations, + // and those may now be incorrect. So, parse and re-initialize evaluator + // expressions. 
+ var where *tree.Where + if e.where != nil { + where = tree.NewWhere(tree.AstWhere, e.where) + } + sc, err := ParseChangefeedExpression(AsStringUnredacted(&tree.SelectClause{ + From: tree.From{Tables: tree.TableExprs{e.from}}, + Exprs: e.selectors, + Where: where, + })) + if err != nil { + return err + } + if err := e.initSelectClause(sc); err != nil { + return err + } + // Fall through to re-create e.evaluator. + } } - evaluator := newExprEval(e.evalCtx, d) + evaluator := newExprEval(e.evalCtx, d, tableNameOrAlias(d.TableName, e.from)) for _, selector := range e.selectors { if err := evaluator.addSelector(ctx, selector, len(e.selectors)); err != nil { return err @@ -135,17 +178,17 @@ func (e *Evaluator) initEval(ctx context.Context, d *cdcevent.EventDescriptor) e type exprEval struct { *cdcevent.EventDescriptor - semaCtx tree.SemaContext + semaCtx *tree.SemaContext evalCtx *eval.Context evalHelper *rowContainer // evalHelper is a container tree.IndexedVarContainer. iVarHelper tree.IndexedVarHelper // iVarHelper helps create indexed variables bound to evalHelper. resolver cdcNameResolver // resolver responsible for performing function name resolution. - starProjection bool - selectors []tree.TypedExpr // set of expressions to evaluate when performing evalProjection. - projection cdcevent.Projection - filter tree.TypedExpr // where clause filter + starProjection bool // Set to true if we have a single '*' projection. + selectors []tree.TypedExpr // set of expressions to evaluate when performing evalProjection. + projection cdcevent.Projection // cdcevent.Projects helps construct projection results. + filter tree.TypedExpr // where clause filter // keep track of number of times particular column name was used // in selectors. 
Since the data produced by CDC gets converted @@ -158,11 +201,13 @@ type exprEval struct { rowEvalCtx rowEvalContext } -func newExprEval(evalCtx *eval.Context, ed *cdcevent.EventDescriptor) *exprEval { +func newExprEval( + evalCtx *eval.Context, ed *cdcevent.EventDescriptor, tableName *tree.TableName, +) *exprEval { cols := ed.ResultColumns() e := &exprEval{ EventDescriptor: ed, - semaCtx: tree.MakeSemaContext(), + semaCtx: newSemaCtx(ed), evalCtx: evalCtx.Copy(), evalHelper: &rowContainer{cols: cols}, projection: cdcevent.MakeProjection(ed), @@ -172,7 +217,7 @@ func newExprEval(evalCtx *eval.Context, ed *cdcevent.EventDescriptor) *exprEval evalCtx = nil // From this point, only e.evalCtx should be used. // Configure semantic context. - e.semaCtx.SearchPath = &cdcCustomFunctionResolver{SearchPath: sessiondata.DefaultSearchPath} + e.semaCtx.SearchPath = &cdcCustomFunctionResolver{SearchPath: &sessiondata.DefaultSearchPath} e.semaCtx.Properties.Require("cdc", tree.RejectAggregates|tree.RejectGenerators|tree.RejectWindowApplications|tree.RejectNestedGenerators, ) @@ -197,10 +242,7 @@ func newExprEval(evalCtx *eval.Context, ed *cdcevent.EventDescriptor) *exprEval e.resolver = cdcNameResolver{ EventDescriptor: ed, NameResolutionVisitor: schemaexpr.MakeNameResolutionVisitor( - colinfo.NewSourceInfoForSingleTable( - tree.MakeUnqualifiedTableName(tree.Name(ed.TableName)), - nakedResultColumns(), - ), + colinfo.NewSourceInfoForSingleTable(*tableName, nakedResultColumns()), e.iVarHelper, ), } @@ -374,7 +416,8 @@ func (e *exprEval) typeCheck( ) (tree.TypedExpr, error) { // If we have variable free immutable expressions, then we can just evaluate it right away. 
typedExpr, err := schemaexpr.SanitizeVarFreeExpr( - ctx, expr, targetType, "cdc", &e.semaCtx, volatility.Immutable, false) + ctx, expr, targetType, "cdc", e.semaCtx, + volatility.Immutable, true) if err == nil { d, err := eval.Expr(e.evalCtx, typedExpr) if err != nil { @@ -395,7 +438,7 @@ func (e *exprEval) typeCheck( } // Run type check & normalize. - typedExpr, err = expr.TypeCheck(ctx, &e.semaCtx, targetType) + typedExpr, err = expr.TypeCheck(ctx, e.semaCtx, targetType) if err != nil { return nil, err } @@ -423,7 +466,7 @@ func (e *exprEval) evalExpr( return nil, v.err } - typedExpr, err := tree.TypeCheck(ctx, newExpr, &e.semaCtx, targetType) + typedExpr, err := tree.TypeCheck(ctx, newExpr, e.semaCtx, targetType) if err != nil { return nil, err } @@ -669,6 +712,72 @@ func (v *replaceIndexVarVisitor) VisitPost(expr tree.Expr) (newNode tree.Expr) { // in the Annotation field of evalCtx when evaluating expressions. const cdcAnnotationAddr tree.AnnotationIdx = iota + 1 +// rowEvalContextFromEvalContext returns rowEvalContext stored as an annotation +// in evalCtx. func rowEvalContextFromEvalContext(evalCtx *eval.Context) *rowEvalContext { return evalCtx.Annotations.Get(cdcAnnotationAddr).(*rowEvalContext) } + +const rejectInvalidCDCExprs = (tree.RejectAggregates | tree.RejectGenerators | + tree.RejectWindowApplications | tree.RejectNestedGenerators) + +// newSemaCtx returns new tree.SemaCtx configured for cdc. +func newSemaCtx(d *cdcevent.EventDescriptor) *tree.SemaContext { + sema := tree.MakeSemaContext() + sema.SearchPath = &cdcCustomFunctionResolver{SearchPath: &sessiondata.DefaultSearchPath} + sema.Properties.Require("cdc", rejectInvalidCDCExprs) + + if d.HasUserDefinedTypes() { + sema.TypeResolver = newTypeReferenceResolver(d) + } + return &sema +} + +// cdcTypeReferenceReesolver is responsible for resolving user defined types. 
+type cdcTypeReferenceResolver struct { + byName map[string]*types.T + byOID map[oid.Oid]*types.T +} + +var _ tree.TypeReferenceResolver = (*cdcTypeReferenceResolver)(nil) + +func newTypeReferenceResolver(d *cdcevent.EventDescriptor) tree.TypeReferenceResolver { + // Because EventDescriptor is built with hydrated table descriptors, and the + // expression must have been normalized, we don't need to do any fancy + // resolution; just go through user defined columns in the descriptor and + // build the lookup maps. + r := &cdcTypeReferenceResolver{ + byName: make(map[string]*types.T), + byOID: make(map[oid.Oid]*types.T), + } + + for _, c := range d.ResultColumns() { + if c.Typ.UserDefined() { + r.byName[c.Typ.TypeMeta.Name.FQName()] = c.Typ + r.byOID[c.Typ.Oid()] = c.Typ + } + } + return r +} + +// ResolveType implements tree.TypeReferenceResolver. +func (r *cdcTypeReferenceResolver) ResolveType( + ctx context.Context, name *tree.UnresolvedObjectName, +) (*types.T, error) { + // NB: normalization step fully qualifies types, so use the full name to + // lookup. + if typ, found := r.byName[name.String()]; found { + return typ, nil + } + return nil, pgerror.Newf(pgcode.UndefinedObject, "undefined object %s", name) +} + +// ResolveTypeByOID implements tree.TypeReferenceResolver. 
+func (r *cdcTypeReferenceResolver) ResolveTypeByOID( + ctx context.Context, oid oid.Oid, +) (*types.T, error) { + if typ, found := r.byOID[oid]; found { + return typ, nil + } + return nil, pgerror.Newf(pgcode.UndefinedObject, "undefined object with OID %d", oid) +} diff --git a/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go b/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go index 448c839ee1f3..cafb49eb0562 100644 --- a/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go +++ b/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go @@ -39,68 +39,11 @@ import ( "github.com/stretchr/testify/require" ) -func TestNoopPredicate(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) - defer s.Stopper().Stop(context.Background()) - - sqlDB := sqlutils.MakeSQLRunner(db) - sqlDB.Exec(t, - "CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, d INT, FAMILY most (a,b,c), FAMILY only_d (d))") - desc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), kvDB, "foo") - - serverCfg := s.DistSQLServer().(*distsql.ServerImpl).ServerConfig - ctx := context.Background() - decoder, err := cdcevent.NewEventDecoder( - ctx, &serverCfg, - changefeedbase.Targets{ - { - Type: jobspb.ChangefeedTargetSpecification_COLUMN_FAMILY, - TableID: desc.GetID(), - FamilyName: "most", - }, - }, false) - require.NoError(t, err) - - popRow, cleanup := cdctest.MakeRangeFeedValueReader(t, s.ExecutorConfig(), desc) - defer cleanup() - sqlDB.Exec(t, "INSERT INTO foo (a, b, d) VALUES (1, 'one', -1)") - testRow := decodeRow(t, decoder, popRow(t), false) - - e, err := makeEvaluator(t, s.ClusterSettings(), "") - require.NoError(t, err) - - matches, err := e.MatchesFilter(ctx, testRow, hlc.Timestamp{}, testRow) - require.NoError(t, err) - require.True(t, matches) - - projection, err := e.Projection(ctx, testRow, hlc.Timestamp{}, testRow) - require.NoError(t, err) - require.Equal(t, testRow.EventDescriptor, 
projection.EventDescriptor) -} - -// readSortedRangeFeedValues reads n values, and sorts them based on key order. -func readSortedRangeFeedValues( - t *testing.T, n int, row func(t *testing.T) *roachpb.RangeFeedValue, -) (res []roachpb.RangeFeedValue) { - t.Helper() - for i := 0; i < n; i++ { - v := row(t) - res = append(res, *v) - } - sort.Slice(res, func(i, j int) bool { - return res[i].Key.Compare(res[j].Key) < 0 - }) - return res -} - func TestEvaluator(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) defer s.Stopper().Stop(context.Background()) sqlDB := sqlutils.MakeSQLRunner(db) @@ -116,7 +59,7 @@ CREATE TABLE foo ( FAMILY main (a, b, e), FAMILY only_c (c) )`) - desc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), kvDB, "foo") + desc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), "foo") type decodeExpectation struct { expectUnwatchedErr bool @@ -208,6 +151,18 @@ CREATE TABLE foo ( }, }, }, + { + testName: "main/projection_aliased", + familyName: "main", + actions: []string{"INSERT INTO foo (a, b) VALUES (3, '3rd test')"}, + predicate: "SELECT bar.e, a FROM foo AS bar", + expectMainFamily: []decodeExpectation{ + { + keyValues: []string{"3rd test", "3"}, + allValues: map[string]string{"a": "3", "e": "inactive"}, + }, + }, + }, { testName: "main/not_closed", familyName: "main", @@ -502,7 +457,8 @@ CREATE TABLE foo ( require.NoError(t, err) if expect.expectFiltered { - require.Equal(t, expect.keyValues, slurpKeys(t, updatedRow), "isDelete=%t fid=%d", updatedRow.IsDeleted(), eventFamilyID) + require.Equal(t, expect.keyValues, slurpKeys(t, updatedRow), + "isDelete=%t fid=%d", updatedRow.IsDeleted(), eventFamilyID) matches, err := evaluator.MatchesFilter(ctx, updatedRow, v.Timestamp(), prevRow) require.NoError(t, err) require.False(t, matches, "keys: %v", slurpKeys(t, updatedRow)) @@ 
-574,14 +530,14 @@ func TestEvaluatesProjection(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) defer s.Stopper().Stop(context.Background()) sqlDB := sqlutils.MakeSQLRunner(db) sqlDB.Exec(t, ""+ "CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, d INT, "+ "FAMILY most (a,b,c), FAMILY only_d (d))") - desc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), kvDB, "foo") + desc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), "foo") testRow := cdcevent.TestingMakeEventRow(desc, 0, randEncDatumRow(t, desc, 0), false) verifyConstantsFolded := func(p *exprEval) { @@ -653,26 +609,13 @@ func TestEvaluatesProjection(t *testing.T) { // makeEvaluator creates Evaluator and configures it with specified // select statement predicate. -func makeEvaluator(t *testing.T, st *cluster.Settings, selectStr string) (Evaluator, error) { +func makeEvaluator(t *testing.T, st *cluster.Settings, selectStr string) (*Evaluator, error) { t.Helper() - evalCtx := eval.MakeTestingEvalContext(st) - e := NewEvaluator(&evalCtx) - if selectStr == "" { - return e, nil - } s, err := parser.ParseOne(selectStr) require.NoError(t, err) slct := s.AST.(*tree.Select).Select.(*tree.SelectClause) - if err := e.ConfigureProjection(slct.Exprs); err != nil { - return Evaluator{}, err - } - - if slct.Where != nil { - if err := e.ConfigureFilter(slct.Where.Expr); err != nil { - return Evaluator{}, err - } - } - return e, nil + evalCtx := eval.MakeTestingEvalContext(st) + return NewEvaluator(&evalCtx, slct) } func makeExprEval( @@ -749,3 +692,18 @@ func makeEncDatumRow(datums ...tree.Datum) (row rowenc.EncDatumRow) { } return row } + +// readSortedRangeFeedValues reads n values, and sorts them based on key order. 
+func readSortedRangeFeedValues( + t *testing.T, n int, row func(t *testing.T) *roachpb.RangeFeedValue, +) (res []roachpb.RangeFeedValue) { + t.Helper() + for i := 0; i < n; i++ { + v := row(t) + res = append(res, *v) + } + sort.Slice(res, func(i, j int) bool { + return res[i].Key.Compare(res[j].Key) < 0 + }) + return res +} diff --git a/pkg/ccl/changefeedccl/cdceval/functions.go b/pkg/ccl/changefeedccl/cdceval/functions.go index f781f5fd24ca..91cb91c84263 100644 --- a/pkg/ccl/changefeedccl/cdceval/functions.go +++ b/pkg/ccl/changefeedccl/cdceval/functions.go @@ -16,7 +16,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sem/volatility" - "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -153,7 +152,7 @@ func prevRowAsJSON(evalCtx *eval.Context, _ tree.Datums) (tree.Datum, error) { } type cdcCustomFunctionResolver struct { - sessiondata.SearchPath + tree.SearchPath } // Resolve implements tree.CustomFunctionDefinitionResolver diff --git a/pkg/ccl/changefeedccl/cdceval/functions_test.go b/pkg/ccl/changefeedccl/cdceval/functions_test.go index 2eb31d7c74cd..fabcf5e4095c 100644 --- a/pkg/ccl/changefeedccl/cdceval/functions_test.go +++ b/pkg/ccl/changefeedccl/cdceval/functions_test.go @@ -33,14 +33,14 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) defer s.Stopper().Stop(context.Background()) sqlDB := sqlutils.MakeSQLRunner(db) sqlDB.Exec(t, ""+ "CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, d INT, "+ "FAMILY most (a,b,c), FAMILY only_d (d))") - desc := cdctest.GetHydratedTableDescriptor(t, 
s.ExecutorConfig(), kvDB, "foo") + desc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), "foo") ctx := context.Background() diff --git a/pkg/ccl/changefeedccl/cdceval/main_test.go b/pkg/ccl/changefeedccl/cdceval/main_test.go index 35321d8f87af..5735dd2b1bb5 100644 --- a/pkg/ccl/changefeedccl/cdceval/main_test.go +++ b/pkg/ccl/changefeedccl/cdceval/main_test.go @@ -12,7 +12,6 @@ import ( "os" "testing" - "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/security/securityassets" "github.com/cockroachdb/cockroach/pkg/security/securitytest" "github.com/cockroachdb/cockroach/pkg/server" @@ -22,7 +21,6 @@ import ( ) func TestMain(m *testing.M) { - defer utilccl.TestingEnableEnterprise()() securityassets.SetLoader(securitytest.EmbeddedAssets) randutil.SeedForTests() serverutils.InitTestServerFactory(server.TestServerFactory) diff --git a/pkg/ccl/changefeedccl/cdceval/parse.go b/pkg/ccl/changefeedccl/cdceval/parse.go new file mode 100644 index 000000000000..404ce04c54e5 --- /dev/null +++ b/pkg/ccl/changefeedccl/cdceval/parse.go @@ -0,0 +1,47 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package cdceval + +import ( + "github.com/cockroachdb/cockroach/pkg/sql/parser" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/errors" +) + +// AsStringUnredacted returns unredacted string representation. +// Method is intended to be used when serializing node formatter to be stored +// in protocol messages. 
+func AsStringUnredacted(n tree.NodeFormatter) string { + return tree.AsStringWithFlags(n, tree.FmtParsable|tree.FmtShowPasswords) +} + +// ParseChangefeedExpression is a helper to parse changefeed "select clause". +func ParseChangefeedExpression(selectClause string) (*tree.SelectClause, error) { + stmt, err := parser.ParseOne(selectClause) + if err != nil { + return nil, err + } + if slct, ok := stmt.AST.(*tree.Select); ok { + if sc, ok := slct.Select.(*tree.SelectClause); ok { + return sc, nil + } + } + return nil, errors.AssertionFailedf("expected select clause, found %T", stmt.AST) +} + +// tableNameOrAlias returns tree.TableName for the table expression. +func tableNameOrAlias(name string, expr tree.TableExpr) *tree.TableName { + switch t := expr.(type) { + case *tree.AliasedTableExpr: + return tree.NewUnqualifiedTableName(t.As.Alias) + case *tree.TableRef: + return tree.NewUnqualifiedTableName(t.As.Alias) + } + return tree.NewUnqualifiedTableName(tree.Name(name)) +} diff --git a/pkg/ccl/changefeedccl/cdceval/validation.go b/pkg/ccl/changefeedccl/cdceval/validation.go new file mode 100644 index 000000000000..496487fe54b0 --- /dev/null +++ b/pkg/ccl/changefeedccl/cdceval/validation.go @@ -0,0 +1,250 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package cdceval + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" + "github.com/lib/pq/oid" +) + +// NormalizeAndValidateSelectForTarget normalizes select expression and verifies +// expression is valid for a table and target family. includeVirtual indicates +// if virtual columns should be considered valid in the expressions. +// Normalization steps include: +// * Table name replaces with table reference +// * UDTs values replaced with their physical representation (to keep expression stable +// across data type changes). +// The normalized (updated) select clause expression can be serialized into protocol +// buffer using cdceval.AsStringUnredacted. +func NormalizeAndValidateSelectForTarget( + ctx context.Context, + execCtx sql.JobExecContext, + desc catalog.TableDescriptor, + target jobspb.ChangefeedTargetSpecification, + sc *tree.SelectClause, + includeVirtual bool, +) error { + execCtx.SemaCtx() + execCfg := execCtx.ExecCfg() + if !execCfg.Settings.Version.IsActive(ctx, clusterversion.EnablePredicateProjectionChangefeed) { + return errors.Newf( + `filters and projections not supported until upgrade to version %s or higher is finalized`, + clusterversion.EnablePredicateProjectionChangefeed.String()) + } + + // This really shouldn't happen as it's enforced by sql.y. 
+ if len(sc.From.Tables) != 1 { + return pgerror.Newf(pgcode.Syntax, "invalid CDC expression: only 1 table supported") + } + + // Sanity check target and descriptor refer to the same table. + if target.TableID != desc.GetID() { + return errors.AssertionFailedf("target table id (%d) does not match descriptor id (%d)", + target.TableID, desc.GetID()) + } + + // This method is meant to be called early on when changefeed is created -- + // i.e. during planning. As such, we expect execution context to have + // associated Txn() -- without which we cannot perform normalization. Verify + // this assumption (txn is needed for type resolution). + if execCtx.Txn() == nil { + return errors.AssertionFailedf("expected non-nil transaction") + } + + // Perform normalization. + var err error + sc, err = normalizeSelectClause(ctx, *execCtx.SemaCtx(), sc, desc) + if err != nil { + return err + } + + ed, err := newEventDescriptorForTarget(desc, target, schemaTS(execCtx), includeVirtual) + if err != nil { + return err + } + + evalCtx := &execCtx.ExtendedEvalContext().Context + // Try to constrain spans by select clause. We don't care about constrained + // spans here, but constraining spans kicks off optimizer which detects many + // errors. + if _, _, err := constrainSpansBySelectClause( + ctx, execCtx, evalCtx, execCfg.Codec, sc, ed, + ); err != nil { + return err + } + + // Construct and initialize evaluator. This performs some static checks, + // and (importantly) type checks expressions. 
+ evaluator, err := NewEvaluator(evalCtx, sc) + if err != nil { + return err + } + + return evaluator.initEval(ctx, ed) +} + +func newEventDescriptorForTarget( + desc catalog.TableDescriptor, + target jobspb.ChangefeedTargetSpecification, + schemaTS hlc.Timestamp, + includeVirtual bool, +) (*cdcevent.EventDescriptor, error) { + family, err := getTargetFamilyDescriptor(desc, target) + if err != nil { + return nil, err + } + return cdcevent.NewEventDescriptor(desc, family, includeVirtual, schemaTS) +} + +func getTargetFamilyDescriptor( + desc catalog.TableDescriptor, target jobspb.ChangefeedTargetSpecification, +) (*descpb.ColumnFamilyDescriptor, error) { + switch target.Type { + case jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY: + return desc.FindFamilyByID(0) + case jobspb.ChangefeedTargetSpecification_COLUMN_FAMILY: + var fd *descpb.ColumnFamilyDescriptor + for _, family := range desc.GetFamilies() { + if family.Name == target.FamilyName { + fd = &family + break + } + } + if fd == nil { + return nil, pgerror.Newf(pgcode.InvalidParameterValue, "no such family %s", target.FamilyName) + } + return fd, nil + case jobspb.ChangefeedTargetSpecification_EACH_FAMILY: + // TODO(yevgeniy): Relax this restriction; some predicates/projectsion + // are entirely fine to use (e.g "*"). + return nil, pgerror.Newf(pgcode.InvalidParameterValue, + "projections and filter cannot be used when running against multifamily table (table has %d families)", + desc.NumFamilies()) + default: + return nil, errors.AssertionFailedf("invalid target type %v", target.Type) + } +} + +// normalizeSelectClause performs normalization step for select clause. +// Returns normalized select clause. +func normalizeSelectClause( + ctx context.Context, + semaCtx tree.SemaContext, + sc *tree.SelectClause, + desc catalog.TableDescriptor, +) (normalizedSelectClause *tree.SelectClause, _ error) { + // Turn FROM clause to table reference. 
+ // Note: must specify AliasClause for TableRef expression; otherwise we + // won't be able to deserialize string representation (grammar requires + // "select ... from [table_id as alias]") + var alias tree.AliasClause + switch t := sc.From.Tables[0].(type) { + case *tree.AliasedTableExpr: + alias = t.As + case tree.TablePattern: + default: + // This is verified by sql.y -- but be safe. + return nil, errors.AssertionFailedf("unexpected table expression type %T", + sc.From.Tables[0]) + } + + if alias.Alias == "" { + alias.Alias = tree.Name(desc.GetName()) + } + sc.From.Tables[0] = &tree.TableRef{ + TableID: int64(desc.GetID()), + As: alias, + } + + // Setup sema ctx to handle cdc expressions. We want to make sure we only + // override some properties, while keeping other properties (type resolver) + // intact. + semaCtx.SearchPath = &cdcCustomFunctionResolver{SearchPath: semaCtx.SearchPath} + semaCtx.Properties.Require("cdc", rejectInvalidCDCExprs) + + resolveType := func(ref tree.ResolvableTypeReference) (tree.ResolvableTypeReference, error) { + typ, err := tree.ResolveType(ctx, ref, semaCtx.GetTypeResolver()) + if err != nil { + return nil, pgerror.Wrapf(err, pgcode.IndeterminateDatatype, + "could not resolve type %s", ref.SQLString()) + } + return &tree.OIDTypeReference{OID: typ.Oid()}, nil + } + + // Verify that any UDTs used in the statement reference only the UDTs that are + // part of the target table descriptor. + v := &tree.TypeCollectorVisitor{ + OIDs: make(map[oid.Oid]struct{}), + } + + stmt, err := tree.SimpleStmtVisit(sc, func(expr tree.Expr) (recurse bool, newExpr tree.Expr, err error) { + // Replace type references with resolved type. + switch e := expr.(type) { + case *tree.AnnotateTypeExpr: + typ, err := resolveType(e.Type) + if err != nil { + return false, expr, err + } + e.Type = typ + case *tree.CastExpr: + typ, err := resolveType(e.Type) + if err != nil { + return false, expr, err + } + e.Type = typ + } + + // Collect resolved type OIDs. 
+ recurse, newExpr = v.VisitPre(expr) + return recurse, newExpr, nil + }) + + if err != nil { + return nil, err + } + switch t := stmt.(type) { + case *tree.SelectClause: + normalizedSelectClause = t + default: + // We walked tree.SelectClause -- getting anything else would be surprising. + return nil, errors.AssertionFailedf("unexpected result type %T", stmt) + } + + if len(v.OIDs) == 0 { + return normalizedSelectClause, nil + } + + // Verify that the only user defined types used are the types referenced by + // target table. + allowedOIDs := make(map[oid.Oid]struct{}) + for _, c := range desc.UserDefinedTypeColumns() { + allowedOIDs[c.GetType().Oid()] = struct{}{} + } + + for id := range v.OIDs { + if _, isAllowed := allowedOIDs[id]; !isAllowed { + return nil, pgerror.Newf(pgcode.FeatureNotSupported, + "use of user defined types not referenced by target table is not supported") + } + } + + return normalizedSelectClause, nil +} diff --git a/pkg/ccl/changefeedccl/cdceval/validation_test.go b/pkg/ccl/changefeedccl/cdceval/validation_test.go new file mode 100644 index 000000000000..50a4600dd1ee --- /dev/null +++ b/pkg/ccl/changefeedccl/cdceval/validation_test.go @@ -0,0 +1,144 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package cdceval + +import ( + "context" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +func TestNormalizeAndValidate(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.ExecMultiple(t, + `CREATE TYPE status AS ENUM ('open', 'closed', 'inactive')`, + `CREATE TYPE unused AS ENUM ('do not use')`, + `CREATE SCHEMA alt`, + `CREATE TYPE alt.status AS ENUM ('alt_open', 'alt_closed', 'alt_inactive')`, + `CREATE TYPE alt.unused AS ENUM ('really', 'do', 'not', 'use')`, + `CREATE TABLE foo (a INT PRIMARY KEY, status status, alt alt.status)`, + `CREATE DATABASE other`, + `CREATE TABLE other.foo (a INT)`, + ) + + fooDesc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), "foo") + otherFooDesc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), "other", "foo") + + ctx := context.Background() + execCfg := s.ExecutorConfig().(sql.ExecutorConfig) + p, cleanup := sql.NewInternalPlanner("test", + kvDB.NewTxn(ctx, "test-planner"), + 
username.RootUserName(), &sql.MemoryMetrics{}, &execCfg, + sessiondatapb.SessionData{ + Database: "defaultdb", + SearchPath: sessiondata.DefaultSearchPath.GetPathArray(), + }) + defer cleanup() + execCtx := p.(sql.JobExecContext) + + for _, tc := range []struct { + name string + desc catalog.TableDescriptor + stmt string + expectErr string + expectStmt string + }{ + { + name: "reject multiple tables", + desc: fooDesc, + stmt: "SELECT * FROM foo, other.foo", + expectErr: "invalid CDC expression: only 1 table supported", + }, + { + name: "reject contradiction", + desc: fooDesc, + stmt: "SELECT * FROM foo WHERE a IS NULL", + expectErr: `filter "a IS NULL" is a contradiction`, + }, + { + name: "enum must be referenced", + desc: fooDesc, + stmt: "SELECT 'open'::status, 'do not use':::unused FROM foo", + expectErr: `use of user defined types not referenced by target table is not supported`, + }, + { + name: "replaces table name with ref", + desc: fooDesc, + stmt: "SELECT * FROM foo", + expectStmt: fmt.Sprintf("SELECT * FROM [%d AS foo]", fooDesc.GetID()), + }, + { + name: "replaces table name with other.ref", + desc: otherFooDesc, + stmt: "SELECT * FROM other.foo", + expectStmt: fmt.Sprintf("SELECT * FROM [%d AS foo]", otherFooDesc.GetID()), + }, + { + name: "replaces table name with ref aliased", + desc: fooDesc, + stmt: "SELECT * FROM foo AS bar", + expectStmt: fmt.Sprintf("SELECT * FROM [%d AS bar]", fooDesc.GetID()), + }, + { + name: "UDTs fully qualified", + desc: fooDesc, + stmt: "SELECT *, 'inactive':::status FROM foo AS bar WHERE status = 'open':::status", + expectStmt: fmt.Sprintf( + "SELECT *, 'inactive':::defaultdb.public.status "+ + "FROM [%d AS bar] WHERE status = 'open':::defaultdb.public.status", + fooDesc.GetID()), + }, + } { + t.Run(tc.name, func(t *testing.T) { + sc, err := ParseChangefeedExpression(tc.stmt) + require.NoError(t, err) + target := jobspb.ChangefeedTargetSpecification{ + TableID: tc.desc.GetID(), + StatementTimeName: tc.desc.GetName(), + } 
+ + err = NormalizeAndValidateSelectForTarget(ctx, execCtx, tc.desc, target, sc, false) + if tc.expectErr != "" { + require.Regexp(t, tc.expectErr, err) + return + } + + require.NoError(t, err) + serialized := AsStringUnredacted(sc) + log.Infof(context.Background(), "DEBUG: %s", tree.StmtDebugString(sc)) + log.Infof(context.Background(), "Serialized: %s", serialized) + require.Equal(t, tc.expectStmt, serialized) + + // Make sure we can deserialize back. + _, err = ParseChangefeedExpression(serialized) + require.NoError(t, err) + }) + } +} diff --git a/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel b/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel index f9215568fdaf..44fc98709a11 100644 --- a/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel @@ -46,7 +46,6 @@ go_test( "//pkg/base", "//pkg/ccl/changefeedccl/cdctest", "//pkg/ccl/changefeedccl/changefeedbase", - "//pkg/ccl/utilccl", "//pkg/jobs/jobspb", "//pkg/roachpb", "//pkg/security/securityassets", diff --git a/pkg/ccl/changefeedccl/cdcevent/event.go b/pkg/ccl/changefeedccl/cdcevent/event.go index d6be4241d2a5..cc2d0ef7db3e 100644 --- a/pkg/ccl/changefeedccl/cdcevent/event.go +++ b/pkg/ccl/changefeedccl/cdcevent/event.go @@ -177,6 +177,8 @@ func (c ResultColumn) Ordinal() int { type EventDescriptor struct { Metadata + td catalog.TableDescriptor + // List of result columns produced by this descriptor. // This may be different from the table descriptors public columns // (e.g. in case of projection). @@ -188,7 +190,8 @@ type EventDescriptor struct { udtCols []int // Columns containing UDTs. } -func newEventDescriptor( +// NewEventDescriptor returns EventDescriptor for specified table and family descriptors. 
+func NewEventDescriptor( desc catalog.TableDescriptor, family *descpb.ColumnFamilyDescriptor, includeVirtualColumns bool, @@ -204,6 +207,7 @@ func newEventDescriptor( HasOtherFamilies: desc.NumFamilies() > 1, SchemaTS: schemaTS, }, + td: desc, } // addColumn is a helper to add a column to this descriptor. @@ -281,9 +285,34 @@ func (d *EventDescriptor) ResultColumns() []ResultColumn { return d.cols } -// Equals returns true if this descriptor equals other. -func (d *EventDescriptor) Equals(other *EventDescriptor) bool { - return other != nil && d.TableID == other.TableID && d.Version == other.Version && d.FamilyID == other.FamilyID +// EqualsVersion returns true if this descriptor equals other. +func (d *EventDescriptor) EqualsVersion(other *EventDescriptor) bool { + return d.TableID == other.TableID && + d.Version == other.Version && + d.FamilyID == other.FamilyID +} + +// EqualsWithUDTCheck returns true if event descriptors are the same version and +// their user defined types (if any) are also matching. +func (d *EventDescriptor) EqualsWithUDTCheck( + other *EventDescriptor, +) (sameVersion bool, typesHaveSameVersion bool) { + if d.EqualsVersion(other) { + return true, catalog.UserDefinedTypeColsHaveSameVersion(d.td, other.td) + } + return false, false +} + +// HasUserDefinedTypes returns true if this descriptor contains user defined columns. +func (d *EventDescriptor) HasUserDefinedTypes() bool { + return len(d.udtCols) > 0 +} + +// TableDescriptor returns underlying table descriptor. This method is exposed +// to make it easier to integrate with the rest of descriptor APIs; prefer to use +// higher level methods/structs (e.g. Metadata) instead. +func (d *EventDescriptor) TableDescriptor() catalog.TableDescriptor { + return d.td } type eventDescriptorFactory func( @@ -323,19 +352,12 @@ func getEventDescriptorCached( if v, ok := cache.Get(idVer); ok { ed := v.(*EventDescriptor) - - // Normally, this is a no-op since majority of changefeeds do not use UDTs. 
- // However, in case we do, we must update cached UDT information based on this - // descriptor since it has up-to-date type information. - for _, udtColIdx := range ed.udtCols { - ord := ed.cols[udtColIdx].ord - ed.cols[udtColIdx].Typ = desc.PublicColumns()[ord].GetType() + if catalog.UserDefinedTypeColsHaveSameVersion(ed.td, desc) { + return ed, nil } - - return ed, nil } - ed, err := newEventDescriptor(desc, family, includeVirtual, schemaTS) + ed, err := NewEventDescriptor(desc, family, includeVirtual, schemaTS) if err != nil { return nil, err } @@ -490,7 +512,7 @@ func TestingMakeEventRow( panic(err) // primary column family always exists. } const includeVirtual = false - ed, err := newEventDescriptor(desc, family, includeVirtual, hlc.Timestamp{}) + ed, err := NewEventDescriptor(desc, family, includeVirtual, hlc.Timestamp{}) if err != nil { panic(err) } diff --git a/pkg/ccl/changefeedccl/cdcevent/event_test.go b/pkg/ccl/changefeedccl/cdcevent/event_test.go index 337dabbb3f47..dbe740a7496a 100644 --- a/pkg/ccl/changefeedccl/cdcevent/event_test.go +++ b/pkg/ccl/changefeedccl/cdcevent/event_test.go @@ -34,7 +34,7 @@ func TestEventDescriptor(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) defer s.Stopper().Stop(context.Background()) sqlDB := sqlutils.MakeSQLRunner(db) @@ -51,7 +51,7 @@ CREATE TABLE foo ( FAMILY only_c (c) )`) - tableDesc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), kvDB, "foo") + tableDesc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), "foo") mainFamily := mustGetFamily(t, tableDesc, 0) cFamily := mustGetFamily(t, tableDesc, 1) @@ -90,7 +90,7 @@ CREATE TABLE foo ( }, } { t.Run(fmt.Sprintf("%s/includeVirtual=%t", tc.family.Name, tc.includeVirtual), func(t *testing.T) { - ed, err := newEventDescriptor(tableDesc, tc.family, tc.includeVirtual, s.Clock().Now()) + ed, 
err := NewEventDescriptor(tableDesc, tc.family, tc.includeVirtual, s.Clock().Now()) require.NoError(t, err) // Verify Metadata information for event descriptor. @@ -112,7 +112,7 @@ func TestEventDecoder(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) defer s.Stopper().Stop(context.Background()) sqlDB := sqlutils.MakeSQLRunner(db) @@ -129,7 +129,7 @@ CREATE TABLE foo ( FAMILY only_c (c) )`) - tableDesc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), kvDB, "foo") + tableDesc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), "foo") popRow, cleanup := cdctest.MakeRangeFeedValueReader(t, s.ExecutorConfig(), tableDesc) defer cleanup() diff --git a/pkg/ccl/changefeedccl/cdcevent/main_test.go b/pkg/ccl/changefeedccl/cdcevent/main_test.go index 2709542d8a4b..59bc12ea39e7 100644 --- a/pkg/ccl/changefeedccl/cdcevent/main_test.go +++ b/pkg/ccl/changefeedccl/cdcevent/main_test.go @@ -12,7 +12,6 @@ import ( "os" "testing" - "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/security/securityassets" "github.com/cockroachdb/cockroach/pkg/security/securitytest" "github.com/cockroachdb/cockroach/pkg/server" @@ -22,7 +21,6 @@ import ( ) func TestMain(m *testing.M) { - defer utilccl.TestingEnableEnterprise()() securityassets.SetLoader(securitytest.EmbeddedAssets) randutil.SeedForTests() serverutils.InitTestServerFactory(server.TestServerFactory) diff --git a/pkg/ccl/changefeedccl/cdcevent/projection_test.go b/pkg/ccl/changefeedccl/cdcevent/projection_test.go index c54c11c2617d..9f530316adb0 100644 --- a/pkg/ccl/changefeedccl/cdcevent/projection_test.go +++ b/pkg/ccl/changefeedccl/cdcevent/projection_test.go @@ -28,7 +28,7 @@ func TestProjection(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - s, db, kvDB := serverutils.StartServer(t, 
base.TestServerArgs{}) + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) defer s.Stopper().Stop(context.Background()) sqlDB := sqlutils.MakeSQLRunner(db) @@ -41,7 +41,7 @@ CREATE TABLE foo ( PRIMARY KEY (b, a) )`) - desc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), kvDB, "foo") + desc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), "foo") encDatums := makeEncDatumRow(tree.NewDInt(1), tree.NewDString("one"), tree.DNull) t.Run("row_was_deleted", func(t *testing.T) { diff --git a/pkg/ccl/changefeedccl/cdctest/row.go b/pkg/ccl/changefeedccl/cdctest/row.go index d6ab247aea84..f91f987abee7 100644 --- a/pkg/ccl/changefeedccl/cdctest/row.go +++ b/pkg/ccl/changefeedccl/cdctest/row.go @@ -83,15 +83,29 @@ func MakeRangeFeedValueReader( // GetHydratedTableDescriptor returns a table descriptor for the specified // table. The descriptor is "hydrated" if it has user defined data types. func GetHydratedTableDescriptor( - t *testing.T, execCfgI interface{}, kvDB *kv.DB, tableName tree.Name, + t *testing.T, execCfgI interface{}, parts ...tree.Name, ) (td catalog.TableDescriptor) { t.Helper() + dbName, scName, tableName := func() (tree.Name, tree.Name, tree.Name) { + switch len(parts) { + case 1: + return "defaultdb", "public", parts[0] + case 2: + return parts[0], "public", parts[1] + case 3: + return parts[0], parts[1], parts[2] + default: + t.Fatal("invalid length") + return "", "", "" + } + }() + execCfg := execCfgI.(sql.ExecutorConfig) var found bool require.NoError(t, sql.DescsTxn(context.Background(), &execCfg, func(ctx context.Context, txn *kv.Txn, col *descs.Collection) (err error) { found, td, err = col.GetImmutableTableByName(ctx, txn, - tree.NewTableNameWithSchema("defaultdb", "public", tableName), + tree.NewTableNameWithSchema(dbName, scName, tableName), tree.ObjectLookupFlags{ CommonLookupFlags: tree.CommonLookupFlags{ Required: true, diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go 
b/pkg/ccl/changefeedccl/changefeed_dist.go index d8a533f4a837..bcff811cfa32 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist.go +++ b/pkg/ccl/changefeedccl/changefeed_dist.go @@ -11,8 +11,8 @@ package changefeedccl import ( "context" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" - "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeeddist" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -20,11 +20,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" - "github.com/cockroachdb/cockroach/pkg/sql/parser" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/hlc" ) @@ -87,16 +89,14 @@ func distChangefeedFlow( } } - execCfg := execCtx.ExecCfg() var initialHighWater hlc.Timestamp - var trackedSpans []roachpb.Span + schemaTS := details.StatementTime { - spansTS := details.StatementTime if h := progress.GetHighWater(); h != nil && !h.IsEmpty() { initialHighWater = *h // If we have a high-water set, use it to compute the spans, since the // ones at the statement time may have been garbage collected by now. - spansTS = initialHighWater + schemaTS = initialHighWater } // We want to fetch the target spans as of the timestamp following the @@ -108,32 +108,7 @@ func distChangefeedFlow( // schema change and thus should see the side-effect of the schema change. 
isRestartAfterCheckpointOrNoInitialScan := progress.GetHighWater() != nil if isRestartAfterCheckpointOrNoInitialScan { - spansTS = spansTS.Next() - } - var err error - var tableDescs []catalog.TableDescriptor - tableDescs, err = fetchTableDescriptors(ctx, execCfg, AllTargets(details), spansTS) - if err != nil { - return err - } - - filters := opts.GetFilters() - - if filters.WithPredicate { - if len(tableDescs) > 1 { - return pgerror.Newf(pgcode.InvalidParameterValue, - "option %s can only be used with 1 changefeed target (found %d)", - changefeedbase.OptPrimaryKeyFilter, len(tableDescs), - ) - } - trackedSpans, err = constrainSpansByExpression(ctx, execCtx, filters.PrimaryKeyFilter, tableDescs[0]) - if err != nil { - return err - } - } else { - for _, d := range tableDescs { - trackedSpans = append(trackedSpans, d.PrimaryIndexSpan(execCfg.Codec)) - } + schemaTS = schemaTS.Next() } } @@ -142,13 +117,8 @@ func distChangefeedFlow( checkpoint = *cf.Checkpoint } - var distflowKnobs changefeeddist.TestingKnobs - if knobs, ok := execCfg.DistSQLSrv.TestingKnobs.Changefeed.(*TestingKnobs); ok && knobs != nil { - distflowKnobs = knobs.DistflowKnobs - } - - return changefeeddist.StartDistChangefeed( - ctx, execCtx, jobID, details, trackedSpans, initialHighWater, checkpoint, resultsCh, distflowKnobs) + return startDistChangefeed( + ctx, execCtx, jobID, schemaTS, details, initialHighWater, checkpoint, resultsCh) } func fetchTableDescriptors( @@ -191,23 +161,218 @@ func fetchTableDescriptors( return targetDescs, nil } -func constrainSpansByExpression( - ctx context.Context, execCtx sql.JobExecContext, filterStr string, descr catalog.TableDescriptor, -) ([]roachpb.Span, error) { - if filterStr == "" { - return nil, pgerror.Newf(pgcode.InvalidParameterValue, - "option %s must not be empty", changefeedbase.OptPrimaryKeyFilter, - ) +// changefeedResultTypes is the types returned by changefeed stream. 
+var changefeedResultTypes = []*types.T{ + types.Bytes, // aggregator progress update + types.String, // topic + types.Bytes, // key + types.Bytes, // value +} + +// fetchSpansForTable returns the set of spans for the specified table. +// Usually, this is just the primary index span. +// However, if details.Select is not empty, the set of spans returned may be +// restricted to satisfy predicate in the select clause. In that case, +// possibly updated select clause returned representing the remaining expression +// that still needs to be applied to the events. +func fetchSpansForTables( + ctx context.Context, + execCtx sql.JobExecContext, + tableDescs []catalog.TableDescriptor, + details jobspb.ChangefeedDetails, +) (_ []roachpb.Span, updatedExpression string, _ error) { + var trackedSpans []roachpb.Span + if details.Select == "" { + for _, d := range tableDescs { + trackedSpans = append(trackedSpans, d.PrimaryIndexSpan(execCtx.ExecCfg().Codec)) + } + return trackedSpans, "", nil + } + + if len(tableDescs) != 1 { + return nil, "", pgerror.Newf(pgcode.InvalidParameterValue, + "filter can only be used with single target (found %d)", + len(tableDescs)) } + target := details.TargetSpecifications[0] + includeVirtual := details.Opts[changefeedbase.OptVirtualColumns] == string(changefeedbase.OptVirtualColumnsNull) + return cdceval.ConstrainPrimaryIndexSpanByFilter( + ctx, execCtx, details.Select, tableDescs[0], target, includeVirtual) +} - filterExpr, err := parser.ParseExpr(filterStr) +// startDistChangefeed starts distributed changefeed execution. 
+func startDistChangefeed( + ctx context.Context, + execCtx sql.JobExecContext, + jobID jobspb.JobID, + schemaTS hlc.Timestamp, + details jobspb.ChangefeedDetails, + initialHighWater hlc.Timestamp, + checkpoint jobspb.ChangefeedProgress_Checkpoint, + resultsCh chan<- tree.Datums, +) error { + execCfg := execCtx.ExecCfg() + tableDescs, err := fetchTableDescriptors(ctx, execCfg, AllTargets(details), schemaTS) if err != nil { - return nil, pgerror.Wrapf(err, pgcode.InvalidParameterValue, - "filter expression %s must be a valid SQL expression", changefeedbase.OptPrimaryKeyFilter) + return err + } + trackedSpans, selectClause, err := fetchSpansForTables(ctx, execCtx, tableDescs, details) + if err != nil { + return err + } + + // Changefeed flows handle transactional consistency themselves. + var noTxn *kv.Txn + + dsp := execCtx.DistSQLPlanner() + evalCtx := execCtx.ExtendedEvalContext() + planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, noTxn, + sql.DistributionTypeAlways) + + var spanPartitions []sql.SpanPartition + if details.SinkURI == `` { + // Sinkless feeds get one ChangeAggregator on the gateway. + spanPartitions = []sql.SpanPartition{{SQLInstanceID: dsp.GatewayID(), Spans: trackedSpans}} + } else { + // All other feeds get a ChangeAggregator local on the leaseholder. + var err error + spanPartitions, err = dsp.PartitionSpans(ctx, planCtx, trackedSpans) + if err != nil { + return err + } + } + + // Use the same checkpoint for all aggregators; each aggregator will only look at + // spans that are assigned to it. + // We could compute per-aggregator checkpoint, but that's probably an overkill. + aggregatorCheckpoint := execinfrapb.ChangeAggregatorSpec_Checkpoint{ + Spans: checkpoint.Spans, + Timestamp: checkpoint.Timestamp, + } + + var checkpointSpanGroup roachpb.SpanGroup + checkpointSpanGroup.Add(checkpoint.Spans...) 
+ + aggregatorSpecs := make([]*execinfrapb.ChangeAggregatorSpec, len(spanPartitions)) + for i, sp := range spanPartitions { + watches := make([]execinfrapb.ChangeAggregatorSpec_Watch, len(sp.Spans)) + for watchIdx, nodeSpan := range sp.Spans { + initialResolved := initialHighWater + if checkpointSpanGroup.Encloses(nodeSpan) { + initialResolved = checkpoint.Timestamp + } + watches[watchIdx] = execinfrapb.ChangeAggregatorSpec_Watch{ + Span: nodeSpan, + InitialResolved: initialResolved, + } + } + + aggregatorSpecs[i] = &execinfrapb.ChangeAggregatorSpec{ + Watches: watches, + Checkpoint: aggregatorCheckpoint, + Feed: details, + UserProto: execCtx.User().EncodeProto(), + JobID: jobID, + Select: execinfrapb.Expression{Expr: selectClause}, + } + } + + // NB: This SpanFrontier processor depends on the set of tracked spans being + // static. Currently there is no way for them to change after the changefeed + // is created, even if it is paused and unpaused, but #28982 describes some + // ways that this might happen in the future. 
+ changeFrontierSpec := execinfrapb.ChangeFrontierSpec{ + TrackedSpans: trackedSpans, + Feed: details, + JobID: jobID, + UserProto: execCtx.User().EncodeProto(), + } + + cfKnobs := execCfg.DistSQLSrv.TestingKnobs.Changefeed + if knobs, ok := cfKnobs.(*TestingKnobs); ok && knobs != nil && knobs.OnDistflowSpec != nil { + knobs.OnDistflowSpec(aggregatorSpecs, &changeFrontierSpec) } - semaCtx := tree.MakeSemaContext() - spans, _, err := execCtx.ConstrainPrimaryIndexSpanByExpr( - ctx, sql.MustFullyConstrain, nil, descr, &execCtx.ExtendedEvalContext().Context, &semaCtx, filterExpr) - return spans, err + aggregatorCorePlacement := make([]physicalplan.ProcessorCorePlacement, len(spanPartitions)) + for i, sp := range spanPartitions { + aggregatorCorePlacement[i].SQLInstanceID = sp.SQLInstanceID + aggregatorCorePlacement[i].Core.ChangeAggregator = aggregatorSpecs[i] + } + + p := planCtx.NewPhysicalPlan() + p.AddNoInputStage(aggregatorCorePlacement, execinfrapb.PostProcessSpec{}, changefeedResultTypes, execinfrapb.Ordering{}) + p.AddSingleGroupStage( + dsp.GatewayID(), + execinfrapb.ProcessorCoreUnion{ChangeFrontier: &changeFrontierSpec}, + execinfrapb.PostProcessSpec{}, + changefeedResultTypes, + ) + + p.PlanToStreamColMap = []int{1, 2, 3} + dsp.FinalizePlan(planCtx, p) + + resultRows := makeChangefeedResultWriter(resultsCh) + recv := sql.MakeDistSQLReceiver( + ctx, + resultRows, + tree.Rows, + execCtx.ExecCfg().RangeDescriptorCache, + noTxn, + nil, /* clockUpdater */ + evalCtx.Tracing, + execCtx.ExecCfg().ContentionRegistry, + nil, /* testingPushCallback */ + ) + defer recv.Release() + + var finishedSetupFn func() + if details.SinkURI != `` { + // We abuse the job's results channel to make CREATE CHANGEFEED wait for + // this before returning to the user to ensure the setup went okay. Job + // resumption doesn't have the same hack, but at the moment ignores + // results and so is currently okay. 
Return nil instead of anything + // meaningful so that if we start doing anything with the results + // returned by resumed jobs, then it breaks instead of returning + // nonsense. + finishedSetupFn = func() { resultsCh <- tree.Datums(nil) } + } + + // Copy the evalCtx, as dsp.Run() might change it. + evalCtxCopy := *evalCtx + dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, finishedSetupFn)() + return resultRows.Err() +} + +// changefeedResultWriter implements the `sql.rowResultWriter` that sends +// the received rows back over the given channel. +type changefeedResultWriter struct { + rowsCh chan<- tree.Datums + rowsAffected int + err error +} + +func makeChangefeedResultWriter(rowsCh chan<- tree.Datums) *changefeedResultWriter { + return &changefeedResultWriter{rowsCh: rowsCh} +} + +func (w *changefeedResultWriter) AddRow(ctx context.Context, row tree.Datums) error { + // Copy the row because it's not guaranteed to exist after this function + // returns. + row = append(tree.Datums(nil), row...) 
+ + select { + case <-ctx.Done(): + return ctx.Err() + case w.rowsCh <- row: + return nil + } +} +func (w *changefeedResultWriter) IncrementRowsAffected(ctx context.Context, n int) { + w.rowsAffected += n +} +func (w *changefeedResultWriter) SetError(err error) { + w.err = err +} +func (w *changefeedResultWriter) Err() error { + return w.err } diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index e5fab34bebde..8e39c7252182 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -15,7 +15,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcutils" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" - "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeeddist" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvfeed" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed" @@ -147,7 +146,7 @@ func newChangeAggregatorProcessor( if err := ca.Init( ca, post, - changefeeddist.ChangefeedResultTypes, + changefeedResultTypes, flowCtx, processorID, output, @@ -306,8 +305,8 @@ func (ca *changeAggregator) Start(ctx context.Context) { } ca.eventConsumer, err = newKVEventToRowConsumer( - ctx, ca.flowCtx.Cfg, ca.frontier.SpanFrontier(), kvFeedHighWater, - ca.sink, ca.encoder, feed, ca.knobs, ca.topicNamer) + ctx, ca.flowCtx.Cfg, ca.flowCtx.EvalCtx, ca.frontier.SpanFrontier(), kvFeedHighWater, + ca.sink, ca.encoder, feed, ca.spec.Select, ca.knobs, ca.topicNamer) if err != nil { // Early abort in the case that there is an error setting up the consumption. 
@@ -375,9 +374,8 @@ func (ca *changeAggregator) makeKVFeedCfg( filters := opts.GetFilters() cfg := ca.flowCtx.Cfg - var sf schemafeed.SchemaFeed - initialScanOnly := endTime.EqOrdering(initialHighWater) + var sf schemafeed.SchemaFeed if schemaChange.Policy == changefeedbase.OptSchemaChangePolicyIgnore || initialScanOnly { sf = schemafeed.DoNothingSchemaFeed @@ -1080,7 +1078,7 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad } func (cf *changeFrontier) noteAggregatorProgress(d rowenc.EncDatum) error { - if err := d.EnsureDecoded(changefeeddist.ChangefeedResultTypes[0], &cf.a); err != nil { + if err := d.EnsureDecoded(changefeedResultTypes[0], &cf.a); err != nil { return err } raw, ok := d.Datum.(*tree.DBytes) diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 874e6f54fb31..c95c3c931817 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -17,6 +17,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupresolver" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedvalidators" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" @@ -293,7 +294,7 @@ func createChangefeedJobRecord( p.BufferClientNotice(ctx, pgnotice.Newf("%s", warning)) } - jobDescription, err := changefeedJobDescription(p, changefeedStmt.CreateChangefeed, sinkURI, opts) + jobDescription, err := changefeedJobDescription(changefeedStmt.CreateChangefeed, sinkURI, opts) if err != nil { return nil, err } @@ -376,6 +377,45 @@ func createChangefeedJobRecord( } } + if changefeedStmt.Select != nil { + // Serialize changefeed expression. 
+ if err := validateAndNormalizeChangefeedExpression( + ctx, p, changefeedStmt.Select, targetDescs, targets, opts.IncludeVirtual(), + ); err != nil { + return nil, err + } + details.Select = cdceval.AsStringUnredacted(changefeedStmt.Select) + } + + // TODO(dan): In an attempt to present the most helpful error message to the + // user, the ordering requirements between all these usage validations have + // become extremely fragile and non-obvious. + // + // - `validateDetails` has to run first to fill in defaults for `envelope` + // and `format` if the user didn't specify them. + // - Then `getEncoder` is run to return any configuration errors. + // - Then the changefeed is opted in to `OptKeyInValue` for any cloud + // storage sink or webhook sink. Kafka etc have a key and value field in + // each message but cloud storage sinks and webhook sinks don't have + // anywhere to put the key. So if the key is not in the value, then for + // DELETEs there is no way to recover which key was deleted. We could make + // the user explicitly pass this option for every cloud storage sink/ + // webhook sink and error if they don't, but that seems user-hostile for + // insufficient reason. We can't do this any earlier, because we might + // return errors about `key_in_value` being incompatible which is + // confusing when the user didn't type that option. + // This is the same for the topic and webhook sink, which uses + // `topic_in_value` to embed the topic in the value by default, since it + // has no other avenue to express the topic. + // - Finally, we create a "canary" sink to test sink configuration and + // connectivity. This has to go last because it is strange to return sink + // connectivity errors before we've finished validating all the other + // options. We should probably split sink configuration checking and sink + // connectivity checking into separate methods. + // + // The only upside in all this nonsense is the tests are decent. 
I've tuned + // this particular order simply by rearranging stuff until the changefeedccl + // tests all pass. parsedSink, err := url.Parse(sinkURI) if err != nil { return nil, err @@ -391,23 +431,6 @@ func createChangefeedJobRecord( return nil, err } - filters := opts.GetFilters() - - if filters.WithPredicate { - policy, err := opts.GetSchemaChangeHandlingOptions() - if err != nil { - return nil, err - } - if policy.Policy != changefeedbase.OptSchemaChangePolicyStop { - return nil, errors.Newf("option %s can only be used with %s=%s", - changefeedbase.OptPrimaryKeyFilter, changefeedbase.OptSchemaChangePolicy, - changefeedbase.OptSchemaChangePolicyStop) - } - if err := validatePrimaryKeyFilterExpression(ctx, p, filters.PrimaryKeyFilter, targetDescs); err != nil { - return nil, err - } - } - encodingOpts, err := opts.GetEncodingOptions() if err != nil { return nil, err @@ -713,10 +736,7 @@ func validateSink( } func changefeedJobDescription( - p sql.PlanHookState, - changefeed *tree.CreateChangefeed, - sinkURI string, - opts changefeedbase.StatementOptions, + changefeed *tree.CreateChangefeed, sinkURI string, opts changefeedbase.StatementOptions, ) (string, error) { cleanedSinkURI, err := cloud.SanitizeExternalStorageURI(sinkURI, []string{ changefeedbase.SinkParamSASLPassword, @@ -733,6 +753,7 @@ func changefeedJobDescription( c := &tree.CreateChangefeed{ Targets: changefeed.Targets, SinkURI: tree.NewDString(cleanedSinkURI), + Select: changefeed.Select, } opts.ForEachWithRedaction(func(k string, v string) { opt := tree.KVOption{Key: tree.Name(k)} @@ -742,8 +763,7 @@ func changefeedJobDescription( c.Options = append(c.Options, opt) }) sort.Slice(c.Options, func(i, j int) bool { return c.Options[i].Key < c.Options[j].Key }) - ann := p.ExtendedEvalContext().Annotations - return tree.AsStringWithFQNames(c, ann), nil + return tree.AsString(c), nil } func redactUser(uri string) string { @@ -780,29 +800,37 @@ func validateDetailsAndOptions( ) } } + + { + if details.Select != 
"" { + if len(details.TargetSpecifications) != 1 { + return errors.Errorf( + "CREATE CHANGEFEED ... AS SELECT ... is not supported for more than 1 table") + } + } + } return nil } -func validatePrimaryKeyFilterExpression( +// validateAndNormalizeChangefeedExpression validates and normalizes changefeed expressions. +// This method modifies passed in select clause to reflect normalization step. +func validateAndNormalizeChangefeedExpression( ctx context.Context, execCtx sql.JobExecContext, - filterExpr string, + sc *tree.SelectClause, descriptors map[tree.TablePattern]catalog.Descriptor, + targets []jobspb.ChangefeedTargetSpecification, + includeVirtual bool, ) error { - if len(descriptors) > 1 { - return pgerror.Newf(pgcode.InvalidParameterValue, - "option %s can only be used with 1 changefeed target (found %d)", - changefeedbase.OptPrimaryKeyFilter, len(descriptors), - ) + if len(descriptors) != 1 || len(targets) != 1 { + return pgerror.Newf(pgcode.InvalidParameterValue, "CDC expressions require single table") } - var tableDescr catalog.TableDescriptor for _, d := range descriptors { tableDescr = d.(catalog.TableDescriptor) } - - _, err := constrainSpansByExpression(ctx, execCtx, filterExpr, tableDescr) - return err + return cdceval.NormalizeAndValidateSelectForTarget( + ctx, execCtx, tableDescr, targets[0], sc, includeVirtual) } type changefeedResumer struct { diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 9995e6f3f95f..82140e09618d 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -29,9 +29,9 @@ import ( "github.com/Shopify/sarama" "github.com/cockroachdb/cockroach-go/v2/crdb" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" - 
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeeddist" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" // multi-tenant tests _ "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl" // locality-related table mutations @@ -55,6 +55,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader" + "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" @@ -62,6 +63,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/sqllivenesstestutils" "github.com/cockroachdb/cockroach/pkg/storage" @@ -4019,7 +4023,9 @@ func TestChangefeedDescription(t *testing.T) { defer stopServer() sqlDB := sqlutils.MakeSQLRunner(s.DB) - sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) + // Create enum to ensure enum values displayed correctly in the summary. 
+ sqlDB.Exec(t, `CREATE TYPE status AS ENUM ('open', 'closed', 'inactive')`) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, status status)`) sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`) sink, cleanup := sqlutils.PGUrl(t, s.Server.SQLAddr(), t.Name(), url.User(username.RootUser)) @@ -4027,19 +4033,49 @@ func TestChangefeedDescription(t *testing.T) { sink.Scheme = changefeedbase.SinkSchemeExperimentalSQL sink.Path = `d` - var jobID jobspb.JobID - sqlDB.QueryRow(t, - `CREATE CHANGEFEED FOR foo INTO $1 WITH updated, envelope = $2`, sink.String(), `wrapped`, - ).Scan(&jobID) - - var description string - sqlDB.QueryRow(t, - `SELECT description FROM [SHOW JOBS] WHERE job_id = $1`, jobID, - ).Scan(&description) redactedSink := strings.Replace(sink.String(), username.RootUser, `redacted`, 1) - expected := `CREATE CHANGEFEED FOR TABLE foo INTO '` + redactedSink + - `' WITH envelope = 'wrapped', updated` - require.Equal(t, expected, description) + for _, tc := range []struct { + create string + descr string + }{ + { + create: "CREATE CHANGEFEED FOR foo INTO $1 WITH updated, envelope = $2", + descr: `CREATE CHANGEFEED FOR TABLE foo INTO '` + redactedSink + `' WITH envelope = 'wrapped', updated`, + }, + { + create: "CREATE CHANGEFEED FOR public.foo INTO $1 WITH updated, envelope = $2", + descr: `CREATE CHANGEFEED FOR TABLE public.foo INTO '` + redactedSink + `' WITH envelope = 'wrapped', updated`, + }, + { + create: "CREATE CHANGEFEED FOR d.public.foo INTO $1 WITH updated, envelope = $2", + descr: `CREATE CHANGEFEED FOR TABLE d.public.foo INTO '` + redactedSink + `' WITH envelope = 'wrapped', updated`, + }, + { + create: "CREATE CHANGEFEED INTO $1 WITH updated, envelope = $2 AS SELECT a FROM foo WHERE a % 2 = 0", + descr: `CREATE CHANGEFEED INTO '` + redactedSink + `' WITH envelope = 'wrapped', updated AS SELECT a FROM foo WHERE (a % 2) = 0`, + }, + { + create: "CREATE CHANGEFEED INTO $1 WITH updated, envelope = $2 AS SELECT a FROM public.foo AS bar WHERE a % 2 = 0", + 
descr: `CREATE CHANGEFEED INTO '` + redactedSink + `' WITH envelope = 'wrapped', updated AS SELECT a FROM public.foo AS bar WHERE (a % 2) = 0`, + }, + { + create: "CREATE CHANGEFEED INTO $1 WITH updated, envelope = $2 AS SELECT a FROM foo WHERE status IN ('open', 'closed')", + descr: `CREATE CHANGEFEED INTO '` + redactedSink + `' WITH envelope = 'wrapped', updated AS SELECT a FROM foo WHERE status IN ('open', 'closed')`, + }, + } { + t.Run(tc.create, func(t *testing.T) { + var jobID jobspb.JobID + sqlDB.QueryRow(t, tc.create, sink.String(), `wrapped`).Scan(&jobID) + + var description string + sqlDB.QueryRow(t, + `SELECT description FROM [SHOW JOBS] WHERE job_id = $1`, jobID, + ).Scan(&description) + + require.Equal(t, tc.descr, description) + }) + } + } func TestChangefeedPauseUnpause(t *testing.T) { @@ -5990,53 +6026,404 @@ func TestChangefeedOnlyInitialScanCSVSinkless(t *testing.T) { cdcTest(t, testFn, feedTestForceSink("sinkless")) } -func TestChangefeedPrimaryKeyFilter(t *testing.T) { +func TestChangefeedPredicates(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(alias string) cdcTestFn { + return func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + sqlDB.Exec(t, `CREATE TYPE status AS ENUM ('open', 'closed', 'inactive')`) + sqlDB.Exec(t, ` +CREATE TABLE foo ( + a INT, + b STRING, + c STRING, + d STRING AS (concat(b, c)) VIRTUAL, + e status DEFAULT 'inactive', + PRIMARY KEY (a, b) +)`) + + sqlDB.Exec(t, ` +INSERT INTO foo (a, b) VALUES (0, 'zero'), (1, 'one'); +INSERT INTO foo (a, b, e) VALUES (2, 'two', 'closed'); +`) + topic, fromClause := "foo", "foo" + if alias != "" { + topic, fromClause = "foo", "foo AS "+alias + } + feed := feed(t, f, ` +CREATE CHANGEFEED +AS SELECT * FROM `+fromClause+` +WHERE e IN ('open', 'closed') AND NOT cdc_is_delete()`) + defer closeFeed(t, feed) + + assertPayloads(t, feed, []string{ + topic + `: [2, "two"]->{"after": {"a": 2, "b": 
"two", "c": null, "e": "closed"}}`, + }) + + sqlDB.Exec(t, ` +UPDATE foo SET e = 'open', c = 'really open' WHERE a=0; -- should be emitted +DELETE FROM foo WHERE a=2; -- should be skipped +INSERT INTO foo (a, b, e) VALUES (3, 'tres', 'closed'); -- should be emitted +`) + + assertPayloads(t, feed, []string{ + topic + `: [0, "zero"]->{"after": {"a": 0, "b": "zero", "c": "really open", "e": "open"}}`, + topic + `: [3, "tres"]->{"after": {"a": 3, "b": "tres", "c": null, "e": "closed"}}`, + }) + } + } + + testutils.RunTrueAndFalse(t, "alias", func(t *testing.T, useAlias bool) { + alias := "" + if useAlias { + alias = "bar" + } + cdcTest(t, testFn(alias)) + }) +} + +// Verify when running predicate changefeed, the set of spans is constrained +// based on predicate expression. +func TestChangefeedConstrainsSpansBasedOnPredicate(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { sqlDB := sqlutils.MakeSQLRunner(s.DB) - sqlDB.Exec(t, "CREATE TABLE foo (a INT PRIMARY KEY, b string)") - sqlDB.Exec(t, "CREATE TABLE bar (a INT PRIMARY KEY, b string)") - sqlDB.Exec(t, "INSERT INTO foo SELECT * FROM generate_series(1, 20)") - - sqlDB.ExpectErr(t, "can only be used with schema_change_policy=stop", - `CREATE CHANGEFEED FOR foo WITH primary_key_filter='a < 5 OR a > 18'`) + sqlDB.Exec(t, `CREATE TYPE status AS ENUM ('open', 'closed', 'inactive')`) + sqlDB.Exec(t, ` +CREATE TABLE foo ( + a INT, + b STRING, + c STRING, + d STRING AS (concat(b, c)) VIRTUAL, + e status DEFAULT 'inactive', + PRIMARY KEY (a, b) +)`) - sqlDB.ExpectErr(t, `option primary_key_filter can only be used with 1 changefeed target`, - `CREATE CHANGEFEED FOR foo, bar WITH schema_change_policy='stop', primary_key_filter='a < 5 OR a > 18'`) + sqlDB.Exec(t, ` +INSERT INTO foo (a, b) VALUES (0, 'zero'), (1, 'one'); +INSERT INTO foo (a, b, e) VALUES (2, 'two', 'closed'); +INSERT INTO foo (a, b, e) VALUES (11, 'eleven', 'closed'); 
+`) + // Save change aggregator specs. + knobs := s.TestingKnobs. + DistSQL.(*execinfra.TestingKnobs). + Changefeed.(*TestingKnobs) + specs := make(chan []*execinfrapb.ChangeAggregatorSpec, 1) + knobs.OnDistflowSpec = func( + aggregatorSpecs []*execinfrapb.ChangeAggregatorSpec, _ *execinfrapb.ChangeFrontierSpec, + ) { + specs <- aggregatorSpecs + } - feed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH schema_change_policy='stop', primary_key_filter='a < 5 OR a > 18'`) + feed := feed(t, f, ` +CREATE CHANGEFEED +AS SELECT * FROM foo +WHERE a > 10 AND e IN ('open', 'closed') AND NOT cdc_is_delete()`) defer closeFeed(t, feed) assertPayloads(t, feed, []string{ - `foo: [1]->{"after": {"a": 1, "b": null}}`, - `foo: [2]->{"after": {"a": 2, "b": null}}`, - `foo: [3]->{"after": {"a": 3, "b": null}}`, - `foo: [4]->{"after": {"a": 4, "b": null}}`, - `foo: [19]->{"after": {"a": 19, "b": null}}`, - `foo: [20]->{"after": {"a": 20, "b": null}}`, + `foo: [11, "eleven"]->{"after": {"a": 11, "b": "eleven", "c": null, "e": "closed"}}`, }) - for i := 0; i < 22; i++ { - sqlDB.Exec(t, "UPSERT INTO foo VALUES ($1, $2)", i, strconv.Itoa(i)) - } + aggSpec := <-specs + require.Equal(t, 1, len(aggSpec)) + require.Equal(t, 1, len(aggSpec[0].Watches)) - assertPayloads(t, feed, []string{ - `foo: [0]->{"after": {"a": 0, "b": "0"}}`, - `foo: [1]->{"after": {"a": 1, "b": "1"}}`, - `foo: [2]->{"after": {"a": 2, "b": "2"}}`, - `foo: [3]->{"after": {"a": 3, "b": "3"}}`, - `foo: [4]->{"after": {"a": 4, "b": "4"}}`, - `foo: [19]->{"after": {"a": 19, "b": "19"}}`, - `foo: [20]->{"after": {"a": 20, "b": "20"}}`, - `foo: [21]->{"after": {"a": 21, "b": "21"}}`, - }) + // Verify span is "smaller" than the primary index span. 
+ fooDesc := desctestutils.TestingGetPublicTableDescriptor( + s.Server.ExecutorConfig().(sql.ExecutorConfig).DB, s.Codec, "d", "foo") + span := aggSpec[0].Watches[0].Span + require.Equal(t, -1, fooDesc.PrimaryIndexSpan(s.Codec).Key.Compare(span.Key)) + + // Aggregators should get modified select expression reflecting the fact + // that the set of spans was reduced (note: we no longer expect to see a > + // 10). + expectedExpr := normalizeCDCExpression(t, s.Server.ExecutorConfig(), + `SELECT * FROM foo WHERE (e IN ('open':::d.public.status, 'closed':::d.public.status)) AND (NOT cdc_is_delete())`) + require.Equal(t, expectedExpr, aggSpec[0].Select.Expr) } cdcTest(t, testFn) } +func normalizeCDCExpression(t *testing.T, execCfgI interface{}, exprStr string) string { + t.Helper() + + sc, err := cdceval.ParseChangefeedExpression(exprStr) + require.NoError(t, err) + + desc := cdctest.GetHydratedTableDescriptor(t, execCfgI, + "d", "public", tree.Name(tree.AsString(sc.From.Tables[0]))) + target := jobspb.ChangefeedTargetSpecification{TableID: desc.GetID()} + + ctx := context.Background() + execCfg := execCfgI.(sql.ExecutorConfig) + + p, cleanup := sql.NewInternalPlanner("test", + execCfg.DB.NewTxn(ctx, "test-planner"), + username.RootUserName(), &sql.MemoryMetrics{}, &execCfg, + sessiondatapb.SessionData{ + Database: "d", + SearchPath: sessiondata.DefaultSearchPath.GetPathArray(), + }) + defer cleanup() + + execCtx := p.(sql.JobExecContext) + require.NoError(t, cdceval.NormalizeAndValidateSelectForTarget( + context.Background(), execCtx, desc, target, sc, false, + )) + log.Infof(context.Background(), "PostNorm: %s", tree.StmtDebugString(sc)) + return cdceval.AsStringUnredacted(sc) +} + +// Some predicates and projections can be verified when creating changefeed. +// The types of errors that can be detected early on is restricted to simple checks +// (such as type checking, non-existent columns, etc). More complex errors detected +// during execution. 
+// Verify that's the case. +func TestChangefeedInvalidPredicate(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + _, db, stopServer := startTestFullServer(t, feedTestOptions{}) + defer stopServer() + sqlDB := sqlutils.MakeSQLRunner(db) + + sqlDB.Exec(t, `CREATE TYPE status AS ENUM ('open', 'closed', 'inactive')`) + sqlDB.Exec(t, ` +CREATE TABLE foo ( + a INT, + b STRING, + c STRING, + d STRING AS (concat(b, c)) VIRTUAL, + e status DEFAULT 'inactive', + PRIMARY KEY (a, b) +)`) + + for _, tc := range []struct { + name string + create string + err string + }{ + { + name: "no such column", + create: `CREATE CHANGEFEED INTO 'null://' AS SELECT no_such_column FROM foo`, + err: `column "no_such_column" does not exist`, + }, + { + name: "wrong type", + create: `CREATE CHANGEFEED INTO 'null://' AS SELECT * FROM foo WHERE a = 'wrong type'`, + err: `could not parse "wrong type" as type int`, + }, + { + name: "invalid enum value", + create: `CREATE CHANGEFEED INTO 'null://' AS SELECT * FROM foo WHERE e = 'bad'`, + err: `invalid input value for enum status: "bad"`, + }, + { + name: "contradiction: a > 1 && a < 1", + create: `CREATE CHANGEFEED INTO 'null://' AS SELECT * FROM foo WHERE a > 1 AND a < 1`, + err: `filter .* is a contradiction`, + }, + { + name: "contradiction: a IS null", + create: `CREATE CHANGEFEED INTO 'null://' AS SELECT * FROM foo WHERE a IS NULL`, + err: `filter .* is a contradiction`, + }, + { + name: "wrong table name", + create: `CREATE CHANGEFEED INTO 'null://' AS SELECT * FROM foo AS bar WHERE foo.a > 0`, + err: `no data source matches prefix: foo in this context`, + }, + } { + t.Run(tc.name, func(t *testing.T) { + sqlDB.ExpectErr(t, tc.err, tc.create) + }) + } +} + +func TestChangefeedPredicateWithSchemaChange(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + skip.UnderRace(t, "takes too long under race") + + setupSQL := []string{ + `CREATE TYPE status AS ENUM ('open', 'closed', 
'inactive')`, + `CREATE SCHEMA alt`, + `CREATE TYPE alt.status AS ENUM ('alt_open', 'alt_closed')`, + `CREATE TABLE foo ( + a INT, + b STRING, + c STRING, + e status DEFAULT 'inactive', + PRIMARY KEY (a, b) +)`, + `INSERT INTO foo (a, b) VALUES (1, 'one')`, + `INSERT INTO foo (a, b, c, e) VALUES (2, 'two', 'c string', 'open')`, + } + initialPayload := []string{ + `foo: [1, "one"]->{"after": {"a": 1, "b": "one", "c": null, "e": "inactive"}}`, + `foo: [2, "two"]->{"after": {"a": 2, "b": "two", "c": "c string", "e": "open"}}`, + } + + type testCase struct { + name string + createFeedStmt string // Create changefeed statement. + initialPayload []string // Expected payload after create. + alterStmt string // Alter statement to execute. + afterAlterStmt string // Execute after alter statement. + expectErr string // Alter may result in changefeed terminating with error. + payload []string // Expect the following payload after executing afterAlterStmt. + } + + testFn := func(tc testCase) cdcTestFn { + return func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + + sqlDB.ExecMultiple(t, setupSQL...) + foo := feed(t, f, tc.createFeedStmt) + feedJob := foo.(cdctest.EnterpriseTestFeed) + defer closeFeed(t, foo) + + assertPayloads(t, foo, tc.initialPayload) + + sqlDB.Exec(t, tc.alterStmt) + + // Execute afterAlterStmt immediately following alterStmt. Sometimes, we + // need to e.g. insert new rows in order to observe changefeed error. 
+ if tc.afterAlterStmt != "" { + sqlDB.Exec(t, tc.afterAlterStmt) + } + + if tc.expectErr != "" { + require.NoError(t, feedJob.WaitForStatus( + func(s jobs.Status) bool { return s == jobs.StatusFailed })) + require.Regexp(t, tc.expectErr, feedJob.FetchTerminalJobErr()) + } else { + assertPayloads(t, foo, tc.payload) + } + } + } + + for _, tc := range []testCase{ + { + name: "add column", + createFeedStmt: "CREATE CHANGEFEED AS SELECT * FROM foo", + initialPayload: initialPayload, + alterStmt: "ALTER TABLE foo ADD COLUMN new STRING", + payload: []string{ + `foo: [1, "one"]->{"after": {"a": 1, "b": "one", "c": null, "e": "inactive", "new": null}}`, + `foo: [2, "two"]->{"after": {"a": 2, "b": "two", "c": "c string", "e": "open", "new": null}}`, + }, + }, + { + // This test adds a column with 'alt.status' type. The table already has a + // column "e" with "public.status" type. Verify that we correctly resolve + // enums with the same enum name. + name: "add alt.status", + createFeedStmt: "CREATE CHANGEFEED AS SELECT * FROM foo", + initialPayload: initialPayload, + alterStmt: "ALTER TABLE foo ADD COLUMN alt alt.status", + afterAlterStmt: "INSERT INTO foo (a, b, alt) VALUES (3, 'tres', 'alt_open')", + payload: []string{ + `foo: [1, "one"]->{"after": {"a": 1, "alt": null, "b": "one", "c": null, "e": "inactive"}}`, + `foo: [2, "two"]->{"after": {"a": 2, "alt": null, "b": "two", "c": "c string", "e": "open"}}`, + `foo: [3, "tres"]->{"after": {"a": 3, "alt": "alt_open", "b": "tres", "c": null, "e": "inactive"}}`, + }, + }, + { + name: "drop column", + createFeedStmt: "CREATE CHANGEFEED AS SELECT * FROM foo", + initialPayload: initialPayload, + alterStmt: "ALTER TABLE foo DROP COLUMN c", + afterAlterStmt: "INSERT INTO foo (a, b) VALUES (3, 'tres')", + payload: []string{ + `foo: [1, "one"]->{"after": {"a": 1, "b": "one", "e": "inactive"}}`, + `foo: [2, "two"]->{"after": {"a": 2, "b": "two", "e": "open"}}`, + `foo: [3, "tres"]->{"after": {"a": 3, "b": "tres", "e": 
"inactive"}}`, + }, + }, + { + name: "drop referenced column projection", + createFeedStmt: "CREATE CHANGEFEED AS SELECT a, b, c, e FROM foo", + initialPayload: initialPayload, + alterStmt: "ALTER TABLE foo DROP COLUMN c", + expectErr: `while evaluating projection: SELECT .*: column "c" does not exist`, + }, + { + name: "drop referenced column filter", + createFeedStmt: "CREATE CHANGEFEED AS SELECT * FROM foo WHERE c IS NOT NULL", + initialPayload: []string{ + `foo: [2, "two"]->{"after": {"a": 2, "b": "two", "c": "c string", "e": "open"}}`, + }, + alterStmt: "ALTER TABLE foo DROP COLUMN c", + expectErr: `while matching filter: SELECT .*: column "c" does not exist`, + }, + { + name: "rename referenced column projection", + createFeedStmt: "CREATE CHANGEFEED AS SELECT a, b, c, e FROM foo", + initialPayload: initialPayload, + alterStmt: "ALTER TABLE foo RENAME COLUMN c TO c_new", + afterAlterStmt: "INSERT INTO foo (a, b) VALUES (3, 'tres')", + expectErr: `while evaluating projection: SELECT .*: column "c" does not exist`, + }, + { + name: "rename referenced column filter", + createFeedStmt: "CREATE CHANGEFEED AS SELECT * FROM foo WHERE c IS NOT NULL", + initialPayload: []string{ + `foo: [2, "two"]->{"after": {"a": 2, "b": "two", "c": "c string", "e": "open"}}`, + }, + alterStmt: "ALTER TABLE foo RENAME COLUMN c TO c_new", + afterAlterStmt: "INSERT INTO foo (a, b) VALUES (3, 'tres')", + expectErr: `while matching filter: SELECT .*: column "c" does not exist`, + }, + { + name: "alter enum", + createFeedStmt: "CREATE CHANGEFEED AS SELECT * FROM foo", + initialPayload: initialPayload, + alterStmt: "ALTER TYPE status ADD VALUE 'pending'", + afterAlterStmt: "INSERT INTO foo (a, b, e) VALUES (3, 'tres', 'pending')", + payload: []string{ + `foo: [3, "tres"]->{"after": {"a": 3, "b": "tres", "c": null, "e": "pending"}}`, + }, + }, + { + name: "alter enum value fails", + createFeedStmt: "CREATE CHANGEFEED AS SELECT * FROM foo WHERE e = 'open'", + initialPayload: []string{ + 
`foo: [2, "two"]->{"after": {"a": 2, "b": "two", "c": "c string", "e": "open"}}`,
+			},
+			alterStmt:      "ALTER TYPE status RENAME VALUE 'open' TO 'active'",
+			afterAlterStmt: "INSERT INTO foo (a, b, e) VALUES (3, 'tres', 'active')",
+			expectErr:      `invalid input value for enum status: "open"`,
+		},
+		{
+			name:           "alter enum use correct enum version",
+			createFeedStmt: "CREATE CHANGEFEED WITH diff AS SELECT e, cdc_prev()->'e' AS prev_e FROM foo",
+			initialPayload: []string{
+				`foo: [1, "one"]->{"after": {"e": "inactive", "prev_e": null}, "before": null}`,
+				`foo: [2, "two"]->{"after": {"e": "open", "prev_e": null}, "before": null}`,
+			},
+			alterStmt:      "ALTER TYPE status ADD VALUE 'done'",
+			afterAlterStmt: "UPDATE foo SET e = 'done', c = 'c value' WHERE a = 1",
+			payload: []string{
+				`foo: [1, "one"]->{"after": {"e": "done", "prev_e": "inactive"}, "before": null}`,
+			},
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			cdcTest(t, testFn(tc), feedTestEnterpriseSinks)
+		})
+	}
+}
+
 func startMonitorWithBudget(budget int64) *mon.BytesMonitor {
 	mm := mon.NewMonitorWithLimit(
 		"test-mm", mon.MemoryResource, budget,
@@ -6179,18 +6566,17 @@ func TestChangefeedMultiPodTenantPlanning(t *testing.T) {
 
 	// Record the number of aggregators in planning
 	aggregatorCount := 0
-	distflowKnobs := changefeeddist.TestingKnobs{
-		OnDistflowSpec: func(aggregatorSpecs []*execinfrapb.ChangeAggregatorSpec, _ *execinfrapb.ChangeFrontierSpec) {
-			aggregatorCount = len(aggregatorSpecs)
-		},
-	}
 
 	// Create 2 connections of the same tenant on a cluster to have 2 pods
 	tc, _, cleanupDB := startTestCluster(t)
 	defer cleanupDB()
 
 	tenantKnobs := base.TestingKnobs{
-		DistSQL:           &execinfra.TestingKnobs{Changefeed: &TestingKnobs{DistflowKnobs: distflowKnobs}},
+		DistSQL: &execinfra.TestingKnobs{Changefeed: &TestingKnobs{
+			OnDistflowSpec: func(aggregatorSpecs []*execinfrapb.ChangeAggregatorSpec, _ *execinfrapb.ChangeFrontierSpec) {
+				aggregatorCount = len(aggregatorSpecs)
+			},
+ }}, JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), Server: &server.TestingKnobs{}, } diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index 0e81e8f15366..bddf28ec5036 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -96,7 +96,6 @@ const ( OptOnError = `on_error` OptMetricsScope = `metrics_label` OptVirtualColumns = `virtual_columns` - OptPrimaryKeyFilter = `primary_key_filter` OptVirtualColumnsOmitted VirtualColumnVisibility = `omitted` OptVirtualColumnsNull VirtualColumnVisibility = `null` @@ -313,7 +312,6 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{ OptOnError: enum("pause", "fail"), OptMetricsScope: stringOption, OptVirtualColumns: enum("omitted", "null"), - OptPrimaryKeyFilter: stringOption, } // CommonOptions is options common to all sinks @@ -325,7 +323,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime, OptEnvelope, OptSchemaChangeEvents, OptSchemaChangePolicy, OptProtectDataFromGCOnPause, OptOnError, OptInitialScan, OptNoInitialScan, OptInitialScanOnly, - OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics, OptPrimaryKeyFilter) + OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics) // SQLValidOptions is options exclusive to SQL sink var SQLValidOptions map[string]struct{} = nil @@ -758,19 +756,14 @@ func (s StatementOptions) GetSchemaChangeHandlingOptions() (SchemaChangeHandling // Filters are aspects of the feed that the backing // kvfeed or rangefeed want to know about. type Filters struct { - WithDiff bool - WithPredicate bool - PrimaryKeyFilter string + WithDiff bool } // GetFilters returns a populated Filters. 
func (s StatementOptions) GetFilters() Filters { _, withDiff := s.m[OptDiff] - filter, withPredicate := s.m[OptPrimaryKeyFilter] return Filters{ - WithDiff: withDiff, - WithPredicate: withPredicate, - PrimaryKeyFilter: filter, + WithDiff: withDiff, } } diff --git a/pkg/ccl/changefeedccl/changefeeddist/BUILD.bazel b/pkg/ccl/changefeedccl/changefeeddist/BUILD.bazel deleted file mode 100644 index 2c2932fa9d9e..000000000000 --- a/pkg/ccl/changefeedccl/changefeeddist/BUILD.bazel +++ /dev/null @@ -1,22 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") - -go_library( - name = "changefeeddist", - srcs = [ - "distflow.go", - "testing_knobs.go", - ], - importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeeddist", - visibility = ["//visibility:public"], - deps = [ - "//pkg/jobs/jobspb", - "//pkg/kv", - "//pkg/roachpb", - "//pkg/sql", - "//pkg/sql/execinfrapb", - "//pkg/sql/physicalplan", - "//pkg/sql/sem/tree", - "//pkg/sql/types", - "//pkg/util/hlc", - ], -) diff --git a/pkg/ccl/changefeedccl/changefeeddist/distflow.go b/pkg/ccl/changefeedccl/changefeeddist/distflow.go deleted file mode 100644 index 1b17dd3d1191..000000000000 --- a/pkg/ccl/changefeedccl/changefeeddist/distflow.go +++ /dev/null @@ -1,197 +0,0 @@ -// Copyright 2021 The Cockroach Authors. -// -// Licensed as a CockroachDB Enterprise file under the Cockroach Community -// License (the "License"); you may not use this file except in compliance with -// the License. 
You may obtain a copy of the License at -// -// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt - -package changefeeddist - -import ( - "context" - - "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" - "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" - "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" - "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/types" - "github.com/cockroachdb/cockroach/pkg/util/hlc" -) - -// ChangefeedResultTypes is the types returned by changefeed stream. -var ChangefeedResultTypes = []*types.T{ - types.Bytes, // aggregator progress update - types.String, // topic - types.Bytes, // key - types.Bytes, // value -} - -// StartDistChangefeed starts distributed changefeed execution. -func StartDistChangefeed( - ctx context.Context, - execCtx sql.JobExecContext, - jobID jobspb.JobID, - details jobspb.ChangefeedDetails, - trackedSpans []roachpb.Span, - initialHighWater hlc.Timestamp, - checkpoint jobspb.ChangefeedProgress_Checkpoint, - resultsCh chan<- tree.Datums, - knobs TestingKnobs, -) error { - // Changefeed flows handle transactional consistency themselves. - var noTxn *kv.Txn - - dsp := execCtx.DistSQLPlanner() - evalCtx := execCtx.ExtendedEvalContext() - planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, noTxn, - sql.DistributionTypeAlways) - - var spanPartitions []sql.SpanPartition - if details.SinkURI == `` { - // Sinkless feeds get one ChangeAggregator on the gateway. - spanPartitions = []sql.SpanPartition{{SQLInstanceID: dsp.GatewayID(), Spans: trackedSpans}} - } else { - // All other feeds get a ChangeAggregator local on the leaseholder. 
- var err error - spanPartitions, err = dsp.PartitionSpans(ctx, planCtx, trackedSpans) - if err != nil { - return err - } - } - - // Use the same checkpoint for all aggregators; each aggregator will only look at - // spans that are assigned to it. - // We could compute per-aggregator checkpoint, but that's probably an overkill. - aggregatorCheckpoint := execinfrapb.ChangeAggregatorSpec_Checkpoint{ - Spans: checkpoint.Spans, - Timestamp: checkpoint.Timestamp, - } - - var checkpointSpanGroup roachpb.SpanGroup - checkpointSpanGroup.Add(checkpoint.Spans...) - - aggregatorSpecs := make([]*execinfrapb.ChangeAggregatorSpec, len(spanPartitions)) - for i, sp := range spanPartitions { - watches := make([]execinfrapb.ChangeAggregatorSpec_Watch, len(sp.Spans)) - for watchIdx, nodeSpan := range sp.Spans { - initialResolved := initialHighWater - if checkpointSpanGroup.Encloses(nodeSpan) { - initialResolved = checkpoint.Timestamp - } - watches[watchIdx] = execinfrapb.ChangeAggregatorSpec_Watch{ - Span: nodeSpan, - InitialResolved: initialResolved, - } - } - - aggregatorSpecs[i] = &execinfrapb.ChangeAggregatorSpec{ - Watches: watches, - Checkpoint: aggregatorCheckpoint, - Feed: details, - UserProto: execCtx.User().EncodeProto(), - JobID: jobID, - } - } - - // NB: This SpanFrontier processor depends on the set of tracked spans being - // static. Currently there is no way for them to change after the changefeed - // is created, even if it is paused and unpaused, but #28982 describes some - // ways that this might happen in the future. 
- changeFrontierSpec := execinfrapb.ChangeFrontierSpec{ - TrackedSpans: trackedSpans, - Feed: details, - JobID: jobID, - UserProto: execCtx.User().EncodeProto(), - } - - if knobs.OnDistflowSpec != nil { - knobs.OnDistflowSpec(aggregatorSpecs, &changeFrontierSpec) - } - - aggregatorCorePlacement := make([]physicalplan.ProcessorCorePlacement, len(spanPartitions)) - for i, sp := range spanPartitions { - aggregatorCorePlacement[i].SQLInstanceID = sp.SQLInstanceID - aggregatorCorePlacement[i].Core.ChangeAggregator = aggregatorSpecs[i] - } - - p := planCtx.NewPhysicalPlan() - p.AddNoInputStage(aggregatorCorePlacement, execinfrapb.PostProcessSpec{}, ChangefeedResultTypes, execinfrapb.Ordering{}) - p.AddSingleGroupStage( - dsp.GatewayID(), - execinfrapb.ProcessorCoreUnion{ChangeFrontier: &changeFrontierSpec}, - execinfrapb.PostProcessSpec{}, - ChangefeedResultTypes, - ) - - p.PlanToStreamColMap = []int{1, 2, 3} - dsp.FinalizePlan(planCtx, p) - - resultRows := makeChangefeedResultWriter(resultsCh) - recv := sql.MakeDistSQLReceiver( - ctx, - resultRows, - tree.Rows, - execCtx.ExecCfg().RangeDescriptorCache, - noTxn, - nil, /* clockUpdater */ - evalCtx.Tracing, - execCtx.ExecCfg().ContentionRegistry, - nil, /* testingPushCallback */ - ) - defer recv.Release() - - var finishedSetupFn func() - if details.SinkURI != `` { - // We abuse the job's results channel to make CREATE CHANGEFEED wait for - // this before returning to the user to ensure the setup went okay. Job - // resumption doesn't have the same hack, but at the moment ignores - // results and so is currently okay. Return nil instead of anything - // meaningful so that if we start doing anything with the results - // returned by resumed jobs, then it breaks instead of returning - // nonsense. - finishedSetupFn = func() { resultsCh <- tree.Datums(nil) } - } - - // Copy the evalCtx, as dsp.Run() might change it. 
- evalCtxCopy := *evalCtx - dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, finishedSetupFn)() - return resultRows.Err() -} - -// changefeedResultWriter implements the `sql.rowResultWriter` that sends -// the received rows back over the given channel. -type changefeedResultWriter struct { - rowsCh chan<- tree.Datums - rowsAffected int - err error -} - -func makeChangefeedResultWriter(rowsCh chan<- tree.Datums) *changefeedResultWriter { - return &changefeedResultWriter{rowsCh: rowsCh} -} - -func (w *changefeedResultWriter) AddRow(ctx context.Context, row tree.Datums) error { - // Copy the row because it's not guaranteed to exist after this function - // returns. - row = append(tree.Datums(nil), row...) - - select { - case <-ctx.Done(): - return ctx.Err() - case w.rowsCh <- row: - return nil - } -} -func (w *changefeedResultWriter) IncrementRowsAffected(ctx context.Context, n int) { - w.rowsAffected += n -} -func (w *changefeedResultWriter) SetError(err error) { - w.err = err -} -func (w *changefeedResultWriter) Err() error { - return w.err -} diff --git a/pkg/ccl/changefeedccl/changefeeddist/testing_knobs.go b/pkg/ccl/changefeedccl/changefeeddist/testing_knobs.go deleted file mode 100644 index e1787e677970..000000000000 --- a/pkg/ccl/changefeedccl/changefeeddist/testing_knobs.go +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright 2022 The Cockroach Authors. -// -// Licensed as a CockroachDB Enterprise file under the Cockroach Community -// License (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt - -package changefeeddist - -import "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" - -// TestingKnobs are the testing knobs for changefeeddist. 
-type TestingKnobs struct { - // OnDistflowSpec is called when specs for distflow planning have been created - OnDistflowSpec func(aggregatorSpecs []*execinfrapb.ChangeAggregatorSpec, frontierSpec *execinfrapb.ChangeFrontierSpec) -} - -// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. -func (*TestingKnobs) ModuleTestingKnobs() {} diff --git a/pkg/ccl/changefeedccl/event_processing.go b/pkg/ccl/changefeedccl/event_processing.go index 94221147e5a8..67247905ab9a 100644 --- a/pkg/ccl/changefeedccl/event_processing.go +++ b/pkg/ccl/changefeedccl/event_processing.go @@ -11,10 +11,14 @@ package changefeedccl import ( "context" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util/bufalloc" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -30,14 +34,16 @@ type eventContext struct { } type kvEventToRowConsumer struct { - frontier *span.Frontier - encoder Encoder - scratch bufalloc.ByteAllocator - sink Sink - cursor hlc.Timestamp - knobs TestingKnobs - decoder cdcevent.Decoder - details ChangefeedConfig + frontier *span.Frontier + encoder Encoder + scratch bufalloc.ByteAllocator + sink Sink + cursor hlc.Timestamp + knobs TestingKnobs + decoder cdcevent.Decoder + details ChangefeedConfig + evaluator *cdceval.Evaluator + safeExpr string topicDescriptorCache map[TopicIdentifier]TopicDescriptor topicNamer *TopicNamer @@ -46,19 +52,37 @@ type kvEventToRowConsumer struct { func newKVEventToRowConsumer( ctx context.Context, cfg *execinfra.ServerConfig, + evalCtx 
*eval.Context, frontier *span.Frontier, cursor hlc.Timestamp, sink Sink, encoder Encoder, details ChangefeedConfig, + expr execinfrapb.Expression, knobs TestingKnobs, topicNamer *TopicNamer, ) (*kvEventToRowConsumer, error) { includeVirtual := details.Opts.IncludeVirtual() decoder, err := cdcevent.NewEventDecoder(ctx, cfg, details.Targets, includeVirtual) + if err != nil { return nil, err } + + var evaluator *cdceval.Evaluator + var safeExpr string + if expr.Expr != "" { + expr, err := cdceval.ParseChangefeedExpression(expr.Expr) + if err != nil { + return nil, err + } + safeExpr = tree.AsString(expr) + evaluator, err = cdceval.NewEvaluator(evalCtx, expr) + if err != nil { + return nil, err + } + } + return &kvEventToRowConsumer{ frontier: frontier, encoder: encoder, @@ -69,6 +93,8 @@ func newKVEventToRowConsumer( knobs: knobs, topicDescriptorCache: make(map[TopicIdentifier]TopicDescriptor), topicNamer: topicNamer, + evaluator: evaluator, + safeExpr: safeExpr, }, nil } @@ -133,6 +159,26 @@ func (c *kvEventToRowConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Even return err } + if c.evaluator != nil { + matches, err := c.evaluator.MatchesFilter(ctx, updatedRow, mvccTimestamp, prevRow) + if err != nil { + return errors.Wrapf(err, "while matching filter: %s", c.safeExpr) + } + if !matches { + // TODO(yevgeniy): Add metrics + return nil + } + projection, err := c.evaluator.Projection(ctx, updatedRow, mvccTimestamp, prevRow) + if err != nil { + return errors.Wrapf(err, "while evaluating projection: %s", c.safeExpr) + } + updatedRow = projection + + // Clear out prevRow. Projection can already emit previous row; thus + // it would be superfluous to also encode prevRow. 
+ prevRow = cdcevent.Row{} + } + topic, err := c.topicForEvent(updatedRow.Metadata) if err != nil { return err @@ -169,6 +215,8 @@ func (c *kvEventToRowConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Even return err } c.scratch, keyCopy = c.scratch.Copy(encodedKey, 0 /* extraCap */) + // TODO(yevgeniy): Some refactoring is needed in the encoder: namely, prevRow + // might not be available at all when working with changefeed expressions. encodedValue, err := c.encoder.EncodeValue(ctx, evCtx, updatedRow, prevRow) if err != nil { return err diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index 6bb055244668..940de37470ba 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -175,12 +175,7 @@ func (c *sinklessFeed) start() error { return err } - // The syntax for a sinkless changefeed is `EXPERIMENTAL CHANGEFEED FOR ...` - // but it's convenient to accept the `CREATE CHANGEFEED` syntax from the - // test, so we can keep the current abstraction of running each test over - // both types. This bit turns what we received into the real sinkless - // syntax. - create := strings.Replace(c.create, `CREATE CHANGEFEED`, `EXPERIMENTAL CHANGEFEED`, 1) + create := c.create if !c.latestResolved.IsEmpty() { // NB: The TODO in Next means c.latestResolved is currently never set for // non-json feeds. 
@@ -584,6 +579,7 @@ type enterpriseFeedFactory struct { } func (e enterpriseFeedFactory) startFeedJob(f *jobFeed, create string, args ...interface{}) error { + log.Infof(context.Background(), "Starting feed job: %q", create) e.di.prepareJob(f) if err := e.db.QueryRow(create, args...).Scan(&f.jobID); err != nil { return err diff --git a/pkg/ccl/changefeedccl/testing_knobs.go b/pkg/ccl/changefeedccl/testing_knobs.go index 87af9d8cc29e..a74ba98abf4b 100644 --- a/pkg/ccl/changefeedccl/testing_knobs.go +++ b/pkg/ccl/changefeedccl/testing_knobs.go @@ -11,9 +11,9 @@ package changefeedccl import ( "context" - "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeeddist" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvfeed" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/util/mon" ) @@ -34,13 +34,14 @@ type TestingKnobs struct { // be skipped. ShouldSkipResolved func(resolved *jobspb.ResolvedSpan) bool // FeedKnobs are kvfeed testing knobs. - FeedKnobs kvfeed.TestingKnobs - DistflowKnobs changefeeddist.TestingKnobs + FeedKnobs kvfeed.TestingKnobs // NullSinkIsExternalIOAccounted controls whether we record // tenant usage for the null sink. By default the null sink is // not accounted but it is useful to treat it as accounted in // tests. NullSinkIsExternalIOAccounted bool + // OnDistflowSpec is called when specs for distflow planning have been created + OnDistflowSpec func(aggregatorSpecs []*execinfrapb.ChangeAggregatorSpec, frontierSpec *execinfrapb.ChangeFrontierSpec) } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. 
diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 308c561f49a8..9fa2513ae98b 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -382,6 +382,9 @@ const ( AddSSTableTombstones // SystemPrivilegesTable adds system.privileges table. SystemPrivilegesTable + // EnablePredicateProjectionChangefeed indicates that changefeeds support + // predicates and projections. + EnablePredicateProjectionChangefeed // ************************************************* // Step (1): Add new versions here. @@ -679,6 +682,10 @@ var versionsSingleton = keyedVersions{ Key: SystemPrivilegesTable, Version: roachpb.Version{Major: 22, Minor: 1, Internal: 22}, }, + { + Key: EnablePredicateProjectionChangefeed, + Version: roachpb.Version{Major: 22, Minor: 1, Internal: 24}, + }, // ************************************************* // Step (2): Add new versions here. diff --git a/pkg/clusterversion/key_string.go b/pkg/clusterversion/key_string.go index 16f8fad19ff0..aac4fecd241b 100644 --- a/pkg/clusterversion/key_string.go +++ b/pkg/clusterversion/key_string.go @@ -73,11 +73,12 @@ func _() { _ = x[SampledStmtDiagReqs-62] _ = x[AddSSTableTombstones-63] _ = x[SystemPrivilegesTable-64] + _ = x[EnablePredicateProjectionChangefeed-65] } -const _Key_name = 
"V21_2Start22_1TargetBytesAvoidExcessAvoidDrainingNamesDrainingNamesMigrationTraceIDDoesntImplyStructuredRecordingAlterSystemTableStatisticsAddAvgSizeColMVCCAddSSTableInsertPublicSchemaNamespaceEntryOnRestoreUnsplitRangesInAsyncGCJobsValidateGrantOptionPebbleFormatBlockPropertyCollectorProbeRequestSelectRPCsTakeTracingInfoInbandPreSeedTenantSpanConfigsSeedTenantSpanConfigsPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreScanWholeRowsSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenantDeleteCommentsWithDroppedIndexesRemoveIncompatibleDatabasePrivilegesAddRaftAppliedIndexTermMigrationPostAddRaftAppliedIndexTermMigrationDontProposeWriteTimestampForLeaseTransfersTenantSettingsTableEnablePebbleFormatVersionBlockPropertiesDisableSystemConfigGossipTriggerMVCCIndexBackfillerEnableLeaseHolderRemovalBackupResolutionInJobLooselyCoupledRaftLogTruncationChangefeedIdlenessBackupDoesNotOverwriteLatestAndCheckpointEnableDeclarativeSchemaChangerRowLevelTTLPebbleFormatSplitUserKeysMarkedIncrementalBackupSubdirDateStyleIntervalStyleCastRewriteEnableNewStoreRebalancerClusterLocksVirtualTableAutoStatsTableSettingsForecastStatsSuperRegionsEnableNewChangefeedOptionsSpanCountTablePreSeedSpanCountTableSeedSpanCountTableV22_1Start22_2LocalTimestampsEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeysTrigramInvertedIndexesRemoveGrantPrivilegeMVCCRangeTombstonesUpgradeSequenceToBeReferencedByIDSampledStmtDiagReqsAddSSTableTombstonesSystemPrivilegesTable" +const _Key_name = 
"V21_2Start22_1TargetBytesAvoidExcessAvoidDrainingNamesDrainingNamesMigrationTraceIDDoesntImplyStructuredRecordingAlterSystemTableStatisticsAddAvgSizeColMVCCAddSSTableInsertPublicSchemaNamespaceEntryOnRestoreUnsplitRangesInAsyncGCJobsValidateGrantOptionPebbleFormatBlockPropertyCollectorProbeRequestSelectRPCsTakeTracingInfoInbandPreSeedTenantSpanConfigsSeedTenantSpanConfigsPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreScanWholeRowsSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenantDeleteCommentsWithDroppedIndexesRemoveIncompatibleDatabasePrivilegesAddRaftAppliedIndexTermMigrationPostAddRaftAppliedIndexTermMigrationDontProposeWriteTimestampForLeaseTransfersTenantSettingsTableEnablePebbleFormatVersionBlockPropertiesDisableSystemConfigGossipTriggerMVCCIndexBackfillerEnableLeaseHolderRemovalBackupResolutionInJobLooselyCoupledRaftLogTruncationChangefeedIdlenessBackupDoesNotOverwriteLatestAndCheckpointEnableDeclarativeSchemaChangerRowLevelTTLPebbleFormatSplitUserKeysMarkedIncrementalBackupSubdirDateStyleIntervalStyleCastRewriteEnableNewStoreRebalancerClusterLocksVirtualTableAutoStatsTableSettingsForecastStatsSuperRegionsEnableNewChangefeedOptionsSpanCountTablePreSeedSpanCountTableSeedSpanCountTableV22_1Start22_2LocalTimestampsEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeysTrigramInvertedIndexesRemoveGrantPrivilegeMVCCRangeTombstonesUpgradeSequenceToBeReferencedByIDSampledStmtDiagReqsAddSSTableTombstonesSystemPrivilegesTableEnablePredicateProjectionChangefeed" -var _Key_index = [...]uint16{0, 5, 14, 36, 54, 76, 113, 152, 166, 207, 233, 252, 286, 298, 329, 353, 374, 402, 432, 460, 481, 494, 513, 547, 585, 619, 651, 687, 719, 755, 797, 816, 856, 888, 907, 931, 952, 983, 1001, 1042, 1072, 1083, 1114, 1137, 1170, 1194, 1218, 1240, 1253, 1265, 1291, 1305, 1326, 1344, 1349, 1358, 1373, 1407, 1441, 1463, 1483, 1502, 1535, 
1554, 1574, 1595} +var _Key_index = [...]uint16{0, 5, 14, 36, 54, 76, 113, 152, 166, 207, 233, 252, 286, 298, 329, 353, 374, 402, 432, 460, 481, 494, 513, 547, 585, 619, 651, 687, 719, 755, 797, 816, 856, 888, 907, 931, 952, 983, 1001, 1042, 1072, 1083, 1114, 1137, 1170, 1194, 1218, 1240, 1253, 1265, 1291, 1305, 1326, 1344, 1349, 1358, 1373, 1407, 1441, 1463, 1483, 1502, 1535, 1554, 1574, 1595, 1630} func (i Key) String() string { if i < 0 || i >= Key(len(_Key_index)-1) { diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index 3e5864b1321f..9732548c476f 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -809,8 +809,10 @@ message ChangefeedDetails { util.hlc.Timestamp end_time = 9 [(gogoproto.nullable) = false]; repeated ChangefeedTargetSpecification target_specifications = 8 [(gogoproto.nullable) = false]; + string select = 10; reserved 1, 2, 5; reserved "targets"; + // NEXT ID: 11 } message ResolvedSpan { diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 45b334ae9e4d..66bc41a7cfa3 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -236,6 +236,12 @@ func (r *Registry) SetSessionBoundInternalExecutorFactory( r.sessionBoundInternalExecutorFactory = factory } +// NewSpanConstrainer returns an instance of sql.SpanConstrainer as an interface{}, +// and a cleanup function. +func (r *Registry) NewSpanConstrainer(user username.SQLUsername) (interface{}, func()) { + return r.execCtx("constrain-spans", user) +} + // MetricsStruct returns the metrics for production monitoring of each job type. // They're all stored as the `metric.Struct` interface because of dependency // cycles. 
diff --git a/pkg/sql/execinfrapb/processors_changefeeds.proto b/pkg/sql/execinfrapb/processors_changefeeds.proto index 849b94c4e93c..068a05e93c08 100644 --- a/pkg/sql/execinfrapb/processors_changefeeds.proto +++ b/pkg/sql/execinfrapb/processors_changefeeds.proto @@ -21,6 +21,7 @@ option go_package = "execinfrapb"; import "jobs/jobspb/jobs.proto"; import "roachpb/data.proto"; +import "sql/execinfrapb/data.proto"; import "util/hlc/timestamp.proto"; import "gogoproto/gogo.proto"; @@ -56,6 +57,8 @@ message ChangeAggregatorSpec { (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/jobs/jobspb.JobID" ]; + // select is the "select clause" for predicate changefeed. + optional Expression select = 6 [(gogoproto.nullable) = false]; } // ChangeFrontierSpec is the specification for a processor that receives diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 9ee0b0a7ff87..fdda81278a7d 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -1385,6 +1385,7 @@ func (u *sqlSymUnion) asTenantClause() tree.TenantID { %type <*tree.UpdateExpr> single_set_clause %type as_of_clause opt_as_of_clause %type opt_changefeed_sink +%type opt_changefeed_family %type explain_option_name %type <[]string> explain_option_list opt_enum_val_list enum_val_list @@ -1462,6 +1463,7 @@ func (u *sqlSymUnion) asTenantClause() tree.TenantID { %type targets targets_roles target_types %type changefeed_targets %type changefeed_target +%type changefeed_target_expr %type <*tree.TargetList> opt_on_targets_roles opt_backup_targets %type for_grantee_clause %type privileges @@ -3972,6 +3974,25 @@ create_changefeed_stmt: Options: $6.kvOptions(), } } +| CREATE CHANGEFEED /*$3=*/ opt_changefeed_sink /*$4=*/ opt_with_options + AS SELECT /*$7=*/target_list FROM /*$9=*/changefeed_target_expr /*$10=*/opt_where_clause + { + target, err := tree.ChangefeedTargetFromTableExpr($9.tblExpr()) + if err != nil { + return setErr(sqllex, err) + } + + $$.val = &tree.CreateChangefeed{ + SinkURI: $3.expr(), + 
Options: $4.kvOptions(), + Targets: tree.ChangefeedTargets{target}, + Select: &tree.SelectClause{ + Exprs: $7.selExprs(), + From: tree.From{Tables: tree.TableExprs{$9.tblExpr()}}, + Where: tree.NewWhere(tree.AstWhere, $10.expr()), + }, + } + } | EXPERIMENTAL CHANGEFEED FOR changefeed_targets opt_with_options { /* SKIP DOC */ @@ -3992,37 +4013,32 @@ changefeed_targets: } changefeed_target: - TABLE table_name - { - $$.val = tree.ChangefeedTarget{ - TableName: $2.unresolvedObjectName().ToUnresolvedName(), - } - } -| table_name + opt_table_prefix table_name opt_changefeed_family { $$.val = tree.ChangefeedTarget{ - TableName: $1.unresolvedObjectName().ToUnresolvedName(), - } + TableName: $2.unresolvedObjectName().ToUnresolvedName(), + FamilyName: tree.Name($3), + } } -| - TABLE table_name FAMILY family_name + +changefeed_target_expr: insert_target + +opt_table_prefix: + TABLE + {} +| /* EMPTY */ + {} + +opt_changefeed_family: + FAMILY family_name { - $$.val = tree.ChangefeedTarget{ - TableName: $2.unresolvedObjectName().ToUnresolvedName(), - FamilyName: tree.Name($4), - } + $$ = $2 } -| -table_name FAMILY family_name +| /* EMPTY */ { - $$.val = tree.ChangefeedTarget{ - TableName: $1.unresolvedObjectName().ToUnresolvedName(), - FamilyName: tree.Name($3), - } + $$ = "" } - - opt_changefeed_sink: INTO string_or_placeholder { diff --git a/pkg/sql/parser/testdata/changefeed b/pkg/sql/parser/testdata/changefeed index 3e69489599a7..96e3ccec16ba 100644 --- a/pkg/sql/parser/testdata/changefeed +++ b/pkg/sql/parser/testdata/changefeed @@ -66,3 +66,51 @@ CREATE CHANGEFEED FOR TABLE foo INTO 'sink' WITH bar = 'baz' CREATE CHANGEFEED FOR TABLE (foo) INTO ('sink') WITH bar = ('baz') -- fully parenthesized CREATE CHANGEFEED FOR TABLE foo INTO '_' WITH bar = '_' -- literals removed CREATE CHANGEFEED FOR TABLE _ INTO 'sink' WITH _ = 'baz' -- identifiers removed + +parse +CREATE CHANGEFEED AS SELECT * FROM foo +---- +CREATE CHANGEFEED AS SELECT * FROM foo +CREATE CHANGEFEED AS SELECT (*) 
FROM foo -- fully parenthesized +CREATE CHANGEFEED AS SELECT * FROM foo -- literals removed +CREATE CHANGEFEED AS SELECT * FROM _ -- identifiers removed + +parse +CREATE CHANGEFEED AS SELECT * FROM foo AS bar +---- +CREATE CHANGEFEED AS SELECT * FROM foo AS bar +CREATE CHANGEFEED AS SELECT (*) FROM foo AS bar -- fully parenthesized +CREATE CHANGEFEED AS SELECT * FROM foo AS bar -- literals removed +CREATE CHANGEFEED AS SELECT * FROM _ AS _ -- identifiers removed + +parse +CREATE CHANGEFEED AS SELECT a, b, c FROM foo +---- +CREATE CHANGEFEED AS SELECT a, b, c FROM foo +CREATE CHANGEFEED AS SELECT (a), (b), (c) FROM foo -- fully parenthesized +CREATE CHANGEFEED AS SELECT a, b, c FROM foo -- literals removed +CREATE CHANGEFEED AS SELECT _, _, _ FROM _ -- identifiers removed + +parse +CREATE CHANGEFEED AS SELECT * FROM foo WHERE a > b +---- +CREATE CHANGEFEED AS SELECT * FROM foo WHERE a > b -- normalized! +CREATE CHANGEFEED AS SELECT (*) FROM foo WHERE ((a) > (b)) -- fully parenthesized +CREATE CHANGEFEED AS SELECT * FROM foo WHERE a > b -- literals removed +CREATE CHANGEFEED AS SELECT * FROM _ WHERE _ > _ -- identifiers removed + +parse +CREATE CHANGEFEED WITH opt='val' AS SELECT * FROM foo WHERE a > b +---- +CREATE CHANGEFEED WITH opt = 'val' AS SELECT * FROM foo WHERE a > b -- normalized! +CREATE CHANGEFEED WITH opt = ('val') AS SELECT (*) FROM foo WHERE ((a) > (b)) -- fully parenthesized +CREATE CHANGEFEED WITH opt = '_' AS SELECT * FROM foo WHERE a > b -- literals removed +CREATE CHANGEFEED WITH _ = 'val' AS SELECT * FROM _ WHERE _ > _ -- identifiers removed + +parse +CREATE CHANGEFEED INTO 'null://' WITH opt='val' AS SELECT * FROM foo WHERE a > b +---- +CREATE CHANGEFEED INTO 'null://' WITH opt = 'val' AS SELECT * FROM foo WHERE a > b -- normalized! 
+CREATE CHANGEFEED INTO ('null://') WITH opt = ('val') AS SELECT (*) FROM foo WHERE ((a) > (b)) -- fully parenthesized +CREATE CHANGEFEED INTO '_' WITH opt = '_' AS SELECT * FROM foo WHERE a > b -- literals removed +CREATE CHANGEFEED INTO 'null://' WITH _ = 'val' AS SELECT * FROM _ WHERE _ > _ -- identifiers removed diff --git a/pkg/sql/sem/tree/changefeed.go b/pkg/sql/sem/tree/changefeed.go index 9d1e174b26c0..81038eab6b51 100644 --- a/pkg/sql/sem/tree/changefeed.go +++ b/pkg/sql/sem/tree/changefeed.go @@ -10,17 +10,28 @@ package tree +import ( + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" +) + // CreateChangefeed represents a CREATE CHANGEFEED statement. type CreateChangefeed struct { Targets ChangefeedTargets SinkURI Expr Options KVOptions + Select *SelectClause } var _ Statement = &CreateChangefeed{} // Format implements the NodeFormatter interface. func (node *CreateChangefeed) Format(ctx *FmtCtx) { + if node.Select != nil { + node.formatWithPredicates(ctx) + return + } + if node.SinkURI != nil { ctx.WriteString("CREATE ") } else { @@ -28,6 +39,7 @@ func (node *CreateChangefeed) Format(ctx *FmtCtx) { // prefix. They're also still EXPERIMENTAL, so they get marked as such. ctx.WriteString("EXPERIMENTAL ") } + ctx.WriteString("CHANGEFEED FOR ") ctx.FormatNode(&node.Targets) if node.SinkURI != nil { @@ -40,6 +52,22 @@ func (node *CreateChangefeed) Format(ctx *FmtCtx) { } } +// formatWithPredicates is a helper to format node when creating +// changefeed with predicates. +func (node *CreateChangefeed) formatWithPredicates(ctx *FmtCtx) { + ctx.WriteString("CREATE CHANGEFEED") + if node.SinkURI != nil { + ctx.WriteString(" INTO ") + ctx.FormatNode(node.SinkURI) + } + if node.Options != nil { + ctx.WriteString(" WITH ") + ctx.FormatNode(&node.Options) + } + ctx.WriteString(" AS ") + node.Select.Format(ctx) +} + // ChangefeedTarget represents a database object to be watched by a changefeed. 
type ChangefeedTarget struct { TableName TablePattern @@ -68,3 +96,18 @@ func (cts *ChangefeedTargets) Format(ctx *FmtCtx) { ctx.FormatNode(&ct) } } + +// ChangefeedTargetFromTableExpr returns ChangefeedTarget for the +// specified table expression. +func ChangefeedTargetFromTableExpr(e TableExpr) (ChangefeedTarget, error) { + switch t := e.(type) { + case TablePattern: + return ChangefeedTarget{TableName: t}, nil + case *AliasedTableExpr: + if tn, ok := t.Expr.(*TableName); ok { + return ChangefeedTarget{TableName: tn}, nil + } + } + return ChangefeedTarget{}, pgerror.Newf( + pgcode.InvalidName, "unsupported changefeed target type") +}