Skip to content

Commit

Permalink
Cleanup and changelog entry
Browse files Browse the repository at this point in the history
  • Loading branch information
jukie committed Feb 28, 2024
1 parent 0b34e99 commit 942f224
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 29 deletions.
3 changes: 3 additions & 0 deletions .changelog/3693.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
catalog: Endpoint Zone Topology is now added to registered consul services under Meta.
```
76 changes: 47 additions & 29 deletions control-plane/catalog/to-consul/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,9 @@ type ServiceResource struct {
// in the form <kube namespace>/<kube svc name>.
serviceMap map[string]*corev1.Service

// endpointSliceListMap uses the same keys as serviceMap but maps to the EndpointSliceList
// endpointSlicesMap uses the same keys as serviceMap but maps to a list of EndpointSlices
// of each service.
endpointSliceListMap map[string][]discoveryv1.EndpointSlice
endpointSlicesMap map[string][]discoveryv1.EndpointSlice

// EnableIngress enables syncing of the hostname from an Ingress resource
// to the service registration if an Ingress rule matches the service.
Expand Down Expand Up @@ -227,26 +227,45 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error {
t.serviceMap[key] = service
t.Log.Debug("[ServiceResource.Upsert] adding service to serviceMap", "key", key, "service", service)

// If we care about endpoints, we should do the initial endpoints load.
// If we care about endpoints, we should load the associated endpoint slices.
if t.shouldTrackEndpoints(key) {

// account for continue
endpointSliceList, err := t.Client.DiscoveryV1().
EndpointSlices(service.Namespace).
List(t.Ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, service.Name)})
var allEndpointSlices []discoveryv1.EndpointSlice
labelSelector := fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, service.Name)
continueToken := ""
limit := int64(100)

if err != nil {
t.Log.Warn("error loading initial endpoint slices list",
"key", key,
"err", err)
return err // Ensure to handle or return the error appropriately
} else {
if t.endpointSliceListMap == nil {
t.endpointSliceListMap = make(map[string][]discoveryv1.EndpointSlice)
for {
opts := metav1.ListOptions{
LabelSelector: labelSelector,
Limit: limit,
Continue: continueToken,
}
endpointSliceList, err := t.Client.DiscoveryV1().
EndpointSlices(service.Namespace).
List(t.Ctx, opts)

if err != nil {
t.Log.Warn("error loading endpoint slices list",
"key", key,
"err", err)
return err
}
t.endpointSliceListMap[key] = endpointSliceList.Items
t.Log.Debug("[ServiceResource.Upsert] adding service's endpoint slices to endpointSliceListMap", "key", key, "service", service, "endpointSliceList", endpointSliceList)

allEndpointSlices = append(allEndpointSlices, endpointSliceList.Items...)

if endpointSliceList.Continue != "" {
continueToken = endpointSliceList.Continue
} else {
break
}
}

if t.endpointSlicesMap == nil {
t.endpointSlicesMap = make(map[string][]discoveryv1.EndpointSlice)
}
t.endpointSlicesMap[key] = allEndpointSlices
t.Log.Debug("[ServiceResource.Upsert] adding service's endpoint slices to endpointSlicesMap", "key", key, "service", service, "endpointSlices", allEndpointSlices)
}

// Update the registration and trigger a sync
Expand All @@ -271,8 +290,8 @@ func (t *ServiceResource) Delete(key string, _ interface{}) error {
func (t *ServiceResource) doDelete(key string) {
delete(t.serviceMap, key)
t.Log.Debug("[doDelete] deleting service from serviceMap", "key", key)
delete(t.endpointSliceListMap, key)
t.Log.Debug("[doDelete] deleting endpoints from endpointSliceListMap", "key", key)
delete(t.endpointSlicesMap, key)
t.Log.Debug("[doDelete] deleting endpoints from endpointSlicesMap", "key", key)
// If there were registrations related to this service, then
// delete them and sync.
if _, ok := t.consulMap[key]; ok {
Expand Down Expand Up @@ -588,11 +607,11 @@ func (t *ServiceResource) generateRegistrations(key string) {
// pods are running on. This way we don't register _every_ K8S
// node as part of the service.
case corev1.ServiceTypeNodePort:
if t.endpointSliceListMap == nil {
if t.endpointSlicesMap == nil {
return
}

endpointSliceList := t.endpointSliceListMap[key]
endpointSliceList := t.endpointSlicesMap[key]
if endpointSliceList == nil {
return
}
Expand Down Expand Up @@ -684,11 +703,11 @@ func (t *ServiceResource) registerServiceInstance(
overridePortNumber int,
useHostname bool) {

if t.endpointSliceListMap == nil {
if t.endpointSlicesMap == nil {
return
}

endpointSliceList := t.endpointSliceListMap[key]
endpointSliceList := t.endpointSlicesMap[key]
if endpointSliceList == nil {
return
}
Expand Down Expand Up @@ -861,11 +880,10 @@ func (t *serviceEndpointsResource) Upsert(key string, raw interface{}) error {
}

// We are tracking this service so let's keep track of the endpoints
if svc.endpointSliceListMap == nil {
svc.endpointSliceListMap = make(map[string][]discoveryv1.EndpointSlice)
if svc.endpointSlicesMap == nil {
svc.endpointSlicesMap = make(map[string][]discoveryv1.EndpointSlice)
}
//svc.endpointSliceListMap[key] = &discoveryv1.EndpointSliceList{Items: []discoveryv1.EndpointSlice{*endpointSliceList}}
svc.endpointSliceListMap[key] = []discoveryv1.EndpointSlice{*endpointSlice}
svc.endpointSlicesMap[key] = []discoveryv1.EndpointSlice{*endpointSlice}

// Update the registration and trigger a sync
svc.generateRegistrations(key)
Expand All @@ -881,8 +899,8 @@ func (t *serviceEndpointsResource) Delete(key string, _ interface{}) error {
// This is a bit of an optimization. We only want to force a resync
// if we were tracking this endpoint to begin with and that endpoint
// had associated registrations.
if _, ok := t.Service.endpointSliceListMap[key]; ok {
delete(t.Service.endpointSliceListMap, key)
if _, ok := t.Service.endpointSlicesMap[key]; ok {
delete(t.Service.endpointSlicesMap, key)
if _, ok := t.Service.consulMap[key]; ok {
delete(t.Service.consulMap, key)
t.Service.sync()
Expand Down

0 comments on commit 942f224

Please sign in to comment.