changefeedccl: Reduce message duplicates during node restart #102717

Merged 1 commit on May 11, 2023
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -74,6 +74,7 @@ server.oidc_authentication.scopes string openid sets OIDC scopes to include with
server.rangelog.ttl duration 720h0m0s if nonzero, entries in system.rangelog older than this duration are periodically purged tenant-rw
server.shutdown.connection_wait duration 0s the maximum amount of time a server waits for all SQL connections to be closed before proceeding with a drain. (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) tenant-rw
server.shutdown.drain_wait duration 0s the amount of time a server waits in an unready state before proceeding with a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting. --drain-wait is to specify the duration of the whole draining process, while server.shutdown.drain_wait is to set the wait time for health probes to notice that the node is not ready.) tenant-rw
server.shutdown.jobs_wait duration 10s the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown tenant-rw
server.shutdown.query_wait duration 10s the timeout for waiting for active queries to finish during a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) tenant-rw
server.time_until_store_dead duration 5m0s the time after which if there is no new gossiped information about a store, it is considered dead tenant-rw
server.user_login.cert_password_method.auto_scram_promotion.enabled boolean true whether to automatically promote cert-password authentication to use SCRAM tenant-rw
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -104,6 +104,7 @@
<tr><td><div id="setting-server-secondary-tenants-redact-trace-enabled" class="anchored"><code>server.secondary_tenants.redact_trace.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>controls if server side traces are redacted for tenant operations</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-connection-wait" class="anchored"><code>server.shutdown.connection_wait</code></div></td><td>duration</td><td><code>0s</code></td><td>the maximum amount of time a server waits for all SQL connections to be closed before proceeding with a drain. (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-drain-wait" class="anchored"><code>server.shutdown.drain_wait</code></div></td><td>duration</td><td><code>0s</code></td><td>the amount of time a server waits in an unready state before proceeding with a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting. --drain-wait is to specify the duration of the whole draining process, while server.shutdown.drain_wait is to set the wait time for health probes to notice that the node is not ready.)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-jobs-wait" class="anchored"><code>server.shutdown.jobs_wait</code></div></td><td>duration</td><td><code>10s</code></td><td>the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-lease-transfer-wait" class="anchored"><code>server.shutdown.lease_transfer_wait</code></div></td><td>duration</td><td><code>5s</code></td><td>the timeout for a single iteration of the range lease transfer phase of draining (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-query-wait" class="anchored"><code>server.shutdown.query_wait</code></div></td><td>duration</td><td><code>10s</code></td><td>the timeout for waiting for active queries to finish during a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-time-until-store-dead" class="anchored"><code>server.time_until_store_dead</code></div></td><td>duration</td><td><code>5m0s</code></td><td>the time after which if there is no new gossiped information about a store, it is considered dead</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
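
The new server.shutdown.jobs_wait setting documented above bounds how long a draining node waits for running jobs, changefeeds included, to notice the drain request and shut down in an orderly way. As a rough, hedged sketch only (the actual registration is not among the files shown in this diff, and the variable name and package below are assumptions), a tenant-writable duration setting with a 10s default would be registered roughly like this:

package server // assumed location; shown only to illustrate the setting's shape

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/settings"
)

// jobsWait mirrors the documented default (10s) and tenant-rw class from the
// generated docs above. The real registration may differ in name and options.
var jobsWait = settings.RegisterDurationSetting(
	settings.TenantWritable,
	"server.shutdown.jobs_wait",
	"the maximum amount of time a server waits for all currently executing "+
		"jobs to notice drain request and to perform orderly shutdown",
	10*time.Second,
)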
80 changes: 59 additions & 21 deletions pkg/ccl/changefeedccl/changefeed_dist.go
@@ -74,11 +74,11 @@ func distChangefeedFlow(
execCtx sql.JobExecContext,
jobID jobspb.JobID,
details jobspb.ChangefeedDetails,
progress jobspb.Progress,
localState *cachedState,
resultsCh chan<- tree.Datums,
) error {

opts := changefeedbase.MakeStatementOptions(details.Opts)
progress := localState.progress

// NB: A non-empty high water indicates that we have checkpointed a resolved
// timestamp. Skipping the initial scan is equivalent to starting the
@@ -123,13 +123,8 @@ func distChangefeedFlow(
}
}

var checkpoint jobspb.ChangefeedProgress_Checkpoint
if cf := progress.GetChangefeed(); cf != nil && cf.Checkpoint != nil {
checkpoint = *cf.Checkpoint
}

return startDistChangefeed(
ctx, execCtx, jobID, schemaTS, details, initialHighWater, checkpoint, resultsCh)
ctx, execCtx, jobID, schemaTS, details, initialHighWater, localState, resultsCh)
}

func fetchTableDescriptors(
@@ -237,7 +232,7 @@ func startDistChangefeed(
schemaTS hlc.Timestamp,
details jobspb.ChangefeedDetails,
initialHighWater hlc.Timestamp,
checkpoint jobspb.ChangefeedProgress_Checkpoint,
localState *cachedState,
resultsCh chan<- tree.Datums,
) error {
execCfg := execCtx.ExecCfg()
@@ -253,6 +248,7 @@
if err != nil {
return err
}
localState.trackedSpans = trackedSpans
cfKnobs := execCfg.DistSQLSrv.TestingKnobs.Changefeed

// Changefeed flows handle transactional consistency themselves.
@@ -261,7 +257,12 @@
dsp := execCtx.DistSQLPlanner()
evalCtx := execCtx.ExtendedEvalContext()

p, planCtx, err := makePlan(execCtx, jobID, details, initialHighWater, checkpoint, trackedSpans)(ctx, dsp)
var checkpoint *jobspb.ChangefeedProgress_Checkpoint
if progress := localState.progress.GetChangefeed(); progress != nil && progress.Checkpoint != nil {
checkpoint = progress.Checkpoint
}
p, planCtx, err := makePlan(execCtx, jobID, details, initialHighWater,
trackedSpans, checkpoint, localState.drainingNodes)(ctx, dsp)
if err != nil {
return err
}
@@ -275,9 +276,12 @@
replanOracle = knobs.ShouldReplan
}

var replanNoCheckpoint *jobspb.ChangefeedProgress_Checkpoint
var replanNoDrainingNodes []roachpb.NodeID
replanner, stopReplanner := sql.PhysicalPlanChangeChecker(ctx,
p,
makePlan(execCtx, jobID, details, initialHighWater, checkpoint, trackedSpans),
makePlan(execCtx, jobID, details, initialHighWater,
trackedSpans, replanNoCheckpoint, replanNoDrainingNodes),
execCtx,
replanOracle,
func() time.Duration { return replanChangefeedFrequency.Get(execCtx.ExecCfg().SV()) },
@@ -290,7 +294,23 @@
ctx, cancel := execCtx.ExecCfg().DistSQLSrv.Stopper.WithCancelOnQuiesce(ctx)
defer cancel()

resultRows := makeChangefeedResultWriter(resultsCh, cancel)
// clear out previous drain/shutdown information.
localState.drainingNodes = localState.drainingNodes[:0]
localState.aggregatorFrontier = localState.aggregatorFrontier[:0]

resultRows := sql.NewMetadataCallbackWriter(
makeChangefeedResultWriter(resultsCh, cancel),
func(ctx context.Context, meta *execinfrapb.ProducerMetadata) error {
if meta.Changefeed != nil {
if meta.Changefeed.DrainInfo != nil {
localState.drainingNodes = append(localState.drainingNodes, meta.Changefeed.DrainInfo.NodeID)
}
localState.aggregatorFrontier = append(localState.aggregatorFrontier, meta.Changefeed.Checkpoint...)
}
return nil
},
)

recv := sql.MakeDistSQLReceiver(
ctx,
resultRows,
@@ -343,8 +363,9 @@ func makePlan(
jobID jobspb.JobID,
details jobspb.ChangefeedDetails,
initialHighWater hlc.Timestamp,
checkpoint jobspb.ChangefeedProgress_Checkpoint,
trackedSpans []roachpb.Span,
checkpoint *jobspb.ChangefeedProgress_Checkpoint,
drainingNodes []roachpb.NodeID,
) func(context.Context, *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
return func(ctx context.Context, dsp *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
var blankTxn *kv.Txn
@@ -369,6 +390,15 @@
return nil, nil, err
}

cfKnobs := execCtx.ExecCfg().DistSQLSrv.TestingKnobs.Changefeed
if knobs, ok := cfKnobs.(*TestingKnobs); ok && knobs != nil &&
knobs.FilterDrainingNodes != nil && len(drainingNodes) > 0 {
spanPartitions, err = knobs.FilterDrainingNodes(spanPartitions, drainingNodes)
if err != nil {
return nil, nil, err
}
}

sv := &execCtx.ExecCfg().Settings.SV
if enableBalancedRangeDistribution.Get(sv) {
scanType, err := changefeedbase.MakeStatementOptions(details.Opts).GetInitialScanType()
@@ -393,13 +423,14 @@
// Use the same checkpoint for all aggregators; each aggregator will only look at
// spans that are assigned to it.
// We could compute per-aggregator checkpoints, but that's probably overkill.
aggregatorCheckpoint := execinfrapb.ChangeAggregatorSpec_Checkpoint{
Spans: checkpoint.Spans,
Timestamp: checkpoint.Timestamp,
}

var aggregatorCheckpoint execinfrapb.ChangeAggregatorSpec_Checkpoint
var checkpointSpanGroup roachpb.SpanGroup
checkpointSpanGroup.Add(checkpoint.Spans...)

if checkpoint != nil {
checkpointSpanGroup.Add(checkpoint.Spans...)
aggregatorCheckpoint.Spans = checkpoint.Spans
aggregatorCheckpoint.Timestamp = checkpoint.Timestamp
}

aggregatorSpecs := make([]*execinfrapb.ChangeAggregatorSpec, len(spanPartitions))
for i, sp := range spanPartitions {
@@ -436,7 +467,6 @@ makePlan(
UserProto: execCtx.User().EncodeProto(),
}

cfKnobs := execCtx.ExecCfg().DistSQLSrv.TestingKnobs.Changefeed
if knobs, ok := cfKnobs.(*TestingKnobs); ok && knobs != nil && knobs.OnDistflowSpec != nil {
knobs.OnDistflowSpec(aggregatorSpecs, &changeFrontierSpec)
}
@@ -496,7 +526,15 @@ func (w *changefeedResultWriter) SetRowsAffected(ctx context.Context, n int) {
}
func (w *changefeedResultWriter) SetError(err error) {
w.err = err
w.cancel()
switch {
case errors.Is(err, changefeedbase.ErrNodeDraining):
// Let the drain signal proceed without cancellation.
// We want to make sure the change frontier processor gets a chance
// to send cancellation to the aggregators so that everything
// transitions to the "drain metadata" stage.
default:
w.cancel()
}
}

func (w *changefeedResultWriter) Err() error {
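
Tying the pieces together: localState is a *cachedState that survives changefeed flow retries on the coordinator. The metadata callback registered in startDistChangefeed records which nodes announced a drain and what their aggregators had already checkpointed, and the next call to makePlan receives that information so restarted aggregators can skip work that was already emitted, which is what reduces duplicate messages across a node restart. Below is a minimal sketch of the struct, inferred only from the fields used in this diff; the real definition lives elsewhere in the package and may contain more fields.

package changefeedccl // sketch only; the actual definition is not shown in this diff

import (
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// cachedState carries per-job state across retries of the distributed
// changefeed flow. Field names come from the usages above; types follow the
// surrounding signatures.
type cachedState struct {
	// progress is the job progress at the last (re)start; it replaces the
	// jobspb.Progress argument formerly passed to distChangefeedFlow.
	progress jobspb.Progress

	// trackedSpans are the spans watched by this changefeed, recorded in
	// startDistChangefeed before planning.
	trackedSpans []roachpb.Span

	// drainingNodes lists nodes that reported a drain via ProducerMetadata;
	// the next plan can avoid placing aggregators on them (the diff shows
	// this path through the FilterDrainingNodes testing knob).
	drainingNodes []roachpb.NodeID

	// aggregatorFrontier accumulates the checkpoint entries forwarded by
	// draining aggregators (meta.Changefeed.Checkpoint). Its element type is
	// defined in execinfrapb and not shown in this diff, so it is omitted
	// from this sketch.
}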