Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-23.2: changefeedccl: fix custom key column when used with CDC queries #116967

Merged
merged 1 commit into from Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdcevent/event.go
Expand Up @@ -116,7 +116,7 @@ func (r Row) ForEachUDTColumn() Iterator {
func (r Row) DatumNamed(n string) (Iterator, error) {
idx, ok := r.EventDescriptor.colsByName[n]
if !ok {
return nil, errors.Errorf("No column with name %s in this row", n)
return nil, errors.AssertionFailedf("No column with name %s in this row", n)
}
return iter{r: r, cols: []int{idx}}, nil
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/ccl/changefeedccl/cdcevent/projection.go
Expand Up @@ -30,7 +30,9 @@ type Projection Row
// MakeProjection returns Projection builder given underlying descriptor.
func MakeProjection(d *EventDescriptor) Projection {
p := Projection{
EventDescriptor: &EventDescriptor{Metadata: d.Metadata},
EventDescriptor: &EventDescriptor{
Metadata: d.Metadata,
},
}

// Add all primary key columns.
Expand All @@ -54,6 +56,10 @@ func (p *Projection) addColumn(name string, typ *types.T, sqlString string, colI

p.datums = append(p.datums, rowenc.EncDatum{})
p.allCols = append(p.allCols, ord)
if p.colsByName == nil {
p.colsByName = make(map[string]int)
}
p.colsByName[name] = ord
*colIdxSlice = append(*colIdxSlice, ord)
if typ.UserDefined() {
p.udtCols = append(p.udtCols, ord)
Expand Down
124 changes: 124 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Expand Up @@ -3349,6 +3349,130 @@ func TestChangefeedCustomKey(t *testing.T) {
cdcTest(t, testFn, feedTestForceSink("kafka"))
}

// Reproduce issue for #114196. This test verifies that changefeed with custom
// key column works with CDC queries correctly.
func TestChangefeedCustomKeyColumnWithCDCQuery(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

tests := map[string]struct {
shouldSkip bool
createTableStmt string
createChangeFeedStmt string
stmts []string
payloadsAfterStmts []string
}{
`select_star`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT * FROM foo`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["dog"]->{"a": 0, "b": "dog", "c": "cat"}`, `foo: ["dog1"]->{"a": 1, "b": "dog1", "c": "cat1"}`},
},
`select_with_filter`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT * FROM foo WHERE b='dog'`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["dog"]->{"a": 0, "b": "dog", "c": "cat"}`},
},
`select_multiple_columns`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='c', unordered AS SELECT b, c FROM foo WHERE b='dog'`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["cat"]->{"b": "dog", "c": "cat"}`},
},
`custom_key_with_created_column`: {
shouldSkip: true,
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='double_b', unordered AS SELECT concat(b, c) AS double_b FROM foo`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["cat"]->{"c": "cat"}`},
},
`select_star_with_builtin_funcs`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='c', unordered AS SELECT *, concat(b, c) FROM foo`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["cat"]->{"a": 0, "b": "dog", "c": "cat", "concat": "dogcat"}`, `foo: ["cat1"]->{"a": 1, "b": "dog1", "c": "cat1", "concat": "dog1cat1"}`},
},
`select_stored_column`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, d STRING AS (concat(b, c)) STORED)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='d', unordered AS SELECT * FROM foo WHERE d='dog1cat1'`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["dog1cat1"]->{"a": 1, "b": "dog1", "c": "cat1", "d": "dog1cat1"}`},
},
`select_virtual_column`: {
shouldSkip: true,
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, d STRING AS (concat(b, c)) VIRTUAL)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='d', unordered AS SELECT d FROM foo WHERE d='dog1cat1'`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`},
payloadsAfterStmts: []string{`foo: ["dog1cat1"]->{"a": 1, "b": "dog1", "c": "cat1", "d": "dog1cat1"}`},
},
`select_with_filter_IN`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT * FROM foo WHERE b IN ('dog', 'dog1')`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog')`, `INSERT INTO foo VALUES (1, 'dog1')`},
payloadsAfterStmts: []string{`foo: ["dog"]->{"a": 0, "b": "dog"}`, `foo: ["dog1"]->{"a": 1, "b": "dog1"}`},
},
`select_with_delete`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT *, event_op() = 'delete' AS deleted FROM foo WHERE 'hello' != 'world'`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog')`, `INSERT INTO foo VALUES (1, 'dog1')`, `DELETE FROM foo WHERE a=1`},
payloadsAfterStmts: []string{`foo: [null]->{"a": 1, "b": null, "deleted": true}`, `foo: ["dog"]->{"a": 0, "b": "dog", "deleted": false}`, `foo: ["dog1"]->{"a": 1, "b": "dog1", "deleted": false}`},
},
`select_with_filter_delete`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT * FROM foo WHERE event_op() = 'delete'`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog')`, `INSERT INTO foo VALUES (1, 'dog1')`, `DELETE FROM foo WHERE a=1`},
payloadsAfterStmts: []string{`foo: [null]->{"a": 1, "b": null}`},
},
`select_with_cdc_prev`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT *, (cdc_prev) FROM foo`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog')`, `UPSERT INTO foo VALUES (0,'dog1')`},
payloadsAfterStmts: []string{`foo: ["dog"]->{"a": 0, "b": "dog", "cdc_prev": null}`, `foo: ["dog1"]->{"a": 0, "b": "dog1", "cdc_prev": {"a": 0, "b": "dog"}}`},
},
`select_with_filter_cdc_prev`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT b FROM foo WHERE (cdc_prev).a = 0`,
stmts: []string{`INSERT INTO foo VALUES (0, 'dog')`, `UPSERT INTO foo VALUES (0, 'dog1')`},
payloadsAfterStmts: []string{`foo: ["dog1"]->{"b": "dog1"}`},
},
`select_with_hidden_column`: {
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING NOT VISIBLE)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT b FROM foo`,
stmts: []string{`INSERT INTO foo(a,b) VALUES (0, 'dog')`, `INSERT INTO foo(a,b) VALUES (1, 'dog1')`},
payloadsAfterStmts: []string{`foo: ["dog"]->{"b": "dog"}`, `foo: ["dog1"]->{"b": "dog1"}`},
},
`select_with_cdc_prev_column`: {
shouldSkip: true,
createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`,
createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='cdc_prev.a', unordered AS SELECT * FROM foo`,
stmts: []string{`INSERT INTO foo(a,b) VALUES (0, 'dog')`, `INSERT INTO foo(a,b) VALUES (1, 'dog1')`},
payloadsAfterStmts: []string{`foo: ["dog"]->{"b": "dog"}`, `foo: ["dog1"]->{"b": "dog1"}`},
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
if test.shouldSkip {
t.Logf("skipping this test because %s is currently not supported; "+
"see #115267 for more details", name)
return
}
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, test.createTableStmt)
foo := feed(t, f, test.createChangeFeedStmt)
defer closeFeed(t, foo)
for _, stmt := range test.stmts {
sqlDB.Exec(t, stmt)
}
assertPayloads(t, foo, test.payloadsAfterStmts)
}
cdcTest(t, testFn, feedTestForceSink("kafka"))
})
}
}

func TestChangefeedCustomKeyAvro(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down