changefeedccl: migrate physical kvfeed to rangefeed.Factory client #168723
aerfrei wants to merge 1 commit into cockroachdb:master
Conversation
Force-pushed from b3f5614 to 080e147.
Force-pushed from 29e6a78 to d57d956.
	cfg.Knobs.OnRangeFeedStart(cfg.Spans)
}

// TODO(aerfrei): rangefeed.Factory.New takes an initialTimestamp, but it
If we like this PR otherwise, I'll file an issue for this change.
DarrylWong left a comment
High level question: Before this change, my understanding is that the physical kv feed took raw kvpb.RangeFeedEvents and converted them into the KVEvents that changefeeds care about. Do we still need a separate physical kvfeed layer now that we can just interface with the new client, which does this for us?
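For readers skimming the thread, here is a minimal sketch of the pre-migration shape being discussed: a raw event channel plus a hand-rolled type switch. The function and callback names are illustrative, not the actual kvfeed code.

```go
import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/errors"
)

// consumeRaw sketches the old channel-based loop: read raw kvpb.RangeFeedEvents
// and dispatch on the populated field. Names are illustrative only.
func consumeRaw(
	ctx context.Context,
	eventCh <-chan kvpb.RangeFeedEvent,
	onValue func(context.Context, *kvpb.RangeFeedValue),
	onCheckpoint func(context.Context, *kvpb.RangeFeedCheckpoint),
) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case ev := <-eventCh:
			switch {
			case ev.Val != nil:
				onValue(ctx, ev.Val) // becomes a changefeed KVEvent downstream
			case ev.Checkpoint != nil:
				onCheckpoint(ctx, ev.Checkpoint) // resolved-timestamp progress
			case ev.Error != nil:
				// Pre-Factory, errors surfaced here and restarted the whole
				// physical feed.
				return errors.Newf("rangefeed error: %v", ev.Error)
			}
		}
	}
}
```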
// BulkDelivery enables bulk delivery of rangefeed events, which can improve performance during catchup scans.
// BulkDelivery is no longer used. The rangefeed client now handles bulk
// delivery internally.
var BulkDelivery = settings.RegisterBoolSetting(
Can you explain why we remove this outright rather than plumbing this option through the new rangefeed.Factory?
The new rangefeed forces bulk delivery so it's not as simple as plumbing it through. This was also a temporary non-public setting that I think we planned to deprecate anyway.
Gotcha, then I think my nit would be if we could put that info in the comment.
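A minimal sketch of what that comment could carry, using the explanation from this thread; the setting class, default value, and description string below are illustrative guesses, not the PR's actual change.

```go
import "github.com/cockroachdb/cockroach/pkg/settings"

// BulkDelivery is no longer consulted: the rangefeed.Factory-based client
// always uses bulk delivery internally, so there is no knob left to plumb
// through. The setting was a temporary, non-public escape hatch and is
// slated for removal.
var BulkDelivery = settings.RegisterBoolSetting(
	settings.ApplicationLevel, // illustrative; actual class may differ
	"changefeed.bulk_delivery.enabled",
	"retired: the rangefeed client always uses bulk delivery",
	true, // illustrative default
)
```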
	cancel(err)
}

onValue := func(ctx context.Context, value *kvpb.RangeFeedValue) {
nit: it'd be nice for readability if these were top level functions instead of inlined.
Have you seen RangefeedHandler in pkg/crosscluster/producer/ordered_event_stream.go? I think something like that would be nice to copy (or perhaps outright extract to a common package and reuse).
Agreed about these being top level. Can you elaborate more on why you think we should use RangefeedHandler here? I think it would be worthwhile to do if we want to have changefeeds use the ordered event stream, but I don't think it's worth doing immediately as part of this PR.
Right, I don't think we need the literal RangefeedHandler interface right now; I was more pointing to it as an existing example of "how do we neatly describe all the ways we want to consume rangefeed events". I think copying this structure sets us up to go the interface route later if we want, and it is visually easier to parse than sticking them in helper funcs.
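A minimal sketch of the structure being suggested, assuming the callback signatures visible elsewhere in this diff; the type, field, and stand-in kvEvent names are illustrative.

```go
import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/errors"
)

// kvEvent is a stand-in for the changefeed's internal event type.
type kvEvent struct {
	val        *kvpb.RangeFeedValue
	checkpoint *kvpb.RangeFeedCheckpoint
}

// rangefeedCallbacks groups the Factory callbacks as methods rather than
// inline closures, loosely mirroring RangefeedHandler in
// pkg/crosscluster/producer/ordered_event_stream.go.
type rangefeedCallbacks struct {
	emit   func(context.Context, kvEvent)
	setErr func(error)
}

func (h *rangefeedCallbacks) onValue(ctx context.Context, v *kvpb.RangeFeedValue) {
	h.emit(ctx, kvEvent{val: v})
}

func (h *rangefeedCallbacks) onCheckpoint(ctx context.Context, cp *kvpb.RangeFeedCheckpoint) {
	h.emit(ctx, kvEvent{checkpoint: cp})
}

func (h *rangefeedCallbacks) onSSTable(
	ctx context.Context, sst *kvpb.RangeFeedSSTable, _ roachpb.Span,
) {
	h.setErr(errors.AssertionFailedf("unexpected SST ingestion: %v", sst))
}
```

Registration would then read something like rangefeed.WithOnCheckpoint(h.onCheckpoint) and rangefeed.WithOnSSTable(h.onSSTable) instead of inlined closures.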
// flush out every change before the schema change timestamp before we start
// emitting any changes from after the schema change. The KVFeed's
// `SchemaFeed` is responsible for detecting and enforcing these, but the
// after-KVFeed buffer doesn't have access to any of this state. A cleanup is
Can we address this TODO now that the new callbacks have access to frontier state? i.e. get rid of the second buffer and teach On* methods to handle schema changes?
I think this is a little more complicated and should be part of a separate PR, but I agree that we should do this. I think this PR sets us up better for that.
That seems reasonable, but I think:
- It would be nice to do a timeboxed investigation into how hard it would actually be. Naively, from the TODO it sounds like all we would have to do is move the schemafeed down a layer into the callbacks, but I'm sure there's more complexity than that.
- We should update and persist this TODO somewhere so we don't forget.
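To make the invariant in that TODO concrete, here is a purely hypothetical sketch of the boundary check a frontier-aware callback would need; none of these names exist in the tree.

```go
import "github.com/cockroachdb/cockroach/pkg/util/hlc"

// canEmit states the invariant from the TODO: an event at or after a
// schema-change boundary may only be emitted once everything before the
// boundary has been flushed, i.e. once the frontier has reached the boundary.
// Illustrative only; the real logic lives in the schemafeed/kvfeed layers.
func canEmit(eventTS, boundaryTS, frontierTS hlc.Timestamp) bool {
	if eventTS.Less(boundaryTS) {
		return true // before the schema change: always safe to emit
	}
	return !frontierTS.Less(boundaryTS)
}
```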
rangefeed.WithOnSSTable(func(
	ctx context.Context, sst *kvpb.RangeFeedSSTable, _ roachpb.Span,
) {
	setErr(errors.AssertionFailedf("unexpected SST ingestion: %v", sst))
nit: I think you can just not register an SST handler for the same effect; or are you saying we're retrying the default AssertionFailedf (which sounds like a bug)?
I added a test that shows that when I do that and include an SST event, the rangefeed hangs. I think retrying when we see an SST is generally intended, but it being an AssertionFailedf is probably a bug and should be changed. I'll file an issue for that.
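For readers following the setErr and cancel(err) references in these fragments, here is a minimal sketch of the capture-first-error-and-cancel pattern, assuming context.WithCancelCause; the helper name is illustrative.

```go
import "context"

// errCapture ties callback errors to the feed's context: the first error
// reported cancels the context and becomes its cause. Later calls are no-ops,
// which context.WithCancelCause guarantees, so concurrent callbacks need no
// extra coordination. Illustrative helper only.
func errCapture(parent context.Context) (context.Context, func(error)) {
	ctx, cancel := context.WithCancelCause(parent)
	return ctx, func(err error) { cancel(err) }
}
```

The OnSSTable callback in the fragment above would call setErr(...), and the caller can read the error back with context.Cause(ctx) after shutdown.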
@DarrylWong I think you're right that we don't really need as much of the scaffolding around the "physicalFeed", including "physicalFeedFactory". I don't think it's really a physical feed anymore, but I think the file separation is good organizationally. I renamed the file and simplified a little. Let me know what you think.
Force-pushed from d57d956 to 5c8cecb.
Force-pushed from 5c8cecb to 4e16926.
Migrate from the low-level kvcoord.DistSender.RangeFeed API to the
high-level rangefeed.Factory client. The Factory uses typed callbacks
instead of a channel-based event loop and handles transient retry
logic internally.
Notable behavioral change: transient rangefeed errors (range splits,
leaseholder moves) are now retried inside the Factory rather than
surfacing to the kvfeed layer and triggering a full physical feed
restart. The Factory only forwards a narrow set of permanent errors to
the caller; we register an OnSSTable handler that errors so unexpected
SST events fail loud rather than falling into the silent-retry bucket.
The dead RangeObserver plumbing on kvFeed and rangeFeedConfig is
removed in passing — PR #163427 already replaced the
lagging_ranges/total_ranges DistSender-introspection observer with a
per-aggregator frontier poller.
Release note (sql change): The changefeed.bulk_delivery.enabled cluster
setting is now retired. The rangefeed client handles bulk delivery
internally and the setting is no longer consulted.
Epic: none
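As a rough illustration of the overall shape this migration produces, here is a sketch assuming Factory.New's (name, initialTimestamp, onValue, options...) signature and the option names visible in the diff; it is not the PR's exact code.

```go
import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/errors"
)

// runFeed shows the post-migration shape: typed callbacks registered up
// front, a single Start call, and transient retries handled inside the
// Factory. Names and the option set are simplified.
func runFeed(
	ctx context.Context,
	f *rangefeed.Factory,
	spans []roachpb.Span,
	initialTS hlc.Timestamp,
	onValue func(context.Context, *kvpb.RangeFeedValue),
	onCheckpoint func(context.Context, *kvpb.RangeFeedCheckpoint),
	setErr func(error),
) error {
	feed := f.New("changefeed-kvfeed", initialTS, onValue,
		rangefeed.WithOnCheckpoint(onCheckpoint),
		// Unexpected SSTs fail loudly instead of landing in the silent-retry bucket.
		rangefeed.WithOnSSTable(func(
			ctx context.Context, sst *kvpb.RangeFeedSSTable, _ roachpb.Span,
		) {
			setErr(errors.AssertionFailedf("unexpected SST ingestion: %v", sst))
		}),
	)
	defer feed.Close()
	if err := feed.Start(ctx, spans); err != nil {
		return err
	}
	// Range splits and leaseholder moves are retried inside the Factory; this
	// function only returns when the caller cancels or an error cancels ctx.
	<-ctx.Done()
	return ctx.Err()
}
```

The key difference from the old channel loop is that transient errors never reach this function, so there is no full physical-feed restart path here at all.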