Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 50 additions & 37 deletions component/file_cache/file_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,11 +583,22 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error {
log.Trace("FileCache::RenameDir : src=%s, dst=%s", options.Src, options.Dst)

// get a list of source objects form both cloud and cache
objectNames, err := fc.listAllObjects(options.Src)
// cloud
var cloudObjects []string
cloudObjects, err := fc.listCloudObjects(options.Src)
if err != nil {
log.Err("FileCache::RenameDir : %s listCloudObjects failed. Here's why: %v", options.Src, err)
return err
}
// cache
var localObjects []string
localObjects, err = fc.listCachedObjects(options.Src)
if err != nil {
log.Err("FileCache::RenameDir : %s listAllObjects failed. Here's why: %v", options.Src, err)
log.Err("FileCache::RenameDir : %s listCachedObjects failed. Here's why: %v", options.Src, err)
return err
}
// combine the lists
objectNames := combineLists(cloudObjects, localObjects)

// add object destinations, and sort the result
for _, srcName := range objectNames {
Expand Down Expand Up @@ -622,15 +633,20 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error {
newPath := strings.Replace(path, localSrcPath, localDstPath, 1)
if !d.IsDir() {
log.Debug("FileCache::RenameDir : Renaming local file %s -> %s", path, newPath)
// get object names
srcName := fc.getObjectName(path)
dstName := fc.getObjectName(newPath)
// get locks
sflock := fc.fileLocks.Get(fc.getObjectName(path))
dflock := fc.fileLocks.Get(fc.getObjectName(newPath))
sflock := fc.fileLocks.Get(srcName)
dflock := fc.fileLocks.Get(dstName)
// complete local rename
err := fc.renameCachedFile(path, newPath, sflock, dflock)
if err != nil {
// there's really not much we can do to handle the error, so just log it
log.Err("FileCache::RenameDir : %s file rename failed. Directory state is inconsistent!", path)
}
// handle should be updated regardless, for consistency on upload
fc.renameOpenHandles(srcName, dstName, sflock, dflock)
} else {
log.Debug("FileCache::RenameDir : Creating local destination directory %s", newPath)
// create the new directory
Expand Down Expand Up @@ -661,26 +677,17 @@ func (fc *FileCache) RenameDir(options internal.RenameDirOptions) error {
fc.policy.CachePurge(directoriesToPurge[i], nil)
}

return nil
}

func (fc *FileCache) listAllObjects(prefix string) (objectNames []string, err error) {
// get cloud objects
var cloudObjects []string
cloudObjects, err = fc.listCloudObjects(prefix)
if err != nil {
return
}
// get local / cached objects
var localObjects []string
localObjects, err = fc.listCachedObjects(prefix)
if err != nil {
return
// update any lazy open handles (which are not in the local listing)
for _, srcName := range cloudObjects {
dstName := strings.Replace(srcName, options.Src, options.Dst, 1)
// get locks
sflock := fc.fileLocks.Get(srcName)
dflock := fc.fileLocks.Get(dstName)
// update any remaining open handles
fc.renameOpenHandles(srcName, dstName, sflock, dflock)
}
// combine the lists
objectNames = combineLists(cloudObjects, localObjects)

return
return nil
}

// recursively list all objects in the container at the given prefix / directory
Expand Down Expand Up @@ -1563,21 +1570,8 @@ func (fc *FileCache) RenameFile(options internal.RenameFileOptions) error {
return localRenameErr
}

if sflock.Count() > 0 {
// update any open handles to the file with its new name
handlemap.GetHandles().Range(func(key, value any) bool {
handle := value.(*handlemap.Handle)
if handle.Path == options.Src {
handle.Path = options.Dst
}
return true
})
// copy the number of open handles to the new name
for sflock.Count() > 0 {
sflock.Dec()
dflock.Inc()
}
}
// rename open handles
fc.renameOpenHandles(options.Src, options.Dst, sflock, dflock)

return nil
}
Expand Down Expand Up @@ -1608,6 +1602,25 @@ func (fc *FileCache) renameCachedFile(localSrcPath, localDstPath string, sflock,
return nil
}

func (fc *FileCache) renameOpenHandles(srcName, dstName string, sflock, dflock *common.LockMapItem) {
// update open handles
if sflock.Count() > 0 {
// update any open handles to the file with its new name
handlemap.GetHandles().Range(func(key, value any) bool {
handle := value.(*handlemap.Handle)
if handle.Path == srcName {
handle.Path = dstName
}
return true
})
// copy the number of open handles to the new name
for sflock.Count() > 0 {
sflock.Dec()
dflock.Inc()
}
}
}

// TruncateFile: Update the file with its new size.
func (fc *FileCache) TruncateFile(options internal.TruncateFileOptions) error {
log.Trace("FileCache::TruncateFile : name=%s, size=%d", options.Name, options.Size)
Expand Down
148 changes: 148 additions & 0 deletions component/file_cache/file_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,154 @@ func (suite *fileCacheTestSuite) TestRenameDir() {
}
}

// Combined test for all three cases
func (suite *fileCacheTestSuite) TestRenameDirOpenFile() {
defer suite.cleanupTest()

// Setup
srcDir := "src"
dstDir := "dst"
suite.fileCache.CreateDir(internal.CreateDirOptions{Name: srcDir, Mode: 0777})
//
// Case 1
case1src := srcDir + "/fileCase1"
case1dst := dstDir + "/fileCase1"
// create file in cloud
tempHandle, _ := suite.loopback.CreateFile(internal.CreateFileOptions{Name: case1src, Mode: 0777})
suite.loopback.CloseFile(internal.CloseFileOptions{Handle: tempHandle})
// open file for writing
handle1, err := suite.fileCache.OpenFile(internal.OpenFileOptions{Name: case1src, Flags: os.O_RDWR, Mode: 0777})
suite.assert.NoError(err)
handlemap.Add(handle1)
// Path should not be in the file cache (lazy open)
suite.assert.NoFileExists(suite.cache_path + "/" + case1src)
//
// Case 2
case2src := srcDir + "/fileCase2"
case2dst := dstDir + "/fileCase2"
// create source file
handle2, err := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: case2src, Mode: 0666})
suite.assert.NoError(err)
handlemap.Add(handle2)
// Path should only be in the file cache
suite.assert.FileExists(suite.cache_path + "/" + case2src)
suite.assert.NoFileExists(suite.fake_storage_path + "/" + case2src)
//
// Case 3
case3src := srcDir + "/fileCase3"
case3dst := dstDir + "/fileCase3"
// create source file
handle3, _ := suite.fileCache.CreateFile(internal.CreateFileOptions{Name: case3src, Mode: 0666})
handlemap.Add(handle3)
// Path should be in the file cache
suite.assert.FileExists(suite.cache_path + "/" + case3src)
// write and flush to cloud
initialData := []byte("initialData")
n, err := suite.fileCache.WriteFile(internal.WriteFileOptions{
Handle: handle3,
Data: initialData,
})
suite.assert.NoError(err)
suite.assert.Equal(len(initialData), n)
err = suite.fileCache.FlushFile(internal.FlushFileOptions{
Handle: handle3,
})
suite.assert.NoError(err)
suite.assert.FileExists(filepath.Join(suite.fake_storage_path, case3src))

// Test: Rename the directory
err = suite.fileCache.RenameDir(internal.RenameDirOptions{Src: srcDir, Dst: dstDir})
suite.assert.NoError(err)
//
// Case 1
// rename succeeded in cloud
suite.assert.NoFileExists(filepath.Join(suite.fake_storage_path, case1src))
suite.assert.FileExists(filepath.Join(suite.fake_storage_path, case1dst))
// still in lazy open state
suite.assert.NoFileExists(filepath.Join(suite.cache_path, case1src))
suite.assert.NoFileExists(filepath.Join(suite.cache_path, case1dst))
//
// Case 2
// local rename succeeded
suite.assert.NoFileExists(filepath.Join(suite.cache_path, case2src))
suite.assert.FileExists(filepath.Join(suite.cache_path, case2dst))
// file still in case 2
suite.assert.NoFileExists(filepath.Join(suite.fake_storage_path, case2src))
suite.assert.NoFileExists(filepath.Join(suite.fake_storage_path, case2dst))
//
// Case 3
// local rename succeeded
suite.assert.NoFileExists(filepath.Join(suite.cache_path, case3src))
suite.assert.FileExists(filepath.Join(suite.cache_path, case3dst))
// cloud rename succeeded
suite.assert.NoFileExists(filepath.Join(suite.fake_storage_path, case3src))
suite.assert.FileExists(filepath.Join(suite.fake_storage_path, case3dst))

// Test: write new data
data := []byte("newdata")
//
// Case 1
// write to file handle
n, err = suite.fileCache.WriteFile(internal.WriteFileOptions{
Handle: handle1,
Data: data,
})
suite.assert.NoError(err)
suite.assert.Equal(len(data), n)
// open is completed (file is downloaded), and writes go to the correct file
suite.assert.NoFileExists(filepath.Join(suite.cache_path, case1src))
suite.assert.FileExists(filepath.Join(suite.cache_path, case1dst))
//
// Case 2
n, err = suite.fileCache.WriteFile(internal.WriteFileOptions{
Handle: handle2,
Data: data,
})
suite.assert.NoError(err)
suite.assert.Equal(len(data), n)
//
// Case 3
n, err = suite.fileCache.WriteFile(internal.WriteFileOptions{
Handle: handle3,
Data: data,
Offset: int64(len(initialData)),
})
suite.assert.NoError(err)
suite.assert.Equal(len(data), n)

// Test: Close handle
//
// Case 1
err = suite.fileCache.CloseFile(internal.CloseFileOptions{
Handle: handle1,
})
suite.assert.NoError(err)
// check cloud data
dstData, err := os.ReadFile(path.Join(suite.fake_storage_path, case1dst))
suite.assert.NoError(err)
suite.assert.Equal(data, dstData)
//
// Case 2
err = suite.fileCache.CloseFile(internal.CloseFileOptions{
Handle: handle2,
})
suite.assert.NoError(err)
// check cloud data
dstData, err = os.ReadFile(path.Join(suite.fake_storage_path, case2dst))
suite.assert.NoError(err)
suite.assert.Equal(data, dstData)
//
// Case 3
err = suite.fileCache.CloseFile(internal.CloseFileOptions{
Handle: handle3,
})
suite.assert.NoError(err)
// check cloud data
dstData, err = os.ReadFile(path.Join(suite.fake_storage_path, case3dst))
suite.assert.NoError(err)
suite.assert.Equal(append(initialData, data...), dstData)
}

func (suite *fileCacheTestSuite) TestCreateFile() {
defer suite.cleanupTest()
// Default is to not create empty files on create file to support immutable storage.
Expand Down