From ab28115f952340f5476b7e2a3355222adaaa7ab1 Mon Sep 17 00:00:00 2001 From: Gilbert Chen Date: Wed, 29 Aug 2018 23:15:00 -0400 Subject: [PATCH] Retry on EOF errors in the SFTP backend --- src/duplicacy_sftpstorage.go | 180 +++++++++++++++++++++++++---------- 1 file changed, 131 insertions(+), 49 deletions(-) diff --git a/src/duplicacy_sftpstorage.go b/src/duplicacy_sftpstorage.go index fae4ced9..2d1dcb78 100644 --- a/src/duplicacy_sftpstorage.go +++ b/src/duplicacy_sftpstorage.go @@ -14,6 +14,7 @@ import ( "runtime" "strings" "time" + "sync" "github.com/pkg/sftp" "golang.org/x/crypto/ssh" @@ -23,9 +24,13 @@ type SFTPStorage struct { StorageBase client *sftp.Client + clientLock sync.Mutex minimumNesting int // The minimum level of directories to dive into before searching for the chunk file. storageDir string numberOfThreads int + numberOfTries int + serverAddress string + sftpConfig *ssh.ClientConfig } func CreateSFTPStorageWithPassword(server string, port int, username string, storageDir string, @@ -86,6 +91,9 @@ func CreateSFTPStorage(server string, port int, username string, storageDir stri storageDir: storageDir, minimumNesting: minimumNesting, numberOfThreads: threads, + numberOfTries: 6, + serverAddress: serverAddress, + sftpConfig: sftpConfig, } // Random number fo generating the temporary chunk file suffix. @@ -99,13 +107,60 @@ func CreateSFTPStorage(server string, port int, username string, storageDir stri } func CloseSFTPStorage(storage *SFTPStorage) { - storage.client.Close() + if storage.client != nil { + storage.client.Close() + storage.client = nil + } +} + +func (storage *SFTPStorage) getSFTPClient() *sftp.Client { + storage.clientLock.Lock() + defer storage.clientLock.Unlock() + return storage.client } +func (storage *SFTPStorage) retry(f func () error) error { + delay := time.Second + for i := 0;; i++ { + err := f() + if err != nil && strings.Contains(err.Error(), "EOF") && i < storage.numberOfTries { + LOG_WARN("SFTP_RETRY", "Encountered an error (%v); retry after %d second(s)", err, delay/time.Second) + time.Sleep(delay) + delay *= 2 + + storage.clientLock.Lock() + if storage.client != nil { + storage.client.Close() + storage.client = nil + } + + connection, err := ssh.Dial("tcp", storage.serverAddress, storage.sftpConfig) + if err != nil { + storage.clientLock.Unlock() + return err + } + + client, err := sftp.NewClient(connection) + if err != nil { + connection.Close() + storage.clientLock.Unlock() + return err + } + storage.client = client + storage.clientLock.Unlock() + continue + } + return err + } +} // ListFiles return the list of files and subdirectories under 'file' (non-recursively) func (storage *SFTPStorage) ListFiles(threadIndex int, dirPath string) (files []string, sizes []int64, err error) { - entries, err := storage.client.ReadDir(path.Join(storage.storageDir, dirPath)) + var entries []os.FileInfo + err = storage.retry(func() error { + entries, err = storage.getSFTPClient().ReadDir(path.Join(storage.storageDir, dirPath)) + return err + }) if err != nil { return nil, nil, err } @@ -126,7 +181,11 @@ func (storage *SFTPStorage) ListFiles(threadIndex int, dirPath string) (files [] // DeleteFile deletes the file or directory at 'filePath'. func (storage *SFTPStorage) DeleteFile(threadIndex int, filePath string) (err error) { fullPath := path.Join(storage.storageDir, filePath) - fileInfo, err := storage.client.Stat(fullPath) + var fileInfo os.FileInfo + err = storage.retry(func() error { + fileInfo, err = storage.getSFTPClient().Stat(fullPath) + return err + }) if err != nil { if os.IsNotExist(err) { LOG_TRACE("SFTP_STORAGE", "File %s has disappeared before deletion", filePath) @@ -137,33 +196,47 @@ func (storage *SFTPStorage) DeleteFile(threadIndex int, filePath string) (err er if fileInfo == nil { return nil } - return storage.client.Remove(path.Join(storage.storageDir, filePath)) + return storage.retry(func() error { return storage.getSFTPClient().Remove(path.Join(storage.storageDir, filePath)) }) } // MoveFile renames the file. func (storage *SFTPStorage) MoveFile(threadIndex int, from string, to string) (err error) { toPath := path.Join(storage.storageDir, to) - fileInfo, err := storage.client.Stat(toPath) + var fileInfo os.FileInfo + err = storage.retry(func() error { + fileInfo, err = storage.getSFTPClient().Stat(toPath) + return err + }) if fileInfo != nil { return fmt.Errorf("The destination file %s already exists", toPath) } - return storage.client.Rename(path.Join(storage.storageDir, from), - path.Join(storage.storageDir, to)) + err = storage.retry(func() error { return storage.getSFTPClient().Rename(path.Join(storage.storageDir, from), + path.Join(storage.storageDir, to)) }) + return err } // CreateDirectory creates a new directory. func (storage *SFTPStorage) CreateDirectory(threadIndex int, dirPath string) (err error) { fullPath := path.Join(storage.storageDir, dirPath) - fileInfo, err := storage.client.Stat(fullPath) + var fileInfo os.FileInfo + err = storage.retry(func() error { + fileInfo, err = storage.getSFTPClient().Stat(fullPath) + return err + }) if fileInfo != nil && fileInfo.IsDir() { return nil } - return storage.client.Mkdir(path.Join(storage.storageDir, dirPath)) + return storage.retry(func() error { return storage.getSFTPClient().Mkdir(path.Join(storage.storageDir, dirPath)) }) } // GetFileInfo returns the information about the file or directory at 'filePath'. func (storage *SFTPStorage) GetFileInfo(threadIndex int, filePath string) (exist bool, isDir bool, size int64, err error) { - fileInfo, err := storage.client.Stat(path.Join(storage.storageDir, filePath)) + var fileInfo os.FileInfo + err = storage.retry(func() error { + fileInfo, err = storage.getSFTPClient().Stat(path.Join(storage.storageDir, filePath)) + return err + }) + if err != nil { if os.IsNotExist(err) { return false, false, 0, nil @@ -181,18 +254,19 @@ func (storage *SFTPStorage) GetFileInfo(threadIndex int, filePath string) (exist // DownloadFile reads the file at 'filePath' into the chunk. func (storage *SFTPStorage) DownloadFile(threadIndex int, filePath string, chunk *Chunk) (err error) { - file, err := storage.client.Open(path.Join(storage.storageDir, filePath)) + return storage.retry(func() error { + file, err := storage.getSFTPClient().Open(path.Join(storage.storageDir, filePath)) - if err != nil { - return err - } - - defer file.Close() - if _, err = RateLimitedCopy(chunk, file, storage.DownloadRateLimit/storage.numberOfThreads); err != nil { - return err - } + if err != nil { + return err + } - return nil + defer file.Close() + if _, err = RateLimitedCopy(chunk, file, storage.DownloadRateLimit/storage.numberOfThreads); err != nil { + return err + } + return nil + }) } // UploadFile writes 'content' to the file at 'filePath'. @@ -203,20 +277,26 @@ func (storage *SFTPStorage) UploadFile(threadIndex int, filePath string, content dirs := strings.Split(filePath, "/") if len(dirs) > 1 { fullDir := path.Dir(fullPath) - _, err := storage.client.Stat(fullDir) + err = storage.retry(func() error { + _, err := storage.getSFTPClient().Stat(fullDir) + return err + }) if err != nil { // The error may be caused by a non-existent fullDir, or a broken connection. In either case, // we just assume it is the former because there isn't a way to tell which is the case. for i, _ := range dirs[1 : len(dirs)-1] { subDir := path.Join(storage.storageDir, path.Join(dirs[0:i+2]...)) // We don't check the error; just keep going blindly but always store the last err - err = storage.client.Mkdir(subDir) + err = storage.getSFTPClient().Mkdir(subDir) } // If there is an error creating the dirs, we check fullDir one more time, because another thread // may happen to create the same fullDir ahead of this thread if err != nil { - _, err := storage.client.Stat(fullDir) + err = storage.retry(func() error { + _, err := storage.getSFTPClient().Stat(fullDir) + return err + }) if err != nil { return err } @@ -224,39 +304,41 @@ func (storage *SFTPStorage) UploadFile(threadIndex int, filePath string, content } } - letters := "abcdefghijklmnopqrstuvwxyz" - suffix := make([]byte, 8) - for i := range suffix { - suffix[i] = letters[rand.Intn(len(letters))] - } + return storage.retry(func() error { - temporaryFile := fullPath + "." + string(suffix) + ".tmp" + letters := "abcdefghijklmnopqrstuvwxyz" + suffix := make([]byte, 8) + for i := range suffix { + suffix[i] = letters[rand.Intn(len(letters))] + } - file, err := storage.client.OpenFile(temporaryFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC) - if err != nil { - return err - } + temporaryFile := fullPath + "." + string(suffix) + ".tmp" - reader := CreateRateLimitedReader(content, storage.UploadRateLimit/storage.numberOfThreads) - _, err = io.Copy(file, reader) - if err != nil { - file.Close() - return err - } - file.Close() + file, err := storage.getSFTPClient().OpenFile(temporaryFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC) + if err != nil { + return err + } - err = storage.client.Rename(temporaryFile, fullPath) - if err != nil { + reader := CreateRateLimitedReader(content, storage.UploadRateLimit/storage.numberOfThreads) + _, err = io.Copy(file, reader) + if err != nil { + file.Close() + return err + } + file.Close() - if _, err = storage.client.Stat(fullPath); err == nil { - storage.client.Remove(temporaryFile) - return nil - } else { - return fmt.Errorf("Uploaded file but failed to store it at %s: %v", fullPath, err) + err = storage.getSFTPClient().Rename(temporaryFile, fullPath) + if err != nil { + if _, err = storage.getSFTPClient().Stat(fullPath); err == nil { + storage.getSFTPClient().Remove(temporaryFile) + return nil + } else { + return fmt.Errorf("Uploaded file but failed to store it at %s: %v", fullPath, err) + } } - } - return nil + return nil + }) } // If a local snapshot cache is needed for the storage to avoid downloading/uploading chunks too often when