changefeedccl: Reduce message duplicates during node restart
Changefeeds have at-least-once semantics. These semantics
are implemented by relying on the closed timestamp system,
which drives the changefeed checkpointing logic.

When a node is restarted gracefully (by draining it),
the changefeed restarts, re-emitting all messages written since
the last successful checkpoint.

During rolling cluster restarts, however, this behavior produces
an almost quadratic number of duplicate messages: if a node is
drained every 5 minutes and a checkpoint is produced roughly every
5 minutes, each subsequent node tends to be drained right before its
checkpoint succeeds, so nearly a full checkpoint interval's worth of
messages is re-emitted at every step.

This PR addresses the issue of duplicates during node (and cluster)
restarts.

First, the drain is surfaced to jobs via an `OnDrain` channel made
available in the jobs registry. This channel is signaled when the node
begins its drain process, and there is a wait period, configured via the
`server.shutdown.jobs_wait` setting (10 seconds by default), before
the registry shuts down and cancels all currently running jobs.

During this window, the changefeed aggregator running on the node being
drained detects the signal and shuts down, transmitting its full frontier
to the changefeed coordinator. The coordinator, in turn, signals the
remaining aggregators to exit, and they also transmit their
up-to-date frontier information.
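
To illustrate the hand-off, here is a simplified model (not the real
execinfrapb wire types): each aggregator's final metadata carries the
draining node's ID plus its resolved-span frontier, and the coordinator
folds every such message into its cached local state before restarting:

```go
package main

import "fmt"

// drainInfo and resolvedSpan model the metadata an aggregator sends on
// shutdown; the real types are protobufs defined in execinfrapb.
type drainInfo struct{ nodeID int }

type resolvedSpan struct {
	span string
	ts   int64
}

type changefeedMeta struct {
	drain      *drainInfo     // set when the aggregator exits due to a drain
	checkpoint []resolvedSpan // the aggregator's up-to-date frontier
}

// localState mirrors the coordinator-side cache that survives retries.
type localState struct {
	drainingNodes      []int
	aggregatorFrontier []resolvedSpan
}

// onMetadata is the coordinator callback applied to each aggregator message.
func (s *localState) onMetadata(meta changefeedMeta) {
	if meta.drain != nil {
		s.drainingNodes = append(s.drainingNodes, meta.drain.nodeID)
	}
	s.aggregatorFrontier = append(s.aggregatorFrontier, meta.checkpoint...)
}

func main() {
	var s localState
	s.onMetadata(changefeedMeta{
		drain:      &drainInfo{nodeID: 2},
		checkpoint: []resolvedSpan{{"a-b", 100}, {"b-c", 105}},
	})
	s.onMetadata(changefeedMeta{checkpoint: []resolvedSpan{{"c-d", 98}}})
	fmt.Println(s.drainingNodes, len(s.aggregatorFrontier)) // [2] 3
}
```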

Prior to retrying the changefeed, an up-to-date frontier is
reconstructed from this information, persisted to the jobs table as
needed, and the changefeed flow is replanned, avoiding the node that
is being drained.
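
A rough sketch of that retry step, using the same kind of simplified
types as above (a model of the idea, not the actual planning code):
take the minimum resolved timestamp across all tracked spans as the new
high-water mark to persist, and drop draining nodes from the candidate
set before replanning:

```go
package main

import "fmt"

// resolvedSpan models a per-span resolved timestamp collected from the
// aggregators during the drain (illustrative types only).
type resolvedSpan struct {
	span string
	ts   int64
}

// reconstructFrontier returns the minimum resolved timestamp across the
// tracked spans: the highest timestamp that is safe to persist as the
// changefeed high-water mark before the flow is replanned.
func reconstructFrontier(tracked []string, collected []resolvedSpan) int64 {
	best := make(map[string]int64)
	for _, rs := range collected {
		if rs.ts > best[rs.span] {
			best[rs.span] = rs.ts
		}
	}
	frontier := int64(-1)
	for _, sp := range tracked {
		if ts := best[sp]; frontier == -1 || ts < frontier {
			frontier = ts
		}
	}
	return frontier
}

// excludeDraining filters draining nodes out of the candidate set used
// for the replanned flow.
func excludeDraining(candidates, draining []int) []int {
	skip := make(map[int]bool, len(draining))
	for _, n := range draining {
		skip[n] = true
	}
	var out []int
	for _, n := range candidates {
		if !skip[n] {
			out = append(out, n)
		}
	}
	return out
}

func main() {
	tracked := []string{"a-b", "b-c", "c-d"}
	collected := []resolvedSpan{{"a-b", 105}, {"b-c", 100}, {"c-d", 98}, {"a-b", 110}}
	fmt.Println(reconstructFrontier(tracked, collected)) // 98
	fmt.Println(excludeDraining([]int{1, 2, 3}, []int{2})) // [1 3]
}
```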

Epic: CRDB-26978

Release note (enterprise change): Changefeeds emit significantly
fewer duplicate messages during node/cluster restarts.
Yevgeniy Miretskiy committed May 8, 2023
1 parent 2973e83 commit 880aaf1
Showing 17 changed files with 665 additions and 132 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -74,6 +74,7 @@ server.oidc_authentication.scopes string openid sets OIDC scopes to include with
server.rangelog.ttl duration 720h0m0s if nonzero, entries in system.rangelog older than this duration are periodically purged tenant-rw
server.shutdown.connection_wait duration 0s the maximum amount of time a server waits for all SQL connections to be closed before proceeding with a drain. (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) tenant-rw
server.shutdown.drain_wait duration 0s the amount of time a server waits in an unready state before proceeding with a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting. --drain-wait is to specify the duration of the whole draining process, while server.shutdown.drain_wait is to set the wait time for health probes to notice that the node is not ready.) tenant-rw
server.shutdown.jobs_wait duration 10s the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown tenant-rw
server.shutdown.query_wait duration 10s the timeout for waiting for active queries to finish during a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting) tenant-rw
server.time_until_store_dead duration 5m0s the time after which if there is no new gossiped information about a store, it is considered dead tenant-rw
server.user_login.cert_password_method.auto_scram_promotion.enabled boolean true whether to automatically promote cert-password authentication to use SCRAM tenant-rw
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -104,6 +104,7 @@
<tr><td><div id="setting-server-secondary-tenants-redact-trace-enabled" class="anchored"><code>server.secondary_tenants.redact_trace.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>controls if server side traces are redacted for tenant operations</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-connection-wait" class="anchored"><code>server.shutdown.connection_wait</code></div></td><td>duration</td><td><code>0s</code></td><td>the maximum amount of time a server waits for all SQL connections to be closed before proceeding with a drain. (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-drain-wait" class="anchored"><code>server.shutdown.drain_wait</code></div></td><td>duration</td><td><code>0s</code></td><td>the amount of time a server waits in an unready state before proceeding with a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting. --drain-wait is to specify the duration of the whole draining process, while server.shutdown.drain_wait is to set the wait time for health probes to notice that the node is not ready.)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-jobs-wait" class="anchored"><code>server.shutdown.jobs_wait</code></div></td><td>duration</td><td><code>10s</code></td><td>the maximum amount of time a server waits for all currently executing jobs to notice drain request and to perform orderly shutdown</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-lease-transfer-wait" class="anchored"><code>server.shutdown.lease_transfer_wait</code></div></td><td>duration</td><td><code>5s</code></td><td>the timeout for a single iteration of the range lease transfer phase of draining (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-shutdown-query-wait" class="anchored"><code>server.shutdown.query_wait</code></div></td><td>duration</td><td><code>10s</code></td><td>the timeout for waiting for active queries to finish during a drain (note that the --drain-wait parameter for cockroach node drain may need adjustment after changing this setting)</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-server-time-until-store-dead" class="anchored"><code>server.time_until_store_dead</code></div></td><td>duration</td><td><code>5m0s</code></td><td>the time after which if there is no new gossiped information about a store, it is considered dead</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
103 changes: 82 additions & 21 deletions pkg/ccl/changefeedccl/changefeed_dist.go
@@ -74,11 +74,11 @@ func distChangefeedFlow(
execCtx sql.JobExecContext,
jobID jobspb.JobID,
details jobspb.ChangefeedDetails,
progress jobspb.Progress,
localState *cachedState,
resultsCh chan<- tree.Datums,
) error {

opts := changefeedbase.MakeStatementOptions(details.Opts)
progress := localState.progress

// NB: A non-empty high water indicates that we have checkpointed a resolved
// timestamp. Skipping the initial scan is equivalent to starting the
@@ -123,13 +123,8 @@ func distChangefeedFlow(
}
}

var checkpoint jobspb.ChangefeedProgress_Checkpoint
if cf := progress.GetChangefeed(); cf != nil && cf.Checkpoint != nil {
checkpoint = *cf.Checkpoint
}

return startDistChangefeed(
ctx, execCtx, jobID, schemaTS, details, initialHighWater, checkpoint, resultsCh)
ctx, execCtx, jobID, schemaTS, details, initialHighWater, localState, resultsCh)
}

func fetchTableDescriptors(
@@ -237,7 +232,7 @@ func startDistChangefeed(
schemaTS hlc.Timestamp,
details jobspb.ChangefeedDetails,
initialHighWater hlc.Timestamp,
checkpoint jobspb.ChangefeedProgress_Checkpoint,
localState *cachedState,
resultsCh chan<- tree.Datums,
) error {
execCfg := execCtx.ExecCfg()
@@ -253,6 +248,7 @@
if err != nil {
return err
}
localState.trackedSpans = trackedSpans
cfKnobs := execCfg.DistSQLSrv.TestingKnobs.Changefeed

// Changefeed flows handle transactional consistency themselves.
@@ -261,7 +257,12 @@
dsp := execCtx.DistSQLPlanner()
evalCtx := execCtx.ExtendedEvalContext()

p, planCtx, err := makePlan(execCtx, jobID, details, initialHighWater, checkpoint, trackedSpans)(ctx, dsp)
var checkpoint *jobspb.ChangefeedProgress_Checkpoint
if progress := localState.progress.GetChangefeed(); progress != nil && progress.Checkpoint != nil {
checkpoint = progress.Checkpoint
}
p, planCtx, err := makePlan(execCtx, jobID, details, initialHighWater,
trackedSpans, checkpoint, localState.drainingNodes)(ctx, dsp)
if err != nil {
return err
}
@@ -275,9 +276,12 @@
replanOracle = knobs.ShouldReplan
}

var replanNoCheckpoint *jobspb.ChangefeedProgress_Checkpoint
var replanNoDrainingNodes []roachpb.NodeID
replanner, stopReplanner := sql.PhysicalPlanChangeChecker(ctx,
p,
makePlan(execCtx, jobID, details, initialHighWater, checkpoint, trackedSpans),
makePlan(execCtx, jobID, details, initialHighWater,
trackedSpans, replanNoCheckpoint, replanNoDrainingNodes),
execCtx,
replanOracle,
func() time.Duration { return replanChangefeedFrequency.Get(execCtx.ExecCfg().SV()) },
@@ -290,7 +294,23 @@
ctx, cancel := execCtx.ExecCfg().DistSQLSrv.Stopper.WithCancelOnQuiesce(ctx)
defer cancel()

resultRows := makeChangefeedResultWriter(resultsCh, cancel)
// clear out previous drain/shutdown information.
localState.drainingNodes = localState.drainingNodes[:0]
localState.aggregatorFrontier = localState.aggregatorFrontier[:0]

resultRows := sql.NewMetadataCallbackWriter(
makeChangefeedResultWriter(resultsCh, cancel),
func(ctx context.Context, meta *execinfrapb.ProducerMetadata) error {
if meta.Changefeed != nil {
if meta.Changefeed.DrainInfo != nil {
localState.drainingNodes = append(localState.drainingNodes, meta.Changefeed.DrainInfo.NodeID)
}
localState.aggregatorFrontier = append(localState.aggregatorFrontier, meta.Changefeed.Checkpoint...)
}
return nil
},
)

recv := sql.MakeDistSQLReceiver(
ctx,
resultRows,
@@ -343,8 +363,9 @@ func makePlan(
jobID jobspb.JobID,
details jobspb.ChangefeedDetails,
initialHighWater hlc.Timestamp,
checkpoint jobspb.ChangefeedProgress_Checkpoint,
trackedSpans []roachpb.Span,
checkpoint *jobspb.ChangefeedProgress_Checkpoint,
drainingNodes []roachpb.NodeID,
) func(context.Context, *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
return func(ctx context.Context, dsp *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
var blankTxn *kv.Txn
@@ -369,6 +390,12 @@
return nil, nil, err
}

cfKnobs := execCtx.ExecCfg().DistSQLSrv.TestingKnobs.Changefeed
if knobs, ok := cfKnobs.(*TestingKnobs); ok && knobs != nil &&
knobs.FilterDrainingNodes && len(drainingNodes) > 0 {
spanPartitions = testingFilterDrainingNodes(spanPartitions, drainingNodes)
}

sv := &execCtx.ExecCfg().Settings.SV
if enableBalancedRangeDistribution.Get(sv) {
scanType, err := changefeedbase.MakeStatementOptions(details.Opts).GetInitialScanType()
@@ -393,13 +420,14 @@
// Use the same checkpoint for all aggregators; each aggregator will only look at
// spans that are assigned to it.
// We could compute a per-aggregator checkpoint, but that's probably overkill.
aggregatorCheckpoint := execinfrapb.ChangeAggregatorSpec_Checkpoint{
Spans: checkpoint.Spans,
Timestamp: checkpoint.Timestamp,
}

var aggregatorCheckpoint execinfrapb.ChangeAggregatorSpec_Checkpoint
var checkpointSpanGroup roachpb.SpanGroup
checkpointSpanGroup.Add(checkpoint.Spans...)

if checkpoint != nil {
checkpointSpanGroup.Add(checkpoint.Spans...)
aggregatorCheckpoint.Spans = checkpoint.Spans
aggregatorCheckpoint.Timestamp = checkpoint.Timestamp
}

aggregatorSpecs := make([]*execinfrapb.ChangeAggregatorSpec, len(spanPartitions))
for i, sp := range spanPartitions {
@@ -436,7 +464,6 @@ func makePlan(
UserProto: execCtx.User().EncodeProto(),
}

cfKnobs := execCtx.ExecCfg().DistSQLSrv.TestingKnobs.Changefeed
if knobs, ok := cfKnobs.(*TestingKnobs); ok && knobs != nil && knobs.OnDistflowSpec != nil {
knobs.OnDistflowSpec(aggregatorSpecs, &changeFrontierSpec)
}
@@ -464,6 +491,32 @@
}
}

func testingFilterDrainingNodes(
partitions []sql.SpanPartition, draining []roachpb.NodeID,
) []sql.SpanPartition {
skip := map[roachpb.NodeID]struct{}{}
for _, n := range draining {
skip[n] = struct{}{}
}
var filtered []sql.SpanPartition
var filteredSpans []roachpb.Span
for _, p := range partitions {
if _, skip := skip[roachpb.NodeID(p.SQLInstanceID)]; skip {
filteredSpans = append(filteredSpans, p.Spans...)
} else {
filtered = append(filtered, p)
}
}
if len(filtered) == 0 {
return partitions // Maybe panic.
}
if len(filteredSpans) == 0 {
return partitions
}
filtered[0].Spans = append(filtered[0].Spans, filteredSpans...)
return filtered
}

// changefeedResultWriter implements the `sql.rowResultWriter` that sends
// the received rows back over the given channel.
type changefeedResultWriter struct {
@@ -496,7 +549,15 @@ func (w *changefeedResultWriter) SetRowsAffected(ctx context.Context, n int) {
}
func (w *changefeedResultWriter) SetError(err error) {
w.err = err
w.cancel()
switch {
case errors.Is(err, changefeedbase.ErrNodeDraining):
// Let drain signal proceed w/out cancellation.
// We want to make sure change frontier processor gets a chance
// to send out cancellation to the aggregator so that everything
// transitions to "drain metadata" stage.
default:
w.cancel()
}
}

func (w *changefeedResultWriter) Err() error {