Skip to content

Commit

Permalink
agent, operator, clustermesh-apiserver: use Resource[*Endpoints]
Browse files Browse the repository at this point in the history
- Remove endpoint and endpoint_slice watchers
- Add "endpoints watcher" adapted from endpoint_slice watcher that uses
  Resource[*Endpoints] instead of informer. This keeps things the same
  downstream.
- Adapts BGP speaker to only handle *k8s.Endpoints
- Removes endpoint/endpointslice informers from operator and use resource instead

Closes: #23734
Signed-off-by: Jussi Maki <jussi@isovalent.com>
  • Loading branch information
joamaki authored and borkmann committed Jun 5, 2023
1 parent ca3a4df commit 82a728a
Show file tree
Hide file tree
Showing 20 changed files with 338 additions and 960 deletions.
1 change: 1 addition & 0 deletions clustermesh-apiserver/k8s/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var (

cell.Provide(
k8s.ServiceResource,
k8s.EndpointsResource,
),
)
)
23 changes: 14 additions & 9 deletions clustermesh-apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,21 @@ func init() {
}),

gops.Cell(defaults.GopsPortApiserver),

k8sClient.Cell,
apiserverK8s.ResourcesCell,

cell.Provide(func() *option.DaemonConfig {
return option.Config
}),

kvstore.Cell(kvstore.EtcdBackendName),
cell.Provide(func() *kvstore.ExtraOptions { return nil }),
heartbeat.Cell,

healthAPIServerCell,
cmmetrics.Cell,
usersManagementCell,

cell.Invoke(registerHooks),
)
rootHive.RegisterFlags(rootCmd.Flags())
Expand All @@ -149,6 +153,7 @@ type parameters struct {

Clientset k8sClient.Clientset
Services resource.Resource[*slim_corev1.Service]
Endpoints resource.Resource[*k8s.Endpoints]
BackendPromise promise.Promise[kvstore.BackendOperations]
}

Expand All @@ -165,7 +170,7 @@ func registerHooks(lc hive.Lifecycle, params parameters) error {
return err
}

startServer(ctx, params.Clientset, params.Services)
startServer(ctx, params.Clientset, params.Services, params.Endpoints)
return nil
},
})
Expand Down Expand Up @@ -222,7 +227,7 @@ func readMockFile(path string) error {
if err != nil {
log.WithError(err).WithField("line", line).Warning("Unable to unmarshal Endpoints")
} else {
operatorWatchers.K8sSvcCache.UpdateEndpoints(&endpoints, nil)
operatorWatchers.K8sSvcCache.UpdateEndpoints(k8s.ParseEndpoints(&endpoints), nil)
}
default:
log.Warningf("Unknown line in mockfile %s: %s", path, line)
Expand Down Expand Up @@ -578,7 +583,7 @@ func synchronizeCiliumEndpoints(clientset k8sClient.Clientset) {
go ciliumEndpointsInformer.Run(wait.NeverStop)
}

func startServer(startCtx hive.HookContext, clientset k8sClient.Clientset, services resource.Resource[*slim_corev1.Service]) {
func startServer(startCtx hive.HookContext, clientset k8sClient.Clientset, services resource.Resource[*slim_corev1.Service], endpoints resource.Resource[*k8s.Endpoints]) {
log.WithFields(logrus.Fields{
"cluster-name": cfg.clusterName,
"cluster-id": cfg.clusterID,
Expand Down Expand Up @@ -631,11 +636,11 @@ func startServer(startCtx hive.HookContext, clientset k8sClient.Clientset, servi
synchronizeCiliumEndpoints(clientset)
operatorWatchers.StartSynchronizingServices(context.Background(), &sync.WaitGroup{}, operatorWatchers.ServiceSyncParameters{
ServiceSyncConfiguration: cfg,

Clientset: clientset,
Services: services,
Backend: backend,
SharedOnly: !cfg.enableExternalWorkloads,
Clientset: clientset,
Services: services,
Endpoints: endpoints,
Backend: backend,
SharedOnly: !cfg.enableExternalWorkloads,
})
}

Expand Down
4 changes: 1 addition & 3 deletions daemon/cmd/daemon_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1550,9 +1550,7 @@ func (d *Daemon) initKVStore() {
// that service IP.
d.k8sWatcher.WaitForCacheSync(
resources.K8sAPIGroupServiceV1Core,
resources.K8sAPIGroupEndpointV1Core,
resources.K8sAPIGroupEndpointSliceV1Discovery,
resources.K8sAPIGroupEndpointSliceV1Beta1Discovery,
resources.K8sAPIGroupEndpointSliceOrEndpoint,
)
log := log.WithField(logfields.LogSubsys, "etcd")
goopts.DialOption = []grpc.DialOption{
Expand Down
1 change: 1 addition & 0 deletions operator/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ func (legacy *legacyOnLeader) onStart(_ hive.HookContext) error {

Clientset: legacy.clientset,
Services: legacy.resources.Services,
Endpoints: legacy.resources.Endpoints,
SharedOnly: true,
})
// If K8s is enabled we can do the service translation automagically by
Expand Down
2 changes: 2 additions & 0 deletions operator/k8s/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ var (

cell.Provide(
k8s.ServiceResource,
k8s.EndpointsResource,
k8s.LBIPPoolsResource,
k8s.CiliumIdentityResource,
),
Expand All @@ -35,6 +36,7 @@ type Resources struct {
cell.In

Services resource.Resource[*slim_corev1.Service]
Endpoints resource.Resource[*k8s.Endpoints]
LBIPPools resource.Resource[*cilium_api_v2alpha1.CiliumLoadBalancerIPPool]
Identities resource.Resource[*cilium_api_v2.CiliumIdentity]
}

0 comments on commit 82a728a

Please sign in to comment.