Skip to content

Commit

Permalink
changefeedccl: Projections and Filters in CDC.
Browse files Browse the repository at this point in the history
Add a variant of CHANGEFEED statement that allows specification
of predicates and projections.

```
CREATE CHANGEFEED [INTO 'sink'] [WITH opt=val, ...]
AS SELECT .... FROM t WHERE ...
```

This changefeed variant can target at most 1 table (and 1 column
family) at a time. The expressions used as the projections and
filters can be almost any supported expression with some restrictions:
  * Volatile functions not allowed.
  * Subselects not allowed.
  * Aggregate and window functions (i.e. functions operating over many
    rows) not allowed.
  * Some stable functions, notably functions which return MVCC
    timestamp, are overridden to return MVCC timestamp of the event.

In addition, some CDC specific functions are provided:
  * cdc_is_delete: returns true if the event is a deletion event.
  * cdc_prev: returns JSON representation of the previous row state.
  * cdc_updated_timestamp: returns event update timestamp (usually MVCC
    timestamp, but can be different if e.g. undergoing schema changes)
Additional CDC specific functions will be added in the follow on PRs.

Few examples:

* Emit all but the deletion events:
```
CREATE CHANGEFEED INTO 'kafka://'
AS SELECT * FROM table
WHERE NOT cdc_is_delete()
```

* Emit all events that modified `important_col` column:
```
CREATE CHANGEFEED INTO 'kafka://' WITH diff
AS SELECT *, cdc_prev() AS previous
FROM important_table
WHERE important_col != cdc_prev()->'important_col'
```

* Emit few colums, as well as computed expresions:
```
CREATE CHANGEFEED INTO 'kafka://' WITH diff
AS SELECT warehouseID, (totalItems - orderedItems) as itemsAvailable
FROM warehouse
WHERE region='US/east';
```

When filter expression is specified, changefeed will now consult
optimizer so that the set of spans scanned by changefeed can be
restricted based on the predicate.

For example, given the following table and a changefeed:
```
CREATE TABLE warehouse (
  region STRING,
  warehouseID int,
  ....
  PRIMARY KEY (region, warehouseID)
);

CREATE CHANGEFEED INTO 'kafka://' WITH diff
AS SELECT *
FROM warehouse
WHERE region='US/east';
```

The create changefeed will only scan table spans that contain `US/east`
region (and ignore all other table spans).

Release Notes (enterprise):
CHANGEFEED statement now supports general expressions: predicates and projections.
Projections allow customers to emit only the data that they care about,
including computed columns, while predicates (i.e. filters) allow them
to restrict the data that's emitted only to those events that match the
filter.

```
CREATE CHANGEFEED INTO 'kafka://' AS SELECT * FROM t WHERE NOT cdc_is_delete()
```
  • Loading branch information
Yevgeniy Miretskiy committed Jun 26, 2022
1 parent 21dff06 commit afb95b1
Show file tree
Hide file tree
Showing 43 changed files with 1,809 additions and 668 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -281,4 +281,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 22.1-22 set the active cluster version in the format '<major>.<minor>'
version version 22.1-24 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,6 @@
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.span_registry.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://<ui>/#/debug/tracez</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>22.1-22</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>22.1-24</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
5 changes: 5 additions & 0 deletions docs/generated/sql/bnf/create_changefeed_stmt.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,8 @@ create_changefeed_stmt ::=
| 'CREATE' 'CHANGEFEED' 'FOR' changefeed_target ( ( ',' changefeed_target ) )* 'INTO' sink 'WITH' option '=' value ( ( ',' ( option '=' value | option | option '=' value | option ) ) )*
| 'CREATE' 'CHANGEFEED' 'FOR' changefeed_target ( ( ',' changefeed_target ) )* 'INTO' sink 'WITH' option ( ( ',' ( option '=' value | option | option '=' value | option ) ) )*
| 'CREATE' 'CHANGEFEED' 'FOR' changefeed_target ( ( ',' changefeed_target ) )* 'INTO' sink
| 'CREATE' 'CHANGEFEED' 'INTO' sink 'WITH' option '=' value ( ( ',' ( option '=' value | option | option '=' value | option ) ) )* 'AS' 'SELECT' target_list 'FROM' changefeed_target_expr opt_where_clause
| 'CREATE' 'CHANGEFEED' 'INTO' sink 'WITH' option ( ( ',' ( option '=' value | option | option '=' value | option ) ) )* 'AS' 'SELECT' target_list 'FROM' changefeed_target_expr opt_where_clause
| 'CREATE' 'CHANGEFEED' 'INTO' sink 'WITH' option '=' value ( ( ',' ( option '=' value | option | option '=' value | option ) ) )* 'AS' 'SELECT' target_list 'FROM' changefeed_target_expr opt_where_clause
| 'CREATE' 'CHANGEFEED' 'INTO' sink 'WITH' option ( ( ',' ( option '=' value | option | option '=' value | option ) ) )* 'AS' 'SELECT' target_list 'FROM' changefeed_target_expr opt_where_clause
| 'CREATE' 'CHANGEFEED' 'INTO' sink 'AS' 'SELECT' target_list 'FROM' changefeed_target_expr opt_where_clause
45 changes: 27 additions & 18 deletions docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,7 @@ create_schedule_for_backup_stmt ::=

create_changefeed_stmt ::=
'CREATE' 'CHANGEFEED' 'FOR' changefeed_targets opt_changefeed_sink opt_with_options
| 'CREATE' 'CHANGEFEED' opt_changefeed_sink opt_with_options 'AS' 'SELECT' target_list 'FROM' changefeed_target_expr opt_where_clause

create_extension_stmt ::=
'CREATE' 'EXTENSION' 'IF' 'NOT' 'EXISTS' name
Expand Down Expand Up @@ -1644,6 +1645,12 @@ changefeed_targets ::=
opt_changefeed_sink ::=
'INTO' string_or_placeholder

target_list ::=
( target_elem ) ( ( ',' target_elem ) )*

changefeed_target_expr ::=
insert_target

with_clause ::=
'WITH' cte_list
| 'WITH' 'RECURSIVE' cte_list
Expand All @@ -1660,9 +1667,6 @@ limit_clause ::=
| 'FETCH' first_or_next select_fetch_first_value row_or_rows 'ONLY'
| 'FETCH' first_or_next row_or_rows 'ONLY'

target_list ::=
( target_elem ) ( ( ',' target_elem ) )*

drop_database_stmt ::=
'DROP' 'DATABASE' database_name opt_drop_behavior
| 'DROP' 'DATABASE' 'IF' 'EXISTS' database_name opt_drop_behavior
Expand Down Expand Up @@ -2263,10 +2267,13 @@ opt_sequence_option_list ::=
|

changefeed_target ::=
'TABLE' table_name
| table_name
| 'TABLE' table_name 'FAMILY' family_name
| table_name 'FAMILY' family_name
opt_table_prefix table_name opt_changefeed_family

target_elem ::=
a_expr 'AS' target_name
| a_expr 'identifier'
| a_expr
| '*'

cte_list ::=
( common_table_expr ) ( ( ',' common_table_expr ) )*
Expand Down Expand Up @@ -2301,12 +2308,6 @@ row_or_rows ::=
'ROW'
| 'ROWS'

target_elem ::=
a_expr 'AS' target_name
| a_expr 'identifier'
| a_expr
| '*'

table_index_name_list ::=
( table_index_name ) ( ( ',' table_index_name ) )*

Expand Down Expand Up @@ -2787,8 +2788,16 @@ create_as_table_defs ::=
enum_val_list ::=
( 'SCONST' ) ( ( ',' 'SCONST' ) )*

family_name ::=
name
opt_table_prefix ::=
'TABLE'
|

opt_changefeed_family ::=
'FAMILY' family_name
|

target_name ::=
unrestricted_name

common_table_expr ::=
table_alias_name opt_column_list 'AS' '(' preparable_stmt ')'
Expand All @@ -2806,9 +2815,6 @@ only_signed_fconst ::=
'+' 'FCONST'
| '-' 'FCONST'

target_name ::=
unrestricted_name

scrub_option ::=
'INDEX' 'ALL'
| 'INDEX' '(' name_list ')'
Expand Down Expand Up @@ -3151,6 +3157,9 @@ family_def ::=
create_as_constraint_def ::=
create_as_constraint_elem

family_name ::=
name

materialize_clause ::=
'MATERIALIZED'
| 'NOT' 'MATERIALIZED'
Expand Down
7 changes: 5 additions & 2 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ go_library(
deps = [
"//pkg/base",
"//pkg/ccl/backupccl/backupresolver",
"//pkg/ccl/changefeedccl/cdceval",
"//pkg/ccl/changefeedccl/cdcevent",
"//pkg/ccl/changefeedccl/cdcutils",
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/ccl/changefeedccl/changefeeddist",
"//pkg/ccl/changefeedccl/changefeedvalidators",
"//pkg/ccl/changefeedccl/kvevent",
"//pkg/ccl/changefeedccl/kvfeed",
Expand Down Expand Up @@ -77,6 +77,7 @@ go_library(
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/pgwire/pgnotice",
"//pkg/sql/physicalplan",
"//pkg/sql/privilege",
"//pkg/sql/roleoption",
"//pkg/sql/rowenc",
Expand Down Expand Up @@ -157,10 +158,10 @@ go_test(
deps = [
"//pkg/base",
"//pkg/blobs",
"//pkg/ccl/changefeedccl/cdceval",
"//pkg/ccl/changefeedccl/cdcevent",
"//pkg/ccl/changefeedccl/cdctest",
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/ccl/changefeedccl/changefeeddist",
"//pkg/ccl/changefeedccl/kvevent",
"//pkg/ccl/changefeedccl/kvfeed",
"//pkg/ccl/changefeedccl/schemafeed",
Expand Down Expand Up @@ -193,6 +194,7 @@ go_test(
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigptsreader",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/bootstrap",
"//pkg/sql/catalog/colinfo",
Expand All @@ -215,6 +217,7 @@ go_test(
"//pkg/sql/sem/tree",
"//pkg/sql/sem/volatility",
"//pkg/sql/sessiondata",
"//pkg/sql/sessiondatapb",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlliveness/sqllivenesstestutils",
"//pkg/sql/tests",
Expand Down
8 changes: 8 additions & 0 deletions pkg/ccl/changefeedccl/alter_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -1280,6 +1281,13 @@ func TestAlterChangefeedUpdateFilter(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Skip this test for now. It used to test alter changefeed with
// now deprecated and removed 'primary_key_filter' option.
// Since predicates and projections are no longer a "string" option,
// alter statement implementation (and grammar) needs to be updated, and
// this test modified and re-enabled.
skip.WithIssue(t, 82491)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
Expand Down
7 changes: 5 additions & 2 deletions pkg/ccl/changefeedccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
Expand Down Expand Up @@ -247,8 +248,10 @@ func createBenchmarkChangefeed(
return nil, nil, err
}
serverCfg := s.DistSQLServer().(*distsql.ServerImpl).ServerConfig
eventConsumer, err := newKVEventToRowConsumer(ctx, &serverCfg, sf, initialHighWater,
sink, encoder, makeChangefeedConfigFromJobDetails(details), TestingKnobs{}, nil)
eventConsumer, err := newKVEventToRowConsumer(ctx, &serverCfg, nil, sf, initialHighWater,
sink, encoder, makeChangefeedConfigFromJobDetails(details),
execinfrapb.Expression{}, TestingKnobs{}, nil)

if err != nil {
return nil, nil, err
}
Expand Down
17 changes: 16 additions & 1 deletion pkg/ccl/changefeedccl/cdceval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,27 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "cdceval",
srcs = [
"constraint.go",
"doc.go",
"expr_eval.go",
"functions.go",
"parse.go",
"validation.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/changefeedccl/cdcevent",
"//pkg/clusterversion",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/roachpb",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/schemaexpr",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/sem/builtins",
Expand All @@ -27,6 +38,7 @@ go_library(
"//pkg/util/json",
"//pkg/util/log",
"@com_github_cockroachdb_errors//:errors",
"@com_github_lib_pq//oid",
],
)

Expand All @@ -36,20 +48,22 @@ go_test(
"expr_eval_test.go",
"functions_test.go",
"main_test.go",
"validation_test.go",
],
embed = [":cdceval"],
deps = [
"//pkg/base",
"//pkg/ccl/changefeedccl/cdcevent",
"//pkg/ccl/changefeedccl/cdctest",
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/ccl/utilccl",
"//pkg/jobs/jobspb",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/security/username",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/distsql",
Expand All @@ -58,6 +72,7 @@ go_test(
"//pkg/sql/rowenc",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sessiondatapb",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
Expand Down
108 changes: 108 additions & 0 deletions pkg/ccl/changefeedccl/cdceval/constraint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package cdceval

import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
)

// ConstrainPrimaryIndexSpanByFilter attempts to constrain table primary
// index span if changefeed expression (select clause) is specified.
// Returns possibly constrained spans, and a possibly modified (optimized)
// select clause.
func ConstrainPrimaryIndexSpanByFilter(
ctx context.Context,
execCtx sql.JobExecContext,
selectClause string,
descr catalog.TableDescriptor,
target jobspb.ChangefeedTargetSpecification,
includeVirtual bool,
) (_ []roachpb.Span, updatedSelect string, _ error) {
if selectClause == "" {
return nil, "", errors.AssertionFailedf("unexpected empty filter")
}
sc, err := ParseChangefeedExpression(selectClause)
if err != nil {
return nil, "", pgerror.Wrap(err, pgcode.InvalidParameterValue,
"could not parse changefeed expression")
}

ed, err := newEventDescriptorForTarget(descr, target, schemaTS(execCtx), includeVirtual)
if err != nil {
return nil, "", err
}

evalCtx := &execCtx.ExtendedEvalContext().Context
spans, remainingFilter, err := constrainSpansBySelectClause(
ctx, execCtx, evalCtx, execCtx.ExecCfg().Codec, sc, ed)
if err != nil {
return nil, "", err
}

if remainingFilter != nil {
// non-nil remainingFilter implies we had sc.Where clause.
if remainingFilter == tree.DBoolTrue {
sc.Where = nil
} else {
sc.Where.Expr = remainingFilter
}
}

return spans, AsStringUnredacted(sc), nil
}

// constrainSpansBySelectClause is a helper that attempts to constrain primary
// index spans by the filter in the select clause. Returns constrained spans,
// along with the remaining filter.
func constrainSpansBySelectClause(
ctx context.Context,
sc sql.SpanConstrainer,
evalCtx *eval.Context,
codec keys.SQLCodec,
selectClause *tree.SelectClause,
ed *cdcevent.EventDescriptor,
) ([]roachpb.Span, tree.Expr, error) {
// Predicate changefeed currently works on a single table only.
// Verify this assumption.
if len(selectClause.From.Tables) != 1 {
return nil, nil, errors.AssertionFailedf(
"expected 1 table expression, found %d", len(selectClause.From.Tables))
}

if selectClause.Where == nil {
// Nothing to constrain.
return []roachpb.Span{ed.TableDescriptor().PrimaryIndexSpan(codec)}, nil, nil
}

tableName := tableNameOrAlias(ed.TableName, selectClause.From.Tables[0])
semaCtx := newSemaCtx(ed)
return sc.ConstrainPrimaryIndexSpanByExpr(
ctx, sql.BestEffortConstrain, tableName, ed.TableDescriptor(),
evalCtx, semaCtx, selectClause.Where.Expr)
}

func schemaTS(execCtx sql.JobExecContext) hlc.Timestamp {
if execCtx.Txn() != nil {
return execCtx.Txn().ReadTimestamp()
}
return execCtx.ExecCfg().Clock.Now()
}
Loading

0 comments on commit afb95b1

Please sign in to comment.