Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node: Use new asynchronous IPCache API for Manager (v2) #23208

Merged
merged 9 commits into from
May 17, 2023
2 changes: 1 addition & 1 deletion daemon/cmd/datapath.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (d *Daemon) syncHostIPs() error {

func (d *Daemon) sourceByIP(ip net.IP, defaultSrc source.Source) source.Source {
if addr, ok := ippkg.AddrFromIP(ip); ok {
lbls := d.ipcache.GetIDMetadataByIP(addr)
lbls := d.ipcache.GetMetadataLabelsByIP(addr)
if lbls.Has(labels.LabelKubeAPIServer[labels.IDNameKubeAPIServer]) {
return source.KubeAPIServer
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/ipcache/ipcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,12 @@ func (ipc *IPCache) GetHostIP(ip string) net.IP {
return hostIP
}

func (ipc *IPCache) GetHostIPCache(ip string) (net.IP, uint8) {
ipc.mutex.RLock()
defer ipc.mutex.RUnlock()
asauber marked this conversation as resolved.
Show resolved Hide resolved
return ipc.getHostIPCache(ip)
}

func (ipc *IPCache) getHostIPCache(ip string) (net.IP, uint8) {
ipKeyPair := ipc.ipToHostIPCache[ip]
return ipKeyPair.IP, ipKeyPair.Key
Expand Down
116 changes: 69 additions & 47 deletions pkg/ipcache/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"errors"
"fmt"
"net"
"net/netip"
"sync"
"time"
Expand All @@ -33,7 +34,7 @@ var (
)

// metadata contains the ipcache metadata. Mainily it holds a map which maps IP
// prefixes (x.x.x.x/32) to a set of information (prefixInfo).
// prefixes (x.x.x.x/32) to a set of information (PrefixInfo).
//
// When allocating an identity to associate with each prefix, the
// identity allocation routines will merge this set of labels into the
Expand All @@ -49,13 +50,13 @@ var (
// labels.Labels
// source.Source
// end
// subgraph prefixInfo
// subgraph PrefixInfo
// UA[ResourceID]-->LA[resourceInfo]
// UB[ResourceID]-->LB[resourceInfo]
// ...
// end
// subgraph identityMetadata
// IP_Prefix-->prefixInfo
// IP_Prefix-->PrefixInfo
// end
//
// ```
Expand All @@ -68,7 +69,7 @@ type metadata struct {
lock.RWMutex

// m is the actual map containing the mappings.
m map[netip.Prefix]prefixInfo
m map[netip.Prefix]PrefixInfo

// queued* handle updates into the IPCache. Whenever a label is added
// or removed from a specific IP prefix, that prefix is added into
Expand All @@ -81,7 +82,7 @@ type metadata struct {

func newMetadata() *metadata {
return &metadata{
m: make(map[netip.Prefix]prefixInfo),
m: make(map[netip.Prefix]PrefixInfo),
queuedPrefixes: make(map[netip.Prefix]struct{}),
}
}
Expand Down Expand Up @@ -109,7 +110,7 @@ func (m *metadata) enqueuePrefixUpdates(prefixes ...netip.Prefix) {

func (m *metadata) upsertLocked(prefix netip.Prefix, src source.Source, resource types.ResourceID, info ...IPMetadata) {
if _, ok := m.m[prefix]; !ok {
m.m[prefix] = make(prefixInfo)
m.m[prefix] = make(PrefixInfo)
}
if _, ok := m.m[prefix][resource]; !ok {
m.m[prefix][resource] = &resourceInfo{
Expand All @@ -124,20 +125,28 @@ func (m *metadata) upsertLocked(prefix netip.Prefix, src source.Source, resource
m.m[prefix].logConflicts(log.WithField(logfields.CIDR, prefix))
}

// GetIDMetadataByIP returns the associated labels with an IP. The caller must
// not modifying the returned object as it's a live reference to the underlying
// map.
func (ipc *IPCache) GetIDMetadataByIP(addr netip.Addr) labels.Labels {
// GetMetadataLabelsByIP returns the associated labels with an IP.
func (ipc *IPCache) GetMetadataLabelsByIP(addr netip.Addr) labels.Labels {
prefix := netip.PrefixFrom(addr, addr.BitLen())
ipc.metadata.RLock()
defer ipc.metadata.RUnlock()
if info := ipc.metadata.getLocked(prefix); info != nil {
if info := ipc.GetMetadataByPrefix(prefix); info != nil {
return info.ToLabels()
}
return nil
}

func (m *metadata) getLocked(prefix netip.Prefix) prefixInfo {
// GetMetadataByPrefix returns full metadata for a given IP as a copy.
func (ipc *IPCache) GetMetadataByPrefix(prefix netip.Prefix) PrefixInfo {
ipc.metadata.RLock()
defer ipc.metadata.RUnlock()
m := ipc.metadata.getLocked(prefix)
n := make(PrefixInfo, len(m))
for k, v := range m {
n[k] = v.DeepCopy()
}
return n
}

func (m *metadata) getLocked(prefix netip.Prefix) PrefixInfo {
return m.m[prefix]
}

Expand Down Expand Up @@ -166,6 +175,14 @@ func (ipc *IPCache) InjectLabels(ctx context.Context, modifiedPrefixes []netip.P
return modifiedPrefixes, errors.New("k8s cache not fully synced")
}

type ipcacheEntry struct {
identity Identity
tunnelPeer net.IP
encryptKey uint8

force bool
}

var (
// previouslyAllocatedIdentities maps IP Prefix -> Identity for
// old identities where the prefix will now map to a new identity
Expand All @@ -175,15 +192,16 @@ func (ipc *IPCache) InjectLabels(ctx context.Context, modifiedPrefixes []netip.P
idsToAdd = make(map[identity.NumericIdentity]labels.LabelArray)
idsToDelete = make(map[identity.NumericIdentity]labels.LabelArray)
// entriesToReplace stores the identity to replace in the ipcache.
entriesToReplace = make(map[netip.Prefix]Identity)
entriesToDelete = make(map[netip.Prefix]Identity)
forceIPCacheUpdate = make(map[netip.Prefix]bool) // prefix => force
entriesToReplace = make(map[netip.Prefix]ipcacheEntry)
entriesToDelete = make(map[netip.Prefix]Identity)
)

ipc.metadata.RLock()

for i, prefix := range modifiedPrefixes {
oldID, entryExists := ipc.LookupByIP(prefix.String())
pstr := prefix.String()
oldID, entryExists := ipc.LookupByIP(pstr)
oldTunnelIP, oldEncryptionKey := ipc.GetHostIPCache(pstr)
prefixInfo := ipc.metadata.getLocked(prefix)
if prefixInfo == nil {
if !entryExists {
Expand Down Expand Up @@ -218,27 +236,31 @@ func (ipc *IPCache) InjectLabels(ctx context.Context, modifiedPrefixes []netip.P
break
}

// We can safely skip the ipcache upsert if the ID and source
// in the metadata cache match the ipcache exactly.
// Note that checking ID alone is insufficient, see GH-24502
if oldID.ID == newID.ID && prefixInfo.Source() == oldID.Source {
// We can safely skip the ipcache upsert if the entry matches with
// the entry in the metadata cache exactly.
// Note that checking ID alone is insufficient, see GH-24502.
if oldID.ID == newID.ID && prefixInfo.Source() == oldID.Source &&
oldTunnelIP.Equal(prefixInfo.TunnelPeer().IP()) &&
oldEncryptionKey == prefixInfo.EncryptKey().Uint8() {
goto releaseIdentity
}
christarazi marked this conversation as resolved.
Show resolved Hide resolved

idsToAdd[newID.ID] = newID.Labels.LabelArray()
entriesToReplace[prefix] = Identity{
ID: newID.ID,
Source: prefixInfo.Source(),
createdFromMetadata: true,
}
// IPCache.Upsert() and friends currently require a
// Source to be provided during upsert. If the old
// Source was higher precedence due to labels that
// have now been removed, then we need to explicitly
// work around that to remove the old higher-priority
// identity and replace it with this new identity.
if entryExists && prefixInfo.Source() != oldID.Source && oldID.ID != newID.ID {
forceIPCacheUpdate[prefix] = true
entriesToReplace[prefix] = ipcacheEntry{
identity: Identity{
ID: newID.ID,
Source: prefixInfo.Source(),
createdFromMetadata: true,
},
tunnelPeer: prefixInfo.TunnelPeer().IP(),
encryptKey: prefixInfo.EncryptKey().Uint8(),
// IPCache.Upsert() and friends currently require a
// Source to be provided during upsert. If the old
// Source was higher precedence due to labels that
// have now been removed, then we need to explicitly
// work around that to remove the old higher-priority
// identity and replace it with this new identity.
force: entryExists && prefixInfo.Source() != oldID.Source && oldID.ID != newID.ID,
}
}
releaseIdentity:
Expand Down Expand Up @@ -271,17 +293,16 @@ func (ipc *IPCache) InjectLabels(ctx context.Context, modifiedPrefixes []netip.P

ipc.mutex.Lock()
defer ipc.mutex.Unlock()
for p, id := range entriesToReplace {
for p, entry := range entriesToReplace {
prefix := p.String()
hIP, key := ipc.getHostIPCache(prefix)
meta := ipc.getK8sMetadata(prefix)
if _, err2 := ipc.upsertLocked(
prefix,
hIP,
key,
entry.tunnelPeer,
entry.encryptKey,
meta,
id,
forceIPCacheUpdate[p],
entry.identity,
entry.force,
); err2 != nil {
// It's plausible to pull the same information twice
// from different sources, for instance in etcd mode
Expand All @@ -291,13 +312,13 @@ func (ipc *IPCache) InjectLabels(ctx context.Context, modifiedPrefixes []netip.P
// identity is unchanged, then we can safely ignore the
// error message.
oldID, ok := previouslyAllocatedIdentities[p]
if !(ok && oldID.ID == id.ID && errors.Is(err2, &ErrOverwrite{
if !(ok && oldID.ID == entry.identity.ID && errors.Is(err2, &ErrOverwrite{
ExistingSrc: oldID.Source,
NewSrc: id.Source,
NewSrc: entry.identity.Source,
})) {
log.WithError(err2).WithFields(logrus.Fields{
logfields.IPAddr: prefix,
logfields.Identity: id,
logfields.Identity: entry.identity.ID,
}).Error("Failed to replace ipcache entry with new identity after label removal. Traffic may be disrupted.")
}
}
Expand Down Expand Up @@ -364,7 +385,7 @@ func (ipc *IPCache) UpdatePolicyMaps(ctx context.Context, addedIdentities, delet

// resolveIdentity will either return a previously-allocated identity for the
// given prefix or allocate a new one corresponding to the labels associated
// with the specified prefixInfo.
// with the specified PrefixInfo.
//
// This function will take an additional reference on the returned identity.
// The caller *must* ensure that this reference is eventually released via
Expand All @@ -376,7 +397,7 @@ func (ipc *IPCache) UpdatePolicyMaps(ctx context.Context, addedIdentities, delet
// - If the entry is not inserted (for instance, because the bpf IPCache map
// already has the same IP -> identity entry in the map), immediately release
// the reference.
func (ipc *IPCache) resolveIdentity(ctx context.Context, prefix netip.Prefix, info prefixInfo, restoredIdentity identity.NumericIdentity) (*identity.Identity, bool, error) {
func (ipc *IPCache) resolveIdentity(ctx context.Context, prefix netip.Prefix, info PrefixInfo, restoredIdentity identity.NumericIdentity) (*identity.Identity, bool, error) {
// Override identities always take precedence
if identityOverrideLabels, ok := info.identityOverride(); ok {
return ipc.IdentityAllocator.AllocateIdentity(ctx, identityOverrideLabels, false, identity.InvalidIdentity)
Expand Down Expand Up @@ -425,7 +446,8 @@ func (ipc *IPCache) resolveIdentity(ctx context.Context, prefix netip.Prefix, in
// we allow more arbitrary labels to be associated with these IPs that
// correspond to remote nodes.
if !lbls.Has(labels.LabelRemoteNode[labels.IDNameRemoteNode]) &&
!lbls.Has(labels.LabelHealth[labels.IDNameHealth]) {
!lbls.Has(labels.LabelHealth[labels.IDNameHealth]) &&
!lbls.Has(labels.LabelIngress[labels.IDNameIngress]) {
cidrLabels := cidrlabels.GetCIDRLabels(prefix)
lbls.MergeLabels(cidrLabels)
}
Expand Down