clustermesh: delete stale node/service entries on reconnect/disconnect
Currently, a new shared store synchronizing node and service information
is created for each connection to the kvstore backend of a remote
cluster, and the old one is dropped. However, this approach misses the
deletion events for entries that are removed in the remote cluster
during the reconnection process (added entries are instead detected by
the initial "list" operation).

This commit fixes the issue by using a single instance of the node and
service stores per remote cluster, which is reused upon reconnection and
transparently emits the appropriate deletion events for all keys that
are no longer present. To prevent reading an incomplete state when
watching an ephemeral kvstore instance that has not yet been fully
synchronized, the watch operation is started only once the sync canary
for the given prefix is present, if support is enabled.
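
Condensed from the hunks below, the resulting flow looks roughly like
the following sketch (identifiers are taken from the diff; the
surrounding remoteCluster/backend setup and error handling are omitted):

    // The watch stores are created once per remote cluster, in
    // newRemoteCluster(), and reused across reconnections.
    rc.remoteNodes = store.NewRestartableWatchStore(
        name,                   // remote cluster name
        cm.conf.NodeKeyCreator, // factory for node keys
        cm.conf.NodeObserver,   // receives upsert/delete notifications
    )

    // On every (re)connection, restartRemoteConnection() restarts the
    // watch on the same store instance. When the remote cluster
    // advertises synced canaries, each watch is deferred until the
    // canary for its prefix is present.
    var mgr store.WatchStoreManager
    if capabilities.SyncedCanaries {
        mgr = store.NewWatchStoreManagerSync(backend, rc.name)
    } else {
        mgr = store.NewWatchStoreManagerImmediate(rc.name)
    }

    mgr.Register(nodeStore.NodeStorePrefix, func(ctx context.Context) {
        // Relisting into the same store emits deletion events for keys
        // that disappeared while the connection was down.
        rc.remoteNodes.Watch(ctx, backend, path.Join(nodeStore.NodeStorePrefix, rc.name))
    })
    mgr.Run(ctx)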

If sync canary support is not enabled (e.g., because the remote cluster
is running an older version of the clustermesh-apiserver), non-stale
keys may be temporarily removed upon reconnection, causing brief
connectivity disruptions. However, this is no different from what
already happens today if the agent is restarted while the remote
kvstore has not yet been fully synchronized. Additionally, it is not an
issue when the remote kvstore is backed by persistent storage, since in
that case it is already synchronized. Alternatively, we might disable
the deletion of stale entries when sync canaries are not supported, at
the cost of leaking those entries until the agent is restarted. This
behavior will be detailed further when updating the release notes in a
subsequent commit (it can be prevented by upgrading all
clustermesh-apiservers before the agents), after addressing the same
problem affecting the ipcache and identity entries.

Additionally, all keys are drained when disconnecting from a remote
cluster, to properly clean up the state without requiring an agent
restart. Conversely, they are not drained when the agent is simply
shutting down, to avoid breaking existing connections across restarts.
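
This split maps directly onto the two hooks in remote_cluster.go
(excerpted from the hunk further below):

    // onStop: the clustermesh subsystem is shutting down; keep the
    // known entries so that existing connections survive an agent
    // restart.
    func (rc *remoteCluster) onStop() {
        rc.controllers.RemoveAllAndWait()
        close(rc.changed)
    }

    // onRemove: the remote cluster is explicitly disconnected; drain
    // all known entries so that no stale state is left behind.
    func (rc *remoteCluster) onRemove() {
        rc.onStop()

        rc.remoteNodes.Drain()
        rc.remoteServices.Drain()

        rc.getLogger().Info("Remote cluster disconnected")
    }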

Signed-off-by: Marco Iorio <marco.iorio@isovalent.com>
giorio94 authored and julianwiedmann committed Jun 1, 2023
1 parent bd6110a commit 150de13
Showing 3 changed files with 74 additions and 61 deletions.
17 changes: 16 additions & 1 deletion pkg/clustermesh/clustermesh.go
@@ -25,6 +25,7 @@ import (
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/metrics"
nodeTypes "github.com/cilium/cilium/pkg/node/types"
serviceStore "github.com/cilium/cilium/pkg/service/store"
)

const (
@@ -258,7 +259,7 @@ func (cm *ClusterMesh) Stop(hive.HookContext) error {
}

for name, cluster := range cm.clusters {
cluster.onRemove()
cluster.onStop()
delete(cm.clusters, name)
}

@@ -281,6 +282,20 @@ func (cm *ClusterMesh) newRemoteCluster(name, path string) *remoteCluster {
swg: lock.NewStoppableWaitGroup(),
}

rc.remoteNodes = store.NewRestartableWatchStore(
name,
cm.conf.NodeKeyCreator,
cm.conf.NodeObserver,
store.RWSWithEntriesMetric(rc.mesh.metricTotalNodes.WithLabelValues(rc.mesh.conf.ClusterName, rc.mesh.nodeName, rc.name)),
)

rc.remoteServices = store.NewRestartableWatchStore(
name,
func() store.Key { return new(serviceStore.ClusterService) },
&remoteServiceObserver{remoteCluster: rc, swg: rc.swg},
store.RWSWithOnSyncCallback(func(ctx context.Context) { rc.swg.Stop() }),
)

return rc
}

26 changes: 22 additions & 4 deletions pkg/clustermesh/clustermesh_test.go
@@ -23,6 +23,7 @@ import (
"github.com/cilium/cilium/pkg/kvstore"
"github.com/cilium/cilium/pkg/kvstore/store"
"github.com/cilium/cilium/pkg/lock"
nodeStore "github.com/cilium/cilium/pkg/node/store"
fakeConfig "github.com/cilium/cilium/pkg/option/fake"
"github.com/cilium/cilium/pkg/testutils"
testidentity "github.com/cilium/cilium/pkg/testutils/identity"
@@ -54,7 +55,7 @@ type testNode struct {
}

func (n *testNode) GetKeyName() string {
return path.Join(n.Name, n.Cluster)
return path.Join(n.Cluster, n.Name)
}

func (n *testNode) DeepKeyCopy() store.LocalKey {
@@ -98,7 +99,12 @@ func (s *ClusterMeshTestSuite) TestClusterMesh(c *C) {
defer cancel()

kvstore.SetupDummy("etcd")
defer kvstore.Client().Close(ctx)
defer func() {
kvstore.Client().DeletePrefix(context.TODO(), kvstore.ClusterConfigPrefix)
kvstore.Client().DeletePrefix(context.TODO(), kvstore.SyncedPrefix)
kvstore.Client().DeletePrefix(context.TODO(), nodeStore.NodeStorePrefix)
kvstore.Client().Close(ctx)
}()

identity.InitWellKnownIdentities(&fakeConfig.Config{})
// The nils are only used by k8s CRD identities. We default to kvstore.
@@ -121,6 +127,11 @@ func (s *ClusterMeshTestSuite) TestClusterMesh(c *C) {
ID: uint32(i),
}

if name == "cluster2" {
// Cluster2 supports synced canaries
config.Capabilities.SyncedCanaries = true
}

err = SetClusterConfig(ctx, name, &config, kvstore.Client())
c.Assert(err, IsNil)
}
@@ -153,9 +164,13 @@ func (s *ClusterMeshTestSuite) TestClusterMesh(c *C) {
})
c.Assert(cm, Not(IsNil))

nodesWSS := store.NewWorkqueueSyncStore(kvstore.Client(), nodeStore.NodeStorePrefix,
store.WSSWithSourceClusterName("cluster2"), // The one which is tested with sync canaries
)
go nodesWSS.Run(ctx)
nodeNames := []string{"foo", "bar", "baz"}

// wait for both clusters to appear in the list of cm clusters
// wait for all clusters to appear in the list of cm clusters
c.Assert(testutils.WaitUntil(func() bool {
return cm.NumReadyClusters() == 3
}, 10*time.Second), IsNil)
@@ -164,13 +179,16 @@ func (s *ClusterMeshTestSuite) TestClusterMesh(c *C) {
for _, rc := range cm.clusters {
rc.mutex.RLock()
for _, name := range nodeNames {
err = rc.remoteNodes.UpdateLocalKeySync(ctx, &testNode{Name: name, Cluster: rc.name})
nodesWSS.UpsertKey(ctx, &testNode{Name: name, Cluster: rc.name})
c.Assert(err, IsNil)
}
rc.mutex.RUnlock()
}
cm.mutex.RUnlock()

// Write the sync canary for cluster2
nodesWSS.Synced(ctx)

// wait for all cm nodes in both clusters to appear in the node list
c.Assert(testutils.WaitUntil(func() bool {
nodesMutex.RLock()
92 changes: 36 additions & 56 deletions pkg/clustermesh/remote_cluster.go
@@ -18,7 +18,6 @@ import (
"github.com/cilium/cilium/pkg/clustermesh/types"
cmtypes "github.com/cilium/cilium/pkg/clustermesh/types"
"github.com/cilium/cilium/pkg/controller"
"github.com/cilium/cilium/pkg/defaults"
"github.com/cilium/cilium/pkg/ipcache"
"github.com/cilium/cilium/pkg/kvstore"
"github.com/cilium/cilium/pkg/kvstore/store"
@@ -58,18 +57,16 @@ type remoteCluster struct {

// mutex protects the following variables
// - backend
// - store
// - remoteNodes
// - ipCacheWatcher
// - remoteIdentityCache
mutex lock.RWMutex

// store is the shared store representing all nodes in the remote cluster
remoteNodes *store.SharedStore
remoteNodes store.WatchStore

// remoteServices is the shared store representing services in remote
// clusters
remoteServices *store.SharedStore
remoteServices store.WatchStore

// ipCacheWatcher is the watcher that notifies about IP<->identity
// changes in the remote cluster
@@ -120,21 +117,14 @@ func (rc *remoteCluster) releaseOldConnection() {
ipCacheWatcher := rc.ipCacheWatcher
rc.ipCacheWatcher = nil

remoteNodes := rc.remoteNodes
rc.remoteNodes = nil

remoteIdentityCache := rc.remoteIdentityCache
rc.remoteIdentityCache = nil

remoteServices := rc.remoteServices
rc.remoteServices = nil

backend := rc.backend
rc.backend = nil

rc.config = nil

rc.mesh.metricTotalNodes.WithLabelValues(rc.mesh.conf.ClusterName, rc.mesh.nodeName, rc.name).Set(0.0)
rc.mesh.metricReadinessStatus.WithLabelValues(rc.mesh.conf.ClusterName, rc.mesh.nodeName, rc.name).Set(metrics.BoolToFloat64(rc.isReadyLocked()))

rc.mutex.Unlock()
@@ -146,15 +136,9 @@ func (rc *remoteCluster) releaseOldConnection() {
if ipCacheWatcher != nil {
ipCacheWatcher.Close()
}
if remoteNodes != nil {
remoteNodes.Close(context.TODO())
}
if remoteIdentityCache != nil {
remoteIdentityCache.Close()
}
if remoteServices != nil {
remoteServices.Close(context.TODO())
}
if backend != nil {
backend.Close(context.TODO())
}
@@ -204,43 +188,28 @@ func (rc *remoteCluster) restartRemoteConnection(allocator RemoteIdentityWatcher
return err
}

remoteNodes, err := store.JoinSharedStore(store.Configuration{
Prefix: path.Join(nodeStore.NodeStorePrefix, rc.name),
KeyCreator: rc.mesh.conf.NodeKeyCreator,
SynchronizationInterval: time.Minute,
SharedKeyDeleteDelay: defaults.NodeDeleteDelay,
Backend: backend,
Observer: rc.mesh.conf.NodeObserver,
})
if err != nil {
backend.Close(ctx)
return err
var capabilities types.CiliumClusterConfigCapabilities
if config != nil {
capabilities = config.Capabilities
}

remoteServices, err := store.JoinSharedStore(store.Configuration{
Prefix: path.Join(serviceStore.ServiceStorePrefix, rc.name),
KeyCreator: func() store.Key {
svc := serviceStore.ClusterService{}
return &svc
},
SynchronizationInterval: time.Minute,
Backend: backend,
Observer: &remoteServiceObserver{
remoteCluster: rc,
swg: rc.swg,
},
})
if err != nil {
remoteNodes.Close(ctx)
backend.Close(ctx)
return err
var mgr store.WatchStoreManager
if capabilities.SyncedCanaries {
mgr = store.NewWatchStoreManagerSync(backend, rc.name)
} else {
mgr = store.NewWatchStoreManagerImmediate(rc.name)
}
rc.swg.Stop()

mgr.Register(nodeStore.NodeStorePrefix, func(ctx context.Context) {
rc.remoteNodes.Watch(ctx, backend, path.Join(nodeStore.NodeStorePrefix, rc.name))
})

mgr.Register(serviceStore.ServiceStorePrefix, func(ctx context.Context) {
rc.remoteServices.Watch(ctx, backend, path.Join(serviceStore.ServiceStorePrefix, rc.name))
})

remoteIdentityCache, err := allocator.WatchRemoteIdentities(rc.name, backend)
if err != nil {
remoteServices.Close(ctx)
remoteNodes.Close(ctx)
backend.Close(ctx)
return err
}
@@ -249,23 +218,21 @@ func (rc *remoteCluster) restartRemoteConnection(allocator RemoteIdentityWatcher
go ipCacheWatcher.Watch(ctx)

rc.mutex.Lock()
rc.remoteNodes = remoteNodes
rc.remoteServices = remoteServices
rc.backend = backend
rc.config = config
rc.ipCacheWatcher = ipCacheWatcher
rc.remoteIdentityCache = remoteIdentityCache
rc.mesh.metricTotalNodes.WithLabelValues(rc.mesh.conf.ClusterName, rc.mesh.nodeName, rc.name).Set(float64(rc.remoteNodes.NumEntries()))
rc.mesh.metricReadinessStatus.WithLabelValues(rc.mesh.conf.ClusterName, rc.mesh.nodeName, rc.name).Set(metrics.BoolToFloat64(rc.isReadyLocked()))
rc.mutex.Unlock()

rc.getLogger().Info("Established connection to remote etcd")
mgr.Run(ctx)

return nil
},
StopFunc: func(ctx context.Context) error {
rc.releaseOldConnection()
rc.mesh.metricTotalNodes.WithLabelValues(rc.mesh.conf.ClusterName, rc.mesh.nodeName, rc.name).Set(float64(rc.remoteNodes.NumEntries()))

rc.mesh.metricReadinessStatus.WithLabelValues(rc.mesh.conf.ClusterName, rc.mesh.nodeName, rc.name).Set(metrics.BoolToFloat64(rc.isReadyLocked()))
allocator.RemoveRemoteIdentities(rc.name)
rc.getLogger().Info("All resources of remote cluster cleaned up")
@@ -409,7 +376,6 @@ func (rc *remoteCluster) onInsert(allocator RemoteIdentityWatcher) {
rc.lastFailure = time.Now()
rc.mesh.metricLastFailureTimestamp.WithLabelValues(rc.mesh.conf.ClusterName, rc.mesh.nodeName, rc.name).SetToCurrentTime()
rc.mesh.metricTotalFailures.WithLabelValues(rc.mesh.conf.ClusterName, rc.mesh.nodeName, rc.name).Set(float64(rc.failures))
rc.mesh.metricTotalNodes.WithLabelValues(rc.mesh.conf.ClusterName, rc.mesh.nodeName, rc.name).Set(float64(rc.remoteNodes.NumEntries()))
rc.mesh.metricReadinessStatus.WithLabelValues(rc.mesh.conf.ClusterName, rc.mesh.nodeName, rc.name).Set(metrics.BoolToFloat64(rc.isReadyLocked()))
rc.mutex.Unlock()
rc.restartRemoteConnection(allocator)
@@ -419,9 +385,23 @@ func (rc *remoteCluster) onInsert(allocator RemoteIdentityWatcher) {

}

func (rc *remoteCluster) onRemove() {
// onStop is executed when the clustermesh subsystem is being stopped.
// In this case, we don't want to drain the known entries, otherwise
// we would break existing connections when the agent gets restarted.
func (rc *remoteCluster) onStop() {
rc.controllers.RemoveAllAndWait()
close(rc.changed)
}

// onRemove is executed when a remote cluster is explicitly disconnected
// (i.e., its configuration is removed). In this case, we need to drain
// all known entries, to properly cleanup the status without requiring to
// restart the agent.
func (rc *remoteCluster) onRemove() {
rc.onStop()

rc.remoteNodes.Drain()
rc.remoteServices.Drain()

rc.getLogger().Info("Remote cluster disconnected")
}
@@ -434,7 +414,7 @@ func (rc *remoteCluster) isReady() bool {
}

func (rc *remoteCluster) isReadyLocked() bool {
return rc.backend != nil && rc.remoteNodes != nil && rc.ipCacheWatcher != nil
return rc.backend != nil && rc.ipCacheWatcher != nil
}

func (rc *remoteCluster) status() *models.RemoteCluster {