Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EndpointManager and NodeManager Cells #21746

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions daemon/cmd/cells.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ package cmd
import (
"github.com/cilium/cilium/pkg/bgpv1"
"github.com/cilium/cilium/pkg/defaults"
"github.com/cilium/cilium/pkg/endpointmanager"
"github.com/cilium/cilium/pkg/gops"
"github.com/cilium/cilium/pkg/hive/cell"
"github.com/cilium/cilium/pkg/k8s"
k8sClient "github.com/cilium/cilium/pkg/k8s/client"
"github.com/cilium/cilium/pkg/node"
nodeManager "github.com/cilium/cilium/pkg/node/manager"
"github.com/cilium/cilium/pkg/option"
)

Expand Down Expand Up @@ -57,6 +59,12 @@ var (
// read-only stores.
k8s.SharedResourcesCell,

// EndpointManager maintains a collection of the locally running endpoints.
tommyp1ckles marked this conversation as resolved.
Show resolved Hide resolved
endpointmanager.Cell,

// NodeManager maintains a collection of other nodes in the cluster.
nodeManager.Cell,

// daemonCell wraps the legacy daemon initialization and provides Promise[*Daemon].
daemonCell,

Expand Down
38 changes: 8 additions & 30 deletions daemon/cmd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ import (
"github.com/cilium/cilium/pkg/ipcache"
"github.com/cilium/cilium/pkg/k8s"
k8sConst "github.com/cilium/cilium/pkg/k8s/apis/cilium.io"
"github.com/cilium/cilium/pkg/k8s/client"
k8sClient "github.com/cilium/cilium/pkg/k8s/client"
"github.com/cilium/cilium/pkg/k8s/watchers"
"github.com/cilium/cilium/pkg/labels"
Expand All @@ -74,6 +73,7 @@ import (
monitorAPI "github.com/cilium/cilium/pkg/monitor/api"
"github.com/cilium/cilium/pkg/mtu"
"github.com/cilium/cilium/pkg/node"
nodeManager "github.com/cilium/cilium/pkg/node/manager"
nodemanager "github.com/cilium/cilium/pkg/node/manager"
nodeStore "github.com/cilium/cilium/pkg/node/store"
nodeTypes "github.com/cilium/cilium/pkg/node/types"
Expand Down Expand Up @@ -163,7 +163,7 @@ type Daemon struct {

netConf *cnitypes.NetConf

endpointManager *endpointmanager.EndpointManager
endpointManager endpointmanager.EndpointManager

identityAllocator CachingIdentityAllocator

Expand Down Expand Up @@ -401,7 +401,9 @@ func removeOldRouterState(ipv6 bool, restoredIP net.IP) error {

// newDaemon creates and returns a new Daemon with the parameters set in c.
func newDaemon(ctx context.Context, cleaner *daemonCleanup,
epMgr *endpointmanager.EndpointManager, dp datapath.Datapath,
epMgr endpointmanager.EndpointManager,
nodeMngr nodeManager.NodeManager,
dp datapath.Datapath,
wgAgent *wg.Agent,
clientset k8sClient.Clientset,
sharedResources k8s.SharedResources,
Expand Down Expand Up @@ -516,10 +518,7 @@ func newDaemon(ctx context.Context, cleaner *daemonCleanup,
externalIP,
)

nodeMngr, err := nodemanager.NewManager("all", dp.Node(), option.Config, nil, nil)
if err != nil {
return nil, nil, err
}
nodeMngr.Subscribe(dp.Node())

identity.IterateReservedIdentities(func(_ identity.NumericIdentity, _ *identity.Identity) {
metrics.Identity.WithLabelValues(identity.ReservedIdentityType).Inc()
Expand Down Expand Up @@ -640,14 +639,11 @@ func newDaemon(ctx context.Context, cleaner *daemonCleanup,
}
}
}
nodeMngr = nodeMngr.WithIPCache(d.ipcache)
nodeMngr = nodeMngr.WithSelectorCacheUpdater(d.policy.GetSelectorCache()) // must be after initPolicy
nodeMngr = nodeMngr.WithPolicyTriggerer(epMgr) // must be after initPolicy
nodeMngr.SetIPCache(d.ipcache)

proxy.Allocator = d.identityAllocator

d.endpointManager = epMgr
d.endpointManager.InitMetrics()

// Start the proxy before we start K8s watcher or restore endpoints so that we can inject
// the daemon's proxy into the k8s watcher and each endpoint.
Expand Down Expand Up @@ -1312,24 +1308,7 @@ func newDaemon(ctx context.Context, cleaner *daemonCleanup,
return &d, restoredEndpoints, nil
}

// WithDefaultEndpointManager creates the default endpoint manager with a
// functional endpoint synchronizer.
func WithDefaultEndpointManager(ctx context.Context, clientset client.Clientset, checker endpointmanager.EndpointCheckerFunc) *endpointmanager.EndpointManager {
christarazi marked this conversation as resolved.
Show resolved Hide resolved
mgr := WithCustomEndpointManager(&watchers.EndpointSynchronizer{Clientset: clientset})
if option.Config.EndpointGCInterval > 0 {
mgr = mgr.WithPeriodicEndpointGC(ctx, checker, option.Config.EndpointGCInterval)
}
return mgr
}

// WithCustomEndpointManager creates the custom endpoint manager with the
// provided endpoint synchronizer. This is useful for tests which want to mock
// out the real endpoint synchronizer.
func WithCustomEndpointManager(s endpointmanager.EndpointResourceSynchronizer) *endpointmanager.EndpointManager {
return endpointmanager.NewEndpointManager(s)
}

func (d *Daemon) bootstrapClusterMesh(nodeMngr *nodemanager.Manager) {
func (d *Daemon) bootstrapClusterMesh(nodeMngr nodemanager.NodeManager) {
bootstrapStats.clusterMeshInit.Start()
if path := option.Config.ClusterMeshConfig; path != "" {
if option.Config.ClusterID == 0 {
Expand Down Expand Up @@ -1408,7 +1387,6 @@ func (d *Daemon) Close() {
}
d.identityAllocator.Close()
identitymanager.RemoveAll()
d.nodeDiscovery.Close()
d.cgroupManager.Close()
}

Expand Down
8 changes: 6 additions & 2 deletions daemon/cmd/daemon_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (
"github.com/cilium/cilium/pkg/datapath/maps"
datapathOption "github.com/cilium/cilium/pkg/datapath/option"
"github.com/cilium/cilium/pkg/defaults"
"github.com/cilium/cilium/pkg/endpoint"
"github.com/cilium/cilium/pkg/endpointmanager"
"github.com/cilium/cilium/pkg/envoy"
"github.com/cilium/cilium/pkg/flowdebug"
"github.com/cilium/cilium/pkg/hive"
Expand Down Expand Up @@ -71,6 +71,7 @@ import (
"github.com/cilium/cilium/pkg/metrics"
monitorAPI "github.com/cilium/cilium/pkg/monitor/api"
"github.com/cilium/cilium/pkg/node"
nodeManager "github.com/cilium/cilium/pkg/node/manager"
nodeTypes "github.com/cilium/cilium/pkg/node/types"
"github.com/cilium/cilium/pkg/option"
"github.com/cilium/cilium/pkg/pidfile"
Expand Down Expand Up @@ -1646,6 +1647,8 @@ type daemonParams struct {
BGPController *bgpv1.Controller
Shutdowner hive.Shutdowner
SharedResources k8s.SharedResources
NodeManager nodeManager.NodeManager
EndpointManager endpointmanager.EndpointManager
}

func newDaemonPromise(params daemonParams) promise.Promise[*Daemon] {
Expand All @@ -1669,7 +1672,8 @@ func newDaemonPromise(params daemonParams) promise.Promise[*Daemon] {
OnStart: func(hive.HookContext) error {
d, restoredEndpoints, err := newDaemon(
daemonCtx, cleaner,
WithDefaultEndpointManager(daemonCtx, params.Clientset, endpoint.CheckHealth),
params.EndpointManager,
params.NodeManager,
params.Datapath,
params.WGAgent,
params.Clientset,
Expand Down
2 changes: 1 addition & 1 deletion daemon/cmd/datapath.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (d *Daemon) SetPrefilter(preFilter datapath.PreFilter) {
// EndpointMapManager is a wrapper around an endpointmanager as well as the
// filesystem for removing maps related to endpoints from the filesystem.
type EndpointMapManager struct {
*endpointmanager.EndpointManager
endpointmanager.EndpointManager
}

// RemoveDatapathMapping unlinks the endpointID from the global policy map, preventing
Expand Down
3 changes: 2 additions & 1 deletion daemon/cmd/fqdn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cilium/cilium/pkg/counter"
"github.com/cilium/cilium/pkg/defaults"
"github.com/cilium/cilium/pkg/endpoint"
"github.com/cilium/cilium/pkg/endpointmanager"
"github.com/cilium/cilium/pkg/fqdn"
"github.com/cilium/cilium/pkg/fqdn/dns"
"github.com/cilium/cilium/pkg/fqdn/dnsproxy"
Expand Down Expand Up @@ -126,7 +127,7 @@ func (ds *DaemonFQDNSuite) SetUpTest(c *C) {
Cache: fqdn.NewDNSCache(0),
UpdateSelectors: d.updateSelectors,
})
d.endpointManager = WithCustomEndpointManager(&dummyEpSyncher{})
d.endpointManager = endpointmanager.New(&dummyEpSyncher{})
d.policy.GetSelectorCache().SetLocalIdentityNotifier(d.dnsNameManager)
d.ipcache = ipcache.NewIPCache(&ipcache.Configuration{
Context: context.TODO(),
Expand Down
2 changes: 1 addition & 1 deletion daemon/cmd/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
)

// initPolicy initializes the core policy components of the daemon.
func (d *Daemon) initPolicy(epMgr *endpointmanager.EndpointManager) error {
func (d *Daemon) initPolicy(epMgr endpointmanager.EndpointManager) error {
// Reuse policy.TriggerMetrics and PolicyTriggerInterval here since
// this is only triggered by agent configuration changes for now and
// should be counted in pol.TriggerMetrics.
Expand Down
5 changes: 2 additions & 3 deletions daemon/cmd/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/cilium/cilium/api/v1/models"
. "github.com/cilium/cilium/api/v1/server/restapi/daemon"
"github.com/cilium/cilium/pkg/checker"
"github.com/cilium/cilium/pkg/datapath/fake"
"github.com/cilium/cilium/pkg/mtu"
"github.com/cilium/cilium/pkg/node/manager"
nodeTypes "github.com/cilium/cilium/pkg/node/types"
Expand All @@ -27,7 +26,7 @@ type GetNodesSuite struct {

var _ = Suite(&GetNodesSuite{})

var nm *manager.Manager
var nm manager.NodeManager

func (g *GetNodesSuite) SetUpTest(c *C) {
option.Config.IPv4ServiceRange = AutoCIDR
Expand All @@ -36,7 +35,7 @@ func (g *GetNodesSuite) SetUpTest(c *C) {

func (g *GetNodesSuite) SetUpSuite(c *C) {
var err error
nm, err = manager.NewManager("", fake.NewNodeHandler(), &fakeConfig.Config{}, nil, nil)
nm, err = manager.New("", &fakeConfig.Config{})
c.Assert(err, IsNil)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/auth/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import (
)

type AuthManager struct {
endpointManager *endpointmanager.EndpointManager
endpointManager endpointmanager.EndpointsLookup
}

func NewAuthManager(epMgr *endpointmanager.EndpointManager) *AuthManager {
func NewAuthManager(epMgr endpointmanager.EndpointsLookup) *AuthManager {
return &AuthManager{
endpointManager: epMgr,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/clustermesh/clustermesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type Configuration struct {

// NodeManager is the node manager to manage all discovered remote
// nodes
NodeManager *nodemanager.Manager
NodeManager nodemanager.NodeManager

nodeObserver store.Observer

Expand Down