Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ func startDistChangefeed(
defer recv.Release()

var finishedSetupFn func(flowinfra.Flow)

if details.SinkURI != `` {
// We abuse the job's results channel to make CREATE CHANGEFEED wait for
// this before returning to the user to ensure the setup went okay. Job
Expand All @@ -322,9 +323,11 @@ func startDistChangefeed(
// returned by resumed jobs, then it breaks instead of returning
// nonsense.
finishedSetupFn = func(flowinfra.Flow) { resultsCh <- tree.Datums(nil) }
}

jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID)
// We only store the plan diagram for changefeeds that have a sink since
// sinkless changefeed don't have a job record.
jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID)
}

// Make sure to use special changefeed monitor going forward as the
// parent monitor for the DistSQL infrastructure. This is needed to
Expand Down
Loading