Skip to content

Commit

Permalink
service: Update for L7 LB while locked
Browse files Browse the repository at this point in the history
Keep service manager locked while reupserting services after L7 LB
redirection updates. This removes the race where a service would be
re-upserted while having been (concurrently) removed.

Fixes: #18894

Reported-by: Jussi Maki <joamaki@isovalent.com>
Signed-off-by: Jarno Rajahalme <jarno@isovalent.com>
  • Loading branch information
jrajahalme committed Apr 3, 2024
1 parent fe46958 commit 5c01103
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 71 deletions.
3 changes: 3 additions & 0 deletions pkg/logging/logfields/logfields.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,9 @@ const (
// BackendSlot is the backend slot number in a service BPF map
BackendSlot = "backendSlot"

// ProxyName is the name of a proxy (e.g., "Envoy")
ProxyName = "proxyName"

// L7LBProxyPort is the port number of the Envoy listener a L7 LB service redirects traffic to for load balancing.
L7LBProxyPort = "l7LBProxyPort"

Expand Down
120 changes: 49 additions & 71 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,13 +277,6 @@ func (s *Service) RegisterL7LBServiceRedirect(serviceName lb.ServiceName, resour
return errors.New("proxy port for L7 LB redirection must be nonzero")
}

s.Lock()
err := s.registerL7LBServiceRedirect(serviceName, resourceName, proxyPort)
s.Unlock()
if err != nil {
return err
}

if logging.CanLogAt(log.Logger, logrus.DebugLevel) {
log.WithFields(logrus.Fields{
logfields.ServiceName: serviceName.Name,
Expand All @@ -292,16 +285,15 @@ func (s *Service) RegisterL7LBServiceRedirect(serviceName lb.ServiceName, resour
}).Debug("Registering service for L7 proxy port redirection")
}

svcs := s.GetDeepCopyServicesByName(serviceName.Name, serviceName.Namespace)
for _, svc := range svcs {
// Upsert the existing service again after updating 'l7lbSvcs'
// map so that the service will get the l7 flag set in bpf
// datapath.
if _, _, err := s.UpsertService(svc); err != nil {
return fmt.Errorf("error while updating service in LB map: %w", err)
}
s.Lock()
defer s.Unlock()

err := s.registerL7LBServiceRedirect(serviceName, resourceName, proxyPort)
if err != nil {
return err
}
return nil

return s.reUpsertServicesByName(serviceName.Name, serviceName.Namespace)
}

// 's' must be locked
Expand Down Expand Up @@ -331,28 +323,19 @@ func (s *Service) RegisterL7LBServiceBackendSync(serviceName lb.ServiceName, bac
return nil
}

s.Lock()
s.registerL7LBServiceBackendSync(serviceName, backendSyncRegistration)
s.Unlock()

if logging.CanLogAt(log.Logger, logrus.DebugLevel) {
log.WithFields(logrus.Fields{
logfields.ServiceName: serviceName.Name,
logfields.ServiceNamespace: serviceName.Namespace,
"proxyName": backendSyncRegistration.ProxyName(),
logfields.ProxyName: backendSyncRegistration.ProxyName(),
}).Debug("Registering service backend sync for L7 loadbalancer")
}

svcs := s.GetDeepCopyServicesByName(serviceName.Name, serviceName.Namespace)
for _, svc := range svcs {
// Upsert the existing service again after updating 'l7lbSvcs'
// map so that the registered BackendSync are informed about the current
// Service Backends (e.g. Envoy)
if _, _, err := s.UpsertService(svc); err != nil {
return fmt.Errorf("error while updating service: %w", err)
}
}
return nil
s.Lock()
defer s.Unlock()
s.registerL7LBServiceBackendSync(serviceName, backendSyncRegistration)

return s.reUpsertServicesByName(serviceName.Name, serviceName.Namespace)
}

// 's' must be locked
Expand All @@ -371,26 +354,23 @@ func (s *Service) registerL7LBServiceBackendSync(serviceName lb.ServiceName, bac
}

func (s *Service) DeregisterL7LBServiceRedirect(serviceName lb.ServiceName, resourceName L7LBResourceName) error {
if logging.CanLogAt(log.Logger, logrus.DebugLevel) {
log.WithFields(logrus.Fields{
logfields.ServiceName: serviceName.Name,
logfields.ServiceNamespace: serviceName.Namespace,
}).Debug("Deregistering service from L7 load balancing")
}

s.Lock()
defer s.Unlock()

changed := s.deregisterL7LBServiceRedirect(serviceName, resourceName)
s.Unlock()

if !changed {
return nil
}

log.WithFields(logrus.Fields{
logfields.ServiceName: serviceName.Name,
logfields.ServiceNamespace: serviceName.Namespace,
}).Debug("Deregister service from L7 load balancing")

svcs := s.GetDeepCopyServicesByName(serviceName.Name, serviceName.Namespace)
for _, svc := range svcs {
if _, _, err := s.UpsertService(svc); err != nil {
return fmt.Errorf("Error while removing service from LB map: %w", err)
}
}
return nil
return s.reUpsertServicesByName(serviceName.Name, serviceName.Namespace)
}

func (s *Service) deregisterL7LBServiceRedirect(serviceName lb.ServiceName, resourceName L7LBResourceName) bool {
Expand Down Expand Up @@ -422,27 +402,23 @@ func (s *Service) DeregisterL7LBServiceBackendSync(serviceName lb.ServiceName, b
return nil
}

if logging.CanLogAt(log.Logger, logrus.DebugLevel) {
log.WithFields(logrus.Fields{
logfields.ServiceName: serviceName.Name,
logfields.ServiceNamespace: serviceName.Namespace,
logfields.ProxyName: backendSyncRegistration.ProxyName(),
}).Debug("Deregistering service backend sync for L7 loadbalancer")
}

s.Lock()
defer s.Unlock()
changed := s.deregisterL7LBServiceBackendSync(serviceName, backendSyncRegistration)
s.Unlock()

if !changed {
return nil
}

log.WithFields(logrus.Fields{
logfields.ServiceName: serviceName.Name,
logfields.ServiceNamespace: serviceName.Namespace,
"proxyName": backendSyncRegistration.ProxyName(),
}).Debug("Deregister service backend sync for L7 loadbalancer")

svcs := s.GetDeepCopyServicesByName(serviceName.Name, serviceName.Namespace)
for _, svc := range svcs {
if _, _, err := s.UpsertService(svc); err != nil {
return fmt.Errorf("Error while removing service from LB map: %w", err)
}
}
return nil
return s.reUpsertServicesByName(serviceName.Name, serviceName.Namespace)
}

func (s *Service) deregisterL7LBServiceBackendSync(serviceName lb.ServiceName, backendSyncRegistration BackendSyncer) bool {
Expand Down Expand Up @@ -649,6 +625,21 @@ func (s *Service) UpsertService(params *lb.SVC) (bool, lb.ID, error) {
return s.upsertService(params)
}

// reUpsertServicesByName upserts a service again to update it's internal state after
// changes for L7 service redirection.
// Write lock on 's' must be held.
func (s *Service) reUpsertServicesByName(name, namespace string) error {
for _, svc := range s.svcByHash {
if svc.svcName.Name == name && svc.svcName.Namespace == namespace {
svcCopy := svc.deepCopyToLBSVC()
if _, _, err := s.upsertService(svcCopy); err != nil {
return fmt.Errorf("error while updating service in LB map: %w", err)
}
}
}
return nil
}

func (s *Service) upsertService(params *lb.SVC) (bool, lb.ID, error) {
empty := L7LBResourceName{}

Expand Down Expand Up @@ -1120,19 +1111,6 @@ func (s *Service) GetDeepCopyServices() []*lb.SVC {
return svcs
}

// GetDeepCopyServicesByName returns a deep-copy all matching services.
func (s *Service) GetDeepCopyServicesByName(name, namespace string) (svcs []*lb.SVC) {
s.RLock()
defer s.RUnlock()

for _, svc := range s.svcByHash {
if svc.svcName.Name == name && svc.svcName.Namespace == namespace {
svcs = append(svcs, svc.deepCopyToLBSVC())
}
}
return svcs
}

// GetDeepCopyServiceByFrontend returns a deep-copy of the service that matches the Frontend address.
func (s *Service) GetDeepCopyServiceByFrontend(frontend lb.L3n4Addr) (*lb.SVC, bool) {
s.RLock()
Expand Down

0 comments on commit 5c01103

Please sign in to comment.