clustermesh-apiserver: extract kvstore client initialization and heartbeat logic in separate cells #25554

Merged 3 commits on Jun 1, 2023
92 changes: 48 additions & 44 deletions clustermesh-apiserver/main.go
@@ -36,7 +36,6 @@ import (
"github.com/cilium/cilium/pkg/hive/cell"
"github.com/cilium/cilium/pkg/identity"
identityCache "github.com/cilium/cilium/pkg/identity/cache"
"github.com/cilium/cilium/pkg/inctimer"
"github.com/cilium/cilium/pkg/ipcache"
"github.com/cilium/cilium/pkg/k8s"
ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
@@ -47,6 +46,7 @@ import (
"github.com/cilium/cilium/pkg/k8s/synced"
"github.com/cilium/cilium/pkg/k8s/types"
"github.com/cilium/cilium/pkg/kvstore"
"github.com/cilium/cilium/pkg/kvstore/heartbeat"
"github.com/cilium/cilium/pkg/kvstore/store"
"github.com/cilium/cilium/pkg/labels"
"github.com/cilium/cilium/pkg/logging"
@@ -56,6 +56,7 @@ import (
nodeTypes "github.com/cilium/cilium/pkg/node/types"
"github.com/cilium/cilium/pkg/option"
"github.com/cilium/cilium/pkg/pprof"
"github.com/cilium/cilium/pkg/promise"
)

type configuration struct {
@@ -108,6 +109,11 @@ var (
ciliumNodeStore *store.SharedStore

identityStore = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)

// Holds the backend retrieved from the promise. This global variable is used
// temporarily while the refactoring of the clustermesh-apiserver is in progress
// to reduce the amount of modifications required in this first step.
backend kvstore.BackendOperations
)

func init() {
@@ -121,6 +127,11 @@ func init() {
gops.Cell(defaults.GopsPortApiserver),
k8sClient.Cell,
k8s.SharedResourcesCell,

kvstore.Cell(kvstore.EtcdBackendName),
cell.Provide(func() *kvstore.ExtraOptions { return nil }),
heartbeat.Cell,

healthAPIServerCell,
cmmetrics.Cell,
usersManagementCell,
@@ -132,13 +143,28 @@ func init() {
vp = rootHive.Viper()
}

func registerHooks(lc hive.Lifecycle, clientset k8sClient.Clientset, services resource.Resource[*slim_corev1.Service]) error {
type parameters struct {
cell.In

Clientset k8sClient.Clientset
Services resource.Resource[*slim_corev1.Service]
BackendPromise promise.Promise[kvstore.BackendOperations]
}

func registerHooks(lc hive.Lifecycle, params parameters) error {
lc.Append(hive.Hook{
OnStart: func(ctx hive.HookContext) error {
if !clientset.IsEnabled() {
if !params.Clientset.IsEnabled() {
return errors.New("Kubernetes client not configured, cannot continue.")
}
startServer(ctx, clientset, services)

var err error
backend, err = params.BackendPromise.Await(ctx)
if err != nil {
return err
}

startServer(ctx, params.Clientset, params.Services)
return nil
},
})
@@ -228,20 +254,6 @@ func runApiserver() error {

flags.StringVar(&mockFile, "mock-file", "", "Read from mock file")

flags.Duration(option.KVstoreConnectivityTimeout, defaults.KVstoreConnectivityTimeout, "Time after which an incomplete kvstore operation is considered failed")
option.BindEnv(vp, option.KVstoreConnectivityTimeout)

flags.Duration(option.KVstoreLeaseTTL, defaults.KVstoreLeaseTTL, "Time-to-live for the KVstore lease.")
flags.MarkHidden(option.KVstoreLeaseTTL)
option.BindEnv(vp, option.KVstoreLeaseTTL)

flags.Duration(option.KVstorePeriodicSync, defaults.KVstorePeriodicSync, "Periodic KVstore synchronization interval")
option.BindEnv(vp, option.KVstorePeriodicSync)

flags.Var(option.NewNamedMapOptions(option.KVStoreOpt, &option.Config.KVStoreOpt, nil),
option.KVStoreOpt, "Key-value store options e.g. etcd.address=127.0.0.1:4001")
option.BindEnv(vp, option.KVStoreOpt)

flags.StringVar(&cfg.serviceProxyName, option.K8sServiceProxyName, "", "Value of K8s service-proxy-name label for which Cilium handles the services (empty = all services without service.kubernetes.io/service-proxy-name label)")
option.BindEnv(vp, option.K8sServiceProxyName)

@@ -306,10 +318,10 @@ func updateIdentity(obj interface{}) {
return
}

keyEncoded := []byte(kvstore.Client().Encode(key))
keyEncoded := []byte(backend.Encode(key))
log.WithFields(logrus.Fields{"key": keyPath, "value": string(keyEncoded)}).Info("Updating identity in etcd")

_, err := kvstore.Client().UpdateIfDifferent(context.Background(), keyPath, keyEncoded, true)
_, err := backend.UpdateIfDifferent(context.Background(), keyPath, keyEncoded, true)
if err != nil {
log.WithError(err).Warningf("Unable to update identity %s in etcd", keyPath)
}
@@ -328,7 +340,7 @@ func deleteIdentity(obj interface{}) {
}

keyPath := path.Join(identityCache.IdentitiesPath, "id", identity.Name)
err := kvstore.Client().Delete(context.Background(), keyPath)
err := backend.Delete(context.Background(), keyPath)
if err != nil {
log.WithError(err).Warningf("Unable to delete identity %s in etcd", keyPath)
}
@@ -456,7 +468,7 @@ func updateEndpoint(oldEp, newEp *types.CiliumEndpoint) {
continue
}

_, err = kvstore.Client().UpdateIfDifferent(context.Background(), keyPath, marshaledEntry, true)
_, err = backend.UpdateIfDifferent(context.Background(), keyPath, marshaledEntry, true)
if err != nil {
log.WithError(err).Warningf("Unable to update endpoint %s in etcd", keyPath)
} else {
@@ -488,7 +500,7 @@ func updateEndpoint(oldEp, newEp *types.CiliumEndpoint) {
if !found {
// Delete the old IPs from the kvstore:
keyPath := path.Join(ipcache.IPIdentitiesPath, ipcache.DefaultAddressSpace, oldIP)
if err := kvstore.Client().Delete(context.Background(), keyPath); err != nil {
if err := backend.Delete(context.Background(), keyPath); err != nil {
log.WithError(err).
WithFields(logrus.Fields{
"path": keyPath,
@@ -514,7 +526,7 @@ func deleteEndpoint(obj interface{}) {
}

keyPath := path.Join(ipcache.IPIdentitiesPath, ipcache.DefaultAddressSpace, ip)
if err := kvstore.Client().Delete(context.Background(), keyPath); err != nil {
if err := backend.Delete(context.Background(), keyPath); err != nil {
log.WithError(err).Warningf("Unable to delete endpoint %s in etcd", keyPath)
}
}
@@ -576,21 +588,19 @@ func startServer(startCtx hive.HookContext, clientset k8sClient.Clientset, servi
}

var err error
if err = kvstore.Setup(context.Background(), "etcd", option.Config.KVStoreOpt, nil); err != nil {
log.WithError(err).Fatal("Unable to connect to etcd")
}

config := cmtypes.CiliumClusterConfig{
ID: cfg.clusterID,
}

if err := clustermesh.SetClusterConfig(context.Background(), cfg.clusterName, &config, kvstore.Client()); err != nil {
if err := clustermesh.SetClusterConfig(context.Background(), cfg.clusterName, &config, backend); err != nil {
log.WithError(err).Fatal("Unable to set local cluster config on kvstore")
}

if cfg.enableExternalWorkloads {
mgr := NewVMManager(clientset)
mgr := NewVMManager(clientset, backend)
_, err = store.JoinSharedStore(store.Configuration{
Backend: backend,
Prefix: nodeStore.NodeRegisterStorePrefix,
KeyCreator: nodeStore.RegisterKeyCreator,
SharedKeyDeleteDelay: defaults.NodeDeleteDelay,
@@ -602,6 +612,7 @@ func startServer(startCtx hive.HookContext, clientset k8sClient.Clientset, servi
}

ciliumNodeStore, err = store.JoinSharedStore(store.Configuration{
Backend: backend,
Prefix: nodeStore.NodeStorePrefix,
KeyCreator: nodeStore.KeyCreator,
})
@@ -617,22 +628,15 @@ func startServer(startCtx hive.HookContext, clientset k8sClient.Clientset, servi
synchronizeIdentities(clientset)
synchronizeNodes(clientset)
synchronizeCiliumEndpoints(clientset)
operatorWatchers.StartSynchronizingServices(context.Background(), &sync.WaitGroup{}, clientset, services, !cfg.enableExternalWorkloads, cfg)
}
operatorWatchers.StartSynchronizingServices(context.Background(), &sync.WaitGroup{}, operatorWatchers.ServiceSyncParameters{
ServiceSyncConfiguration: cfg,

go func() {
timer, timerDone := inctimer.New()
defer timerDone()
for {
ctx, cancel := context.WithTimeout(context.Background(), defaults.LockLeaseTTL)
err := kvstore.Client().Update(ctx, kvstore.HeartbeatPath, []byte(time.Now().Format(time.RFC3339)), true)
if err != nil {
log.WithError(err).Warning("Unable to update heartbeat key")
}
cancel()
<-timer.After(kvstore.HeartbeatWriteInterval)
}
}()
Clientset: clientset,
Services: services,
Backend: backend,
SharedOnly: !cfg.enableExternalWorkloads,
})
}

log.Info("Initialization complete")
}
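
The heartbeat goroutine deleted from startServer above (the inctimer loop refreshing kvstore.HeartbeatPath) is now expected to be provided by the heartbeat.Cell wired into init(). That cell's implementation is not part of this diff, so the following is only a minimal sketch of equivalent logic, assuming the cell awaits the same backend promise and reuses the existing kvstore.HeartbeatPath and kvstore.HeartbeatWriteInterval constants; the module name, function name, and write timeout are illustrative:

```go
package heartbeat

import (
	"context"
	"time"

	"github.com/cilium/cilium/pkg/hive"
	"github.com/cilium/cilium/pkg/hive/cell"
	"github.com/cilium/cilium/pkg/kvstore"
	"github.com/cilium/cilium/pkg/promise"
)

// Cell periodically refreshes the kvstore heartbeat key once the backend is ready.
// Sketch only: the real heartbeat.Cell referenced by this PR may be structured differently.
var Cell = cell.Module(
	"kvstore-heartbeat",
	"KVStore heartbeat updater",
	cell.Invoke(registerHeartbeat),
)

func registerHeartbeat(lc hive.Lifecycle, backendPromise promise.Promise[kvstore.BackendOperations]) {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})

	lc.Append(hive.Hook{
		OnStart: func(hive.HookContext) error {
			go func() {
				defer close(done)
				// Block until the kvstore client has been initialized.
				backend, err := backendPromise.Await(ctx)
				if err != nil {
					return
				}
				ticker := time.NewTicker(kvstore.HeartbeatWriteInterval)
				defer ticker.Stop()
				for {
					// Refresh the heartbeat key with the current timestamp, as the
					// goroutine removed from main.go used to do. Errors are only
					// logged in the real implementation; the sketch ignores them.
					wctx, wcancel := context.WithTimeout(ctx, 10*time.Second) // timeout value is illustrative
					_ = backend.Update(wctx, kvstore.HeartbeatPath, []byte(time.Now().Format(time.RFC3339)), true)
					wcancel()
					select {
					case <-ctx.Done():
						return
					case <-ticker.C:
					}
				}
			}()
			return nil
		},
		OnStop: func(hive.HookContext) error {
			cancel()
			<-done
			return nil
		},
	})
}
```

Tying the loop to lifecycle hooks lets the hive shut the heartbeat down cleanly on stop, which the old free-running goroutine did not do.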
7 changes: 5 additions & 2 deletions clustermesh-apiserver/vmmanager.go
@@ -43,11 +43,14 @@ type VMManager struct {

ciliumExternalWorkloadStore cache.Store
ciliumExternalWorkloadInformer cache.Controller

backend kvstore.BackendOperations
}

func NewVMManager(clientset k8sClient.Clientset) *VMManager {
func NewVMManager(clientset k8sClient.Clientset, backend kvstore.BackendOperations) *VMManager {
m := &VMManager{
ciliumClient: clientset,
backend: backend,
}
m.identityAllocator = identityCache.NewCachingIdentityAllocator(m)

@@ -449,7 +452,7 @@ func (m *VMManager) syncKVStoreKey(ctx context.Context, key store.LocalKey) erro
// Update key in kvstore, overwrite an eventual existing key, attach
// lease to expire entry when agent dies and never comes back up.
k := path.Join(nodeStore.NodeRegisterStorePrefix, key.GetKeyName())
if _, err := kvstore.Client().UpdateIfDifferent(ctx, k, jsonValue, true); err != nil {
if _, err := m.backend.UpdateIfDifferent(ctx, k, jsonValue, true); err != nil {
return err
}

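registerHooks and NewVMManager above only consume the backend; the promise itself comes from the kvstore.Cell(kvstore.EtcdBackendName) registered in init(), which is also not shown in this diff. Only Promise.Await appears in the changes, so the resolver-side API below (promise.New, Resolve, Reject) and the provideBackend name are assumptions; this is a rough sketch of how the provider side of such a cell could turn the previously synchronous kvstore.Setup call into a promise that consumers await:

```go
package main

import (
	"context"

	"github.com/cilium/cilium/pkg/hive"
	"github.com/cilium/cilium/pkg/kvstore"
	"github.com/cilium/cilium/pkg/option"
	"github.com/cilium/cilium/pkg/promise"
)

// provideBackend is a hypothetical provider illustrating the pattern: it resolves
// the backend promise once the kvstore client is connected, so consumers block in
// Await rather than on the connection itself.
func provideBackend(lc hive.Lifecycle) promise.Promise[kvstore.BackendOperations] {
	resolver, backendPromise := promise.New[kvstore.BackendOperations]()

	lc.Append(hive.Hook{
		OnStart: func(hive.HookContext) error {
			go func() {
				// kvstore.Setup/Client may block until etcd is reachable (see the
				// comment added to k8s_service_sync.go below), hence the goroutine.
				// This mirrors the Setup call removed from startServer.
				if err := kvstore.Setup(context.Background(), kvstore.EtcdBackendName, option.Config.KVStoreOpt, nil); err != nil {
					resolver.Reject(err)
					return
				}
				resolver.Resolve(kvstore.Client())
			}()
			return nil
		},
	})
	return backendPromise
}
```

With this shape, start hooks such as registerHooks simply suspend in Await until the backend is ready instead of performing the connection themselves.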
8 changes: 7 additions & 1 deletion operator/cmd/root.go
@@ -479,7 +479,13 @@ func (legacy *legacyOnLeader) onStart(_ hive.HookContext) error {
})

if legacy.clientset.IsEnabled() && operatorOption.Config.SyncK8sServices {
operatorWatchers.StartSynchronizingServices(legacy.ctx, &legacy.wg, legacy.clientset, legacy.resources.Services, true, option.Config)
operatorWatchers.StartSynchronizingServices(legacy.ctx, &legacy.wg, operatorWatchers.ServiceSyncParameters{
ServiceSyncConfiguration: option.Config,

Clientset: legacy.clientset,
Services: legacy.resources.Services,
SharedOnly: true,
})
// If K8s is enabled we can do the service translation automagically by
// looking at services from k8s and retrieve the service IP from that.
// This makes cilium to not depend on kube dns to interact with etcd
27 changes: 21 additions & 6 deletions operator/watchers/k8s_service_sync.go
@@ -107,11 +107,20 @@ type ServiceSyncConfiguration interface {
utils.ServiceConfiguration
}

type ServiceSyncParameters struct {
ServiceSyncConfiguration

Clientset k8sClient.Clientset
Services resource.Resource[*slim_corev1.Service]
Backend store.SyncStoreBackend
SharedOnly bool
}

// StartSynchronizingServices starts a controller for synchronizing services from k8s to kvstore
// 'shared' specifies whether only shared services are synchronized. If 'false' then all services
// will be synchronized. For clustermesh we only need to synchronize shared services, while for
// VM support we need to sync all the services.
func StartSynchronizingServices(ctx context.Context, wg *sync.WaitGroup, clientset k8sClient.Clientset, services resource.Resource[*slim_corev1.Service], shared bool, cfg ServiceSyncConfiguration) {
func StartSynchronizingServices(ctx context.Context, wg *sync.WaitGroup, cfg ServiceSyncParameters) {
log.Info("Starting to synchronize k8s services to kvstore")

serviceOptsModifier, err := utils.GetServiceListOptionsModifier(cfg)
@@ -122,7 +131,13 @@ func StartSynchronizingServices(ctx context.Context, wg *sync.WaitGroup, clients
readyChan := make(chan struct{}, 0)

go func() {
store := store.NewWorkqueueSyncStore(kvstore.Client(), serviceStore.ServiceStorePrefix,
if cfg.Backend == nil {
// Needs to be assigned in a separate goroutine, since it might block
// if the client is not yet initialized.
cfg.Backend = kvstore.Client()
}

store := store.NewWorkqueueSyncStore(cfg.Backend, serviceStore.ServiceStorePrefix,
store.WSSWithSourceClusterName(cfg.LocalClusterName()))
kvs = store
close(readyChan)
@@ -134,7 +149,7 @@ func StartSynchronizingServices(ctx context.Context, wg *sync.WaitGroup, clients

// Start populating the service cache
go func() {
for ev := range services.Events(ctx) {
for ev := range cfg.Services.Events(ctx) {
switch ev.Kind {
case resource.Sync:
// Wait until service cache updates have been fully processed.
@@ -160,7 +175,7 @@ func StartSynchronizingServices(ctx context.Context, wg *sync.WaitGroup, clients
switch {
case k8s.SupportsEndpointSlice():
var endpointSliceEnabled bool
endpointController, endpointSliceEnabled = endpointSlicesInit(ctx, wg, clientset.Slim(), swgEps)
endpointController, endpointSliceEnabled = endpointSlicesInit(ctx, wg, cfg.Clientset.Slim(), swgEps)
// the cluster has endpoint slices so we should not check for v1.Endpoints
if endpointSliceEnabled {
// endpointController has been kicked off already inside
@@ -175,7 +190,7 @@ func StartSynchronizingServices(ctx context.Context, wg *sync.WaitGroup, clients
}
fallthrough
default:
endpointController = endpointsInit(clientset.Slim(), swgEps, serviceOptsModifier)
endpointController = endpointsInit(cfg.Clientset.Slim(), swgEps, serviceOptsModifier)
wg.Add(1)
go func() {
defer wg.Done()
@@ -197,7 +212,7 @@ func StartSynchronizingServices(ctx context.Context, wg *sync.WaitGroup, clients

<-readyChan
log.Info("Starting to synchronize Kubernetes services to kvstore")
k8sServiceHandler(ctx, cfg.LocalClusterName(), shared, cfg.LocalClusterID())
k8sServiceHandler(ctx, cfg.LocalClusterName(), cfg.SharedOnly, cfg.LocalClusterID())
}()
}
