-
Notifications
You must be signed in to change notification settings - Fork 2.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
k8s: Introduce subscriber package to simplify & consolidate K8s watcher callbacks / event handling #15295
k8s: Introduce subscriber package to simplify & consolidate K8s watcher callbacks / event handling #15295
Conversation
0d07946
to
fb02f3c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not entirely clear what are we gaining by introducing another queue to process k8s events. We had queues for k8s events in the past but we have removed them because they are redundant as the k8s watcher library already has a really good implementation to handle them.
fb02f3c
to
f53283c
Compare
@aanm I rewrote the PR to remove the use of the queue and instead just directly using a slice. Thanks for the suggestion |
f53283c
to
0c7fdd3
Compare
0c7fdd3
to
8c62a3d
Compare
a1d5ce0
to
3abdba6
Compare
490679e
to
44b6c52
Compare
|
||
func (c *serviceCacheSubscriber) OnAdd(obj *slim_corev1.Service) { | ||
log.WithField(logfields.ServiceName, obj.Name).Debugf("Received service addition %+v", obj) | ||
K8sSvcCache.UpdateService(obj, c.swgSvcs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
K8sSvcCache.UpdateService
sends the events to a buffered channel of some pre-configured capacity. Currently, we have only one buffered channel as K8sSvcCache.UpdateService
is directly called from the watcher callback. But with this change there'll be a channel for every subscriber. Won't that increase memory usage of Cilium? Also, it looks like the code to parse the Service
object, call correlateEndpoints
, etc in K8sSvcCache.UpdateService
be duplicated for every subscriber?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, that's not the intent. I opened #15471 to further address subscribers that depend on the service cache itself. An example is explained there.
Generally , the solution wouldn't be to create a new channel or duplicate anything for that matter for each subscriber, but rather refactor the downstream subscribers that depend on the service cache itself.
This comment has been minimized.
This comment has been minimized.
44b6c52
to
d2b4795
Compare
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As this is part of the BGP PR, do we still want to keep this PR open?
@aanm I intended for the BGP PR to be based off this one. There's also lots of context inside this PR that might be worth keeping, so let's use this one |
d2b4795
to
9455269
Compare
This commit implements a package called subscriber for the K8s resource / object watchers. It contains a list of subscribers to notify when handling a K8s add, update, or delete event, respectively. This allows the watchers to be simpler and leaner than they previously were. They no longer need to hold logic specific to the consumers. Instead, they call Notify*() inside the watcher callbacks depending on the event, passing the resource / object from the event. This notifies all the subscribers registered with Register(). Subscribers (also referred to as consumers or handlers) must implement either k8s.io/client-go/tools/cache.ResourceEventHandler and use the "Raw" API of callbacks such as (*RawList).Register(), where there's no strict contraint on the resource / object type, or implement the provided ServiceHandler and use the "Service" specific API, such as (*ServiceList).Register(). Consumers of the K8s Service watcher should implement the ServiceHandler. For now, only the raw and the service API is provided, due to strict necessity. More specific handlers can be added easily by defining a new interface similar to ServiceHandler, but differing on the object type. Signed-off-by: Chris Tarazi <chris@isovalent.com>
This commit converts the service watcher in the Operator to use the new subscriber package to handle K8s object events by defining a ServiceHandler. Signed-off-by: Chris Tarazi <chris@isovalent.com>
9455269
to
2c5fa4a
Compare
test-me-please |
hit #12891 merging |
This PR implements a new package which represents a subscriber to K8s watcher
events. This package simplifies the logic of K8s watchers and removes all the
unnecessary logic of the consumer (subscriber) from them.
In order for a subscriber to add their event handling to the K8s watcher, it
must use register itself. Depending on what kind of object it's subscribed to,
it must use implement the ServiceHandler interface if it's handling Service
objects, or use the "Raw" API where no such constraint on the object type
exists.
With this implementation, future subscribers can easily add their own logic
without worrying about polluting K8s watcher code with specific, irrelevant
details only really meant for them. Meanwhile, the K8s watchers themselves
simply call
Notify*()
depending on the event type. This notifies all thesubscribers to run their logic on the event.
In an upcoming PR, a BGP subscriber will utilize this infrastructure, all the
while, the core service watcher code remains unchanged.
This PR does not convert the rest of the Operator's watchers, but instead
provides the groundwork for future PRs.
See commit msgs.