kvcoord: Add observability for actively running range feeds. #69055
@ajwerner @andreimatei I think this kind of PR might be better than the other one that tries to restart changefeeds.
This is going to be extremely useful. Can you add querying this to the debug zip endpoint?
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @andreimatei and @miretskiy)
pkg/kv/kvclient/kvcoord/dist_sender.go, line 295 at r1 (raw file):
// Currently executing range feeds. activeRangeFeeds sync.Map
can you put a comment with the types, like, in this case, // map[*rangeFeedRegistry]nil
pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go, line 114 at r1 (raw file):
// ActiveRangeFeed structure describes the state of currently executing range feed. type ActiveRangeFeed struct {
How about we use the word partial somewhere around here?
pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go, line 114 at r1 (raw file):
// ActiveRangeFeed structure describes the state of currently executing range feed. type ActiveRangeFeed struct {
I think we really want to track the current resolved timestamp somehow.
pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go, line 121 at r1 (raw file):
} // ActiveRangeFeedIterFn is an iterator function which is passed ActiveRangeFeed structure.
mention iterutil.StopIteration
pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go, line 134 at r1 (raw file):
active := k.(*activeRangeFeed) active.Lock() defer active.Unlock()
How about we just copy the ActiveRangeFeed struct out and pass it to the callback? It'd be bad if this callback blocked, and for virtual tables it very well may block if the consumer stops consuming rows.
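The suggestion above can be sketched in isolation. This is a minimal sketch, not the code from the PR: the `registry` type, the `forEach` method, and the fields of `PartialRangeFeed` are hypothetical stand-ins. It shows the per-entry lock being held only long enough to copy the struct, so a slow callback never blocks writers.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// PartialRangeFeed is a hypothetical stand-in for the exported snapshot struct.
type PartialRangeFeed struct {
	RangeID   int64
	LastEvent time.Time
}

// activeRangeFeed guards the snapshot with a mutex, as in the quoted code.
type activeRangeFeed struct {
	sync.Mutex
	PartialRangeFeed
}

// registry is a hypothetical holder of the active feeds.
type registry struct {
	ranges sync.Map // map[*activeRangeFeed]nil
}

// forEach copies each entry while holding its lock, releases the lock, and
// only then invokes fn, so a blocked fn cannot stall updates to the entry.
func (r *registry) forEach(fn func(PartialRangeFeed) error) error {
	var iterErr error
	r.ranges.Range(func(k, _ interface{}) bool {
		active := k.(*activeRangeFeed)
		active.Lock()
		snapshot := active.PartialRangeFeed // value copy, mutex not included
		active.Unlock()
		iterErr = fn(snapshot)
		return iterErr == nil
	})
	return iterErr
}

func main() {
	r := &registry{}
	feed := &activeRangeFeed{PartialRangeFeed: PartialRangeFeed{RangeID: 53}}
	r.ranges.Store(feed, nil)

	err := r.forEach(func(f PartialRangeFeed) error {
		// The entry's lock is free here, so an update can proceed even
		// while the callback is still running.
		feed.Lock()
		feed.LastEvent = time.Now()
		feed.Unlock()
		fmt.Println("range", f.RangeID)
		return nil
	})
	fmt.Println("err:", err)
}
```

Had `forEach` kept the lock held with `defer`, the `feed.Lock()` call inside the callback would deadlock, which is the failure mode the comment warns about.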
pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go, line 143 at r1 (raw file):
return iterErr == nil })
Can you add:
if iterutil.Done(err) {
	err = nil
}
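For readers unfamiliar with the pattern, here is a minimal self-contained sketch of what the requested check does. `errStopIteration` and `done` are local stand-ins written for this example (assumptions, not the `iterutil` package itself): the callback returns a sentinel to stop early, and the iterator swallows that sentinel so callers do not see it as a failure.

```go
package main

import (
	"errors"
	"fmt"
)

// errStopIteration and done are local stand-ins for iterutil.StopIteration
// and iterutil.Done; they are assumptions made for illustration only.
var errStopIteration = errors.New("stop iteration")

func done(err error) bool { return errors.Is(err, errStopIteration) }

// forEach visits items until fn returns an error. An early stop requested
// via errStopIteration is not reported to the caller as a failure.
func forEach(items []int, fn func(int) error) error {
	var err error
	for _, it := range items {
		if err = fn(it); err != nil {
			break
		}
	}
	if done(err) {
		err = nil
	}
	return err
}

func main() {
	var visited []int
	err := forEach([]int{1, 2, 3, 4}, func(i int) error {
		visited = append(visited, i)
		if i == 2 {
			return errStopIteration // stop early, not an error
		}
		return nil
	})
	fmt.Println(visited, err)
}
```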
pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go, line 165 at r1 (raw file):
RangeFeedContext // Map of ranges (activeRangeFeed* -> nil) started by this registry. ranges sync.Map
nit: I prefer a comment to look like
// feel free to put more commentary here.
ranges sync.Map // map[*activeRangeFeed]nil
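As a small illustration of why the type comment helps: `sync.Map` stores untyped keys and values, so the comment is the only place a reader learns what the type assertion in `Range` should be. The sketch below uses a hypothetical `registry` with `add`, `remove`, and `count` helpers that are not part of the PR.

```go
package main

import (
	"fmt"
	"sync"
)

type activeRangeFeed struct{ rangeID int64 }

type registry struct {
	// Ranges started by this registry, used as a set: the keys carry the
	// data and every value is nil.
	ranges sync.Map // map[*activeRangeFeed]nil
}

func (r *registry) add(f *activeRangeFeed)    { r.ranges.Store(f, nil) }
func (r *registry) remove(f *activeRangeFeed) { r.ranges.Delete(f) }

// count walks the set; the type assertion matches the documented key type.
func (r *registry) count() int {
	n := 0
	r.ranges.Range(func(k, _ interface{}) bool {
		_ = k.(*activeRangeFeed)
		n++
		return true
	})
	return n
}

func main() {
	r := &registry{}
	a, b := &activeRangeFeed{rangeID: 1}, &activeRangeFeed{rangeID: 2}
	r.add(a)
	r.add(b)
	r.remove(a)
	fmt.Println("active:", r.count())
}
```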
pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go, line 169 at r1 (raw file):
// startPartialRangeFeed starts execution of partial rangefeed. func (r *rangeFeedRegistry) startPartialRangeFeed(
is there any reason to not just pass rangeFeedRegistry to partialRangeFeed and just inline this there?
pkg/sql/crdb_internal.go, line 5063 at r1 (raw file):
);`, populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error { return p.extendedEvalCtx.DistSQLPlanner.distSender.ForEachActiveRangeFeed(
nit: don't go in the eval context here. Can you instead do p.execCfg.DistSender?
Added to zip_per_node
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @ajwerner and @andreimatei)
pkg/kv/kvclient/kvcoord/dist_sender.go, line 295 at r1 (raw file):
Previously, ajwerner wrote…
can you put a comment with the types, like, in this case,
// map[*rangeFeedRegistry]nil
Nice. Done
pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go, line 114 at r1 (raw file):
How about we use the word partial somewhere around here?
Done.
pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go, line 114 at r1 (raw file):
Previously, ajwerner wrote…
I think we really want to track the current resolved timestamp somehow.
Done -- updated virtual table to display both the timestamp of the last KV event and the time of last checkpoint.
pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go, line 121 at r1 (raw file):
Previously, ajwerner wrote…
mention iterutil.StopIteration
Done.
pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go, line 134 at r1 (raw file):
Previously, ajwerner wrote…
active := k.(*activeRangeFeed) active.Lock() defer active.Unlock()
how about we just copy the ActiveRangeFeed struct out and pass it to the callback. It'd be bad if this callback blocked, and, for virtual tables, it very much may block if the consumer stops consuming rows.
Done... It's true that the consumer may pause if e.g. we have some memory pressure.
pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go, line 143 at r1 (raw file):
Previously, ajwerner wrote…
Can you add:
if iterutil.Done(err) { err = nil }
Done.
pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go, line 165 at r1 (raw file):
// map[*activeRangeFeed]nil
Done.
pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go, line 169 at r1 (raw file):
Previously, ajwerner wrote…
// startPartialRangeFeed starts execution of partial rangefeed. func (r *rangeFeedRegistry) startPartialRangeFeed(
is there any reason to not just pass rangeFeedRegistry to partialRangeFeed and just inline this there?
Nope. Gone.
If you're willing, could you put a snippet of output from the table in the PR description? It'll be cool to see it. This thing is going to be very handy.
Reviewed 2 of 5 files at r1, 10 of 10 files at r3.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @ajwerner, @andreimatei, and @miretskiy)
pkg/ccl/changefeedccl/changefeed_test.go, line 4414 at r3 (raw file):
CREATE TABLE tbl (a INT, b STRING); INSERT INTO tbl VALUES (1, 'one'), (2, 'two'), (3, 'three'); CREATE CHANGEFEED FOR tbl INTO 'null://';
the null sink is nifty
pkg/ccl/changefeedccl/changefeed_test.go, line 4424 at r3 (raw file):
testutils.SucceedsSoon(t, func() error {
	var numRanges int
	sqlDB.QueryRow(t, numRangesQuery).Scan(&numRanges)
	if numRanges == 0 {
		return errors.New("still waiting for ranges to be registered")
	}
	return nil
})
?
sqlDB.CheckQueryResultsRetry(t, numRangesQuery, [][]string{{"1"}})
pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go, line 134 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Done... It's true that the consumer may pause if e.g. we have some memory pressure.
By consumer here I meant the reader of the crdb_internal table. That is going to be driven by a client connection, and underneath the callback here we likely call a function to push a row up to the connection. That thing can block if the client stops calling Next() on their Rows. It'd be bad if we couldn't update something in this *activeRangeFeed because a SQL client stopped asking for rows.
pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go, line 98 at r3 (raw file):
// RangeFeed call. It functions as a kind of key for an active range feed. type RangeFeedContext struct { CtxTags string // context tags
nit: How crazy would it be to store the context.Context here too? Might make some testing pretty interesting. Maybe for later.
pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go, line 111 at r3 (raw file):
StartTS hlc.Timestamp NodeID roachpb.NodeID LastEvent time.Time
nit: what if we call this LastValueReceived? It's only the value events that seem to move this.
Done.
Reviewable status:
complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner and @andreimatei)
pkg/ccl/changefeedccl/changefeed_test.go, line 4414 at r3 (raw file):
Previously, ajwerner wrote…
the null sink is nifty
Ack.
pkg/ccl/changefeedccl/changefeed_test.go, line 4424 at r3 (raw file):
sqlDB.CheckQueryResultsRetry(t, numRangesQuery, [][]string{{"1"}})
Cool.
pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go, line 98 at r3 (raw file):
Previously, ajwerner wrote…
nit: How crazy would it be to store the context.Context here too? Might make some testing pretty interesting. Maybe for later.
We could, I suppose.
pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go, line 111 at r3 (raw file):
Previously, ajwerner wrote…
nit: what if we call this LastValueReceived? It's only the value events that seem to move this.
Done.
FYI: @ajwerner I'm going to rename some of the columns in the vtable:
FYI: Added range id to the list of columns.
bors r=ajwerner
Build failed:
bors r+
Build failed:
Add a `crdb_internal.active_range_feeds` virtual table which lists all currently running range feeds on a node.

The table lists the span for which the range feed was started, along with all of the ranges making up that span. Per-range information, including the node the range feed is running against and the timestamp at which the last non-error event was seen from the range, is recorded in the table.

```
root@:26257/defaultdb> select * from crdb_internal.active_range_feeds order by id, range_start;
  id           | tags                           | range_start | range_end   | startts                        | diff  | node_id | partial_range_start          | partial_range_end            | resolved                       | last_event_utc
---------------+--------------------------------+-------------+-------------+--------------------------------+-------+---------+------------------------------+------------------------------+--------------------------------+-----------------
  824655839232 | n?,rangefeed=table-stats-cache | /Table/20   | /Table/21   | 1629329537656207000.0000000000 | false |       1 | /Table/20                    | /Table/21                    | 1629335390881550000.0000000000 | NULL
  824657576368 | n1,rangefeed=lease             | /Table/3    | /Table/4    | 0.0000000000                   | false |       1 | /Table/3                     | /Table/4                     | 1629335390881550000.0000000000 | NULL
  824680146768 | n1,job=685578580453523457      | /Table/53/2 | /Table/53/3 | 1629329458537238000.0000000000 | false |       1 | /Table/53/2                  | /Table/53/2/"\x80"           | 1629335390881550000.0000000000 | NULL
  824680146768 | n1,job=685578580453523457      | /Table/53/2 | /Table/53/3 | 1629329458537238000.0000000000 | false |       1 | /Table/53/2/"\x80"/PrefixEnd | /Table/53/3                  | 1629335390881550000.0000000000 | NULL
  824680146768 | n1,job=685578580453523457      | /Table/53/2 | /Table/53/3 | 1629329458537238000.0000000000 | false |       1 | /Table/53/2/"\x80"           | /Table/53/2/"\x80"/PrefixEnd | 1629335390881550000.0000000000 | NULL
(11 rows)
```

Release Notes: Improve range feed observability by adding a `crdb_internal.active_range_feeds` virtual table which lists all currently executing range feeds on the node.
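The `startts` and `resolved` values in the sample output are hard to read by eye. The sketch below converts one into a wall-clock time. It assumes these columns use the decimal HLC form, where the integer part is wall time in nanoseconds since the Unix epoch and the fractional part is a logical counter; `parseHLC` is a hypothetical helper written for this example, not a CockroachDB API.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// parseHLC splits a decimal timestamp such as
// "1629335390881550000.0000000000" into a wall-clock time and a logical
// counter. Assumption: integer part = wall nanoseconds since the Unix
// epoch, fractional part = logical counter.
func parseHLC(s string) (time.Time, int64, error) {
	wallStr, logicalStr, found := strings.Cut(s, ".")
	wallNanos, err := strconv.ParseInt(wallStr, 10, 64)
	if err != nil {
		return time.Time{}, 0, fmt.Errorf("wall time: %w", err)
	}
	var logical int64
	if found {
		if logical, err = strconv.ParseInt(logicalStr, 10, 64); err != nil {
			return time.Time{}, 0, fmt.Errorf("logical counter: %w", err)
		}
	}
	return time.Unix(0, wallNanos).UTC(), logical, nil
}

func main() {
	// Values copied from the job's rows in the sample output.
	start, _, err := parseHLC("1629329458537238000.0000000000")
	if err != nil {
		panic(err)
	}
	resolved, _, err := parseHLC("1629335390881550000.0000000000")
	if err != nil {
		panic(err)
	}
	fmt.Println("resolved at:", resolved.Format(time.RFC3339Nano))
	fmt.Println("resolved is ahead of start by:", resolved.Sub(start))
}
```

Comparing the converted `resolved` time against the current time gives a rough per-range lag, which is the kind of check the table is meant to make easy.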
bors r+
Build succeeded:
Add a `crdb_internal.active_range_feeds` virtual table which lists all currently running range feeds on a node.
The table lists the span for which the range feed was started, along with all of the ranges making up that span.
Per-range information, including the node the range feed is running against and the timestamp at which the last non-error event was seen from the range, is recorded in the table.
Release Notes: Improve range feed observability by adding a `crdb_internal.active_range_feeds` virtual table which lists all currently executing range feeds on the node.