Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: scope backfills to only the tables affected by a schema change #55135

Merged
merged 1 commit into from
Oct 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ func makeKVFeedCfg(
Sink: buf,
Settings: cfg.Settings,
DB: cfg.DB,
Codec: cfg.Codec,
Clock: cfg.DB.Clock(),
Gossip: cfg.Gossip,
Spans: spans,
Expand Down
52 changes: 52 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,58 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
}
}

// Test schema changes that require a backfill on only some watched tables within a changefeed.
func TestChangefeedSchemaChangeBackfillScope(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)

t.Run(`add column with default`, func(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE add_column_def (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `CREATE TABLE no_def_change (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO add_column_def VALUES (1)`)
sqlDB.Exec(t, `INSERT INTO add_column_def VALUES (2)`)
sqlDB.Exec(t, `INSERT INTO no_def_change VALUES (3)`)
combinedFeed := feed(t, f, `CREATE CHANGEFEED FOR add_column_def, no_def_change WITH updated`)
defer closeFeed(t, combinedFeed)
assertPayloadsStripTs(t, combinedFeed, []string{
`add_column_def: [1]->{"after": {"a": 1}}`,
`add_column_def: [2]->{"after": {"a": 2}}`,
`no_def_change: [3]->{"after": {"a": 3}}`,
})
sqlDB.Exec(t, `ALTER TABLE add_column_def ADD COLUMN b STRING DEFAULT 'd'`)
ts := fetchDescVersionModificationTime(t, db, f, `add_column_def`, 4)
// Schema change backfill
assertPayloadsStripTs(t, combinedFeed, []string{
`add_column_def: [1]->{"after": {"a": 1}}`,
`add_column_def: [2]->{"after": {"a": 2}}`,
})
// Changefeed level backfill
assertPayloads(t, combinedFeed, []string{
fmt.Sprintf(`add_column_def: [1]->{"after": {"a": 1, "b": "d"}, "updated": "%s"}`,
ts.AsOfSystemTime()),
fmt.Sprintf(`add_column_def: [2]->{"after": {"a": 2, "b": "d"}, "updated": "%s"}`,
ts.AsOfSystemTime()),
})
})

}

t.Run(`sinkless`, sinklessTest(testFn))
t.Run(`enterprise`, enterpriseTest(testFn))
log.Flush()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1,
regexp.MustCompile("cdc ux violation"), log.WithFlattenedSensitiveData)
if err != nil {
t.Fatal(err)
}
if len(entries) > 0 {
t.Fatalf("Found violation of CDC's guarantees: %v", entries)
}
}

// fetchDescVersionModificationTime fetches the `ModificationTime` of the specified
// `version` of `tableName`'s table descriptor.
func fetchDescVersionModificationTime(
Expand Down
21 changes: 18 additions & 3 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -37,6 +38,7 @@ import (
type Config struct {
Settings *cluster.Settings
DB *kv.DB
Codec keys.SQLCodec
Clock *hlc.Clock
Gossip gossip.OptionalGossip
Spans []roachpb.Span
Expand Down Expand Up @@ -94,6 +96,7 @@ func Run(ctx context.Context, cfg Config) error {
cfg.SchemaChangeEvents, cfg.SchemaChangePolicy,
cfg.NeedsInitialScan, cfg.WithDiff,
cfg.InitialHighWater,
cfg.Codec,
sf, sc, pff, bf)
g.GoCtx(f.run)
err := g.Wait()
Expand Down Expand Up @@ -131,6 +134,7 @@ type kvFeed struct {
withInitialBackfill bool
initialHighWater hlc.Timestamp
sink EventBufferWriter
codec keys.SQLCodec

schemaChangeEvents changefeedbase.SchemaChangeEventClass
schemaChangePolicy changefeedbase.SchemaChangePolicy
Expand All @@ -149,6 +153,7 @@ func newKVFeed(
schemaChangePolicy changefeedbase.SchemaChangePolicy,
withInitialBackfill, withDiff bool,
initialHighWater hlc.Timestamp,
codec keys.SQLCodec,
tf schemaFeed,
sc kvScanner,
pff physicalFeedFactory,
Expand All @@ -162,6 +167,7 @@ func newKVFeed(
initialHighWater: initialHighWater,
schemaChangeEvents: schemaChangeEvents,
schemaChangePolicy: schemaChangePolicy,
codec: codec,
tableFeed: tf,
scanner: sc,
physicalFeed: pff,
Expand Down Expand Up @@ -212,12 +218,21 @@ func (f *kvFeed) scanIfShould(
// time with an initial backfill but if you use a cursor then you will get the
// updates after that timestamp.
isInitialScan := initialScan && f.withInitialBackfill
var spansToBackfill []roachpb.Span
if isInitialScan {
scanTime = highWater
spansToBackfill = f.spans
} else if len(events) > 0 {
// TODO(ajwerner): In this case we should only backfill for the tables
// which have events which may not be all of the targets.
// Only backfill for the tables which have events which may not be all
// of the targets.
for _, ev := range events {
tablePrefix := f.codec.TablePrefix(uint32(ev.After.ID))
tableSpan := roachpb.Span{Key: tablePrefix, EndKey: tablePrefix.PrefixEnd()}
for _, sp := range f.spans {
if tableSpan.Overlaps(sp) {
spansToBackfill = append(spansToBackfill, sp)
}
}
if !scanTime.Equal(ev.After.ModificationTime) {
log.Fatalf(ctx, "found event in shouldScan which did not occur at the scan time %v: %v",
scanTime, ev)
Expand All @@ -237,7 +252,7 @@ func (f *kvFeed) scanIfShould(
}

if err := f.scanner.Scan(ctx, f.sink, physicalConfig{
Spans: f.spans,
Spans: spansToBackfill,
Timestamp: scanTime,
WithDiff: !isInitialScan && f.withDiff,
}); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func TestKVFeed(t *testing.T) {
tc.schemaChangeEvents, tc.schemaChangePolicy,
tc.needsInitialScan, tc.withDiff,
tc.initialHighWater,
keys.SystemSQLCodec,
&tf, sf, rangefeedFactory(ref.run), bufferFactory)
ctx, cancel := context.WithCancel(context.Background())
g := ctxgroup.WithContext(ctx)
Expand Down