Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: fix custom key column when used with CDC queries #115163

Merged
merged 1 commit into from Nov 29, 2023

Conversation

wenyihu6
Copy link
Contributor

@wenyihu6 wenyihu6 commented Nov 28, 2023

Previously, CDC custom key column caused incorrect behaviour with CDC queries,
leading to a retryable error: No column with name <name> in this row. This was
because we forgot to add columns to colsByName map when generating projection
rows for CDC queries. This caused
DatumNamed
to return a column not found error. This patch addresses the issue by populating
the colsByName map during the addition of columns for CDC queries. Note that
the issue with supporting ALTER CHANGEFEED with CDC queries remains
unresolved.
Fixes: #114196
Release note (bug fix): CDC custom key column now functions with CDC queries
correctly. For example, CREATE CHANGEFEED WITH key_column=..., unordered AS SELECT * FROM table now works correctly instead of retrying forever. Note that
some functionalities with CDC custom keys are not fully supported, see 115267
for more details.

@cockroach-teamcity
Copy link
Member

This change is Reviewable

@wenyihu6 wenyihu6 self-assigned this Nov 28, 2023
@wenyihu6
Copy link
Contributor Author

pkg/ccl/changefeedccl/changefeed_test.go line 3371 at r1 (raw file):

		// b="dog".
		assertPayloads(t, foo, []string{
			`foo: ["dog1"]->{"a": 1, "b": "dog1", "c": "cat1"}`,

Could you help me understand why changefeed sometimes emit "after" -> .... and sometimes does not? For example,

assertPayloads(t, foo, []string{`foo: [0]->{"a": 0, "b": "dog"}`})
doesn't emit after but https://github.com/cockroachdb/cockroach/blob/ba6423f916dc4006bb590c786cf53ad2b0213417/pkg/ccl/changefeedccl/changefeed_test.go#L1522C1-L1522C79 does emit after. Does it have to do with how CDC queries are used?

@wenyihu6 wenyihu6 marked this pull request as ready for review November 28, 2023 15:20
@wenyihu6 wenyihu6 requested a review from a team as a code owner November 28, 2023 15:20
@wenyihu6 wenyihu6 requested review from miretskiy and removed request for a team and miretskiy November 28, 2023 15:20
Copy link
Contributor

@miretskiy miretskiy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @jayshrivastava and @wenyihu6)


-- commits line 16 at r1:
Not exactly the right wording -- changefeed is not hanging -- it's retrying forever.

And I actually think it shouldn't -- we should ensure that the error returned by DatumNamed
is treated as a fatal error. You should make this change, and you can do it by changing

if !ok {  
  return nil, errors.Errorf("No column with name %s in this row", n)  
}

to

 return nil, changefeedbase.WithTerminalError(errors.Errorf("No column with name %s in this row", n) )

or... perhaps changing errors.Errorf to errors.AssertionFailedf (which is treated as terminal error).


pkg/ccl/changefeedccl/cdcevent/projection.go line 57 at r1 (raw file):

	p.datums = append(p.datums, rowenc.EncDatum{})
	p.allCols = append(p.allCols, ord)
	p.colsByName[name] = ord

Okay -- I don't think this will work correct. You'll probably discover this as you add more tests cases suggested
below. Basically, colsByName is a map, and maps need to be initialized via make(...). You also probably need to copy column names when you construct projection (like we do with primary keys)_...


pkg/ccl/changefeedccl/changefeed_test.go line 3365 at r1 (raw file):

		sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'dog1', 'cat1')`)
		foo := feed(t, f,
			`CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT * FROM foo WHERE b='dog1'`)

This test needs to be expanded. You can either use table driven tests where you pass in various configuraiton options to this test... Or... You can create bunch of subtests withing.
You can take this line + everything up to assert below, and keep copy pasting different test cases.
For example:

   t.Run("star", func(t *testing.T) {
        foo := feed(t, f,
			`CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT * FROM'`)
         ....
   })
   t.Run("start_with_where", func(t *testing.T) {
       foo := feed(t, f,
			`CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT * FROM foo WHERE b='dog1'`)
   })

What I would like is to see tests that

  • Select explicit columns ("SELECT a, c...")
  • Try to use computed column as a key_column; for example CREATE CHANGEFEED WITH key_column='double_b' SELECT a, concat(b, b) AS double_b, c FROM ... (quite honestly, I'm not sure if it works -- and if not, we probably need to make it work).
  • Figure out if you can use special, hidden system columns as a key_column -- for example crdb_internal_mvcc_timestamp column. (You might have to explicitly select this column for it to have a chance of working).
  • Figure out if you can use the previous state of the row as a key_column: CREATE CHANGEFEED WITH key_column='old_dog' SELECT a, c, cdc_prev.b AS old_dog FROM ...
  • Take a look at various existing tests cdceval package for inspiration.

A lot of the above suggestions (like the use of computed columns) may not work. If that's the case, you need
to file an issue for that and it's okay to focus this PR on the simple case. And... You can still keep the above tests -- even if they don't work -- because you can add t.SkipWithIssue(....) call at the start of the test to indicate that we are skipping something that's known not to work.

(note to self: we really need to get @andyyang890 help to perhaps turn these types of tests into datadriven/ logictests).


pkg/ccl/changefeedccl/changefeed_test.go line 3371 at r1 (raw file):

Previously, wenyihu6 (Wenyi Hu) wrote…

Could you help me understand why changefeed sometimes emit "after" -> .... and sometimes does not? For example,

assertPayloads(t, foo, []string{`foo: [0]->{"a": 0, "b": "dog"}`})
doesn't emit after but https://github.com/cockroachdb/cockroach/blob/ba6423f916dc4006bb590c786cf53ad2b0213417/pkg/ccl/changefeedccl/changefeed_test.go#L1522C1-L1522C79 does emit after. Does it have to do with how CDC queries are used?

Yeah; this is the envelope funcitonality in changefeed.
https://www.cockroachlabs.com/docs/v23.1/changefeed-messages
The envelope "wrapps" the message with some structure. The default for regular changefeed is
to use "wrapped" envelope; while the default for CDC query is to use "bare" envelope (basically,
we emit whatever you SELECT). However, you can use wrapped envelope if you want with cdc query
by adding "WITH envelope='wrapped'" option to your changefeed statement.

@wenyihu6 wenyihu6 force-pushed the keycolumn branch 4 times, most recently from 09cbb30 to 71c25d5 Compare November 29, 2023 17:16
@wenyihu6
Copy link
Contributor Author

-- commits line 16 at r1:

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

Not exactly the right wording -- changefeed is not hanging -- it's retrying forever.

And I actually think it shouldn't -- we should ensure that the error returned by DatumNamed
is treated as a fatal error. You should make this change, and you can do it by changing

if !ok {  
  return nil, errors.Errorf("No column with name %s in this row", n)  
}

to

 return nil, changefeedbase.WithTerminalError(errors.Errorf("No column with name %s in this row", n) )

or... perhaps changing errors.Errorf to errors.AssertionFailedf (which is treated as terminal error).

Done.

@wenyihu6
Copy link
Contributor Author

pkg/ccl/changefeedccl/changefeed_test.go line 3365 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

This test needs to be expanded. You can either use table driven tests where you pass in various configuraiton options to this test... Or... You can create bunch of subtests withing.
You can take this line + everything up to assert below, and keep copy pasting different test cases.
For example:

   t.Run("star", func(t *testing.T) {
        foo := feed(t, f,
			`CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT * FROM'`)
         ....
   })
   t.Run("start_with_where", func(t *testing.T) {
       foo := feed(t, f,
			`CREATE CHANGEFEED WITH key_column='b', unordered AS SELECT * FROM foo WHERE b='dog1'`)
   })

What I would like is to see tests that

  • Select explicit columns ("SELECT a, c...")
  • Try to use computed column as a key_column; for example CREATE CHANGEFEED WITH key_column='double_b' SELECT a, concat(b, b) AS double_b, c FROM ... (quite honestly, I'm not sure if it works -- and if not, we probably need to make it work).
  • Figure out if you can use special, hidden system columns as a key_column -- for example crdb_internal_mvcc_timestamp column. (You might have to explicitly select this column for it to have a chance of working).
  • Figure out if you can use the previous state of the row as a key_column: CREATE CHANGEFEED WITH key_column='old_dog' SELECT a, c, cdc_prev.b AS old_dog FROM ...
  • Take a look at various existing tests cdceval package for inspiration.

A lot of the above suggestions (like the use of computed columns) may not work. If that's the case, you need
to file an issue for that and it's okay to focus this PR on the simple case. And... You can still keep the above tests -- even if they don't work -- because you can add t.SkipWithIssue(....) call at the start of the test to indicate that we are skipping something that's known not to work.

(note to self: we really need to get @andyyang890 help to perhaps turn these types of tests into datadriven/ logictests).

Added more tests and tracked some usage that doesn't work here. Confirmed that SELECT 'crdb_internal_mvcc_timestamp` works but can't add as a unit test easily since the value is always changing.

// Different mvcc time every time expected  
\`select\_mvcc\_timestamp\`: {  
  createTableStmt:      \`CREATE TABLE foo (a INT PRIMARY KEY, b STRING)\`,  
  createChangeFeedStmt: \`CREATE CHANGEFEED WITH key\_column='crdb\_internal\_mvcc\_timestamp', unordered AS SELECT crdb\_internal\_mvcc\_timestamp FROM foo WHERE crdb\_internal\_mvcc\_timestamp > 0\`,  
  stmts:                \[\]string{\`INSERT INTO foo VALUES (0, 'dog')\`, \`INSERT INTO foo VALUES (1, 'dog1')\`},  
  payloadsAfterStmts:   \[\]string{\`foo: \[1701232020914949000.0000000000\]->{"crdb\_internal\_mvcc\_timestamp": 1701232020914949000.0000000000}\`},  
},

@wenyihu6
Copy link
Contributor Author

pkg/ccl/changefeedccl/changefeed_test.go line 3371 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

Yeah; this is the envelope funcitonality in changefeed.
https://www.cockroachlabs.com/docs/v23.1/changefeed-messages
The envelope "wrapps" the message with some structure. The default for regular changefeed is
to use "wrapped" envelope; while the default for CDC query is to use "bare" envelope (basically,
we emit whatever you SELECT). However, you can use wrapped envelope if you want with cdc query
by adding "WITH envelope='wrapped'" option to your changefeed statement.

I see. Thanks for the explanation!

Copy link
Contributor

@miretskiy miretskiy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 1 of 1 files at r3, 1 of 1 files at r5.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @jayshrivastava and @wenyihu6)


pkg/ccl/changefeedccl/changefeed_test.go line 3457 at r7 (raw file):

		t.Run(name, func(t *testing.T) {
			if test.skipWithIssue != "" {
				t.Skip(test.skipWithIssue)

I think we should be using skip.WithIssue(....) call instead of direct t.Skip.
Or... Maybe, just "print" a message to say that the test is being ignored
(if you want to avoid using skip.WithIssue because it's not necessarily a test
issue, but merely a gap in the functionality).

@wenyihu6
Copy link
Contributor Author

wenyihu6 commented Nov 29, 2023

pkg/ccl/changefeedccl/changefeed_test.go line 3457 at r7 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I think we should be using skip.WithIssue(....) call instead of direct t.Skip.
Or... Maybe, just "print" a message to say that the test is being ignored
(if you want to avoid using skip.WithIssue because it's not necessarily a test
issue, but merely a gap in the functionality).

Do you want me to create a separate issue here or just leave the original issue unclosed? Update: created a separate issue for this #115267

@wenyihu6 wenyihu6 force-pushed the keycolumn branch 2 times, most recently from 6347a6f to a77e1fb Compare November 29, 2023 18:15
Previously, CDC custom key column caused incorrect behaviour with CDC queries,
leading to a `retryable error: No column with name <name> in this row`. This was
because we forgot to add columns to `colsByName` map when generating projection
rows for CDC queries. This caused
[`DatumNamed`](https://github.com/cockroachdb/cockroach/blob/8d74d10e77f020dd0f5c98003f7976b63531de18/pkg/ccl/changefeedccl/cdcevent/event.go#L116-L122)
to return a column not found error, This patch addresses the issue by populating
the `colsByName` map during the addition of columns for CDC queries. Note that
the issue with supporting `ALTER CHANGEFEED` with CDC queries remains
unresolved.
Fixes: cockroachdb#114196
Release note (bug fix): CDC custom key column now functions with CDC queries
correctly. For example, `CREATE CHANGEFEED WITH key_column=..., unordered AS
SELECT * FROM table` now works correctly instead of retrying forever. Note that
some functionalities with CDC custom keys are not fully supported, see 115267
for more details.
@wenyihu6
Copy link
Contributor Author

This last push fixed a linter issue to use skip.WithIssue instead of t.Skip().

@wenyihu6
Copy link
Contributor Author

TFTR!

bors r=miretskiy

@craig
Copy link
Contributor

craig bot commented Nov 29, 2023

Build succeeded:

@craig craig bot merged commit d5a990b into cockroachdb:master Nov 29, 2023
8 checks passed
@miretskiy miretskiy added the backport-23.1.x Flags PRs that need to be backported to 23.1 label Dec 21, 2023
@miretskiy miretskiy added the backport-23.2.x Flags PRs that need to be backported to 23.2. label Dec 21, 2023
@wenyihu6
Copy link
Contributor Author

blathers backport 23.1 23.2

@celiala
Copy link
Collaborator

celiala commented Jan 9, 2024

blathers backport 23.1.14-rc

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
backport-23.1.x Flags PRs that need to be backported to 23.1 backport-23.2.x Flags PRs that need to be backported to 23.2.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

cdc: key_column option unsupported with CDC query
4 participants