Skip to content

Commit

Permalink
agent: rework clustermesh config watcher
Browse files Browse the repository at this point in the history
The agent leverages an fsnotify watcher to detect file changes in
the directory containing the etcd configurations for remote clusters
(as well as the associated keys/certs), triggering a reconfiguration
as appropriate.

This commit reworks the above logic to address two main issues:
* Due to how Kubernetes mounts ConfigMaps and Secrets within pods,
  and in particular the usage of symbolic links to handle atomic
  updates, changes in existing files were not detected (since the
  watched file itself was a symbolic link, which doesn't change
  during such operation). Now, an explicit watch operation is started
  for each configuration file (i.e., watching the actual target),
  causing a notification to be properly emitted when that is changed.
* Prevent possible spurious notifications even if the configuration
  is not actually modified. This is achieved through hash comparison.

Integration tests are updated to cover the above two aspects.

Fixes: cilium#23273
Signed-off-by: Marco Iorio <marco.iorio@isovalent.com>
  • Loading branch information
giorio94 authored and borkmann committed Mar 13, 2023
1 parent a1b3fb3 commit 2cdd4ee
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 69 deletions.
125 changes: 85 additions & 40 deletions pkg/clustermesh/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
package clustermesh

import (
"crypto/sha256"
"os"
"path"
"path/filepath"
"strings"

"github.com/fsnotify/fsnotify"
"github.com/sirupsen/logrus"
"golang.org/x/exp/slices"
)

// clusterLifecycle is the interface to implement in order to receive cluster
Expand All @@ -19,10 +21,13 @@ type clusterLifecycle interface {
remove(clusterName string)
}

type fhash [sha256.Size]byte

type configDirectoryWatcher struct {
watcher *fsnotify.Watcher
lifecycle clusterLifecycle
path string
tracked map[string]fhash
stop chan struct{}
}

Expand All @@ -40,40 +45,83 @@ func createConfigDirectoryWatcher(path string, lifecycle clusterLifecycle) (*con
return &configDirectoryWatcher{
watcher: watcher,
path: path,
tracked: map[string]fhash{},
lifecycle: lifecycle,
stop: make(chan struct{}),
}, nil
}

func isEtcdConfigFile(path string) bool {
// isEtcdConfigFile returns whether the given path looks like a configuration
// file, and in that case it returns the corresponding hash to detect modifications.
func isEtcdConfigFile(path string) (bool, fhash) {
if info, err := os.Stat(path); err != nil || info.IsDir() {
return false, fhash{}
}

b, err := os.ReadFile(path)
if err != nil {
return false
return false, fhash{}
}

// search for the "endpoints:" string
return strings.Contains(string(b), "endpoints:")
if strings.Contains(string(b), "endpoints:") {
return true, sha256.Sum256(b)
}

return false, fhash{}
}

func (cdw *configDirectoryWatcher) handleAddedFile(name, absolutePath string) {
// A typical directory will look like this:
// lrwxrwxrwx. 1 root root 12 Jul 21 16:32 test5 -> ..data/test5
// lrwxrwxrwx. 1 root root 12 Jul 21 16:32 test7 -> ..data/test7
//
// Ignore all backing files and only read the symlinks
if strings.HasPrefix(name, "..") {
func (cdw *configDirectoryWatcher) handle(abspath string) {
filename := path.Base(abspath)
isConfig, newHash := isEtcdConfigFile(abspath)

if !isConfig {
// If the corresponding cluster was tracked, then trigger the remove
// event, since the configuration file is no longer present/readable
if _, tracked := cdw.tracked[filename]; tracked {
log.WithFields(logrus.Fields{
fieldClusterName: filename,
fieldConfig: abspath,
}).Debug("Removed cluster configuration")

// The remove operation returns an error if the file does no longer exists.
_ = cdw.watcher.Remove(abspath)
delete(cdw.tracked, filename)
cdw.lifecycle.remove(filename)
}

return
}

if !isEtcdConfigFile(absolutePath) {
if !slices.Contains(cdw.watcher.WatchList(), abspath) {
// Start watching explicitly the file. This allows to receive a notification
// when the underlying file gets updated, if path points to a symbolic link.
// This is required to correctly detect file modifications when the folder
// is mounted from a Kubernetes ConfigMap/Secret.
if err := cdw.watcher.Add(abspath); err != nil {
log.WithError(err).WithField(fieldConfig, abspath).
Warning("Failed adding explicit path watch for config")
}
}

oldHash, tracked := cdw.tracked[filename]

// Do not emit spurious notifications if the config file did not change.
if tracked && oldHash == newHash {
return
}

cdw.lifecycle.add(name, absolutePath)
log.WithFields(logrus.Fields{
fieldClusterName: filename,
fieldConfig: abspath,
}).Debug("Added or updated cluster configuration")

cdw.tracked[filename] = newHash
cdw.lifecycle.add(filename, abspath)
}

func (cdw *configDirectoryWatcher) watch() error {
log.WithField(fieldConfig, cdw.path).Debug("Starting config directory watcher")
log.WithField(fieldConfigDir, cdw.path).Debug("Starting config directory watcher")

files, err := os.ReadDir(cdw.path)
if err != nil {
Expand All @@ -86,38 +134,35 @@ func (cdw *configDirectoryWatcher) watch() error {
}

absolutePath := path.Join(cdw.path, f.Name())
cdw.handleAddedFile(f.Name(), absolutePath)
cdw.handle(absolutePath)
}

go func() {
for {
select {
case event := <-cdw.watcher.Events:
name := filepath.Base(event.Name)
log.WithField(fieldClusterName, name).Debugf("Received fsnotify event: %+v", event)
switch {
case event.Has(fsnotify.Create),
event.Has(fsnotify.Write),
event.Has(fsnotify.Chmod):
cdw.handleAddedFile(name, event.Name)
case event.Has(fsnotify.Remove),
event.Has(fsnotify.Rename):
cdw.lifecycle.remove(name)
}

case err := <-cdw.watcher.Errors:
log.WithError(err).WithField("path", cdw.path).Warning("error encountered while watching directory with fsnotify")

case <-cdw.stop:
return
}
}
}()

go cdw.loop()
return nil
}

func (cdw *configDirectoryWatcher) loop() {
for {
select {
case event := <-cdw.watcher.Events:
log.WithFields(logrus.Fields{
fieldConfigDir: cdw.path,
fieldEvent: event,
}).Debug("Received fsnotify event")
cdw.handle(event.Name)

case err := <-cdw.watcher.Errors:
log.WithError(err).WithField(fieldConfigDir, cdw.path).
Warning("Error encountered while watching directory with fsnotify")

case <-cdw.stop:
return
}
}
}

func (cdw *configDirectoryWatcher) close() {
log.WithField(fieldConfigDir, cdw.path).Debug("Stopping config directory watcher")
close(cdw.stop)
cdw.watcher.Close()
}
110 changes: 81 additions & 29 deletions pkg/clustermesh/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,24 @@ package clustermesh

import (
"context"
"crypto/sha256"
"os"
"path"
"time"

. "gopkg.in/check.v1"

"github.com/cilium/cilium/pkg/identity/cache"
"github.com/cilium/cilium/pkg/ipcache"
"github.com/cilium/cilium/pkg/testutils"
testidentity "github.com/cilium/cilium/pkg/testutils/identity"
)

func createFile(c *C, name string) {
err := os.WriteFile(name, []byte("endpoints:\n- https://cluster1.cilium-etcd.cilium.svc:2379\n"), 0644)
const (
content1 = "endpoints:\n- https://cluster1.cilium-etcd.cilium.svc:2379\n"
content2 = "endpoints:\n- https://cluster1.cilium-etcd.cilium.svc:2380\n"
)

func writeFile(c *C, name, content string) {
err := os.WriteFile(name, []byte(content), 0644)
c.Assert(err, IsNil)
}

Expand All @@ -43,6 +47,19 @@ func expectChange(c *C, cm *ClusterMesh, name string) {
}
}

func expectNoChange(c *C, cm *ClusterMesh, name string) {
cm.mutex.RLock()
cluster := cm.clusters[name]
cm.mutex.RUnlock()
c.Assert(cluster, Not(IsNil))

select {
case <-cluster.changed:
c.Fatal("unexpected changed event detected")
case <-time.After(100 * time.Millisecond):
}
}

func expectNotExist(c *C, cm *ClusterMesh, name string) {
cm.mutex.RLock()
defer cm.mutex.RUnlock()
Expand All @@ -55,20 +72,32 @@ func (s *ClusterMeshTestSuite) TestWatchConfigDirectory(c *C) {
skipKvstoreConnection = false
}()

dir, err := os.MkdirTemp("", "multicluster")
baseDir, err := os.MkdirTemp("", "multicluster")
c.Assert(err, IsNil)
defer os.RemoveAll(dir)
defer os.RemoveAll(baseDir)

dataDir := path.Join(baseDir, "..data")
dataDirTmp := path.Join(baseDir, "..data_tmp")
dataDir1 := path.Join(baseDir, "..data-1")
dataDir2 := path.Join(baseDir, "..data-2")
dataDir3 := path.Join(baseDir, "..data-3")

c.Assert(os.Symlink(dataDir1, dataDir), IsNil)
c.Assert(os.Mkdir(dataDir1, 0755), IsNil)
c.Assert(os.Mkdir(dataDir2, 0755), IsNil)
c.Assert(os.Mkdir(dataDir3, 0755), IsNil)

file1 := path.Join(dir, "cluster1")
file2 := path.Join(dir, "cluster2")
file3 := path.Join(dir, "cluster3")
file1 := path.Join(baseDir, "cluster1")
file2 := path.Join(baseDir, "cluster2")
file3 := path.Join(baseDir, "cluster3")

createFile(c, file1)
createFile(c, file2)
writeFile(c, file1, content1)
writeFile(c, path.Join(dataDir1, "cluster2"), content1)
writeFile(c, path.Join(dataDir2, "cluster2"), content2)
writeFile(c, path.Join(dataDir3, "cluster2"), content1)

mgr := cache.NewCachingIdentityAllocator(&testidentity.IdentityAllocatorOwnerMock{})
// The nils are only used by k8s CRD identities. We default to kvstore.
<-mgr.InitIdentityAllocator(nil, nil)
// Create an indirect link, as in case of Kubernetes COnfigMaps/Secret mounted inside pods.
c.Assert(os.Symlink(path.Join(dataDir, "cluster2"), file2), IsNil)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -77,11 +106,10 @@ func (s *ClusterMeshTestSuite) TestWatchConfigDirectory(c *C) {
})
defer ipc.Shutdown()
cm, err := NewClusterMesh(Configuration{
Name: "test1",
ConfigDirectory: dir,
NodeKeyCreator: testNodeCreator,
RemoteIdentityWatcher: mgr,
IPCache: ipc,
Name: "test1",
ConfigDirectory: baseDir,
NodeKeyCreator: testNodeCreator,
IPCache: ipc,
})
c.Assert(err, IsNil)
c.Assert(cm, Not(IsNil))
Expand All @@ -107,7 +135,7 @@ func (s *ClusterMeshTestSuite) TestWatchConfigDirectory(c *C) {
return len(cm.clusters) == 1
}, time.Second), IsNil)

createFile(c, file3)
writeFile(c, file3, content1)

// wait for cluster3 to appear
c.Assert(testutils.WaitUntil(func() bool {
Expand All @@ -133,12 +161,21 @@ func (s *ClusterMeshTestSuite) TestWatchConfigDirectory(c *C) {
expectNotExist(c, cm, "cluster3")

// touch file
err = os.Chtimes(file1, time.Now(), time.Now())
c.Assert(err, IsNil)

// give time for events to be processed
time.Sleep(100 * time.Millisecond)
expectChange(c, cm, "cluster1")
c.Assert(os.Chtimes(file1, time.Now(), time.Now()), IsNil)
expectNoChange(c, cm, "cluster1")

// update file content changing the symlink target, adopting
// the same approach of the kubelet on ConfigMap/Secret update
c.Assert(os.Symlink(dataDir2, dataDirTmp), IsNil)
c.Assert(os.Rename(dataDirTmp, dataDir), IsNil)
c.Assert(os.RemoveAll(dataDir1), IsNil)
expectChange(c, cm, "cluster2")

// update file content once more
c.Assert(os.Symlink(dataDir3, dataDirTmp), IsNil)
c.Assert(os.Rename(dataDirTmp, dataDir), IsNil)
c.Assert(os.RemoveAll(dataDir2), IsNil)
expectChange(c, cm, "cluster2")

err = os.RemoveAll(file1)
c.Assert(err, IsNil)
Expand All @@ -155,6 +192,10 @@ func (s *ClusterMeshTestSuite) TestWatchConfigDirectory(c *C) {
expectNotExist(c, cm, "cluster2")
expectNotExist(c, cm, "cluster3")

// Ensure that per-config watches are removed properly
wl := cm.configWatcher.watcher.WatchList()
c.Assert(wl, HasLen, 1)
c.Assert(wl[0], Equals, baseDir)
}

func (s *ClusterMeshTestSuite) TestIsEtcdConfigFile(c *C) {
Expand All @@ -163,12 +204,23 @@ func (s *ClusterMeshTestSuite) TestIsEtcdConfigFile(c *C) {
defer os.RemoveAll(dir)

validPath := path.Join(dir, "valid")
err = os.WriteFile(validPath, []byte("endpoints:\n- https://cluster1.cilium-etcd.cilium.svc:2379\n"), 0644)
content := []byte("endpoints:\n- https://cluster1.cilium-etcd.cilium.svc:2379\n")
err = os.WriteFile(validPath, content, 0644)
c.Assert(err, IsNil)
c.Assert(isEtcdConfigFile(validPath), Equals, true)

isConfig, hash := isEtcdConfigFile(validPath)
c.Assert(isConfig, Equals, true)
c.Assert(hash, Equals, fhash(sha256.Sum256(content)))

invalidPath := path.Join(dir, "valid")
err = os.WriteFile(invalidPath, []byte("sf324kj234lkjsdvl\nwl34kj23l4k\nendpoints"), 0644)
c.Assert(err, IsNil)
c.Assert(isEtcdConfigFile(invalidPath), Equals, false)

isConfig, hash = isEtcdConfigFile(validPath)
c.Assert(isConfig, Equals, false)
c.Assert(hash, Equals, fhash{})

isConfig, hash = isEtcdConfigFile(dir)
c.Assert(isConfig, Equals, false)
c.Assert(hash, Equals, fhash{})
}
2 changes: 2 additions & 0 deletions pkg/clustermesh/logfields.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ var log = logging.DefaultLogger.WithField(logfields.LogSubsys, "clustermesh")
const (
fieldClusterName = "clusterName"
fieldConfig = "config"
fieldConfigDir = "configDir"
fieldEvent = "event"
fieldKVStoreStatus = "kvstoreStatus"
fieldKVStoreErr = "kvstoreErr"
)

0 comments on commit 2cdd4ee

Please sign in to comment.