Skip to content

Commit

Permalink
Revert "policy: Make selectorcache callbacks lock-free"
Browse files Browse the repository at this point in the history
This reverts commit a75599d.
has it seems to be causing a lot of FQDN flakes through the entire CI.

Signed-off-by: André Martins <andre@cilium.io>
  • Loading branch information
aanm committed Jul 5, 2021
1 parent 9a73c92 commit a97bd0d
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 160 deletions.
6 changes: 3 additions & 3 deletions pkg/policy/l4.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/cilium/cilium/pkg/policy/api"
"github.com/cilium/cilium/pkg/policy/trafficdirection"
"github.com/cilium/cilium/pkg/u8proto"
cilium "github.com/cilium/proxy/go/cilium/api"
"github.com/cilium/proxy/go/cilium/api"

"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -426,8 +426,8 @@ func (l4Filter *L4Filter) ToMapState(policyOwner PolicyOwner, direction trafficd
}

// IdentitySelectionUpdated implements CachedSelectionUser interface
// This call is made from a single goroutine in FIFO order to keep add
// and delete events ordered properly. No locks are held.
// This call is made while holding name manager and selector cache
// locks, must beware of deadlocking!
//
// The caller is responsible for making sure the same identity is not
// present in both 'added' and 'deleted'.
Expand Down
5 changes: 0 additions & 5 deletions pkg/policy/resolve_deny_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package policy

import (
"time"

"github.com/cilium/cilium/pkg/checker"
"github.com/cilium/cilium/pkg/identity"
"github.com/cilium/cilium/pkg/identity/cache"
Expand Down Expand Up @@ -358,7 +356,6 @@ func (ds *PolicyTestSuite) TestMapStateWithIngressDenyWildcard(c *C) {
identity.NumericIdentity(192): labels.ParseSelectLabelArray("id=resolve_test_1"),
}
testSelectorCache.UpdateIdentities(added1, nil)
time.Sleep(100 * time.Millisecond)
c.Assert(policy.policyMapChanges.changes, HasLen, 0)

// Have to remove circular reference before testing to avoid an infinite loop
Expand Down Expand Up @@ -436,14 +433,12 @@ func (ds *PolicyTestSuite) TestMapStateWithIngressDeny(c *C) {
testSelectorCache.UpdateIdentities(added1, nil)
// Cleanup the identities from the testSelectorCache
defer testSelectorCache.UpdateIdentities(nil, added1)
time.Sleep(100 * time.Millisecond)
c.Assert(policy.policyMapChanges.changes, HasLen, 3)

deleted1 := cache.IdentityCache{
identity.NumericIdentity(193): labels.ParseSelectLabelArray("id=resolve_test_1", "num=2"),
}
testSelectorCache.UpdateIdentities(nil, deleted1)
time.Sleep(100 * time.Millisecond)
c.Assert(policy.policyMapChanges.changes, HasLen, 4)

cachedSelectorWorld := testSelectorCache.FindCachedIdentitySelector(api.ReservedEndpointSelectors[labels.IDNameWorld])
Expand Down
4 changes: 0 additions & 4 deletions pkg/policy/resolve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"sync"
"testing"
"time"

"github.com/cilium/cilium/pkg/checker"
"github.com/cilium/cilium/pkg/identity"
Expand Down Expand Up @@ -520,7 +519,6 @@ func (ds *PolicyTestSuite) TestMapStateWithIngressWildcard(c *C) {
identity.NumericIdentity(192): labels.ParseSelectLabelArray("id=resolve_test_1"),
}
testSelectorCache.UpdateIdentities(added1, nil)
time.Sleep(100 * time.Millisecond)
c.Assert(policy.policyMapChanges.changes, HasLen, 0)

// Have to remove circular reference before testing to avoid an infinite loop
Expand Down Expand Up @@ -600,14 +598,12 @@ func (ds *PolicyTestSuite) TestMapStateWithIngress(c *C) {
testSelectorCache.UpdateIdentities(added1, nil)
// Cleanup the identities from the testSelectorCache
defer testSelectorCache.UpdateIdentities(nil, added1)
time.Sleep(100 * time.Millisecond)
c.Assert(policy.policyMapChanges.changes, HasLen, 3)

deleted1 := cache.IdentityCache{
identity.NumericIdentity(193): labels.ParseSelectLabelArray("id=resolve_test_1", "num=2"),
}
testSelectorCache.UpdateIdentities(nil, deleted1)
time.Sleep(100 * time.Millisecond)
c.Assert(policy.policyMapChanges.changes, HasLen, 4)

cachedSelectorWorld := testSelectorCache.FindCachedIdentitySelector(api.ReservedEndpointSelectors[labels.IDNameWorld])
Expand Down
70 changes: 8 additions & 62 deletions pkg/policy/selectorcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"encoding/json"
"sort"
"strings"
"sync"
"sync/atomic"
"unsafe"

Expand Down Expand Up @@ -153,7 +152,7 @@ type identitySelector interface {
removeUser(CachedSelectionUser, identityNotifier) (last bool)

// This may be called while the NameManager lock is held
notifyUsers(sc *SelectorCache, added, deleted []identity.NumericIdentity)
notifyUsers(added, deleted []identity.NumericIdentity)

numUsers() int
}
Expand Down Expand Up @@ -184,17 +183,6 @@ func getIdentityCache(ids cache.IdentityCache) scIdentityCache {
return idCache
}

// userNotification stores the information needed to call
// IdentitySelectionUpdated callbacks to notify users of selector's
// identity changes. These are queued to be able to call the callbacks
// in FIFO order while not holding any locks.
type userNotification struct {
user CachedSelectionUser
selector CachedSelector
added []identity.NumericIdentity
deleted []identity.NumericIdentity
}

// SelectorCache caches identities, identity selectors, and the
// subsets of identities each selector selects.
type SelectorCache struct {
Expand All @@ -209,14 +197,6 @@ type SelectorCache struct {
selectors map[string]identitySelector

localIdentityNotifier identityNotifier

// userCond is a condition variable for receiving signals
// about addition of new elements in userNotes
userCond *sync.Cond
// userMutex protects userNotes and is linked to userCond
userMutex lock.Mutex
// userNotes holds a FIFO list of user notifications to be made
userNotes []userNotification
}

// GetModel returns the API model of the SelectorCache.
Expand All @@ -243,46 +223,12 @@ func (sc *SelectorCache) GetModel() models.SelectorCache {
return selCacheMdl
}

func (sc *SelectorCache) handleUserNotifications() {
for {
sc.userMutex.Lock()
for len(sc.userNotes) == 0 {
sc.userCond.Wait()
}
// get the current batch of notifications and release the lock so that SelectorCache
// can't block on userMutex while we call IdentitySelectionUpdated callbacks below.
notifications := sc.userNotes
sc.userNotes = nil
sc.userMutex.Unlock()

for _, n := range notifications {
n.user.IdentitySelectionUpdated(n.selector, n.added, n.deleted)
}
}
}

func (sc *SelectorCache) queueUserNotification(user CachedSelectionUser, selector CachedSelector, added, deleted []identity.NumericIdentity) {
sc.userMutex.Lock()
sc.userNotes = append(sc.userNotes, userNotification{
user: user,
selector: selector,
added: added,
deleted: deleted,
})
sc.userMutex.Unlock()
sc.userCond.Signal()
}

// NewSelectorCache creates a new SelectorCache with the given identities.
func NewSelectorCache(ids cache.IdentityCache) *SelectorCache {
sc := &SelectorCache{
return &SelectorCache{
idCache: getIdentityCache(ids),
selectors: make(map[string]identitySelector),
}
sc.userCond = sync.NewCond(&sc.userMutex)
go sc.handleUserNotifications()

return sc
}

// SetLocalIdentityNotifier injects the provided identityNotifier into the
Expand Down Expand Up @@ -429,10 +375,10 @@ type fqdnSelector struct {
//
// The caller is responsible for making sure the same identity is not
// present in both 'added' and 'deleted'.
func (f *fqdnSelector) notifyUsers(sc *SelectorCache, added, deleted []identity.NumericIdentity) {
func (f *fqdnSelector) notifyUsers(added, deleted []identity.NumericIdentity) {
for user := range f.users {
// pass 'f' to the user as '*fqdnSelector'
sc.queueUserNotification(user, f, added, deleted)
user.IdentitySelectionUpdated(f, added, deleted)
}
}

Expand Down Expand Up @@ -488,10 +434,10 @@ type labelIdentitySelector struct {
//
// The caller is responsible for making sure the same identity is not
// present in both 'added' and 'deleted'.
func (l *labelIdentitySelector) notifyUsers(sc *SelectorCache, added, deleted []identity.NumericIdentity) {
func (l *labelIdentitySelector) notifyUsers(added, deleted []identity.NumericIdentity) {
for user := range l.users {
// pass 'l' to the user as '*labelIdentitySelector'
sc.queueUserNotification(user, l, added, deleted)
user.IdentitySelectionUpdated(l, added, deleted)
}
}

Expand Down Expand Up @@ -621,7 +567,7 @@ func (sc *SelectorCache) updateFQDNSelector(fqdnSelec api.FQDNSelector, identiti
// getting the CIDR identities which correspond to this FQDNSelector. This
// is the primary difference here between FQDNSelector and IdentitySelector.
fqdnSel.updateSelections()
fqdnSel.notifyUsers(sc, added, deleted) // disjoint sets, see the comment above
fqdnSel.notifyUsers(added, deleted) // disjoint sets, see the comment above
}

// AddFQDNSelector adds the given api.FQDNSelector in to the selector cache. If
Expand Down Expand Up @@ -880,7 +826,7 @@ func (sc *SelectorCache) UpdateIdentities(added, deleted cache.IdentityCache) {
}
if len(dels)+len(adds) > 0 {
idSel.updateSelections()
idSel.notifyUsers(sc, adds, dels)
idSel.notifyUsers(adds, dels)
}
case *fqdnSelector:
// This is a no-op right now. We don't encode in the identities
Expand Down

0 comments on commit a97bd0d

Please sign in to comment.