From 2d4f2a14a05b8e23a134cad42b435886657f8172 Mon Sep 17 00:00:00 2001 From: Wenyi Hu Date: Mon, 27 Nov 2023 18:44:34 +0000 Subject: [PATCH] changefeedccl: fix custom key column when used with CDC queries Previously, CDC custom key column caused incorrect behaviour with CDC queries, leading to a `retryable error: No column with name in this row`. This was because we forgot to add columns to the `colsByName` map when generating projection rows for CDC queries. This caused [`DatumNamed`](https://github.com/cockroachdb/cockroach/blob/8d74d10e77f020dd0f5c98003f7976b63531de18/pkg/ccl/changefeedccl/cdcevent/event.go#L116-L122) to return a column-not-found error. This patch addresses the issue by populating the `colsByName` map during the addition of columns for CDC queries. Note that the issue with supporting `ALTER CHANGEFEED` with CDC queries remains unresolved. Fixes: #114196 Release note (bug fix): CDC custom key column now functions with CDC queries correctly. For example, `CREATE CHANGEFEED WITH key_column=..., unordered AS SELECT * FROM table` now works correctly instead of retrying forever. Note that some functionality with CDC custom keys is not fully supported; see #115267 for more details. 
--- pkg/ccl/changefeedccl/cdcevent/event.go | 2 +- pkg/ccl/changefeedccl/cdcevent/projection.go | 8 +- pkg/ccl/changefeedccl/changefeed_test.go | 124 +++++++++++++++++++ 3 files changed, 132 insertions(+), 2 deletions(-) diff --git a/pkg/ccl/changefeedccl/cdcevent/event.go b/pkg/ccl/changefeedccl/cdcevent/event.go index 4ee1c102e911..9f993d5004eb 100644 --- a/pkg/ccl/changefeedccl/cdcevent/event.go +++ b/pkg/ccl/changefeedccl/cdcevent/event.go @@ -116,7 +116,7 @@ func (r Row) ForEachUDTColumn() Iterator { func (r Row) DatumNamed(n string) (Iterator, error) { idx, ok := r.EventDescriptor.colsByName[n] if !ok { - return nil, errors.Errorf("No column with name %s in this row", n) + return nil, errors.AssertionFailedf("No column with name %s in this row", n) } return iter{r: r, cols: []int{idx}}, nil } diff --git a/pkg/ccl/changefeedccl/cdcevent/projection.go b/pkg/ccl/changefeedccl/cdcevent/projection.go index 1037a5d7fbe4..d72154bbf1a0 100644 --- a/pkg/ccl/changefeedccl/cdcevent/projection.go +++ b/pkg/ccl/changefeedccl/cdcevent/projection.go @@ -30,7 +30,9 @@ type Projection Row // MakeProjection returns Projection builder given underlying descriptor. func MakeProjection(d *EventDescriptor) Projection { p := Projection{ - EventDescriptor: &EventDescriptor{Metadata: d.Metadata}, + EventDescriptor: &EventDescriptor{ + Metadata: d.Metadata, + }, } // Add all primary key columns. 
@@ -54,6 +56,10 @@ func (p *Projection) addColumn(name string, typ *types.T, sqlString string, colI p.datums = append(p.datums, rowenc.EncDatum{}) p.allCols = append(p.allCols, ord) + if p.colsByName == nil { + p.colsByName = make(map[string]int) + } + p.colsByName[name] = ord *colIdxSlice = append(*colIdxSlice, ord) if typ.UserDefined() { p.udtCols = append(p.udtCols, ord) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index d7ad3ba6f18b..e5b1cf5edacd 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -3349,6 +3349,130 @@ func TestChangefeedCustomKey(t *testing.T) { cdcTest(t, testFn, feedTestForceSink("kafka")) } +// Reproduce issue for #114196. This test verifies that changefeed with custom +// key column works with CDC queries correctly. +func TestChangefeedCustomKeyColumnWithCDCQuery(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + tests := map[string]struct { + shouldSkip bool + createTableStmt string + createChangeFeedStmt string + stmts []string + payloadsAfterStmts []string + }{ + `select_star`: { + createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)`, + createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT * FROM foo`, + stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`}, + payloadsAfterStmts: []string{`foo: ["dog"]->{"a": 0, "b": "dog", "c": "cat"}`, `foo: ["dog1"]->{"a": 1, "b": "dog1", "c": "cat1"}`}, + }, + `select_with_filter`: { + createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)`, + createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT * FROM foo WHERE b='dog'`, + stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`}, + payloadsAfterStmts: []string{`foo: ["dog"]->{"a": 0, "b": "dog", "c": "cat"}`}, + }, + 
`select_multiple_columns`: { + createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)`, + createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='c', unordered AS SELECT b, c FROM foo WHERE b='dog'`, + stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`}, + payloadsAfterStmts: []string{`foo: ["cat"]->{"b": "dog", "c": "cat"}`}, + }, + `custom_key_with_created_column`: { + shouldSkip: true, + createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)`, + createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='double_b', unordered AS SELECT concat(b, c) AS double_b FROM foo`, + stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`}, + payloadsAfterStmts: []string{`foo: ["cat"]->{"c": "cat"}`}, + }, + `select_star_with_builtin_funcs`: { + createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING)`, + createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='c', unordered AS SELECT *, concat(b, c) FROM foo`, + stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`}, + payloadsAfterStmts: []string{`foo: ["cat"]->{"a": 0, "b": "dog", "c": "cat", "concat": "dogcat"}`, `foo: ["cat1"]->{"a": 1, "b": "dog1", "c": "cat1", "concat": "dog1cat1"}`}, + }, + `select_stored_column`: { + createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, d STRING AS (concat(b, c)) STORED)`, + createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='d', unordered AS SELECT * FROM foo WHERE d='dog1cat1'`, + stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`}, + payloadsAfterStmts: []string{`foo: ["dog1cat1"]->{"a": 1, "b": "dog1", "c": "cat1", "d": "dog1cat1"}`}, + }, + `select_virtual_column`: { + shouldSkip: true, + createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, d STRING AS (concat(b, c)) 
VIRTUAL)`, + createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='d', unordered AS SELECT d FROM foo WHERE d='dog1cat1'`, + stmts: []string{`INSERT INTO foo VALUES (0, 'dog', 'cat')`, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`}, + payloadsAfterStmts: []string{`foo: ["dog1cat1"]->{"a": 1, "b": "dog1", "c": "cat1", "d": "dog1cat1"}`}, + }, + `select_with_filter_IN`: { + createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`, + createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT * FROM foo WHERE b IN ('dog', 'dog1')`, + stmts: []string{`INSERT INTO foo VALUES (0, 'dog')`, `INSERT INTO foo VALUES (1, 'dog1')`}, + payloadsAfterStmts: []string{`foo: ["dog"]->{"a": 0, "b": "dog"}`, `foo: ["dog1"]->{"a": 1, "b": "dog1"}`}, + }, + `select_with_delete`: { + createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`, + createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT *, event_op() = 'delete' AS deleted FROM foo WHERE 'hello' != 'world'`, + stmts: []string{`INSERT INTO foo VALUES (0, 'dog')`, `INSERT INTO foo VALUES (1, 'dog1')`, `DELETE FROM foo WHERE a=1`}, + payloadsAfterStmts: []string{`foo: [null]->{"a": 1, "b": null, "deleted": true}`, `foo: ["dog"]->{"a": 0, "b": "dog", "deleted": false}`, `foo: ["dog1"]->{"a": 1, "b": "dog1", "deleted": false}`}, + }, + `select_with_filter_delete`: { + createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`, + createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT * FROM foo WHERE event_op() = 'delete'`, + stmts: []string{`INSERT INTO foo VALUES (0, 'dog')`, `INSERT INTO foo VALUES (1, 'dog1')`, `DELETE FROM foo WHERE a=1`}, + payloadsAfterStmts: []string{`foo: [null]->{"a": 1, "b": null}`}, + }, + `select_with_cdc_prev`: { + createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`, + createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT *, (cdc_prev) FROM foo`, + stmts: 
[]string{`INSERT INTO foo VALUES (0, 'dog')`, `UPSERT INTO foo VALUES (0,'dog1')`}, + payloadsAfterStmts: []string{`foo: ["dog"]->{"a": 0, "b": "dog", "cdc_prev": null}`, `foo: ["dog1"]->{"a": 0, "b": "dog1", "cdc_prev": {"a": 0, "b": "dog"}}`}, + }, + `select_with_filter_cdc_prev`: { + createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`, + createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT b FROM foo WHERE (cdc_prev).a = 0`, + stmts: []string{`INSERT INTO foo VALUES (0, 'dog')`, `UPSERT INTO foo VALUES (0, 'dog1')`}, + payloadsAfterStmts: []string{`foo: ["dog1"]->{"b": "dog1"}`}, + }, + `select_with_hidden_column`: { + createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING NOT VISIBLE)`, + createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT b FROM foo`, + stmts: []string{`INSERT INTO foo(a,b) VALUES (0, 'dog')`, `INSERT INTO foo(a,b) VALUES (1, 'dog1')`}, + payloadsAfterStmts: []string{`foo: ["dog"]->{"b": "dog"}`, `foo: ["dog1"]->{"b": "dog1"}`}, + }, + `select_with_cdc_prev_column`: { + shouldSkip: true, + createTableStmt: `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`, + createChangeFeedStmt: `CREATE CHANGEFEED WITH key_column='cdc_prev.a', unordered AS SELECT * FROM foo`, + stmts: []string{`INSERT INTO foo(a,b) VALUES (0, 'dog')`, `INSERT INTO foo(a,b) VALUES (1, 'dog1')`}, + payloadsAfterStmts: []string{`foo: ["dog"]->{"b": "dog"}`, `foo: ["dog1"]->{"b": "dog1"}`}, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + if test.shouldSkip { + t.Logf("skipping this test because %s is currently not supported; "+ + "see #115267 for more details", name) + return + } + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + sqlDB.Exec(t, test.createTableStmt) + foo := feed(t, f, test.createChangeFeedStmt) + defer closeFeed(t, foo) + for _, stmt := range test.stmts { + sqlDB.Exec(t, stmt) + } + 
assertPayloads(t, foo, test.payloadsAfterStmts) + } + cdcTest(t, testFn, feedTestForceSink("kafka")) + }) + } +} + func TestChangefeedCustomKeyAvro(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t)