-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
cdc-sink: Overhaul #73
Conversation
cf3d314
to
14854be
Compare
6c0245c
to
603fb84
Compare
Codecov Report
@@ Coverage Diff @@
## master #73 +/- ##
=========================================
Coverage ? 77.14%
=========================================
Files ? 25
Lines ? 1260
Branches ? 0
=========================================
Hits ? 972
Misses ? 211
Partials ? 77 Continue to review full report at Codecov.
|
I have pushed some revisions based on the desk review yesterday.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 18 of 41 files at r1, 18 of 22 files at r3, 1 of 1 files at r5, 5 of 5 files at r6, 2 of 3 files at r7, 1 of 1 files at r8, 24 of 24 files at r9, all commit messages.
Reviewable status: all files reviewed, 2 unresolved discussions (waiting on @bobvawter and @BramGruneir)
internal/sinktypes/sinktypes.go, line 15 at r8 (raw file):
// the types into this package is to make it easy to compose // functionality as the cdc-sink project evolves. package sinktypes
Should we call this package simply "types" (or model/interfaces)?
internal/sinktypes/sinktypes.go, line 79 at r8 (raw file):
// A TimeSwapper maintains a durable map of string keys to timestamps. type TimeSwapper interface {
Perhaps we can call it TimeKeeper? And the method simply Update?
internal/util/ident/ident.go, line 12 at r8 (raw file):
// Package ident contains types for safely representing SQL identifiers. package ident
Like this approach. Makes the (rest of the) code much more robust and easier to read.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 35 of 52 files reviewed, 2 unresolved discussions (waiting on @BramGruneir and @sravotto)
internal/sinktypes/sinktypes.go, line 15 at r8 (raw file):
Previously, sravotto (silvano) wrote…
Should we call this package simply "types" (or model/interfaces)?
Renamed it to "types".
internal/sinktypes/sinktypes.go, line 79 at r8 (raw file):
Previously, sravotto (silvano) wrote…
Perhaps we can call it TimeKeeper? And the method simply Update?
Switch it to TimeKeeper.Put()
since it's map-like and one puts into a map.
internal/frontend/cdc/resolved.go, line 128 at r10 (raw file):
Do we throw an error if we get one that doesn't match? A table we were not expecting? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 15 of 41 files at r1, 5 of 22 files at r3, 1 of 5 files at r6, 1 of 3 files at r7, 1 of 1 files at r8, 12 of 24 files at r9, 17 of 17 files at r10, all commit messages.
Reviewable status: all files reviewed, 36 unresolved discussions (waiting on @bobvawter and @sravotto)
internal/backend/apply/apply.go, line 162 at r10 (raw file):
defer res.Close() for i, j := 0, batch.Len(); i < j; i++ {
Is the assignment of j needed here?
internal/backend/apply/apply.go, line 194 at r10 (raw file):
rawColName := col.Name.Raw() decoded, ok := temp[rawColName] delete(temp, rawColName)
These seems slightly out of order. I think they'll work, (delete something that doesn't exist,) but the logic doesn't flow cleanly.
internal/backend/apply/apply.go, line 252 at r10 (raw file):
_, _ = fmt.Fprintf(&delete, "DELETE FROM %s WHERE (", a.target) _, _ = fmt.Fprintf(&upsert, "UPSERT INTO %s (", a.target) for i := range colData {
why not
for i, col :=
cleans up a lot of the colData[i] you're using all over all the place
internal/backend/apply/apply.go, line 270 at r10 (raw file):
delete.WriteString(") IN (SELECT ") upsert.WriteString(") SELECT ") for i := range colData {
Same here
internal/backend/apply/factory.go, line 39 at r10 (raw file):
// Get returns a memoized instance of the Applier for the table. func (f *factory) Get(
I'm a little confused by the api here.
Get and New are the only exposed functions?
Get --> getUnlocked
New --> returns a factory
But no Create?
? --> createUnlocked
internal/backend/mutation/factory.go, line 43 at r10 (raw file):
} func (f *factory) createUnlocked(ctx context.Context, table ident.Table) (*store, error) {
Are createUnlocked and getUnlocked called from anywhere else? Do you need them to be their own functions? I'm guessing for unit tests?
internal/backend/mutation/store.go, line 50 at r10 (raw file):
// newStore constructs a new mutation store that will track pending // mutations to be applied to the given target table. func newStore(
Shouldn't this be New?
internal/backend/mutation/store.go, line 133 at r10 (raw file):
batch := &pgx.Batch{} for i := range mutations {
Why not use for i, mut := range mutations instead of indexing in every time?
internal/backend/mutation/store_test.go, line 79 at r10 (raw file):
// Dequeue. ret, err := s.Drain(ctx, dbInfo.Pool(),
style nit, I thought our usual style was to put only the ctx on the first line then the rest on the 2nd. Or to put everything on the 2nd line with the close bracket on the 3rd
Hmm... can we get the style guide from cockroach?
internal/backend/schemawatch/coldata.go, line 37 at r10 (raw file):
// Retrieve the primary key columns in their index-order, then append // any remaining non-generated columns. const sqlColumnsQuery = `
Wow... is the most efficient way? It makes sense but I'm just surprised you need this complex a query.
internal/backend/schemawatch/coldata_test.go, line 90 at r10 (raw file):
// Virtual columns not supported before v21.1 if !strings.Contains(dbInfo.Version(), "v20.2.") {
If you have a few more tests that need version numbers, it makes sense to create a utility to determine if the version is newer, older or equal. Can you just add a comment about how the older version supported is v20.2? Perhaps that info should go in the readme as well.
internal/backend/schemawatch/watcher.go, line 48 at r10 (raw file):
mu struct { sync.RWMutex cond sync.Cond // Conditional on the RLocker
Should cond be under mu if a lock isn't required for its use?
I see it's a weird mixed thing... Just needs a read lock.
So I find this a little confusing. Simplifying the locking here might be worthwhile.
internal/backend/schemawatch/watcher.go, line 112 at r10 (raw file):
w.mu.data = data w.mu.Unlock() w.mu.cond.Broadcast()
Why take this out of the lock?
internal/backend/schemawatch/watcher.go, line 153 at r10 (raw file):
// Respond to context cancellation or dropping the table. if !ok || ctx.Err() != nil { return
Shouldn't this do something with the error?
internal/backend/schemawatch/watcher_test.go, line 41 at r10 (raw file):
// Bootstrap column. tblInfo, err := sinktest.CreateTable(ctx, dbName, "CREATE TABLE %s (pk INT PRIMARY KEY)")
Do you delete this table on test cleanup?
internal/backend/schemawatch/watcher_test.go, line 59 at r10 (raw file):
select { case <-time.After(10 * time.Second):
Consider setting these 10s much smaller, as this is unit test.
internal/backend/schemawatch/watchers.go, line 35 at r10 (raw file):
// NewWatchers creates a Watchers factory. func NewWatchers(pool *pgxpool.Pool) (_ *Watchers, cancel func()) {
So in some factories you call this New, in other the name of the struct you're creating and in this case, NewStruct. Can you do a quick audit and make them all match in style?
internal/backend/sinktest/info.go, line 23 at r10 (raw file):
// DBInfo encapsulates metadata and a connection to a database. type DBInfo struct {
perhaps name this file dbinfo to match?
internal/backend/sinktest/info.go, line 45 at r10 (raw file):
} // DeleteAll deletes (not TRUNCATEs) all rows in the table.
Why no Create here? Just Drop?
internal/backend/sinktest/sinktest.go, line 102 at r10 (raw file):
cancel = func() { err := retry.Execute(ctx, db, fmt.Sprintf("DROP DATABASE IF EXISTS %s CASCADE", name))
Ah, dropping happens here. Good, but perhaps this whole function should be part of dbinfo?
I'm having a bit an of an issue understanding the line between this file and info.go
internal/backend/timestamp/store.go, line 30 at r10 (raw file):
// store implements a simple key/value store for HLC timestamps. type store struct {
Store is very generic. Should we call it timekeeper instead? The file and the type?
internal/backend/timestamp/store_test.go, line 43 at r10 (raw file):
prev := hlc.Zero() for i := 0; i <= count; i++ { next := hlc.New(int64(1000*i), i)
Please add comment on exactly what this test is testing.
internal/frontend/cdc/handler_test.go, line 34 at r10 (raw file):
} func testHandler(t *testing.T, immediate bool) {
Please add some comments within the tests.
internal/frontend/cdc/webhook.go, line 16 at r10 (raw file):
) var (
So should we be using webhook or https? It it worth it to support both?
internal/frontend/cdc/webhook.go, line 70 at r10 (raw file):
} // First, we'll aggregate the mutations by target table. We know
You have a first, but not a second or third.
internal/frontend/server/integration_test.go, line 49 at r10 (raw file):
// Prefer the webhook format for current versions of CRDB. useWebhook := true if strings.Contains(dbInfo.Version(), "v20.2.") || strings.Contains(dbInfo.Version(), "v21.1.") {
Again, a nice version function for comparing would be better.
internal/frontend/server/integration_test.go, line 56 at r10 (raw file):
defer func() { *GenerateSelfSigned = false }() srv, err := newServer(ctx, "127.0.0.1:0", dbInfo.Pool().Config().ConnString(), false)
Do we need to worry if that's already in use?
internal/frontend/server/integration_test.go, line 84 at r10 (raw file):
if useWebhook { feedURL = url.URL{ Scheme: "webhook-https",
So webhook is beta, http is experimental. What is the difference? (This is not a question for you.)
internal/frontend/server/server.go, line 45 at r10 (raw file):
ConnectionString = flag.String( "conn", "postgresql://root@localhost:26257/?sslmode=disable",
We're going to need to get this working with certs and a non-root user soon.
internal/frontend/server/server.go, line 49 at r10 (raw file):
) IgnoreResolved = flag.Bool("ignoreResolved", false,
This shouldn't be a flag on all of cdc-sink, but instead on a per feed basis.
internal/frontend/server/server.go, line 136 at r10 (raw file):
} go func() {
Go really needs a better closer. Or at least some syntactic sugar over this.
internal/frontend/server/tls_config.go, line 42 at r10 (raw file):
// Loosely based on https://golang.org/src/crypto/tls/generate_cert.go if *GenerateSelfSigned { priv, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
I'm just going to trust this works.
internal/util/batches/batches.go, line 29 at r10 (raw file):
// of values in a batch-oriented fashion. The indexes provided to // the callback function are a half-open range [begin , end). func Batch(count int, fn func(begin, end int) error) error {
This is really nice.
internal/util/batches/batches.go, line 29 at r10 (raw file):
// of values in a batch-oriented fashion. The indexes provided to // the callback function are a half-open range [begin , end). func Batch(count int, fn func(begin, end int) error) error {
Please add tests for batch.
internal/util/ident/ident.go, line 79 at r10 (raw file):
// Empty returns true if the identifier is empty. func (n Ident) Empty() bool {
Shouldn't this be IsEmpty()
internal/util/ident/ident.go, line 85 at r10 (raw file):
// Raw returns the original, raw value. func (n Ident) Raw() string { return strings.ReplaceAll(n.q[1:len(n.q)-1], `""`, `"`)
Why not just store the raw? Because this is clearly a lie :)
internal/util/ident/ident_test.go, line 49 at r10 (raw file):
expectError bool qual Qualification }{
Please add some more test cases that would produce errors. Empty schema, empty db, empty table, etc.
internal/util/retry/retry.go, line 11 at r10 (raw file):
// licenses/APL.txt. // Package retry contains utility code for retrying database transactions.
Is there any way to not re-implement this ourselves?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a lot of code, I went through it all.
Not many changes needed I think.
I still don't like the frontend/backend names.
How about sources/target?
Reviewable status: all files reviewed, 36 unresolved discussions (waiting on @bobvawter and @sravotto)
This change overhauls the cdc-sink code to split it into well-defined packages and APIs. Notable functional changes: A cdc-sink endpoint now operates on an entire database schema (i.e. the second-level namespace) at once, since this is the use-case that has been most prominent in discussions around our preferred microservice architecture. For users that only ever use the "public" schema in their databases, this will handle whole-database use cases. An "immediate" mode is supported, which applies incoming data without waiting for resolved timestamps. This is intended for use when backfilling large datasets or if a high-volume changefeed must catch up after an outage. It is not expected to be the default configuration for cdc-sink. The cdc-sink code can now detect and optionally recover from a limited amount of structural schema drift between the source and target databases. The schema for each target database is held in memory and refreshed from time to time. Drift is checked during resolved-timestamp flushes, which will effectively pause a changefeed until the stored payloads are at least structurally-compatible with the target tables. The new webhook-https:// scheme in CockroachDB v21.2 is now supported. This necessitates that cdc-sink can use a TLS-enabled HTTP server. The option now exists to load a certificate and private key from disk. For testing purposes, a self-signed certifcate can be internally generated by cdc-sink. Notes to reviewers: The internal/types package defines interfaces for the major moving parts of the revised cdc-sink code base. At present, each interface is implemented by a single type, but having small APIs has been useful in identifying the independent parts of cdc-sink. Similarly, the package structure may be overly fine-grained in favor of identifying specific, small portions of code that compose easily. A comment at the top of most files indicates where code was repackaged from. Recommended package review order: types, source/cdc, target/stage, target/apply, target/timestamp
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TFTR. PTAL. TANSTAAFL.
Renamed the mid-level packages to to sources
and target
.
Reviewable status: 20 of 55 files reviewed, 36 unresolved discussions (waiting on @BramGruneir and @sravotto)
internal/util/batches/batches.go, line 29 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
This is really nice.
Thanks.
internal/util/batches/batches.go, line 29 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Please add tests for batch.
Done.
internal/util/ident/ident.go, line 79 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Shouldn't this be IsEmpty()
Done.
internal/util/ident/ident.go, line 85 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Why not just store the raw? Because this is clearly a lie :)
Changed to store both the quoted and the raw, since they're used in pretty much equal proportions throughout the code.
internal/util/ident/ident_test.go, line 49 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Please add some more test cases that would produce errors. Empty schema, empty db, empty table, etc.
Done.
internal/backend/sinktest/info.go, line 23 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
perhaps name this file dbinfo to match?
I split out db_info.go
and table_info.go
. Moved the relevant creation functions into those two files, as well.
internal/backend/sinktest/info.go, line 45 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Why no Create here? Just Drop?
See above comment.
internal/backend/apply/apply.go, line 162 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Is the assignment of j needed here?
Changed this to use a countdown loop instead. The pgx.Batch
type is opaque.
internal/backend/apply/apply.go, line 194 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
These seems slightly out of order. I think they'll work, (delete something that doesn't exist,) but the logic doesn't flow cleanly.
I reworked this to not use delete()
, but to instead accumulate a set of known column names that are present in the incoming column data. The variable names were expanded to make them easier to think through.
internal/backend/apply/apply.go, line 252 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
why not
for i, col :=cleans up a lot of the colData[i] you're using all over all the place
Done.
internal/backend/apply/apply.go, line 270 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Same here
Done.
internal/backend/apply/factory.go, line 39 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
I'm a little confused by the api here.
Get and New are the only exposed functions?
Get --> getUnlocked
New --> returns a factoryBut no Create?
? --> createUnlocked
Get()
calls createUnlocked()
. I've renamed createUnlocked()
to getOrCreateUnlocked()
to be more explicit.
internal/backend/mutation/factory.go, line 43 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Are createUnlocked and getUnlocked called from anywhere else? Do you need them to be their own functions? I'm guessing for unit tests?
They exist as two different functions because getUnlocked()
is read-locked and createUnlocked()
is write-locked. The create method has been renamed and comments added to explain the differences in locking.
internal/backend/mutation/store.go, line 50 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Shouldn't this be New?
newStore()
isn't public because it is called only by the factory, and new
is a keyword.
internal/backend/mutation/store.go, line 133 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Why not use for i, mut := range mutations instead of indexing in every time?
Done.
internal/backend/mutation/store_test.go, line 79 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
style nit, I thought our usual style was to put only the ctx on the first line then the rest on the 2nd. Or to put everything on the 2nd line with the close bracket on the 3rd
Hmm... can we get the style guide from cockroach?
Fixed the chop-down.
internal/backend/schemawatch/coldata.go, line 37 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Wow... is the most efficient way? It makes sense but I'm just surprised you need this complex a query.
I couldn't find a simpler way to set the query up to concatenate two result sets, while preserving order in the result set.
internal/backend/schemawatch/coldata_test.go, line 90 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
If you have a few more tests that need version numbers, it makes sense to create a utility to determine if the version is newer, older or equal. Can you just add a comment about how the older version supported is v20.2? Perhaps that info should go in the readme as well.
Added a comment. Will add a version enum or something if we wind up having more version checks in the future.
internal/backend/schemawatch/watcher.go, line 48 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Should cond be under mu if a lock isn't required for its use?
I see it's a weird mixed thing... Just needs a read lock.So I find this a little confusing. Simplifying the locking here might be worthwhile.
I moved cond
outside of the mu
struct since it is never mutated. The condition, though, is still on mu
's read-lock.
internal/backend/schemawatch/watcher.go, line 112 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Why take this out of the lock?
Moved cond
outside of mu
, so the timing now makes more sense. sync.Cond
does not require callers to hold the lock when signaling.
internal/backend/schemawatch/watcher.go, line 153 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Shouldn't this do something with the error?
There's nothing to do with the context's error.
internal/backend/schemawatch/watcher_test.go, line 41 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Do you delete this table on test cleanup?
The database is dropped.
internal/backend/schemawatch/watcher_test.go, line 59 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Consider setting these 10s much smaller, as this is unit test.
Done.
internal/backend/schemawatch/watchers.go, line 35 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
So in some factories you call this New, in other the name of the struct you're creating and in this case, NewStruct. Can you do a quick audit and make them all match in style?
The methods are now all named NewFactory()
internal/backend/sinktest/sinktest.go, line 102 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Ah, dropping happens here. Good, but perhaps this whole function should be part of dbinfo?
I'm having a bit an of an issue understanding the line between this file and info.go
Moved to db_info.go
.
internal/backend/timestamp/store.go, line 30 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Store is very generic. Should we call it timekeeper instead? The file and the type?
Renamed all to timekeeper
.
internal/backend/timestamp/store_test.go, line 43 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Please add comment on exactly what this test is testing.
Added comment to the test method.
internal/frontend/cdc/handler_test.go, line 34 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Please add some comments within the tests.
Done.
internal/frontend/cdc/resolved.go, line 128 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Do we throw an error if we get one that doesn't match? A table we were not expecting?
Revised the comment here that it's a filtering operation.
internal/frontend/cdc/webhook.go, line 16 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
So should we be using webhook or https? It it worth it to support both?
Supporting both really isn't that big of a deal, the difference is in the payload-decoder. The bulk of the machinery is identical. I also think we should strive to have compatibility with any CRDB release that's within the company's support window.
internal/frontend/cdc/webhook.go, line 70 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
You have a first, but not a second or third.
Turns out that was all that was necessary. Fixed the comment.
internal/frontend/server/integration_test.go, line 49 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Again, a nice version function for comparing would be better.
Agreed, but will save that for v22.1, if it becomes necessary.
internal/frontend/server/integration_test.go, line 56 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Do we need to worry if that's already in use?
We don't, since we're binding to an ephemeral port number and then reading the bound address back when setting up the changefeed.
internal/frontend/server/server.go, line 45 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
We're going to need to get this working with certs and a non-root user soon.
Sure, but that shouldn't affect any of the cdc-sink code.
internal/frontend/server/server.go, line 49 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
This shouldn't be a flag on all of cdc-sink, but instead on a per feed basis.
Per Slack discussion, immediate-mode has moved to an ?immediate=true
query parameter passed by the changefeed. This will ensure that the configuration is globally synchronized. The integration test has been tweaked to have immediate and deferred sub-tests.
internal/frontend/server/tls_config.go, line 42 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
I'm just going to trust this works.
The resulting certificate is used in the webhook-https
test.
internal/util/retry/retry.go, line 11 at r10 (raw file):
Previously, BramGruneir (Bram Gruneir) wrote…
Is there any way to not re-implement this ourselves?
This retry logic implements reentrancy, to delegate any retrying to the outermost call to Loop()
. This makes it much easier to write tests, since you can arbitrarily compose code that needs retry behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 20 of 55 files reviewed, 36 unresolved discussions (waiting on @BramGruneir and @sravotto)
internal/frontend/cdc/resolved.go, line 128 at r10 (raw file):
Previously, bobvawter (Bob Vawter) wrote…
Revised the comment here that it's a filtering operation.
Meant to say that I moved the filtering operation into the Snapshot()
method, which now takes the user-defined schema as an argumunt.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had to look up TANSTAAFL.
Reviewed 35 of 35 files at r11, all commit messages.
Reviewable status: all files reviewed, 4 unresolved discussions (waiting on @BramGruneir and @sravotto)
internal/frontend/cdc/webhook.go, line 16 at r10 (raw file):
Previously, bobvawter (Bob Vawter) wrote…
Supporting both really isn't that big of a deal, the difference is in the payload-decoder. The bulk of the machinery is identical. I also think we should strive to have compatibility with any CRDB release that's within the company's support window.
SGTM
This change overhauls the cdc-sink code to split it into well-defined packages
and APIs.
Notable functional changes:
A cdc-sink endpoint now operates on an entire database at once, since this is
the use-case that has been most prominent in discussions.
An "immediate" mode is supported, which applies incoming data without waiting
for resolved timestamps. This is intended for use when backfilling large
datasets or if a high-volume changefeed must catch up after an outage. It is
not expected to be the default configuration for cdc-sink.
The cdc-sink code can now detect and recover from a limited amount of
structural schema drift between the source and target databases. The schema for
each target database is held in memory and refreshed from time to time. Drift
is checked during resolved-timestamp flushes, which will effectively pause a
changefeed until the stored payloads are at least structurally-compatible with
the target tables.
Notes to reviewers:
The sinktypes package defines interfaces for the major moving parts of the
revised cdc-sink code base. At present, each interface is implemented by a
single type, but having small APIs has been useful in identifying the
independent parts of cdc-sink.
Similarly, the package structure may be overly fine-grained in favor of
identifying specific, small portions of code that compose easily.
A comment at the top of most files indicates where code was repackaged from.
Recommended package review order:
sinktypes, cdc/handler, backend/mutation, backend/apply, backend/timestamp
This change is![Reviewable](https://camo.githubusercontent.com/23b05f5fb48215c989e92cc44cf6512512d083132bd3daf689867c8d9d386888/68747470733a2f2f72657669657761626c652e696f2f7265766965775f627574746f6e2e737667)