Skip to content

Commit

Permalink
Fix endpoint slice filtering when talking to Kubernetes
Browse files Browse the repository at this point in the history
This fixes the filtering of endpoint slices to ensure that we support
all the k8s versions we intend to. This ensures that we always filter
out endpoint slices with the well-known "headless" label, and
_do not_ filter out any endpoint slices based on the service proxy
label.

In pre Kubernetes v1.20, the labels on a service were not mirrored into
the labels of the endpoint slice. The headless label was not applied.
See PR 94443 in kubernetes/kubernetes for more info.

When no longer supporting Kubernetes v1.20, we can remove this custom
logic - and use the same label filter for endpoints, services and endpoint
slices.

Historically, we had no filters on the endpoint slice objects, but with
the two referred commits, the same filter we had for endpoints and
services was introduced to endpoint slices as part of the refactor. The
reason we don't revert the behavior directly, is that we _do want_ to filter
out endpoint slices for headless services, like we do with normal endpoints.

For completeness; the end user behavior will now be equal for both
endpoints and endpoint slices; since we will always filter the services
in the same way, and when we get an endpoint slice without a
corresponding service in state, we effectively ignore that endpoint slice.

Fixes: ca3a4df ("k8s: Add Resource[*Endpoints] to shared resources")
Fixes: 82a728a ("agent, operator, clustermesh-apiserver: use Resource[*Endpoints]")
Signed-off-by: Odin Ugedal <ougedal@palantir.com>
Signed-off-by: Odin Ugedal <odin@uged.al>
  • Loading branch information
odinuge authored and julianwiedmann committed Jul 12, 2023
1 parent c25b96d commit 0790d0f
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 15 deletions.
20 changes: 13 additions & 7 deletions pkg/k8s/resource_ctors.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func ServiceResource(lc hive.Lifecycle, cs client.Clientset, opts ...func(*metav
if !cs.IsEnabled() {
return nil, nil
}
optsModifier, err := utils.GetServiceListOptionsModifier(option.Config)
optsModifier, err := utils.GetServiceAndEndpointListOptionsModifier(option.Config)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -155,12 +155,16 @@ func EndpointsResource(lc hive.Lifecycle, cs client.Clientset) (resource.Resourc
if !cs.IsEnabled() {
return nil, nil
}
optsModifier, err := utils.GetServiceListOptionsModifier(option.Config)
endpointsOptsModifier, err := utils.GetServiceAndEndpointListOptionsModifier(option.Config)
if err != nil {
return nil, err
}

lw := &endpointsListerWatcher{cs: cs, optsModifier: optsModifier}
endpointSliceOpsModifier, err := utils.GetEndpointSliceListOptionsModifier()
if err != nil {
return nil, err
}
lw := &endpointsListerWatcher{cs: cs, endpointsOptsModifier: endpointsOptsModifier, endpointSlicesOptsModifier: endpointSliceOpsModifier}
return resource.New[*Endpoints](
lc,
lw,
Expand All @@ -173,9 +177,10 @@ func EndpointsResource(lc hive.Lifecycle, cs client.Clientset) (resource.Resourc
// performs the capability check on first call to List/Watch. This allows constructing
// the resource before the client has been started and capabilities have been probed.
type endpointsListerWatcher struct {
cs client.Clientset
optsModifier func(*metav1.ListOptions)
sourceObj k8sRuntime.Object
cs client.Clientset
endpointsOptsModifier func(*metav1.ListOptions)
endpointSlicesOptsModifier func(*metav1.ListOptions)
sourceObj k8sRuntime.Object

once sync.Once
cachedListerWatcher cache.ListerWatcher
Expand All @@ -202,14 +207,15 @@ func (lw *endpointsListerWatcher) getListerWatcher() cache.ListerWatcher {
)
lw.sourceObj = &slim_discoveryv1beta1.EndpointSlice{}
}
lw.cachedListerWatcher = utils.ListerWatcherWithModifier(lw.cachedListerWatcher, lw.endpointSlicesOptsModifier)
} else {
log.Info("Using v1.Endpoints")
lw.cachedListerWatcher = utils.ListerWatcherFromTyped[*slim_corev1.EndpointsList](
lw.cs.Slim().CoreV1().Endpoints(""),
)
lw.sourceObj = &slim_corev1.Endpoints{}
lw.cachedListerWatcher = utils.ListerWatcherWithModifier(lw.cachedListerWatcher, lw.endpointsOptsModifier)
}
lw.cachedListerWatcher = utils.ListerWatcherWithModifier(lw.cachedListerWatcher, lw.optsModifier)
})
return lw.cachedListerWatcher
}
Expand Down
34 changes: 28 additions & 6 deletions pkg/k8s/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,20 @@ func GetObjNamespaceName(obj NamespaceNameGetter) string {
return ns + "/" + obj.GetName()
}

// ServiceConfiguration is the required configuration for GetServiceListOptionsModifier
// ServiceConfiguration is the required configuration for GetServiceAndEndpointListOptionsModifier
type ServiceConfiguration interface {
// K8sServiceProxyNameValue must return the value of the proxy name
// annotation. If set, only services with this label will be handled.
K8sServiceProxyNameValue() string
}

// IngressConfiguration is the required configuration for GetServiceListOptionsModifier
// IngressConfiguration is the required configuration for GetServiceAndEndpointListOptionsModifier
type IngressConfiguration interface {
// K8sIngressControllerEnabled returns true if ingress controller feature is enabled in Cilium
K8sIngressControllerEnabled() bool
}

// GatewayAPIConfiguration is the required configuration for GetServiceListOptionsModifier
// GatewayAPIConfiguration is the required configuration for GetServiceAndEndpointListOptionsModifier
type GatewayAPIConfiguration interface {
// K8sGatewayAPIEnabled returns true if gateway API is enabled in Cilium
K8sGatewayAPIEnabled() bool
Expand All @@ -84,15 +84,37 @@ type PolicyConfiguration interface {
K8sNetworkPolicyEnabled() bool
}

// GetServiceListOptionsModifier returns the options modifier for service object list.
// GetEndpointSliceListOptionsModifier returns the options modifier for endpointSlice object list.
// This methods returns a ListOptions modifier which adds a label selector to
// select all endpointSlice objects that do not contain the k8s headless service label.
// This is the same behavior as kube-proxy.
// Given label mirroring from the service objects to endpoint slice objects were introduced in Kubernetes PR 94443,
// and released as part of Kubernetes v1.20; we can start using GetServiceAndEndpointListOptionsModifier for
// endpoint slices when dropping support for Kubernetes v1.19 and older. We can do that since the
// serviceProxyNameLabel label will then be mirrored to endpoint slices for services with that label.
func GetEndpointSliceListOptionsModifier() (func(options *v1meta.ListOptions), error) {
nonHeadlessServiceSelector, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)
if err != nil {
return nil, err
}

labelSelector := labels.NewSelector()
labelSelector = labelSelector.Add(*nonHeadlessServiceSelector)

return func(options *v1meta.ListOptions) {
options.LabelSelector = labelSelector.String()
}, nil
}

// GetServiceAndEndpointListOptionsModifier returns the options modifier for service and endpoint object lists.
// This methods returns a ListOptions modifier which adds a label selector to only
// select services that are in context of Cilium.
// Like kube-proxy Cilium does not select services containing k8s headless service label.
// Like kube-proxy Cilium does not select services/endpoints containing k8s headless service label.
// We honor service.kubernetes.io/service-proxy-name label in the service object and only
// handle services that match our service proxy name. If the service proxy name for Cilium
// is an empty string, we assume that Cilium is the default service handler in which case
// we select all services that don't have the above mentioned label.
func GetServiceListOptionsModifier(cfg ServiceConfiguration) (func(options *v1meta.ListOptions), error) {
func GetServiceAndEndpointListOptionsModifier(cfg ServiceConfiguration) (func(options *v1meta.ListOptions), error) {
var (
serviceNameSelector, nonHeadlessServiceSelector *labels.Requirement
err error
Expand Down
55 changes: 53 additions & 2 deletions pkg/k8s/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"

corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"

Expand Down Expand Up @@ -52,7 +53,7 @@ func TestServiceProxyName(t *testing.T) {

// Should return only test-svc-1 which has the service-proxy-name=foo
cfg := &fakeCfg{proxyName: "foo"}
optMod, _ := GetServiceListOptionsModifier(cfg)
optMod, _ := GetServiceAndEndpointListOptionsModifier(cfg)
options := metav1.ListOptions{}
optMod(&options)
svcs, err := client.CoreV1().Services("test-ns").List(context.TODO(), options)
Expand All @@ -65,7 +66,7 @@ func TestServiceProxyName(t *testing.T) {

// Should return only test-svc-3 which doesn't have any service-proxy-name
cfg = &fakeCfg{proxyName: ""}
optMod, _ = GetServiceListOptionsModifier(cfg)
optMod, _ = GetServiceAndEndpointListOptionsModifier(cfg)
options = metav1.ListOptions{}
optMod(&options)
svcs, err = client.CoreV1().Services("test-ns").List(context.TODO(), options)
Expand All @@ -77,6 +78,56 @@ func TestServiceProxyName(t *testing.T) {
}
}

func TestServiceEndpointsAndSlices(t *testing.T) {
client := fake.NewSimpleClientset()
meta1 := &metav1.ObjectMeta{
Name: "test-svc-1",
Labels: map[string]string{},
}
meta2 := &metav1.ObjectMeta{
Name: "test-svc-2",
Labels: map[string]string{
corev1.IsHeadlessService: "",
},
}
for _, meta := range []*metav1.ObjectMeta{meta1, meta2} {
ep := &corev1.Endpoints{ObjectMeta: *meta}
_, err := client.CoreV1().Endpoints("test-ns").Create(context.TODO(), ep, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create endpoint %v: %s", ep, err)
}
epSlice := &discoveryv1.EndpointSlice{ObjectMeta: *meta}
_, err = client.DiscoveryV1().EndpointSlices("test-ns").Create(context.TODO(), epSlice, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create endpoint slice %v: %s", ep, err)
}
}

// Should return only test-svc-1, since test-svc-2 is headless
cfg := &fakeCfg{}
optMod, _ := GetServiceAndEndpointListOptionsModifier(cfg)
options := metav1.ListOptions{}
optMod(&options)
eps, err := client.CoreV1().Endpoints("test-ns").List(context.TODO(), options)
if err != nil {
t.Fatalf("Failed to list services: %s", err)
}
if len(eps.Items) != 1 || eps.Items[0].ObjectMeta.Name != "test-svc-1" {
t.Fatalf("Expected test-svc-1, retrieved: %v", eps)
}

optMod, _ = GetEndpointSliceListOptionsModifier()
options = metav1.ListOptions{}
optMod(&options)
epSlices, err := client.DiscoveryV1().EndpointSlices("test-ns").List(context.TODO(), options)
if err != nil {
t.Fatalf("Failed to list services: %s", err)
}
if len(epSlices.Items) != 1 || epSlices.Items[0].ObjectMeta.Name != "test-svc-1" {
t.Fatalf("Expected test-svc-1, retrieved: %v", epSlices)
}
}

func TestValidIPs(t *testing.T) {
tests := []struct {
name string
Expand Down

0 comments on commit 0790d0f

Please sign in to comment.