Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix endpoint slice filtering to ensure we get all the necessary objects #25351

Merged
merged 1 commit into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 13 additions & 7 deletions pkg/k8s/resource_ctors.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func ServiceResource(lc hive.Lifecycle, cs client.Clientset, opts ...func(*metav
if !cs.IsEnabled() {
return nil, nil
}
optsModifier, err := utils.GetServiceListOptionsModifier(option.Config)
optsModifier, err := utils.GetServiceAndEndpointListOptionsModifier(option.Config)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -155,12 +155,16 @@ func EndpointsResource(lc hive.Lifecycle, cs client.Clientset) (resource.Resourc
if !cs.IsEnabled() {
return nil, nil
}
optsModifier, err := utils.GetServiceListOptionsModifier(option.Config)
endpointsOptsModifier, err := utils.GetServiceAndEndpointListOptionsModifier(option.Config)
if err != nil {
return nil, err
}

lw := &endpointsListerWatcher{cs: cs, optsModifier: optsModifier}
endpointSliceOpsModifier, err := utils.GetEndpointSliceListOptionsModifier()
if err != nil {
return nil, err
}
lw := &endpointsListerWatcher{cs: cs, endpointsOptsModifier: endpointsOptsModifier, endpointSlicesOptsModifier: endpointSliceOpsModifier}
return resource.New[*Endpoints](
lc,
lw,
Expand All @@ -173,9 +177,10 @@ func EndpointsResource(lc hive.Lifecycle, cs client.Clientset) (resource.Resourc
// performs the capability check on first call to List/Watch. This allows constructing
// the resource before the client has been started and capabilities have been probed.
type endpointsListerWatcher struct {
cs client.Clientset
optsModifier func(*metav1.ListOptions)
sourceObj k8sRuntime.Object
cs client.Clientset
endpointsOptsModifier func(*metav1.ListOptions)
endpointSlicesOptsModifier func(*metav1.ListOptions)
sourceObj k8sRuntime.Object

once sync.Once
cachedListerWatcher cache.ListerWatcher
Expand All @@ -202,14 +207,15 @@ func (lw *endpointsListerWatcher) getListerWatcher() cache.ListerWatcher {
)
lw.sourceObj = &slim_discoveryv1beta1.EndpointSlice{}
}
lw.cachedListerWatcher = utils.ListerWatcherWithModifier(lw.cachedListerWatcher, lw.endpointSlicesOptsModifier)
} else {
log.Info("Using v1.Endpoints")
lw.cachedListerWatcher = utils.ListerWatcherFromTyped[*slim_corev1.EndpointsList](
lw.cs.Slim().CoreV1().Endpoints(""),
)
lw.sourceObj = &slim_corev1.Endpoints{}
lw.cachedListerWatcher = utils.ListerWatcherWithModifier(lw.cachedListerWatcher, lw.endpointsOptsModifier)
}
lw.cachedListerWatcher = utils.ListerWatcherWithModifier(lw.cachedListerWatcher, lw.optsModifier)
})
return lw.cachedListerWatcher
}
Expand Down
34 changes: 28 additions & 6 deletions pkg/k8s/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,20 @@ func GetObjNamespaceName(obj NamespaceNameGetter) string {
return ns + "/" + obj.GetName()
}

// ServiceConfiguration is the required configuration for GetServiceListOptionsModifier
// ServiceConfiguration is the required configuration for GetServiceAndEndpointListOptionsModifier
type ServiceConfiguration interface {
// K8sServiceProxyNameValue must return the value of the proxy name
// annotation. If set, only services with this label will be handled.
K8sServiceProxyNameValue() string
}

// IngressConfiguration is the required configuration for GetServiceListOptionsModifier
// IngressConfiguration is the required configuration for GetServiceAndEndpointListOptionsModifier
type IngressConfiguration interface {
// K8sIngressControllerEnabled returns true if ingress controller feature is enabled in Cilium
K8sIngressControllerEnabled() bool
}

// GatewayAPIConfiguration is the required configuration for GetServiceListOptionsModifier
// GatewayAPIConfiguration is the required configuration for GetServiceAndEndpointListOptionsModifier
type GatewayAPIConfiguration interface {
// K8sGatewayAPIEnabled returns true if gateway API is enabled in Cilium
K8sGatewayAPIEnabled() bool
Expand All @@ -84,15 +84,37 @@ type PolicyConfiguration interface {
K8sNetworkPolicyEnabled() bool
}

// GetServiceListOptionsModifier returns the options modifier for service object list.
// GetEndpointSliceListOptionsModifier returns the options modifier for endpointSlice object list.
// This methods returns a ListOptions modifier which adds a label selector to
// select all endpointSlice objects that do not contain the k8s headless service label.
// This is the same behavior as kube-proxy.
// Given label mirroring from the service objects to endpoint slice objects were introduced in Kubernetes PR 94443,
// and released as part of Kubernetes v1.20; we can start using GetServiceAndEndpointListOptionsModifier for
// endpoint slices when dropping support for Kubernetes v1.19 and older. We can do that since the
// serviceProxyNameLabel label will then be mirrored to endpoint slices for services with that label.
func GetEndpointSliceListOptionsModifier() (func(options *v1meta.ListOptions), error) {
nonHeadlessServiceSelector, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)
if err != nil {
return nil, err
}

labelSelector := labels.NewSelector()
labelSelector = labelSelector.Add(*nonHeadlessServiceSelector)

return func(options *v1meta.ListOptions) {
options.LabelSelector = labelSelector.String()
}, nil
}

// GetServiceAndEndpointListOptionsModifier returns the options modifier for service and endpoint object lists.
// This methods returns a ListOptions modifier which adds a label selector to only
// select services that are in context of Cilium.
// Like kube-proxy Cilium does not select services containing k8s headless service label.
// Like kube-proxy Cilium does not select services/endpoints containing k8s headless service label.
// We honor service.kubernetes.io/service-proxy-name label in the service object and only
// handle services that match our service proxy name. If the service proxy name for Cilium
// is an empty string, we assume that Cilium is the default service handler in which case
// we select all services that don't have the above mentioned label.
func GetServiceListOptionsModifier(cfg ServiceConfiguration) (func(options *v1meta.ListOptions), error) {
func GetServiceAndEndpointListOptionsModifier(cfg ServiceConfiguration) (func(options *v1meta.ListOptions), error) {
var (
serviceNameSelector, nonHeadlessServiceSelector *labels.Requirement
err error
Expand Down
55 changes: 53 additions & 2 deletions pkg/k8s/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"

corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"

Expand Down Expand Up @@ -52,7 +53,7 @@ func TestServiceProxyName(t *testing.T) {

// Should return only test-svc-1 which has the service-proxy-name=foo
cfg := &fakeCfg{proxyName: "foo"}
optMod, _ := GetServiceListOptionsModifier(cfg)
optMod, _ := GetServiceAndEndpointListOptionsModifier(cfg)
options := metav1.ListOptions{}
optMod(&options)
svcs, err := client.CoreV1().Services("test-ns").List(context.TODO(), options)
Expand All @@ -65,7 +66,7 @@ func TestServiceProxyName(t *testing.T) {

// Should return only test-svc-3 which doesn't have any service-proxy-name
cfg = &fakeCfg{proxyName: ""}
optMod, _ = GetServiceListOptionsModifier(cfg)
optMod, _ = GetServiceAndEndpointListOptionsModifier(cfg)
options = metav1.ListOptions{}
optMod(&options)
svcs, err = client.CoreV1().Services("test-ns").List(context.TODO(), options)
Expand All @@ -77,6 +78,56 @@ func TestServiceProxyName(t *testing.T) {
}
}

func TestServiceEndpointsAndSlices(t *testing.T) {
client := fake.NewSimpleClientset()
meta1 := &metav1.ObjectMeta{
Name: "test-svc-1",
Labels: map[string]string{},
}
meta2 := &metav1.ObjectMeta{
Name: "test-svc-2",
Labels: map[string]string{
corev1.IsHeadlessService: "",
},
}
for _, meta := range []*metav1.ObjectMeta{meta1, meta2} {
ep := &corev1.Endpoints{ObjectMeta: *meta}
_, err := client.CoreV1().Endpoints("test-ns").Create(context.TODO(), ep, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create endpoint %v: %s", ep, err)
}
epSlice := &discoveryv1.EndpointSlice{ObjectMeta: *meta}
_, err = client.DiscoveryV1().EndpointSlices("test-ns").Create(context.TODO(), epSlice, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create endpoint slice %v: %s", ep, err)
}
}

// Should return only test-svc-1, since test-svc-2 is headless
cfg := &fakeCfg{}
optMod, _ := GetServiceAndEndpointListOptionsModifier(cfg)
options := metav1.ListOptions{}
optMod(&options)
eps, err := client.CoreV1().Endpoints("test-ns").List(context.TODO(), options)
if err != nil {
t.Fatalf("Failed to list services: %s", err)
}
if len(eps.Items) != 1 || eps.Items[0].ObjectMeta.Name != "test-svc-1" {
t.Fatalf("Expected test-svc-1, retrieved: %v", eps)
}

optMod, _ = GetEndpointSliceListOptionsModifier()
options = metav1.ListOptions{}
optMod(&options)
epSlices, err := client.DiscoveryV1().EndpointSlices("test-ns").List(context.TODO(), options)
if err != nil {
t.Fatalf("Failed to list services: %s", err)
}
if len(epSlices.Items) != 1 || epSlices.Items[0].ObjectMeta.Name != "test-svc-1" {
t.Fatalf("Expected test-svc-1, retrieved: %v", epSlices)
}
}

func TestValidIPs(t *testing.T) {
tests := []struct {
name string
Expand Down