Skip to content

Commit

Permalink
service: Update for L7 LB while locked
Browse files Browse the repository at this point in the history
[ upstream commit d913b62 ]

Keep service manager locked while reupserting services after L7 LB
redirection updates. This removes the race where a service would be
re-upserted while having been (concurrently) removed.

Fixes: #18894

Reported-by: Jussi Maki <joamaki@isovalent.com>
Signed-off-by: Jarno Rajahalme <jarno@isovalent.com>
  • Loading branch information
jrajahalme authored and lmb committed Apr 11, 2024
1 parent d2c0a86 commit c7a635d
Showing 1 changed file with 32 additions and 43 deletions.
75 changes: 32 additions & 43 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,13 +299,6 @@ func NewService(monitorAgent monitorAgent.Agent, envoyCache envoyCache, lbmap da
// RegisterL7LBService makes the given service to be locally forwarded to the
// given proxy port.
func (s *Service) RegisterL7LBService(serviceName, resourceName lb.ServiceName, ports []string, proxyPort uint16) error {
s.Lock()
err := s.registerL7LBService(serviceName, resourceName, ports, proxyPort)
s.Unlock()
if err != nil {
return err
}

if logging.CanLogAt(log.Logger, logrus.DebugLevel) {
log.WithFields(logrus.Fields{
logfields.ServiceName: serviceName.Name,
Expand All @@ -315,17 +308,15 @@ func (s *Service) RegisterL7LBService(serviceName, resourceName lb.ServiceName,
}).Debug("Registering service for L7 load balancing")
}

svcs := s.GetDeepCopyServicesByName(serviceName.Name, serviceName.Namespace)
for _, svc := range svcs {
// Upsert the existing service again after updating 'l7lbSvcs'
// map so that the service will get the l7 flag set in bpf
// datapath and Envoy endpoint resources are created for
// registered services.
if _, _, err := s.UpsertService(svc); err != nil {
return fmt.Errorf("error while updating service in LB map: %s", err)
}
s.Lock()
defer s.Unlock()

err := s.registerL7LBService(serviceName, resourceName, ports, proxyPort)
if err != nil {
return err
}
return nil

return s.reUpsertServicesByName(serviceName.Name, serviceName.Namespace)
}

// 's' must be locked
Expand Down Expand Up @@ -363,26 +354,22 @@ func (s *Service) RegisterL7LBServiceBackendSync(serviceName, resourceName lb.Se
}

func (s *Service) RemoveL7LBService(serviceName, resourceName lb.ServiceName) error {
if logging.CanLogAt(log.Logger, logrus.DebugLevel) {
log.WithFields(logrus.Fields{
logfields.ServiceName: serviceName.Name,
logfields.ServiceNamespace: serviceName.Namespace,
}).Debug("Removing service from L7 load balancing")
}

s.Lock()
defer s.Unlock()
changed := s.removeL7LBService(serviceName, resourceName)
s.Unlock()

if !changed {
return nil
}

log.WithFields(logrus.Fields{
logfields.ServiceName: serviceName.Name,
logfields.ServiceNamespace: serviceName.Namespace,
}).Debug("Removing service from L7 load balancing")

svcs := s.GetDeepCopyServicesByName(serviceName.Name, serviceName.Namespace)
for _, svc := range svcs {
if _, _, err := s.UpsertService(svc); err != nil {
return fmt.Errorf("Error while removing service from LB map: %s", err)
}
}
return nil
return s.reUpsertServicesByName(serviceName.Name, serviceName.Namespace)
}

func (s *Service) removeL7LBService(serviceName, resourceName lb.ServiceName) bool {
Expand Down Expand Up @@ -579,6 +566,21 @@ func (s *Service) UpsertService(params *lb.SVC) (bool, lb.ID, error) {
return s.upsertService(params)
}

// reUpsertServicesByName upserts a service again to update it's internal state after
// changes for L7 service redirection.
// Write lock on 's' must be held.
func (s *Service) reUpsertServicesByName(name, namespace string) error {
for _, svc := range s.svcByHash {
if svc.svcName.Name == name && svc.svcName.Namespace == namespace {
svcCopy := svc.deepCopyToLBSVC()
if _, _, err := s.upsertService(svcCopy); err != nil {
return fmt.Errorf("error while updating service in LB map: %w", err)
}
}
}
return nil
}

func (s *Service) upsertService(params *lb.SVC) (bool, lb.ID, error) {
empty := lb.ServiceName{}

Expand Down Expand Up @@ -1096,19 +1098,6 @@ func (s *Service) GetDeepCopyServices() []*lb.SVC {
return svcs
}

// GetDeepCopyServicesByName returns a deep-copy all matching services.
func (s *Service) GetDeepCopyServicesByName(name, namespace string) (svcs []*lb.SVC) {
s.RLock()
defer s.RUnlock()

for _, svc := range s.svcByHash {
if svc.svcName.Name == name && svc.svcName.Namespace == namespace {
svcs = append(svcs, svc.deepCopyToLBSVC())
}
}
return svcs
}

// GetDeepCopyServiceByFrontend returns a deep-copy of the service that matches the Frontend address.
func (s *Service) GetDeepCopyServiceByFrontend(frontend lb.L3n4Addr) (*lb.SVC, bool) {
s.RLock()
Expand Down

0 comments on commit c7a635d

Please sign in to comment.