Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

identity: cache: close channel in writing party #25353

Merged
merged 1 commit into from
May 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 7 additions & 3 deletions pkg/allocator/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ const (
type Allocator struct {
// events is a channel which will receive AllocatorEvent as IDs are
// added, modified or removed from the allocator
events AllocatorEventChan
events AllocatorEventSendChan

// keyType is an instance of the type to be used as allocator key.
keyType AllocatorKey
Expand Down Expand Up @@ -345,7 +345,7 @@ func WithBackend(backend Backend) AllocatorOption {
// read while NewAllocator() is being called to ensure that the channel does
// not block indefinitely while NewAllocator() emits events on it while
// populating the initial cache.
func WithEvents(events AllocatorEventChan) AllocatorOption {
func WithEvents(events AllocatorEventSendChan) AllocatorOption {
return func(a *Allocator) { a.events = events }
}

Expand Down Expand Up @@ -381,7 +381,7 @@ func WithoutGC() AllocatorOption {
// GetEvents returns the events channel given to the allocator when
// constructed.
// Note: This channel is not owned by the allocator!
func (a *Allocator) GetEvents() AllocatorEventChan {
func (a *Allocator) GetEvents() AllocatorEventSendChan {
return a.events
}

Expand Down Expand Up @@ -867,6 +867,10 @@ func (a *Allocator) startLocalKeySync() {
// AllocatorEventChan is a channel to receive allocator events on
type AllocatorEventChan chan AllocatorEvent

// Send- and receive-only versions of the above.
type AllocatorEventRecvChan = <-chan AllocatorEvent
type AllocatorEventSendChan = chan<- AllocatorEvent

// AllocatorEvent is an event sent over AllocatorEventChan
type AllocatorEvent struct {
// Typ is the type of event (create / modify / delete)
Expand Down
9 changes: 7 additions & 2 deletions pkg/identity/cache/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type CachingIdentityAllocator struct {

identitiesPath string

// This field exists is to hand out references that are either for sending
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// This field exists is to hand out references that are either for sending
// This field exists to hand out references that are either for sending

Nit.

// and receiving. It should not be used directly without converting it first
// to a AllocatorEventSendChan or AllocatorEventRecvChan.
events allocator.AllocatorEventChan
watcher identityWatcher

Expand Down Expand Up @@ -167,7 +170,7 @@ func (m *CachingIdentityAllocator) InitIdentityAllocator(client clientset.Interf

// Asynchronously set up the global identity allocator since it connects
// to the kvstore.
go func(owner IdentityAllocatorOwner, events allocator.AllocatorEventChan, minID, maxID idpool.ID) {
go func(owner IdentityAllocatorOwner, events allocator.AllocatorEventSendChan, minID, maxID idpool.ID) {
m.setupMutex.Lock()
defer m.setupMutex.Unlock()

Expand Down Expand Up @@ -273,7 +276,9 @@ func (m *CachingIdentityAllocator) Close() {

m.IdentityAllocator.Delete()
if m.events != nil {
close(m.events)
Copy link
Member

@gandro gandro May 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Since we don't close m.events anymore, could we make type AllocatorEventChan chan AllocatorEvent into type AllocatorEventChan chan <-AllocatorEvent to disallow any code from accidentally closing it?

You might will to change the signature of newLocalIdentityCache, but all other receivers should hopefully be fine with a receive-only channel.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm this is actually a bit more involved than I though - looking into other uses of that type.

// Have the now only remaining writing party close the events channel,
// to ensure we don't panic with 'send on closed channel'.
Comment on lines +279 to +280
Copy link
Member

@giorio94 giorio94 May 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not confident that this statement is guaranteed to be true in a clustermesh scenario. The events channel is also propagated to remote watchers, which might still be running when the main allocator is closed (the clustermesh subsystem is never stopped on shutdown). Likely not a big deal though, since that should be extremely rare and we are already shutting down. For this reason I wonder if it is something actually worth solving it in this PR (it might be quite complex to fix).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you're right, that's still broken.

It's kind of unclear to me what the ownership relations are in the clustermesh scenario. Does a m.IdentityAllocator own its remote caches (and hence, should Allocator.Delete() call .Close() on them)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently the cache corresponding to a given remote cluster is closed here, which happens when the associated cluster configuration is removed, but is not triggered when the agent is shut down.

The two possible alternatives seem to be either ensuring that the clustermesh subsystem is stopped before the allocator is closed, or making sure that m.IdentityAllocator stops also the remote caches when closed. I'm not sure how the first approach plays with the current structure, given that clustermesh has not yet been converted to a Cell module. The second approach instead requires to protect RemoteCache.Close() from being called twice, as it would panic.

Does a m.IdentityAllocator own its remote caches?

I would tend to say so, since a remote cache is pointless to exist without the main allocator (but their lifecycle is currently managed externally).

m.localIdentities.close()
m.events = nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/identity/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func collectEvent(event allocator.AllocatorEvent, added, deleted IdentityCache)
}

// watch starts the identity watcher
func (w *identityWatcher) watch(events allocator.AllocatorEventChan) {
func (w *identityWatcher) watch(events allocator.AllocatorEventRecvChan) {

go func() {
for {
Expand Down
16 changes: 14 additions & 2 deletions pkg/identity/cache/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ type localIdentityCache struct {
nextNumericIdentity identity.NumericIdentity
minID identity.NumericIdentity
maxID identity.NumericIdentity
events allocator.AllocatorEventChan
events allocator.AllocatorEventSendChan
}

func newLocalIdentityCache(minID, maxID identity.NumericIdentity, events allocator.AllocatorEventChan) *localIdentityCache {
func newLocalIdentityCache(minID, maxID identity.NumericIdentity, events allocator.AllocatorEventSendChan) *localIdentityCache {
return &localIdentityCache{
identitiesByID: map[identity.NumericIdentity]*identity.Identity{},
identitiesByLabels: map[string]*identity.Identity{},
Expand Down Expand Up @@ -191,3 +191,15 @@ func (l *localIdentityCache) GetIdentities() map[identity.NumericIdentity]*ident

return cache
}

// close closes the events channel. The local identity cache is the writing
// party, hence also needs to close the channel.
func (l *localIdentityCache) close() {
l.mutex.Lock()
defer l.mutex.Unlock()

if l.events != nil {
close(l.events)
l.events = nil
gandro marked this conversation as resolved.
Show resolved Hide resolved
}
}