Skip to content


Merge pull request #116967 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history

release-23.2: changefeedccl: fix custom key column when used with CDC queries
  • Loading branch information
wenyihu6 committed Jan 3, 2024
2 parents d55f0f8 + 2d4f2a1 commit 77c8ae6
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdcevent/event.go
Expand Up @@ -116,7 +116,7 @@ func (r Row) ForEachUDTColumn() Iterator {
func (r Row) DatumNamed(n string) (Iterator, error) {
idx, ok := r.EventDescriptor.colsByName[n]
if !ok {
return nil, errors.Errorf("No column with name %s in this row", n)
return nil, errors.AssertionFailedf("No column with name %s in this row", n)
return iter{r: r, cols: []int{idx}}, nil
Expand Down
8 changes: 7 additions & 1 deletion pkg/ccl/changefeedccl/cdcevent/projection.go
Expand Up @@ -30,7 +30,9 @@ type Projection Row
// MakeProjection returns Projection builder given underlying descriptor.
func MakeProjection(d *EventDescriptor) Projection {
p := Projection{
EventDescriptor: &EventDescriptor{Metadata: d.Metadata},
EventDescriptor: &EventDescriptor{
Metadata: d.Metadata,

// Add all primary key columns.
Expand All @@ -54,6 +56,10 @@ func (p *Projection) addColumn(name string, typ *types.T, sqlString string, colI

p.datums = append(p.datums, rowenc.EncDatum{})
p.allCols = append(p.allCols, ord)
if p.colsByName == nil {
p.colsByName = make(map[string]int)
p.colsByName[name] = ord
*colIdxSlice = append(*colIdxSlice, ord)
if typ.UserDefined() {
p.udtCols = append(p.udtCols, ord)
Expand Down
124 changes: 124 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Expand Up @@ -3349,6 +3349,130 @@ func TestChangefeedCustomKey(t *testing.T) {
cdcTest(t, testFn, feedTestForceSink("kafka"))

// Reproduce issue for #114196. This test verifies that changefeed with custom
// key column works with CDC queries correctly.
func TestChangefeedCustomKeyColumnWithCDCQuery(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

tests := map[string]struct {
shouldSkip bool
createTableStmt string
createChangeFeedStmt string
stmts []string
payloadsAfterStmts []string
`select_star`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT * FROM foo`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["dog"]->{"a": 0, "b": "dog", "c": "cat"}`, `foo: ["dog1"]->{"a": 1, "b": "dog1", "c": "cat1"}`},
`select_with_filter`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT * FROM foo WHERE b='dog'`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["dog"]->{"a": 0, "b": "dog", "c": "cat"}`},
`select_multiple_columns`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='c', unordered AS SELECT b, c FROM foo WHERE b='dog'`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["cat"]->{"b": "dog", "c": "cat"}`},
`custom_key_with_created_column`: {
shouldSkip: true,
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='double_b', unordered AS SELECT concat(b, c) AS double_b FROM foo`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["cat"]->{"c": "cat"}`},
`select_star_with_builtin_funcs`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='c', unordered AS SELECT *, concat(b, c) FROM foo`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["cat"]->{"a": 0, "b": "dog", "c": "cat", "concat": "dogcat"}`, `foo: ["cat1"]->{"a": 1, "b": "dog1", "c": "cat1", "concat": "dog1cat1"}`},
`select_stored_column`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, d STRING AS (concat(b, c)) STORED)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='d', unordered AS SELECT * FROM foo WHERE d='dog1cat1'`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["dog1cat1"]->{"a": 1, "b": "dog1", "c": "cat1", "d": "dog1cat1"}`},
`select_virtual_column`: {
shouldSkip: true,
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, d STRING AS (concat(b, c)) VIRTUAL)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='d', unordered AS SELECT d FROM foo WHERE d='dog1cat1'`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["dog1cat1"]->{"a": 1, "b": "dog1", "c": "cat1", "d": "dog1cat1"}`},
`select_with_filter_IN`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT * FROM foo WHERE b IN ('dog', 'dog1')`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog')`, `INSERT INTO foo VALUES (1, 'dog1')`},
payloadsAfterStmts: []string{`foo: ["dog"]->{"a": 0, "b": "dog"}`, `foo: ["dog1"]->{"a": 1, "b": "dog1"}`},
`select_with_delete`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT *, event_op() = 'delete' AS deleted FROM foo WHERE 'hello' != 'world'`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog')`, `INSERT INTO foo VALUES (1, 'dog1')`, `DELETE FROM foo WHERE a=1`},
payloadsAfterStmts: []string{`foo: [null]->{"a": 1, "b": null, "deleted": true}`, `foo: ["dog"]->{"a": 0, "b": "dog", "deleted": false}`, `foo: ["dog1"]->{"a": 1, "b": "dog1", "deleted": false}`},
`select_with_filter_delete`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT * FROM foo WHERE event_op() = 'delete'`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog')`, `INSERT INTO foo VALUES (1, 'dog1')`, `DELETE FROM foo WHERE a=1`},
payloadsAfterStmts: []string{`foo: [null]->{"a": 1, "b": null}`},
`select_with_cdc_prev`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT *, (cdc_prev) FROM foo`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog')`, `UPSERT INTO foo VALUES (0,'dog1')`},
payloadsAfterStmts: []string{`foo: ["dog"]->{"a": 0, "b": "dog", "cdc_prev": null}`, `foo: ["dog1"]->{"a": 0, "b": "dog1", "cdc_prev": {"a": 0, "b": "dog"}}`},
`select_with_filter_cdc_prev`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT b FROM foo WHERE (cdc_prev).a = 0`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog')`, `UPSERT INTO foo VALUES (0, 'dog1')`},
payloadsAfterStmts: []string{`foo: ["dog1"]->{"b": "dog1"}`},
`select_with_hidden_column`: {
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT b FROM foo`,
stmts: []string{`INSERT INTO foo(a,b) VALUES (0, 'dog')`, `INSERT INTO foo(a,b) VALUES (1, 'dog1')`},
payloadsAfterStmts: []string{`foo: ["dog"]->{"b": "dog"}`, `foo: ["dog1"]->{"b": "dog1"}`},
`select_with_cdc_prev_column`: {
shouldSkip: true,
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='cdc_prev.a', unordered AS SELECT * FROM foo`,
stmts: []string{`INSERT INTO foo(a,b) VALUES (0, 'dog')`, `INSERT INTO foo(a,b) VALUES (1, 'dog1')`},
payloadsAfterStmts: []string{`foo: ["dog"]->{"b": "dog"}`, `foo: ["dog1"]->{"b": "dog1"}`},

for name, test := range tests {
t.Run(name, func(t *testing.T) {
if test.shouldSkip {
t.Logf("skipping this test because %s is currently not supported; "+
"see #115267 for more details", name)
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, test.createTableStmt)
foo := feed(t, f, test.createChangeFeedStmt)
defer closeFeed(t, foo)
for _, stmt := range test.stmts {
sqlDB.Exec(t, stmt)
assertPayloads(t, foo, test.payloadsAfterStmts)
cdcTest(t, testFn, feedTestForceSink("kafka"))

func TestChangefeedCustomKeyAvro(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down

0 comments on commit 77c8ae6

Please sign in to comment.