diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index 9550c66e4..7241432d4 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -410,31 +410,38 @@ func (fc *FileCache) invalidateDirectory(name string) { log.Trace("FileCache::invalidateDirectory : %s", name) localPath := filepath.Join(fc.tmpPath, name) - _, err := os.Stat(localPath) - if os.IsNotExist(err) { - log.Info("FileCache::invalidateDirectory : %s does not exist in local cache.", name) - return - } else if err != nil { - log.Debug("FileCache::invalidateDirectory : %s stat err [%s].", name, err.Error()) - return - } // TODO : wouldn't this cause a race condition? a thread might get the lock before we purge - and the file would be non-existent // WalkDir goes through the tree in lexical order so 'dir' always comes before 'dir/file' - // Save the paths in lexical order and delete them in reverse order so folders are deleted after their children - var pathsToPurge []string - err = filepath.WalkDir(localPath, func(path string, d fs.DirEntry, err error) error { + var directoriesToPurge []string + err := filepath.WalkDir(localPath, func(path string, d fs.DirEntry, err error) error { if err == nil && d != nil { - pathsToPurge = append(pathsToPurge, path) + if !d.IsDir() { + log.Debug("FileCache::invalidateDirectory : removing file %s from cache", path) + fc.policy.CachePurge(path) + } else { + // remember to delete the directory later (after its children) + directoriesToPurge = append(directoriesToPurge, path) + } + } else { + // stat(localPath) failed. err is the one returned by stat + // documentation: https://pkg.go.dev/io/fs#WalkDirFunc + if os.IsNotExist(err) { + log.Info("FileCache::invalidateDirectory : %s does not exist in local cache.", name) + } else if err != nil { + log.Warn("FileCache::invalidateDirectory : %s stat err [%s].", name, err.Error()) + } } return nil }) - for i := len(pathsToPurge) - 1; i >= 0; i-- { - log.Debug("FileCache::invalidateDirectory : %s getting removed from cache", pathsToPurge[i]) - fc.policy.CachePurge(pathsToPurge[i]) + + // clean up leftover source directories in reverse order + for i := len(directoriesToPurge) - 1; i >= 0; i-- { + log.Debug("FileCache::invalidateDirectory : removing dir %s from cache", directoriesToPurge[i]) + fc.policy.CachePurge(directoriesToPurge[i]) } if err != nil { - log.Debug("FileCache::invalidateDirectory : Failed to iterate directory %s [%s].", localPath, err.Error()) + log.Debug("FileCache::invalidateDirectory : Failed to walk directory %s. Here's why: %v", localPath, err) return } } @@ -454,7 +461,7 @@ func (fc *FileCache) DeleteDir(options internal.DeleteDirOptions) error { // rest api delete will fail while we still need to cleanup the local cache for the same } - go fc.invalidateDirectory(options.Name) + fc.invalidateDirectory(options.Name) return err } @@ -569,7 +576,7 @@ func (fc *FileCache) IsDirEmpty(options internal.IsDirEmptyOptions) bool { return fc.NextComponent().IsDirEmpty(options) } -// RenameDir: Recursively invalidate the source directory and its children +// RenameDir: Recursively move the source directory func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error { log.Trace("FileCache::RenameDir : src=%s, dst=%s", options.Src, options.Dst) @@ -579,9 +586,53 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error { return err } - go fc.invalidateDirectory(options.Src) - // TLDR: Dst is guaranteed to be non-existent or empty. - // Note: We do not need to invalidate Dst due to the logic in our FUSE connector, see comments there. + // move the files in local storage + localSrcPath := filepath.Join(fc.tmpPath, options.Src) + localDstPath := filepath.Join(fc.tmpPath, options.Dst) + // WalkDir goes through the tree in lexical order so 'dir' always comes before 'dir/file' + var directoriesToPurge []string + _ = filepath.WalkDir(localSrcPath, func(path string, d fs.DirEntry, err error) error { + if err == nil && d != nil { + newPath := strings.Replace(path, localSrcPath, localDstPath, 1) + if !d.IsDir() { + log.Debug("FileCache::RenameDir : Renaming local file %s -> %s", path, newPath) + fc.renameCachedFile(path, newPath) + } else { + log.Debug("FileCache::RenameDir : Creating local destination directory %s", newPath) + // create the new directory + mkdirErr := os.MkdirAll(newPath, fc.defaultPermission) + if mkdirErr != nil { + // log any error but do nothing about it + log.Warn("FileCache::RenameDir : Failed to created directory %s. Here's why: %v", newPath, mkdirErr) + } + // remember to delete the src directory later (after its contents are deleted) + directoriesToPurge = append(directoriesToPurge, path) + } + } else { + // stat(localPath) failed. err is the one returned by stat + // documentation: https://pkg.go.dev/io/fs#WalkDirFunc + if os.IsNotExist(err) { + // none of the files that were moved actually exist in local storage + log.Info("FileCache::RenameDir : %s does not exist in local cache.", options.Src) + } else if err != nil { + log.Warn("FileCache::RenameDir : %s stat err [%v].", options.Src, err) + } + } + return nil + }) + + // clean up leftover source directories in reverse order + for i := len(directoriesToPurge) - 1; i >= 0; i-- { + log.Debug("FileCache::RenameDir : Removing local directory %s", directoriesToPurge[i]) + fc.policy.CachePurge(directoriesToPurge[i]) + } + + if fc.cacheTimeout == 0 { + // delete destination path immediately + log.Info("FileCache::RenameDir : Timeout is zero, so removing local destination %s", options.Dst) + go fc.invalidateDirectory(options.Dst) + } + return nil } @@ -1265,39 +1316,27 @@ func (fc *FileCache) RenameFile(options internal.RenameFileOptions) error { // if we do not perform rename operation locally and those destination files are cached then next time they are read // we will be serving the wrong content (as we did not rename locally, we still be having older destination files with // stale content). We either need to remove dest file as well from cache or just run rename to replace the content. - err = os.Rename(localSrcPath, localDstPath) - if err != nil && !os.IsNotExist(err) { - log.Err("FileCache::RenameFile : %s failed to rename local file %s [%s]", localSrcPath, err.Error()) - } + fc.renameCachedFile(localSrcPath, localDstPath) - if err != nil { - // If there was a problem in local rename then delete the destination file - // it might happen that dest file was already there and local rename failed - // so deleting local dest file ensures next open of that will get the updated file from container - err = deleteFile(localDstPath) - if err != nil && !os.IsNotExist(err) { - log.Err("FileCache::RenameFile : %s failed to delete local file %s [%s]", localDstPath, err.Error()) - } - - fc.policy.CachePurge(localDstPath) - } + return nil +} - err = deleteFile(localSrcPath) - if err != nil && !os.IsNotExist(err) { - log.Err("FileCache::RenameFile : %s failed to delete local file %s [%s]", localSrcPath, err.Error()) +func (fc *FileCache) renameCachedFile(localSrcPath string, localDstPath string) { + err := os.Rename(localSrcPath, localDstPath) + if err != nil { + // if rename fails, we just delete the source file anyway + log.Warn("FileCache::RenameDir : Failed to rename local file %s -> %s. Here's why: %v", localSrcPath, localDstPath, err) + } else { + fc.policy.CacheValid(localDstPath) } - + // delete the source from our cache policy + // this will also delete the source file from local storage (if rename failed) fc.policy.CachePurge(localSrcPath) if fc.cacheTimeout == 0 { // Destination file needs to be deleted immediately - fc.policy.CachePurge(localDstPath) - } else { - // Add destination file to cache, it will be removed on timeout - fc.policy.CacheValid(localDstPath) + go fc.policy.CachePurge(localDstPath) } - - return nil } // TruncateFile: Update the file with its new size. diff --git a/component/file_cache/file_cache_test.go b/component/file_cache/file_cache_test.go index 3393ed3ff..c2fa0d296 100644 --- a/component/file_cache/file_cache_test.go +++ b/component/file_cache/file_cache_test.go @@ -561,11 +561,18 @@ func (suite *fileCacheTestSuite) TestRenameDir() { suite.assert.False(suite.fileCache.policy.IsCached(src)) // Directory should not be cached // wait for asynchronous deletion time.Sleep(1 * time.Second) - // directory should not exist in local filesystem + // src directory should not exist in local filesystem fInfo, err := os.Stat(filepath.Join(suite.cache_path, src)) suite.assert.Nil(fInfo) suite.assert.Error(err) suite.assert.True(os.IsNotExist(err)) + // dst directory should exist and have contents from src + dstEntries, err := os.ReadDir(filepath.Join(suite.cache_path, dst)) + suite.assert.NoError(err) + suite.assert.Len(dstEntries, 5) + for i, entry := range dstEntries { + suite.assert.Equal("file"+strconv.Itoa(i), entry.Name()) + } } func (suite *fileCacheTestSuite) TestCreateFile() { diff --git a/component/file_cache/lru_policy.go b/component/file_cache/lru_policy.go index fc803a556..7048ac2dc 100644 --- a/component/file_cache/lru_policy.go +++ b/component/file_cache/lru_policy.go @@ -222,21 +222,17 @@ func (p *lruPolicy) asyncCacheValid() { } func (p *lruPolicy) cacheValidate(name string) { - var node *lruNode = nil - val, found := p.nodeMap.Load(name) - if !found { - node = &lruNode{ - name: name, - next: nil, - prev: nil, - usage: 0, - deleted: false, - } - p.nodeMap.Store(name, node) - } else { - node = val.(*lruNode) - } + // get existing entry, or if it doesn't exist then + // write a new one and return it + val, _ := p.nodeMap.LoadOrStore(name, &lruNode{ + name: name, + next: nil, + prev: nil, + usage: 0, + deleted: false, + }) + node := val.(*lruNode) p.Lock() defer p.Unlock() @@ -444,7 +440,7 @@ func (p *lruPolicy) deleteItem(name string) { return } - // There are no open handles for this file so its safe to remove this + // There are no open handles for this file so it's safe to remove this // Check if the file exists first, since this is often the second time we're calling deleteFile _, err := os.Stat(name) if err != nil && os.IsNotExist(err) {