From 488d871a6f83c66d0ec627adbc2aba27fe376613 Mon Sep 17 00:00:00 2001 From: Matthew Lougheed Date: Mon, 18 Aug 2025 12:36:40 -0400 Subject: [PATCH] changefeedccl: block db-level changefeed creation for non-empty tableset On startup, the changefeed should not be started until there exist tables in the database to watch. If no tables exist at the time of startup, then a tableset watcher is created and polled; the changefeed will be started from where there is first a target table. This patch also makes a change to SHOW CHANGEFEED JOBS to show changefeed jobs with no target tables (ie db-level changefeed waiting for a table to be created). Fixes: #147371 Epic: CRDB-1421 Release note: none --- pkg/ccl/changefeedccl/BUILD.bazel | 1 + pkg/ccl/changefeedccl/changefeed_stmt.go | 138 ++++++++++++++++++++- pkg/ccl/changefeedccl/changefeed_test.go | 151 +++++++++++++++++++++++ pkg/sql/delegate/show_changefeed_jobs.go | 2 +- 4 files changed, 285 insertions(+), 7 deletions(-) diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index cbc5ae803844..9d5d1db5c6c0 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -64,6 +64,7 @@ go_library( "//pkg/ccl/changefeedccl/kvfeed", "//pkg/ccl/changefeedccl/resolvedspan", "//pkg/ccl/changefeedccl/schemafeed", + "//pkg/ccl/changefeedccl/tableset", "//pkg/ccl/changefeedccl/timers", "//pkg/ccl/kvccl/kvfollowerreadsccl", "//pkg/ccl/utilccl", diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index c6fd28617d85..eaa2fda04cbf 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedvalidators" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/checkpoint" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/tableset" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/cloud/externalconn" @@ -40,9 +41,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/exprutil" "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice" @@ -59,6 +62,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log/channel" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/cockroach/pkg/util/log/severity" + "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/span" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -1413,6 +1417,24 @@ func requiresTopicInValue(s Sink) bool { return s.getConcreteType() == sinkTypeWebhook } +// shouldStartDatabaseWatcher determines whether we should initialize a database-level +// tableset watcher. This is only applicable when: +// - there is exactly one target specification, and +// - that specification is a DATABASE target, and +// - there are currently no resolved table targets (i.e., zero unique tables). +func shouldStartDatabaseWatcher( + details jobspb.ChangefeedDetails, targets changefeedbase.Targets, +) bool { + if len(details.TargetSpecifications) != 1 { + return false + } + ts := details.TargetSpecifications[0] + if ts.Type != jobspb.ChangefeedTargetSpecification_DATABASE { + return false + } + return targets.NumUniqueTables() == 0 +} + func makeChangefeedDescription( ctx context.Context, changefeed *tree.CreateChangefeed, @@ -1546,6 +1568,7 @@ type changefeedResumer struct { mu struct { syncutil.Mutex + watcherDiffSent bool // perNodeAggregatorStats is a per component running aggregate of trace // driven AggregatorStats emitted by the processors. perNodeAggregatorStats bulk.ComponentAggregatorStats @@ -1769,6 +1792,12 @@ func (b *changefeedResumer) resumeWithRetries( maxBackoff := changefeedbase.MaxRetryBackoff.Get(&execCfg.Settings.SV) backoffReset := changefeedbase.RetryBackoffReset.Get(&execCfg.Settings.SV) + + // Create memory monitor for tableset watcher. Similar to how processors create + // their monitors at function start, we create this unconditionally. + watcherMemMonitor := execinfra.NewMonitor(ctx, execCfg.DistSQLSrv.ChangefeedMonitor, mon.MakeName("changefeed-tableset-watcher-mem")) + defer watcherMemMonitor.Stop(ctx) + for r := getRetry(ctx, maxBackoff, backoffReset); r.Next(); { flowErr := maybeUpgradePreProductionReadyExpression(ctx, jobID, details, jobExec) @@ -1782,7 +1811,7 @@ func (b *changefeedResumer) resumeWithRetries( knobs.BeforeDistChangefeed() } - confPoller := make(chan struct{}) + changefeedDoneCh := make(chan struct{}) g := ctxgroup.WithContext(ctx) initialHighWater, schemaTS, err := computeDistChangefeedTimestamps(ctx, jobExec, details, localState) if err != nil { @@ -1796,11 +1825,88 @@ func (b *changefeedResumer) resumeWithRetries( if err != nil { return err } + + // This watcher is only used for db-level changefeeds with no watched tables. + var watcher *tableset.Watcher + watcherChan := make(chan []tableset.TableDiff, 1) + var cancelWatcher context.CancelFunc + var watcherCtx context.Context + waitForTables := shouldStartDatabaseWatcher(details, targets) + if waitForTables { + // Create a watcher for the database. + dbID := details.TargetSpecifications[0].DescID + filter := tableset.Filter{ + DatabaseID: dbID, + } + if details.TargetSpecifications[0].FilterList != nil && len(details.TargetSpecifications[0].FilterList.Tables) > 0 { + if details.TargetSpecifications[0].FilterList.FilterType == tree.IncludeFilter { + filter.IncludeTables = make(map[string]struct{}) + for fqTableName := range details.TargetSpecifications[0].FilterList.Tables { + // TODO: use fully qualified names once watcher supports it. + // Extract just the table name from the fully qualified name (e.g., "db.public.table" -> "table") + tn, err := parser.ParseQualifiedTableName(fqTableName) + if err != nil { + return errors.Wrapf(err, "failed to parse name in filter list: %s", fqTableName) + } + filter.IncludeTables[tn.Object()] = struct{}{} + } + } else if details.TargetSpecifications[0].FilterList.FilterType == tree.ExcludeFilter { + filter.ExcludeTables = make(map[string]struct{}) + for fqTableName := range details.TargetSpecifications[0].FilterList.Tables { + // TODO: use fully qualified names once watcher supports it. + // Extract just the table name from the fully qualified name (e.g., "db.public.table" -> "table") + tn, err := parser.ParseQualifiedTableName(fqTableName) + if err != nil { + return errors.Wrapf(err, "failed to parse name in filter list: %s", fqTableName) + } + filter.ExcludeTables[tn.Object()] = struct{}{} + } + } + } + + b.mu.Lock() + b.mu.watcherDiffSent = false + b.mu.Unlock() + + watcher = tableset.NewWatcher(filter, execCfg, watcherMemMonitor, int64(jobID)) + g.GoCtx(func(ctx context.Context) error { + // This method runs the watcher until its context is canceled. + // The watcher context is canceled when diffs are sent to the + // watcherChan or when the the parent group context is canceled. + watcherCtx, cancelWatcher = context.WithCancel(ctx) + defer cancelWatcher() + err := watcher.Start(watcherCtx, schemaTS) + b.mu.Lock() + defer b.mu.Unlock() + if err != nil && !b.mu.watcherDiffSent { + return err + } + return nil + }) + } + + // Run the changefeed until it completes or is shut down. + // If the changefeed's target tableset is empty, it will need to block + // until a diff in the tableset is found from the watcher. g.GoCtx(func(ctx context.Context) error { - defer close(confPoller) - return startDistChangefeed(ctx, jobExec, jobID, schemaTS, details, description, - initialHighWater, localState, startedCh, onTracingEvent, targets) + defer close(changefeedDoneCh) + if waitForTables { + select { + case <-ctx.Done(): + return ctx.Err() + case diffs := <-watcherChan: + schemaTS = diffs[0].AsOf + initialHighWater = diffs[0].AsOf + targets, err = AllTargets(ctx, details, execCfg, schemaTS) + if err != nil { + return err + } + } + } + return startDistChangefeed(ctx, jobExec, jobID, schemaTS, details, description, initialHighWater, localState, startedCh, onTracingEvent, targets) }) + + // Poll for updated configuration or new database tables if hibernating. g.GoCtx(func(ctx context.Context) error { t := time.NewTicker(15 * time.Second) defer t.Stop() @@ -1808,9 +1914,9 @@ func (b *changefeedResumer) resumeWithRetries( select { case <-ctx.Done(): return ctx.Err() - case <-confPoller: + case <-changefeedDoneCh: return nil - case <-t.C: + case tick := <-t.C: newDest, err := reloadDest(ctx, jobID, execCfg) if err != nil { log.Changefeed.Warningf(ctx, "failed to check for updated configuration: %v", err) @@ -1818,6 +1924,26 @@ func (b *changefeedResumer) resumeWithRetries( resolvedDest = newDest return replanErr } + b.mu.Lock() + diffSent := b.mu.watcherDiffSent + b.mu.Unlock() + if waitForTables && !diffSent { + unchanged, diffs, err := watcher.PopUnchangedUpTo(ctx, hlc.Timestamp{WallTime: tick.UnixNano()}) + if err != nil { + return err + } + if !unchanged && len(diffs) > 0 { + select { + case watcherChan <- diffs: + b.mu.Lock() + b.mu.watcherDiffSent = true + b.mu.Unlock() + cancelWatcher() + case <-ctx.Done(): + return ctx.Err() + } + } + } } } }) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index dc0c310c78be..ebfda4fa7653 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -13084,3 +13084,154 @@ func TestDatabaseLevelChangefeedWithInitialScanOptions(t *testing.T) { }) } } + +// TestDatabaseLevelChangefeedEmptyTableset tests that a database-level changefeed +// hibernates while there are no tables in the database. +func TestDatabaseLevelChangefeedEmptyTableset(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFnNoWait := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + sqlDB.Exec(t, `CREATE DATABASE db`) + sqlDB.Exec(t, `GRANT CHANGEFEED ON DATABASE db TO enterprisefeeduser`) + dbcf := feed(t, f, `CREATE CHANGEFEED FOR DATABASE db`) + defer closeFeed(t, dbcf) + + // create a table + sqlDB.Exec(t, `CREATE TABLE db.foo (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO db.foo VALUES (0, 'initial')`) + + assertPayloads(t, dbcf, []string{ + `foo: [0]->{"after": {"a": 0, "b": "initial"}}`, + }) + } + cdcTest(t, testFnNoWait, feedTestEnterpriseSinks) + + testFnWait := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + sqlDB.Exec(t, `CREATE DATABASE db`) + sqlDB.Exec(t, `GRANT CHANGEFEED ON DATABASE db TO enterprisefeeduser`) + dbcf := feed(t, f, `CREATE CHANGEFEED FOR DATABASE db`) + defer closeFeed(t, dbcf) + + time.Sleep(5 * time.Second) + + // create a table + sqlDB.Exec(t, `CREATE TABLE db.foo (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO db.foo VALUES (0, 'initial')`) + + assertPayloads(t, dbcf, []string{ + `foo: [0]->{"after": {"a": 0, "b": "initial"}}`, + }) + } + cdcTest(t, testFnWait, feedTestEnterpriseSinks) +} + +// TestDatabaseLevelChangefeedFiltersHibernation tests that a database-level changefeed +// with include/exclude filters correctly handles hibernation: +// - With EXCLUDE filter: creating an excluded table should not wake the changefeed, +// but creating a non-excluded table should wake it. +// - With INCLUDE filter: creating a non-included table should not wake the changefeed, +// but creating an included table should wake it. +func TestDatabaseLevelChangefeedFiltersHibernation(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + full_filter := map[string]string{ + "include": "EXCLUDE TABLES excluded_table", + "exclude": "INCLUDE TABLES included_table", + } + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory, filterType string) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + sqlDB.Exec(t, `CREATE DATABASE db`) + sqlDB.Exec(t, `GRANT CHANGEFEED ON DATABASE db TO enterprisefeeduser`) + + // Create changefeed with exclude filter - should hibernate since no tables exist + createStmt := fmt.Sprintf(`CREATE CHANGEFEED FOR DATABASE db %s`, full_filter[filterType]) + dbcf := feed(t, f, createStmt) + defer closeFeed(t, dbcf) + + var jobID jobspb.JobID + if ef, ok := dbcf.(cdctest.EnterpriseTestFeed); ok { + jobID = ef.JobID() + } else { + t.Fatal("expected EnterpriseTestFeed") + } + + // Get initial diagram count (should be 0 when hibernating, as no diagram is written yet) + getDiagramCount := func() int { + var count int + sqlDB.QueryRow(t, + `SELECT count(*) FROM system.job_info WHERE job_id = $1 AND info_key LIKE '~dsp-diag-url-%'`, + jobID, + ).Scan(&count) + return count + } + + testutils.SucceedsSoon(t, func() error { + var count int + sqlDB.QueryRow(t, + `SELECT count(*) FROM [SHOW CHANGEFEED JOB $1] WHERE running_status = 'running'`, + jobID, + ).Scan(&count) + return nil + }) + require.Equal(t, 0, getDiagramCount(), "changefeed should be hibernating (no diagram written)") + time.Sleep(20 * time.Second) + require.Equal(t, 0, getDiagramCount(), "changefeed should stay hibernating (no diagram written)") + + // Create a table that is excluded - changefeed should stay hibernating + sqlDB.Exec(t, `CREATE TABLE db.excluded_table (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO db.excluded_table VALUES (0, 'excluded')`) + + // // Verify changefeed stays hibernating (no new diagram written) + time.Sleep(20 * time.Second) + require.Equal(t, 0, getDiagramCount(), "changefeed should stay hibernating (no new diagram written)") + + // Create a table that is NOT excluded - changefeed should wake up + sqlDB.Exec(t, `CREATE TABLE db.included_table (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO db.included_table VALUES (0, 'included')`) + + // Wait for a new diagram to be written (indicating changefeed woke up) + time.Sleep(20 * time.Second) + require.Equal(t, 1, getDiagramCount(), "changefeed should wake up (new diagram written)") + + // Should only receive events from the included table + assertPayloads(t, dbcf, []string{ + `included_table: [0]->{"after": {"a": 0, "b": "included"}}`, + }) + } + testutils.RunValues(t, "filterType", []string{"include", "exclude"}, func(t *testing.T, filterType string) { + runTestFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + testFn(t, s, f, filterType) + } + cdcTest(t, runTestFn, feedTestEnterpriseSinks) + }) +} + +// TestChangefeedWatcherCleanupOnStop verifies that the watcher context is properly +// cleaned up when a changefeed is stopped before receiving any table diffs. +func TestChangefeedWatcherCleanupOnStop(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + + sqlDB.Exec(t, `CREATE DATABASE db`) + sqlDB.Exec(t, `GRANT CHANGEFEED ON DATABASE db TO enterprisefeeduser`) + + feed, err := f.Feed(`CREATE CHANGEFEED FOR DATABASE db`) + require.NoError(t, err) + enterpriseFeed := feed.(cdctest.EnterpriseTestFeed) + waitForJobState(sqlDB, t, enterpriseFeed.JobID(), jobs.StateRunning) + + sqlDB.Exec(t, `CANCEL JOB $1`, enterpriseFeed.JobID()) + waitForJobState(sqlDB, t, enterpriseFeed.JobID(), jobs.StateCanceled) + + require.NoError(t, feed.Close()) + } + + cdcTest(t, testFn, feedTestEnterpriseSinks) +} diff --git a/pkg/sql/delegate/show_changefeed_jobs.go b/pkg/sql/delegate/show_changefeed_jobs.go index 3b504ec73a82..e17e6c43ee60 100644 --- a/pkg/sql/delegate/show_changefeed_jobs.go +++ b/pkg/sql/delegate/show_changefeed_jobs.go @@ -104,7 +104,7 @@ SELECT database.name AS database_name FROM crdb_internal.jobs - INNER JOIN targets ON job_id = targets.id + LEFT JOIN targets ON job_id = targets.id INNER JOIN payload ON job_id = payload.id LEFT JOIN database ON job_id = database.id `