Storage: Watch support #82282
Conversation
force-pushed from 1f3d959 to 615597e
Added an initial implementation using a single poller that broadcasts out to all watchers. I still need to look at tweaking the startup process for new watches that specify a "since" value (which may be in the past), but it's more or less working and has the basics we need to swap in a different tail mechanism by providing the broadcaster with a different connectFunc implementation.
fields := s.getReadFields(rr)

entityQuery := selectQuery{
Do we need SELECT FOR SHARE? This would ensure we don't skip entities that are in the process of being committed. I suspect transaction t1 with revision_version=1 might be committed after t2 with revision_version=2, leading to a gap. If that's correct, using SELECT FOR SHARE would ensure we wait for all in-progress transactions to finish before returning.
Interesting, I'll have to think about that. It's supported by MySQL and Postgres but not SQLite. It does seem like it shouldn't hurt anything if we added it via dialect.
7d70f73
to
aa641ba
Compare
// result doesn't match our watch params, skip it
if !watchMatches(r, result) {
    s.log.Debug("watch result not matched", "guid", result.Guid, "action", result.Action, "rv", result.ResourceVersion)
    break
If I understand the code correctly we should continue on to the next event instead of stopping the watch, shouldn't we?
Suggested change:
-    break
+    continue
It has more or less the same impact, since we go to the next iteration of the for loop and call select again, but yes, continue may make more sense.
force-pushed from aa641ba to f81d028
@DanCech Running it locally to test #83772, I found that watchInit appears to cache results, even for deleted entities. If I create and delete playlists and run
So when you run kubectl get with watch enabled, it performs list + watch: it'll first get the list of items, then start a watch from the resourceVersion returned by the list call. You can see what it's doing if you experiment with the

What you may be seeing when you delete a playlist is that it won't be returned in the list, but when the watch is started from the most recent resourceVersion returned by the list, that resourceVersion is older than the resourceVersion of some create, update and/or delete events for that playlist, so those events are included in the initial set of events returned by the watch.

There are a few ways we could deal with that; one may be to return a fresh snowflake in the list response rather than the most-recent resourceVersion of the items we're returning, which would mean the watch would be started from the time of the list command rather than the time of the most recently created/updated item in the list.
Unless we see any major blockers I'd like to get this PR merged and do any further cleanup in follow-up PRs, since it's already quite substantial and we are starting to accumulate PRs off this branch.
@DanCech it seems like I remember the etcd implementation doing something like this. Is that what it does?
force-pushed from 4294c07 to 155ee64
Co-authored-by: Igor Suleymanov <radiohead@users.noreply.github.com>
force-pushed from 155ee64 to 1400a7e
This PR adds unified storage support for k8s WATCH
Within the storage server it uses a broadcaster pattern to consume events from the database and dispatch them to all subscribed watchers; each watcher filters the broadcast events and returns matching events to the gRPC client.
When a new watch is initiated, watchInit reads the first batch of records from the database, then subscribes to the broadcaster. The broadcaster maintains an in-memory circular buffer of recent events to help bridge any gap between the watchInit query and the subscriber connecting; the contents of the buffer are replayed to each new subscriber before any new events.
The subscriber is responsible for discarding any duplicate events and for filtering events received from the broadcaster to return only those events applicable to the requested watch.
In the k8s storage implementation, we simply use the grpc client to connect to the storage api and stream the results to the consumer via the k8s-provided StreamWatcher implementation.
Currently the database event consumer is implemented via a background poller which reads from the entity_history table, but we can add support for alternative implementations that tail binary logs or use changefeeds to discover new entries in entity_history.
Right now the circular buffer size, broadcaster subscription channel buffer size, SQL batch size, and poll interval are hardcoded, but they can be made configurable in the future if/when needed.