Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: add observability into kvfeed restarts #124635

Open
andyyang890 opened this issue May 23, 2024 · 1 comment
Open

changefeedccl: add observability into kvfeed restarts #124635

andyyang890 opened this issue May 23, 2024 · 1 comment
Assignees
Labels
A-cdc Change Data Capture C-enhancement Solution expected to add code/behavior + preserve backward-compat (pg compat issues are exception) O-support Would prevent or help troubleshoot a customer escalation - bugs, missing observability/tooling, docs P-2 Issues/test failures with a fix SLA of 3 months T-cdc

Comments

@andyyang890
Copy link
Collaborator

andyyang890 commented May 23, 2024

This code in (*kvfeed.kvFeed).run surrounding restarts due to schema changes does not have any observability in the form of either logs or metrics:

for i := 0; ; i++ {
initialScan := i == 0
initialScanOnly := f.endTime.EqOrdering(f.initialHighWater)
scannedSpans, scannedTS, err := f.scanIfShould(ctx, initialScan, initialScanOnly, rangeFeedResumeFrontier.Frontier())
if err != nil {
return err
}
// We have scanned scannedSpans up to and including scannedTS. Advance frontier
// for those spans. Note, since rangefeed start time is *exclusive* (that it, rangefeed
// starts from timestamp.Next()), we advanced frontier to the scannedTS.
for _, sp := range scannedSpans {
if _, err := rangeFeedResumeFrontier.Forward(sp, scannedTS); err != nil {
return err
}
}
if initialScanOnly {
if err := emitResolved(f.initialHighWater, jobspb.ResolvedSpan_EXIT); err != nil {
return err
}
return errChangefeedCompleted
}
if err = f.runUntilTableEvent(ctx, rangeFeedResumeFrontier); err != nil {
if tErr := (*errEndTimeReached)(nil); errors.As(err, &tErr) {
if err := emitResolved(rangeFeedResumeFrontier.Frontier(), jobspb.ResolvedSpan_EXIT); err != nil {
return err
}
return errChangefeedCompleted
}
return err
}
// Clear out checkpoint after the initial scan or rangefeed.
if initialScan {
f.checkpoint = nil
f.checkpointTimestamp = hlc.Timestamp{}
}
highWater := rangeFeedResumeFrontier.Frontier()
boundaryType := jobspb.ResolvedSpan_BACKFILL
events, err := f.tableFeed.Peek(ctx, highWater.Next())
if err != nil {
return err
}
// Detect whether the event corresponds to a primary index change. Also
// detect whether the change corresponds to any change in the set of visible
// primary key columns.
//
// If a primary key is being changed and there are no changes in the
// primary key's columns, this may be due to a column which was dropped
// logically before and is presently being physically dropped.
//
// If is no change in the primary key columns, then a primary key change
// should not trigger a failure in the `stop` policy because this change is
// effectively invisible to consumers.
primaryIndexChange, noColumnChanges := isPrimaryKeyChange(events, f.targets)
if primaryIndexChange && (noColumnChanges ||
f.schemaChangePolicy != changefeedbase.OptSchemaChangePolicyStop) {
boundaryType = jobspb.ResolvedSpan_RESTART
} else if f.schemaChangePolicy == changefeedbase.OptSchemaChangePolicyStop {
boundaryType = jobspb.ResolvedSpan_EXIT
}
// Resolve all of the spans as a boundary if the policy indicates that
// we should do so.
if f.schemaChangePolicy != changefeedbase.OptSchemaChangePolicyNoBackfill ||
boundaryType == jobspb.ResolvedSpan_RESTART {
if err := emitResolved(highWater, boundaryType); err != nil {
return err
}
}
// Exit if the policy says we should.
if boundaryType == jobspb.ResolvedSpan_RESTART || boundaryType == jobspb.ResolvedSpan_EXIT {
return schemaChangeDetectedError{highWater.Next()}
}
}

One possible thing we could do is log a message right before the loop restarts explaining why the kvfeed is not exiting and/or why it needed to stop in the first place. We could also maybe have a metric counting the number of kvfeed restarts (possibly broken down by reason).

This would've been useful while investigating https://github.com/cockroachlabs/support/issues/2958.

Jira issue: CRDB-38982

Epic CRDB-37337

@andyyang890 andyyang890 added C-enhancement Solution expected to add code/behavior + preserve backward-compat (pg compat issues are exception) A-cdc Change Data Capture T-cdc labels May 23, 2024
Copy link

blathers-crl bot commented May 23, 2024

cc @cockroachdb/cdc

@andyyang890 andyyang890 added O-support Would prevent or help troubleshoot a customer escalation - bugs, missing observability/tooling, docs P-3 Issues/test failures with no fix SLA labels May 28, 2024
@exalate-issue-sync exalate-issue-sync bot added P-1 Issues/test failures with a fix SLA of 1 month and removed P-3 Issues/test failures with no fix SLA labels Sep 17, 2024
@andyyang890 andyyang890 self-assigned this Sep 17, 2024
@rharding6373 rharding6373 added P-2 Issues/test failures with a fix SLA of 3 months and removed P-1 Issues/test failures with a fix SLA of 1 month labels Sep 18, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-cdc Change Data Capture C-enhancement Solution expected to add code/behavior + preserve backward-compat (pg compat issues are exception) O-support Would prevent or help troubleshoot a customer escalation - bugs, missing observability/tooling, docs P-2 Issues/test failures with a fix SLA of 3 months T-cdc
Projects
None yet
Development

No branches or pull requests

2 participants