ipcache: aggregate labels from all IPs with local host identity
[ upstream commit d50a525 ]

[ Backporter's notes: minor renames, otherwise a clean backport ]

The `reserved:host` identity is special: the numeric identity is fixed and the
set of labels is mutable. (The datapath requires this.) So, we need to
determine all prefixes that have the `reserved:host` label and capture their
labels. Then, we must aggregate *all* labels from all IPs and insert them as
the `reserved:host` identity labels.
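
A simplified sketch of that aggregation (hypothetical names; the real
implementation is the updateReservedHostLabels function added in metadata.go
below): keep a map of the labels each prefix contributes, and recompute the
union whenever any prefix changes:

    // Hypothetical sketch, not the actual Cilium code.
    var hostLabelsByPrefix = map[string]labels.Labels{}

    func recomputeHostLabels(prefix string, lbls labels.Labels) labels.Labels {
        if lbls == nil {
            delete(hostLabelsByPrefix, prefix) // prefix no longer has reserved:host
        } else {
            hostLabelsByPrefix[prefix] = lbls
        }
        // The identity's labels are the union over all contributing prefixes.
        merged := labels.NewFrom(labels.LabelHost)
        for _, l := range hostLabelsByPrefix {
            merged.MergeLabels(l)
        }
        return merged
    }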

However, the code as written has a race condition whenever the local node has
more than one IP address. This can happen when, for example, VXLAN or IPv6 is
enabled. The basic sequence is this:

1. Insert IP A as `reserved:host` into the ipcache. ID 1 now has labels
  `reserved:host`.
2. Insert IP A as `reserved:kube-apiserver` into the ipcache. ID 1 is updated
  with labels `reserved:host, reserved:kube-apiserver`.
3. Insert IP B as `reserved:host` into the ipcache. ID 1 is updated with
  labels `reserved:host`. Now policies that select
  `reserved:kube-apiserver` are broken. (A toy reproduction follows this list.)
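
A self-contained toy version of that last-writer-wins behavior (hypothetical
code, not Cilium's; each per-prefix update overwrites the identity's labels
instead of aggregating them):

    package main

    import "fmt"

    func main() {
        // Labels of numeric identity 1, overwritten wholesale by each update (the bug).
        idLabels := map[int][]string{}
        upsert := func(prefix string, lbls []string) { idLabels[1] = lbls }

        upsert("10.0.0.4/32", []string{"reserved:host"})
        upsert("10.0.0.4/32", []string{"reserved:host", "reserved:kube-apiserver"})
        upsert("10.0.0.5/32", []string{"reserved:host"})
        fmt.Println(idLabels[1]) // [reserved:host]: the kube-apiserver label is gone
    }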

Likewise, we need to always update the SelectorCache; we cannot short-circuit
if the ipcache already has that identity. Again, this is needed because the
identity is mutable. So this bug can take another form:

1. Insert IP A as `reserved:host` into the ipcache. Because IP A is not known
to the ipcache, treat ID 1 as a new identity and update the selector cache.
2. Insert IP A as `reserved:kube-apiserver`. Mutate the labels of ID 1. But,
because IP A already has ID 1, short-circuit the update to the selector cache
(if the Source is the same, which it _may_ be).
3. Now the selector cache has incorrect labels for ID 1. (A condensed sketch
of the fix follows this list.)
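
The fix for this form (visible in the new InjectLabels hunk in metadata.go
below) is to never skip the SelectorCache notification for ID 1. Condensed
from that hunk:

    // reserved:host labels are mutable, so always re-announce them to the
    // SelectorCache, even if the prefix already mapped to ID 1.
    if newID != nil && newID.ID == identity.ReservedIdentityHost {
        idsToAdd[newID.ID] = newID.Labels.LabelArray()
    }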

Without this fix, when there are multiple IPs with the host label, the
identity's labels may flap and the SelectorCache may miss updates.

Fixes: #28259
Fixes: e0d403a
Fixes: 308c142

Signed-off-by: Casey Callendrello <cdc@isovalent.com>
squeed authored and ti-mo committed Oct 6, 2023
1 parent 0e5bc15 commit 5eb57b9
Showing 4 changed files with 187 additions and 14 deletions.
pkg/identity/reserved.go (12 additions, 5 deletions)

@@ -17,33 +17,40 @@ var (
 )

 // AddReservedIdentity adds the reserved numeric identity with the respective
-// label into the map of reserved identity cache.
-func AddReservedIdentity(ni NumericIdentity, lbl string) {
+// label into the map of reserved identity cache, and returns the resulting Identity.
+// This identity must not be mutated!
+func AddReservedIdentity(ni NumericIdentity, lbl string) *Identity {
     identity := NewIdentity(ni, labels.Labels{lbl: labels.NewLabel(lbl, "", labels.LabelSourceReserved)})
     cacheMU.Lock()
     reservedIdentityCache[ni] = identity
     cacheMU.Unlock()
+    return identity
 }

 // AddReservedIdentityWithLabels is the same as AddReservedIdentity but accepts
-// multiple labels.
-func AddReservedIdentityWithLabels(ni NumericIdentity, lbls labels.Labels) {
+// multiple labels. Returns the resulting Identity.
+// This identity must not be mutated!
+func AddReservedIdentityWithLabels(ni NumericIdentity, lbls labels.Labels) *Identity {
     identity := NewIdentity(ni, lbls)
     cacheMU.Lock()
     reservedIdentityCache[ni] = identity
     cacheMU.Unlock()
+    return identity
 }

 // LookupReservedIdentity looks up a reserved identity by its NumericIdentity
 // and returns it if found. Returns nil if not found.
+// This identity must not be mutated!
 func LookupReservedIdentity(ni NumericIdentity) *Identity {
     cacheMU.RLock()
     defer cacheMU.RUnlock()
     return reservedIdentityCache[ni]
 }

 func init() {
-    iterateReservedIdentityLabels(AddReservedIdentityWithLabels)
+    iterateReservedIdentityLabels(func(ni NumericIdentity, lbls labels.Labels) {
+        AddReservedIdentityWithLabels(ni, lbls)
+    })
 }

 // IterateReservedIdentities iterates over all reserved identities and
pkg/ipcache/ipcache_test.go (6 additions, 1 deletion)

@@ -20,6 +20,7 @@ import (
     cmtypes "github.com/cilium/cilium/pkg/clustermesh/types"
     identityPkg "github.com/cilium/cilium/pkg/identity"
     "github.com/cilium/cilium/pkg/ipcache/types/fake"
+    "github.com/cilium/cilium/pkg/labels"
     "github.com/cilium/cilium/pkg/source"
     testidentity "github.com/cilium/cilium/pkg/testutils/identity"
     "github.com/cilium/cilium/pkg/types"
@@ -34,6 +35,7 @@
 var (
     _               = Suite(&IPCacheTestSuite{})
     IPIdentityCache *IPCache
+    PolicyHandler   *mockUpdater
 )
@@ -43,10 +45,13 @@ func Test(t *testing.T) {
 func (s *IPCacheTestSuite) SetUpTest(c *C) {
     ctx, cancel := context.WithCancel(context.Background())
     allocator := testidentity.NewMockIdentityAllocator(nil)
+    PolicyHandler = &mockUpdater{
+        identities: make(map[identityPkg.NumericIdentity]labels.LabelArray),
+    }
     IPIdentityCache = NewIPCache(&Configuration{
         Context:           ctx,
         IdentityAllocator: allocator,
-        PolicyHandler:     &mockUpdater{},
+        PolicyHandler:     PolicyHandler,
         DatapathHandler:   &mockTriggerer{},
         NodeIDHandler:     &fake.FakeNodeIDHandler{},
     })
pkg/ipcache/metadata.go (66 additions, 4 deletions)

@@ -79,12 +79,22 @@
     // generate updates into the ipcache, policy engine and datapath.
     queuedChangesMU lock.Mutex
     queuedPrefixes  map[netip.Prefix]struct{}
+
+    // reservedHostLock protects the localHostLabels map. Holders must
+    // always take the metadata read lock first.
+    reservedHostLock lock.Mutex
+
+    // reservedHostLabels collects all labels that apply to the host identity.
+    // see updateLocalHostLabels() for more info.
+    reservedHostLabels map[netip.Prefix]labels.Labels
 }

 func newMetadata() *metadata {
     return &metadata{
         m:              make(map[netip.Prefix]PrefixInfo),
         queuedPrefixes: make(map[netip.Prefix]struct{}),
+
+        reservedHostLabels: make(map[netip.Prefix]labels.Labels),
     }
 }
@@ -204,14 +214,13 @@ func (ipc *IPCache) InjectLabels(ctx context.Context, modifiedPrefixes []netip.Prefix
         oldID, entryExists := ipc.LookupByIP(pstr)
         oldTunnelIP, oldEncryptionKey := ipc.GetHostIPCache(pstr)
         prefixInfo := ipc.metadata.getLocked(prefix)
+        var newID *identity.Identity
         if prefixInfo == nil {
             if !entryExists {
                 // Already deleted, no new metadata to associate
                 continue
             } // else continue below to remove the old entry
         } else {
-            var newID *identity.Identity
-
             // Insert to propagate the updated set of labels after removal.
             newID, _, err = ipc.resolveIdentity(ctx, prefix, prefixInfo, identity.InvalidIdentity)
             if err != nil {
@@ -303,6 +312,25 @@ func (ipc *IPCache) InjectLabels(ctx context.Context, modifiedPrefixes []netip.Prefix
                 entriesToDelete[prefix] = oldID
             }
         }
+
+        // The reserved:host identity is special: the numeric ID is fixed,
+        // and the set of labels is mutable. Thus, whenever it changes,
+        // we must always update the SelectorCache (normally, this is elided
+        // when no changes are present).
+        if newID != nil && newID.ID == identity.ReservedIdentityHost {
+            idsToAdd[newID.ID] = newID.Labels.LabelArray()
+        }
+
+        // Again, more reserved:host bookkeeping: if this prefix is no longer ID 1 (because
+        // it is being deleted or changing IDs), we need to recompute the labels
+        // for reserved:host and push that to the SelectorCache.
+        if entryExists && oldID.ID == identity.ReservedIdentityHost &&
+            (newID == nil || newID.ID != identity.ReservedIdentityHost) {
+
+            i := ipc.updateReservedHostLabels(prefix, nil)
+            idsToAdd[i.ID] = i.Labels.LabelArray()
+        }
+
     }
     // Don't hold lock while calling UpdateIdentities, as it will otherwise run into a deadlock
     ipc.metadata.RUnlock()
@@ -452,8 +480,12 @@ func (ipc *IPCache) resolveIdentity
         // for itself). For all other identities, we avoid modifying
         // the labels at runtime and instead opt to allocate new
         // identities below.
-        identity.AddReservedIdentityWithLabels(identity.ReservedIdentityHost, lbls)
-        return identity.LookupReservedIdentity(identity.ReservedIdentityHost), false, nil
+        //
+        // As an extra gotcha, we need to merge all labels for all IPs
+        // that resolve to the reserved:host identity, otherwise we can
+        // flap the identity's labels depending on which prefix writes first. See GH-28259.
+        i := ipc.updateReservedHostLabels(prefix, lbls)
+        return i, false, nil
     }

     // If no other labels are associated with this IP, we assume that it's
@@ -483,6 +515,36 @@ func (ipc *IPCache) resolveIdentity
     return id, isNew, err
 }

+// updateReservedHostLabels adds or removes labels that apply to the local host.
+// The `reserved:host` identity is special: the numeric identity is fixed
+// and the set of labels is mutable. (The datapath requires this.) So,
+// we need to determine all prefixes that have the `reserved:host` label and
+// capture their labels. Then, we must aggregate *all* labels from all prefixes and
+// update the labels that correspond to the `reserved:host` identity.
+//
+// This could be termed a meta-ipcache. The ipcache metadata layer aggregates
+// an arbitrary set of resources and labels to a prefix. Here, we are aggregating an arbitrary
+// set of prefixes and labels to an identity.
+func (ipc *IPCache) updateReservedHostLabels(prefix netip.Prefix, lbls labels.Labels) *identity.Identity {
+    ipc.metadata.reservedHostLock.Lock()
+    defer ipc.metadata.reservedHostLock.Unlock()
+    if lbls == nil {
+        delete(ipc.metadata.reservedHostLabels, prefix)
+    } else {
+        ipc.metadata.reservedHostLabels[prefix] = lbls
+    }
+
+    // aggregate all labels and update static identity
+    newLabels := labels.NewFrom(labels.LabelHost)
+    for _, l := range ipc.metadata.reservedHostLabels {
+        newLabels.MergeLabels(l)
+    }
+
+    log.WithField(logfields.Labels, newLabels).Debug("Merged labels for reserved:host identity")
+
+    return identity.AddReservedIdentityWithLabels(identity.ReservedIdentityHost, newLabels)
+}
+
 // RemoveLabelsExcluded removes the given labels from all IPs inside the IDMD
 // except for the IPs / prefixes inside the given excluded set.
 //
pkg/ipcache/metadata_test.go (103 additions, 4 deletions)

@@ -128,6 +128,88 @@ func TestInjectLabels(t *testing.T) {
     assert.Equal(t, identity.ReservedIdentityWorld, IPIdentityCache.ipToIdentityCache["0.0.0.0/0"].ID)
 }

+// Test that when multiple IPs have the `reserved:host` label, we correctly
+// aggregate all labels *and* update the selector cache correctly.
+// This reproduces GH-28259.
+func TestUpdateLocalNode(t *testing.T) {
+    cancel := setupTest(t)
+    defer cancel()
+
+    ctx := context.Background()
+
+    bothLabels := labels.Labels{}
+    bothLabels.MergeLabels(labels.LabelHost)
+    bothLabels.MergeLabels(labels.LabelKubeAPIServer)
+
+    selectorCacheHas := func(lbls labels.Labels) {
+        t.Helper()
+        id := PolicyHandler.identities[identity.ReservedIdentityHost]
+        assert.NotNil(t, id)
+        assert.Equal(t, lbls.LabelArray(), id)
+    }
+
+    injectLabels := func(ip netip.Prefix) {
+        t.Helper()
+        remaining, err := IPIdentityCache.InjectLabels(ctx, []netip.Prefix{ip})
+        assert.NoError(t, err)
+        assert.Len(t, remaining, 0)
+    }
+
+    idIs := func(ip netip.Prefix, id identity.NumericIdentity) {
+        t.Helper()
+        assert.Equal(t, IPIdentityCache.ipToIdentityCache[ip.String()].ID, id)
+    }
+
+    // Mark .4 as local host
+    IPIdentityCache.metadata.upsertLocked(inClusterPrefix, source.Local, "node-uid", labels.LabelHost)
+    injectLabels(inClusterPrefix)
+    idIs(inClusterPrefix, identity.ReservedIdentityHost)
+    selectorCacheHas(labels.LabelHost)
+
+    // Mark .4 as kube-apiserver
+    // Note that in the actual code, we use `source.KubeAPIServer`. However,
+    // we use the same source in this test case to try and ferret out more bugs.
+    IPIdentityCache.metadata.upsertLocked(inClusterPrefix, source.Local, "kube-uid", labels.LabelKubeAPIServer)
+    injectLabels(inClusterPrefix)
+    idIs(inClusterPrefix, identity.ReservedIdentityHost)
+    selectorCacheHas(bothLabels)
+
+    // Mark .5 as local host
+    IPIdentityCache.metadata.upsertLocked(inClusterPrefix2, source.Local, "node-uid", labels.LabelHost)
+    injectLabels(inClusterPrefix2)
+    idIs(inClusterPrefix, identity.ReservedIdentityHost)
+    idIs(inClusterPrefix2, identity.ReservedIdentityHost)
+    selectorCacheHas(bothLabels)
+
+    // remove kube-apiserver from .4
+    IPIdentityCache.metadata.remove(inClusterPrefix, "kube-uid", labels.LabelKubeAPIServer)
+    injectLabels(inClusterPrefix)
+    idIs(inClusterPrefix, identity.ReservedIdentityHost)
+    idIs(inClusterPrefix2, identity.ReservedIdentityHost)
+    selectorCacheHas(labels.LabelHost)
+
+    // add kube-apiserver back to .4
+    IPIdentityCache.metadata.upsertLocked(inClusterPrefix, source.Local, "kube-uid", labels.LabelKubeAPIServer)
+    injectLabels(inClusterPrefix)
+    idIs(inClusterPrefix, identity.ReservedIdentityHost)
+    idIs(inClusterPrefix2, identity.ReservedIdentityHost)
+    selectorCacheHas(bothLabels)
+
+    // remove host from .4
+    IPIdentityCache.metadata.remove(inClusterPrefix, "node-uid", labels.LabelHost)
+    injectLabels(inClusterPrefix)
+
+    // Verify that .4 now has just kube-apiserver and CIDRs
+    idIs(inClusterPrefix, identity.LocalIdentityFlag) // the first CIDR identity
+    id := PolicyHandler.identities[identity.LocalIdentityFlag]
+    assert.True(t, id.Has("reserved.kube-apiserver"))
+    assert.True(t, id.Has("cidr."+inClusterPrefix.String()))
+
+    // verify that id 1 is now just reserved:host
+    idIs(inClusterPrefix2, identity.ReservedIdentityHost)
+    selectorCacheHas(labels.LabelHost)
+}
+
 // TestInjectExisting tests "upgrading" an existing identity to the apiserver.
 // This is a common occurrence on startup - and this test ensures we don't
 // regress the known issue in GH-24502
@@ -353,7 +435,7 @@ func TestOverrideIdentity(t *testing.T) {

     ipc := NewIPCache(&Configuration{
         IdentityAllocator: allocator,
-        PolicyHandler:     &mockUpdater{},
+        PolicyHandler:     newMockUpdater(),
         DatapathHandler:   &mockTriggerer{},
     })
     ctx := context.Background()
@@ -501,10 +583,11 @@ func setupTest(t *testing.T) (cleanup func()) {

     ctx, cancel := context.WithCancel(context.Background())
     allocator := testidentity.NewMockIdentityAllocator(nil)
+    PolicyHandler = newMockUpdater()
     IPIdentityCache = NewIPCache(&Configuration{
         Context:           ctx,
         IdentityAllocator: allocator,
-        PolicyHandler:     &mockUpdater{},
+        PolicyHandler:     PolicyHandler,
         DatapathHandler:   &mockTriggerer{},
         NodeIDHandler:     &fake.FakeNodeIDHandler{},
     })
@@ -518,9 +601,25 @@
     }
 }

-type mockUpdater struct{}
+func newMockUpdater() *mockUpdater {
+    return &mockUpdater{
+        identities: make(map[identity.NumericIdentity]labels.LabelArray),
+    }
+}

-func (m *mockUpdater) UpdateIdentities(_, _ cache.IdentityCache, _ *sync.WaitGroup) {}
+type mockUpdater struct {
+    identities map[identity.NumericIdentity]labels.LabelArray
+}
+
+func (m *mockUpdater) UpdateIdentities(added, deleted cache.IdentityCache, _ *sync.WaitGroup) {
+    for nid, lbls := range added {
+        m.identities[nid] = lbls
+    }
+
+    for nid := range deleted {
+        delete(m.identities, nid)
+    }
+}

 type mockTriggerer struct{}
