Skip to content

Commit

Permalink
policy: Make selectorcache callbacks lock-free
Browse files Browse the repository at this point in the history
Make IdentitySelectionUpdated() callbacks lock-free by queueing them
while still holding the selectorcache lock (to keep FIFO order) and
invoking them from a dedicated goroutine that holds no locks. This
prevents deadlocks caused by implementations of IdentitySelectionUpdated()
taking locks such as the endpoint or selectorcache locks.

Signed-off-by: Jarno Rajahalme <jarno@isovalent.com>
  • Loading branch information
jrajahalme authored and christarazi committed Jun 22, 2021
1 parent 92d851d commit a75599d
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 38 deletions.
6 changes: 3 additions & 3 deletions pkg/policy/l4.go
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/cilium/cilium/pkg/policy/api"
"github.com/cilium/cilium/pkg/policy/trafficdirection"
"github.com/cilium/cilium/pkg/u8proto"
"github.com/cilium/proxy/go/cilium/api"
cilium "github.com/cilium/proxy/go/cilium/api"

"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -426,8 +426,8 @@ func (l4Filter *L4Filter) ToMapState(policyOwner PolicyOwner, direction trafficd
}

// IdentitySelectionUpdated implements CachedSelectionUser interface
// This call is made while holding name manager and selector cache
// locks, must beware of deadlocking!
// This call is made from a single goroutine in FIFO order to keep add
// and delete events ordered properly. No locks are held.
//
// The caller is responsible for making sure the same identity is not
// present in both 'added' and 'deleted'.
Expand Down
5 changes: 5 additions & 0 deletions pkg/policy/resolve_deny_test.go
Expand Up @@ -17,6 +17,8 @@
package policy

import (
"time"

"github.com/cilium/cilium/pkg/checker"
"github.com/cilium/cilium/pkg/identity"
"github.com/cilium/cilium/pkg/identity/cache"
Expand Down Expand Up @@ -356,6 +358,7 @@ func (ds *PolicyTestSuite) TestMapStateWithIngressDenyWildcard(c *C) {
identity.NumericIdentity(192): labels.ParseSelectLabelArray("id=resolve_test_1"),
}
testSelectorCache.UpdateIdentities(added1, nil)
time.Sleep(100 * time.Millisecond)
c.Assert(policy.policyMapChanges.changes, HasLen, 0)

// Have to remove circular reference before testing to avoid an infinite loop
Expand Down Expand Up @@ -433,12 +436,14 @@ func (ds *PolicyTestSuite) TestMapStateWithIngressDeny(c *C) {
testSelectorCache.UpdateIdentities(added1, nil)
// Cleanup the identities from the testSelectorCache
defer testSelectorCache.UpdateIdentities(nil, added1)
time.Sleep(100 * time.Millisecond)
c.Assert(policy.policyMapChanges.changes, HasLen, 3)

deleted1 := cache.IdentityCache{
identity.NumericIdentity(193): labels.ParseSelectLabelArray("id=resolve_test_1", "num=2"),
}
testSelectorCache.UpdateIdentities(nil, deleted1)
time.Sleep(100 * time.Millisecond)
c.Assert(policy.policyMapChanges.changes, HasLen, 4)

cachedSelectorWorld := testSelectorCache.FindCachedIdentitySelector(api.ReservedEndpointSelectors[labels.IDNameWorld])
Expand Down
4 changes: 4 additions & 0 deletions pkg/policy/resolve_test.go
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"sync"
"testing"
"time"

"github.com/cilium/cilium/pkg/checker"
"github.com/cilium/cilium/pkg/identity"
Expand Down Expand Up @@ -519,6 +520,7 @@ func (ds *PolicyTestSuite) TestMapStateWithIngressWildcard(c *C) {
identity.NumericIdentity(192): labels.ParseSelectLabelArray("id=resolve_test_1"),
}
testSelectorCache.UpdateIdentities(added1, nil)
time.Sleep(100 * time.Millisecond)
c.Assert(policy.policyMapChanges.changes, HasLen, 0)

// Have to remove circular reference before testing to avoid an infinite loop
Expand Down Expand Up @@ -598,12 +600,14 @@ func (ds *PolicyTestSuite) TestMapStateWithIngress(c *C) {
testSelectorCache.UpdateIdentities(added1, nil)
// Cleanup the identities from the testSelectorCache
defer testSelectorCache.UpdateIdentities(nil, added1)
time.Sleep(100 * time.Millisecond)
c.Assert(policy.policyMapChanges.changes, HasLen, 3)

deleted1 := cache.IdentityCache{
identity.NumericIdentity(193): labels.ParseSelectLabelArray("id=resolve_test_1", "num=2"),
}
testSelectorCache.UpdateIdentities(nil, deleted1)
time.Sleep(100 * time.Millisecond)
c.Assert(policy.policyMapChanges.changes, HasLen, 4)

cachedSelectorWorld := testSelectorCache.FindCachedIdentitySelector(api.ReservedEndpointSelectors[labels.IDNameWorld])
Expand Down
70 changes: 62 additions & 8 deletions pkg/policy/selectorcache.go
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"sort"
"strings"
"sync"
"sync/atomic"
"unsafe"

Expand Down Expand Up @@ -152,7 +153,7 @@ type identitySelector interface {
removeUser(CachedSelectionUser, identityNotifier) (last bool)

// This may be called while the NameManager lock is held
notifyUsers(added, deleted []identity.NumericIdentity)
notifyUsers(sc *SelectorCache, added, deleted []identity.NumericIdentity)

numUsers() int
}
Expand Down Expand Up @@ -183,6 +184,17 @@ func getIdentityCache(ids cache.IdentityCache) scIdentityCache {
return idCache
}

// userNotification is one queued IdentitySelectionUpdated callback
// invocation. It captures the callback target together with every
// argument of the call, so that the call itself can be made later, in
// FIFO order, by code that is not holding any locks.
type userNotification struct {
	// user is the receiver whose IdentitySelectionUpdated method is called.
	user CachedSelectionUser
	// selector is handed to the callback as its first argument.
	selector CachedSelector
	// added and deleted are the identity slices handed to the callback,
	// in that order.
	added, deleted []identity.NumericIdentity
}

// SelectorCache caches identities, identity selectors, and the
// subsets of identities each selector selects.
type SelectorCache struct {
Expand All @@ -197,6 +209,14 @@ type SelectorCache struct {
selectors map[string]identitySelector

localIdentityNotifier identityNotifier

// userCond is a condition variable for receiving signals
// about addition of new elements in userNotes
userCond *sync.Cond
// userMutex protects userNotes and is linked to userCond
userMutex lock.Mutex
// userNotes holds a FIFO list of user notifications to be made
userNotes []userNotification
}

// GetModel returns the API model of the SelectorCache.
Expand All @@ -223,12 +243,46 @@ func (sc *SelectorCache) GetModel() models.SelectorCache {
return selCacheMdl
}

// handleUserNotifications drains sc.userNotes and delivers each queued
// notification by calling IdentitySelectionUpdated on its user. It loops
// forever and never returns, so it is meant to run in its own goroutine.
//
// Notifications are delivered in the order in which they were appended
// to sc.userNotes. sc.userMutex is released before any callback is made,
// so callbacks run without this goroutine holding userMutex.
func (sc *SelectorCache) handleUserNotifications() {
	// takeBatch blocks until at least one notification is queued, then
	// detaches and returns the whole pending list. Swapping the slice out
	// under the lock keeps the critical section short and ensures that
	// producers cannot block on userMutex while callbacks are running.
	takeBatch := func() []userNotification {
		sc.userMutex.Lock()
		defer sc.userMutex.Unlock()

		// Re-check the condition after every wake-up.
		for len(sc.userNotes) == 0 {
			sc.userCond.Wait()
		}
		pending := sc.userNotes
		sc.userNotes = nil
		return pending
	}

	for {
		for _, note := range takeBatch() {
			note.user.IdentitySelectionUpdated(note.selector, note.added, note.deleted)
		}
	}
}

// queueUserNotification appends a notification for 'user' to the tail of
// sc.userNotes and signals sc.userCond so that a waiter on the condition
// variable is woken up. The IdentitySelectionUpdated callback itself is
// not invoked here; the arguments are only recorded for later delivery.
func (sc *SelectorCache) queueUserNotification(user CachedSelectionUser, selector CachedSelector, added, deleted []identity.NumericIdentity) {
	// Build the record up front so that only the append happens under
	// the lock.
	note := userNotification{
		user:     user,
		selector: selector,
		added:    added,
		deleted:  deleted,
	}

	sc.userMutex.Lock()
	sc.userNotes = append(sc.userNotes, note)
	sc.userMutex.Unlock()

	// Signal after releasing the mutex, as before.
	sc.userCond.Signal()
}

// NewSelectorCache creates a new SelectorCache with the given identities.
func NewSelectorCache(ids cache.IdentityCache) *SelectorCache {
return &SelectorCache{
sc := &SelectorCache{
idCache: getIdentityCache(ids),
selectors: make(map[string]identitySelector),
}
sc.userCond = sync.NewCond(&sc.userMutex)
go sc.handleUserNotifications()

return sc
}

// SetLocalIdentityNotifier injects the provided identityNotifier into the
Expand Down Expand Up @@ -375,10 +429,10 @@ type fqdnSelector struct {
//
// The caller is responsible for making sure the same identity is not
// present in both 'added' and 'deleted'.
func (f *fqdnSelector) notifyUsers(added, deleted []identity.NumericIdentity) {
func (f *fqdnSelector) notifyUsers(sc *SelectorCache, added, deleted []identity.NumericIdentity) {
for user := range f.users {
// pass 'f' to the user as '*fqdnSelector'
user.IdentitySelectionUpdated(f, added, deleted)
sc.queueUserNotification(user, f, added, deleted)
}
}

Expand Down Expand Up @@ -434,10 +488,10 @@ type labelIdentitySelector struct {
//
// The caller is responsible for making sure the same identity is not
// present in both 'added' and 'deleted'.
func (l *labelIdentitySelector) notifyUsers(added, deleted []identity.NumericIdentity) {
func (l *labelIdentitySelector) notifyUsers(sc *SelectorCache, added, deleted []identity.NumericIdentity) {
for user := range l.users {
// pass 'l' to the user as '*labelIdentitySelector'
user.IdentitySelectionUpdated(l, added, deleted)
sc.queueUserNotification(user, l, added, deleted)
}
}

Expand Down Expand Up @@ -567,7 +621,7 @@ func (sc *SelectorCache) updateFQDNSelector(fqdnSelec api.FQDNSelector, identiti
// getting the CIDR identities which correspond to this FQDNSelector. This
// is the primary difference here between FQDNSelector and IdentitySelector.
fqdnSel.updateSelections()
fqdnSel.notifyUsers(added, deleted) // disjoint sets, see the comment above
fqdnSel.notifyUsers(sc, added, deleted) // disjoint sets, see the comment above
}

// AddFQDNSelector adds the given api.FQDNSelector in to the selector cache. If
Expand Down Expand Up @@ -826,7 +880,7 @@ func (sc *SelectorCache) UpdateIdentities(added, deleted cache.IdentityCache) {
}
if len(dels)+len(adds) > 0 {
idSel.updateSelections()
idSel.notifyUsers(adds, dels)
idSel.notifyUsers(sc, adds, dels)
}
case *fqdnSelector:
// This is a no-op right now. We don't encode in the identities
Expand Down

0 comments on commit a75599d

Please sign in to comment.