Skip to content

Commit

Permalink
changefeedccl: fix custom key column when used with CDC queries
Browse files Browse the repository at this point in the history
Previously, CDC custom key column caused incorrect behaviour with CDC queries,
leading to a `retryable error: No column with name <name> in this row`. This was
because we forgot to add columns to `colsByName` map when generating projection
rows for CDC queries. This caused
[`DatumNamed`](https://github.com/cockroachdb/cockroach/blob/8d74d10e77f020dd0f5c98003f7976b63531de18/pkg/ccl/changefeedccl/cdcevent/event.go#L116-L122)
to return a column not found error, This patch addresses the issue by populating
the `colsByName` map during the addition of columns for CDC queries. Note that
the issue with supporting `ALTER CHANGEFEED` with CDC queries remains
unresolved.
Fixes: #114196
Release note (bug fix): CDC custom key column now functions with CDC queries
correctly. For example, `CREATE CHANGEFEED WITH key_column=..., unordered AS
SELECT * FROM table` now works correctly instead of retrying forever.
  • Loading branch information
wenyihu6 committed Nov 29, 2023
1 parent 60719a0 commit 0437796
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdcevent/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (r Row) ForEachUDTColumn() Iterator {
func (r Row) DatumNamed(n string) (Iterator, error) {
idx, ok := r.EventDescriptor.colsByName[n]
if !ok {
return nil, errors.Errorf("No column with name %s in this row", n)
return nil, errors.AssertionFailedf("No column with name %s in this row", n)
}
return iter{r: r, cols: []int{idx}}, nil
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/ccl/changefeedccl/cdcevent/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ type Projection Row
// MakeProjection returns Projection builder given underlying descriptor.
func MakeProjection(d *EventDescriptor) Projection {
p := Projection{
EventDescriptor: &EventDescriptor{Metadata: d.Metadata},
EventDescriptor: &EventDescriptor{
Metadata: d.Metadata,
},
}

// Add all primary key columns.
Expand All @@ -54,6 +56,10 @@ func (p *Projection) addColumn(name string, typ *types.T, sqlString string, colI

p.datums = append(p.datums, rowenc.EncDatum{})
p.allCols = append(p.allCols, ord)
if p.colsByName == nil {
p.colsByName = make(map[string]int)
}
p.colsByName[name] = ord
*colIdxSlice = append(*colIdxSlice, ord)
if typ.UserDefined() {
p.udtCols = append(p.udtCols, ord)
Expand Down
122 changes: 122 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3349,6 +3349,128 @@ func TestChangefeedCustomKey(t *testing.T) {
cdcTest(t, testFn, feedTestForceSink("kafka"))
}

// Reproduce issue for #114196. This test verifies that changefeed with custom
// key column works with CDC queries correctly.
func TestChangefeedCustomKeyColumnWithCDCQuery(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

tests := map[string]struct {
skipWithIssue string
createTableStmt string
createChangeFeedStmt string
stmts []string
payloadsAfterStmts []string
}{
`select_star`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT * FROM foo`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["dog"]->{"a": 0, "b": "dog", "c": "cat"}`, `foo: ["dog1"]->{"a": 1, "b": "dog1", "c": "cat1"}`},
},
`select_with_filter`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT * FROM foo WHERE b='dog'`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["dog"]->{"a": 0, "b": "dog", "c": "cat"}`},
},
`select_multiple_columns`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='c', unordered AS SELECT b, c FROM foo WHERE b='dog'`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["cat"]->{"b": "dog", "c": "cat"}`},
},
`custom_key_with_created_column`: {
skipWithIssue: "#114196",
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='double_b', unordered AS SELECT concat(b, c) AS double_b FROM foo`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["cat"]->{"c": "cat"}`},
},
`select_star_with_builtin_funcs`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='c', unordered AS SELECT *, concat(b, c) FROM foo`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["cat"]->{"a": 0, "b": "dog", "c": "cat", "concat": "dogcat"}`, `foo: ["cat1"]->{"a": 1, "b": "dog1", "c": "cat1", "concat": "dog1cat1"}`},
},
`select_stored_column`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, d STRING AS (concat(b, c)) STORED)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='d', unordered AS SELECT * FROM foo WHERE d='dog1cat1'`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["dog1cat1"]->{"a": 1, "b": "dog1", "c": "cat1", "d": "dog1cat1"}`},
},
`select_virtual_column`: {
skipWithIssue: "#114196",
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, d STRING AS (concat(b, c)) VIRTUAL)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='d', unordered AS SELECT d FROM foo WHERE d='dog1cat1'`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["dog1cat1"]->{"a": 1, "b": "dog1", "c": "cat1", "d": "dog1cat1"}`},
},
`select_with_filter_IN`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT * FROM foo WHERE b IN ('dog', 'dog1')`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog')`, `INSERT INTO foo VALUES (1, 'dog1')`},
payloadsAfterStmts: []string{`foo: ["dog"]->{"a": 0, "b": "dog"}`, `foo: ["dog1"]->{"a": 1, "b": "dog1"}`},
},
`select_with_delete`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT *, event_op() = 'delete' AS deleted FROM foo WHERE 'hello' != 'world'`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog')`, `INSERT INTO foo VALUES (1, 'dog1')`, `DELETE FROM foo WHERE a=1`},
payloadsAfterStmts: []string{`foo: [null]->{"a": 1, "b": null, "deleted": true}`, `foo: ["dog"]->{"a": 0, "b": "dog", "deleted": false}`, `foo: ["dog1"]->{"a": 1, "b": "dog1", "deleted": false}`},
},
`select_with_filter_delete`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT * FROM foo WHERE event_op() = 'delete'`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog')`, `INSERT INTO foo VALUES (1, 'dog1')`, `DELETE FROM foo WHERE a=1`},
payloadsAfterStmts: []string{`foo: [null]->{"a": 1, "b": null}`},
},
`select_with_cdc_prev`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT *, (cdc_prev) FROM foo`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog')`, `UPSERT INTO foo VALUES (0,'dog1')`},
payloadsAfterStmts: []string{`foo: ["dog"]->{"a": 0, "b": "dog", "cdc_prev": null}`, `foo: ["dog1"]->{"a": 0, "b": "dog1", "cdc_prev": {"a": 0, "b": "dog"}}`},
},
`select_with_filter_cdc_prev`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT b FROM foo WHERE (cdc_prev).a = 0`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog')`, `UPSERT INTO foo VALUES (0, 'dog1')`},
payloadsAfterStmts: []string{`foo: ["dog1"]->{"b": "dog1"}`},
},
`select_with_hidden_column`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING NOT VISIBLE)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT b FROM foo`,
stmts: []string{`INSERT INTO foo(a,b) VALUES (0, 'dog')`, `INSERT INTO foo(a,b) VALUES (1, 'dog1')`},
payloadsAfterStmts: []string{`foo: ["dog"]->{"b": "dog"}`, `foo: ["dog1"]->{"b": "dog1"}`},
},
`select_with_cdc_prev_column`: {
skipWithIssue: "#114196",
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='cdc_prev.a', unordered AS SELECT * FROM foo`,
stmts: []string{`INSERT INTO foo(a,b) VALUES (0, 'dog')`, `INSERT INTO foo(a,b) VALUES (1, 'dog1')`},
payloadsAfterStmts: []string{`foo: ["dog"]->{"b": "dog"}`, `foo: ["dog1"]->{"b": "dog1"}`},
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
if test.skipWithIssue != "" {
t.Skip(test.skipWithIssue)
}
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, test.createTableStmt)
foo := feed(t, f, test.createChangeFeedStmt)
defer closeFeed(t, foo)
for _, stmt := range test.stmts {
sqlDB.Exec(t, stmt)
}
assertPayloads(t, foo, test.payloadsAfterStmts)
}
cdcTest(t, testFn, feedTestForceSink("kafka"))
})
}
}

func TestChangefeedCustomKeyAvro(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down

0 comments on commit 0437796

Please sign in to comment.