Skip to content

Commit

Permalink
clustermesh: expose service cache and hooks from endpointslicesync
Browse files Browse the repository at this point in the history
Expose the global service cache and the hook registration functions from
the endpointslicesync package. This allows other controllers, living in
different Go packages in the operator, to be colocated with it when they
need to use the global service cache and register hooks.

The first use case for this is the Multi-Cluster Services API
support.

Signed-off-by: Arthur Outhenin-Chalandre <arthur@cri.epita.fr>
  • Loading branch information
MrFreezeex authored and ldelossa committed May 7, 2024
1 parent e3346ed commit a66a0f1
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 20 deletions.
4 changes: 2 additions & 2 deletions pkg/clustermesh/endpointslicesync/cell.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ var Cell = cell.Module(
"EndpointSlice clustermesh synchronization in the Cilium operator",
cell.Config(ClusterMeshConfig{}),
cell.Provide(newClusterMesh),
// Invoke an empty function which takes a clusterMesh to force its construction.
cell.Invoke(func(*clusterMesh) {}),
// Invoke an empty function which takes a ClusterMesh to force its construction.
cell.Invoke(func(ClusterMesh) {}),

cell.Config(common.Config{}),

Expand Down
90 changes: 83 additions & 7 deletions pkg/clustermesh/endpointslicesync/clustermesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"net"
"sync/atomic"

"github.com/cilium/endpointslice-controller/endpointslice"
"github.com/cilium/hive/cell"
Expand Down Expand Up @@ -47,9 +48,34 @@ type clusterMesh struct {

endpointSliceMeshController *endpointslice.Controller
endpointSliceInformerFactory informers.SharedInformerFactory

started atomic.Bool
clusterAddHooks []func(string)
clusterDeleteHooks []func(string)
clusterServiceUpdateHooks []func(*serviceStore.ClusterService)
clusterServiceDeleteHooks []func(*serviceStore.ClusterService)
}

// ClusterMesh is the interface implemented by the clusterMesh struct. It exposes
// the struct's public methods to other Cilium packages.
type ClusterMesh interface {
// RegisterClusterAddHook registers a hook that is called with the cluster
// name when a cluster is added to the mesh.
// This should NOT be called after the Start hook; the clusterMesh
// implementation panics in that case.
RegisterClusterAddHook(clusterAddHook func(string))
// RegisterClusterDeleteHook registers a hook that is called with the cluster
// name when a cluster is removed from the mesh.
// This should NOT be called after the Start hook; the clusterMesh
// implementation panics in that case.
RegisterClusterDeleteHook(clusterDeleteHook func(string))
// RegisterClusterServiceUpdateHook registers a hook that is called with the
// service when a service in the mesh is updated.
// This should NOT be called after the Start hook; the clusterMesh
// implementation panics in that case.
RegisterClusterServiceUpdateHook(clusterServiceUpdateHook func(*serviceStore.ClusterService))
// RegisterClusterServiceDeleteHook registers a hook that is called with the
// service when a service in the mesh is deleted.
// This should NOT be called after the Start hook; the clusterMesh
// implementation panics in that case.
RegisterClusterServiceDeleteHook(clusterServiceDeleteHook func(*serviceStore.ClusterService))

// GlobalServices returns the global service cache held by the implementation.
GlobalServices() *common.GlobalServiceCache
}

func newClusterMesh(lc cell.Lifecycle, params clusterMeshParams) *clusterMesh {
func newClusterMesh(lc cell.Lifecycle, params clusterMeshParams) ClusterMesh {
if !params.Clientset.IsEnabled() || params.ClusterMeshConfig == "" || !params.Cfg.ClusterMeshEnableEndpointSync {
return nil
}
Expand All @@ -66,7 +92,11 @@ func newClusterMesh(lc cell.Lifecycle, params clusterMeshParams) *clusterMesh {
}
cm.context, cm.contextCancel = context.WithCancel(context.Background())
cm.meshPodInformer = newMeshPodInformer(cm.globalServices)
cm.RegisterClusterServiceUpdateHook(cm.meshPodInformer.onClusterServiceUpdate)
cm.RegisterClusterServiceDeleteHook(cm.meshPodInformer.onClusterServiceDelete)
cm.meshNodeInformer = newMeshNodeInformer()
cm.RegisterClusterAddHook(cm.meshNodeInformer.onClusterAdd)
cm.RegisterClusterDeleteHook(cm.meshNodeInformer.onClusterDelete)
cm.endpointSliceMeshController, cm.meshServiceInformer, cm.endpointSliceInformerFactory = newEndpointSliceMeshController(
cm.context, params.Cfg, cm.meshPodInformer,
cm.meshNodeInformer, params.Clientset,
Expand Down Expand Up @@ -124,19 +154,64 @@ func (cm *clusterMeshServiceGetter) GetServiceIP(svcID k8s.ServiceID) *loadbalan
return nil
}

// RegisterClusterAddHook appends clusterAddHook to the list of hooks that are
// called with the cluster name when a cluster is added to the mesh.
// Registration is only permitted before the Start hook has run; calling this
// method afterwards panics.
func (cm *clusterMesh) RegisterClusterAddHook(clusterAddHook func(string)) {
	if alreadyStarted := cm.started.Load(); !alreadyStarted {
		cm.clusterAddHooks = append(cm.clusterAddHooks, clusterAddHook)
		return
	}
	// Hooks registered this late could be missed by existing consumers of the
	// hook list, so treat it as a programming error.
	panic(fmt.Errorf("can't call RegisterClusterAddHook after the Start hook"))
}

// RegisterClusterDeleteHook appends clusterDeleteHook to the list of hooks that
// are called with the cluster name when a cluster is removed from the mesh.
// Registration is only permitted before the Start hook has run; calling this
// method afterwards panics.
func (cm *clusterMesh) RegisterClusterDeleteHook(clusterDeleteHook func(string)) {
	if alreadyStarted := cm.started.Load(); !alreadyStarted {
		cm.clusterDeleteHooks = append(cm.clusterDeleteHooks, clusterDeleteHook)
		return
	}
	// Hooks registered this late could be missed by existing consumers of the
	// hook list, so treat it as a programming error.
	panic(fmt.Errorf("can't call RegisterClusterDeleteHook after the Start hook"))
}

// RegisterClusterServiceUpdateHook appends clusterServiceUpdateHook to the list
// of hooks that are called with the service when a service in the mesh is
// updated.
// Registration is only permitted before the Start hook has run; calling this
// method afterwards panics.
func (cm *clusterMesh) RegisterClusterServiceUpdateHook(clusterServiceUpdateHook func(*serviceStore.ClusterService)) {
	if alreadyStarted := cm.started.Load(); !alreadyStarted {
		cm.clusterServiceUpdateHooks = append(cm.clusterServiceUpdateHooks, clusterServiceUpdateHook)
		return
	}
	// Hooks registered this late could be missed by existing consumers of the
	// hook list, so treat it as a programming error.
	panic(fmt.Errorf("can't call RegisterClusterServiceUpdateHook after the Start hook"))
}

// RegisterClusterServiceDeleteHook appends clusterServiceDeleteHook to the list
// of hooks that are called with the service when a service in the mesh is
// deleted.
// Registration is only permitted before the Start hook has run; calling this
// method afterwards panics.
func (cm *clusterMesh) RegisterClusterServiceDeleteHook(clusterServiceDeleteHook func(*serviceStore.ClusterService)) {
	if alreadyStarted := cm.started.Load(); !alreadyStarted {
		cm.clusterServiceDeleteHooks = append(cm.clusterServiceDeleteHooks, clusterServiceDeleteHook)
		return
	}
	// Hooks registered this late could be missed by existing consumers of the
	// hook list, so treat it as a programming error.
	panic(fmt.Errorf("can't call RegisterClusterServiceDeleteHook after the Start hook"))
}

// GlobalServices returns the global service cache held by this clusterMesh.
// The cache pointer is returned as-is (no copy is made).
func (cm *clusterMesh) GlobalServices() *common.GlobalServiceCache {
return cm.globalServices
}

func (cm *clusterMesh) newRemoteCluster(name string, status common.StatusFunc) common.RemoteCluster {
rc := &remoteCluster{
name: name,
meshNodeInformer: cm.meshNodeInformer,
globalServices: cm.globalServices,
storeFactory: cm.storeFactory,
synced: newSynced(),
name: name,
globalServices: cm.globalServices,
storeFactory: cm.storeFactory,
synced: newSynced(),
clusterAddHooks: cm.clusterAddHooks,
clusterDeleteHooks: cm.clusterDeleteHooks,
}

rc.remoteServices = cm.storeFactory.NewWatchStore(
name,
func() store.Key { return new(serviceStore.ClusterService) },
&remoteServiceObserver{globalServices: cm.globalServices, meshPodInformer: cm.meshPodInformer},
&remoteServiceObserver{
globalServices: cm.globalServices,
clusterServiceUpdateHooks: cm.clusterServiceUpdateHooks,
clusterServiceDeleteHooks: cm.clusterServiceDeleteHooks,
},
store.RWSWithOnSyncCallback(func(ctx context.Context) { rc.synced.services.Stop() }),
)

Expand All @@ -145,6 +220,7 @@ func (cm *clusterMesh) newRemoteCluster(name string, status common.StatusFunc) c

func (cm *clusterMesh) Start(startCtx cell.HookContext) error {
log.Info("Bootstrap clustermesh EndpointSlice controller")
cm.started.Store(true)

cm.endpointSliceInformerFactory.Start(cm.context.Done())
if err := cm.meshServiceInformer.Start(cm.context); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/clustermesh/endpointslicesync/endpointslice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func Test_meshEndpointSlice_Reconcile(t *testing.T) {
cache.WaitForCacheSync(context.Background().Done(), serviceInformer.HasSynced)

go controller.Run(context.Background(), 1)
nodeInformer.onAddCluster(remoteClusterName)
nodeInformer.onClusterAdd(remoteClusterName)

svcStore, _ := services.Store(context.Background())

Expand Down
4 changes: 2 additions & 2 deletions pkg/clustermesh/endpointslicesync/node_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (i *meshNodeInformer) Get(name string) (*v1.Node, error) {
return nil, newNotFoundError(fmt.Sprintf("node '%s' not found", name))
}

func (i *meshNodeInformer) onAddCluster(cluster string) {
func (i *meshNodeInformer) onClusterAdd(cluster string) {
i.mutex.Lock()
node := createDummyNode(cluster)
i.nodes[cluster] = node
Expand All @@ -87,7 +87,7 @@ func (i *meshNodeInformer) onAddCluster(cluster string) {
i.handler.OnAdd(node, false)
}

func (i *meshNodeInformer) onDeleteCluster(cluster string) {
func (i *meshNodeInformer) onClusterDelete(cluster string) {
i.mutex.Lock()
delete(i.nodes, cluster)
i.mutex.Unlock()
Expand Down
14 changes: 10 additions & 4 deletions pkg/clustermesh/endpointslicesync/remote_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@ type remoteCluster struct {
// name is the name of the cluster
name string

meshNodeInformer *meshNodeInformer
globalServices *common.GlobalServiceCache
globalServices *common.GlobalServiceCache

// remoteServices is the shared store representing services in remote clusters
remoteServices store.WatchStore

storeFactory store.Factory

clusterAddHooks []func(string)
clusterDeleteHooks []func(string)

// synced tracks the initial synchronization with the remote cluster.
synced synced
}
Expand Down Expand Up @@ -57,7 +59,9 @@ func (rc *remoteCluster) Run(ctx context.Context, backend kvstore.BackendOperati
})

close(ready)
rc.meshNodeInformer.onAddCluster(rc.name)
for _, clusterAddHook := range rc.clusterAddHooks {
clusterAddHook(rc.name)
}
mgr.Run(ctx)
}

Expand All @@ -66,7 +70,9 @@ func (rc *remoteCluster) Stop() {
}

func (rc *remoteCluster) Remove() {
rc.meshNodeInformer.onDeleteCluster(rc.name)
for _, clusterDeleteHook := range rc.clusterDeleteHooks {
clusterDeleteHook(rc.name)
}
// Draining shall occur only when the configuration for the remote cluster
// is removed, and not in case the operator is shutting down, otherwise we
// would break existing connections on restart.
Expand Down
14 changes: 10 additions & 4 deletions pkg/clustermesh/endpointslicesync/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (
)

type remoteServiceObserver struct {
globalServices *common.GlobalServiceCache
meshPodInformer *meshPodInformer
globalServices *common.GlobalServiceCache

clusterServiceUpdateHooks []func(*serviceStore.ClusterService)
clusterServiceDeleteHooks []func(*serviceStore.ClusterService)
}

// OnUpdate is called when a service in a remote cluster is updated
Expand All @@ -35,7 +37,9 @@ func (r *remoteServiceObserver) OnUpdate(key store.Key) {
}

r.globalServices.OnUpdate(svc)
r.meshPodInformer.onClusterServiceUpdate(svc)
for _, clusterServiceUpdateHook := range r.clusterServiceUpdateHooks {
clusterServiceUpdateHook(svc)
}
} else {
log.Warningf("Received unexpected remote service update object %+v", key)
}
Expand All @@ -53,7 +57,9 @@ func (r *remoteServiceObserver) OnDelete(key store.NamedKey) {
return
}

r.meshPodInformer.onClusterServiceDelete(svc)
for _, clusterServiceDeleteHook := range r.clusterServiceDeleteHooks {
clusterServiceDeleteHook(svc)
}
} else {
log.Warningf("Received unexpected remote service delete object %+v", key)
}
Expand Down

0 comments on commit a66a0f1

Please sign in to comment.