diff --git a/aitelemetry/telemetrywrapper.go b/aitelemetry/telemetrywrapper.go index 00148572aa..94749bc7dd 100644 --- a/aitelemetry/telemetrywrapper.go +++ b/aitelemetry/telemetrywrapper.go @@ -7,6 +7,7 @@ import ( "github.com/Azure/azure-container-networking/common" "github.com/Azure/azure-container-networking/log" + "github.com/Azure/azure-container-networking/processlock" "github.com/Azure/azure-container-networking/store" "github.com/microsoft/ApplicationInsights-Go/appinsights" ) @@ -101,17 +102,31 @@ func getMetadata(th *telemetryHandle) { th.metadata = metadata th.rwmutex.Unlock() + lockclient, err := processlock.NewFileLock(metadataFile + store.LockExtension) + if err != nil { + log.Printf("Error initializing file lock:%v", err) + return + } + // Save metadata retrieved from wireserver to a file - kvs, err := store.NewJsonFileStore(metadataFile) + kvs, err := store.NewJsonFileStore(metadataFile, lockclient) if err != nil { debugLog("[AppInsights] Error initializing kvs store: %v", err) return } - kvs.Lock(true) - err = common.SaveHostMetadata(th.metadata, metadataFile) - kvs.Unlock(true) + err = kvs.Lock(store.DefaultLockTimeout) if err != nil { + log.Errorf("getMetadata: Not able to acquire lock:%v", err) + return + } + metadataErr := common.SaveHostMetadata(th.metadata, metadataFile) + err = kvs.Unlock() + if err != nil { + log.Errorf("getMetadata: Not able to release lock:%v", err) + } + + if metadataErr != nil { debugLog("[AppInsights] saving host metadata failed with :%v", err) } } @@ -284,6 +299,7 @@ func (th *telemetryHandle) TrackMetric(metric Metric) { aimetric.Properties[versionStr] = th.appVersion aimetric.Properties[resourceGroupStr] = th.metadata.ResourceGroupName aimetric.Properties[vmIDStr] = metadata.VMID + aimetric.Properties[osStr] = runtime.GOOS aimetric.Tags.Session().SetId(metadata.VMID) } diff --git a/cni/ipam/plugin/main.go b/cni/ipam/plugin/main.go index 2a7538b3b4..e4e7812bb2 100644 --- a/cni/ipam/plugin/main.go +++ b/cni/ipam/plugin/main.go @@ -43,20 +43,12 @@ func main() { if err := ipamPlugin.Plugin.InitializeKeyValueStore(&config); err != nil { fmt.Printf("Failed to initialize key-value store of ipam plugin, err:%v.\n", err) - - if isSafe, _ := ipamPlugin.Plugin.IsSafeToRemoveLock(ipamPlugin.Plugin.Name); isSafe { - log.Printf("[IPAM] Removing lock file as process holding lock exited") - if errUninit := ipamPlugin.Plugin.UninitializeKeyValueStore(true); errUninit != nil { - log.Errorf("Failed to uninitialize key-value store of network plugin, err:%v.\n", errUninit) - } - } - os.Exit(1) } defer func() { - if errUninit := ipamPlugin.Plugin.UninitializeKeyValueStore(false); errUninit != nil { - fmt.Printf("Failed to uninitialize key-value store of ipam plugin, err:%v.\n", err) + if errUninit := ipamPlugin.Plugin.UninitializeKeyValueStore(); errUninit != nil { + fmt.Printf("Failed to uninitialize key-value store of ipam plugin, err:%v.\n", errUninit) } if recover() != nil { diff --git a/cni/ipam/pluginv6/main.go b/cni/ipam/pluginv6/main.go index cb93640c48..f206498ce7 100644 --- a/cni/ipam/pluginv6/main.go +++ b/cni/ipam/pluginv6/main.go @@ -43,20 +43,12 @@ func main() { if err := ipamPlugin.Plugin.InitializeKeyValueStore(&config); err != nil { fmt.Printf("Failed to initialize key-value store of ipam plugin, err:%v.\n", err) - - if isSafe, _ := ipamPlugin.Plugin.IsSafeToRemoveLock(ipamPlugin.Plugin.Name); isSafe { - log.Printf("[IPAM] Removing lock file as process holding lock exited") - if errUninit := ipamPlugin.Plugin.UninitializeKeyValueStore(true); errUninit != nil { - log.Errorf("Failed to uninitialize key-value store of network plugin, err:%v.\n", errUninit) - } - } - os.Exit(1) } defer func() { - if errUninit := ipamPlugin.Plugin.UninitializeKeyValueStore(false); errUninit != nil { - fmt.Printf("Failed to uninitialize key-value store of ipam plugin, err:%v.\n", err) + if errUninit := ipamPlugin.Plugin.UninitializeKeyValueStore(); errUninit != nil { + fmt.Printf("Failed to uninitialize key-value store of ipam plugin, err:%v.\n", errUninit) } if recover() != nil { diff --git a/cni/network/plugin/main.go b/cni/network/plugin/main.go index 01aa938846..34eaca0a64 100644 --- a/cni/network/plugin/main.go +++ b/cni/network/plugin/main.go @@ -11,6 +11,7 @@ import ( "reflect" "time" + "github.com/Azure/azure-container-networking/aitelemetry" "github.com/Azure/azure-container-networking/cni" "github.com/Azure/azure-container-networking/cni/network" "github.com/Azure/azure-container-networking/common" @@ -18,8 +19,10 @@ import ( acnnetwork "github.com/Azure/azure-container-networking/network" "github.com/Azure/azure-container-networking/nns" "github.com/Azure/azure-container-networking/platform" + "github.com/Azure/azure-container-networking/store" "github.com/Azure/azure-container-networking/telemetry" "github.com/containernetworking/cni/pkg/skel" + "github.com/pkg/errors" ) const ( @@ -139,14 +142,13 @@ func main() { var ( config common.PluginConfig - err error logDirectory string // This sets empty string i.e. current location tb *telemetry.TelemetryBuffer ) log.SetName(name) log.SetLevel(log.LevelInfo) - if err = log.SetTargetLogDirectory(log.TargetLogfile, logDirectory); err != nil { + if err := log.SetTargetLogDirectory(log.TargetLogfile, logDirectory); err != nil { fmt.Printf("Failed to setup cni logging: %v\n", err) return } @@ -187,7 +189,8 @@ func main() { cniReport.GetReport(pluginName, version, ipamQueryURL) - upTime, err := platform.GetLastRebootTime() + var upTime time.Time + upTime, err = platform.GetLastRebootTime() if err == nil { cniReport.VMUptime = upTime.Format("2006-01-02 15:04:05") } @@ -195,24 +198,33 @@ func main() { // CNI Acquires lock if err = netPlugin.Plugin.InitializeKeyValueStore(&config); err != nil { log.Errorf("Failed to initialize key-value store of network plugin, err:%v.\n", err) - tb := telemetry.NewTelemetryBuffer() - if tberr := tb.Connect(); tberr == nil { - reportPluginError(reportManager, tb, err) - tb.Close() + tb = telemetry.NewTelemetryBuffer() + if tberr := tb.Connect(); tberr != nil { + log.Errorf("Cannot connect to telemetry service:%v", tberr) + return } - if isSafe, _ := netPlugin.Plugin.IsSafeToRemoveLock(name); isSafe { - log.Printf("[CNI] Removing lock file as process holding lock exited") - if errUninit := netPlugin.Plugin.UninitializeKeyValueStore(true); errUninit != nil { - log.Errorf("Failed to uninitialize key-value store of network plugin, err:%v.\n", errUninit) + reportPluginError(reportManager, tb, err) + + if errors.Is(err, store.ErrTimeoutLockingStore) { + var cniMetric telemetry.AIMetric + cniMetric.Metric = aitelemetry.Metric{ + Name: telemetry.CNILockTimeoutStr, + Value: 1.0, + CustomDimensions: make(map[string]string), + } + err = telemetry.SendCNIMetric(&cniMetric, tb) + if err != nil { + log.Errorf("Couldn't send cnilocktimeout metric: %v", err) } } + tb.Close() return } defer func() { - if errUninit := netPlugin.Plugin.UninitializeKeyValueStore(false); errUninit != nil { + if errUninit := netPlugin.Plugin.UninitializeKeyValueStore(); errUninit != nil { log.Errorf("Failed to uninitialize key-value store of network plugin, err:%v.\n", errUninit) } @@ -270,7 +282,7 @@ func main() { netPlugin.Stop() // release cni lock - if errUninit := netPlugin.Plugin.UninitializeKeyValueStore(false); errUninit != nil { + if errUninit := netPlugin.Plugin.UninitializeKeyValueStore(); errUninit != nil { log.Errorf("Failed to uninitialize key-value store of network plugin, err:%v.\n", errUninit) } diff --git a/cni/plugin.go b/cni/plugin.go index f4caf461a1..077a5f7eee 100644 --- a/cni/plugin.go +++ b/cni/plugin.go @@ -6,15 +6,14 @@ package cni import ( "context" "fmt" - "io/ioutil" "os" "runtime" "github.com/Azure/azure-container-networking/common" "github.com/Azure/azure-container-networking/log" "github.com/Azure/azure-container-networking/platform" + "github.com/Azure/azure-container-networking/processlock" "github.com/Azure/azure-container-networking/store" - cniInvoke "github.com/containernetworking/cni/pkg/invoke" cniSkel "github.com/containernetworking/cni/pkg/skel" cniTypes "github.com/containernetworking/cni/pkg/types" @@ -155,8 +154,13 @@ func (plugin *Plugin) Errorf(format string, args ...interface{}) *cniTypes.Error func (plugin *Plugin) InitializeKeyValueStore(config *common.PluginConfig) error { // Create the key value store. if plugin.Store == nil { - var err error - plugin.Store, err = store.NewJsonFileStore(platform.CNIRuntimePath + plugin.Name + ".json") + lockclient, err := processlock.NewFileLock(platform.CNILockPath + plugin.Name + store.LockExtension) + if err != nil { + log.Printf("[cni] Error initializing file lock:%v", err) + return errors.Wrap(err, "error creating new filelock") + } + + plugin.Store, err = store.NewJsonFileStore(platform.CNIRuntimePath+plugin.Name+".json", lockclient) if err != nil { log.Printf("[cni] Failed to create store: %v.", err) return err @@ -164,7 +168,7 @@ func (plugin *Plugin) InitializeKeyValueStore(config *common.PluginConfig) error } // Acquire store lock. - if err := plugin.Store.Lock(true); err != nil { + if err := plugin.Store.Lock(store.DefaultLockTimeout); err != nil { log.Printf("[cni] Failed to lock store: %v.", err) return err } @@ -175,9 +179,9 @@ func (plugin *Plugin) InitializeKeyValueStore(config *common.PluginConfig) error } // Uninitialize key-value store -func (plugin *Plugin) UninitializeKeyValueStore(force bool) error { +func (plugin *Plugin) UninitializeKeyValueStore() error { if plugin.Store != nil { - err := plugin.Store.Unlock(force) + err := plugin.Store.Unlock() if err != nil { log.Printf("[cni] Failed to unlock store: %v.", err) return err @@ -187,67 +191,3 @@ func (plugin *Plugin) UninitializeKeyValueStore(force bool) error { return nil } - -// check if safe to remove lockfile -func (plugin *Plugin) IsSafeToRemoveLock(processName string) (bool, error) { - if plugin != nil && plugin.Store != nil { - // check if get process command supported - if cmdErr := platform.GetProcessSupport(); cmdErr != nil { - log.Errorf("Get process cmd not supported. Error %v", cmdErr) - return false, cmdErr - } - - lockFileName := plugin.Store.GetLockFileName() - // Read pid from lockfile - lockFilePid, err := plugin.readLockFile(lockFileName) - if err != nil { - return false, errors.Wrap(err, "IsSafeToRemoveLock lockfile read failed") - } - - log.Printf("Read from lockfile:%s", lockFilePid) - // Get the process name if running and - // check if that matches with our expected process - // if it returns non-nil error then process is not running - pName, err := platform.GetProcessNameByID(lockFilePid) - if err != nil { - var content string - content, err = plugin.readLockFile(lockFileName) - if err != nil { - return false, errors.Wrap(err, "IsSafeToRemoveLock lockfile 2nd read failed") - } - - // pid in lockfile changed after getprocessnamebyid call. so some other process acquired lockfile in between. - // so its not safe to remove lockfile - if string(content) != lockFilePid { - log.Printf("Lockfile content changed from %s to %s. So not safe to remove lockfile", lockFilePid, content) - return false, nil - } - - return true, nil - } - - log.Printf("[CNI] Process name is %s", pName) - - if pName != processName { - return true, nil - } - } - - log.Errorf("Plugin store is nil") - return false, fmt.Errorf("plugin store nil") -} - -func (plugin *Plugin) readLockFile(filename string) (string, error) { - content, err := ioutil.ReadFile(filename) - if err != nil { - log.Errorf("Failed to read lockfile :%v", err) - return "", fmt.Errorf("readLockFile error:%w", err) - } - - if len(content) == 0 { - log.Errorf("Num bytes read from lockfile is 0") - return "", errEmptyContent - } - - return string(content), nil -} diff --git a/cni/plugin_test.go b/cni/plugin_test.go deleted file mode 100644 index 7c60cfaf69..0000000000 --- a/cni/plugin_test.go +++ /dev/null @@ -1,69 +0,0 @@ -package cni - -import ( - "os" - "testing" - - "github.com/Azure/azure-container-networking/common" - "github.com/Azure/azure-container-networking/store" - "github.com/stretchr/testify/require" -) - -func TestMain(m *testing.M) { - // Run tests. - exitCode := m.Run() - os.Exit(exitCode) -} - -func TestPluginSafeToRemoveLock(t *testing.T) { - tests := []struct { - name string - plugin Plugin - processName string - wantIsSafe bool - wantErr bool - }{ - { - name: "Safe to remove lock-true. Process name does not match", - plugin: Plugin{ - Plugin: &common.Plugin{ - Name: "cni", - Version: "0.3.0", - Store: store.NewMockStore("testfiles/processfound.lock"), - }, - version: "0.3.0", - }, - processName: "azure-vnet", - wantIsSafe: true, - wantErr: false, - }, - { - name: "Safe to remove lock-true. Process not running", - plugin: Plugin{ - Plugin: &common.Plugin{ - Name: "cni", - Version: "0.3.0", - Store: store.NewMockStore("testfiles/processnotfound.lock"), - }, - version: "0.3.0", - }, - processName: "azure-vnet", - wantIsSafe: true, - wantErr: false, - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - isSafe, err := tt.plugin.IsSafeToRemoveLock(tt.processName) - if tt.wantErr { - require.Error(t, err) - require.Equal(t, tt.wantIsSafe, isSafe) - } else { - require.NoError(t, err) - require.Equal(t, tt.wantIsSafe, isSafe) - } - }) - } -} diff --git a/cnm/plugin/main.go b/cnm/plugin/main.go index 99d2832504..4ef75b2a3b 100644 --- a/cnm/plugin/main.go +++ b/cnm/plugin/main.go @@ -14,6 +14,7 @@ import ( "github.com/Azure/azure-container-networking/common" "github.com/Azure/azure-container-networking/log" "github.com/Azure/azure-container-networking/platform" + "github.com/Azure/azure-container-networking/processlock" "github.com/Azure/azure-container-networking/store" ) @@ -158,9 +159,15 @@ func main() { return } + lockclient, err := processlock.NewFileLock(platform.CNILockPath + name + store.LockExtension) + if err != nil { + log.Printf("Error initializing file lock:%v", err) + return + } + // Create the key value store. storeFileName := storeFileLocation + name + ".json" - config.Store, err = store.NewJsonFileStore(storeFileName) + config.Store, err = store.NewJsonFileStore(storeFileName, lockclient) if err != nil { log.Errorf("Failed to create store file: %s, due to error %v\n", storeFileName, err) return diff --git a/cnms/service/networkmonitor.go b/cnms/service/networkmonitor.go index 8020bc3a53..5ab46a499c 100644 --- a/cnms/service/networkmonitor.go +++ b/cnms/service/networkmonitor.go @@ -15,6 +15,7 @@ import ( "github.com/Azure/azure-container-networking/netlink" "github.com/Azure/azure-container-networking/network" "github.com/Azure/azure-container-networking/platform" + "github.com/Azure/azure-container-networking/processlock" "github.com/Azure/azure-container-networking/store" "github.com/Azure/azure-container-networking/telemetry" ) @@ -141,8 +142,15 @@ func main() { tb.ConnectToTelemetryService(telemetryNumRetries, telemetryWaitTimeInMilliseconds) defer tb.Close() + var lockclient processlock.Interface for { - config.Store, err = store.NewJsonFileStore(platform.CNIRuntimePath + pluginName + ".json") + lockclient, err = processlock.NewFileLock(platform.CNILockPath + pluginName + store.LockExtension) + if err != nil { + log.Printf("Error initializing file lock:%v", err) + return + } + + config.Store, err = store.NewJsonFileStore(platform.CNIRuntimePath+pluginName+".json", lockclient) if err != nil { fmt.Printf("[monitor] Failed to create store: %v\n", err) return diff --git a/cns/restserver/api_test.go b/cns/restserver/api_test.go index d311cc25e9..3c61b1fe53 100644 --- a/cns/restserver/api_test.go +++ b/cns/restserver/api_test.go @@ -17,8 +17,6 @@ import ( "strings" "testing" - "github.com/Azure/azure-container-networking/store" - "github.com/Azure/azure-container-networking/cns" "github.com/Azure/azure-container-networking/cns/common" "github.com/Azure/azure-container-networking/cns/fakes" @@ -26,6 +24,8 @@ import ( "github.com/Azure/azure-container-networking/cns/nmagent" "github.com/Azure/azure-container-networking/cns/types" acncommon "github.com/Azure/azure-container-networking/common" + "github.com/Azure/azure-container-networking/processlock" + "github.com/Azure/azure-container-networking/store" ) const ( @@ -911,7 +911,7 @@ func startService() error { // Create the service. config := common.ServiceConfig{} // Create the key value store. - if config.Store, err = store.NewJsonFileStore(cnsJsonFileName); err != nil { + if config.Store, err = store.NewJsonFileStore(cnsJsonFileName, processlock.NewMockFileLock(false)); err != nil { logger.Errorf("Failed to create store file: %s, due to error %v\n", cnsJsonFileName, err) return err } diff --git a/cns/service/main.go b/cns/service/main.go index eb15a576a1..1e5cbbfbf7 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -23,7 +23,6 @@ import ( "github.com/Azure/azure-container-networking/cns" cnscli "github.com/Azure/azure-container-networking/cns/cmd/cli" "github.com/Azure/azure-container-networking/cns/cnireconciler" - cni "github.com/Azure/azure-container-networking/cns/cnireconciler" "github.com/Azure/azure-container-networking/cns/common" "github.com/Azure/azure-container-networking/cns/configuration" "github.com/Azure/azure-container-networking/cns/hnsclient" @@ -42,6 +41,7 @@ import ( "github.com/Azure/azure-container-networking/crd/nodenetworkconfig/api/v1alpha" "github.com/Azure/azure-container-networking/log" "github.com/Azure/azure-container-networking/platform" + "github.com/Azure/azure-container-networking/processlock" localtls "github.com/Azure/azure-container-networking/server/tls" "github.com/Azure/azure-container-networking/store" "github.com/avast/retry-go/v3" @@ -468,9 +468,15 @@ func main() { return } + lockclient, err := processlock.NewFileLock(platform.CNILockPath + name + store.LockExtension) + if err != nil { + log.Printf("Error initializing file lock:%v", err) + return + } + // Create the key value store. storeFileName := storeFileLocation + name + ".json" - config.Store, err = store.NewJsonFileStore(storeFileName) + config.Store, err = store.NewJsonFileStore(storeFileName, lockclient) if err != nil { logger.Errorf("Failed to create store file: %s, due to error %v\n", storeFileName, err) return @@ -538,7 +544,8 @@ func main() { // If so, we should check that the the CNI is new enough to support the state commands, // otherwise we fall back to the existing behavior. if cnsconfig.InitializeFromCNI { - isGoodVer, err := cni.IsDumpStateVer() + var isGoodVer bool + isGoodVer, err = cnireconciler.IsDumpStateVer() if err != nil { logger.Errorf("error checking CNI ver: %v", err) } @@ -623,8 +630,11 @@ func main() { }(privateEndpoint, infravnet, nodeID) } - var netPlugin network.NetPlugin - var ipamPlugin ipam.IpamPlugin + var ( + netPlugin network.NetPlugin + ipamPlugin ipam.IpamPlugin + lockclientCnm processlock.Interface + ) if startCNM { var pluginConfig acn.PluginConfig @@ -647,9 +657,15 @@ func main() { return } + lockclientCnm, err = processlock.NewFileLock(platform.CNILockPath + pluginName + store.LockExtension) + if err != nil { + log.Printf("Error initializing file lock:%v", err) + return + } + // Create the key value store. pluginStoreFile := storeFileLocation + pluginName + ".json" - pluginConfig.Store, err = store.NewJsonFileStore(pluginStoreFile) + pluginConfig.Store, err = store.NewJsonFileStore(pluginStoreFile, lockclientCnm) if err != nil { logger.Errorf("Failed to create plugin store file %s, due to error : %v\n", pluginStoreFile, err) return @@ -700,6 +716,14 @@ func main() { logger.Printf("stop ipam plugin") ipamPlugin.Stop() } + + if err = lockclientCnm.Unlock(); err != nil { + log.Errorf("lockclient cnm unlock error:%v", err) + } + } + + if err = lockclient.Unlock(); err != nil { + log.Errorf("lockclient cns unlock error:%v", err) } logger.Printf("CNS exited") diff --git a/internal/lockedfile/internal/filelock/filelock.go b/internal/lockedfile/internal/filelock/filelock.go new file mode 100644 index 0000000000..7a174ef1b4 --- /dev/null +++ b/internal/lockedfile/internal/filelock/filelock.go @@ -0,0 +1,102 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package filelock provides a platform-independent API for advisory file +// locking. Calls to functions in this package on platforms that do not support +// advisory locks will return errors for which IsNotSupported returns true. + +//nolint +package filelock + +import ( + "errors" + "io/fs" + "os" +) + +// A File provides the minimal set of methods required to lock an open file. +// File implementations must be usable as map keys. +// The usual implementation is *os.File. +type File interface { + // Name returns the name of the file. + Name() string + + // Fd returns a valid file descriptor. + // (If the File is an *os.File, it must not be closed.) + Fd() uintptr + + // Stat returns the FileInfo structure describing file. + Stat() (fs.FileInfo, error) +} + +// Lock places an advisory write lock on the file, blocking until it can be +// locked. +// +// If Lock returns nil, no other process will be able to place a read or write +// lock on the file until this process exits, closes f, or calls Unlock on it. +// +// If f's descriptor is already read- or write-locked, the behavior of Lock is +// unspecified. +// +// Closing the file may or may not release the lock promptly. Callers should +// ensure that Unlock is always called when Lock succeeds. +func Lock(f File) error { + return lock(f, writeLock) +} + +// RLock places an advisory read lock on the file, blocking until it can be locked. +// +// If RLock returns nil, no other process will be able to place a write lock on +// the file until this process exits, closes f, or calls Unlock on it. +// +// If f is already read- or write-locked, the behavior of RLock is unspecified. +// +// Closing the file may or may not release the lock promptly. Callers should +// ensure that Unlock is always called if RLock succeeds. +func RLock(f File) error { + return lock(f, readLock) +} + +// Unlock removes an advisory lock placed on f by this process. +// +// The caller must not attempt to unlock a file that is not locked. +func Unlock(f File) error { + return unlock(f) +} + +// String returns the name of the function corresponding to lt +// (Lock, RLock, or Unlock). +func (lt lockType) String() string { + switch lt { + case readLock: + return "RLock" + case writeLock: + return "Lock" + default: + return "Unlock" + } +} + +// IsNotSupported returns a boolean indicating whether the error is known to +// report that a function is not supported (possibly for a specific input). +// It is satisfied by ErrNotSupported as well as some syscall errors. +func IsNotSupported(err error) bool { + return isNotSupported(underlyingError(err)) +} + +// ErrNotSupported - operation not supported +var ErrNotSupported = errors.New("operation not supported") + +// underlyingError returns the underlying error for known os error types. +func underlyingError(err error) error { + switch err := err.(type) { + case *fs.PathError: + return err.Err + case *os.LinkError: + return err.Err + case *os.SyscallError: + return err.Err + } + return err +} diff --git a/internal/lockedfile/internal/filelock/filelock_fcntl.go b/internal/lockedfile/internal/filelock/filelock_fcntl.go new file mode 100644 index 0000000000..a37b2ad6d1 --- /dev/null +++ b/internal/lockedfile/internal/filelock/filelock_fcntl.go @@ -0,0 +1,217 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build aix || (solaris && !illumos) +// +build aix solaris,!illumos + +// This code implements the filelock API using POSIX 'fcntl' locks, which attach +// to an (inode, process) pair rather than a file descriptor. To avoid unlocking +// files prematurely when the same file is opened through different descriptors, +// we allow only one read-lock at a time. +// +// Most platforms provide some alternative API, such as an 'flock' system call +// or an F_OFD_SETLK command for 'fcntl', that allows for better concurrency and +// does not require per-inode bookkeeping in the application. + +package filelock + +import ( + "errors" + "io" + "io/fs" + "math/rand" + "sync" + "syscall" + "time" +) + +type lockType int16 + +const ( + readLock lockType = syscall.F_RDLCK + writeLock lockType = syscall.F_WRLCK +) + +type inode = uint64 // type of syscall.Stat_t.Ino + +type inodeLock struct { + owner File + queue []<-chan File +} + +type token struct{} + +var ( + mu sync.Mutex + inodes = map[File]inode{} + locks = map[inode]inodeLock{} +) + +func lock(f File, lt lockType) (err error) { + // POSIX locks apply per inode and process, and the lock for an inode is + // released when *any* descriptor for that inode is closed. So we need to + // synchronize access to each inode internally, and must serialize lock and + // unlock calls that refer to the same inode through different descriptors. + fi, err := f.Stat() + if err != nil { + return err + } + ino := fi.Sys().(*syscall.Stat_t).Ino + + mu.Lock() + if i, dup := inodes[f]; dup && i != ino { + mu.Unlock() + return &fs.PathError{ + Op: lt.String(), + Path: f.Name(), + Err: errors.New("inode for file changed since last Lock or RLock"), + } + } + inodes[f] = ino + + var wait chan File + l := locks[ino] + if l.owner == f { + // This file already owns the lock, but the call may change its lock type. + } else if l.owner == nil { + // No owner: it's ours now. + l.owner = f + } else { + // Already owned: add a channel to wait on. + wait = make(chan File) + l.queue = append(l.queue, wait) + } + locks[ino] = l + mu.Unlock() + + if wait != nil { + wait <- f + } + + // Spurious EDEADLK errors arise on platforms that compute deadlock graphs at + // the process, rather than thread, level. Consider processes P and Q, with + // threads P.1, P.2, and Q.3. The following trace is NOT a deadlock, but will be + // reported as a deadlock on systems that consider only process granularity: + // + // P.1 locks file A. + // Q.3 locks file B. + // Q.3 blocks on file A. + // P.2 blocks on file B. (This is erroneously reported as a deadlock.) + // P.1 unlocks file A. + // Q.3 unblocks and locks file A. + // Q.3 unlocks files A and B. + // P.2 unblocks and locks file B. + // P.2 unlocks file B. + // + // These spurious errors were observed in practice on AIX and Solaris in + // cmd/go: see https://golang.org/issue/32817. + // + // We work around this bug by treating EDEADLK as always spurious. If there + // really is a lock-ordering bug between the interacting processes, it will + // become a livelock instead, but that's not appreciably worse than if we had + // a proper flock implementation (which generally does not even attempt to + // diagnose deadlocks). + // + // In the above example, that changes the trace to: + // + // P.1 locks file A. + // Q.3 locks file B. + // Q.3 blocks on file A. + // P.2 spuriously fails to lock file B and goes to sleep. + // P.1 unlocks file A. + // Q.3 unblocks and locks file A. + // Q.3 unlocks files A and B. + // P.2 wakes up and locks file B. + // P.2 unlocks file B. + // + // We know that the retry loop will not introduce a *spurious* livelock + // because, according to the POSIX specification, EDEADLK is only to be + // returned when “the lock is blocked by a lock from another process”. + // If that process is blocked on some lock that we are holding, then the + // resulting livelock is due to a real deadlock (and would manifest as such + // when using, for example, the flock implementation of this package). + // If the other process is *not* blocked on some other lock that we are + // holding, then it will eventually release the requested lock. + + nextSleep := 1 * time.Millisecond + const maxSleep = 500 * time.Millisecond + for { + err = setlkw(f.Fd(), lt) + if err != syscall.EDEADLK { + break + } + time.Sleep(nextSleep) + + nextSleep += nextSleep + if nextSleep > maxSleep { + nextSleep = maxSleep + } + // Apply 10% jitter to avoid synchronizing collisions when we finally unblock. + nextSleep += time.Duration((0.1*rand.Float64() - 0.05) * float64(nextSleep)) + } + + if err != nil { + unlock(f) + return &fs.PathError{ + Op: lt.String(), + Path: f.Name(), + Err: err, + } + } + + return nil +} + +func unlock(f File) error { + var owner File + + mu.Lock() + ino, ok := inodes[f] + if ok { + owner = locks[ino].owner + } + mu.Unlock() + + if owner != f { + panic("unlock called on a file that is not locked") + } + + err := setlkw(f.Fd(), syscall.F_UNLCK) + + mu.Lock() + l := locks[ino] + if len(l.queue) == 0 { + // No waiters: remove the map entry. + delete(locks, ino) + } else { + // The first waiter is sending us their file now. + // Receive it and update the queue. + l.owner = <-l.queue[0] + l.queue = l.queue[1:] + locks[ino] = l + } + delete(inodes, f) + mu.Unlock() + + return err +} + +// setlkw calls FcntlFlock with F_SETLKW for the entire file indicated by fd. +func setlkw(fd uintptr, lt lockType) error { + for { + err := syscall.FcntlFlock(fd, syscall.F_SETLKW, &syscall.Flock_t{ + Type: int16(lt), + Whence: io.SeekStart, + Start: 0, + Len: 0, // All bytes. + }) + if err != syscall.EINTR { + return err + } + } +} + +func isNotSupported(err error) bool { + return err == syscall.ENOSYS || err == syscall.ENOTSUP || err == syscall.EOPNOTSUPP || err == ErrNotSupported +} diff --git a/internal/lockedfile/internal/filelock/filelock_other.go b/internal/lockedfile/internal/filelock/filelock_other.go new file mode 100644 index 0000000000..70f5d7a688 --- /dev/null +++ b/internal/lockedfile/internal/filelock/filelock_other.go @@ -0,0 +1,37 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build !aix && !darwin && !dragonfly && !freebsd && !linux && !netbsd && !openbsd && !plan9 && !solaris && !windows +// +build !aix,!darwin,!dragonfly,!freebsd,!linux,!netbsd,!openbsd,!plan9,!solaris,!windows + +package filelock + +import "io/fs" + +type lockType int8 + +const ( + readLock = iota + 1 + writeLock +) + +func lock(f File, lt lockType) error { + return &fs.PathError{ + Op: lt.String(), + Path: f.Name(), + Err: ErrNotSupported, + } +} + +func unlock(f File) error { + return &fs.PathError{ + Op: "Unlock", + Path: f.Name(), + Err: ErrNotSupported, + } +} + +func isNotSupported(err error) bool { + return err == ErrNotSupported +} diff --git a/internal/lockedfile/internal/filelock/filelock_plan9.go b/internal/lockedfile/internal/filelock/filelock_plan9.go new file mode 100644 index 0000000000..908afb6c8c --- /dev/null +++ b/internal/lockedfile/internal/filelock/filelock_plan9.go @@ -0,0 +1,37 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build plan9 +// +build plan9 + +package filelock + +import "io/fs" + +type lockType int8 + +const ( + readLock = iota + 1 + writeLock +) + +func lock(f File, lt lockType) error { + return &fs.PathError{ + Op: lt.String(), + Path: f.Name(), + Err: ErrNotSupported, + } +} + +func unlock(f File) error { + return &fs.PathError{ + Op: "Unlock", + Path: f.Name(), + Err: ErrNotSupported, + } +} + +func isNotSupported(err error) bool { + return err == ErrNotSupported +} diff --git a/internal/lockedfile/internal/filelock/filelock_test.go b/internal/lockedfile/internal/filelock/filelock_test.go new file mode 100644 index 0000000000..55c490053c --- /dev/null +++ b/internal/lockedfile/internal/filelock/filelock_test.go @@ -0,0 +1,212 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build !js && !plan9 +// +build !js,!plan9 + +//nolint + +package filelock_test + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + "runtime" + "testing" + "time" + + "github.com/Azure/azure-container-networking/internal/lockedfile/internal/filelock" +) + +func lock(t *testing.T, f *os.File) { + t.Helper() + err := filelock.Lock(f) + t.Logf("Lock(fd %d) = %v", f.Fd(), err) + if err != nil { + t.Fail() + } +} + +func rLock(t *testing.T, f *os.File) { + t.Helper() + err := filelock.RLock(f) + t.Logf("RLock(fd %d) = %v", f.Fd(), err) + if err != nil { + t.Fail() + } +} + +func unlock(t *testing.T, f *os.File) { + t.Helper() + err := filelock.Unlock(f) + t.Logf("Unlock(fd %d) = %v", f.Fd(), err) + if err != nil { + t.Fail() + } +} + +func mustTempFile(t *testing.T) (f *os.File, remove func()) { + t.Helper() + + base := filepath.Base(t.Name()) + f, err := os.CreateTemp("", base) + if err != nil { + t.Fatalf(`os.CreateTemp("", %q) = %v`, base, err) + } + t.Logf("fd %d = %s", f.Fd(), f.Name()) + + return f, func() { + f.Close() + os.Remove(f.Name()) + } +} + +func mustOpen(t *testing.T, name string) *os.File { + t.Helper() + + f, err := os.OpenFile(name, os.O_RDWR, 0) + if err != nil { + t.Fatalf("os.Open(%q) = %v", name, err) + } + + t.Logf("fd %d = os.Open(%q)", f.Fd(), name) + return f +} + +const ( + quiescent = 10 * time.Millisecond + probablyStillBlocked = 10 * time.Second +) + +func mustBlock(t *testing.T, op string, f *os.File) (wait func(*testing.T)) { + t.Helper() + + desc := fmt.Sprintf("%s(fd %d)", op, f.Fd()) + + done := make(chan struct{}) + go func() { + t.Helper() + switch op { + case "Lock": + lock(t, f) + case "RLock": + rLock(t, f) + default: + panic("invalid op: " + op) + } + close(done) + }() + + select { + case <-done: + t.Fatalf("%s unexpectedly did not block", desc) + return nil + + case <-time.After(quiescent): + t.Logf("%s is blocked (as expected)", desc) + return func(t *testing.T) { + t.Helper() + select { + case <-time.After(probablyStillBlocked): + t.Fatalf("%s is unexpectedly still blocked", desc) + case <-done: + } + } + } +} + +func TestLockExcludesLock(t *testing.T) { + t.Parallel() + + f, remove := mustTempFile(t) + defer remove() + + other := mustOpen(t, f.Name()) + defer other.Close() + + lock(t, f) + lockOther := mustBlock(t, "Lock", other) + unlock(t, f) + lockOther(t) + unlock(t, other) +} + +func TestLockExcludesRLock(t *testing.T) { + t.Parallel() + + f, remove := mustTempFile(t) + defer remove() + + other := mustOpen(t, f.Name()) + defer other.Close() + + lock(t, f) + rLockOther := mustBlock(t, "RLock", other) + unlock(t, f) + rLockOther(t) + unlock(t, other) +} + +func TestRLockExcludesOnlyLock(t *testing.T) { + t.Parallel() + + f, remove := mustTempFile(t) + defer remove() + rLock(t, f) + + f2 := mustOpen(t, f.Name()) + defer f2.Close() + + doUnlockTF := false + switch runtime.GOOS { + case "aix", "solaris": + // When using POSIX locks (as on Solaris), we can't safely read-lock the + // same inode through two different descriptors at the same time: when the + // first descriptor is closed, the second descriptor would still be open but + // silently unlocked. So a second RLock must block instead of proceeding. + lockF2 := mustBlock(t, "RLock", f2) + unlock(t, f) + lockF2(t) + default: + rLock(t, f2) + doUnlockTF = true + } + + other := mustOpen(t, f.Name()) + defer other.Close() + lockOther := mustBlock(t, "Lock", other) + + unlock(t, f2) + if doUnlockTF { + unlock(t, f) + } + lockOther(t) + unlock(t, other) +} + +func TestLockNotDroppedByExecCommand(t *testing.T) { + f, remove := mustTempFile(t) + defer remove() + + lock(t, f) + + other := mustOpen(t, f.Name()) + defer other.Close() + + // Some kinds of file locks are dropped when a duplicated or forked file + // descriptor is unlocked. Double-check that the approach used by os/exec does + // not accidentally drop locks. + //nolint + cmd := exec.Command(os.Args[0], "-test.run=^$") + if err := cmd.Run(); err != nil { + t.Fatalf("exec failed: %v", err) + } + + lockOther := mustBlock(t, "Lock", other) + unlock(t, f) + lockOther(t) + unlock(t, other) +} diff --git a/internal/lockedfile/internal/filelock/filelock_unix.go b/internal/lockedfile/internal/filelock/filelock_unix.go new file mode 100644 index 0000000000..c737fe0234 --- /dev/null +++ b/internal/lockedfile/internal/filelock/filelock_unix.go @@ -0,0 +1,47 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build darwin || dragonfly || freebsd || illumos || linux || netbsd || openbsd +// +build darwin dragonfly freebsd illumos linux netbsd openbsd + +package filelock + +import ( + "io/fs" + "syscall" +) + +type lockType int16 + +const ( + readLock lockType = syscall.LOCK_SH + writeLock lockType = syscall.LOCK_EX +) + +func lock(f File, lt lockType) (err error) { + for { + err = syscall.Flock(int(f.Fd()), int(lt)) + //nolint // not changing std golib code + if err != syscall.EINTR { + break + } + } + if err != nil { + return &fs.PathError{ + Op: lt.String(), + Path: f.Name(), + Err: err, + } + } + return nil +} + +func unlock(f File) error { + return lock(f, syscall.LOCK_UN) +} + +func isNotSupported(err error) bool { + //nolint // not changing std golib code + return err == syscall.ENOSYS || err == syscall.ENOTSUP || err == syscall.EOPNOTSUPP || err == ErrNotSupported +} diff --git a/internal/lockedfile/internal/filelock/filelock_windows.go b/internal/lockedfile/internal/filelock/filelock_windows.go new file mode 100644 index 0000000000..799e530fa1 --- /dev/null +++ b/internal/lockedfile/internal/filelock/filelock_windows.go @@ -0,0 +1,66 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build windows +// +build windows + +package filelock + +import ( + "golang.org/x/sys/windows" + "io/fs" +) + +type lockType uint32 + +const ( + readLock lockType = 0 + writeLock lockType = windows.LOCKFILE_EXCLUSIVE_LOCK +) + +const ( + reserved = 0 + allBytes = ^uint32(0) +) + +func lock(f File, lt lockType) error { + // Per https://golang.org/issue/19098, “Programs currently expect the Fd + // method to return a handle that uses ordinary synchronous I/O.” + // However, LockFileEx still requires an OVERLAPPED structure, + // which contains the file offset of the beginning of the lock range. + // We want to lock the entire file, so we leave the offset as zero. + ol := new(windows.Overlapped) + + err := windows.LockFileEx(windows.Handle(f.Fd()), uint32(lt), reserved, allBytes, allBytes, ol) + if err != nil { + return &fs.PathError{ + Op: lt.String(), + Path: f.Name(), + Err: err, + } + } + return nil +} + +func unlock(f File) error { + ol := new(windows.Overlapped) + err := windows.UnlockFileEx(windows.Handle(f.Fd()), reserved, allBytes, allBytes, ol) + if err != nil { + return &fs.PathError{ + Op: "Unlock", + Path: f.Name(), + Err: err, + } + } + return nil +} + +func isNotSupported(err error) bool { + switch err { + case windows.ERROR_NOT_SUPPORTED, windows.ERROR_CALL_NOT_IMPLEMENTED, ErrNotSupported: + return true + default: + return false + } +} diff --git a/internal/lockedfile/lockedfile.go b/internal/lockedfile/lockedfile.go new file mode 100644 index 0000000000..7c18d5894c --- /dev/null +++ b/internal/lockedfile/lockedfile.go @@ -0,0 +1,188 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package lockedfile creates and manipulates files whose contents should only +// change atomically. +//nolint +package lockedfile + +import ( + "fmt" + "io" + "io/fs" + "os" + "runtime" +) + +// A File is a locked *os.File. +// +// Closing the file releases the lock. +// +// If the program exits while a file is locked, the operating system releases +// the lock but may not do so promptly: callers must ensure that all locked +// files are closed before exiting. +type File struct { + osFile + closed bool +} + +// osFile embeds a *os.File while keeping the pointer itself unexported. +// (When we close a File, it must be the same file descriptor that we opened!) +type osFile struct { + *os.File +} + +// OpenFile is like os.OpenFile, but returns a locked file. +// If flag includes os.O_WRONLY or os.O_RDWR, the file is write-locked; +// otherwise, it is read-locked. +func OpenFile(name string, flag int, perm fs.FileMode) (*File, error) { + var ( + f = new(File) + err error + ) + f.osFile.File, err = openFile(name, flag, perm) + if err != nil { + return nil, err + } + + // Although the operating system will drop locks for open files when the go + // command exits, we want to hold locks for as little time as possible, and we + // especially don't want to leave a file locked after we're done with it. Our + // Close method is what releases the locks, so use a finalizer to report + // missing Close calls on a best-effort basis. + runtime.SetFinalizer(f, func(f *File) { + panic(fmt.Sprintf("lockedfile.File %s became unreachable without a call to Close", f.Name())) + }) + + return f, nil +} + +// Open is like os.Open, but returns a read-locked file. +func Open(name string) (*File, error) { + return OpenFile(name, os.O_RDONLY, 0) +} + +// Create is like os.Create, but returns a write-locked file. +func Create(name string) (*File, error) { + return OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666) +} + +// Edit creates the named file with mode 0666 (before umask), +// but does not truncate existing contents. +// +// If Edit succeeds, methods on the returned File can be used for I/O. +// The associated file descriptor has mode O_RDWR and the file is write-locked. +func Edit(name string) (*File, error) { + return OpenFile(name, os.O_RDWR|os.O_CREATE, 0666) +} + +// Close unlocks and closes the underlying file. +// +// Close may be called multiple times; all calls after the first will return a +// non-nil error. +func (f *File) Close() error { + if f.closed { + return &fs.PathError{ + Op: "close", + Path: f.Name(), + Err: fs.ErrClosed, + } + } + f.closed = true + + err := closeFile(f.osFile.File) + runtime.SetFinalizer(f, nil) + return err +} + +// Read opens the named file with a read-lock and returns its contents. +func Read(name string) ([]byte, error) { + f, err := Open(name) + if err != nil { + return nil, err + } + defer f.Close() + + return io.ReadAll(f) +} + +// Write opens the named file (creating it with the given permissions if needed), +// then write-locks it and overwrites it with the given content. +func Write(name string, content io.Reader, perm fs.FileMode) (err error) { + f, err := OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm) + if err != nil { + return err + } + + _, err = io.Copy(f, content) + if closeErr := f.Close(); err == nil { + err = closeErr + } + return err +} + +// Transform invokes t with the result of reading the named file, with its lock +// still held. +// +// If t returns a nil error, Transform then writes the returned contents back to +// the file, making a best effort to preserve existing contents on error. +// +// t must not modify the slice passed to it. +func Transform(name string, t func([]byte) ([]byte, error)) (err error) { + f, err := Edit(name) + if err != nil { + return err + } + defer f.Close() + + old, err := io.ReadAll(f) + if err != nil { + return err + } + + latest, err := t(old) + if err != nil { + return err + } + + if len(latest) > len(old) { + // The overall file size is increasing, so write the tail first: if we're + // about to run out of space on the disk, we would rather detect that + // failure before we have overwritten the original contents. + if _, err = f.WriteAt(latest[len(old):], int64(len(old))); err != nil { + // Make a best effort to remove the incomplete tail. + f.Truncate(int64(len(old))) + return err + } + } + + // We're about to overwrite the old contents. In case of failure, make a best + // effort to roll back before we close the file. + defer func() { + if err != nil { + if _, err := f.WriteAt(old, 0); err == nil { + f.Truncate(int64(len(old))) + } + } + }() + + if len(latest) >= len(old) { + if _, err := f.WriteAt(latest[:len(old)], 0); err != nil { + return err + } + } else { + if _, err := f.WriteAt(latest, 0); err != nil { + return err + } + // The overall file size is decreasing, so shrink the file to its final size + // after writing. We do this after writing (instead of before) so that if + // the write fails, enough filesystem space will likely still be reserved + // to contain the previous contents. + if err := f.Truncate(int64(len(latest))); err != nil { + return err + } + } + + return nil +} diff --git a/internal/lockedfile/lockedfile_filelock.go b/internal/lockedfile/lockedfile_filelock.go new file mode 100644 index 0000000000..b15b76b936 --- /dev/null +++ b/internal/lockedfile/lockedfile_filelock.go @@ -0,0 +1,67 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build !plan9 +// +build !plan9 + +//nolint +package lockedfile + +import ( + "io/fs" + "os" + + "github.com/Azure/azure-container-networking/internal/lockedfile/internal/filelock" +) + +func openFile(name string, flag int, perm fs.FileMode) (*os.File, error) { + // On BSD systems, we could add the O_SHLOCK or O_EXLOCK flag to the OpenFile + // call instead of locking separately, but we have to support separate locking + // calls for Linux and Windows anyway, so it's simpler to use that approach + // consistently. + + f, err := os.OpenFile(name, flag&^os.O_TRUNC, perm) + if err != nil { + return nil, err + } + + switch flag & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR) { + case os.O_WRONLY, os.O_RDWR: + err = filelock.Lock(f) + default: + err = filelock.RLock(f) + } + if err != nil { + f.Close() + return nil, err + } + + if flag&os.O_TRUNC == os.O_TRUNC { + if err := f.Truncate(0); err != nil { + // The documentation for os.O_TRUNC says “if possible, truncate file when + // opened”, but doesn't define “possible” (golang.org/issue/28699). + // We'll treat regular files (and symlinks to regular files) as “possible” + // and ignore errors for the rest. + if fi, statErr := f.Stat(); statErr != nil || fi.Mode().IsRegular() { + filelock.Unlock(f) + f.Close() + return nil, err + } + } + } + + return f, nil +} + +func closeFile(f *os.File) error { + // Since locking syscalls operate on file descriptors, we must unlock the file + // while the descriptor is still valid — that is, before the file is closed — + // and avoid unlocking files that are already closed. + err := filelock.Unlock(f) + + if closeErr := f.Close(); err == nil { + err = closeErr + } + return err +} diff --git a/internal/lockedfile/lockedfile_plan9.go b/internal/lockedfile/lockedfile_plan9.go new file mode 100644 index 0000000000..979118b10a --- /dev/null +++ b/internal/lockedfile/lockedfile_plan9.go @@ -0,0 +1,95 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build plan9 +// +build plan9 + +package lockedfile + +import ( + "io/fs" + "math/rand" + "os" + "strings" + "time" +) + +// Opening an exclusive-use file returns an error. +// The expected error strings are: +// +// - "open/create -- file is locked" (cwfs, kfs) +// - "exclusive lock" (fossil) +// - "exclusive use file already open" (ramfs) +var lockedErrStrings = [...]string{ + "file is locked", + "exclusive lock", + "exclusive use file already open", +} + +// Even though plan9 doesn't support the Lock/RLock/Unlock functions to +// manipulate already-open files, IsLocked is still meaningful: os.OpenFile +// itself may return errors that indicate that a file with the ModeExclusive bit +// set is already open. +func isLocked(err error) bool { + s := err.Error() + + for _, frag := range lockedErrStrings { + if strings.Contains(s, frag) { + return true + } + } + + return false +} + +func openFile(name string, flag int, perm fs.FileMode) (*os.File, error) { + // Plan 9 uses a mode bit instead of explicit lock/unlock syscalls. + // + // Per http://man.cat-v.org/plan_9/5/stat: “Exclusive use files may be open + // for I/O by only one fid at a time across all clients of the server. If a + // second open is attempted, it draws an error.” + // + // So we can try to open a locked file, but if it fails we're on our own to + // figure out when it becomes available. We'll use exponential backoff with + // some jitter and an arbitrary limit of 500ms. + + // If the file was unpacked or created by some other program, it might not + // have the ModeExclusive bit set. Set it before we call OpenFile, so that we + // can be confident that a successful OpenFile implies exclusive use. + if fi, err := os.Stat(name); err == nil { + if fi.Mode()&fs.ModeExclusive == 0 { + if err := os.Chmod(name, fi.Mode()|fs.ModeExclusive); err != nil { + return nil, err + } + } + } else if !os.IsNotExist(err) { + return nil, err + } + + nextSleep := 1 * time.Millisecond + const maxSleep = 500 * time.Millisecond + for { + f, err := os.OpenFile(name, flag, perm|fs.ModeExclusive) + if err == nil { + return f, nil + } + + if !isLocked(err) { + return nil, err + } + + time.Sleep(nextSleep) + + nextSleep += nextSleep + if nextSleep > maxSleep { + nextSleep = maxSleep + } + // Apply 10% jitter to avoid synchronizing collisions. + nextSleep += time.Duration((0.1*rand.Float64() - 0.05) * float64(nextSleep)) + } +} + +func closeFile(f *os.File) error { + return f.Close() +} diff --git a/internal/lockedfile/lockedfile_test.go b/internal/lockedfile/lockedfile_test.go new file mode 100644 index 0000000000..dc33d9d173 --- /dev/null +++ b/internal/lockedfile/lockedfile_test.go @@ -0,0 +1,271 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build !js +// +build !js + +// js does not support inter-process file locking. +//nolint +package lockedfile_test + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + "testing" + "time" + + "github.com/Azure/azure-container-networking/internal/lockedfile" +) + +func mustTempDir(t *testing.T) (dir string, remove func()) { + t.Helper() + + dir, err := os.MkdirTemp("", filepath.Base(t.Name())) + if err != nil { + t.Fatal(err) + } + return dir, func() { os.RemoveAll(dir) } +} + +const ( + quiescent = 10 * time.Millisecond + probablyStillBlocked = 10 * time.Second +) + +func mustBlock(t *testing.T, desc string, f func()) (wait func(*testing.T)) { + t.Helper() + + done := make(chan struct{}) + go func() { + f() + close(done) + }() + + select { + case <-done: + t.Fatalf("%s unexpectedly did not block", desc) + return nil + + case <-time.After(quiescent): + return func(t *testing.T) { + t.Helper() + select { + case <-time.After(probablyStillBlocked): + t.Fatalf("%s is unexpectedly still blocked after %v", desc, probablyStillBlocked) + case <-done: + } + } + } +} + +func TestMutexExcludes(t *testing.T) { + t.Parallel() + + dir, remove := mustTempDir(t) + defer remove() + + path := filepath.Join(dir, "lock") + + mu := lockedfile.MutexAt(path) + t.Logf("mu := MutexAt(_)") + + unlock, err := mu.Lock() + if err != nil { + t.Fatalf("mu.Lock: %v", err) + } + t.Logf("unlock, _ := mu.Lock()") + + mu2 := lockedfile.MutexAt(mu.Path) + t.Logf("mu2 := MutexAt(mu.Path)") + + wait := mustBlock(t, "mu2.Lock()", func() { + unlock2, err := mu2.Lock() + if err != nil { + t.Errorf("mu2.Lock: %v", err) + return + } + t.Logf("unlock2, _ := mu2.Lock()") + t.Logf("unlock2()") + unlock2() + }) + + t.Logf("unlock()") + unlock() + wait(t) +} + +func TestReadWaitsForLock(t *testing.T) { + t.Parallel() + + dir, remove := mustTempDir(t) + defer remove() + + path := filepath.Join(dir, "timestamp.txt") + + f, err := lockedfile.Create(path) + if err != nil { + t.Fatalf("Create: %v", err) + } + defer f.Close() + + const ( + part1 = "part 1\n" + part2 = "part 2\n" + ) + _, err = f.WriteString(part1) + if err != nil { + t.Fatalf("WriteString: %v", err) + } + t.Logf("WriteString(%q) = ", part1) + + wait := mustBlock(t, "Read", func() { + b, err := lockedfile.Read(path) + if err != nil { + t.Errorf("Read: %v", err) + return + } + + const want = part1 + part2 + got := string(b) + if got == want { + t.Logf("Read(_) = %q", got) + } else { + t.Errorf("Read(_) = %q, _; want %q", got, want) + } + }) + + _, err = f.WriteString(part2) + if err != nil { + t.Errorf("WriteString: %v", err) + } else { + t.Logf("WriteString(%q) = ", part2) + } + f.Close() + + wait(t) +} + +func TestCanLockExistingFile(t *testing.T) { + t.Parallel() + + dir, remove := mustTempDir(t) + defer remove() + path := filepath.Join(dir, "existing.txt") + + if err := os.WriteFile(path, []byte("ok"), 0777); err != nil { + t.Fatalf("os.WriteFile: %v", err) + } + + f, err := lockedfile.Edit(path) + if err != nil { + t.Fatalf("first Edit: %v", err) + } + + wait := mustBlock(t, "Edit", func() { + other, err := lockedfile.Edit(path) + if err != nil { + t.Errorf("second Edit: %v", err) + } + other.Close() + }) + + f.Close() + wait(t) +} + +// TestSpuriousEDEADLK verifies that the spurious EDEADLK reported in +// https://golang.org/issue/32817 no longer occurs. +func TestSpuriousEDEADLK(t *testing.T) { + // P.1 locks file A. + // Q.3 locks file B. + // Q.3 blocks on file A. + // P.2 blocks on file B. (Spurious EDEADLK occurs here.) + // P.1 unlocks file A. + // Q.3 unblocks and locks file A. + // Q.3 unlocks files A and B. + // P.2 unblocks and locks file B. + // P.2 unlocks file B. + + dirVar := t.Name() + "DIR" + + if dir := os.Getenv(dirVar); dir != "" { + // Q.3 locks file B. + b, err := lockedfile.Edit(filepath.Join(dir, "B")) + if err != nil { + t.Fatal(err) + } + defer b.Close() + + if err := os.WriteFile(filepath.Join(dir, "locked"), []byte("ok"), 0666); err != nil { + t.Fatal(err) + } + + // Q.3 blocks on file A. + a, err := lockedfile.Edit(filepath.Join(dir, "A")) + // Q.3 unblocks and locks file A. + if err != nil { + t.Fatal(err) + } + //nolint + defer a.Close() + + // Q.3 unlocks files A and B. + return + } + + dir, remove := mustTempDir(t) + defer remove() + + // P.1 locks file A. + a, err := lockedfile.Edit(filepath.Join(dir, "A")) + if err != nil { + t.Fatal(err) + } + + cmd := exec.Command(os.Args[0], "-test.run="+t.Name()) + cmd.Env = append(os.Environ(), fmt.Sprintf("%s=%s", dirVar, dir)) + + qDone := make(chan struct{}) + waitQ := mustBlock(t, "Edit A and B in subprocess", func() { + out, err := cmd.CombinedOutput() + if err != nil { + t.Errorf("%v:\n%s", err, out) + } + close(qDone) + }) + + // Wait until process Q has either failed or locked file B. + // Otherwise, P.2 might not block on file B as intended. +locked: + for { + if _, err := os.Stat(filepath.Join(dir, "locked")); !os.IsNotExist(err) { + break locked + } + select { + case <-qDone: + break locked + case <-time.After(1 * time.Millisecond): + } + } + + waitP2 := mustBlock(t, "Edit B", func() { + // P.2 blocks on file B. (Spurious EDEADLK occurs here.) + b, err := lockedfile.Edit(filepath.Join(dir, "B")) + // P.2 unblocks and locks file B. + if err != nil { + t.Error(err) + return + } + // P.2 unlocks file B. + b.Close() + }) + + // P.1 unlocks file A. + a.Close() + + waitQ(t) + waitP2(t) +} diff --git a/internal/lockedfile/mutex.go b/internal/lockedfile/mutex.go new file mode 100644 index 0000000000..de70da79d6 --- /dev/null +++ b/internal/lockedfile/mutex.go @@ -0,0 +1,68 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package lockedfile + +import ( + "fmt" + "os" + "sync" +) + +// A Mutex provides mutual exclusion within and across processes by locking a +// well-known file. Such a file generally guards some other part of the +// filesystem: for example, a Mutex file in a directory might guard access to +// the entire tree rooted in that directory. +// +// Mutex does not implement sync.Locker: unlike a sync.Mutex, a lockedfile.Mutex +// can fail to lock (e.g. if there is a permission error in the filesystem). +// +// Like a sync.Mutex, a Mutex may be included as a field of a larger struct but +// must not be copied after first use. The Path field must be set before first +// use and must not be change thereafter. +type Mutex struct { + Path string // The path to the well-known lock file. Must be non-empty. + mu sync.Mutex // A redundant mutex. The race detector doesn't know about file locking, so in tests we may need to lock something that it understands. +} + +// MutexAt returns a new Mutex with Path set to the given non-empty path. +func MutexAt(path string) *Mutex { + if path == "" { + panic("lockedfile.MutexAt: path must be non-empty") + } + return &Mutex{Path: path} +} + +func (mu *Mutex) String() string { + return fmt.Sprintf("lockedfile.Mutex(%s)", mu.Path) +} + +// Lock attempts to lock the Mutex. +// +// If successful, Lock returns a non-nil unlock function: it is provided as a +// return-value instead of a separate method to remind the caller to check the +// accompanying error. (See https://golang.org/issue/20803.) +func (mu *Mutex) Lock() (unlock func(), err error) { + if mu.Path == "" { + panic("lockedfile.Mutex: missing Path during Lock") + } + + // We could use either O_RDWR or O_WRONLY here. If we choose O_RDWR and the + // file at mu.Path is write-only, the call to OpenFile will fail with a + // permission error. That's actually what we want: if we add an RLock method + // in the future, it should call OpenFile with O_RDONLY and will require the + // files must be readable, so we should not let the caller make any + // assumptions about Mutex working with write-only files. + //nolint + f, err := OpenFile(mu.Path, os.O_RDWR|os.O_CREATE, 0666) + if err != nil { + return nil, err + } + mu.mu.Lock() + + return func() { + mu.mu.Unlock() + f.Close() + }, nil +} diff --git a/internal/lockedfile/transform_test.go b/internal/lockedfile/transform_test.go new file mode 100644 index 0000000000..0fdeb558f4 --- /dev/null +++ b/internal/lockedfile/transform_test.go @@ -0,0 +1,107 @@ +// Copyright 2019 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build !js +// +build !js + +// js does not support inter-process file locking. +//nolint +package lockedfile_test + +import ( + "bytes" + "encoding/binary" + "math/rand" + "path/filepath" + "testing" + "time" + + "github.com/Azure/azure-container-networking/internal/lockedfile" +) + +func isPowerOf2(x int) bool { + return x > 0 && x&(x-1) == 0 +} + +func roundDownToPowerOf2(x int) int { + if x <= 0 { + panic("nonpositive x") + } + bit := 1 + for x != bit { + x = x &^ bit + bit <<= 1 + } + return x +} + +func TestTransform(t *testing.T) { + dir, remove := mustTempDir(t) + defer remove() + path := filepath.Join(dir, "blob.bin") + + const maxChunkWords = 8 << 10 + buf := make([]byte, 2*maxChunkWords*8) + for i := uint64(0); i < 2*maxChunkWords; i++ { + binary.LittleEndian.PutUint64(buf[i*8:], i) + } + + if err := lockedfile.Write(path, bytes.NewReader(buf[:8]), 0666); err != nil { + t.Fatal(err) + } + + var attempts int64 = 128 + if !testing.Short() { + attempts *= 16 + } + const parallel = 32 + + var sem = make(chan bool, parallel) + + for n := attempts; n > 0; n-- { + sem <- true + go func() { + defer func() { <-sem }() + + time.Sleep(time.Duration(rand.Intn(100)) * time.Microsecond) + chunkWords := roundDownToPowerOf2(rand.Intn(maxChunkWords) + 1) + offset := rand.Intn(chunkWords) + + err := lockedfile.Transform(path, func(data []byte) (chunk []byte, err error) { + chunk = buf[offset*8 : (offset+chunkWords)*8] + + if len(data)&^7 != len(data) { + t.Errorf("read %d bytes, but each write is an integer multiple of 8 bytes", len(data)) + return chunk, nil + } + + words := len(data) / 8 + if !isPowerOf2(words) { + t.Errorf("read %d 8-byte words, but each write is a power-of-2 number of words", words) + return chunk, nil + } + + u := binary.LittleEndian.Uint64(data) + for i := 1; i < words; i++ { + next := binary.LittleEndian.Uint64(data[i*8:]) + if next != u+1 { + t.Errorf("wrote sequential integers, but read integer out of sequence at offset %d", i) + return chunk, nil + } + u = next + } + + return chunk, nil + }) + + if err != nil { + t.Errorf("unexpected error from Transform: %v", err) + } + }() + } + + for n := parallel; n > 0; n-- { + sem <- true + } +} diff --git a/processlock/filelock_test.go b/processlock/filelock_test.go new file mode 100644 index 0000000000..bdca70dd38 --- /dev/null +++ b/processlock/filelock_test.go @@ -0,0 +1,113 @@ +package processlock + +import ( + "io/ioutil" + "os" + "path/filepath" + "strconv" + "testing" + + "github.com/stretchr/testify/require" +) + +var ( + lockDir, _ = os.Getwd() + existingLockFile string + newLockFile string +) + +func TestMain(m *testing.M) { + existingLockFile = filepath.Join(lockDir, "existing.lock") + newLockFile = filepath.Join(lockDir, "new.lock") + os.Remove(existingLockFile) + os.Remove(newLockFile) + f, _ := os.Create(existingLockFile) + exitCode := m.Run() + f.Close() + os.Remove(existingLockFile) + os.Remove(newLockFile) + os.Exit(exitCode) +} + +func TestFileLock(t *testing.T) { + tests := []struct { + name string + flock Interface + wantErr bool + deleteLockfile bool + wantErrMsg string + lockfileName string + }{ + { + name: "Create new file and acquire Lock", + flock: &fileLock{filePath: newLockFile}, + wantErr: false, + deleteLockfile: true, + lockfileName: newLockFile, + }, + { + name: "acquire Lock on existing file", + flock: &fileLock{filePath: existingLockFile}, + lockfileName: existingLockFile, + wantErr: false, + }, + { + name: "acquire Lock on existing file after releasing", + flock: &fileLock{filePath: existingLockFile}, + lockfileName: existingLockFile, + wantErr: false, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + err := tt.flock.Lock() + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + err = tt.flock.Unlock() + require.NoError(t, err) + err = tt.flock.Unlock() + require.NoError(t, err, "Calling Release lock again should not throw error for already released lock:%v", err) + + // read lockfile contents to check if contents match with pid of current process + b, errRead := ioutil.ReadFile(tt.lockfileName) + require.NoError(t, errRead, "Got error reading lockfile:%v", errRead) + pidStr := string(b) + pid, _ := strconv.Atoi(pidStr) + require.Equal(t, os.Getpid(), pid, "Expected pid %d but got %d", os.Getpid(), pid) + } + if tt.deleteLockfile { + os.Remove(tt.lockfileName) + } + }) + } +} + +func TestReleaseFileLockError(t *testing.T) { + tests := []struct { + name string + flock Interface + wantErr bool + wantErrMsg string + }{ + { + name: "Release file lock without acquring it", + flock: &fileLock{filePath: newLockFile}, + wantErr: true, + wantErrMsg: ErrInvalidFile.Error(), + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + err := tt.flock.Unlock() + if tt.wantErr { + require.Error(t, err) + require.Equal(t, tt.wantErrMsg, err.Error(), "Expected:%s but got:%s", tt.wantErrMsg, err.Error()) + } + }) + } +} diff --git a/processlock/mockprocesslock.go b/processlock/mockprocesslock.go new file mode 100644 index 0000000000..ed62de5628 --- /dev/null +++ b/processlock/mockprocesslock.go @@ -0,0 +1,34 @@ +package processlock + +import ( + "github.com/pkg/errors" +) + +// ErrMockFileLock - mock filelock error +var ErrMockFileLock = errors.New("mock filelock error") + +type mockFileLock struct { + fail bool +} + +func NewMockFileLock(fail bool) Interface { + return &mockFileLock{ + fail: fail, + } +} + +func (l *mockFileLock) Lock() error { + if l.fail { + return ErrMockFileLock + } + + return nil +} + +func (l *mockFileLock) Unlock() error { + if l.fail { + return ErrMockFileLock + } + + return nil +} diff --git a/processlock/processlock.go b/processlock/processlock.go new file mode 100644 index 0000000000..1e534d7683 --- /dev/null +++ b/processlock/processlock.go @@ -0,0 +1,73 @@ +package processlock + +import ( + "io/fs" + "os" + "path/filepath" + "strconv" + + "github.com/Azure/azure-container-networking/internal/lockedfile" + "github.com/pkg/errors" +) + +// ErrInvalidFile represents invalid file pointer +var ( + ErrEmptyFilePath = errors.New("empty file path") + ErrInvalidFile = errors.New("invalid File pointer") +) + +//nolint:revive // this naming makes sense +type Interface interface { + Lock() error + Unlock() error +} + +type fileLock struct { + filePath string + file *lockedfile.File +} + +func NewFileLock(fileAbsPath string) (Interface, error) { + if fileAbsPath == "" { + return nil, ErrEmptyFilePath + } + + //nolint:gomnd //0o664 - permission to create directory in octal + err := os.MkdirAll(filepath.Dir(fileAbsPath), os.FileMode(0o664)) + if err != nil { + return nil, errors.Wrap(err, "mkdir lock dir returned error") + } + + return &fileLock{ + filePath: fileAbsPath, + }, nil +} + +func (l *fileLock) Lock() error { + var err error + + l.file, err = lockedfile.Create(l.filePath) + if err != nil { + return errors.Wrap(err, "lockedfile create error in lock") + } + + _, err = l.file.WriteString(strconv.Itoa(os.Getpid())) + if err != nil { + return errors.Wrap(err, "write to lockfile failed") + } + + return nil +} + +func (l *fileLock) Unlock() error { + if l.file == nil { + return ErrInvalidFile + } + + err := l.file.Close() + if err != nil && !errors.Is(err, fs.ErrClosed) { + return errors.Wrap(err, "file close error in unlock") + } + + return nil +} diff --git a/store/json.go b/store/json.go index 91d75dcd51..36cd247297 100644 --- a/store/json.go +++ b/store/json.go @@ -9,55 +9,46 @@ import ( "io/ioutil" "os" "path/filepath" - "strconv" "sync" "time" "github.com/Azure/azure-container-networking/log" "github.com/Azure/azure-container-networking/platform" + "github.com/Azure/azure-container-networking/processlock" + "github.com/pkg/errors" ) const ( // Default file name for backing persistent store. defaultFileName = "azure-container-networking.json" - // Extension added to the file name for lock. - lockExtension = ".lock" + // LockExtension - Extension added to the file name for lock. + LockExtension = ".lock" - // Maximum number of retries before failing a lock call. - lockMaxRetries = 100 - - // Delay between lock retries. - lockRetryDelay = 100 * time.Millisecond + // DefaultLockTimeout - lock timeout in milliseconds + DefaultLockTimeout = 10000 * time.Millisecond ) // jsonFileStore is an implementation of KeyValueStore using a local JSON file. type jsonFileStore struct { - fileName string - lockFileName string - data map[string]*json.RawMessage - inSync bool - locked bool + fileName string + data map[string]*json.RawMessage + inSync bool + processLock processlock.Interface sync.Mutex } +//nolint:revive // ignoring name change // NewJsonFileStore creates a new jsonFileStore object, accessed as a KeyValueStore. -func NewJsonFileStore(fileName string) (KeyValueStore, error) { +func NewJsonFileStore(fileName string, lockclient processlock.Interface) (KeyValueStore, error) { if fileName == "" { fileName = defaultFileName } - if platform.CNILockPath != "" { - err := os.MkdirAll(platform.CNILockPath, os.FileMode(0o664)) - if err != nil { - return nil, err - } - } - kvs := &jsonFileStore{ - fileName: fileName, - lockFileName: platform.CNILockPath + filepath.Base(fileName) + lockExtension, - data: make(map[string]*json.RawMessage), + fileName: fileName, + processLock: lockclient, + data: make(map[string]*json.RawMessage), } return kvs, nil @@ -174,84 +165,48 @@ func (kvs *jsonFileStore) flush() error { return nil } -// Lock locks the store for exclusive access. -func (kvs *jsonFileStore) Lock(block bool) error { - var ( - lockFile *os.File - err error - ) +func (kvs *jsonFileStore) lockUtil(status chan error) { + err := kvs.processLock.Lock() + status <- err +} +// Lock locks the store for exclusive access. +func (kvs *jsonFileStore) Lock(timeout time.Duration) error { kvs.Mutex.Lock() defer kvs.Mutex.Unlock() - if kvs.locked { - return ErrStoreLocked - } + afterTime := time.After(timeout) + status := make(chan error) - //nolint:gomnd // 0o664 - read write mode constant - lockPerm := os.FileMode(0o644) + os.FileMode(os.ModeExclusive) - - // Try to acquire the lock file. - var lockRetryCount uint - var modTimeCur time.Time - var modTimePrev time.Time - for lockRetryCount < lockMaxRetries { - lockFile, err = os.OpenFile(kvs.lockFileName, os.O_CREATE|os.O_EXCL|os.O_RDWR, lockPerm) - if err == nil { - break - } + log.Printf("Acquiring process lock") + go kvs.lockUtil(status) - if !block { - return ErrNonBlockingLockIsAlreadyLocked - } - - // Reset the lock retry count if the timestamp for the lock file changes. - if fileInfo, err := os.Stat(kvs.lockFileName); err == nil { - modTimeCur = fileInfo.ModTime() - if !modTimeCur.Equal(modTimePrev) { - lockRetryCount = 0 - } - modTimePrev = modTimeCur - } - - time.Sleep(lockRetryDelay) - - lockRetryCount++ - } - - if lockRetryCount == lockMaxRetries { + var err error + select { + case <-afterTime: return ErrTimeoutLockingStore + case err = <-status: } - defer lockFile.Close() - - // Write the process ID for easy identification. - if _, err = lockFile.WriteString(strconv.Itoa(os.Getpid())); err != nil { - return err + if err != nil { + return errors.Wrap(err, "processLock acquire error") } - kvs.locked = true - + log.Printf("Acquired process lock") return nil } // Unlock unlocks the store. -func (kvs *jsonFileStore) Unlock(forceUnlock bool) error { +func (kvs *jsonFileStore) Unlock() error { kvs.Mutex.Lock() defer kvs.Mutex.Unlock() - if !forceUnlock && !kvs.locked { - return ErrStoreNotLocked - } - - err := os.Remove(kvs.lockFileName) + err := kvs.processLock.Unlock() if err != nil { - return err + return errors.Wrap(err, "unlock error") } - kvs.inSync = false - kvs.locked = false - + log.Printf("Released process lock") return nil } @@ -269,24 +224,6 @@ func (kvs *jsonFileStore) GetModificationTime() (time.Time, error) { return info.ModTime().UTC(), nil } -// GetLockFileModificationTime returns the modification time of the lock file of the persistent store. -func (kvs *jsonFileStore) GetLockFileModificationTime() (time.Time, error) { - kvs.Mutex.Lock() - defer kvs.Mutex.Unlock() - - info, err := os.Stat(kvs.lockFileName) - if err != nil { - log.Printf("os.stat() for file %v failed: %v", kvs.lockFileName, err) - return time.Time{}.UTC(), err - } - - return info.ModTime().UTC(), nil -} - -func (kvs *jsonFileStore) GetLockFileName() string { - return kvs.lockFileName -} - func (kvs *jsonFileStore) Remove() { kvs.Mutex.Lock() if err := os.Remove(kvs.fileName); err != nil { diff --git a/store/json_test.go b/store/json_test.go index 6af3851ea5..e57d1e2348 100644 --- a/store/json_test.go +++ b/store/json_test.go @@ -5,9 +5,12 @@ package store import ( "os" - "runtime" "strings" "testing" + "time" + + "github.com/Azure/azure-container-networking/processlock" + "github.com/stretchr/testify/require" ) const ( @@ -46,7 +49,7 @@ func TestKeyValuePairsAreReinstantiatedFromJSONFile(t *testing.T) { defer os.Remove(testFileName) // Create the store, initialized using the JSON file. - kvs, err := NewJsonFileStore(testFileName) + kvs, err := NewJsonFileStore(testFileName, processlock.NewMockFileLock(false)) if err != nil { t.Fatalf("Failed to create KeyValueStore %v\n", err) } @@ -71,7 +74,7 @@ func TestKeyValuePairsArePersistedToJSONFile(t *testing.T) { var actualPair string // Create the store. - kvs, err := NewJsonFileStore(testFileName) + kvs, err := NewJsonFileStore(testFileName, processlock.NewMockFileLock(false)) if err != nil { t.Fatalf("Failed to create KeyValueStore %v\n", err) } @@ -117,7 +120,7 @@ func TestKeyValuePairsAreWrittenAndReadCorrectly(t *testing.T) { var readValue testType1 // Create the store. - kvs, err := NewJsonFileStore(testFileName) + kvs, err := NewJsonFileStore(testFileName, processlock.NewMockFileLock(false)) if err != nil { t.Fatalf("Failed to create KeyValueStore %v\n", err) } @@ -150,93 +153,70 @@ func TestKeyValuePairsAreWrittenAndReadCorrectly(t *testing.T) { os.Remove(testFileName) } -// Tests that locking a store gives the caller exclusive access. -func TestLockingStoreGivesExclusiveAccess(t *testing.T) { - anyValue := testType1{"test", 42} - - // Create the store. - kvs, err := NewJsonFileStore(testFileName) - if err != nil { - t.Fatalf("Failed to create first store: %v", err) - } - - // Lock for exclusive access. - err = kvs.Lock(false) - if err != nil { - t.Errorf("Failed to lock store: %v", err) - } - - // Write a key value pair. - err = kvs.Write(testKey1, &anyValue) - if err != nil { - t.Fatalf("Failed to write to store: %v", err) - } - - // Create a second store pointing to the same file. - kvs2, err := NewJsonFileStore(testFileName) - if err != nil { - t.Fatalf("Failed to create second store: %v", err) - } - - // Try locking the second store. - // This should fail because the first store has exclusive access. - err = kvs2.Lock(false) - if err == nil { - t.Errorf("Locking an already-locked store succeeded: %v", err) - } - - // Unlock the first store. - err = kvs.Unlock(false) - if err != nil { - t.Errorf("Failed to unlock first store: %v", err) - } - - // Try locking the second store again. - // This should succeed because the first store revoked exclusive access. - err = kvs2.Lock(false) - if err != nil { - t.Errorf("Failed to re-lock an unlocked store: %v", err) - } - - // Unlock the second store. - err = kvs2.Unlock(false) - if err != nil { - t.Errorf("Failed to unlock second store: %v", err) - } - - // Cleanup. - os.Remove(testFileName) -} - // test case for testing newjsonfilestore idempotent func TestNewJsonFileStoreIdempotent(t *testing.T) { - _, err := NewJsonFileStore(testLockFileName) + _, err := NewJsonFileStore(testLockFileName, processlock.NewMockFileLock(false)) if err != nil { t.Errorf("Failed to initialize store: %v", err) } - _, err = NewJsonFileStore(testLockFileName) + _, err = NewJsonFileStore(testLockFileName, processlock.NewMockFileLock(false)) if err != nil { t.Errorf("Failed to initialize same store second time: %v", err) } } -// test case for checking if lockfilepath is expected -func TestLockFilePath(t *testing.T) { - store, err := NewJsonFileStore(testLockFileName) - if err != nil { - t.Errorf("Failed to initialize store: %v", err) - } - - lockFileName := store.GetLockFileName() - - if runtime.GOOS == "linux" { - if lockFileName != "/var/run/azure-vnet/"+testLockFileName+".lock" { - t.Errorf("Not expected file lock name: %v", lockFileName) - } - } else { - if lockFileName != testLockFileName+".lock" { - t.Errorf("Not expected lockfilename: %v", lockFileName) - } +func TestLock(t *testing.T) { + tests := []struct { + name string + store KeyValueStore + timeoutms int + wantErr bool + wantErrMsg string + }{ + { + name: "Acquire Lock happy path", + store: func() KeyValueStore { + st, _ := NewJsonFileStore(testFileName, processlock.NewMockFileLock(false)) + return st + }(), + timeoutms: 10000, + wantErr: false, + }, + { + name: "Acquire Lock Fail", + store: func() KeyValueStore { + st, _ := NewJsonFileStore(testFileName, processlock.NewMockFileLock(true)) + return st + }(), + timeoutms: 10000, + wantErr: true, + wantErrMsg: processlock.ErrMockFileLock.Error(), + }, + { + name: "Acquire Lock timeout error", + store: func() KeyValueStore { + st, _ := NewJsonFileStore(testFileName, processlock.NewMockFileLock(false)) + return st + }(), + timeoutms: 0, + wantErr: true, + wantErrMsg: ErrTimeoutLockingStore.Error(), + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + err := tt.store.Lock(time.Duration(tt.timeoutms) * time.Millisecond) + if tt.wantErr { + require.Error(t, err) + require.Contains(t, err.Error(), tt.wantErrMsg, "Expected:%v but got:%v", tt.wantErrMsg, err.Error()) + } else { + require.NoError(t, err) + err = tt.store.Unlock() + require.NoError(t, err) + } + }) } } diff --git a/store/mockstore.go b/store/mockstore.go index fde8659656..c3acfff2af 100644 --- a/store/mockstore.go +++ b/store/mockstore.go @@ -28,11 +28,11 @@ func (ms *mockStore) Flush() error { return nil } -func (ms *mockStore) Lock(block bool) error { +func (ms *mockStore) Lock(duration time.Duration) error { return nil } -func (ms *mockStore) Unlock(forceUnlock bool) error { +func (ms *mockStore) Unlock() error { return nil } diff --git a/store/store.go b/store/store.go index eb2e44263f..0408310925 100644 --- a/store/store.go +++ b/store/store.go @@ -13,11 +13,9 @@ type KeyValueStore interface { Read(key string, value interface{}) error Write(key string, value interface{}) error Flush() error - Lock(block bool) error - Unlock(forceUnlock bool) error + Lock(timeout time.Duration) error + Unlock() error GetModificationTime() (time.Time, error) - GetLockFileModificationTime() (time.Time, error) - GetLockFileName() string Remove() } diff --git a/telemetry/aiwrapper.go b/telemetry/aiwrapper.go index eeb6c11529..3416b89c75 100644 --- a/telemetry/aiwrapper.go +++ b/telemetry/aiwrapper.go @@ -3,8 +3,6 @@ package telemetry import ( "fmt" - "runtime" - "github.com/Azure/azure-container-networking/aitelemetry" "github.com/Azure/azure-container-networking/log" ) @@ -73,7 +71,6 @@ func SendAIMetric(aiMetric AIMetric) { return } - aiMetric.Metric.CustomDimensions[OSTypeStr] = runtime.GOOS th.TrackMetric(aiMetric.Metric) } diff --git a/telemetry/constants.go b/telemetry/constants.go index b2abf5c781..f3172e1697 100644 --- a/telemetry/constants.go +++ b/telemetry/constants.go @@ -8,6 +8,7 @@ const ( CNIAddTimeMetricStr = "CNIAddTimeMs" CNIDelTimeMetricStr = "CNIDelTimeMs" CNIUpdateTimeMetricStr = "CNIUpdateTimeMs" + CNILockTimeoutStr = "CNILockTimeoutError" // Dimension Names ContextStr = "Context" diff --git a/telemetry/telemetry.go b/telemetry/telemetry.go index f2659dd05e..223d1fbbff 100644 --- a/telemetry/telemetry.go +++ b/telemetry/telemetry.go @@ -8,6 +8,7 @@ import ( "github.com/Azure/azure-container-networking/aitelemetry" "github.com/Azure/azure-container-networking/common" + "github.com/Azure/azure-container-networking/log" "github.com/Azure/azure-container-networking/platform" "github.com/pkg/errors" ) @@ -112,6 +113,7 @@ func (reportMgr *ReportManager) SendReport(tb *TelemetryBuffer) error { if err == nil { // If write fails, try to re-establish connections as server/client if _, err = tb.Write(report); err != nil { + log.Printf("telemetry write failed:%v", err) tb.Cancel() } } diff --git a/telemetry/telemetrybuffer.go b/telemetry/telemetrybuffer.go index c5bfb960e4..91f31613d3 100644 --- a/telemetry/telemetrybuffer.go +++ b/telemetry/telemetrybuffer.go @@ -18,6 +18,7 @@ import ( "github.com/Azure/azure-container-networking/common" "github.com/Azure/azure-container-networking/log" "github.com/Azure/azure-container-networking/platform" + "github.com/Azure/azure-container-networking/processlock" "github.com/Azure/azure-container-networking/store" ) @@ -109,7 +110,11 @@ func (tb *TelemetryBuffer) StartServer() error { reportStr, err := read(conn) if err == nil { var tmp map[string]interface{} - json.Unmarshal(reportStr, &tmp) + err = json.Unmarshal(reportStr, &tmp) + if err != nil { + log.Logf("StartServer: unmarshal error:%v", err) + return + } if _, ok := tmp["CniSucceeded"]; ok { var cniReport CNIReport json.Unmarshal([]byte(reportStr), &cniReport) @@ -118,6 +123,8 @@ func (tb *TelemetryBuffer) StartServer() error { var aiMetric AIMetric json.Unmarshal([]byte(reportStr), &aiMetric) tb.data <- aiMetric + } else { + log.Logf("StartServer: default case:%+v...", tmp) } } else { var index int @@ -192,13 +199,14 @@ func read(conn net.Conn) (b []byte, err error) { return } -// Write - write to the file descriptor +// Write - write to the file descriptor. func (tb *TelemetryBuffer) Write(b []byte) (c int, err error) { buf := make([]byte, len(b)) - copy(b, buf) - b = append(buf, Delimiter) + copy(buf, b) + //nolint:makezero //keeping old code + buf = append(buf, Delimiter) w := bufio.NewWriter(tb.client) - c, err = w.Write(b) + c, err = w.Write(buf) if err == nil { err = w.Flush() } @@ -241,18 +249,35 @@ func push(x interface{}) { metadata, err := common.GetHostMetadata(metadataFile) if err != nil { log.Logf("Error getting metadata %v", err) - } else { - kvs, err := store.NewJsonFileStore(metadataFile) + + var lockclient processlock.Interface + lockclient, err = processlock.NewFileLock(metadataFile + store.LockExtension) + if err != nil { + log.Printf("Error initializing file lock:%v", err) + return + } + + var kvs store.KeyValueStore + kvs, err = store.NewJsonFileStore(metadataFile, lockclient) if err != nil { log.Printf("Error acuiring lock for writing metadata file: %v", err) } - kvs.Lock(true) + err = kvs.Lock(store.DefaultLockTimeout) + if err != nil { + log.Errorf("push: Not able to acquire lock:%v", err) + return + } + err = common.SaveHostMetadata(metadata, metadataFile) if err != nil { log.Logf("saving host metadata failed with :%v", err) } - kvs.Unlock(true) + + err = kvs.Unlock() + if err != nil { + log.Errorf("push: Not able to release lock:%v", err) + } } switch y := x.(type) { @@ -262,6 +287,8 @@ func push(x interface{}) { case AIMetric: SendAIMetric(y) + default: + log.Printf("Push fn: Default case:%+v", y) } } diff --git a/telemetry/telemetrybuffer_test.go b/telemetry/telemetrybuffer_test.go index f96750ab76..f226c6a87b 100644 --- a/telemetry/telemetrybuffer_test.go +++ b/telemetry/telemetrybuffer_test.go @@ -105,7 +105,7 @@ func TestWrite(t *testing.T) { return } require.NoError(t, err) - require.Equal(t, tt.want, got) + require.Equal(t, tt.want, got, "Expected:%d but got:%d", tt.want, got) }) } } diff --git a/testutils/store_mock.go b/testutils/store_mock.go index c626907c0a..5ddd66f9a5 100644 --- a/testutils/store_mock.go +++ b/testutils/store_mock.go @@ -21,39 +21,31 @@ type KeyValueStoreMock struct { GetModificationTimeError error } -func (store *KeyValueStoreMock) Read(key string, value interface{}) error { - return store.ReadError +func (mockst *KeyValueStoreMock) Read(key string, value interface{}) error { + return mockst.ReadError } -func (store *KeyValueStoreMock) Write(key string, value interface{}) error { - return store.WriteError +func (mockst *KeyValueStoreMock) Write(key string, value interface{}) error { + return mockst.WriteError } -func (store *KeyValueStoreMock) Flush() error { - return store.FlushError +func (mockst *KeyValueStoreMock) Flush() error { + return mockst.FlushError } -func (store *KeyValueStoreMock) Lock(block bool) error { - return store.LockError +func (mockst *KeyValueStoreMock) Lock(time.Duration) error { + return mockst.LockError } -func (store *KeyValueStoreMock) Unlock(forceUnlock bool) error { - return store.UnlockError +func (mockst *KeyValueStoreMock) Unlock() error { + return mockst.UnlockError } -func (store *KeyValueStoreMock) GetModificationTime() (time.Time, error) { - if store.GetModificationTimeError != nil { - return time.Time{}, store.GetModificationTimeError +func (mockst *KeyValueStoreMock) GetModificationTime() (time.Time, error) { + if mockst.GetModificationTimeError != nil { + return time.Time{}, mockst.GetModificationTimeError } - return store.ModificationTime, nil + return mockst.ModificationTime, nil } -func (store *KeyValueStoreMock) GetLockFileModificationTime() (time.Time, error) { - return time.Now(), nil -} - -func (store *KeyValueStoreMock) GetLockFileName() string { - return "" -} - -func (store *KeyValueStoreMock) Remove() {} +func (mockst *KeyValueStoreMock) Remove() {}