Skip to content

Commit

Permalink
service: Update for L7 LB while locked
Browse files Browse the repository at this point in the history
[ upstream commit d913b62 ]

Keep service manager locked while reupserting services after L7 LB
redirection updates. This removes the race where a service would be
re-upserted while having been (concurrently) removed.

Fixes: #18894

Reported-by: Jussi Maki <joamaki@isovalent.com>
Signed-off-by: Jarno Rajahalme <jarno@isovalent.com>
  • Loading branch information
jrajahalme authored and lmb committed Apr 11, 2024
1 parent 9246f38 commit 33c7352
Showing 1 changed file with 37 additions and 45 deletions.
82 changes: 37 additions & 45 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cilium/cilium/pkg/k8s"
lb "github.com/cilium/cilium/pkg/loadbalancer"
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/logging"
"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/cilium/cilium/pkg/maps/lbmap"
"github.com/cilium/cilium/pkg/metrics"
Expand Down Expand Up @@ -277,31 +278,24 @@ func NewService(monitorNotify monitorNotify, envoyCache envoyCache, lbmap datapa
// RegisterL7LBService makes the given service to be locally forwarded to the
// given proxy port.
func (s *Service) RegisterL7LBService(serviceName, resourceName lb.ServiceName, ports []string, proxyPort uint16) error {
if logging.CanLogAt(log.Logger, logrus.DebugLevel) {
log.WithFields(logrus.Fields{
logfields.ServiceName: serviceName.Name,
logfields.ServiceNamespace: serviceName.Namespace,
logfields.L7LBFrontendPorts: ports,
logfields.L7LBProxyPort: proxyPort,
}).Debug("Registering service for L7 load balancing")
}

s.Lock()
defer s.Unlock()

err := s.registerL7LBService(serviceName, resourceName, ports, proxyPort)
s.Unlock()
if err != nil {
return err
}

log.WithFields(logrus.Fields{
logfields.ServiceName: serviceName.Name,
logfields.ServiceNamespace: serviceName.Namespace,
logfields.L7LBFrontendPorts: ports,
logfields.L7LBProxyPort: proxyPort,
}).Debug("Registering service for L7 load balancing")

svcs := s.GetDeepCopyServicesByName(serviceName.Name, serviceName.Namespace)
for _, svc := range svcs {
// Upsert the existing service again after updating 'l7lbSvcs'
// map so that the service will get the l7 flag set in bpf
// datapath and Envoy endpoint resources are created for
// registered services.
if _, _, err := s.UpsertService(svc); err != nil {
return fmt.Errorf("error while updating service in LB map: %s", err)
}
}
return nil
return s.reUpsertServicesByName(serviceName.Name, serviceName.Namespace)
}

// 's' must be locked
Expand Down Expand Up @@ -338,26 +332,22 @@ func (s *Service) RegisterL7LBServiceBackendSync(serviceName, resourceName lb.Se
}

func (s *Service) RemoveL7LBService(serviceName, resourceName lb.ServiceName) error {
if logging.CanLogAt(log.Logger, logrus.DebugLevel) {
log.WithFields(logrus.Fields{
logfields.ServiceName: serviceName.Name,
logfields.ServiceNamespace: serviceName.Namespace,
}).Debug("Removing service from L7 load balancing")
}

s.Lock()
defer s.Unlock()
changed := s.removeL7LBService(serviceName, resourceName)
s.Unlock()

if !changed {
return nil
}

log.WithFields(logrus.Fields{
logfields.ServiceName: serviceName.Name,
logfields.ServiceNamespace: serviceName.Namespace,
}).Debug("Removing service from L7 load balancing")

svcs := s.GetDeepCopyServicesByName(serviceName.Name, serviceName.Namespace)
for _, svc := range svcs {
if _, _, err := s.UpsertService(svc); err != nil {
return fmt.Errorf("Error while removing service from LB map: %s", err)
}
}
return nil
return s.reUpsertServicesByName(serviceName.Name, serviceName.Namespace)
}

func (s *Service) removeL7LBService(serviceName, resourceName lb.ServiceName) bool {
Expand Down Expand Up @@ -555,6 +545,21 @@ func (s *Service) UpsertService(params *lb.SVC) (bool, lb.ID, error) {
return s.upsertService(params)
}

// reUpsertServicesByName upserts a service again to update it's internal state after
// changes for L7 service redirection.
// Write lock on 's' must be held.
func (s *Service) reUpsertServicesByName(name, namespace string) error {
for _, svc := range s.svcByHash {
if svc.svcName.Name == name && svc.svcName.Namespace == namespace {
svcCopy := svc.deepCopyToLBSVC()
if _, _, err := s.upsertService(svcCopy); err != nil {
return fmt.Errorf("error while updating service in LB map: %w", err)
}
}
}
return nil
}

func (s *Service) upsertService(params *lb.SVC) (bool, lb.ID, error) {
empty := lb.ServiceName{}

Expand Down Expand Up @@ -937,19 +942,6 @@ func (s *Service) GetDeepCopyServices() []*lb.SVC {
return svcs
}

// GetDeepCopyServicesByName returns a deep-copy all matching services.
func (s *Service) GetDeepCopyServicesByName(name, namespace string) (svcs []*lb.SVC) {
s.RLock()
defer s.RUnlock()

for _, svc := range s.svcByHash {
if svc.svcName.Name == name && svc.svcName.Namespace == namespace {
svcs = append(svcs, svc.deepCopyToLBSVC())
}
}
return svcs
}

// RestoreServices restores services from BPF maps.
//
// It first restores all the service entries, followed by backend entries.
Expand Down

0 comments on commit 33c7352

Please sign in to comment.