services: refactor SyncWithK8sFinished to return stale services
Refactor the SyncWithK8sFinished function to return the list of services
with stale backends, which should be refreshed, rather than refreshing
them directly. This makes the separation of concerns clearer: it avoids
having to pass the refresh function as a parameter and prevents possible
deadlocks caused by incorrect mutex locking (given the interdependencies
between the service subsystem and the service cache).

Suggested-by: Jussi Maki <jussi@isovalent.com>
Signed-off-by: Marco Iorio <marco.iorio@isovalent.com>
giorio94 committed Oct 30, 2023
1 parent 17d1901 commit 5f69e0c
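
For reference, the signature change at the heart of this commit can be summarized as follows. This is a sketch transcribed from the diff below, expressed as hypothetical interfaces (`syncFinisherOld` and `syncFinisherNew` are illustrative names, not part of the code base):

```go
package sketch

import (
	"github.com/cilium/cilium/pkg/k8s"
	"github.com/cilium/cilium/pkg/lock"
	"k8s.io/apimachinery/pkg/util/sets"
)

// Old contract: the refresh callback is injected, and SyncWithK8sFinished
// invokes it internally before returning.
type syncFinisherOld interface {
	SyncWithK8sFinished(ensurer func(k8s.ServiceID, *lock.StoppableWaitGroup) bool,
		localOnly bool, localServices sets.Set[k8s.ServiceID]) error
}

// New contract: the function only reports which services still reference
// stale backends; the caller is responsible for refreshing them, so no
// callback needs to cross the service/service-cache boundary.
type syncFinisherNew interface {
	SyncWithK8sFinished(localOnly bool, localServices sets.Set[k8s.ServiceID]) (stale []k8s.ServiceID, err error)
}
```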
Showing 3 changed files with 35 additions and 43 deletions.
15 changes: 14 additions & 1 deletion daemon/cmd/state.go
@@ -23,6 +23,7 @@ import (
slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
"github.com/cilium/cilium/pkg/k8s/watchers/resources"
"github.com/cilium/cilium/pkg/labels"
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/cilium/cilium/pkg/maps/ctmap"
"github.com/cilium/cilium/pkg/maps/lxcmap"
@@ -472,7 +473,19 @@ func (d *Daemon) initRestore(restoredEndpoints *endpointRestoreState, endpointsR
localServices = d.k8sWatcher.K8sSvcCache.LocalServices()
}

return d.svc.SyncWithK8sFinished(d.k8sWatcher.K8sSvcCache.EnsureService, localOnly, localServices)
stale, err := d.svc.SyncWithK8sFinished(localOnly, localServices)

// Always process the list of stale services, regardless
// of whether an error was returned.
swg := lock.NewStoppableWaitGroup()
for _, svc := range stale {
d.k8sWatcher.K8sSvcCache.EnsureService(svc, swg)
}

swg.Stop()
swg.Wait()

return err
},
Context: d.ctx,
},
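
The hunk above wires the refresh into the daemon. A minimal sketch of the same pattern factored into a helper (the helper name `refreshStaleServices` is illustrative, not part of this commit), showing that stale services are refreshed through a StoppableWaitGroup even when the sync itself returned an error:

```go
package sketch

import (
	"github.com/cilium/cilium/pkg/k8s"
	"github.com/cilium/cilium/pkg/lock"
)

// refreshStaleServices re-queues an update for every service whose backends
// are still stale, then waits for the generated events to be accounted for.
func refreshStaleServices(
	ensure func(k8s.ServiceID, *lock.StoppableWaitGroup) bool, // e.g. K8sSvcCache.EnsureService
	stale []k8s.ServiceID,
) {
	swg := lock.NewStoppableWaitGroup()
	for _, id := range stale {
		ensure(id, swg)
	}
	// Stop accepting new work and wait for the already queued refreshes.
	swg.Stop()
	swg.Wait()
}
```

In the daemon this runs unconditionally after SyncWithK8sFinished, and the sync error is only returned afterwards.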
37 changes: 10 additions & 27 deletions pkg/service/service.go
@@ -1219,31 +1219,14 @@ func (s *Service) restoreAndDeleteOrphanSourceRanges() error {
// The removal is based on an assumption that during the sync period
// UpsertService() is going to be called for each alive service.
//
// Additionally, it returns a list of services which are associated with
// stale backends, and which shall be refreshed. Stale services shall be
// refreshed regardless of whether an error is also returned or not.
//
// The localOnly flag allows to perform a two pass removal, handling local
// services first, and processing global ones only after full synchronization
// with all remote clusters.
func (s *Service) SyncWithK8sFinished(ensurer func(k8s.ServiceID, *lock.StoppableWaitGroup) bool, localOnly bool,
localServices sets.Set[k8s.ServiceID]) error {
servicesWithStaleBackends := sets.New[lb.ServiceName]()

// We need to trigger the stale services refresh while not holding the
// lock, to ensure that the generated events can be processed, and to
// prevent a possible deadlock in case the events channel is already full.
defer func() {
swg := lock.NewStoppableWaitGroup()

for svc := range servicesWithStaleBackends {
ensurer(k8s.ServiceID{
Cluster: svc.Cluster,
Namespace: svc.Namespace,
Name: svc.Name,
}, swg)
}

swg.Stop()
swg.Wait()
}()

func (s *Service) SyncWithK8sFinished(localOnly bool, localServices sets.Set[k8s.ServiceID]) (stale []k8s.ServiceID, err error) {
s.Lock()
defer s.Unlock()

@@ -1266,11 +1249,11 @@ func (s *Service) SyncWithK8sFinished(ensurer func(k8s.ServiceID, *lock.Stoppabl
Warn("Deleting no longer present service")

if err := s.deleteServiceLocked(svc); err != nil {
return fmt.Errorf("Unable to remove service %+v: %s", svc, err)
return stale, fmt.Errorf("Unable to remove service %+v: %s", svc, err)
}
} else if svc.restoredBackendHashes.Len() > 0 {
// The service is still associated with stale backends
servicesWithStaleBackends.Insert(svc.svcName)
stale = append(stale, svcID)
log.WithFields(logrus.Fields{
logfields.ServiceID: svc.frontend.ID,
logfields.ServiceName: svc.svcName.String(),
@@ -1285,13 +1268,13 @@
if localOnly {
// Wait for full clustermesh synchronization before finalizing the
// removal of orphan backends and affinity matches.
return nil
return stale, nil
}

// Remove no longer existing affinity matches
if option.Config.EnableSessionAffinity {
if err := s.deleteOrphanAffinityMatchesLocked(); err != nil {
return err
return stale, err
}
}

@@ -1301,7 +1284,7 @@

}

return nil
return stale, nil
}

func (s *Service) createSVCInfoIfNotExist(p *lb.SVC) (*svcInfo, bool, bool,
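
Putting the pieces together, the two-pass removal described in the updated doc comment would be driven roughly as follows. This is a sketch, not code from this commit; `refreshStale` and `waitForClusterMesh` stand in for the daemon's actual plumbing:

```go
package sketch

import (
	"github.com/cilium/cilium/pkg/k8s"
	"github.com/cilium/cilium/pkg/service"
	"k8s.io/apimachinery/pkg/util/sets"
)

// twoPassSync garbage collects local services first and global ones only
// after full synchronization with all remote clusters, refreshing the
// returned stale services after each pass (even on error).
func twoPassSync(
	svc *service.Service,
	localServices sets.Set[k8s.ServiceID],
	refreshStale func([]k8s.ServiceID),
	waitForClusterMesh func(),
) error {
	// Pass 1: only services owned by the local cluster are considered.
	stale, err := svc.SyncWithK8sFinished(true, localServices)
	refreshStale(stale)
	if err != nil {
		return err
	}

	// Pass 2: once remote clusters are synchronized, process global services
	// and finalize the removal of orphan backends and affinity matches.
	waitForClusterMesh()
	stale, err = svc.SyncWithK8sFinished(false, nil)
	refreshStale(stale)
	return err
}
```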
26 changes: 11 additions & 15 deletions pkg/service/service_test.go
@@ -25,7 +25,6 @@ import (
datapathTypes "github.com/cilium/cilium/pkg/datapath/types"
"github.com/cilium/cilium/pkg/k8s"
lb "github.com/cilium/cilium/pkg/loadbalancer"
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/maps/lbmap"
nodeTypes "github.com/cilium/cilium/pkg/node/types"
"github.com/cilium/cilium/pkg/option"
@@ -627,7 +626,8 @@ func (m *ManagerTestSuite) TestSyncWithK8sFinished(c *C) {

// cilium-agent finished the initialization, and thus SyncWithK8sFinished
// is called
err = m.svc.SyncWithK8sFinished(func(k8s.ServiceID, *lock.StoppableWaitGroup) bool { return true }, false, nil)
stale, err := m.svc.SyncWithK8sFinished(false, nil)
c.Assert(stale, IsNil)
c.Assert(err, IsNil)

// svc1 should be removed from cilium while svc2 is synced
@@ -745,33 +745,29 @@ func TestRestoreServiceWithStaleBackends(t *testing.T) {
require.ElementsMatch(t, backendAddrs, toBackendAddrs(lbmap.ServiceByID[uint16(id1)].Backends), "lbmap incorrectly modified")
require.ElementsMatch(t, backendAddrs, toBackendAddrs(maps.Values(lbmap.BackendByID)), "lbmap incorrectly modified")

// Trigger a new upsertion: this mimics what would eventually happen when calling ServiceCache.EnsureService()
ensurer := func(id k8s.ServiceID, swg *lock.StoppableWaitGroup) bool {
defer swg.Done()
if id.Namespace == "foo" && id.Name == "bar" {
_, _, err := svc.upsertService(service("foo", "bar", "172.16.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.5"))
require.NoError(t, err, "Failed to upsert service")
return true
}
require.Fail(t, "Unexpected service ID", "Service ID: %v", id)
return false
}

svcID := k8s.ServiceID{Namespace: "foo", Name: "bar"}
localServices := sets.New[k8s.ServiceID]()
if tt.isLocal {
localServices.Insert(svcID)
}

require.NoError(t, svc.SyncWithK8sFinished(ensurer, tt.localOnly, localServices), "Failed to trigger garbage collection")
stale, err := svc.SyncWithK8sFinished(tt.localOnly, localServices)
require.NoError(t, err, "Failed to trigger garbage collection")

require.Contains(t, lbmap.ServiceByID, uint16(id1), "service incorrectly removed from lbmap")

// Stale backends should now have been removed (if appropriate)
if tt.expectStaleBackends {
require.Empty(t, stale)
require.ElementsMatch(t, backendAddrs, toBackendAddrs(lbmap.ServiceByID[uint16(id1)].Backends), "stale backends should not have been removed from lbmap")
require.ElementsMatch(t, backendAddrs, toBackendAddrs(maps.Values(lbmap.BackendByID)), "stale backends should not have been removed from lbmap")
} else {
require.ElementsMatch(t, stale, []k8s.ServiceID{svcID})

// Trigger a new upsertion: this mimics what would eventually happen when calling ServiceCache.EnsureService()
_, _, err := svc.upsertService(service("foo", "bar", "172.16.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.5"))
require.NoError(t, err, "Failed to upsert service")

require.ElementsMatch(t, finalBackendAddrs, toBackendAddrs(lbmap.ServiceByID[uint16(id1)].Backends), "stale backends not correctly removed from lbmap")
require.ElementsMatch(t, finalBackendAddrs, toBackendAddrs(maps.Values(lbmap.BackendByID)), "stale backends not correctly removed from lbmap")
}
