diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index d85b9d2e8..047927c16 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -583,11 +583,22 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error { log.Trace("FileCache::RenameDir : src=%s, dst=%s", options.Src, options.Dst) // get a list of source objects form both cloud and cache - objectNames, err := fc.listAllObjects(options.Src) + // cloud + var cloudObjects []string + cloudObjects, err := fc.listCloudObjects(options.Src) + if err != nil { + log.Err("FileCache::RenameDir : %s listCloudObjects failed. Here's why: %v", options.Src, err) + return err + } + // cache + var localObjects []string + localObjects, err = fc.listCachedObjects(options.Src) if err != nil { - log.Err("FileCache::RenameDir : %s listAllObjects failed. Here's why: %v", options.Src, err) + log.Err("FileCache::RenameDir : %s listCachedObjects failed. Here's why: %v", options.Src, err) return err } + // combine the lists + objectNames := combineLists(cloudObjects, localObjects) // add object destinations, and sort the result for _, srcName := range objectNames { @@ -622,15 +633,20 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error { newPath := strings.Replace(path, localSrcPath, localDstPath, 1) if !d.IsDir() { log.Debug("FileCache::RenameDir : Renaming local file %s -> %s", path, newPath) + // get object names + srcName := fc.getObjectName(path) + dstName := fc.getObjectName(newPath) // get locks - sflock := fc.fileLocks.Get(fc.getObjectName(path)) - dflock := fc.fileLocks.Get(fc.getObjectName(newPath)) + sflock := fc.fileLocks.Get(srcName) + dflock := fc.fileLocks.Get(dstName) // complete local rename err := fc.renameCachedFile(path, newPath, sflock, dflock) if err != nil { // there's really not much we can do to handle the error, so just log it log.Err("FileCache::RenameDir : %s file rename failed. Directory state is inconsistent!", path) } + // handle should be updated regardless, for consistency on upload + fc.renameOpenHandles(srcName, dstName, sflock, dflock) } else { log.Debug("FileCache::RenameDir : Creating local destination directory %s", newPath) // create the new directory @@ -661,26 +677,17 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error { fc.policy.CachePurge(directoriesToPurge[i], nil) } - return nil -} - -func (fc *FileCache) listAllObjects(prefix string) (objectNames []string, err error) { - // get cloud objects - var cloudObjects []string - cloudObjects, err = fc.listCloudObjects(prefix) - if err != nil { - return - } - // get local / cached objects - var localObjects []string - localObjects, err = fc.listCachedObjects(prefix) - if err != nil { - return + // update any lazy open handles (which are not in the local listing) + for _, srcName := range cloudObjects { + dstName := strings.Replace(srcName, options.Src, options.Dst, 1) + // get locks + sflock := fc.fileLocks.Get(srcName) + dflock := fc.fileLocks.Get(dstName) + // update any remaining open handles + fc.renameOpenHandles(srcName, dstName, sflock, dflock) } - // combine the lists - objectNames = combineLists(cloudObjects, localObjects) - return + return nil } // recursively list all objects in the container at the given prefix / directory @@ -1563,21 +1570,8 @@ func (fc *FileCache) RenameFile(options internal.RenameFileOptions) error { return localRenameErr } - if sflock.Count() > 0 { - // update any open handles to the file with its new name - handlemap.GetHandles().Range(func(key, value any) bool { - handle := value.(*handlemap.Handle) - if handle.Path == options.Src { - handle.Path = options.Dst - } - return true - }) - // copy the number of open handles to the new name - for sflock.Count() > 0 { - sflock.Dec() - dflock.Inc() - } - } + // rename open handles + fc.renameOpenHandles(options.Src, options.Dst, sflock, dflock) return nil } @@ -1608,6 +1602,25 @@ func (fc *FileCache) renameCachedFile(localSrcPath, localDstPath string, sflock, return nil } +func (fc *FileCache) renameOpenHandles(srcName, dstName string, sflock, dflock *common.LockMapItem) { + // update open handles + if sflock.Count() > 0 { + // update any open handles to the file with its new name + handlemap.GetHandles().Range(func(key, value any) bool { + handle := value.(*handlemap.Handle) + if handle.Path == srcName { + handle.Path = dstName + } + return true + }) + // copy the number of open handles to the new name + for sflock.Count() > 0 { + sflock.Dec() + dflock.Inc() + } + } +} + // TruncateFile: Update the file with its new size. func (fc *FileCache) TruncateFile(options internal.TruncateFileOptions) error { log.Trace("FileCache::TruncateFile : name=%s, size=%d", options.Name, options.Size) diff --git a/component/file_cache/file_cache_test.go b/component/file_cache/file_cache_test.go index 9543b7a1b..8a2e42d6a 100644 --- a/component/file_cache/file_cache_test.go +++ b/component/file_cache/file_cache_test.go @@ -601,6 +601,154 @@ func (suite *fileCacheTestSuite) TestRenameDir() { } } +// Combined test for all three cases +func (suite *fileCacheTestSuite) TestRenameDirOpenFile() { + defer suite.cleanupTest() + + // Setup + srcDir := "src" + dstDir := "dst" + suite.fileCache.CreateDir(internal.CreateDirOptions{Name: srcDir, Mode: 0777}) + // + // Case 1 + case1src := srcDir + "/fileCase1" + case1dst := dstDir + "/fileCase1" + // create file in cloud + tempHandle, _ := suite.loopback.CreateFile(internal.CreateFileOptions{Name: case1src, Mode: 0777}) + suite.loopback.CloseFile(internal.CloseFileOptions{Handle: tempHandle}) + // open file for writing + handle1, err := suite.fileCache.OpenFile(internal.OpenFileOptions{Name: case1src, Flags: os.O_RDWR, Mode: 0777}) + suite.assert.NoError(err) + handlemap.Add(handle1) + // Path should not be in the file cache (lazy open) + suite.assert.NoFileExists(suite.cache_path + "/" + case1src) + // + // Case 2 + case2src := srcDir + "/fileCase2" + case2dst := dstDir + "/fileCase2" + // create source file + handle2, err := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: case2src, Mode: 0666}) + suite.assert.NoError(err) + handlemap.Add(handle2) + // Path should only be in the file cache + suite.assert.FileExists(suite.cache_path + "/" + case2src) + suite.assert.NoFileExists(suite.fake_storage_path + "/" + case2src) + // + // Case 3 + case3src := srcDir + "/fileCase3" + case3dst := dstDir + "/fileCase3" + // create source file + handle3, _ := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: case3src, Mode: 0666}) + handlemap.Add(handle3) + // Path should be in the file cache + suite.assert.FileExists(suite.cache_path + "/" + case3src) + // write and flush to cloud + initialData := []byte("initialData") + n, err := suite.fileCache.WriteFile(internal.WriteFileOptions{ + Handle: handle3, + Data: initialData, + }) + suite.assert.NoError(err) + suite.assert.Equal(len(initialData), n) + err = suite.fileCache.FlushFile(internal.FlushFileOptions{ + Handle: handle3, + }) + suite.assert.NoError(err) + suite.assert.FileExists(filepath.Join(suite.fake_storage_path, case3src)) + + // Test: Rename the directory + err = suite.fileCache.RenameDir(internal.RenameDirOptions{Src: srcDir, Dst: dstDir}) + suite.assert.NoError(err) + // + // Case 1 + // rename succeeded in cloud + suite.assert.NoFileExists(filepath.Join(suite.fake_storage_path, case1src)) + suite.assert.FileExists(filepath.Join(suite.fake_storage_path, case1dst)) + // still in lazy open state + suite.assert.NoFileExists(filepath.Join(suite.cache_path, case1src)) + suite.assert.NoFileExists(filepath.Join(suite.cache_path, case1dst)) + // + // Case 2 + // local rename succeeded + suite.assert.NoFileExists(filepath.Join(suite.cache_path, case2src)) + suite.assert.FileExists(filepath.Join(suite.cache_path, case2dst)) + // file still in case 2 + suite.assert.NoFileExists(filepath.Join(suite.fake_storage_path, case2src)) + suite.assert.NoFileExists(filepath.Join(suite.fake_storage_path, case2dst)) + // + // Case 3 + // local rename succeeded + suite.assert.NoFileExists(filepath.Join(suite.cache_path, case3src)) + suite.assert.FileExists(filepath.Join(suite.cache_path, case3dst)) + // cloud rename succeeded + suite.assert.NoFileExists(filepath.Join(suite.fake_storage_path, case3src)) + suite.assert.FileExists(filepath.Join(suite.fake_storage_path, case3dst)) + + // Test: write new data + data := []byte("newdata") + // + // Case 1 + // write to file handle + n, err = suite.fileCache.WriteFile(internal.WriteFileOptions{ + Handle: handle1, + Data: data, + }) + suite.assert.NoError(err) + suite.assert.Equal(len(data), n) + // open is completed (file is downloaded), and writes go to the correct file + suite.assert.NoFileExists(filepath.Join(suite.cache_path, case1src)) + suite.assert.FileExists(filepath.Join(suite.cache_path, case1dst)) + // + // Case 2 + n, err = suite.fileCache.WriteFile(internal.WriteFileOptions{ + Handle: handle2, + Data: data, + }) + suite.assert.NoError(err) + suite.assert.Equal(len(data), n) + // + // Case 3 + n, err = suite.fileCache.WriteFile(internal.WriteFileOptions{ + Handle: handle3, + Data: data, + Offset: int64(len(initialData)), + }) + suite.assert.NoError(err) + suite.assert.Equal(len(data), n) + + // Test: Close handle + // + // Case 1 + err = suite.fileCache.CloseFile(internal.CloseFileOptions{ + Handle: handle1, + }) + suite.assert.NoError(err) + // check cloud data + dstData, err := os.ReadFile(path.Join(suite.fake_storage_path, case1dst)) + suite.assert.NoError(err) + suite.assert.Equal(data, dstData) + // + // Case 2 + err = suite.fileCache.CloseFile(internal.CloseFileOptions{ + Handle: handle2, + }) + suite.assert.NoError(err) + // check cloud data + dstData, err = os.ReadFile(path.Join(suite.fake_storage_path, case2dst)) + suite.assert.NoError(err) + suite.assert.Equal(data, dstData) + // + // Case 3 + err = suite.fileCache.CloseFile(internal.CloseFileOptions{ + Handle: handle3, + }) + suite.assert.NoError(err) + // check cloud data + dstData, err = os.ReadFile(path.Join(suite.fake_storage_path, case3dst)) + suite.assert.NoError(err) + suite.assert.Equal(append(initialData, data...), dstData) +} + func (suite *fileCacheTestSuite) TestCreateFile() { defer suite.cleanupTest() // Default is to not create empty files on create file to support immutable storage.