Skip to content

Commit

Permalink
clustermesh: introduce hive.Cell
Browse files Browse the repository at this point in the history
This commit refactors the clustermesh subsystem into a cell, which is
registered in the controlplane module.

Signed-off-by: Marco Iorio <marco.iorio@isovalent.com>
  • Loading branch information
giorio94 authored and tklauser committed May 26, 2023
1 parent 59c201f commit 69b5845
Show file tree
Hide file tree
Showing 13 changed files with 144 additions and 137 deletions.
1 change: 1 addition & 0 deletions Documentation/cmdref/cilium-agent_hive.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Documentation/cmdref/cilium-agent_hive_dot-graph.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions daemon/cmd/cells.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/cilium/cilium/daemon/cmd/cni"
"github.com/cilium/cilium/pkg/auth"
"github.com/cilium/cilium/pkg/bgpv1"
"github.com/cilium/cilium/pkg/clustermesh"
"github.com/cilium/cilium/pkg/crypto/certificatemanager"
"github.com/cilium/cilium/pkg/datapath"
dptypes "github.com/cilium/cilium/pkg/datapath/types"
Expand Down Expand Up @@ -113,5 +114,8 @@ var (

// ServiceCache holds the list of known services correlated with the matching endpoints.
cell.Provide(func(dp dptypes.Datapath) *k8s.ServiceCache { return k8s.NewServiceCache(dp.LocalNodeAddressing()) }),

// ClusterMesh is the Cilium's multicluster implementation.
clustermesh.Cell,
)
)
32 changes: 1 addition & 31 deletions daemon/cmd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ import (
monitorAPI "github.com/cilium/cilium/pkg/monitor/api"
"github.com/cilium/cilium/pkg/mtu"
"github.com/cilium/cilium/pkg/node"
nodemanager "github.com/cilium/cilium/pkg/node/manager"
nodeStore "github.com/cilium/cilium/pkg/node/store"
nodeTypes "github.com/cilium/cilium/pkg/node/types"
"github.com/cilium/cilium/pkg/nodediscovery"
"github.com/cilium/cilium/pkg/option"
Expand Down Expand Up @@ -537,6 +535,7 @@ func newDaemon(ctx context.Context, cleaner *daemonCleanup, params *daemonParams
policyUpdater: params.PolicyUpdater,
egressGatewayManager: params.EgressGatewayManager,
cniConfigManager: params.CNIConfigManager,
clustermesh: params.ClusterMesh,
}

if option.Config.RunMonitorAgent {
Expand Down Expand Up @@ -1180,8 +1179,6 @@ func newDaemon(ctx context.Context, cleaner *daemonCleanup, params *daemonParams
// identity allocator to run asynchronously.
realIdentityAllocator := d.identityAllocator
realIdentityAllocator.InitIdentityAllocator(params.Clientset)

d.bootstrapClusterMesh(params.NodeManager)
}

// Must be done at least after initializing BPF LB-related maps
Expand Down Expand Up @@ -1279,33 +1276,6 @@ func newDaemon(ctx context.Context, cleaner *daemonCleanup, params *daemonParams
return &d, restoredEndpoints, nil
}

func (d *Daemon) bootstrapClusterMesh(nodeMngr nodemanager.NodeManager) {
bootstrapStats.clusterMeshInit.Start()
if path := option.Config.ClusterMeshConfig; path != "" {
if option.Config.ClusterID == 0 {
log.Info("Cluster-ID is not specified, skipping ClusterMesh initialization")
} else {
log.WithField("path", path).Info("Initializing ClusterMesh routing")
clustermesh, err := clustermesh.NewClusterMesh(clustermesh.Configuration{
Name: option.Config.ClusterName,
NodeName: nodeTypes.GetName(),
ConfigDirectory: path,
NodeKeyCreator: nodeStore.KeyCreator,
ServiceMerger: d.k8sWatcher.K8sSvcCache,
NodeManager: nodeMngr,
RemoteIdentityWatcher: d.identityAllocator,
IPCache: d.ipcache,
})
if err != nil {
log.WithError(err).Fatal("Unable to initialize ClusterMesh")
}

d.clustermesh = clustermesh
}
}
bootstrapStats.clusterMeshInit.End(true)
}

// ReloadOnDeviceChange regenerates device related information and reloads the datapath.
// The devices is the new set of devices that replaces the old set.
func (d *Daemon) ReloadOnDeviceChange(devices []string) {
Expand Down
5 changes: 2 additions & 3 deletions daemon/cmd/daemon_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
bgpv1 "github.com/cilium/cilium/pkg/bgpv1/agent"
"github.com/cilium/cilium/pkg/bpf"
"github.com/cilium/cilium/pkg/cgroups"
"github.com/cilium/cilium/pkg/clustermesh"
"github.com/cilium/cilium/pkg/common"
"github.com/cilium/cilium/pkg/components"
"github.com/cilium/cilium/pkg/controller"
Expand Down Expand Up @@ -192,9 +193,6 @@ func initializeFlags() {
flags.String(option.ClusterName, defaults.ClusterName, "Name of the cluster")
option.BindEnv(Vp, option.ClusterName)

flags.String(option.ClusterMeshConfigName, "", "Path to the ClusterMesh configuration directory")
option.BindEnv(Vp, option.ClusterMeshConfigName)

flags.StringSlice(option.CompilerFlags, []string{}, "Extra CFLAGS for BPF compilation")
flags.MarkHidden(option.CompilerFlags)
option.BindEnv(Vp, option.CompilerFlags)
Expand Down Expand Up @@ -1621,6 +1619,7 @@ type daemonParams struct {
SwaggerSpec *server.Spec
HealthAPISpec *healthApi.Spec
ServiceCache *k8s.ServiceCache
ClusterMesh *clustermesh.ClusterMesh
}

func newDaemonPromise(params daemonParams) promise.Promise[*Daemon] {
Expand Down
47 changes: 47 additions & 0 deletions pkg/clustermesh/cell.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package clustermesh

import (
"github.com/spf13/pflag"

"github.com/cilium/cilium/pkg/clustermesh/types"
"github.com/cilium/cilium/pkg/hive/cell"
"github.com/cilium/cilium/pkg/ipcache"
"github.com/cilium/cilium/pkg/k8s"
"github.com/cilium/cilium/pkg/kvstore"
"github.com/cilium/cilium/pkg/kvstore/store"
nodemanager "github.com/cilium/cilium/pkg/node/manager"
nodeStore "github.com/cilium/cilium/pkg/node/store"
"github.com/cilium/cilium/pkg/option"
)

var Cell = cell.Module(
"clustermesh",
"ClusterMesh is the Cilium multicluster implementation",

cell.Provide(NewClusterMesh),

// Convert concrete objects into more restricted interfaces used by clustermesh.
cell.ProvidePrivate(func(sc *k8s.ServiceCache) ServiceMerger { return sc }),
cell.ProvidePrivate(func(ipcache *ipcache.IPCache) ipcache.IPCacher { return ipcache }),
cell.ProvidePrivate(func(mgr nodemanager.NodeManager) (store.Observer, kvstore.ClusterSizeDependantIntervalFunc) {
return nodeStore.NewNodeObserver(mgr), mgr.ClusterSizeDependantInterval
}),
cell.ProvidePrivate(func() store.KeyCreator { return nodeStore.KeyCreator }),
cell.ProvidePrivate(func(cfg *option.DaemonConfig) types.ClusterIDName {
return types.ClusterIDName{ClusterID: cfg.ClusterID, ClusterName: cfg.ClusterName}
}),

cell.Config(Config{}),
)

type Config struct {
// ClusterMeshConfig is the path to the clustermesh configuration directory.
ClusterMeshConfig string
}

func (def Config) Flags(flags *pflag.FlagSet) {
flags.String("clustermesh-config", def.ClusterMeshConfig, "Path to the ClusterMesh configuration directory")
}
86 changes: 46 additions & 40 deletions pkg/clustermesh/clustermesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@ import (

"github.com/cilium/cilium/api/v1/models"
"github.com/cilium/cilium/pkg/allocator"
"github.com/cilium/cilium/pkg/clustermesh/types"
cmtypes "github.com/cilium/cilium/pkg/clustermesh/types"
"github.com/cilium/cilium/pkg/controller"
"github.com/cilium/cilium/pkg/hive"
"github.com/cilium/cilium/pkg/hive/cell"
"github.com/cilium/cilium/pkg/ipcache"
"github.com/cilium/cilium/pkg/kvstore"
"github.com/cilium/cilium/pkg/kvstore/store"
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/metrics"
nodemanager "github.com/cilium/cilium/pkg/node/manager"
nodeStore "github.com/cilium/cilium/pkg/node/store"
"github.com/cilium/cilium/pkg/option"
nodeTypes "github.com/cilium/cilium/pkg/node/types"
)

const (
Expand All @@ -37,15 +38,12 @@ const (
// Configuration is the configuration that must be provided to
// NewClusterMesh()
type Configuration struct {
// Name is the name of the local cluster. This is used for logging and metrics
Name string
cell.In

// NodeName is the name of the local node. This is used for logging and metrics
NodeName string
Config

// ConfigDirectory is the path to the directory that will be watched for etcd
// configuration files to appear
ConfigDirectory string
// ClusterIDName is the id/name of the local cluster. This is used for logging and metrics
types.ClusterIDName

// NodeKeyCreator is the function used to create node instances as
// nodes are being discovered in remote clusters
Expand All @@ -55,17 +53,17 @@ type Configuration struct {
// endpoints into an existing cache
ServiceMerger ServiceMerger

// NodeManager is the node manager to manage all discovered remote
// nodes
NodeManager nodemanager.NodeManager

nodeObserver store.Observer
// NodeObserver reacts to node events.
NodeObserver store.Observer

// RemoteIdentityWatcher provides identities that have been allocated on a
// remote cluster.
RemoteIdentityWatcher RemoteIdentityWatcher

IPCache ipcache.IPCacher

// ClusterSizeDependantInterval allows to calculate intervals based on cluster size.
ClusterSizeDependantInterval kvstore.ClusterSizeDependantIntervalFunc `optional:"true"`
}

func SetClusterConfig(clusterName string, config *cmtypes.CiliumClusterConfig, backend kvstore.BackendOperations) error {
Expand Down Expand Up @@ -125,31 +123,22 @@ type RemoteIdentityWatcher interface {
Close()
}

// NodeObserver returns the node store observer of the configuration
func (c *Configuration) NodeObserver() store.Observer {
if c.nodeObserver != nil {
return c.nodeObserver
}

return nodeStore.NewNodeObserver(c.NodeManager)
}

// ClusterMesh is a cache of multiple remote clusters
type ClusterMesh struct {
// conf is the configuration, it is immutable after NewClusterMesh()
conf Configuration

mutex lock.RWMutex
clusters map[string]*remoteCluster
controllers *controller.Manager
configWatcher *configDirectoryWatcher

ipcache ipcache.IPCacher

// globalServices is a list of all global services. The datastructure
// is protected by its own mutex inside the structure.
globalServices *globalServiceCache

// nodeName is the name of the local node. This is used for logging and metrics
nodeName string

// metricTotalRemoteClusters is gauge metric keeping track of total number
// of remote clusters.
metricTotalRemoteClusters *prometheus.GaugeVec
Expand All @@ -169,12 +158,22 @@ type ClusterMesh struct {

// NewClusterMesh creates a new remote cluster cache based on the
// provided configuration
func NewClusterMesh(c Configuration) (*ClusterMesh, error) {
func NewClusterMesh(lifecycle hive.Lifecycle, c Configuration) *ClusterMesh {
if c.ClusterID == 0 {
return nil
}

if c.ClusterMeshConfig == "" {
return nil
}

nodeName := nodeTypes.GetName()
cm := &ClusterMesh{
conf: c,
clusters: map[string]*remoteCluster{},
controllers: controller.NewManager(),
globalServices: newGlobalServiceCache(c.Name, c.NodeName),
globalServices: newGlobalServiceCache(c.ClusterName, nodeName),
nodeName: nodeName,

metricTotalRemoteClusters: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: subsystem,
Expand Down Expand Up @@ -209,18 +208,22 @@ func NewClusterMesh(c Configuration) (*ClusterMesh, error) {
Name: "remote_cluster_nodes",
Help: "The total number of nodes in the remote cluster",
}, []string{metrics.LabelSourceCluster, metrics.LabelSourceNodeName, metrics.LabelTargetCluster}),
ipcache: c.IPCache,
}

w, err := createConfigDirectoryWatcher(c.ConfigDirectory, cm)
lifecycle.Append(cm)
return cm
}

func (cm *ClusterMesh) Start(hive.HookContext) error {
w, err := createConfigDirectoryWatcher(cm.conf.ClusterMeshConfig, cm)
if err != nil {
return nil, fmt.Errorf("unable to create config directory watcher: %s", err)
return fmt.Errorf("unable to create config directory watcher: %w", err)
}

cm.configWatcher = w

if err := cm.configWatcher.watch(); err != nil {
return nil, err
return fmt.Errorf("unable to start config directory watcher: %w", err)
}

_ = metrics.RegisterList([]prometheus.Collector{
Expand All @@ -230,12 +233,13 @@ func NewClusterMesh(c Configuration) (*ClusterMesh, error) {
cm.metricTotalFailures,
cm.metricTotalNodes,
})
return cm, nil

return nil
}

// Close stops watching for remote cluster configuration files to appear and
// will close all connections to remote clusters
func (cm *ClusterMesh) Close() {
func (cm *ClusterMesh) Stop(hive.HookContext) error {
cm.mutex.Lock()
defer cm.mutex.Unlock()

Expand All @@ -247,12 +251,14 @@ func (cm *ClusterMesh) Close() {
cluster.onRemove()
delete(cm.clusters, name)
}
cm.controllers.RemoveAllAndWait()

metrics.Unregister(cm.metricTotalRemoteClusters)
metrics.Unregister(cm.metricLastFailureTimestamp)
metrics.Unregister(cm.metricReadinessStatus)
metrics.Unregister(cm.metricTotalFailures)
metrics.Unregister(cm.metricTotalNodes)

return nil
}

func (cm *ClusterMesh) newRemoteCluster(name, path string) *remoteCluster {
Expand All @@ -269,7 +275,7 @@ func (cm *ClusterMesh) newRemoteCluster(name, path string) *remoteCluster {
}

func (cm *ClusterMesh) add(name, path string) {
if name == option.Config.ClusterName {
if name == cm.conf.ClusterName {
log.WithField(fieldClusterName, name).Debug("Ignoring configuration for own cluster")
return
}
Expand All @@ -283,7 +289,7 @@ func (cm *ClusterMesh) add(name, path string) {
inserted = true
}

cm.metricTotalRemoteClusters.WithLabelValues(cm.conf.Name, cm.conf.NodeName).Set(float64(len(cm.clusters)))
cm.metricTotalRemoteClusters.WithLabelValues(cm.conf.ClusterName, cm.nodeName).Set(float64(len(cm.clusters)))
cm.mutex.Unlock()

log.WithField(fieldClusterName, name).Debug("Remote cluster configuration added")
Expand All @@ -301,7 +307,7 @@ func (cm *ClusterMesh) remove(name string) {
if cluster, ok := cm.clusters[name]; ok {
cluster.onRemove()
delete(cm.clusters, name)
cm.metricTotalRemoteClusters.WithLabelValues(cm.conf.Name, cm.conf.NodeName).Set(float64(len(cm.clusters)))
cm.metricTotalRemoteClusters.WithLabelValues(cm.conf.ClusterName, cm.nodeName).Set(float64(len(cm.clusters)))
cm.globalServices.onClusterDelete(name)
}
cm.mutex.Unlock()
Expand Down

0 comments on commit 69b5845

Please sign in to comment.