From 7e442d514d12780279fa5e0265468eca8ddfd8ca Mon Sep 17 00:00:00 2001 From: Evan Baker Date: Tue, 9 Jan 2024 19:47:59 -0600 Subject: [PATCH] backport: fix: add contexts to clean up fsnotify goroutines (#2450) Signed-off-by: Evan Baker --- cns/fsnotify/fsnotify.go | 75 +++++++++++++++++----------------------- cns/service/main.go | 16 +++++---- go.mod | 3 +- go.sum | 12 +++++++ 4 files changed, 56 insertions(+), 50 deletions(-) diff --git a/cns/fsnotify/fsnotify.go b/cns/fsnotify/fsnotify.go index cd6bff28dc..32bb314808 100644 --- a/cns/fsnotify/fsnotify.go +++ b/cns/fsnotify/fsnotify.go @@ -3,6 +3,7 @@ package fsnotify import ( "context" "io" + "io/fs" "os" "sync" "time" @@ -11,6 +12,7 @@ import ( "github.com/fsnotify/fsnotify" "github.com/pkg/errors" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) type releaseIPsClient interface { @@ -27,17 +29,18 @@ type watcher struct { } // Create the AsyncDelete watcher. -func New(cli releaseIPsClient, path string, logger *zap.Logger) *watcher { //nolint +func New(cli releaseIPsClient, path string, logger *zap.Logger) (*watcher, error) { //nolint // Add directory where intended deletes are kept - if err := os.Mkdir(path, 0o755); err != nil { //nolint + if err := os.Mkdir(path, 0o755); err != nil && !errors.Is(err, fs.ErrExist) { //nolint logger.Error("error making directory", zap.String("path", path), zap.Error(err)) + return nil, errors.Wrapf(err, "failed to create dir %s", path) } return &watcher{ cli: cli, path: path, log: logger, pendingDelete: make(map[string]struct{}), - } + }, nil } // releaseAll locks and iterates the pendingDeletes map and calls CNS to @@ -111,10 +114,29 @@ func (w *watcher) watchFS(ctx context.Context) error { } defer watcher.Close() + // Start watching the directory, so that we don't miss any events. err = watcher.Add(w.path) if err != nil { w.log.Error("failed to add path to fsnotify watcher", zap.String("path", w.path), zap.Error(err)) + return errors.Wrap(err, "failed to add path to fsnotify watcher") + } + // List the directory and creates synthetic events for any existing items. + w.log.Info("listing directory", zap.String("path", w.path)) + dirContents, err := os.ReadDir(w.path) + if err != nil { + w.log.Error("error reading deleteID directory", zap.String("path", w.path), zap.Error(err)) + return errors.Wrapf(err, "failed to read %s", w.path) + } + if len(dirContents) == 0 { + w.log.Info("no missed deletes found") } + w.lock.Lock() + for _, file := range dirContents { + w.log.Info("adding missed delete from file", zap.String("name", file.Name())) + w.pendingDelete[file.Name()] = struct{}{} + } + w.lock.Unlock() + // Start listening for events. w.log.Info("listening for events from fsnotify watcher") for { @@ -139,51 +161,18 @@ func (w *watcher) watchFS(ctx context.Context) error { } } -// readFS lists the directory and enqueues any missed deletes that are already -// present on-disk. -func (w *watcher) readFS() error { - w.log.Info("listing directory", zap.String("path", w.path)) - dirContents, err := os.ReadDir(w.path) - if err != nil { - w.log.Error("error reading deleteID directory", zap.String("path", w.path), zap.Error(err)) - return errors.Wrapf(err, "failed to read %s", w.path) - } - if len(dirContents) == 0 { - w.log.Info("no missed deletes found") - return nil - } - w.lock.Lock() - for _, file := range dirContents { - w.log.Info("adding missed delete from file", zap.String("name", file.Name())) - w.pendingDelete[file.Name()] = struct{}{} - } - w.lock.Unlock() - return nil -} - -// WatchFS starts the filesystem watcher to handle async Pod deletes. +// Start starts the filesystem watcher to handle async Pod deletes. // Blocks until the context is closed; returns underlying fsnotify errors // if something goes fatally wrong. func (w *watcher) Start(ctx context.Context) error { - errs := make(chan error) + g, groupCtx := errgroup.WithContext(ctx) // Start watching for enqueued missed deletes so that we process them as soon as they arrive. - go func(errs chan<- error) { - errs <- w.watchPendingDelete(ctx) - }(errs) - + g.Go(func() error { return w.watchPendingDelete(groupCtx) }) // Start watching for changes to the filesystem so that we don't miss any async deletes. - go func(errs chan<- error) { - errs <- w.watchFS(ctx) - }(errs) - - // Read the directory to enqueue any missed deletes that are already present on-disk. - if err := w.readFS(); err != nil { - return err - } - - // block until one of the goroutines returns an error - err := <-errs - return err + g.Go(func() error { return w.watchFS(groupCtx) }) + // the first error from the errgroup will trigger context cancellation for other goroutines in the group. + // this will block until all goroutines complete and return the first error. + return g.Wait() //nolint:wrapcheck // ignore } // AddFile creates new file using the containerID as name diff --git a/cns/service/main.go b/cns/service/main.go index b12436c2a5..561cafe515 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -53,7 +53,7 @@ import ( localtls "github.com/Azure/azure-container-networking/server/tls" "github.com/Azure/azure-container-networking/store" "github.com/Azure/azure-container-networking/telemetry" - "github.com/avast/retry-go/v3" + "github.com/avast/retry-go/v4" "github.com/pkg/errors" "go.uber.org/zap" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -804,15 +804,19 @@ func main() { z.Error("failed to create cnsclient", zap.Error(err)) } go func() { - for { + _ = retry.Do(func() error { z.Info("starting fsnotify watcher to process missed Pod deletes") - w := fsnotify.New(cnsclient, cnsconfig.AsyncPodDeletePath, z) + w, err := fsnotify.New(cnsclient, cnsconfig.AsyncPodDeletePath, z) + if err != nil { + z.Error("failed to create fsnotify watcher", zap.Error(err)) + return errors.Wrap(err, "failed to create fsnotify watcher, will retry") + } if err := w.Start(rootCtx); err != nil { z.Error("failed to start fsnotify watcher, will retry", zap.Error(err)) - time.Sleep(time.Minute) - continue + return errors.Wrap(err, "failed to start fsnotify watcher, will retry") } - } + return nil + }, retry.DelayType(retry.BackOffDelay), retry.Attempts(0), retry.Context(rootCtx)) // infinite cancellable exponential backoff retrier }() } diff --git a/go.mod b/go.mod index bcd2b730d2..397e58e005 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/Azure/azure-container-networking -go 1.20 +go 1.21 require ( github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.3 @@ -34,6 +34,7 @@ require ( github.com/stretchr/testify v1.8.2 go.uber.org/zap v1.24.0 golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 + golang.org/x/sync v0.1.0 golang.org/x/sys v0.13.0 google.golang.org/grpc v1.52.0 google.golang.org/protobuf v1.28.1 diff --git a/go.sum b/go.sum index de72537791..bfe0fe461f 100644 --- a/go.sum +++ b/go.sum @@ -86,6 +86,7 @@ github.com/alexflint/go-filemutex v0.0.0-20171022225611-72bdc8eae2ae/go.mod h1:C github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/avast/retry-go/v3 v3.1.1 h1:49Scxf4v8PmiQ/nY0aY3p0hDueqSmc7++cBbtiDGu2g= github.com/avast/retry-go/v3 v3.1.1/go.mod h1:6cXRK369RpzFL3UQGqIUp9Q7GDrams+KsYWrfNA1/nQ= @@ -93,6 +94,7 @@ github.com/avast/retry-go/v4 v4.3.4 h1:pHLkL7jvCvP317I8Ge+Km2Yhntv3SdkJm7uekkqbK github.com/avast/retry-go/v4 v4.3.4/go.mod h1:rv+Nla6Vk3/ilU0H51VHddWHiwimzX66yZ0JT6T+UvE= github.com/aws/aws-sdk-go v1.15.11/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= +github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= @@ -246,6 +248,7 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dnaeon/go-vcr v1.0.1/go.mod h1:aBB1+wY4s93YsC3HHjMBMrwTj2R9FHDzUr9KyGc8n1E= github.com/dnaeon/go-vcr v1.1.0 h1:ReYa/UBrRyQdant9B4fNHGoCNKw6qh6P0fsdGmZpR7c= +github.com/dnaeon/go-vcr v1.1.0/go.mod h1:M7tiix8f0r6mKKJ3Yq/kqU1OYf3MnfmBWVbPx/yU9ko= github.com/docker/cli v0.0.0-20191017083524-a8ff7f821017/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8= github.com/docker/distribution v0.0.0-20190905152932-14b96e55d84c/go.mod h1:0+TTO4EOBfRPhZXAeF1Vu+W3hHZ8eLp8PgKVZlcvtFY= github.com/docker/distribution v2.7.1-0.20190205005809-0d3efadf0154+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= @@ -291,6 +294,7 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE= +github.com/frankban/quicktest v1.14.3/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= @@ -317,6 +321,7 @@ github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbV github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/zapr v1.2.3 h1:a9vnzlIBPQBBkeaR9IuMUfmVOrQlkoC4YfPoFkX3T7A= +github.com/go-logr/zapr v1.2.3/go.mod h1:eIauM6P8qSvTw5o2ez6UEAfGjQKrxQTl5EoK+Qa2oG4= github.com/go-openapi/jsonpointer v0.0.0-20160704185906-46af16f9f7b1/go.mod h1:+35s3my2LFTysnkMfxsJBAMHj/DoqoB9knIWoYG/Vk0= github.com/go-openapi/jsonpointer v0.19.2/go.mod h1:3akKfEdA7DF1sugOqz1dVQHBcuDBPKZGEoHC/NkiQRg= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= @@ -355,6 +360,7 @@ github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzq github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang-jwt/jwt/v4 v4.2.0 h1:besgBTC8w8HjP6NzQdxwKH9Z5oQMZ24ThTrHp3cZ8eU= +github.com/golang-jwt/jwt/v4 v4.2.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -513,6 +519,7 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -596,6 +603,7 @@ github.com/onsi/ginkgo v1.12.0 h1:Iw5WCbBcaAAd0fpRb1c9r5YCylv4XDoCSigm1zLevwU= github.com/onsi/ginkgo v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0HfGg= github.com/onsi/ginkgo/v2 v2.1.3/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c= github.com/onsi/ginkgo/v2 v2.6.0 h1:9t9b9vRUbFq3C4qKFCGkVuq/fIHji802N1nrtkh1mNc= +github.com/onsi/ginkgo/v2 v2.6.0/go.mod h1:63DOGlLAH8+REH8jUGdL3YpCpu7JODesutUjdENfUAc= github.com/onsi/gomega v1.10.0 h1:Gwkk+PTu/nfOwNMtUB/mRUv0X7ewW5dO4AERT1ThVKo= github.com/onsi/gomega v1.10.0/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA= github.com/opencontainers/go-digest v0.0.0-20170106003457-a6d0ee40d420/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= @@ -683,6 +691,7 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/safchain/ethtool v0.0.0-20190326074333-42ed695e3de8/go.mod h1:Z0q5wiBQGYcxhMZ6gUqHn6pYNLypFAvaL3UvgZLR0U4= @@ -811,6 +820,7 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= +go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= @@ -943,6 +953,8 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=