Skip to content

Commit

Permalink
ipcache: Introduce new asynchronous API
Browse files Browse the repository at this point in the history
Introduce a new generic API for associating information with the
IPs in the IPCache, which accounts for multiple sources of information
such as labels coming from different resources (eg Services ->
reserved:kube-apiserver, NetPol -> CIDR labels).

The primary core of this API is the UpsertMetadata(...) function, which
takes the following parameters:
- prefix: IP (range) that this info applies to;
- src: info source of the information;
- resource: specific resource name in the information source,
- aux: variable length list of information to associate with the prefix.

'aux' is typed as IPMetadata, which is effectively just an interface{}
to allow any information to be associated with the IPCache. Developers
should read the comments around IPMetadata and expand the IPCache
package in the relevant areas (particularly pkg/ipcache/types.go and the
InjectLabels() call) to ensure that the IPCache package correctly
handles the new information and effectively merges the different sources
of info correctly.

After info is upserted into the IPCache via this new API, it will
automatically trigger an out-of-band resolution of what the new IPCache
entry for the prefix should look like, taking into account each piece of
source information from various resources.

In this patch, we switch the current kube-apiserver logic over to the
new API as an initial example, removing the need for the caller to
trigger label injection since the new API will automatically schedule
this. Future work will expand this to switch other subsystems over to
the new APIs, introducing new resourceInfo fields and merging logic in
the ipcache package to decide how complementary (or even conflicting)
information should be combined in order to generate IPCache entries.

Signed-off-by: Joe Stringer <joe@cilium.io>
Signed-off-by: Chris Tarazi <chris@isovalent.com>
  • Loading branch information
joestringer committed Sep 1, 2022
1 parent 3828475 commit 3d191bc
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 33 deletions.
38 changes: 38 additions & 0 deletions pkg/ipcache/ipcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ package ipcache

import (
"net"
"net/netip"

"github.com/sirupsen/logrus"

"github.com/cilium/cilium/pkg/controller"
"github.com/cilium/cilium/pkg/identity"
"github.com/cilium/cilium/pkg/identity/cache"
ipcacheTypes "github.com/cilium/cilium/pkg/ipcache/types"
"github.com/cilium/cilium/pkg/labels"
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/cilium/cilium/pkg/metrics"
Expand Down Expand Up @@ -424,6 +426,42 @@ func (ipc *IPCache) DumpToListener(listener IPIdentityMappingListener) {
ipc.RUnlock()
}

// UpsertMetadata upserts a given IP and some corresponding information into
// the ipcache metadata map. See IPMetadata for a list of types that are valid
// to pass into this function. This will trigger asynchronous calculation of
// any datapath updates necessary to implement the logic associated with the
// specified metadata.
func (ipc *IPCache) UpsertMetadata(prefix netip.Prefix, src source.Source, resource ipcacheTypes.ResourceID, aux ...IPMetadata) {
ipc.metadata.Lock()
ipc.metadata.upsertLocked(prefix, src, resource, aux...)
ipc.metadata.Unlock()
ipc.metadata.enqueuePrefixUpdates(prefix)
ipc.TriggerLabelInjection()
}

func (ipc *IPCache) RemoveMetadata(prefix netip.Prefix, resource ipcacheTypes.ResourceID, aux ...IPMetadata) {
ipc.metadata.Lock()
ipc.metadata.remove(prefix, resource, aux...)
ipc.metadata.Unlock()
ipc.metadata.enqueuePrefixUpdates(prefix)
ipc.TriggerLabelInjection()
}

// UpsertLabels upserts a given IP and its corresponding labels associated
// with it into the ipcache metadata map. The given labels are not modified nor
// is its reference saved, as they're copied when inserting into the map.
// This will trigger asynchronous calculation of any local identity changes
// that must occur to associate the specified labels with the prefix, and push
// any datapath updates necessary to implement the logic associated with the
// metadata currently associated with the 'prefix'.
func (ipc *IPCache) UpsertLabels(prefix netip.Prefix, lbls labels.Labels, src source.Source, resource ipcacheTypes.ResourceID) {
ipc.UpsertMetadata(prefix, src, resource, lbls)
}

func (ipc *IPCache) RemoveLabels(cidr netip.Prefix, lbls labels.Labels, resource ipcacheTypes.ResourceID) {
ipc.RemoveMetadata(cidr, resource, lbls)
}

// DumpToListenerLocked dumps the entire contents of the IPCache by triggering
// the listener's "OnIPIdentityCacheChange" method for each entry in the cache.
// The caller *MUST* grab the IPCache.Lock for reading before calling this
Expand Down
13 changes: 0 additions & 13 deletions pkg/ipcache/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,19 +105,6 @@ func (m *metadata) enqueuePrefixUpdates(prefixes ...netip.Prefix) {
}
}

// UpsertMetadata upserts a given IP and its corresponding labels associated
// with it into the ipcache metadata map. The given labels are not modified nor
// is its reference saved, as they're copied when inserting into the map.
//
// The caller must subsequently call ipc.TriggerLabelInjection() to implement
// these metadata updates into the datapath.
func (ipc *IPCache) UpsertMetadata(prefix netip.Prefix, lbls labels.Labels, src source.Source, rid types.ResourceID) {
ipc.metadata.Lock()
ipc.metadata.upsertLocked(prefix, src, rid, lbls)
ipc.metadata.Unlock()
ipc.metadata.enqueuePrefixUpdates(prefix)
}

func (m *metadata) upsertLocked(prefix netip.Prefix, src source.Source, resource types.ResourceID, info ...IPMetadata) {
if _, ok := m.m[prefix]; !ok {
m.m[prefix] = make(prefixInfo)
Expand Down
14 changes: 7 additions & 7 deletions pkg/ipcache/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestInjectLabels(t *testing.T) {

// Insert kube-apiserver IP from outside of the cluster. This should create
// a CIDR ID for this IP.
IPIdentityCache.UpsertMetadata(inClusterPrefix, labels.LabelKubeAPIServer, source.KubeAPIServer, "kube-uid")
IPIdentityCache.metadata.upsertLocked(inClusterPrefix, source.KubeAPIServer, "kube-uid", labels.LabelKubeAPIServer)
assert.Len(t, IPIdentityCache.metadata.m, 2)
remaining, err = IPIdentityCache.InjectLabels(ctx, []netip.Prefix{inClusterPrefix})
assert.NoError(t, err)
Expand All @@ -49,7 +49,7 @@ func TestInjectLabels(t *testing.T) {
// Upsert node labels to the kube-apiserver to validate that the CIDR ID is
// deallocated and the kube-apiserver reserved ID is associated with this
// IP now.
IPIdentityCache.UpsertMetadata(inClusterPrefix, labels.LabelRemoteNode, source.CustomResource, "node-uid")
IPIdentityCache.metadata.upsertLocked(inClusterPrefix, source.CustomResource, "node-uid", labels.LabelRemoteNode)
assert.Len(t, IPIdentityCache.metadata.m, 2)
remaining, err = IPIdentityCache.InjectLabels(ctx, []netip.Prefix{inClusterPrefix})
assert.NoError(t, err)
Expand All @@ -61,8 +61,8 @@ func TestInjectLabels(t *testing.T) {
func TestFilterMetadataByLabels(t *testing.T) {
setupTest(t)

IPIdentityCache.UpsertMetadata(netip.MustParsePrefix("2.1.1.1/32"), labels.LabelWorld, source.Generated, "gen-uid")
IPIdentityCache.UpsertMetadata(netip.MustParsePrefix("3.1.1.1/32"), labels.LabelWorld, source.Generated, "gen-uid-2")
IPIdentityCache.metadata.upsertLocked(netip.MustParsePrefix("2.1.1.1/32"), source.Generated, "gen-uid", labels.LabelWorld)
IPIdentityCache.metadata.upsertLocked(netip.MustParsePrefix("3.1.1.1/32"), source.Generated, "gen-uid-2", labels.LabelWorld)

assert.Len(t, IPIdentityCache.metadata.filterByLabels(labels.LabelKubeAPIServer), 1)
assert.Len(t, IPIdentityCache.metadata.filterByLabels(labels.LabelWorld), 2)
Expand Down Expand Up @@ -92,7 +92,7 @@ func TestRemoveLabelsFromIPs(t *testing.T) {
// Entry with only kube-apiserver labels means kube-apiserver is outside of
// the cluster, and thus will have a CIDR identity when InjectLabels() is
// called.
IPIdentityCache.UpsertMetadata(worldPrefix, labels.LabelKubeAPIServer, source.CustomResource, "kube-uid")
IPIdentityCache.metadata.upsertLocked(worldPrefix, source.CustomResource, "kube-uid", labels.LabelKubeAPIServer)
remaining, err = IPIdentityCache.InjectLabels(ctx, []netip.Prefix{worldPrefix})
assert.NoError(t, err)
assert.Len(t, remaining, 0)
Expand Down Expand Up @@ -128,8 +128,8 @@ func setupTest(t *testing.T) {
})
IPIdentityCache.k8sSyncedChecker = &mockK8sSyncedChecker{}

IPIdentityCache.UpsertMetadata(worldPrefix, labels.LabelKubeAPIServer, source.CustomResource, "kube-uid")
IPIdentityCache.UpsertMetadata(worldPrefix, labels.LabelHost, source.Local, "host-uid")
IPIdentityCache.metadata.upsertLocked(worldPrefix, source.CustomResource, "kube-uid", labels.LabelKubeAPIServer)
IPIdentityCache.metadata.upsertLocked(worldPrefix, source.Local, "host-uid", labels.LabelHost)
}

type mockK8sSyncedChecker struct{}
Expand Down
4 changes: 1 addition & 3 deletions pkg/k8s/watchers/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,8 @@ func (k *K8sWatcher) handleKubeAPIServerServiceEPChanges(desiredIPs map[netip.Pr
)

for ip := range desiredIPs {
k.ipcache.UpsertMetadata(ip, labels.LabelKubeAPIServer, src, rid)
k.ipcache.UpsertLabels(ip, labels.LabelKubeAPIServer, src, rid)
}

k.ipcache.TriggerLabelInjection()
}

func insertK8sPrefix(desiredIPs map[netip.Prefix]struct{}, addr string, resource ipcacheTypes.ResourceID) {
Expand Down
9 changes: 3 additions & 6 deletions pkg/node/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ type nodeEntry struct {
type IPCache interface {
Upsert(ip string, hostIP net.IP, hostKey uint8, k8sMeta *ipcache.K8sMetadata, newIdentity ipcache.Identity) (bool, error)
Delete(IP string, source source.Source) bool
TriggerLabelInjection()
UpsertMetadata(prefix netip.Prefix, lbls labels.Labels, src source.Source, rid ipcacheTypes.ResourceID)
UpsertLabels(prefix netip.Prefix, lbls labels.Labels, src source.Source, rid ipcacheTypes.ResourceID)
}

// Configuration is the set of configuration options the node manager depends
Expand Down Expand Up @@ -561,18 +560,16 @@ func (m *Manager) NodeUpdated(n nodeTypes.Node) {
}
entry.mutex.Unlock()
}

m.ipcache.TriggerLabelInjection()
}

// upsertIntoIDMD upserts the given CIDR into the ipcache.identityMetadata
// (IDMD) map. The given node identity determines which labels are associated
// with the CIDR.
func (m *Manager) upsertIntoIDMD(prefix netip.Prefix, id identity.NumericIdentity, rid ipcacheTypes.ResourceID) {
if id == identity.ReservedIdentityHost {
m.ipcache.UpsertMetadata(prefix, labels.LabelHost, source.Local, rid)
m.ipcache.UpsertLabels(prefix, labels.LabelHost, source.Local, rid)
} else {
m.ipcache.UpsertMetadata(prefix, labels.LabelRemoteNode, source.CustomResource, rid)
m.ipcache.UpsertLabels(prefix, labels.LabelRemoteNode, source.CustomResource, rid)
}
}

Expand Down
5 changes: 1 addition & 4 deletions pkg/node/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,7 @@ func (i *ipcacheMock) Delete(ip string, source source.Source) bool {
return false
}

func (i *ipcacheMock) TriggerLabelInjection() {
}

func (i *ipcacheMock) UpsertMetadata(netip.Prefix, labels.Labels, source.Source, ipcacheTypes.ResourceID) {
func (i *ipcacheMock) UpsertLabels(netip.Prefix, labels.Labels, source.Source, ipcacheTypes.ResourceID) {
}

type signalNodeHandler struct {
Expand Down

0 comments on commit 3d191bc

Please sign in to comment.