Skip to content

Commit

Permalink
Fix endpointslice map
Browse files Browse the repository at this point in the history
  • Loading branch information
jukie committed Mar 17, 2024
1 parent 5dcb453 commit 09506ba
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 32 deletions.
96 changes: 68 additions & 28 deletions control-plane/catalog/to-consul/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,11 @@ type ServiceResource struct {
// in the form <kube namespace>/<kube svc name>.
serviceMap map[string]*corev1.Service

// endpointSlicesMap uses the same keys as serviceMap but maps to a list of EndpointSlices
// of each service.
endpointSlicesMap map[string][]discoveryv1.EndpointSlice
// endpointSlicesMap tracks EndpointSlices associated with services that are being synced to Consul.
// The outer map's keys represent service identifiers in the same format as serviceMap and maps
// each service to its related EndpointSlices. The inner map's keys are EndpointSlice name keys
// the format "<kube namespace>/<kube endpointslice name>".
endpointSlicesMap map[string]map[string]discoveryv1.EndpointSlice

// EnableIngress enables syncing of the hostname from an Ingress resource
// to the service registration if an Ingress rule matches the service.
Expand Down Expand Up @@ -230,8 +232,7 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error {

// If we care about endpoints, we should load the associated endpoint slices.
if t.shouldTrackEndpoints(key) {

var allEndpointSlices []discoveryv1.EndpointSlice
allEndpointSlices := make(map[string]discoveryv1.EndpointSlice)
labelSelector := fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, service.Name)
continueToken := ""
limit := int64(100)
Expand All @@ -250,10 +251,14 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error {
t.Log.Warn("error loading endpoint slices list",
"key", key,
"err", err)
return err
break

}

allEndpointSlices = append(allEndpointSlices, endpointSliceList.Items...)
for _, endpointSlice := range endpointSliceList.Items {
endptKey := service.Namespace + "/" + endpointSlice.Name
allEndpointSlices[endptKey] = endpointSlice
}

if endpointSliceList.Continue != "" {
continueToken = endpointSliceList.Continue
Expand All @@ -263,7 +268,7 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error {
}

if t.endpointSlicesMap == nil {
t.endpointSlicesMap = make(map[string][]discoveryv1.EndpointSlice)
t.endpointSlicesMap = make(map[string]map[string]discoveryv1.EndpointSlice)
}
t.endpointSlicesMap[key] = allEndpointSlices
t.Log.Debug("[ServiceResource.Upsert] adding service's endpoint slices to endpointSlicesMap", "key", key, "service", service, "endpointSlices", allEndpointSlices)
Expand Down Expand Up @@ -627,7 +632,7 @@ func (t *ServiceResource) generateRegistrations(key string) {
// Look up the node's ip address by getting node info
node, err := t.Client.CoreV1().Nodes().Get(t.Ctx, *endpoint.NodeName, metav1.GetOptions{})
if err != nil {
t.Log.Warn("error getting node info", "error", err)
t.Log.Error("error getting node info", "error", err)
continue
}

Expand Down Expand Up @@ -739,8 +744,7 @@ func (t *ServiceResource) registerServiceInstance(
// Used for adding node region label to an endpoint but only needs to called once
node, err := t.Client.CoreV1().Nodes().Get(t.Ctx, *endpoint.NodeName, metav1.GetOptions{})
if err != nil {
t.Log.Warn("error getting node info", "error", err)
continue
t.Log.Error("error getting node info", "error", err)
}

for _, endpointAddr := range endpoint.Addresses {
Expand Down Expand Up @@ -789,12 +793,15 @@ func (t *ServiceResource) registerServiceInstance(
if endpoint.NodeName != nil {
r.Service.Meta[ConsulK8SNodeName] = *endpoint.NodeName
}
if node.Labels != nil {
if region := node.Labels[corev1.LabelTopologyRegion]; region != "" {
r.Service.Meta[ConsulK8STopologyRegion] = region
}
}

if endpoint.Zone != nil {
r.Service.Meta[ConsulK8STopologyZone] = *endpoint.Zone
}
if region := node.Labels[corev1.LabelTopologyRegion]; region != "" {
r.Service.Meta[ConsulK8STopologyRegion] = region
}

r.Check = &consulapi.AgentCheck{
CheckID: consulHealthCheckID(endpointSlice.Namespace, serviceID(r.Service.Service, addr)),
Expand Down Expand Up @@ -873,51 +880,84 @@ func (t *serviceEndpointsResource) Informer() cache.SharedIndexInformer {
)
}

func (t *serviceEndpointsResource) Upsert(key string, raw interface{}) error {
func (t *serviceEndpointsResource) Upsert(endptKey string, raw interface{}) error {
svc := t.Service

endpointSlice, ok := raw.(*discoveryv1.EndpointSlice)
if !ok {
svc.Log.Warn("upsert got invalid type", "raw", raw)
svc.Log.Error("upsert got invalid type", "raw", raw)
return nil
}

svc.serviceLock.Lock()
defer svc.serviceLock.Unlock()

// Extract service name and format key
svcName := endpointSlice.Labels[discoveryv1.LabelServiceName]
svcKey := endpointSlice.Namespace + "/" + endpointSlice.Labels[discoveryv1.LabelServiceName]

if svc.serviceMap == nil {
svc.serviceMap = make(map[string]*corev1.Service)
}
var err error
if svc.serviceMap[svcKey] == nil {
svc.serviceMap[svcKey], err = t.Service.Client.CoreV1().Services(endpointSlice.Namespace).Get(t.Ctx, svcName, metav1.GetOptions{})
if err != nil {
t.Log.Error("issue getting service", "error", err)
return err
}
}

// Check if we care about endpoints for this service
if !svc.shouldTrackEndpoints(key) {
if !svc.shouldTrackEndpoints(svcKey) {
return nil
}

// We are tracking this service so let's keep track of the endpoints
if svc.endpointSlicesMap == nil {
svc.endpointSlicesMap = make(map[string][]discoveryv1.EndpointSlice)
svc.endpointSlicesMap = make(map[string]map[string]discoveryv1.EndpointSlice)
}
svc.endpointSlicesMap[key] = []discoveryv1.EndpointSlice{*endpointSlice}
if _, ok := svc.endpointSlicesMap[svcKey]; !ok {
svc.endpointSlicesMap[svcKey] = make(map[string]discoveryv1.EndpointSlice)
}
svc.endpointSlicesMap[svcKey][endptKey] = *endpointSlice

// Update the registration and trigger a sync
svc.generateRegistrations(key)
svc.generateRegistrations(svcKey)
svc.sync()
svc.Log.Info("upsert endpoint", "key", key)
svc.Log.Info("upsert endpoint", "key", endptKey)
return nil
}

func (t *serviceEndpointsResource) Delete(key string, _ interface{}) error {
func (t *serviceEndpointsResource) Delete(endptKey string, raw interface{}) error {

endpointSlice, ok := raw.(*discoveryv1.EndpointSlice)
if !ok {
t.Service.Log.Error("upsert got invalid type", "raw", raw)
return nil
}

t.Service.serviceLock.Lock()
defer t.Service.serviceLock.Unlock()

// Extract service name and format key
svcName := endpointSlice.Labels[discoveryv1.LabelServiceName]
svcKey := endpointSlice.Namespace + "/" + svcName

// This is a bit of an optimization. We only want to force a resync
// if we were tracking this endpoint to begin with and that endpoint
// had associated registrations.
if _, ok := t.Service.endpointSlicesMap[key]; ok {
delete(t.Service.endpointSlicesMap, key)
if _, ok := t.Service.consulMap[key]; ok {
delete(t.Service.consulMap, key)
t.Service.sync()
if _, ok := t.Service.endpointSlicesMap[svcKey]; ok {
if _, ok := t.Service.endpointSlicesMap[svcKey][endptKey]; ok {
delete(t.Service.endpointSlicesMap[svcKey], endptKey)
if _, ok := t.Service.consulMap[svcKey]; ok {
delete(t.Service.consulMap, svcKey)
t.Service.sync()
}
}
}

t.Service.Log.Info("delete endpoint", "key", key)
t.Service.Log.Info("delete endpoint", "key", endptKey)
return nil
}

Expand Down
12 changes: 8 additions & 4 deletions control-plane/catalog/to-consul/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/utils/pointer"
Expand Down Expand Up @@ -46,6 +47,9 @@ func TestServiceResource_createDelete(t *testing.T) {
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
require.NoError(t, err)

createNodes(t, client)
createEndpointSlice(t, client, svc.Name, metav1.NamespaceDefault)

// Delete
require.NoError(t, client.CoreV1().Services(metav1.NamespaceDefault).Delete(context.Background(), "foo", metav1.DeleteOptions{}))

Expand Down Expand Up @@ -977,12 +981,12 @@ func TestServiceResource_nodePortAnnotatedPort(t *testing.T) {

createNodes(t, client)

createEndpointSlice(t, client, "foo", metav1.NamespaceDefault)

// Insert the service
svc := nodePortService("foo", metav1.NamespaceDefault)
svc.Annotations = map[string]string{annotationServicePort: "rpc"}
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
createEndpointSlice(t, client, svc.Name, metav1.NamespaceDefault)

require.NoError(t, err)

// Verify what we got
Expand Down Expand Up @@ -2136,8 +2140,8 @@ func createEndpointSlice(t *testing.T, client *fake.Clientset, serviceName strin
context.Background(),
&discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
GenerateName: serviceName + "-",
Labels: map[string]string{discoveryv1.LabelServiceName: serviceName},
Labels: map[string]string{discoveryv1.LabelServiceName: serviceName},
Name: serviceName + "-" + rand.String(5),
},
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{
Expand Down

0 comments on commit 09506ba

Please sign in to comment.