Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restructure IPCache to handle metadata merging #19765

Merged
merged 11 commits into from
Sep 1, 2022
20 changes: 13 additions & 7 deletions daemon/cmd/datapath.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package cmd
import (
"fmt"
"net"
"net/netip"
"os"
"strings"
"time"
Expand Down Expand Up @@ -257,7 +258,7 @@ func (d *Daemon) syncEndpointsAndHostIPs() error {
// ipcache. Until then, it is expected to succeed.
d.ipcache.Upsert(ipIDPair.PrefixString(), nil, hostKey, nil, ipcache.Identity{
ID: ipIDPair.ID,
Source: d.sourceByIP(ipIDPair.IP.String(), source.Local),
Source: d.sourceByIP(ipIDPair.IP, source.Local),
})
}

Expand All @@ -271,7 +272,7 @@ func (d *Daemon) syncEndpointsAndHostIPs() error {
log.Debugf("Removed outdated host ip %s from endpoint map", hostIP)
}

d.ipcache.Delete(hostIP, d.sourceByIP(hostIP, source.Local))
d.ipcache.Delete(hostIP, d.sourceByIP(ip, source.Local))
}
}

Expand All @@ -290,11 +291,16 @@ func (d *Daemon) syncEndpointsAndHostIPs() error {
return nil
}

func (d *Daemon) sourceByIP(prefix string, defaultSrc source.Source) source.Source {
if lbls := d.ipcache.GetIDMetadataByIP(prefix); lbls.Has(
labels.LabelKubeAPIServer[labels.IDNameKubeAPIServer],
) {
return source.KubeAPIServer
func (d *Daemon) sourceByIP(ip net.IP, defaultSrc source.Source) source.Source {
if addr, ok := netip.AddrFromSlice(ip); ok {
lbls := d.ipcache.GetIDMetadataByIP(addr)
if lbls.Has(labels.LabelKubeAPIServer[labels.IDNameKubeAPIServer]) {
return source.KubeAPIServer
}
} else {
log.WithFields(logrus.Fields{
logfields.IPAddr: ip,
}).Warning("BUG: Invalid addr detected in host stack. Please report this bug to the Cilium developers.")
}
return defaultSrc
}
Expand Down
50 changes: 28 additions & 22 deletions pkg/ipcache/cidr.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"net"
"net/netip"
"strings"

"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -48,31 +49,34 @@ func (ipc *IPCache) AllocateCIDRs(
newlyAllocatedIdentities = map[string]*identity.Identity{}
}

allocateCtx, cancel := context.WithTimeout(context.Background(), option.Config.IPAllocationTimeout)
defer cancel()

ipc.Lock()
allocatedIdentities := make(map[string]*identity.Identity, len(prefixes))
allocatedIdentities := make(map[netip.Prefix]*identity.Identity, len(prefixes))
for i, p := range prefixes {
if p == nil {
continue
}

lbls := cidr.GetCIDRLabels(p)
lbls.MergeLabels(ipc.GetIDMetadataByIP(p.IP.String()))
prefix := ip.IPNetToPrefix(p)
lbls.MergeLabels(ipc.metadata.get(prefix).ToLabels())
oldNID := identity.InvalidIdentity
if oldNIDs != nil && len(oldNIDs) > i {
oldNID = oldNIDs[i]
}
id, isNew, err := ipc.allocate(p, lbls, oldNID)
id, isNew, err := ipc.allocate(allocateCtx, prefix, lbls, oldNID)
if err != nil {
ipc.IdentityAllocator.ReleaseSlice(context.Background(), nil, usedIdentities)
ipc.Unlock()
return nil, err
}

prefixStr := p.String()
usedIdentities = append(usedIdentities, id)
allocatedIdentities[prefixStr] = id
allocatedIdentities[prefix] = id
if isNew {
newlyAllocatedIdentities[prefixStr] = id
newlyAllocatedIdentities[prefix.String()] = id
}
}
ipc.Unlock()
Expand Down Expand Up @@ -166,15 +170,8 @@ func (ipc *IPCache) UpsertGeneratedIdentities(newlyAllocatedIdentities map[strin
//
// It is up to the caller to provide the full set of labels for identity
// allocation.
func (ipc *IPCache) allocate(prefix *net.IPNet, lbls labels.Labels, oldNID identity.NumericIdentity) (*identity.Identity, bool, error) {
if prefix == nil {
return nil, false, nil
}

allocateCtx, cancel := context.WithTimeout(context.Background(), option.Config.IPAllocationTimeout)
defer cancel()

id, isNew, err := ipc.IdentityAllocator.AllocateIdentity(allocateCtx, lbls, false, oldNID)
func (ipc *IPCache) allocate(ctx context.Context, prefix netip.Prefix, lbls labels.Labels, oldNID identity.NumericIdentity) (*identity.Identity, bool, error) {
id, isNew, err := ipc.IdentityAllocator.AllocateIdentity(ctx, lbls, false, oldNID)
if err != nil {
return nil, isNew, fmt.Errorf("failed to allocate identity for cidr %s: %s", prefix, err)
}
Expand All @@ -186,7 +183,7 @@ func (ipc *IPCache) allocate(prefix *net.IPNet, lbls labels.Labels, oldNID ident
return id, isNew, err
}

func (ipc *IPCache) releaseCIDRIdentities(ctx context.Context, identities map[string]*identity.Identity) {
func (ipc *IPCache) releaseCIDRIdentities(ctx context.Context, identities map[netip.Prefix]*identity.Identity) {
// Create a critical section for identity release + removal from ipcache.
// Otherwise, it's possible to trigger the following race condition:
//
Expand All @@ -212,7 +209,7 @@ func (ipc *IPCache) releaseCIDRIdentities(ctx context.Context, identities map[st
}

if released {
ipc.deleteLocked(prefix, source.Generated)
ipc.deleteLocked(prefix.String(), source.Generated)
}
}
}
Expand All @@ -224,16 +221,17 @@ func (ipc *IPCache) ReleaseCIDRIdentitiesByCIDR(prefixes []*net.IPNet) {
releaseCtx, cancel := context.WithTimeout(context.TODO(), option.Config.KVstoreConnectivityTimeout)
defer cancel()

identities := make(map[string]*identity.Identity, len(prefixes))
identities := make(map[netip.Prefix]*identity.Identity, len(prefixes))
for _, prefix := range prefixes {
if prefix == nil {
continue
}

p := ip.IPNetToPrefix(prefix)
if id := ipc.IdentityAllocator.LookupIdentity(releaseCtx, cidr.GetCIDRLabels(prefix)); id != nil {
identities[prefix.String()] = id
identities[p] = id
} else {
log.Errorf("Unable to find identity of previously used CIDR %s", prefix.String())
log.Errorf("Unable to find identity of previously used CIDR %s", p.String())
}
}

Expand All @@ -243,7 +241,7 @@ func (ipc *IPCache) ReleaseCIDRIdentitiesByCIDR(prefixes []*net.IPNet) {
// ReleaseCIDRIdentitiesByID releases the specified identities.
// When the last use of the identity is released, the ipcache entry is deleted.
func (ipc *IPCache) ReleaseCIDRIdentitiesByID(ctx context.Context, identities []identity.NumericIdentity) {
fullIdentities := make(map[string]*identity.Identity, len(identities))
fullIdentities := make(map[netip.Prefix]*identity.Identity, len(identities))
for _, nid := range identities {
if id := ipc.IdentityAllocator.LookupIdentityByID(ctx, nid); id != nil {
cidr, ok := cidrLabelToPrefix(id.CIDRLabel.String())
Expand All @@ -254,7 +252,15 @@ func (ipc *IPCache) ReleaseCIDRIdentitiesByID(ctx context.Context, identities []
}).Warn("Unexpected release of non-CIDR identity, will leak this identity. Please report this issue to the developers.")
continue
}
fullIdentities[cidr] = id
prefix, err := netip.ParsePrefix(strings.TrimPrefix(cidr, labels.LabelSourceCIDR+":"))
if err != nil {
log.WithFields(logrus.Fields{
logfields.Identity: nid,
logfields.Labels: id.Labels,
}).Warn("BUG: Cannot parse prefix from CIDR label during CIDR identity release. Please report this issue to the developers.")
continue
}
fullIdentities[prefix] = id
} else {
log.WithFields(logrus.Fields{
logfields.Identity: nid,
Expand Down
38 changes: 38 additions & 0 deletions pkg/ipcache/ipcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ package ipcache

import (
"net"
"net/netip"

"github.com/sirupsen/logrus"

"github.com/cilium/cilium/pkg/controller"
"github.com/cilium/cilium/pkg/identity"
"github.com/cilium/cilium/pkg/identity/cache"
ipcacheTypes "github.com/cilium/cilium/pkg/ipcache/types"
"github.com/cilium/cilium/pkg/labels"
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/cilium/cilium/pkg/metrics"
Expand Down Expand Up @@ -424,6 +426,42 @@ func (ipc *IPCache) DumpToListener(listener IPIdentityMappingListener) {
ipc.RUnlock()
}

// UpsertMetadata upserts a given IP and some corresponding information into
// the ipcache metadata map. See IPMetadata for a list of types that are valid
// to pass into this function. This will trigger asynchronous calculation of
// any datapath updates necessary to implement the logic associated with the
// specified metadata.
func (ipc *IPCache) UpsertMetadata(prefix netip.Prefix, src source.Source, resource ipcacheTypes.ResourceID, aux ...IPMetadata) {
ipc.metadata.Lock()
ipc.metadata.upsertLocked(prefix, src, resource, aux...)
ipc.metadata.Unlock()
ipc.metadata.enqueuePrefixUpdates(prefix)
ipc.TriggerLabelInjection()
}

func (ipc *IPCache) RemoveMetadata(prefix netip.Prefix, resource ipcacheTypes.ResourceID, aux ...IPMetadata) {
ipc.metadata.Lock()
ipc.metadata.remove(prefix, resource, aux...)
ipc.metadata.Unlock()
ipc.metadata.enqueuePrefixUpdates(prefix)
ipc.TriggerLabelInjection()
}

// UpsertLabels upserts a given IP and its corresponding labels associated
// with it into the ipcache metadata map. The given labels are not modified nor
// is its reference saved, as they're copied when inserting into the map.
// This will trigger asynchronous calculation of any local identity changes
// that must occur to associate the specified labels with the prefix, and push
// any datapath updates necessary to implement the logic associated with the
// metadata currently associated with the 'prefix'.
func (ipc *IPCache) UpsertLabels(prefix netip.Prefix, lbls labels.Labels, src source.Source, resource ipcacheTypes.ResourceID) {
ipc.UpsertMetadata(prefix, src, resource, lbls)
}

func (ipc *IPCache) RemoveLabels(cidr netip.Prefix, lbls labels.Labels, resource ipcacheTypes.ResourceID) {
ipc.RemoveMetadata(cidr, resource, lbls)
}

// DumpToListenerLocked dumps the entire contents of the IPCache by triggering
// the listener's "OnIPIdentityCacheChange" method for each entry in the cache.
// The caller *MUST* grab the IPCache.Lock for reading before calling this
Expand Down
Loading