[v1.14] ipcache: fix flapping labels in SelectorCache when reserved:host identity has multiple IPs #28418

Merged (1 commit, Oct 6, 2023)
17 changes: 12 additions & 5 deletions pkg/identity/reserved.go
@@ -17,33 +17,40 @@ var (
 )
 
 // AddReservedIdentity adds the reserved numeric identity with the respective
-// label into the map of reserved identity cache.
-func AddReservedIdentity(ni NumericIdentity, lbl string) {
+// label into the map of reserved identity cache, and returns the resulting Identity.
+// This identity must not be mutated!
+func AddReservedIdentity(ni NumericIdentity, lbl string) *Identity {
 	identity := NewIdentity(ni, labels.Labels{lbl: labels.NewLabel(lbl, "", labels.LabelSourceReserved)})
 	cacheMU.Lock()
 	reservedIdentityCache[ni] = identity
 	cacheMU.Unlock()
+	return identity
 }
 
 // AddReservedIdentityWithLabels is the same as AddReservedIdentity but accepts
-// multiple labels.
-func AddReservedIdentityWithLabels(ni NumericIdentity, lbls labels.Labels) {
+// multiple labels. Returns the resulting Identity.
+// This identity must not be mutated!
+func AddReservedIdentityWithLabels(ni NumericIdentity, lbls labels.Labels) *Identity {
 	identity := NewIdentity(ni, lbls)
 	cacheMU.Lock()
 	reservedIdentityCache[ni] = identity
 	cacheMU.Unlock()
+	return identity
 }
 
 // LookupReservedIdentity looks up a reserved identity by its NumericIdentity
 // and returns it if found. Returns nil if not found.
+// This identity must not be mutated!
 func LookupReservedIdentity(ni NumericIdentity) *Identity {
 	cacheMU.RLock()
 	defer cacheMU.RUnlock()
 	return reservedIdentityCache[ni]
 }
 
 func init() {
-	iterateReservedIdentityLabels(AddReservedIdentityWithLabels)
+	iterateReservedIdentityLabels(func(ni NumericIdentity, lbls labels.Labels) {
+		AddReservedIdentityWithLabels(ni, lbls)
+	})
 }
 
 // IterateReservedIdentities iterates over all reserved identities and
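For context, a minimal sketch of the call pattern this change enables (assuming the cilium module is importable; the scenario itself is hypothetical, not part of the patch): the `Add*` helpers now hand back the cached identity, so callers no longer need a follow-up `LookupReservedIdentity`.

```go
package main

import (
	"fmt"

	"github.com/cilium/cilium/pkg/identity"
	"github.com/cilium/cilium/pkg/labels"
)

func main() {
	// Before this change, refreshing a reserved identity took two steps:
	//   identity.AddReservedIdentityWithLabels(identity.ReservedIdentityHost, lbls)
	//   id := identity.LookupReservedIdentity(identity.ReservedIdentityHost)
	// Now the add returns the cached *Identity directly:
	lbls := labels.NewFrom(labels.LabelHost)
	id := identity.AddReservedIdentityWithLabels(identity.ReservedIdentityHost, lbls)

	// The returned pointer is shared via the reserved identity cache,
	// so it must be treated as read-only.
	fmt.Println(id.ID, id.Labels)
}
```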
7 changes: 6 additions & 1 deletion pkg/ipcache/ipcache_test.go
@@ -20,6 +20,7 @@ import (
 	cmtypes "github.com/cilium/cilium/pkg/clustermesh/types"
 	identityPkg "github.com/cilium/cilium/pkg/identity"
 	"github.com/cilium/cilium/pkg/ipcache/types/fake"
+	"github.com/cilium/cilium/pkg/labels"
 	"github.com/cilium/cilium/pkg/source"
 	testidentity "github.com/cilium/cilium/pkg/testutils/identity"
 	"github.com/cilium/cilium/pkg/types"
@@ -34,6 +35,7 @@ type IPCacheTestSuite struct {
 var (
 	_               = Suite(&IPCacheTestSuite{})
 	IPIdentityCache *IPCache
+	PolicyHandler   *mockUpdater
 )
 
 func Test(t *testing.T) {
@@ -43,10 +45,13 @@ func Test(t *testing.T) {
 func (s *IPCacheTestSuite) SetUpTest(c *C) {
 	ctx, cancel := context.WithCancel(context.Background())
 	allocator := testidentity.NewMockIdentityAllocator(nil)
+	PolicyHandler = &mockUpdater{
+		identities: make(map[identityPkg.NumericIdentity]labels.LabelArray),
+	}
 	IPIdentityCache = NewIPCache(&Configuration{
 		Context:           ctx,
 		IdentityAllocator: allocator,
-		PolicyHandler:     &mockUpdater{},
+		PolicyHandler:     PolicyHandler,
 		DatapathHandler:   &mockTriggerer{},
 		NodeIDHandler:     &fake.FakeNodeIDHandler{},
 	})
70 changes: 66 additions & 4 deletions pkg/ipcache/metadata.go
@@ -79,12 +79,22 @@ type metadata struct {
 	// generate updates into the ipcache, policy engine and datapath.
 	queuedChangesMU lock.Mutex
 	queuedPrefixes  map[netip.Prefix]struct{}
+
+	// reservedHostLock protects the reservedHostLabels map. Holders must
+	// always take the metadata read lock first.
+	reservedHostLock lock.Mutex
+
+	// reservedHostLabels collects all labels that apply to the host identity.
+	// See updateReservedHostLabels() for more info.
+	reservedHostLabels map[netip.Prefix]labels.Labels
 }
 
 func newMetadata() *metadata {
 	return &metadata{
 		m:              make(map[netip.Prefix]PrefixInfo),
 		queuedPrefixes: make(map[netip.Prefix]struct{}),
+
+		reservedHostLabels: make(map[netip.Prefix]labels.Labels),
 	}
 }
 
@@ -204,14 +214,13 @@ func (ipc *IPCache) InjectLabels(ctx context.Context, modifiedPrefixes []netip.Prefix)
 		oldID, entryExists := ipc.LookupByIP(pstr)
 		oldTunnelIP, oldEncryptionKey := ipc.GetHostIPCache(pstr)
 		prefixInfo := ipc.metadata.getLocked(prefix)
+		var newID *identity.Identity
 		if prefixInfo == nil {
 			if !entryExists {
 				// Already deleted, no new metadata to associate
 				continue
 			} // else continue below to remove the old entry
 		} else {
-			var newID *identity.Identity
-
 			// Insert to propagate the updated set of labels after removal.
 			newID, _, err = ipc.resolveIdentity(ctx, prefix, prefixInfo, identity.InvalidIdentity)
 			if err != nil {
@@ -303,6 +312,25 @@ func (ipc *IPCache) InjectLabels(ctx context.Context, modifiedPrefixes []netip.Prefix)
 				entriesToDelete[prefix] = oldID
 			}
 		}
+
+		// The reserved:host identity is special: the numeric ID is fixed,
+		// and the set of labels is mutable. Thus, whenever it changes,
+		// we must always update the SelectorCache (normally, this is elided
+		// when no changes are present).
+		if newID != nil && newID.ID == identity.ReservedIdentityHost {
+			idsToAdd[newID.ID] = newID.Labels.LabelArray()
+		}
+
+		// Again, more reserved:host bookkeeping: if this prefix is no longer
+		// ID 1 (because it is being deleted or changing IDs), we need to
+		// recompute the labels for reserved:host and push that to the SelectorCache.
+		if entryExists && oldID.ID == identity.ReservedIdentityHost &&
+			(newID == nil || newID.ID != identity.ReservedIdentityHost) {
+
+			i := ipc.updateReservedHostLabels(prefix, nil)
+			idsToAdd[i.ID] = i.Labels.LabelArray()
+		}
+
 	}
 	// Don't hold lock while calling UpdateIdentities, as it will otherwise run into a deadlock
 	ipc.metadata.RUnlock()
@@ -452,8 +480,12 @@ func (ipc *IPCache) resolveIdentity(ctx context.Context, prefix netip.Prefix, in
 		// for itself). For all other identities, we avoid modifying
 		// the labels at runtime and instead opt to allocate new
 		// identities below.
-		identity.AddReservedIdentityWithLabels(identity.ReservedIdentityHost, lbls)
-		return identity.LookupReservedIdentity(identity.ReservedIdentityHost), false, nil
+		//
+		// As an extra gotcha, we need to merge all labels for all IPs that
+		// resolve to the reserved:host identity, otherwise the identity's
+		// labels can flap depending on which prefix writes first. See GH-28259.
+		i := ipc.updateReservedHostLabels(prefix, lbls)
+		return i, false, nil
 	}
 
 	// If no other labels are associated with this IP, we assume that it's
@@ -483,6 +515,36 @@ func (ipc *IPCache) resolveIdentity(ctx context.Context, prefix netip.Prefix, in
 	return id, isNew, err
 }
 
+// updateReservedHostLabels adds or removes labels that apply to the local host.
+// The `reserved:host` identity is special: the numeric identity is fixed
+// and the set of labels is mutable. (The datapath requires this.) So,
+// we need to determine all prefixes that have the `reserved:host` label and
+// capture their labels. Then, we must aggregate *all* labels from all prefixes and
+// update the labels that correspond to the `reserved:host` identity.
+//
+// This could be termed a meta-ipcache. The ipcache metadata layer aggregates
+// an arbitrary set of resources and labels to a prefix. Here, we are aggregating
+// an arbitrary set of prefixes and labels to an identity.
+func (ipc *IPCache) updateReservedHostLabels(prefix netip.Prefix, lbls labels.Labels) *identity.Identity {
+	ipc.metadata.reservedHostLock.Lock()
+	defer ipc.metadata.reservedHostLock.Unlock()
+	if lbls == nil {
+		delete(ipc.metadata.reservedHostLabels, prefix)
+	} else {
+		ipc.metadata.reservedHostLabels[prefix] = lbls
+	}
+
+	// aggregate all labels and update static identity
+	newLabels := labels.NewFrom(labels.LabelHost)
+	for _, l := range ipc.metadata.reservedHostLabels {
+		newLabels.MergeLabels(l)
+	}
+
+	log.WithField(logfields.Labels, newLabels).Debug("Merged labels for reserved:host identity")
+
+	return identity.AddReservedIdentityWithLabels(identity.ReservedIdentityHost, newLabels)
+}
+
 // RemoveLabelsExcluded removes the given labels from all IPs inside the IDMD
 // except for the IPs / prefixes inside the given excluded set.
 //
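To see the aggregation in isolation, here is a self-contained sketch of the merge that `updateReservedHostLabels` performs (`aggregateHostLabels` and the sample prefixes are hypothetical, for illustration only): whichever prefix is written last, the merged label set comes out the same, which is what fixes the flapping described in GH-28259.

```go
package main

import (
	"fmt"
	"net/netip"

	"github.com/cilium/cilium/pkg/labels"
)

// aggregateHostLabels mirrors the core of updateReservedHostLabels:
// each prefix mapping to reserved:host contributes its own label set,
// and the identity's labels are the union of all of them.
func aggregateHostLabels(perPrefix map[netip.Prefix]labels.Labels) labels.Labels {
	merged := labels.NewFrom(labels.LabelHost)
	for _, l := range perPrefix {
		merged.MergeLabels(l)
	}
	return merged
}

func main() {
	hostIPs := map[netip.Prefix]labels.Labels{
		netip.MustParsePrefix("10.0.0.4/32"): labels.LabelKubeAPIServer,
		netip.MustParsePrefix("10.0.0.5/32"): labels.LabelHost,
	}
	// The union is independent of insertion and iteration order, so the
	// reserved:host labels no longer depend on which IP wrote first.
	fmt.Println(aggregateHostLabels(hostIPs))
}
```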
107 changes: 103 additions & 4 deletions pkg/ipcache/metadata_test.go
@@ -128,6 +128,88 @@ func TestInjectLabels(t *testing.T) {
 	assert.Equal(t, identity.ReservedIdentityWorld, IPIdentityCache.ipToIdentityCache["0.0.0.0/0"].ID)
 }
 
+// Test that when multiple IPs have the `reserved:host` label, we correctly
+// aggregate all labels *and* update the selector cache.
+// This reproduces GH-28259.
+func TestUpdateLocalNode(t *testing.T) {
+	cancel := setupTest(t)
+	defer cancel()
+
+	ctx := context.Background()
+
+	bothLabels := labels.Labels{}
+	bothLabels.MergeLabels(labels.LabelHost)
+	bothLabels.MergeLabels(labels.LabelKubeAPIServer)
+
+	selectorCacheHas := func(lbls labels.Labels) {
+		t.Helper()
+		id := PolicyHandler.identities[identity.ReservedIdentityHost]
+		assert.NotNil(t, id)
+		assert.Equal(t, lbls.LabelArray(), id)
+	}
+
+	injectLabels := func(ip netip.Prefix) {
+		t.Helper()
+		remaining, err := IPIdentityCache.InjectLabels(ctx, []netip.Prefix{ip})
+		assert.NoError(t, err)
+		assert.Len(t, remaining, 0)
+	}
+
+	idIs := func(ip netip.Prefix, id identity.NumericIdentity) {
+		t.Helper()
+		assert.Equal(t, IPIdentityCache.ipToIdentityCache[ip.String()].ID, id)
+	}
+
+	// Mark .4 as local host
+	IPIdentityCache.metadata.upsertLocked(inClusterPrefix, source.Local, "node-uid", labels.LabelHost)
+	injectLabels(inClusterPrefix)
+	idIs(inClusterPrefix, identity.ReservedIdentityHost)
+	selectorCacheHas(labels.LabelHost)
+
+	// Mark .4 as kube-apiserver
+	// Note that in the actual code, we use `source.KubeAPIServer`. However,
+	// we use the same source in the test case to try and ferret out more bugs.
+	IPIdentityCache.metadata.upsertLocked(inClusterPrefix, source.Local, "kube-uid", labels.LabelKubeAPIServer)
+	injectLabels(inClusterPrefix)
+	idIs(inClusterPrefix, identity.ReservedIdentityHost)
+	selectorCacheHas(bothLabels)
+
+	// Mark .5 as local host
+	IPIdentityCache.metadata.upsertLocked(inClusterPrefix2, source.Local, "node-uid", labels.LabelHost)
+	injectLabels(inClusterPrefix2)
+	idIs(inClusterPrefix, identity.ReservedIdentityHost)
+	idIs(inClusterPrefix2, identity.ReservedIdentityHost)
+	selectorCacheHas(bothLabels)
+
+	// remove kube-apiserver from .4
+	IPIdentityCache.metadata.remove(inClusterPrefix, "kube-uid", labels.LabelKubeAPIServer)
+	injectLabels(inClusterPrefix)
+	idIs(inClusterPrefix, identity.ReservedIdentityHost)
+	idIs(inClusterPrefix2, identity.ReservedIdentityHost)
+	selectorCacheHas(labels.LabelHost)
+
+	// add kube-apiserver back to .4
+	IPIdentityCache.metadata.upsertLocked(inClusterPrefix, source.Local, "kube-uid", labels.LabelKubeAPIServer)
+	injectLabels(inClusterPrefix)
+	idIs(inClusterPrefix, identity.ReservedIdentityHost)
+	idIs(inClusterPrefix2, identity.ReservedIdentityHost)
+	selectorCacheHas(bothLabels)
+
+	// remove host from .4
+	IPIdentityCache.metadata.remove(inClusterPrefix, "node-uid", labels.LabelHost)
+	injectLabels(inClusterPrefix)
+
+	// Verify that .4 now has just kube-apiserver and CIDRs
+	idIs(inClusterPrefix, identity.LocalIdentityFlag) // the first CIDR identity
+	id := PolicyHandler.identities[identity.LocalIdentityFlag]
+	assert.True(t, id.Has("reserved.kube-apiserver"))
+	assert.True(t, id.Has("cidr."+inClusterPrefix.String()))
+
+	// verify that ID 1 is now just reserved:host
+	idIs(inClusterPrefix2, identity.ReservedIdentityHost)
+	selectorCacheHas(labels.LabelHost)
+}
+
 // TestInjectExisting tests "upgrading" an existing identity to the apiserver.
 // This is a common occurrence on startup - and this test ensures we don't
 // regress the known issue in GH-24502
@@ -353,7 +435,7 @@ func TestOverrideIdentity(t *testing.T) {
 
 	ipc := NewIPCache(&Configuration{
 		IdentityAllocator: allocator,
-		PolicyHandler:     &mockUpdater{},
+		PolicyHandler:     newMockUpdater(),
 		DatapathHandler:   &mockTriggerer{},
 	})
 	ctx := context.Background()
@@ -501,10 +583,11 @@ func setupTest(t *testing.T) (cleanup func()) {
 
 	ctx, cancel := context.WithCancel(context.Background())
 	allocator := testidentity.NewMockIdentityAllocator(nil)
+	PolicyHandler = newMockUpdater()
 	IPIdentityCache = NewIPCache(&Configuration{
 		Context:           ctx,
 		IdentityAllocator: allocator,
-		PolicyHandler:     &mockUpdater{},
+		PolicyHandler:     PolicyHandler,
 		DatapathHandler:   &mockTriggerer{},
 		NodeIDHandler:     &fake.FakeNodeIDHandler{},
 	})
@@ -518,9 +601,25 @@ func setupTest(t *testing.T) (cleanup func()) {
 	}
 }
 
-type mockUpdater struct{}
+func newMockUpdater() *mockUpdater {
+	return &mockUpdater{
+		identities: make(map[identity.NumericIdentity]labels.LabelArray),
+	}
+}
 
-func (m *mockUpdater) UpdateIdentities(_, _ cache.IdentityCache, _ *sync.WaitGroup) {}
+type mockUpdater struct {
+	identities map[identity.NumericIdentity]labels.LabelArray
+}
+
+func (m *mockUpdater) UpdateIdentities(added, deleted cache.IdentityCache, _ *sync.WaitGroup) {
+	for nid, lbls := range added {
+		m.identities[nid] = lbls
+	}
+
+	for nid := range deleted {
+		delete(m.identities, nid)
+	}
+}
 
 type mockTriggerer struct{}
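As a usage note, here is a standalone sketch of how the capturing mock behaves (re-stated as a hypothetical `capturingUpdater` so it compiles on its own, under the assumption that `cache.IdentityCache` maps numeric identities to label arrays, as the test file implies): successive `UpdateIdentities` calls for reserved:host simply overwrite the recorded labels, which is what lets the test above assert on the latest SelectorCache state.

```go
package main

import (
	"fmt"
	"sync"

	"github.com/cilium/cilium/pkg/identity"
	"github.com/cilium/cilium/pkg/identity/cache"
	"github.com/cilium/cilium/pkg/labels"
)

// capturingUpdater re-states the test's mockUpdater: it records the
// identity-to-labels mapping pushed toward the SelectorCache.
type capturingUpdater struct {
	identities map[identity.NumericIdentity]labels.LabelArray
}

func (m *capturingUpdater) UpdateIdentities(added, deleted cache.IdentityCache, _ *sync.WaitGroup) {
	for nid, lbls := range added {
		m.identities[nid] = lbls
	}
	for nid := range deleted {
		delete(m.identities, nid)
	}
}

func main() {
	u := &capturingUpdater{identities: make(map[identity.NumericIdentity]labels.LabelArray)}

	// First push: reserved:host carries only the host label.
	u.UpdateIdentities(cache.IdentityCache{
		identity.ReservedIdentityHost: labels.LabelHost.LabelArray(),
	}, nil, nil)

	// Second push: kube-apiserver was merged in; the recorded entry
	// for the fixed numeric ID is overwritten with the new label array.
	merged := labels.NewFrom(labels.LabelHost)
	merged.MergeLabels(labels.LabelKubeAPIServer)
	u.UpdateIdentities(cache.IdentityCache{
		identity.ReservedIdentityHost: merged.LabelArray(),
	}, nil, nil)

	fmt.Println(u.identities[identity.ReservedIdentityHost])
}
```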