Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

auth: use NodeManager instead of k8s.CiliumNodeResource in auth gc #26592

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
79 changes: 50 additions & 29 deletions pkg/auth/authmap_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ package auth
import (
"context"
"fmt"
"net"
"time"

"github.com/sirupsen/logrus"

datapathTypes "github.com/cilium/cilium/pkg/datapath/types"
"github.com/cilium/cilium/pkg/identity"
"github.com/cilium/cilium/pkg/identity/cache"
ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
"github.com/cilium/cilium/pkg/k8s/resource"
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/cilium/cilium/pkg/node/addressing"
"github.com/cilium/cilium/pkg/node/manager"
nodeTypes "github.com/cilium/cilium/pkg/node/types"
"github.com/cilium/cilium/pkg/policy"
)

Expand Down Expand Up @@ -80,37 +80,58 @@ func (r *authMapGarbageCollector) cleanup(ctx context.Context) error {

// Nodes

func (r *authMapGarbageCollector) handleCiliumNodeEvent(_ context.Context, e resource.Event[*ciliumv2.CiliumNode]) (err error) {
// subscribeToNodeEvents registers the garbage collector as a subscriber on the
// given nodeManager, then logs and records that the node state is synced.
// NOTE(review): presumably Subscribe replays the already-known nodes through
// NodeAdd before it returns, which is what would make "synced" accurate at this
// point — confirm against the NodeManager implementation.
func (r *authMapGarbageCollector) subscribeToNodeEvents(nodeManager manager.NodeManager) {
nodeManager.Subscribe(r)

// The synced flag is only set once Subscribe has returned.
r.logger.Debug("Nodes synced")
r.ciliumNodesSynced = true
}

func (r *authMapGarbageCollector) NodeAdd(newNode nodeTypes.Node) error {
r.ciliumNodesMutex.Lock()
defer r.ciliumNodesMutex.Unlock()

defer func() { e.Done(err) }()

switch e.Kind {
case resource.Upsert:
if r.ciliumNodesDiscovered != nil {
remoteNodeIDs := r.remoteNodeIDs(e.Object)
r.logger.
WithField("key", e.Key).
WithField("node_ids", remoteNodeIDs).
Debug("Node discovered - mark to keep")
for _, rID := range remoteNodeIDs {
r.ciliumNodesDiscovered[rID] = struct{}{}
}
}
case resource.Sync:
r.logger.Debug("Nodes synced")
r.ciliumNodesSynced = true
case resource.Delete:
remoteNodeIDs := r.remoteNodeIDs(e.Object)
if r.ciliumNodesDiscovered != nil {
remoteNodeIDs := r.remoteNodeIDs(newNode)
r.logger.
WithField("key", e.Key).
WithField("name", newNode.Identity().Name).
WithField("cluster", newNode.Identity().Cluster).
WithField("node_ids", remoteNodeIDs).
Debug("Node deleted - mark for deletion")
Debug("Node discovered - mark to keep")
for _, rID := range remoteNodeIDs {
r.ciliumNodesDeleted[rID] = struct{}{}
r.ciliumNodesDiscovered[rID] = struct{}{}
}
}

return nil
}

// NodeUpdate is a no-op: it ignores both the old and the new node and always
// returns nil.
// NOTE(review): presumably present only to satisfy the subscriber interface
// expected by NodeManager.Subscribe — confirm.
func (r *authMapGarbageCollector) NodeUpdate(oldNode, newNode nodeTypes.Node) error {
return nil
}

// NodeDelete records a removed node so that its entries can be handled later.
//
// While holding ciliumNodesMutex it resolves the node IDs of deletedNode via
// remoteNodeIDs, emits a debug log line with the node's name, cluster and the
// resolved IDs, and adds every ID to the ciliumNodesDeleted set. It does not
// touch anything else and always returns nil.
func (r *authMapGarbageCollector) NodeDelete(deletedNode nodeTypes.Node) error {
	r.ciliumNodesMutex.Lock()
	defer r.ciliumNodesMutex.Unlock()

	ids := r.remoteNodeIDs(deletedNode)

	// Build the log entry field by field before emitting it.
	entry := r.logger.WithField("name", deletedNode.Identity().Name)
	entry = entry.WithField("cluster", deletedNode.Identity().Cluster)
	entry = entry.WithField("node_ids", ids)
	entry.Debug("Node deleted - mark for deletion")

	// Only mark the IDs here; nothing is removed in this method.
	for idx := range ids {
		r.ciliumNodesDeleted[ids[idx]] = struct{}{}
	}

	return nil
}

// NodeValidateImplementation is a no-op: it ignores the given node and always
// returns nil.
// NOTE(review): presumably present only to satisfy the subscriber interface
// expected by NodeManager.Subscribe — confirm.
func (r *authMapGarbageCollector) NodeValidateImplementation(node nodeTypes.Node) error {
return nil
}

// NodeConfigurationChanged is a no-op: it ignores the given local node
// configuration and always returns nil.
// NOTE(review): presumably present only to satisfy the subscriber interface
// expected by NodeManager.Subscribe — confirm.
func (r *authMapGarbageCollector) NodeConfigurationChanged(config datapathTypes.LocalNodeConfiguration) error {
return nil
}

Expand Down Expand Up @@ -184,12 +205,12 @@ func (r *authMapGarbageCollector) cleanupDeletedNode(nodeID uint16) error {
})
}

func (r *authMapGarbageCollector) remoteNodeIDs(node *ciliumv2.CiliumNode) []uint16 {
func (r *authMapGarbageCollector) remoteNodeIDs(node nodeTypes.Node) []uint16 {
var remoteNodeIDs []uint16

for _, addr := range node.Spec.Addresses {
for _, addr := range node.IPAddresses {
if addr.Type == addressing.NodeInternalIP {
nodeID, exists := r.ipCache.GetNodeID(net.ParseIP(addr.IP))
nodeID, exists := r.ipCache.GetNodeID(addr.IP)
if !exists {
// This might be the case at startup, when new nodes aren't yet known to the nodehandler
// and therefore no node id has been assigned to them.
Expand Down
66 changes: 25 additions & 41 deletions pkg/auth/authmap_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,18 @@ package auth

import (
"context"
"net"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/cilium/cilium/pkg/identity"
"github.com/cilium/cilium/pkg/identity/cache"
ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
"github.com/cilium/cilium/pkg/k8s/resource"
"github.com/cilium/cilium/pkg/node/addressing"
nodeTypes "github.com/cilium/cilium/pkg/node/types"
"github.com/cilium/cilium/pkg/policy"
)

Expand Down Expand Up @@ -120,12 +119,12 @@ func Test_authMapGarbageCollector_cleanupNodes(t *testing.T) {
assert.Empty(t, gc.ciliumNodesDeleted)
assert.False(t, gc.ciliumNodesSynced)

err := gc.handleCiliumNodeEvent(ctx, ciliumNodeEvent(resource.Upsert, "172.18.0.1"))
err := gc.NodeAdd(ciliumNodeEvent("172.18.0.1"))
assert.NoError(t, err, "Handling a node event should never result in an error")
assert.Len(t, authMap.entries, 5, "Node events should never modify the map directly")
assert.Len(t, gc.ciliumNodesDiscovered, 2, "Discovered nodes should be kept in the internal state")

err = gc.handleCiliumNodeEvent(ctx, ciliumNodeEvent(resource.Upsert, "172.18.0.2"))
err = gc.NodeAdd(ciliumNodeEvent("172.18.0.2"))
assert.NoError(t, err, "Handling a node event should never result in an error")
assert.Len(t, authMap.entries, 5, "Node events should never modify the map directly")
assert.Len(t, gc.ciliumNodesDiscovered, 3, "Discovered nodes should be kept in the internal state")
Expand All @@ -135,17 +134,14 @@ func Test_authMapGarbageCollector_cleanupNodes(t *testing.T) {
assert.Len(t, authMap.entries, 5, "GC run before the initial sync should not delete any entries from the auth map")
assert.Len(t, gc.ciliumNodesDiscovered, 3, "GC run before the initial sync should not delete the discovered nodes")

err = gc.handleCiliumNodeEvent(ctx, ciliumNodeEvent(resource.Sync, ""))
assert.NoError(t, err, "Handling a node event event should never result in an error")
assert.Len(t, authMap.entries, 5, "Node events should never modify the map directly")
assert.True(t, gc.ciliumNodesSynced, "Node sync event will mark the nodes as synced")
gc.ciliumNodesSynced = true // Node sync event will mark the nodes as synced

err = gc.handleCiliumNodeEvent(ctx, ciliumNodeEvent(resource.Delete, "172.18.0.2"))
err = gc.NodeDelete(ciliumNodeEvent("172.18.0.2"))
assert.NoError(t, err, "Handling a node event should never result in an error")
assert.Len(t, authMap.entries, 5, "Node events should never modify the map directly")
assert.Len(t, gc.ciliumNodesDeleted, 1, "Deleted nodes after the sync and before the initial GC run should already be kept")

err = gc.handleCiliumNodeEvent(ctx, ciliumNodeEvent(resource.Upsert, "172.18.0.3"))
err = gc.NodeAdd(ciliumNodeEvent("172.18.0.3"))
assert.NoError(t, err, "Handling a node event should never result in an error")
assert.Len(t, authMap.entries, 5, "Node events should never modify the map directly")
assert.Len(t, gc.ciliumNodesDiscovered, 4, "Discovered nodes after the sync event should be kept until the first GC run")
Expand All @@ -159,12 +155,12 @@ func Test_authMapGarbageCollector_cleanupNodes(t *testing.T) {
assert.Nil(t, gc.ciliumNodesDiscovered, "First GC run after the initial sync should reset the option to discover nodes")
assert.Empty(t, gc.ciliumNodesDeleted, "GC runs should delete the successfully garbage collected entries from the list of deleted nodes")

err = gc.handleCiliumNodeEvent(ctx, ciliumNodeEvent(resource.Upsert, "172.18.0.5"))
err = gc.NodeAdd(ciliumNodeEvent("172.18.0.5"))
assert.NoError(t, err, "Handling a node should never result in an error")
assert.Len(t, authMap.entries, 3, "Node should never modify the map directly")
assert.Nil(t, gc.ciliumNodesDiscovered, "Discovered nodes after the first GC run should no longer be of any interest")

err = gc.handleCiliumNodeEvent(ctx, ciliumNodeEvent(resource.Delete, "172.18.0.3"))
err = gc.NodeDelete(ciliumNodeEvent("172.18.0.3"))
assert.NoError(t, err, "Handling a node event should never result in an error")
assert.Len(t, authMap.entries, 3, "Node events should never modify the map directly")
assert.Len(t, gc.ciliumNodesDeleted, 1, "Deleted nodes should be kept for the next GC run")
Expand Down Expand Up @@ -276,10 +272,10 @@ func Test_authMapGarbageCollector_cleanup(t *testing.T) {

assert.Len(t, authMap.entries, 7)

require.NoError(t, gc.handleCiliumNodeEvent(ctx, ciliumNodeEvent(resource.Upsert, "172.18.0.1")))
require.NoError(t, gc.handleCiliumNodeEvent(ctx, ciliumNodeEvent(resource.Upsert, "172.18.0.2")))
require.NoError(t, gc.handleCiliumNodeEvent(ctx, ciliumNodeEvent(resource.Sync, "")))
require.NoError(t, gc.handleCiliumNodeEvent(ctx, ciliumNodeEvent(resource.Delete, "172.18.0.1")))
require.NoError(t, gc.NodeAdd(ciliumNodeEvent("172.18.0.1")))
require.NoError(t, gc.NodeAdd(ciliumNodeEvent("172.18.0.2")))
gc.ciliumNodesSynced = true
require.NoError(t, gc.NodeDelete(ciliumNodeEvent("172.18.0.1")))
for i := 1; i < 15; i++ {
require.NoError(t, gc.handleIdentityChange(ctx, ciliumIdentityEvent(cache.IdentityChangeUpsert, identity.NumericIdentity(i))))
}
Expand All @@ -300,14 +296,11 @@ func Test_authMapGarbageCollector_HandleNodeEventError(t *testing.T) {
}
gc := newAuthMapGC(logrus.New(), authMap, newFakeIPCache(map[uint16]string{10: "172.18.0.3"}), nil)

event := ciliumNodeEvent(resource.Delete, "172.18.0.3")
var eventErr error
event.Done = func(err error) {
eventErr = err
}
err := gc.handleCiliumNodeEvent(context.Background(), event)
event := ciliumNodeEvent("172.18.0.3")
err := gc.NodeAdd(event)
assert.NoError(t, err)
err = gc.NodeDelete(event)
assert.NoError(t, err)
assert.NoError(t, eventErr)

gc.ciliumNodesSynced = true
gc.ciliumNodesDiscovered = nil
Expand All @@ -332,25 +325,16 @@ func Test_authMapGarbageCollector_HandleIdentityEventError(t *testing.T) {
assert.ErrorContains(t, err, "failed to cleanup deleted identity: failed to delete entry")
}

func ciliumNodeEvent(eventType resource.EventKind, nodeInternalIP string) resource.Event[*ciliumv2.CiliumNode] {
return resource.Event[*ciliumv2.CiliumNode]{
Kind: eventType,
Done: func(err error) {},
Object: &ciliumv2.CiliumNode{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-ns",
Name: "test-node",
},
Spec: ciliumv2.NodeSpec{
Addresses: []ciliumv2.NodeAddress{
{
Type: addressing.NodeInternalIP,
IP: nodeInternalIP,
},
},
func ciliumNodeEvent(nodeInternalIP string) nodeTypes.Node {
return nodeTypes.Node{
Name: "test-node",
Cluster: "test-cluster",
IPAddresses: []nodeTypes.Address{
{
Type: addressing.NodeInternalIP,
IP: net.ParseIP(nodeInternalIP),
},
},
Key: resource.Key{Namespace: "test-ns", Name: "test-node"},
}
}

Expand Down
36 changes: 17 additions & 19 deletions pkg/auth/cell.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ import (
"github.com/cilium/cilium/pkg/hive/job"
"github.com/cilium/cilium/pkg/identity/cache"
"github.com/cilium/cilium/pkg/ipcache"
"github.com/cilium/cilium/pkg/k8s"
ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
"github.com/cilium/cilium/pkg/k8s/resource"
"github.com/cilium/cilium/pkg/maps/authmap"
nodeManager "github.com/cilium/cilium/pkg/node/manager"
"github.com/cilium/cilium/pkg/policy"
"github.com/cilium/cilium/pkg/signal"
"github.com/cilium/cilium/pkg/stream"
Expand All @@ -46,12 +44,6 @@ var Cell = cell.Module(
// Always fail auth handler provides support for auth type "always-fail" - which always fails.
newAlwaysFailAuthHandler,
),
// Providing k8s resource Node & Identity privately to avoid further usage of them in other agent components
cell.ProvidePrivate(
// TODO: use node manager to get events of all nodes, including the ones of other clusters (ClusterMesh)
// https://github.com/cilium/cilium/issues/25899
k8s.CiliumNodeResource,
),
cell.Config(config{
MeshAuthEnabled: true,
MeshAuthQueueSize: 1024,
Expand Down Expand Up @@ -86,7 +78,7 @@ type authManagerParams struct {
SignalManager signal.SignalManager
IPCache *ipcache.IPCache
IdentityChanges stream.Observable[cache.IdentityChange]
CiliumNodes resource.Resource[*ciliumv2.CiliumNode]
NodeManager nodeManager.NodeManager
PolicyRepo *policy.Repository
}

Expand Down Expand Up @@ -124,13 +116,14 @@ func registerAuthManager(params authManagerParams) error {
job.WithLogger(params.Logger),
job.WithPprofLabels(pprof.Labels("cell", "auth")),
)
params.Lifecycle.Append(jobGroup)

if err := registerSignalAuthenticationJob(jobGroup, mgr, params.SignalManager, params.Config); err != nil {
return fmt.Errorf("failed to register signal authentication job: %w", err)
}
registerReAuthenticationJob(jobGroup, mgr, params.AuthHandlers)
registerGCJobs(jobGroup, mapGC, params.Config, params.CiliumNodes, params.IdentityChanges)
registerGCJobs(jobGroup, params.Lifecycle, mapGC, params.Config, params.NodeManager, params.IdentityChanges)

params.Lifecycle.Append(jobGroup)

return nil
}
Expand All @@ -157,14 +150,19 @@ func registerSignalAuthenticationJob(jobGroup job.Group, mgr *authManager, sm si
return nil
}

func registerGCJobs(jobGroup job.Group, mapGC *authMapGarbageCollector, cfg config, nodeChanges resource.Resource[*ciliumv2.CiliumNode], identityChanges stream.Observable[cache.IdentityChange]) {
jobGroup.Add(job.Observer("auth gc-identity-events", mapGC.handleIdentityChange, identityChanges))

// Add node based auth gc if k8s client is enabled
if nodeChanges != nil {
jobGroup.Add(job.Observer[resource.Event[*ciliumv2.CiliumNode]]("auth gc-node-events", mapGC.handleCiliumNodeEvent, nodeChanges))
}
// registerGCJobs wires the auth map garbage collector into the agent.
//
// It appends a lifecycle hook that subscribes mapGC to node events on start
// and unsubscribes it on stop, and it adds two jobs to jobGroup: an observer
// feeding identity changes into mapGC.handleIdentityChange, and a timer that
// runs mapGC.cleanup at the interval cfg.MeshAuthGCInterval.
func registerGCJobs(jobGroup job.Group, lifecycle hive.Lifecycle, mapGC *authMapGarbageCollector, cfg config, nodeManager nodeManager.NodeManager, identityChanges stream.Observable[cache.IdentityChange]) {
	// Node events: subscription is tied to the lifecycle instead of a job.
	onStart := func(hive.HookContext) error {
		mapGC.subscribeToNodeEvents(nodeManager)
		return nil
	}
	onStop := func(hive.HookContext) error {
		nodeManager.Unsubscribe(mapGC)
		return nil
	}
	lifecycle.Append(hive.Hook{OnStart: onStart, OnStop: onStop})

	// Identity events are consumed through an observer job.
	identityJob := job.Observer("auth gc-identity-events", mapGC.handleIdentityChange, identityChanges)
	jobGroup.Add(identityJob)

	// Periodic cleanup run.
	cleanupJob := job.Timer("auth gc-cleanup", mapGC.cleanup, cfg.MeshAuthGCInterval)
	jobGroup.Add(cleanupJob)
}

Expand Down