Skip to content

Commit

Permalink
ipcache/md: Handle deletions asynchronously
Browse files Browse the repository at this point in the history
Now that the IPCache handles metadata updates incrementally, we can
switch the deletion path over to use this same incremental update logic.
This allows the users of the deletion APIs to inject updates into the
ipcache in a similar manner to the users of the add APIs, and have the
updates incrementally triggered into the subsequent subsystems (policy,
datapath) consistently. This means that updates into the ipcache from
both add and delete paths for the kube-apiserver policy feature will
only actually occur from a single goroutine, triggered by the
TriggerLabelInjection call.

This removes the need to reason about multiple concurrent adds or
deletions into the IPcache occurring from the kube-apiserver policy
feature, and it also lays the path to do the same in future for CIDR
policy and other subsystems. This way, each user of the IPCache can
propagate the information that it intends to push into the IPCache, then
the IPCache itself can decide how to handle those updates and how to
combine information from the various subsystems.

Some key observations here:
* The previous commit actually includes 90% of the logic required to
  implement deletions, based partially on the previous 'add' code and
  partially on the code being deleted here.
* What changes here is adding support in InjectLabels() for the case
  where a set of labels is removed from a prefix. If all labels are
  removed, then this results in the 'IPCache.metadata' map having no
  corresponding entry, so in this case the corresponding old identity
  currently in 'IPCache' should be removed from the selectorcache,
  policymaps, and the IPCache.
  * Caveat: This is only the case if _only_ the metadata map has
    references to the identity. At this point in time, CIDR policies for
    instance are not yet converted over to the metadata map approach for
    associating labels with prefixes, so that path may independently
    allocate their own identities. If those are still referenced from
    CIDR policy, then the label injector should simply remove references
    to the corresponding identities but not remove it from the ipcache.
* Another case is when there are some set of labels (eg A, B) associated
  with a prefix, then one set (eg B) is removed. The result is that a
  previous identity with labels (A, B) must be removed, and a new
  identity with labels (A) should be allocated / associated with the
  prefix. In general, this is very similar to the existing case where a
  set of labels is expanded by associating new labels with the prefix
  (already handled in the previous commit).
  * This also has a curly case: Each set of labels has a source
    associated, for instance initially there could be "remote-node"
    (source: custom-resource) and "kube-apiserver" (source:
    kube-apiserver). When previously upserting into the IPCache, the
    source will be kube-apiserver. If the kube apiserver is no longer
    associated with the IP, and hence that label removed, then the
    resulting set of labels will only be "remote-node", with source
    "custom-resource". Given that the source "custom-resource" has a
    lower priority in pkg/source than kube-apiserver source, we cannot
    update the ipcache directly with the new set of labels using the
    "custom-resource" source. However, the label removal is still
    legitimate. To work around the clunky APIs, the function here just
    overrides the source check in the IPCache.Upsert(). We should be
    able to remove this clunkiness over time when the metadata map is
    the primary source of information for prefixes, but more refactoring
    is necessary to get to that point.
* Now that the label removal doesn't have its own independent logic from
  a separate goroutine, there is no longer a need to use the
  'applyChangesMU' mutex in the metadata cache to ensure safety around
  the critical section. Furthermore, the core InjectLabels() call
  doesn't modify the metadata map. So, we can remove one lock and reduce
  the other lock down to a read mutex rather than holding it for write.

Co-authored-by: Chris Tarazi <chris@isovalent.com>
Signed-off-by: Joe Stringer <joe@cilium.io>
  • Loading branch information
joestringer and christarazi committed Sep 1, 2022
1 parent 96c843d commit 308c142
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 225 deletions.
242 changes: 37 additions & 205 deletions pkg/ipcache/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ type metadata struct {
// m is the actual map containing the mappings.
m map[string]prefixInfo

// applyChangesMU protects InjectLabels and RemoveLabelsExcluded from being
// run in parallel
applyChangesMU lock.Mutex

// queued* handle updates into the IPCache. Whenever a label is added
// or removed from a specific IP prefix, that prefix is added into
// 'queuedPrefixes'. Each time label injection is triggered, it will
Expand Down Expand Up @@ -191,24 +187,21 @@ func (ipc *IPCache) InjectLabels(modifiedCIDRs []string) (remainingCIDRs []strin
idsToAdd = make(map[identity.NumericIdentity]labels.LabelArray)
idsToDelete = make(map[identity.NumericIdentity]labels.LabelArray)
// entriesToReplace stores the identity to replace in the ipcache.
entriesToReplace = make(map[string]Identity)
entriesToReplace = make(map[string]Identity)
entriesToDelete = make(map[string]Identity)
forceIPCacheUpdate = make(map[string]bool) // prefix => force
)

ipc.metadata.applyChangesMU.Lock()
defer ipc.metadata.applyChangesMU.Unlock()

ipc.metadata.Lock()
ipc.metadata.RLock()

for i, prefix := range modifiedCIDRs {
id, entryExists := ipc.LookupByIP(prefix)
prefixInfo := ipc.metadata.getLocked(prefix)
if prefixInfo == nil {
log.WithFields(logrus.Fields{
logfields.IPAddr: prefix,
logfields.Identity: id,
}).Warning(
"IPCache metadata unexpectedly removed before handling metadata update",
)
if !entryExists {
// Already deleted, no new metadata to associate
continue
} // else continue below to remove the old entry
} else {
var newID *identity.Identity

Expand Down Expand Up @@ -254,6 +247,15 @@ func (ipc *IPCache) InjectLabels(modifiedCIDRs []string) (remainingCIDRs []strin
ID: newID.ID,
Source: prefixInfo.Source(),
}
// IPCache.Upsert() and friends currently require a
// Source to be provided during upsert. If the old
// Source was higher precedence due to labels that
// have now been removed, then we need to explicitly
// work around that to remove the old higher-priority
// identity and replace it with this new identity.
if entryExists && prefixInfo.Source() != id.Source {
forceIPCacheUpdate[prefix] = true
}
}
releaseIdentity:
if entryExists {
Expand All @@ -270,7 +272,7 @@ func (ipc *IPCache) InjectLabels(modifiedCIDRs []string) (remainingCIDRs []strin
}
}
// Don't hold lock while calling UpdateIdentities, as it will otherwise run into a deadlock
ipc.metadata.Unlock()
ipc.metadata.RUnlock()

// Recalculate policy first before upserting into the ipcache.
if len(idsToAdd) > 0 {
Expand All @@ -288,7 +290,7 @@ func (ipc *IPCache) InjectLabels(modifiedCIDRs []string) (remainingCIDRs []strin
key,
meta,
id,
true,
forceIPCacheUpdate[ip],
); err2 != nil {
log.WithError(err2).WithFields(logrus.Fields{
logfields.IPAddr: ip,
Expand All @@ -297,7 +299,7 @@ func (ipc *IPCache) InjectLabels(modifiedCIDRs []string) (remainingCIDRs []strin
}
}

for _, id := range previouslyAllocatedIdentities {
for prefix, id := range previouslyAllocatedIdentities {
realID := ipc.IdentityAllocator.LookupIdentityByID(context.TODO(), id.ID)
if realID == nil {
continue
Expand All @@ -321,11 +323,17 @@ func (ipc *IPCache) InjectLabels(modifiedCIDRs []string) (remainingCIDRs []strin
// logic for handling the removal of these identities.
if released {
idsToDelete[id.ID] = nil // SelectorCache removal
if _, ok := entriesToReplace[prefix]; !ok {
entriesToDelete[prefix] = id // IPCache removal
}
}
}
if len(idsToDelete) > 0 {
ipc.UpdatePolicyMaps(context.TODO(), nil, idsToDelete)
}
for ip, id := range entriesToDelete {
ipc.deleteLocked(ip, id.Source)
}

return remainingCIDRs, err
}
Expand Down Expand Up @@ -441,30 +449,24 @@ func (ipc *IPCache) injectLabelsForCIDR(p string, lbls labels.Labels) (*identity
}

// RemoveLabelsExcluded removes the given labels from all IPs inside the IDMD
// except for the IPs / prefixes inside the given excluded set. This may cause
// updates to the ipcache, as well as to the identity and policy logic via
// 'updater' and 'triggerer'.
// except for the IPs / prefixes inside the given excluded set.
//
// The caller must subsequently call IPCache.TriggerLabelInjection() to push
// these changes down into the policy engine and ipcache datapath maps.
func (ipc *IPCache) RemoveLabelsExcluded(
lbls labels.Labels,
toExclude map[string]struct{},
src source.Source,
rid types.ResourceID,
) {
ipc.metadata.applyChangesMU.Lock()
defer ipc.metadata.applyChangesMU.Unlock()

ipc.metadata.Lock()
defer ipc.metadata.Unlock()

oldSet := ipc.metadata.filterByLabels(lbls)
toRemove := make(map[string]labels.Labels)
for _, ip := range oldSet {
if _, ok := toExclude[ip]; !ok {
toRemove[ip] = lbls
ipc.removeLabels(ip, lbls, rid)
}
}

ipc.removeLabelsFromIPs(toRemove, src, rid)
}

// filterByLabels returns all the prefixes inside the ipcache metadata map
Expand All @@ -484,192 +486,22 @@ func (m *metadata) filterByLabels(filter labels.Labels) []string {
return matching
}

// removeLabelsFromIPs removes all given prefixes at once. This function will
// trigger policy update and recalculation if necessary on behalf of the
// caller.
//
// A prefix will only be removed from the IDMD if the set of labels becomes
// empty.
//
// Assumes that the ipcache metadata lock is taken!
func (ipc *IPCache) removeLabelsFromIPs(
m map[string]labels.Labels,
src source.Source,
rid types.ResourceID,
) {
var (
idsToAdd = make(map[identity.NumericIdentity]labels.LabelArray)
idsToDelete = make(map[identity.NumericIdentity]labels.LabelArray)
// toReplace stores the identity to replace in the ipcache.
toReplace = make(map[string]Identity)
)

ipc.Lock()
defer ipc.Unlock()

for prefix, lbls := range m {
id, exists := ipc.LookupByIPRLocked(prefix)
if !exists {
continue
}

idsToDelete[id.ID] = nil // labels for deletion don't matter to UpdateIdentities()

// Insert to propagate the updated set of labels after removal.
l := ipc.removeLabels(prefix, lbls, src, rid)
if len(l) > 0 {
// If for example kube-apiserver label is removed from the local
// host identity (when the kube-apiserver is running within the
// cluster and it is no longer running on the current local host),
// then removeLabels() will return a non-empty set representing the
// new full set of labels to associate with the node (in this
// example, just the local host label). In order to propagate the
// new identity, we must emit a delete event for the old identity
// and then an add event for the new identity.

// If host identity has changed, update its labels.
if id.ID == identity.ReservedIdentityHost {
identity.AddReservedIdentityWithLabels(id.ID, l)
}

newID, _, err := ipc.IdentityAllocator.AllocateIdentity(context.TODO(), l, false, identity.InvalidIdentity)
if err != nil {
log.WithError(err).WithFields(logrus.Fields{
logfields.IPAddr: prefix,
logfields.Identity: id,
logfields.IdentityLabels: l, // new labels
logfields.Labels: lbls, // removed labels
}).Error(
"Failed to allocate new identity after dissociating labels from existing identity. Traffic may be disrupted.",
)
continue
}
idsToAdd[newID.ID] = l.LabelArray()
toReplace[prefix] = Identity{
ID: newID.ID,
Source: sourceByLabels(src, l),
}
}
}
if len(idsToDelete) > 0 {
ipc.UpdatePolicyMaps(context.TODO(), idsToAdd, idsToDelete)
}
for ip, id := range toReplace {
hIP, key := ipc.getHostIPCache(ip)
meta := ipc.getK8sMetadata(ip)
if _, err := ipc.upsertLocked(
ip,
hIP,
key,
meta,
id,
true, /* force upsert */
); err != nil {
log.WithError(err).WithFields(logrus.Fields{
logfields.IPAddr: ip,
logfields.Identity: id,
}).Error("Failed to replace ipcache entry with new identity after label removal. Traffic may be disrupted.")
}
}
}

// removeLabels removes the given labels association with the given prefix. The
// leftover labels are returned, if any. If there are leftover labels, the
// caller must allocate a new identity and do the following *in order* to avoid
// drops:
// 1. policy recalculation must be implemented into the datapath and
// 2. new identity must have a new entry upserted into the IPCache
//
// Note: GH-17962, triggering policy recalculation doesn't actually *implement*
// the changes into datapath (because it's an async call), this is a known
// issue. There's a very small window for drops when two policies select the
// same traffic and the identity changes. For example, this is possible if an
// IP is associated with the kube-apiserver and referenced inside a ToCIDR
// policy, and then the IP is no longer associated with the kube-apiserver.
// removeLabels asynchronously removes the labels association for a prefix.
//
// Identities are deallocated and their subequent entry in the IPCache is
// removed if the prefix is no longer associated with any labels.
//
// This function assumes that the ipcache metadata and the IPIdentityCache
// locks are taken!
func (ipc *IPCache) removeLabels(prefix string, lbls labels.Labels, src source.Source, resource types.ResourceID) labels.Labels {
// This function assumes that the ipcache metadata lock is held for writing.
func (ipc *IPCache) removeLabels(prefix string, lbls labels.Labels, resource types.ResourceID) {
info, ok := ipc.metadata.m[prefix]
if !ok {
return nil
return
}
info[resource].unmerge(lbls)
if !info[resource].isValid() {
delete(info, resource)
}
allLbls := info.ToLabels() // to return at the end of the function
if !info.isValid() { // Labels empty, delete
// No labels left. Example case: when the kube-apiserver is running
// outside of the cluster, meaning that the IDMD only ever had the
// kube-apiserver label (CIDR labels are not added) and it's now being
// removed.
if !info.isValid() { // Labels empty, delete
delete(ipc.metadata.m, prefix)
}

id, exists := ipc.LookupByIPRLocked(prefix)
if !exists {
log.WithFields(logrus.Fields{
logfields.CIDR: prefix,
logfields.Labels: lbls,
}).Warn(
"Identity for prefix was unexpectedly not found in ipcache, unable " +
"to remove labels from prefix. If a network policy is applied, check " +
"for any drops. It's possible that insertion or removal from " +
"the ipcache failed.",
)
return nil
}

realID := ipc.IdentityAllocator.LookupIdentityByID(context.TODO(), id.ID)
if realID == nil {
log.WithFields(logrus.Fields{
logfields.CIDR: prefix,
logfields.Labels: lbls,
logfields.Identity: id,
}).Warn(
"Identity unexpectedly not found within the identity allocator, " +
"unable to remove labels from prefix. It's possible that insertion " +
"or removal from the ipcache failed.",
)
return nil
}
released, err := ipc.IdentityAllocator.Release(context.TODO(), realID, false)
if err != nil {
log.WithError(err).WithFields(logrus.Fields{
logfields.IPAddr: prefix,
logfields.Labels: lbls,
logfields.Identity: realID,
logfields.IdentityLabels: realID.Labels,
}).Error(
"Failed to release assigned identity to IP while removing label association, this might be a leak.",
)
return nil
}
if released {
ipc.deleteLocked(prefix, sourceByLabels(src, lbls))
return nil
}

// Generate new identity with the label removed. This should be the case
// where the existing identity had >1 refcount, meaning that something was
// referring to it.
//
// If kube-apiserver is inside the cluster, this path is always hit
// (because even if we remove the kube-apiserver from that node, we
// need to inject the identity corresponding to "host" or "remote-node"
// (without apiserver label)
return allLbls
}

func sourceByLabels(d source.Source, lbls labels.Labels) source.Source {
if lbls.Has(labels.LabelKubeAPIServer[labels.IDNameKubeAPIServer]) {
return source.KubeAPIServer
}
return d
ipc.metadata.enqueuePrefixUpdates(prefix)
}

// TriggerLabelInjection triggers the label injection controller to iterate
Expand Down
Loading

0 comments on commit 308c142

Please sign in to comment.