Skip to content

Commit

Permalink
identity: cache: close events channel in writer
Browse files Browse the repository at this point in the history
As part of the shutdown procedure involving IPCache and the identity
allocation components, it was possible to hit a 'send on closed
channel' panic, caused by writes of the localIdentityCache to the events
channel, which is closed as part of the shutdown of the identity
allocator.

Instead of directly closing the channel after shutting down the
allocator, call into the localIdentityCache to do so, with proper mutual
exclusion guaranteed its mutex.

The offending writes happened in 'lookupOrCreate' as well as 'release',
both of which take the mutex, and hence are correctly synchronised with
the new 'close()' method. The other writer is the allocator, but we
block on its shutdown before 'close()'.

Suggested-by: André Martins <andre@cilium.io>
Signed-off-by: David Bimmler <david.bimmler@isovalent.com>
  • Loading branch information
bimmlerd authored and tommyp1ckles committed May 11, 2023
1 parent 3190d5d commit efb6f56
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 8 deletions.
10 changes: 7 additions & 3 deletions pkg/allocator/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ const (
type Allocator struct {
// events is a channel which will receive AllocatorEvent as IDs are
// added, modified or removed from the allocator
events AllocatorEventChan
events AllocatorEventSendChan

// keyType is an instance of the type to be used as allocator key.
keyType AllocatorKey
Expand Down Expand Up @@ -345,7 +345,7 @@ func WithBackend(backend Backend) AllocatorOption {
// read while NewAllocator() is being called to ensure that the channel does
// not block indefinitely while NewAllocator() emits events on it while
// populating the initial cache.
func WithEvents(events AllocatorEventChan) AllocatorOption {
func WithEvents(events AllocatorEventSendChan) AllocatorOption {
return func(a *Allocator) { a.events = events }
}

Expand Down Expand Up @@ -381,7 +381,7 @@ func WithoutGC() AllocatorOption {
// GetEvents returns the events channel given to the allocator when
// constructed.
// Note: This channel is not owned by the allocator!
func (a *Allocator) GetEvents() AllocatorEventChan {
func (a *Allocator) GetEvents() AllocatorEventSendChan {
return a.events
}

Expand Down Expand Up @@ -867,6 +867,10 @@ func (a *Allocator) startLocalKeySync() {
// AllocatorEventChan is a channel to receive allocator events on
type AllocatorEventChan chan AllocatorEvent

// Send- and receive-only versions of the above.
type AllocatorEventRecvChan = <-chan AllocatorEvent
type AllocatorEventSendChan = chan<- AllocatorEvent

// AllocatorEvent is an event sent over AllocatorEventChan
type AllocatorEvent struct {
// Typ is the type of event (create / modify / delete)
Expand Down
9 changes: 7 additions & 2 deletions pkg/identity/cache/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type CachingIdentityAllocator struct {

identitiesPath string

// This field exists is to hand out references that are either for sending
// and receiving. It should not be used directly without converting it first
// to a AllocatorEventSendChan or AllocatorEventRecvChan.
events allocator.AllocatorEventChan
watcher identityWatcher

Expand Down Expand Up @@ -167,7 +170,7 @@ func (m *CachingIdentityAllocator) InitIdentityAllocator(client clientset.Interf

// Asynchronously set up the global identity allocator since it connects
// to the kvstore.
go func(owner IdentityAllocatorOwner, events allocator.AllocatorEventChan, minID, maxID idpool.ID) {
go func(owner IdentityAllocatorOwner, events allocator.AllocatorEventSendChan, minID, maxID idpool.ID) {
m.setupMutex.Lock()
defer m.setupMutex.Unlock()

Expand Down Expand Up @@ -273,7 +276,9 @@ func (m *CachingIdentityAllocator) Close() {

m.IdentityAllocator.Delete()
if m.events != nil {
close(m.events)
// Have the now only remaining writing party close the events channel,
// to ensure we don't panic with 'send on closed channel'.
m.localIdentities.close()
m.events = nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/identity/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func collectEvent(event allocator.AllocatorEvent, added, deleted IdentityCache)
}

// watch starts the identity watcher
func (w *identityWatcher) watch(events allocator.AllocatorEventChan) {
func (w *identityWatcher) watch(events allocator.AllocatorEventRecvChan) {

go func() {
for {
Expand Down
16 changes: 14 additions & 2 deletions pkg/identity/cache/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ type localIdentityCache struct {
nextNumericIdentity identity.NumericIdentity
minID identity.NumericIdentity
maxID identity.NumericIdentity
events allocator.AllocatorEventChan
events allocator.AllocatorEventSendChan
}

func newLocalIdentityCache(minID, maxID identity.NumericIdentity, events allocator.AllocatorEventChan) *localIdentityCache {
func newLocalIdentityCache(minID, maxID identity.NumericIdentity, events allocator.AllocatorEventSendChan) *localIdentityCache {
return &localIdentityCache{
identitiesByID: map[identity.NumericIdentity]*identity.Identity{},
identitiesByLabels: map[string]*identity.Identity{},
Expand Down Expand Up @@ -191,3 +191,15 @@ func (l *localIdentityCache) GetIdentities() map[identity.NumericIdentity]*ident

return cache
}

// close closes the events channel. The local identity cache is the writing
// party, hence also needs to close the channel.
func (l *localIdentityCache) close() {
l.mutex.Lock()
defer l.mutex.Unlock()

if l.events != nil {
close(l.events)
l.events = nil
}
}

0 comments on commit efb6f56

Please sign in to comment.