Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: fix custom key column when used with CDC queries #115163

Merged
merged 1 commit into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdcevent/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (r Row) ForEachUDTColumn() Iterator {
func (r Row) DatumNamed(n string) (Iterator, error) {
idx, ok := r.EventDescriptor.colsByName[n]
if !ok {
return nil, errors.Errorf("No column with name %s in this row", n)
return nil, errors.AssertionFailedf("No column with name %s in this row", n)
}
return iter{r: r, cols: []int{idx}}, nil
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/ccl/changefeedccl/cdcevent/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ type Projection Row
// MakeProjection returns Projection builder given underlying descriptor.
func MakeProjection(d *EventDescriptor) Projection {
p := Projection{
EventDescriptor: &EventDescriptor{Metadata: d.Metadata},
EventDescriptor: &EventDescriptor{
Metadata: d.Metadata,
},
}

// Add all primary key columns.
Expand All @@ -54,6 +56,10 @@ func (p *Projection) addColumn(name string, typ *types.T, sqlString string, colI

p.datums = append(p.datums, rowenc.EncDatum{})
p.allCols = append(p.allCols, ord)
if p.colsByName == nil {
p.colsByName = make(map[string]int)
}
p.colsByName[name] = ord
*colIdxSlice = append(*colIdxSlice, ord)
if typ.UserDefined() {
p.udtCols = append(p.udtCols, ord)
Expand Down
124 changes: 124 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3349,6 +3349,130 @@ func TestChangefeedCustomKey(t *testing.T) {
cdcTest(t, testFn, feedTestForceSink("kafka"))
}

// Reproduce issue for #114196. This test verifies that changefeed with custom
// key column works with CDC queries correctly.
func TestChangefeedCustomKeyColumnWithCDCQuery(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

tests := map[string]struct {
shouldSkip bool
createTableStmt string
createChangeFeedStmt string
stmts []string
payloadsAfterStmts []string
}{
`select_star`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT * FROM foo`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["dog"]->{"a": 0, "b": "dog", "c": "cat"}`, `foo: ["dog1"]->{"a": 1, "b": "dog1", "c": "cat1"}`},
},
`select_with_filter`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT * FROM foo WHERE b='dog'`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["dog"]->{"a": 0, "b": "dog", "c": "cat"}`},
},
`select_multiple_columns`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='c', unordered AS SELECT b, c FROM foo WHERE b='dog'`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["cat"]->{"b": "dog", "c": "cat"}`},
},
`custom_key_with_created_column`: {
shouldSkip: true,
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='double_b', unordered AS SELECT concat(b, c) AS double_b FROM foo`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["cat"]->{"c": "cat"}`},
},
`select_star_with_builtin_funcs`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='c', unordered AS SELECT *, concat(b, c) FROM foo`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["cat"]->{"a": 0, "b": "dog", "c": "cat", "concat": "dogcat"}`, `foo: ["cat1"]->{"a": 1, "b": "dog1", "c": "cat1", "concat": "dog1cat1"}`},
},
`select_stored_column`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, d STRING AS (concat(b, c)) STORED)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='d', unordered AS SELECT * FROM foo WHERE d='dog1cat1'`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["dog1cat1"]->{"a": 1, "b": "dog1", "c": "cat1", "d": "dog1cat1"}`},
},
`select_virtual_column`: {
shouldSkip: true,
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, d STRING AS (concat(b, c)) VIRTUAL)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='d', unordered AS SELECT d FROM foo WHERE d='dog1cat1'`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["dog1cat1"]->{"a": 1, "b": "dog1", "c": "cat1", "d": "dog1cat1"}`},
},
`select_with_filter_IN`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT * FROM foo WHERE b IN ('dog', 'dog1')`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog')`, `INSERT INTO foo VALUES (1, 'dog1')`},
payloadsAfterStmts: []string{`foo: ["dog"]->{"a": 0, "b": "dog"}`, `foo: ["dog1"]->{"a": 1, "b": "dog1"}`},
},
`select_with_delete`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT *, event_op() = 'delete' AS deleted FROM foo WHERE 'hello' != 'world'`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog')`, `INSERT INTO foo VALUES (1, 'dog1')`, `DELETE FROM foo WHERE a=1`},
payloadsAfterStmts: []string{`foo: [null]->{"a": 1, "b": null, "deleted": true}`, `foo: ["dog"]->{"a": 0, "b": "dog", "deleted": false}`, `foo: ["dog1"]->{"a": 1, "b": "dog1", "deleted": false}`},
},
`select_with_filter_delete`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT * FROM foo WHERE event_op() = 'delete'`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog')`, `INSERT INTO foo VALUES (1, 'dog1')`, `DELETE FROM foo WHERE a=1`},
payloadsAfterStmts: []string{`foo: [null]->{"a": 1, "b": null}`},
},
`select_with_cdc_prev`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT *, (cdc_prev) FROM foo`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog')`, `UPSERT INTO foo VALUES (0,'dog1')`},
payloadsAfterStmts: []string{`foo: ["dog"]->{"a": 0, "b": "dog", "cdc_prev": null}`, `foo: ["dog1"]->{"a": 0, "b": "dog1", "cdc_prev": {"a": 0, "b": "dog"}}`},
},
`select_with_filter_cdc_prev`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT b FROM foo WHERE (cdc_prev).a = 0`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog')`, `UPSERT INTO foo VALUES (0, 'dog1')`},
payloadsAfterStmts: []string{`foo: ["dog1"]->{"b": "dog1"}`},
},
`select_with_hidden_column`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING NOT VISIBLE)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT b FROM foo`,
stmts: []string{`INSERT INTO foo(a,b) VALUES (0, 'dog')`, `INSERT INTO foo(a,b) VALUES (1, 'dog1')`},
payloadsAfterStmts: []string{`foo: ["dog"]->{"b": "dog"}`, `foo: ["dog1"]->{"b": "dog1"}`},
},
`select_with_cdc_prev_column`: {
shouldSkip: true,
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='cdc_prev.a', unordered AS SELECT * FROM foo`,
stmts: []string{`INSERT INTO foo(a,b) VALUES (0, 'dog')`, `INSERT INTO foo(a,b) VALUES (1, 'dog1')`},
payloadsAfterStmts: []string{`foo: ["dog"]->{"b": "dog"}`, `foo: ["dog1"]->{"b": "dog1"}`},
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
if test.shouldSkip {
t.Logf("skipping this test because %s is currently not supported; "+
"see #115267 for more details", name)
return
}
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, test.createTableStmt)
foo := feed(t, f, test.createChangeFeedStmt)
defer closeFeed(t, foo)
for _, stmt := range test.stmts {
sqlDB.Exec(t, stmt)
}
assertPayloads(t, foo, test.payloadsAfterStmts)
}
cdcTest(t, testFn, feedTestForceSink("kafka"))
})
}
}

func TestChangefeedCustomKeyAvro(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down