Skip to content

Commit

Permalink
[net-8411] bug: fix premature token and service instance deletion due…
Browse files Browse the repository at this point in the history
… to pod fetch errors (#3758)
  • Loading branch information
ndhanushkodi committed Mar 25, 2024
1 parent 8d86b59 commit cc5850e
Show file tree
Hide file tree
Showing 3 changed files with 280 additions and 26 deletions.
4 changes: 4 additions & 0 deletions .changelog/3758.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
```release-note:bug
control-plane: fix an issue where ACL tokens would prematurely be deleted and services would be deregistered if there
was a K8s API error fetching the pod.
```
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,9 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
return ctrl.Result{RequeueAfter: requeueAfter}, err
}

// endpointAddressMap stores every IP that corresponds to a Pod in the Endpoints object. It is used to compare
// deregisterEndpointAddress stores every IP that corresponds to a Pod in the Endpoints object. It is used to compare
// against service instances in Consul to deregister them if they are not in the map.
endpointAddressMap := map[string]bool{}
deregisterEndpointAddress := map[string]bool{}

// Register all addresses of this Endpoints object as service instances in Consul.
for _, subset := range serviceEndpoints.Subsets {
Expand All @@ -193,16 +193,25 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
var pod corev1.Pod
objectKey := types.NamespacedName{Name: address.TargetRef.Name, Namespace: address.TargetRef.Namespace}
if err = r.Client.Get(ctx, objectKey, &pod); err != nil {
r.Log.Error(err, "failed to get pod", "name", address.TargetRef.Name)
errs = multierror.Append(errs, err)
// If the pod doesn't exist anymore, set up the deregisterEndpointAddress map to deregister it.
if k8serrors.IsNotFound(err) {
deregisterEndpointAddress[address.IP] = true
r.Log.Info("pod not found", "name", address.TargetRef.Name)
} else {
// If there was a different error fetching the pod, then log the error but don't deregister it
// since this could be a K8s API blip and we don't want to prematurely deregister.
deregisterEndpointAddress[address.IP] = false
r.Log.Error(err, "failed to get pod", "name", address.TargetRef.Name)
errs = multierror.Append(errs, err)
}
continue
}

svcName, ok := pod.Annotations[constants.AnnotationKubernetesService]
if ok && serviceEndpoints.Name != svcName {
r.Log.Info("ignoring endpoint because it doesn't match explicit service annotation", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
// deregistration for service instances that don't match the annotation happens
// later because we don't add this pod to the endpointAddressMap.
// Set up the deregisterEndpointAddress to deregister service instances that don't match the annotation.
deregisterEndpointAddress[address.IP] = true
continue
}

Expand All @@ -219,8 +228,8 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
r.Log.Error(err, "failed to register services or health check", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
errs = multierror.Append(errs, err)
}
// Build the endpointAddressMap up for deregistering service instances later.
endpointAddressMap[pod.Status.PodIP] = true
// Build the deregisterEndpointAddress map up for deregistering service instances later.
deregisterEndpointAddress[pod.Status.PodIP] = false
} else {
r.Log.Info("detected an update to pre-consul-dataplane service", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
nodeAgentClientCfg, err := r.consulClientCfgForNodeAgent(apiClient, pod, serverState)
Expand All @@ -247,17 +256,17 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
r.Log.Error(err, "failed to register gateway or health check", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
errs = multierror.Append(errs, err)
}
// Build the endpointAddressMap up for deregistering service instances later.
endpointAddressMap[pod.Status.PodIP] = true
// Build the deregisterEndpointAddress map up for deregistering service instances later.
deregisterEndpointAddress[pod.Status.PodIP] = false
}
}
}
}

// Compare service instances in Consul with addresses in Endpoints. If an address is not in Endpoints, deregister
// from Consul. This uses endpointAddressMap which is populated with the addresses in the Endpoints object during
// the registration codepath.
requeueAfter, err := r.deregisterService(ctx, apiClient, serviceEndpoints.Name, serviceEndpoints.Namespace, endpointAddressMap)
// from Consul. This uses deregisterEndpointAddress which is populated with the addresses in the Endpoints object to
// either deregister or keep during the registration codepath.
requeueAfter, err := r.deregisterService(ctx, apiClient, serviceEndpoints.Name, serviceEndpoints.Namespace, deregisterEndpointAddress)
if err != nil {
r.Log.Error(err, "failed to deregister endpoints", "name", serviceEndpoints.Name, "ns", serviceEndpoints.Namespace)
errs = multierror.Append(errs, err)
Expand Down Expand Up @@ -929,8 +938,8 @@ func getHealthCheckStatusReason(healthCheckStatus, podName, podNamespace string)
// "k8s-service-name". So, we query Consul services by "k8s-service-name" metadata.
// When querying by the k8s service name and namespace, the request will return service instances and
// associated proxy service instances.
// The argument endpointsAddressesMap decides whether to deregister *all* service instances or selectively deregister
// them only if they are not in endpointsAddressesMap. If the map is nil, it will deregister all instances. If the map
// The argument deregisterEndpointAddress decides whether to deregister *all* service instances or selectively deregister
// them only if they are not in deregisterEndpointAddress. If the map is nil, it will deregister all instances. If the map
// has addresses, it will only deregister instances not in the map.
// If the pod backing a Consul service instance still exists and the graceful shutdown lifecycle mode is enabled, the instance
// will not be deregistered. Instead, its health check will be updated to Critical in order to drain incoming traffic and
Expand All @@ -941,7 +950,7 @@ func (r *Controller) deregisterService(
apiClient *api.Client,
k8sSvcName string,
k8sSvcNamespace string,
endpointsAddressesMap map[string]bool) (time.Duration, error) {
deregisterEndpointAddress map[string]bool) (time.Duration, error) {

// Get services matching metadata from Consul
serviceInstances, err := r.serviceInstances(apiClient, k8sSvcName, k8sSvcNamespace)
Expand All @@ -958,7 +967,7 @@ func (r *Controller) deregisterService(
// every service instance.
var serviceDeregistered bool

if addressIsMissingFromEndpointsMap(svc.ServiceAddress, endpointsAddressesMap) {
if deregister(svc.ServiceAddress, deregisterEndpointAddress) {
// If graceful shutdown is enabled, continue to the next service instance and
// mark that an event requeue is needed. We should requeue at the longest time interval
// to prevent excessive re-queues. Also, updating the health status in Consul to Critical
Expand Down Expand Up @@ -1621,10 +1630,15 @@ func getMultiPortIdx(pod corev1.Pod, serviceEndpoints corev1.Endpoints) int {
return -1
}

func addressIsMissingFromEndpointsMap(address string, endpointsAddressesMap map[string]bool) bool {
if endpointsAddressesMap == nil {
// deregister returns that the address is marked for deregistration if the map is nil or if the address is explicitly
// marked in the map for deregistration.
func deregister(address string, deregisterEndpointAddress map[string]bool) bool {
if deregisterEndpointAddress == nil {
return true
}
_, ok := endpointsAddressesMap[address]
return !ok
deregister, ok := deregisterEndpointAddress[address]
if ok {
return deregister
}
return true
}

0 comments on commit cc5850e

Please sign in to comment.