Skip to content

Commit

Permalink
clustermesh: synchronize headless services too
Browse files Browse the repository at this point in the history
Now that we synchronize EndpointSlice we also need to synchronize
headless services so that we can synchronize their EndpointSlice too.

We implement this by removing the exclusion on regular Endpoints/Service
resources of headless Services and adding it back inside the daemon.

As a result clustermesh-api-server and the operator will now watch
headless Services/Endpoints as well.

Signed-off-by: Arthur Outhenin-Chalandre <arthur@cri.epita.fr>
  • Loading branch information
MrFreezeex committed Mar 21, 2024
1 parent 32e1ad7 commit 142c20f
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 33 deletions.
62 changes: 60 additions & 2 deletions daemon/k8s/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@
package k8s

import (
"fmt"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"

"github.com/cilium/cilium/pkg/hive/cell"
"github.com/cilium/cilium/pkg/k8s"
Expand All @@ -31,6 +36,7 @@ var (

cell.Config(k8s.DefaultConfig),
LocalNodeCell,
ServiceNonHeadlessCell,
cell.Provide(
k8s.ServiceResource,
k8s.EndpointsResource,
Expand Down Expand Up @@ -78,6 +84,50 @@ var (
},
),
)

ServiceNonHeadlessCell = cell.Module(
"k8s-service-non-headless",
"Agent Kubernetes non headless service resources",

cell.Provide(
func(lc cell.Lifecycle, cfg k8s.Config, cs client.Clientset) (ServiceNonHeadless, error) {
return k8s.ServiceResource(
lc, cfg, cs,
func(opts *metav1.ListOptions) {
nonHeadlessServiceSelector, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)
if err != nil {
panic(fmt.Sprintf("can't create headless service requirement: %s", err))
}

labelSelector, err := labels.Parse(opts.LabelSelector)
if err != nil {
panic(fmt.Sprintf("can't parse existing service label selector: %s", err))
}
labelSelector = labelSelector.Add(*nonHeadlessServiceSelector)
opts.LabelSelector = labelSelector.String()
},
)
},
func(lc cell.Lifecycle, cfg k8s.Config, cs client.Clientset) (EndpointsNonHeadless, error) {
return k8s.EndpointsResource(
lc, cfg, cs,
func(opts *metav1.ListOptions) {
nonHeadlessServiceSelector, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)
if err != nil {
panic(fmt.Sprintf("can't create headless service requirement: %s", err))
}

labelSelector, err := labels.Parse(opts.LabelSelector)
if err != nil {
panic(fmt.Sprintf("can't parse existing endpoints label selector: %s", err))
}
labelSelector = labelSelector.Add(*nonHeadlessServiceSelector)
opts.LabelSelector = labelSelector.String()
},
)
},
),
)
)

// LocalNodeResource is a resource.Resource[*slim_corev1.Node] but one which will only stream updates for the node object
Expand All @@ -92,12 +142,20 @@ type LocalCiliumNodeResource resource.Resource[*cilium_api_v2.CiliumNode]
// objects scheduled on the node we are currently running on.
type LocalPodResource resource.Resource[*slim_corev1.Pod]

// ServiceNonHeadless is a resource.Resource[*slim_corev1.Service] but one which will only stream updates for
// non headless Services.
type ServiceNonHeadless resource.Resource[*slim_corev1.Service]

// EndpointsNonHeadless is a resource.Resource[*slim_corev1.Service] but one which will only stream updates for
// Endpoints from non headless Services.
type EndpointsNonHeadless resource.Resource[*k8s.Endpoints]

// Resources is a convenience struct to group all the agent k8s resources as cell constructor parameters.
type Resources struct {
cell.In

Services resource.Resource[*slim_corev1.Service]
Endpoints resource.Resource[*k8s.Endpoints]
Services ServiceNonHeadless
Endpoints EndpointsNonHeadless
LocalNode LocalNodeResource
LocalCiliumNode LocalCiliumNodeResource
LocalPods LocalPodResource
Expand Down
27 changes: 14 additions & 13 deletions pkg/k8s/resource_ctors.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func CiliumBGPPeerConfigResource(lc cell.Lifecycle, cs client.Clientset, opts ..
return resource.New[*cilium_api_v2alpha1.CiliumBGPPeerConfig](lc, lw, resource.WithMetric("CiliumBGPPeerConfig")), nil
}

func EndpointsResource(lc cell.Lifecycle, cfg Config, cs client.Clientset) (resource.Resource[*Endpoints], error) {
func EndpointsResource(lc cell.Lifecycle, cfg Config, cs client.Clientset, opts ...func(*metav1.ListOptions)) (resource.Resource[*Endpoints], error) {
if !cs.IsEnabled() {
return nil, nil
}
Expand All @@ -248,15 +248,16 @@ func EndpointsResource(lc cell.Lifecycle, cfg Config, cs client.Clientset) (reso
return nil, err
}

endpointSliceOpsModifier, err := utils.GetEndpointSliceListOptionsModifier()
endpointSliceOptsModifier, err := utils.GetEndpointSliceListOptionsModifier()
if err != nil {
return nil, err
}

lw := &endpointsListerWatcher{
cs: cs,
enableK8sEndpointSlice: cfg.EnableK8sEndpointSlice,
endpointsOptsModifier: endpointsOptsModifier,
endpointSlicesOptsModifier: endpointSliceOpsModifier,
cs: cs,
enableK8sEndpointSlice: cfg.EnableK8sEndpointSlice,
endpointsOptsModifiers: append(opts, endpointsOptsModifier),
endpointSlicesOptsModifiers: append(opts, endpointSliceOptsModifier),
}
return resource.New[*Endpoints](
lc,
Expand All @@ -271,11 +272,11 @@ func EndpointsResource(lc cell.Lifecycle, cfg Config, cs client.Clientset) (reso
// performs the capability check on first call to List/Watch. This allows constructing
// the resource before the client has been started and capabilities have been probed.
type endpointsListerWatcher struct {
cs client.Clientset
enableK8sEndpointSlice bool
endpointsOptsModifier func(*metav1.ListOptions)
endpointSlicesOptsModifier func(*metav1.ListOptions)
sourceObj k8sRuntime.Object
cs client.Clientset
enableK8sEndpointSlice bool
endpointsOptsModifiers []func(*metav1.ListOptions)
endpointSlicesOptsModifiers []func(*metav1.ListOptions)
sourceObj k8sRuntime.Object

once sync.Once
cachedListerWatcher cache.ListerWatcher
Expand All @@ -302,14 +303,14 @@ func (lw *endpointsListerWatcher) getListerWatcher() cache.ListerWatcher {
)
lw.sourceObj = &slim_discoveryv1beta1.EndpointSlice{}
}
lw.cachedListerWatcher = utils.ListerWatcherWithModifier(lw.cachedListerWatcher, lw.endpointSlicesOptsModifier)
lw.cachedListerWatcher = utils.ListerWatcherWithModifiers(lw.cachedListerWatcher, lw.endpointSlicesOptsModifiers...)
} else {
log.Info("Using v1.Endpoints")
lw.cachedListerWatcher = utils.ListerWatcherFromTyped[*slim_corev1.EndpointsList](
lw.cs.Slim().CoreV1().Endpoints(""),
)
lw.sourceObj = &slim_corev1.Endpoints{}
lw.cachedListerWatcher = utils.ListerWatcherWithModifier(lw.cachedListerWatcher, lw.endpointsOptsModifier)
lw.cachedListerWatcher = utils.ListerWatcherWithModifiers(lw.cachedListerWatcher, lw.endpointsOptsModifiers...)
}
})
return lw.cachedListerWatcher
Expand Down
25 changes: 7 additions & 18 deletions pkg/k8s/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,28 +74,22 @@ type PolicyConfiguration interface {

// GetEndpointSliceListOptionsModifier returns the options modifier for endpointSlice object list.
// This methods returns a ListOptions modifier which adds a label selector to
// select all endpointSlice objects that do not contain the k8s headless service label and that they are not from
// remote clusters in Cilium cluster mesh.
// This is mostly the same behavior as kube-proxy (except the cluster mesh behavior which is
// tied to how Cilium internally works with clustermesh endpoints).
// select all endpointSlice objects they are not from remote clusters in Cilium cluster mesh.
// This is mostly the same behavior as kube-proxy except the cluster mesh behavior which is
// tied to how Cilium internally works with clustermesh endpoints and that this function also doesn't ignore headless Services.
// Given label mirroring from the service objects to endpoint slice objects were introduced in Kubernetes PR 94443,
// and released as part of Kubernetes v1.20; we can start using GetServiceAndEndpointListOptionsModifier for
// endpoint slices when dropping support for Kubernetes v1.19 and older. We can do that since the
// serviceProxyNameLabel label will then be mirrored to endpoint slices for services with that label.
// We also ignore Kubernetes endpoints coming from other clusters in the Cilium clustermesh here as
// Cilium does not rely on mirrored Kubernetes EndpointSlice for any of its functionalities.
func GetEndpointSliceListOptionsModifier() (func(options *v1meta.ListOptions), error) {
nonHeadlessServiceSelector, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)
if err != nil {
return nil, err
}
nonRemoteEndpointSelector, err := labels.NewRequirement(discoveryv1.LabelManagedBy, selection.NotEquals, []string{EndpointSliceMeshControllerName})
if err != nil {
return nil, err
}

labelSelector := labels.NewSelector()
labelSelector = labelSelector.Add(*nonHeadlessServiceSelector)
labelSelector = labelSelector.Add(*nonRemoteEndpointSelector)

return func(options *v1meta.ListOptions) {
Expand All @@ -106,22 +100,17 @@ func GetEndpointSliceListOptionsModifier() (func(options *v1meta.ListOptions), e
// GetServiceAndEndpointListOptionsModifier returns the options modifier for service and endpoint object lists.
// This methods returns a ListOptions modifier which adds a label selector to only
// select services that are in context of Cilium.
// Like kube-proxy Cilium does not select services/endpoints containing k8s headless service label.
// Unlike kube-proxy Cilium does not select services/endpoints containing k8s headless service label.
// We honor service.kubernetes.io/service-proxy-name label in the service object and only
// handle services that match our service proxy name. If the service proxy name for Cilium
// is an empty string, we assume that Cilium is the default service handler in which case
// we select all services that don't have the above mentioned label.
func GetServiceAndEndpointListOptionsModifier(k8sServiceProxy string) (func(options *v1meta.ListOptions), error) {
var (
serviceNameSelector, nonHeadlessServiceSelector *labels.Requirement
err error
serviceNameSelector *labels.Requirement
err error
)

nonHeadlessServiceSelector, err = labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)
if err != nil {
return nil, err
}

if k8sServiceProxy == "" {
serviceNameSelector, err = labels.NewRequirement(
serviceProxyNameLabel, selection.DoesNotExist, nil)
Expand All @@ -135,7 +124,7 @@ func GetServiceAndEndpointListOptionsModifier(k8sServiceProxy string) (func(opti
}

labelSelector := labels.NewSelector()
labelSelector = labelSelector.Add(*serviceNameSelector, *nonHeadlessServiceSelector)
labelSelector = labelSelector.Add(*serviceNameSelector)

return func(options *v1meta.ListOptions) {
options.LabelSelector = labelSelector.String()
Expand Down

0 comments on commit 142c20f

Please sign in to comment.