Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "policy: Make selectorcache callbacks lock-free" #16769

Merged
merged 1 commit into from
Jul 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/policy/l4.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/cilium/cilium/pkg/policy/api"
"github.com/cilium/cilium/pkg/policy/trafficdirection"
"github.com/cilium/cilium/pkg/u8proto"
cilium "github.com/cilium/proxy/go/cilium/api"
"github.com/cilium/proxy/go/cilium/api"

"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -426,8 +426,8 @@ func (l4Filter *L4Filter) ToMapState(policyOwner PolicyOwner, direction trafficd
}

// IdentitySelectionUpdated implements CachedSelectionUser interface
// This call is made from a single goroutine in FIFO order to keep add
// and delete events ordered properly. No locks are held.
// This call is made while holding name manager and selector cache
// locks, must beware of deadlocking!
//
// The caller is responsible for making sure the same identity is not
// present in both 'added' and 'deleted'.
Expand Down
5 changes: 0 additions & 5 deletions pkg/policy/resolve_deny_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package policy

import (
"time"

"github.com/cilium/cilium/pkg/checker"
"github.com/cilium/cilium/pkg/identity"
"github.com/cilium/cilium/pkg/identity/cache"
Expand Down Expand Up @@ -358,7 +356,6 @@ func (ds *PolicyTestSuite) TestMapStateWithIngressDenyWildcard(c *C) {
identity.NumericIdentity(192): labels.ParseSelectLabelArray("id=resolve_test_1"),
}
testSelectorCache.UpdateIdentities(added1, nil)
time.Sleep(100 * time.Millisecond)
c.Assert(policy.policyMapChanges.changes, HasLen, 0)

// Have to remove circular reference before testing to avoid an infinite loop
Expand Down Expand Up @@ -436,14 +433,12 @@ func (ds *PolicyTestSuite) TestMapStateWithIngressDeny(c *C) {
testSelectorCache.UpdateIdentities(added1, nil)
// Cleanup the identities from the testSelectorCache
defer testSelectorCache.UpdateIdentities(nil, added1)
time.Sleep(100 * time.Millisecond)
c.Assert(policy.policyMapChanges.changes, HasLen, 3)

deleted1 := cache.IdentityCache{
identity.NumericIdentity(193): labels.ParseSelectLabelArray("id=resolve_test_1", "num=2"),
}
testSelectorCache.UpdateIdentities(nil, deleted1)
time.Sleep(100 * time.Millisecond)
c.Assert(policy.policyMapChanges.changes, HasLen, 4)

cachedSelectorWorld := testSelectorCache.FindCachedIdentitySelector(api.ReservedEndpointSelectors[labels.IDNameWorld])
Expand Down
4 changes: 0 additions & 4 deletions pkg/policy/resolve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"sync"
"testing"
"time"

"github.com/cilium/cilium/pkg/checker"
"github.com/cilium/cilium/pkg/identity"
Expand Down Expand Up @@ -520,7 +519,6 @@ func (ds *PolicyTestSuite) TestMapStateWithIngressWildcard(c *C) {
identity.NumericIdentity(192): labels.ParseSelectLabelArray("id=resolve_test_1"),
}
testSelectorCache.UpdateIdentities(added1, nil)
time.Sleep(100 * time.Millisecond)
c.Assert(policy.policyMapChanges.changes, HasLen, 0)

// Have to remove circular reference before testing to avoid an infinite loop
Expand Down Expand Up @@ -600,14 +598,12 @@ func (ds *PolicyTestSuite) TestMapStateWithIngress(c *C) {
testSelectorCache.UpdateIdentities(added1, nil)
// Cleanup the identities from the testSelectorCache
defer testSelectorCache.UpdateIdentities(nil, added1)
time.Sleep(100 * time.Millisecond)
c.Assert(policy.policyMapChanges.changes, HasLen, 3)

deleted1 := cache.IdentityCache{
identity.NumericIdentity(193): labels.ParseSelectLabelArray("id=resolve_test_1", "num=2"),
}
testSelectorCache.UpdateIdentities(nil, deleted1)
time.Sleep(100 * time.Millisecond)
c.Assert(policy.policyMapChanges.changes, HasLen, 4)

cachedSelectorWorld := testSelectorCache.FindCachedIdentitySelector(api.ReservedEndpointSelectors[labels.IDNameWorld])
Expand Down
70 changes: 8 additions & 62 deletions pkg/policy/selectorcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"encoding/json"
"sort"
"strings"
"sync"
"sync/atomic"
"unsafe"

Expand Down Expand Up @@ -153,7 +152,7 @@ type identitySelector interface {
removeUser(CachedSelectionUser, identityNotifier) (last bool)

// This may be called while the NameManager lock is held
notifyUsers(sc *SelectorCache, added, deleted []identity.NumericIdentity)
notifyUsers(added, deleted []identity.NumericIdentity)

numUsers() int
}
Expand Down Expand Up @@ -184,17 +183,6 @@ func getIdentityCache(ids cache.IdentityCache) scIdentityCache {
return idCache
}

// userNotification stores the information needed to call
// IdentitySelectionUpdated callbacks to notify users of selector's
// identity changes. These are queued to be able to call the callbacks
// in FIFO order while not holding any locks.
type userNotification struct {
user CachedSelectionUser
selector CachedSelector
added []identity.NumericIdentity
deleted []identity.NumericIdentity
}

// SelectorCache caches identities, identity selectors, and the
// subsets of identities each selector selects.
type SelectorCache struct {
Expand All @@ -209,14 +197,6 @@ type SelectorCache struct {
selectors map[string]identitySelector

localIdentityNotifier identityNotifier

// userCond is a condition variable for receiving signals
// about addition of new elements in userNotes
userCond *sync.Cond
// userMutex protects userNotes and is linked to userCond
userMutex lock.Mutex
// userNotes holds a FIFO list of user notifications to be made
userNotes []userNotification
}

// GetModel returns the API model of the SelectorCache.
Expand All @@ -243,46 +223,12 @@ func (sc *SelectorCache) GetModel() models.SelectorCache {
return selCacheMdl
}

func (sc *SelectorCache) handleUserNotifications() {
for {
sc.userMutex.Lock()
for len(sc.userNotes) == 0 {
sc.userCond.Wait()
}
// get the current batch of notifications and release the lock so that SelectorCache
// can't block on userMutex while we call IdentitySelectionUpdated callbacks below.
notifications := sc.userNotes
sc.userNotes = nil
sc.userMutex.Unlock()

for _, n := range notifications {
n.user.IdentitySelectionUpdated(n.selector, n.added, n.deleted)
}
}
}

func (sc *SelectorCache) queueUserNotification(user CachedSelectionUser, selector CachedSelector, added, deleted []identity.NumericIdentity) {
sc.userMutex.Lock()
sc.userNotes = append(sc.userNotes, userNotification{
user: user,
selector: selector,
added: added,
deleted: deleted,
})
sc.userMutex.Unlock()
sc.userCond.Signal()
}

// NewSelectorCache creates a new SelectorCache with the given identities.
func NewSelectorCache(ids cache.IdentityCache) *SelectorCache {
sc := &SelectorCache{
return &SelectorCache{
idCache: getIdentityCache(ids),
selectors: make(map[string]identitySelector),
}
sc.userCond = sync.NewCond(&sc.userMutex)
go sc.handleUserNotifications()

return sc
}

// SetLocalIdentityNotifier injects the provided identityNotifier into the
Expand Down Expand Up @@ -429,10 +375,10 @@ type fqdnSelector struct {
//
// The caller is responsible for making sure the same identity is not
// present in both 'added' and 'deleted'.
func (f *fqdnSelector) notifyUsers(sc *SelectorCache, added, deleted []identity.NumericIdentity) {
func (f *fqdnSelector) notifyUsers(added, deleted []identity.NumericIdentity) {
for user := range f.users {
// pass 'f' to the user as '*fqdnSelector'
sc.queueUserNotification(user, f, added, deleted)
user.IdentitySelectionUpdated(f, added, deleted)
}
}

Expand Down Expand Up @@ -488,10 +434,10 @@ type labelIdentitySelector struct {
//
// The caller is responsible for making sure the same identity is not
// present in both 'added' and 'deleted'.
func (l *labelIdentitySelector) notifyUsers(sc *SelectorCache, added, deleted []identity.NumericIdentity) {
func (l *labelIdentitySelector) notifyUsers(added, deleted []identity.NumericIdentity) {
for user := range l.users {
// pass 'l' to the user as '*labelIdentitySelector'
sc.queueUserNotification(user, l, added, deleted)
user.IdentitySelectionUpdated(l, added, deleted)
}
}

Expand Down Expand Up @@ -621,7 +567,7 @@ func (sc *SelectorCache) updateFQDNSelector(fqdnSelec api.FQDNSelector, identiti
// getting the CIDR identities which correspond to this FQDNSelector. This
// is the primary difference here between FQDNSelector and IdentitySelector.
fqdnSel.updateSelections()
fqdnSel.notifyUsers(sc, added, deleted) // disjoint sets, see the comment above
fqdnSel.notifyUsers(added, deleted) // disjoint sets, see the comment above
}

// AddFQDNSelector adds the given api.FQDNSelector in to the selector cache. If
Expand Down Expand Up @@ -880,7 +826,7 @@ func (sc *SelectorCache) UpdateIdentities(added, deleted cache.IdentityCache) {
}
if len(dels)+len(adds) > 0 {
idSel.updateSelections()
idSel.notifyUsers(sc, adds, dels)
idSel.notifyUsers(adds, dels)
}
case *fqdnSelector:
// This is a no-op right now. We don't encode in the identities
Expand Down