Skip to content

Commit

Permalink
Merge pull request #489 from gilbertchen/sftp_retry
Browse files Browse the repository at this point in the history
Retry on EOF errors in the SFTP backend
  • Loading branch information
gilbertchen committed Jan 4, 2019
2 parents c4a3dd1 + ab28115 commit 46376d8
Showing 1 changed file with 131 additions and 49 deletions.
180 changes: 131 additions & 49 deletions src/duplicacy_sftpstorage.go
Expand Up @@ -14,6 +14,7 @@ import (
"runtime"
"strings"
"time"
"sync"

"github.com/pkg/sftp"
"golang.org/x/crypto/ssh"
Expand All @@ -23,9 +24,13 @@ type SFTPStorage struct {
StorageBase

client *sftp.Client
clientLock sync.Mutex
minimumNesting int // The minimum level of directories to dive into before searching for the chunk file.
storageDir string
numberOfThreads int
numberOfTries int
serverAddress string
sftpConfig *ssh.ClientConfig
}

func CreateSFTPStorageWithPassword(server string, port int, username string, storageDir string,
Expand Down Expand Up @@ -86,6 +91,9 @@ func CreateSFTPStorage(server string, port int, username string, storageDir stri
storageDir: storageDir,
minimumNesting: minimumNesting,
numberOfThreads: threads,
numberOfTries: 6,
serverAddress: serverAddress,
sftpConfig: sftpConfig,
}

// Random number for generating the temporary chunk file suffix.
Expand All @@ -99,13 +107,60 @@ func CreateSFTPStorage(server string, port int, username string, storageDir stri
}

// CloseSFTPStorage closes the SFTP client held by 'storage', if there is one, and clears the
// reference so that a second call is a no-op.
//
// The client pointer is read and cleared while holding clientLock, the same mutex used by
// getSFTPClient and retry, so closing cannot interleave with a reconnect.
func CloseSFTPStorage(storage *SFTPStorage) {
	storage.clientLock.Lock()
	defer storage.clientLock.Unlock()

	// Only close when a client is present; an unconditional Close() would dereference a nil
	// client and would close a live one twice.
	if storage.client != nil {
		storage.client.Close()
		storage.client = nil
	}
}

// getSFTPClient returns the current SFTP client. The pointer is read while holding clientLock
// so that a caller never observes it in the middle of a reconnect performed by retry.
func (storage *SFTPStorage) getSFTPClient() *sftp.Client {
	storage.clientLock.Lock()
	current := storage.client
	storage.clientLock.Unlock()
	return current
}

// retry runs 'f' and, when it fails with an error whose message contains "EOF", reconnects to
// the SFTP server and runs 'f' again. At most storage.numberOfTries retries are made; the wait
// before each retry starts at one second and doubles every time.
//
// Any other error (or an "EOF" error once the retry budget is used up) is returned unchanged.
// If reconnecting fails, the reconnect error is returned instead.
func (storage *SFTPStorage) retry(f func() error) error {
	delay := time.Second
	for i := 0; ; i++ {
		err := f()
		if err == nil || !strings.Contains(err.Error(), "EOF") || i >= storage.numberOfTries {
			return err
		}

		LOG_WARN("SFTP_RETRY", "Encountered an error (%v); retry after %d second(s)", err, delay/time.Second)
		time.Sleep(delay)
		delay *= 2

		// Reconnect under the lock so that getSFTPClient never returns a half-replaced client.
		// The deferred unlock covers every exit path of the closure.
		reconnectErr := func() error {
			storage.clientLock.Lock()
			defer storage.clientLock.Unlock()

			// Drop the broken connection first, but keep the pointer in place: if the reconnect
			// below fails, later operations then get an error from the closed client instead of
			// dereferencing a nil client.
			if storage.client != nil {
				storage.client.Close()
			}

			connection, dialErr := ssh.Dial("tcp", storage.serverAddress, storage.sftpConfig)
			if dialErr != nil {
				return dialErr
			}

			client, clientErr := sftp.NewClient(connection)
			if clientErr != nil {
				connection.Close()
				return clientErr
			}

			storage.client = client
			return nil
		}()
		if reconnectErr != nil {
			return reconnectErr
		}
	}
}
// ListFiles returns the list of files and subdirectories under 'dirPath' (non-recursively)
func (storage *SFTPStorage) ListFiles(threadIndex int, dirPath string) (files []string, sizes []int64, err error) {

entries, err := storage.client.ReadDir(path.Join(storage.storageDir, dirPath))
var entries []os.FileInfo
err = storage.retry(func() error {
entries, err = storage.getSFTPClient().ReadDir(path.Join(storage.storageDir, dirPath))
return err
})
if err != nil {
return nil, nil, err
}
Expand All @@ -126,7 +181,11 @@ func (storage *SFTPStorage) ListFiles(threadIndex int, dirPath string) (files []
// DeleteFile deletes the file or directory at 'filePath'.
func (storage *SFTPStorage) DeleteFile(threadIndex int, filePath string) (err error) {
fullPath := path.Join(storage.storageDir, filePath)
fileInfo, err := storage.client.Stat(fullPath)
var fileInfo os.FileInfo
err = storage.retry(func() error {
fileInfo, err = storage.getSFTPClient().Stat(fullPath)
return err
})
if err != nil {
if os.IsNotExist(err) {
LOG_TRACE("SFTP_STORAGE", "File %s has disappeared before deletion", filePath)
Expand All @@ -137,33 +196,47 @@ func (storage *SFTPStorage) DeleteFile(threadIndex int, filePath string) (err er
if fileInfo == nil {
return nil
}
return storage.client.Remove(path.Join(storage.storageDir, filePath))
return storage.retry(func() error { return storage.getSFTPClient().Remove(path.Join(storage.storageDir, filePath)) })
}

// MoveFile renames the file 'from' to 'to'; both paths are relative to the storage directory.
// It returns an error if something already exists at the destination. Both the existence check
// and the rename go through retry, so each is re-attempted after an "EOF" error.
func (storage *SFTPStorage) MoveFile(threadIndex int, from string, to string) (err error) {
	toPath := path.Join(storage.storageDir, to)

	var fileInfo os.FileInfo
	// The Stat error is intentionally not checked: a failed Stat leaves fileInfo nil, which is
	// treated as "destination does not exist" and the rename proceeds.
	err = storage.retry(func() error {
		fileInfo, err = storage.getSFTPClient().Stat(toPath)
		return err
	})
	if fileInfo != nil {
		return fmt.Errorf("The destination file %s already exists", toPath)
	}

	return storage.retry(func() error {
		return storage.getSFTPClient().Rename(path.Join(storage.storageDir, from), toPath)
	})
}

// CreateDirectory creates the directory 'dirPath' (relative to the storage directory). It returns
// nil without doing anything when a directory already exists at that path. Both the existence
// check and the Mkdir call go through retry, so each is re-attempted after an "EOF" error.
func (storage *SFTPStorage) CreateDirectory(threadIndex int, dirPath string) (err error) {
	fullPath := path.Join(storage.storageDir, dirPath)

	var fileInfo os.FileInfo
	// The Stat error is intentionally not checked: a failed Stat leaves fileInfo nil and the
	// directory is then created below.
	err = storage.retry(func() error {
		fileInfo, err = storage.getSFTPClient().Stat(fullPath)
		return err
	})
	if fileInfo != nil && fileInfo.IsDir() {
		return nil
	}

	return storage.retry(func() error {
		return storage.getSFTPClient().Mkdir(fullPath)
	})
}

// GetFileInfo returns the information about the file or directory at 'filePath'.
func (storage *SFTPStorage) GetFileInfo(threadIndex int, filePath string) (exist bool, isDir bool, size int64, err error) {
fileInfo, err := storage.client.Stat(path.Join(storage.storageDir, filePath))
var fileInfo os.FileInfo
err = storage.retry(func() error {
fileInfo, err = storage.getSFTPClient().Stat(path.Join(storage.storageDir, filePath))
return err
})

if err != nil {
if os.IsNotExist(err) {
return false, false, 0, nil
Expand All @@ -181,18 +254,19 @@ func (storage *SFTPStorage) GetFileInfo(threadIndex int, filePath string) (exist

// DownloadFile reads the file at 'filePath' (relative to the storage directory) into 'chunk'.
// The copy is rate limited to this thread's share of storage.DownloadRateLimit. Opening and
// copying happen inside retry, so the whole download is re-attempted after an "EOF" error.
func (storage *SFTPStorage) DownloadFile(threadIndex int, filePath string, chunk *Chunk) (err error) {
	fullPath := path.Join(storage.storageDir, filePath)

	return storage.retry(func() error {
		// Fetch the client inside the closure so that a retry uses the reconnected client.
		file, err := storage.getSFTPClient().Open(fullPath)
		if err != nil {
			return err
		}
		defer file.Close()

		// NOTE(review): if a copy fails part-way, 'chunk' may already hold partial data when the
		// retry copies into it again -- confirm whether the chunk needs resetting between tries.
		_, err = RateLimitedCopy(chunk, file, storage.DownloadRateLimit/storage.numberOfThreads)
		return err
	})
}

// UploadFile writes 'content' to the file at 'filePath'.
Expand All @@ -203,60 +277,68 @@ func (storage *SFTPStorage) UploadFile(threadIndex int, filePath string, content
dirs := strings.Split(filePath, "/")
if len(dirs) > 1 {
fullDir := path.Dir(fullPath)
_, err := storage.client.Stat(fullDir)
err = storage.retry(func() error {
_, err := storage.getSFTPClient().Stat(fullDir)
return err
})
if err != nil {
// The error may be caused by a non-existent fullDir, or a broken connection. In either case,
// we just assume it is the former because there isn't a way to tell which is the case.
for i := range dirs[1 : len(dirs)-1] {
subDir := path.Join(storage.storageDir, path.Join(dirs[0:i+2]...))
// We don't check the error; just keep going blindly but always store the last err
err = storage.client.Mkdir(subDir)
err = storage.getSFTPClient().Mkdir(subDir)
}

// If there is an error creating the dirs, we check fullDir one more time, because another thread
// may happen to create the same fullDir ahead of this thread
if err != nil {
_, err := storage.client.Stat(fullDir)
err = storage.retry(func() error {
_, err := storage.getSFTPClient().Stat(fullDir)
return err
})
if err != nil {
return err
}
}
}
}

letters := "abcdefghijklmnopqrstuvwxyz"
suffix := make([]byte, 8)
for i := range suffix {
suffix[i] = letters[rand.Intn(len(letters))]
}
return storage.retry(func() error {

temporaryFile := fullPath + "." + string(suffix) + ".tmp"
letters := "abcdefghijklmnopqrstuvwxyz"
suffix := make([]byte, 8)
for i := range suffix {
suffix[i] = letters[rand.Intn(len(letters))]
}

file, err := storage.client.OpenFile(temporaryFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC)
if err != nil {
return err
}
temporaryFile := fullPath + "." + string(suffix) + ".tmp"

reader := CreateRateLimitedReader(content, storage.UploadRateLimit/storage.numberOfThreads)
_, err = io.Copy(file, reader)
if err != nil {
file.Close()
return err
}
file.Close()
file, err := storage.getSFTPClient().OpenFile(temporaryFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC)
if err != nil {
return err
}

err = storage.client.Rename(temporaryFile, fullPath)
if err != nil {
reader := CreateRateLimitedReader(content, storage.UploadRateLimit/storage.numberOfThreads)
_, err = io.Copy(file, reader)
if err != nil {
file.Close()
return err
}
file.Close()

if _, err = storage.client.Stat(fullPath); err == nil {
storage.client.Remove(temporaryFile)
return nil
} else {
return fmt.Errorf("Uploaded file but failed to store it at %s: %v", fullPath, err)
err = storage.getSFTPClient().Rename(temporaryFile, fullPath)
if err != nil {
if _, err = storage.getSFTPClient().Stat(fullPath); err == nil {
storage.getSFTPClient().Remove(temporaryFile)
return nil
} else {
return fmt.Errorf("Uploaded file but failed to store it at %s: %v", fullPath, err)
}
}
}

return nil
return nil
})
}

// If a local snapshot cache is needed for the storage to avoid downloading/uploading chunks too often when
Expand Down

0 comments on commit 46376d8

Please sign in to comment.