Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 32 additions & 43 deletions cns/fsnotify/fsnotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package fsnotify
import (
"context"
"io"
"io/fs"
"os"
"sync"
"time"
Expand All @@ -11,6 +12,7 @@ import (
"github.com/fsnotify/fsnotify"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

type releaseIPsClient interface {
Expand All @@ -27,17 +29,18 @@ type watcher struct {
}

// Create the AsyncDelete watcher.
func New(cli releaseIPsClient, path string, logger *zap.Logger) *watcher { //nolint
func New(cli releaseIPsClient, path string, logger *zap.Logger) (*watcher, error) { //nolint
// Add directory where intended deletes are kept
if err := os.Mkdir(path, 0o755); err != nil { //nolint
if err := os.Mkdir(path, 0o755); err != nil && !errors.Is(err, fs.ErrExist) { //nolint
logger.Error("error making directory", zap.String("path", path), zap.Error(err))
return nil, errors.Wrapf(err, "failed to create dir %s", path)
}
return &watcher{
cli: cli,
path: path,
log: logger,
pendingDelete: make(map[string]struct{}),
}
}, nil
}

// releaseAll locks and iterates the pendingDeletes map and calls CNS to
Expand Down Expand Up @@ -111,10 +114,29 @@ func (w *watcher) watchFS(ctx context.Context) error {
}
defer watcher.Close()

// Start watching the directory, so that we don't miss any events.
err = watcher.Add(w.path)
if err != nil {
w.log.Error("failed to add path to fsnotify watcher", zap.String("path", w.path), zap.Error(err))
return errors.Wrap(err, "failed to add path to fsnotify watcher")
}
// List the directory and creates synthetic events for any existing items.
w.log.Info("listing directory", zap.String("path", w.path))
dirContents, err := os.ReadDir(w.path)
if err != nil {
w.log.Error("error reading deleteID directory", zap.String("path", w.path), zap.Error(err))
return errors.Wrapf(err, "failed to read %s", w.path)
}
if len(dirContents) == 0 {
w.log.Info("no missed deletes found")
}
w.lock.Lock()
for _, file := range dirContents {
w.log.Info("adding missed delete from file", zap.String("name", file.Name()))
w.pendingDelete[file.Name()] = struct{}{}
}
w.lock.Unlock()

// Start listening for events.
w.log.Info("listening for events from fsnotify watcher")
for {
Expand All @@ -139,51 +161,18 @@ func (w *watcher) watchFS(ctx context.Context) error {
}
}

// readFS lists the directory and enqueues any missed deletes that are already
// present on-disk.
func (w *watcher) readFS() error {
w.log.Info("listing directory", zap.String("path", w.path))
dirContents, err := os.ReadDir(w.path)
if err != nil {
w.log.Error("error reading deleteID directory", zap.String("path", w.path), zap.Error(err))
return errors.Wrapf(err, "failed to read %s", w.path)
}
if len(dirContents) == 0 {
w.log.Info("no missed deletes found")
return nil
}
w.lock.Lock()
for _, file := range dirContents {
w.log.Info("adding missed delete from file", zap.String("name", file.Name()))
w.pendingDelete[file.Name()] = struct{}{}
}
w.lock.Unlock()
return nil
}

// WatchFS starts the filesystem watcher to handle async Pod deletes.
// Start starts the filesystem watcher to handle async Pod deletes.
// Blocks until the context is closed; returns underlying fsnotify errors
// if something goes fatally wrong.
func (w *watcher) Start(ctx context.Context) error {
errs := make(chan error)
g, groupCtx := errgroup.WithContext(ctx)
// Start watching for enqueued missed deletes so that we process them as soon as they arrive.
go func(errs chan<- error) {
errs <- w.watchPendingDelete(ctx)
}(errs)

g.Go(func() error { return w.watchPendingDelete(groupCtx) })
// Start watching for changes to the filesystem so that we don't miss any async deletes.
go func(errs chan<- error) {
errs <- w.watchFS(ctx)
}(errs)

// Read the directory to enqueue any missed deletes that are already present on-disk.
if err := w.readFS(); err != nil {
return err
}

// block until one of the goroutines returns an error
err := <-errs
return err
g.Go(func() error { return w.watchFS(groupCtx) })
// the first error from the errgroup will trigger context cancellation for other goroutines in the group.
// this will block until all goroutines complete and return the first error.
return g.Wait() //nolint:wrapcheck // ignore
}

// AddFile creates new file using the containerID as name
Expand Down
16 changes: 10 additions & 6 deletions cns/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ import (
localtls "github.com/Azure/azure-container-networking/server/tls"
"github.com/Azure/azure-container-networking/store"
"github.com/Azure/azure-container-networking/telemetry"
"github.com/avast/retry-go/v3"
"github.com/avast/retry-go/v4"
"github.com/pkg/errors"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -840,15 +840,19 @@ func main() {
z.Error("failed to create cnsclient", zap.Error(err))
}
go func() {
for {
_ = retry.Do(func() error {
z.Info("starting fsnotify watcher to process missed Pod deletes")
w := fsnotify.New(cnsclient, cnsconfig.AsyncPodDeletePath, z)
w, err := fsnotify.New(cnsclient, cnsconfig.AsyncPodDeletePath, z)
if err != nil {
z.Error("failed to create fsnotify watcher", zap.Error(err))
return errors.Wrap(err, "failed to create fsnotify watcher, will retry")
}
if err := w.Start(rootCtx); err != nil {
z.Error("failed to start fsnotify watcher, will retry", zap.Error(err))
time.Sleep(time.Minute)
continue
return errors.Wrap(err, "failed to start fsnotify watcher, will retry")
}
}
return nil
}, retry.DelayType(retry.BackOffDelay), retry.Attempts(0), retry.Context(rootCtx)) // infinite cancellable exponential backoff retrier
}()
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ require (
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/golang-jwt/jwt/v5 v5.0.0 // indirect
github.com/rootless-containers/rootlesskit v1.1.1 // indirect
golang.org/x/sync v0.5.0 // indirect
)

replace (
Expand Down
Loading