Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ go_library(
"//pkg/ccl/changefeedccl/kvfeed",
"//pkg/ccl/changefeedccl/resolvedspan",
"//pkg/ccl/changefeedccl/schemafeed",
"//pkg/ccl/changefeedccl/tableset",
"//pkg/ccl/changefeedccl/timers",
"//pkg/ccl/kvccl/kvfollowerreadsccl",
"//pkg/ccl/utilccl",
Expand Down
138 changes: 132 additions & 6 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedvalidators"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/checkpoint"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/tableset"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/cloud/externalconn"
Expand All @@ -40,9 +41,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/exprutil"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice"
Expand All @@ -59,6 +62,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log/channel"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/log/severity"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -1413,6 +1417,24 @@ func requiresTopicInValue(s Sink) bool {
return s.getConcreteType() == sinkTypeWebhook
}

// shouldStartDatabaseWatcher determines whether we should initialize a database-level
// tableset watcher. This is only applicable when:
// - there is exactly one target specification, and
// - that specification is a DATABASE target, and
// - there are currently no resolved table targets (i.e., zero unique tables).
func shouldStartDatabaseWatcher(
details jobspb.ChangefeedDetails, targets changefeedbase.Targets,
) bool {
if len(details.TargetSpecifications) != 1 {
return false
}
ts := details.TargetSpecifications[0]
if ts.Type != jobspb.ChangefeedTargetSpecification_DATABASE {
return false
}
return targets.NumUniqueTables() == 0
}

func makeChangefeedDescription(
ctx context.Context,
changefeed *tree.CreateChangefeed,
Expand Down Expand Up @@ -1546,6 +1568,7 @@ type changefeedResumer struct {

mu struct {
syncutil.Mutex
watcherDiffSent bool
// perNodeAggregatorStats is a per component running aggregate of trace
// driven AggregatorStats emitted by the processors.
perNodeAggregatorStats bulk.ComponentAggregatorStats
Expand Down Expand Up @@ -1769,6 +1792,12 @@ func (b *changefeedResumer) resumeWithRetries(

maxBackoff := changefeedbase.MaxRetryBackoff.Get(&execCfg.Settings.SV)
backoffReset := changefeedbase.RetryBackoffReset.Get(&execCfg.Settings.SV)

// Create memory monitor for tableset watcher. Similar to how processors create
// their monitors at function start, we create this unconditionally.
watcherMemMonitor := execinfra.NewMonitor(ctx, execCfg.DistSQLSrv.ChangefeedMonitor, mon.MakeName("changefeed-tableset-watcher-mem"))
defer watcherMemMonitor.Stop(ctx)

for r := getRetry(ctx, maxBackoff, backoffReset); r.Next(); {
flowErr := maybeUpgradePreProductionReadyExpression(ctx, jobID, details, jobExec)

Expand All @@ -1782,7 +1811,7 @@ func (b *changefeedResumer) resumeWithRetries(
knobs.BeforeDistChangefeed()
}

confPoller := make(chan struct{})
changefeedDoneCh := make(chan struct{})
g := ctxgroup.WithContext(ctx)
initialHighWater, schemaTS, err := computeDistChangefeedTimestamps(ctx, jobExec, details, localState)
if err != nil {
Expand All @@ -1796,28 +1825,125 @@ func (b *changefeedResumer) resumeWithRetries(
if err != nil {
return err
}

// This watcher is only used for db-level changefeeds with no watched tables.
var watcher *tableset.Watcher
watcherChan := make(chan []tableset.TableDiff, 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why buffer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the poller goroutine doesn't block on sending the diff and it can keep polling. Though the goroutine waiting for the diff, before starting the changefeed, should already be listening on this channel.

var cancelWatcher context.CancelFunc
var watcherCtx context.Context
waitForTables := shouldStartDatabaseWatcher(details, targets)
if waitForTables {
// Create a watcher for the database.
dbID := details.TargetSpecifications[0].DescID
filter := tableset.Filter{
DatabaseID: dbID,
}
if details.TargetSpecifications[0].FilterList != nil && len(details.TargetSpecifications[0].FilterList.Tables) > 0 {
if details.TargetSpecifications[0].FilterList.FilterType == tree.IncludeFilter {
filter.IncludeTables = make(map[string]struct{})
for fqTableName := range details.TargetSpecifications[0].FilterList.Tables {
// TODO: use fully qualified names once watcher supports it.
// Extract just the table name from the fully qualified name (e.g., "db.public.table" -> "table")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We definitely want to keep at least the schema name. Will need to wait for @asg0451 's change to the tableset watcher to know what to pass in.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll update this once that inflight change is ready

tn, err := parser.ParseQualifiedTableName(fqTableName)
if err != nil {
return errors.Wrapf(err, "failed to parse name in filter list: %s", fqTableName)
}
filter.IncludeTables[tn.Object()] = struct{}{}
}
} else if details.TargetSpecifications[0].FilterList.FilterType == tree.ExcludeFilter {
filter.ExcludeTables = make(map[string]struct{})
for fqTableName := range details.TargetSpecifications[0].FilterList.Tables {
// TODO: use fully qualified names once watcher supports it.
// Extract just the table name from the fully qualified name (e.g., "db.public.table" -> "table")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flagging this for removal before merge.

tn, err := parser.ParseQualifiedTableName(fqTableName)
if err != nil {
return errors.Wrapf(err, "failed to parse name in filter list: %s", fqTableName)
}
filter.ExcludeTables[tn.Object()] = struct{}{}
}
}
}

b.mu.Lock()
b.mu.watcherDiffSent = false
b.mu.Unlock()

watcher = tableset.NewWatcher(filter, execCfg, watcherMemMonitor, int64(jobID))
g.GoCtx(func(ctx context.Context) error {
// This method runs the watcher until its context is canceled.
// The watcher context is canceled when diffs are sent to the
// watcherChan or when the the parent group context is canceled.
watcherCtx, cancelWatcher = context.WithCancel(ctx)
defer cancelWatcher()
err := watcher.Start(watcherCtx, schemaTS)
b.mu.Lock()
defer b.mu.Unlock()
if err != nil && !b.mu.watcherDiffSent {
return err
}
return nil
})
}

// Run the changefeed until it completes or is shut down.
// If the changefeed's target tableset is empty, it will need to block
// until a diff in the tableset is found from the watcher.
g.GoCtx(func(ctx context.Context) error {
defer close(confPoller)
return startDistChangefeed(ctx, jobExec, jobID, schemaTS, details, description,
initialHighWater, localState, startedCh, onTracingEvent, targets)
defer close(changefeedDoneCh)
if waitForTables {
select {
case <-ctx.Done():
return ctx.Err()
case diffs := <-watcherChan:
schemaTS = diffs[0].AsOf
initialHighWater = diffs[0].AsOf
targets, err = AllTargets(ctx, details, execCfg, schemaTS)
if err != nil {
return err
Comment on lines +1894 to +1902
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so we wait for the first event from the watcher before starting? i dont think that's right... what if the tables are already there and none change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

startDistChangefeed is outside of the if watcher != nil. There's only a watcher created if needed, so it would otherwise go right into running the changefeed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the conditional check for this if statement so it's more clear. But it will only enter into this select statement when there was an empty tableset and the watcher was created. If there's already tables, it will just go straight to running startDistChangefeed.

}
}
}
return startDistChangefeed(ctx, jobExec, jobID, schemaTS, details, description, initialHighWater, localState, startedCh, onTracingEvent, targets)
})

// Poll for updated configuration or new database tables if hibernating.
g.GoCtx(func(ctx context.Context) error {
t := time.NewTicker(15 * time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-confPoller:
case <-changefeedDoneCh:
return nil
case <-t.C:
case tick := <-t.C:
newDest, err := reloadDest(ctx, jobID, execCfg)
if err != nil {
log.Changefeed.Warningf(ctx, "failed to check for updated configuration: %v", err)
} else if newDest != resolvedDest {
resolvedDest = newDest
return replanErr
}
b.mu.Lock()
diffSent := b.mu.watcherDiffSent
b.mu.Unlock()
if waitForTables && !diffSent {
unchanged, diffs, err := watcher.PopUnchangedUpTo(ctx, hlc.Timestamp{WallTime: tick.UnixNano()})
if err != nil {
return err
}
if !unchanged && len(diffs) > 0 {
select {
case watcherChan <- diffs:
b.mu.Lock()
b.mu.watcherDiffSent = true
b.mu.Unlock()
cancelWatcher()
case <-ctx.Done():
return ctx.Err()
}
}
}
}
}
})
Expand Down
151 changes: 151 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13084,3 +13084,154 @@ func TestDatabaseLevelChangefeedWithInitialScanOptions(t *testing.T) {
})
}
}

// TestDatabaseLevelChangefeedEmptyTableset tests that a database-level changefeed
// hibernates while there are no tables in the database.
func TestDatabaseLevelChangefeedEmptyTableset(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFnNoWait := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE DATABASE db`)
sqlDB.Exec(t, `GRANT CHANGEFEED ON DATABASE db TO enterprisefeeduser`)
dbcf := feed(t, f, `CREATE CHANGEFEED FOR DATABASE db`)
defer closeFeed(t, dbcf)

// create a table
sqlDB.Exec(t, `CREATE TABLE db.foo (a INT PRIMARY KEY, b STRING)`)
sqlDB.Exec(t, `INSERT INTO db.foo VALUES (0, 'initial')`)

assertPayloads(t, dbcf, []string{
`foo: [0]->{"after": {"a": 0, "b": "initial"}}`,
})
}
cdcTest(t, testFnNoWait, feedTestEnterpriseSinks)

testFnWait := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE DATABASE db`)
sqlDB.Exec(t, `GRANT CHANGEFEED ON DATABASE db TO enterprisefeeduser`)
dbcf := feed(t, f, `CREATE CHANGEFEED FOR DATABASE db`)
defer closeFeed(t, dbcf)

time.Sleep(5 * time.Second)

// create a table
sqlDB.Exec(t, `CREATE TABLE db.foo (a INT PRIMARY KEY, b STRING)`)
sqlDB.Exec(t, `INSERT INTO db.foo VALUES (0, 'initial')`)

assertPayloads(t, dbcf, []string{
`foo: [0]->{"after": {"a": 0, "b": "initial"}}`,
})
}
cdcTest(t, testFnWait, feedTestEnterpriseSinks)
}

// TestDatabaseLevelChangefeedFiltersHibernation tests that a database-level changefeed
// with include/exclude filters correctly handles hibernation:
// - With EXCLUDE filter: creating an excluded table should not wake the changefeed,
// but creating a non-excluded table should wake it.
// - With INCLUDE filter: creating a non-included table should not wake the changefeed,
// but creating an included table should wake it.
func TestDatabaseLevelChangefeedFiltersHibernation(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

full_filter := map[string]string{
"include": "EXCLUDE TABLES excluded_table",
"exclude": "INCLUDE TABLES included_table",
}
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory, filterType string) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE DATABASE db`)
sqlDB.Exec(t, `GRANT CHANGEFEED ON DATABASE db TO enterprisefeeduser`)

// Create changefeed with exclude filter - should hibernate since no tables exist
createStmt := fmt.Sprintf(`CREATE CHANGEFEED FOR DATABASE db %s`, full_filter[filterType])
dbcf := feed(t, f, createStmt)
defer closeFeed(t, dbcf)

var jobID jobspb.JobID
if ef, ok := dbcf.(cdctest.EnterpriseTestFeed); ok {
jobID = ef.JobID()
} else {
t.Fatal("expected EnterpriseTestFeed")
}

// Get initial diagram count (should be 0 when hibernating, as no diagram is written yet)
getDiagramCount := func() int {
var count int
sqlDB.QueryRow(t,
`SELECT count(*) FROM system.job_info WHERE job_id = $1 AND info_key LIKE '~dsp-diag-url-%'`,
jobID,
).Scan(&count)
return count
}

testutils.SucceedsSoon(t, func() error {
var count int
sqlDB.QueryRow(t,
`SELECT count(*) FROM [SHOW CHANGEFEED JOB $1] WHERE running_status = 'running'`,
jobID,
).Scan(&count)
return nil
})
require.Equal(t, 0, getDiagramCount(), "changefeed should be hibernating (no diagram written)")
time.Sleep(20 * time.Second)
require.Equal(t, 0, getDiagramCount(), "changefeed should stay hibernating (no diagram written)")

// Create a table that is excluded - changefeed should stay hibernating
sqlDB.Exec(t, `CREATE TABLE db.excluded_table (a INT PRIMARY KEY, b STRING)`)
sqlDB.Exec(t, `INSERT INTO db.excluded_table VALUES (0, 'excluded')`)

// // Verify changefeed stays hibernating (no new diagram written)
time.Sleep(20 * time.Second)
require.Equal(t, 0, getDiagramCount(), "changefeed should stay hibernating (no new diagram written)")

// Create a table that is NOT excluded - changefeed should wake up
sqlDB.Exec(t, `CREATE TABLE db.included_table (a INT PRIMARY KEY, b STRING)`)
sqlDB.Exec(t, `INSERT INTO db.included_table VALUES (0, 'included')`)

// Wait for a new diagram to be written (indicating changefeed woke up)
time.Sleep(20 * time.Second)
require.Equal(t, 1, getDiagramCount(), "changefeed should wake up (new diagram written)")

// Should only receive events from the included table
assertPayloads(t, dbcf, []string{
`included_table: [0]->{"after": {"a": 0, "b": "included"}}`,
})
}
testutils.RunValues(t, "filterType", []string{"include", "exclude"}, func(t *testing.T, filterType string) {
runTestFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
testFn(t, s, f, filterType)
}
cdcTest(t, runTestFn, feedTestEnterpriseSinks)
})
}

// TestChangefeedWatcherCleanupOnStop verifies that the watcher context is properly
// cleaned up when a changefeed is stopped before receiving any table diffs.
func TestChangefeedWatcherCleanupOnStop(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

sqlDB.Exec(t, `CREATE DATABASE db`)
sqlDB.Exec(t, `GRANT CHANGEFEED ON DATABASE db TO enterprisefeeduser`)

feed, err := f.Feed(`CREATE CHANGEFEED FOR DATABASE db`)
require.NoError(t, err)
enterpriseFeed := feed.(cdctest.EnterpriseTestFeed)
waitForJobState(sqlDB, t, enterpriseFeed.JobID(), jobs.StateRunning)

sqlDB.Exec(t, `CANCEL JOB $1`, enterpriseFeed.JobID())
waitForJobState(sqlDB, t, enterpriseFeed.JobID(), jobs.StateCanceled)

require.NoError(t, feed.Close())
}

cdcTest(t, testFn, feedTestEnterpriseSinks)
}
2 changes: 1 addition & 1 deletion pkg/sql/delegate/show_changefeed_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ SELECT
database.name AS database_name
FROM
crdb_internal.jobs
INNER JOIN targets ON job_id = targets.id
LEFT JOIN targets ON job_id = targets.id
INNER JOIN payload ON job_id = payload.id
LEFT JOIN database ON job_id = database.id
`
Expand Down
Loading