
changefeedccl: checkpoint for lagging high-water #77763

Merged
merged 5 commits on May 3, 2022

Conversation

@miretskiy (Contributor) commented Mar 14, 2022

A changefeed's main method of persisting progress is through the
high-water mark, the timestamp that every tracked span has met or
exceeded.

This meant that if a small set of spans was lagging behind the rest,
for example due to nodes becoming transiently unavailable, and the
changefeed was restarted, it would consider every span to be at that
lagging timestamp and begin re-emitting events for all the other spans.
This is a pain point for high-QPS changefeeds, where restarting even
20 minutes in the past can result in millions of duplicate events
being sent.

In addition, when a changefeed starts with a cursor, it performs a
catchup scan. Those catchup scans can be expensive if the cursor is
sufficiently far in the past. Since the KV server limits the number of
concurrent catchup scans, some spans will complete their catchup scan
and begin emitting regular (rangefeed) events while others are still
waiting to perform theirs. Any transient error at this time results in
a restart, so checkpointing the spans that were able to begin their
rangefeed is important: it allows the changefeed to make forward
progress.

This change extends the per-span checkpointing currently used during
backfills to also cover the situation in which the high-water mark lags
sufficiently far behind the leading edge of the frontier. Once the
high-water mark's delay exceeds the
frontier_highwater_lag_checkpoint_threshold cluster setting, checkpoints
are stored at the same frontier_checkpoint_frequency as backfills,
recording both the set of spans and the minimum timestamp they have
advanced to. On changefeed resumption, the frontier advances these
spans to that timestamp.

Fixes #77693

Release note (performance improvement): Per-span checkpointing is now
applied when the high-water mark lags excessively behind the leading
edge of the frontier, avoiding re-emission of the majority of spans due
to a small minority that is failing to progress.

Release Justification: important fix to enable changefeeds to operate on
very large tables when performing large catchup scans.

@miretskiy miretskiy requested a review from a team as a code owner March 14, 2022 13:31
@miretskiy miretskiy requested review from HonoreDB and removed request for a team March 14, 2022 13:31
@cockroach-teamcity (Member)

This change is Reviewable

@miretskiy miretskiy marked this pull request as draft March 14, 2022 13:31
@miretskiy miretskiy force-pushed the checkpoint branch 2 times, most recently from 955e9a7 to c6e5972 Compare March 19, 2022 21:41
@miretskiy miretskiy force-pushed the checkpoint branch 8 times, most recently from 6e98afb to 6603ac0 Compare April 6, 2022 14:11
// any error returned from this function is propagated to the caller, causing frontier
// to no longer be used.
for _, sp := range spansToBackfill {
	if _, err := frontier.Forward(sp, scanTime.Next()); err != nil {
Contributor:
It sounds strange to me to say "set the frontier after the scan time", since given the definition of the frontier that reads to me like "we're now saying that we've seen all events for this span up to and including right after the scan time". What causes us to need this? Is it possible to instead have a .Next() further down at the point we're using this frontier information? Even if it'd now be usable as a rangefeed start time, my concern is that at some point we do/would rely on the classic definition of frontier as "we've already seen this timestamp" and have some off-by-one issue.

Contributor Author:

I think there is a bit of a misunderstanding. When we scan spans, we scan up to and including that timestamp.
Therefore, when we start the rangefeed, the feed for those spans should start from the next timestamp.
That was the nature of the off-by-one bug fix. Now, the reason I did this manipulation in this function is that
the set of spans and the timestamp depend on whether or not it's an initial scan and whether there are table events.
At any rate, I changed the signature to return the list of spans that were scanned and their timestamp so that
we can advance the frontier at the call site. Do you think this is clearer now?
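The scan/rangefeed hand-off being discussed can be sketched as follows (a toy model with hypothetical simplified types; the real code uses hlc.Timestamp and span.Frontier). The scan covers everything up to and including scannedTS, so the frontier is forwarded to scannedTS.Next(), which is where the rangefeed resumes:

```go
package main

import "fmt"

// Timestamp is a simplified stand-in for hlc.Timestamp; Next returns the
// smallest timestamp strictly after t, mirroring hlc.Timestamp.Next().
type Timestamp struct{ Wall, Logical int64 }

func (t Timestamp) Next() Timestamp { return Timestamp{t.Wall, t.Logical + 1} }
func (t Timestamp) Less(o Timestamp) bool {
	return t.Wall < o.Wall || (t.Wall == o.Wall && t.Logical < o.Logical)
}

// frontier is a toy per-span frontier keyed by span name.
type frontier map[string]Timestamp

// Forward mimics span.Frontier.Forward: it only ever moves a span's
// timestamp forward, never backwards.
func (f frontier) Forward(span string, ts Timestamp) {
	if cur, ok := f[span]; !ok || cur.Less(ts) {
		f[span] = ts
	}
}

// scanIfShould is a hypothetical stand-in for the kvfeed helper discussed
// above: it performs the scan and reports which spans it scanned and the
// (inclusive) timestamp the scan covered, leaving the frontier update to
// the caller.
func scanIfShould(spans []string, scanTS Timestamp) (scanned []string, ts Timestamp) {
	// ... scan each span up to and including scanTS ...
	return spans, scanTS
}

func main() {
	f := frontier{}
	scanned, scannedTS := scanIfShould([]string{"/Table/1", "/Table/2"}, Timestamp{100, 0})
	// The scan emitted everything at or below scannedTS, so the rangefeed
	// for these spans can start at scannedTS.Next().
	for _, sp := range scanned {
		f.Forward(sp, scannedTS.Next())
	}
	fmt.Println(f["/Table/1"]) // Prints: {100 1}
}
```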

@miretskiy miretskiy force-pushed the checkpoint branch 4 times, most recently from b3377a9 to 28d5a24 Compare April 9, 2022 17:10
@miretskiy miretskiy marked this pull request as ready for review April 10, 2022 14:00
@miretskiy miretskiy requested a review from a team as a code owner April 10, 2022 14:00
@miretskiy miretskiy requested review from a team and ajwerner April 10, 2022 14:00
@miretskiy (Contributor Author)

This is finally ready for review; I probably missed the cutoff time for the 22.2.0 backport.
@ajwerner would appreciate your 👁️ on this.

@otan otan removed the request for review from a team April 18, 2022 20:35
@ajwerner (Contributor) left a comment:

LGTM 🚀

g.GoCtx(func(ctx context.Context) error {
	return ds.divideAndSendRangeFeedToRanges(ctx, rs, startFrom, rangeCh)
})
// for _, span := range spans {
Contributor:

detritus

Comment on lines 251 to 257
// We have scanned scannedSpans up to and including scannedTS. Advance frontier
// for those spans -- we can start their range feed from scannedTS.Next().
for _, sp := range scannedSpans {
	if _, err := frontier.Forward(sp, scannedTS.Next()); err != nil {
		return err
	}
}
Contributor:

This still concerns me because by setting frontier like this we're still saying "We've seen everything up to time scannedTS.Next()" due to the definition of frontier and then we rely on that definition later on even though it's no longer necessarily true. This'd likely never happen but suppose a schema change backfill occurred at time t when we had an endTime of t.Next() while we also had a single change occurring at t.Next() as well, wouldn't that event be erroneously skipped?

If our initialScanOnly check directly below this block was instead an endTimeReached = f.endTime.EqOrdering(frontier.Frontier()), that should technically still have been a reasonable choice but this type of invalidation would've broken that.

Contributor Author:

No, I think we are saying that the scan has seen all events at scanTS, and thus we can start the rangefeed after that time, i.e. at scanTS.Next().

I think this is all pretty subtle, and maybe @ajwerner can elucidate/correct things.
A schema change occurred at T; runUntilTableEvent exits with T.Prev().
We then scan the table up to and including T. We emit all events at T. That means we can start the rangefeed from T.Next().

Contributor:

Right, but wouldn't the endTime case I described still be possible? Where we start the rangefeed from T.Next(), but we think we've already finished all events for T.Next() because that's what the frontier is at, so we allow ourselves to exit at the endTime scanBoundary.

Contributor Author:

Sorry, I'm just now seeing that you mean the newly added EndTime, which simply didn't exist when this PR was written.
I think it's fine? Or at the very least we didn't define end_time too exactly, so it looks like it's exclusive.

@samiskin (Contributor) commented Apr 20, 2022:

Yeah, we could define it that way in this case, though it does feel a little awkward that it's "almost always inclusive except for a specific edge case".

I don't disagree that the way things are set up now will likely work perfectly fine for our current requirements; my concern is more on the complexity/code-smell side, where we're adding complexity to the definition of a "frontier" so that it now has two possible meanings: either "the time at or before which we know we've seen all events" or "the starting point of the rangefeeds". Bugs/restrictions like "endTime can't be inclusive" can occur when we assume one meaning while the other is active.

Here is my suggestion (screenshot of suggested code omitted):

This way, our frontier always maintains the invariant of "we've handled all information up to and including the frontier"; there's no more "or it is the initial starting time of the feed". It is also always updated as early as possible, so:

  1. We initialize it to initialHighWater because Config defines that as "InitialHighWater is the timestamp after which new events are guaranteed to be produced", so initialHighWater is already considered handled.
  2. We immediately forward the checkpoint information because we know that progress has happened from the beginning. I also don't think we necessarily need to clear out checkpoint and checkpointTimestamp, because now, unlike before, we have the timestamp information that restricts their validity.
  3. As soon as the scan occurs, we forward the frontier to scannedTS because we know that those spans have progressed up to and including that timestamp. No further explanation is needed of "we'll do .Next() here so that later down in the code, when we start the rangefeeds, they start correctly".
  4. The SpanTimePair initialization is clearer, with the intent of "we want to start the rangefeeds right after our existing progress" clearly laid out within the context where it is relevant.

Contributor Author:

As discussed offline, keeping things as they are, but renaming frontier to rangeFeedResumeFrontier to make it clear what the purpose of that frontier is.


var stps []kvcoord.SpanTimePair
frontier.Entries(func(s roachpb.Span, ts hlc.Timestamp) (done span.OpResult) {
	stps = append(stps, kvcoord.SpanTimePair{Span: s, TS: ts})
Contributor:

Instead of forwarding the frontier above, would it not be more correct to do TS: ts.Next() here? Since then we're saying "Our frontier says we've already handled this span up to time ts, so we want to start a rangefeed at ts.Next() to get all events after that point".

Contributor Author:

That would be wrong, because scanIfShould may have scanned only a subset of the spans (e.g., we have a feed on 2 tables, but only 1 table had a schema change). That's why scanIfShould now returns the timestamp and the set of spans that it scanned to that timestamp.
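The point about per-span start times can be sketched as follows (a toy example with hypothetical simplified types; the real code builds kvcoord.SpanTimePair values from span.Frontier entries). Only the spans that were actually scanned had their frontier entries forwarded, so each rangefeed must start at its own span's entry; a uniform ts.Next() applied in this loop would also advance spans that were never scanned:

```go
package main

import "fmt"

// SpanTimePair mirrors the shape of kvcoord.SpanTimePair in spirit:
// each span carries its own rangefeed start time.
type SpanTimePair struct {
	Span string
	TS   int64 // simplified timestamp
}

func main() {
	cursor := int64(100)
	// Frontier after a targeted scan: only table 1 had a schema change and
	// was re-scanned through ts=105, so its frontier entry was forwarded to
	// 106 (i.e. 105 "Next"); table 2 was not scanned and stays at the cursor.
	frontier := map[string]int64{
		"/Table/1": 106,
		"/Table/2": cursor,
	}
	var stps []SpanTimePair
	for sp, ts := range frontier {
		// Start each rangefeed exactly at its span's frontier entry; a
		// uniform ts+1 here would advance the unscanned table 2 past
		// timestamps that were neither scanned nor emitted.
		stps = append(stps, SpanTimePair{Span: sp, TS: ts})
	}
	fmt.Println(len(stps)) // Prints: 2
}
```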

pkg/ccl/changefeedccl/kvfeed/kv_feed.go (outdated; resolved)
@miretskiy (Contributor Author)
bors r+

@craig (bot) commented May 1, 2022

Build failed:

@miretskiy miretskiy force-pushed the checkpoint branch 2 times, most recently from 10d9c48 to f0bcfdf Compare May 2, 2022 22:03
Yevgeniy Miretskiy and others added 5 commits May 2, 2022 18:09
Fix data race in TestAlterChangefeedAddTargetsDuringBackfill.
In addition, fix incorrect (racy) assumptions in the test itself.

Release Notes: None
Release Justification: test fix
Expand the RangeFeed API to support specifying a start
time per span.

Release Notes: None

Release Justification: important stability work required
to support changefeed checkpointing during catchup scans.
Add a utility method to span frontier to construct a
frontier at a specified timestamp.

Release Notes: None
Release Justification: low impact/danger change to improve the span
frontier library.
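A hedged sketch of what such a utility might look like (makeFrontierAt and the toy frontier type here are hypothetical simplifications; the real method lives in the span frontier library):

```go
package main

import "fmt"

// frontier is a toy stand-in for span.Frontier: per-span timestamps.
type frontier map[string]int64

// makeFrontierAt sketches the new utility: construct a frontier in which
// every tracked span starts at the supplied timestamp (e.g. the changefeed
// cursor) rather than at zero.
func makeFrontierAt(ts int64, spans ...string) frontier {
	f := make(frontier, len(spans))
	for _, sp := range spans {
		f[sp] = ts
	}
	return f
}

func main() {
	f := makeFrontierAt(100, "/Table/1", "/Table/2")
	fmt.Println(f["/Table/1"], f["/Table/2"]) // Prints: 100 100
}
```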
changefeedccl: checkpoint for lagging high-water (commit message; body identical to the PR description above).
Use the catchup scan checkpoint when resuming a range feed.

Release Notes (enterprise change): Changefeed restarts,
and changefeeds started with a cursor, are now more efficient since
they can use a checkpoint of catchup scan progress.

Release Justification: important performance and scalability
improvements for large-scale changefeeds.
@miretskiy (Contributor Author)

bors r+

@craig (bot) commented May 3, 2022

Build succeeded:

Successfully merging this pull request may close these issues.

changefeed: Checkpoint catchup scans.
4 participants