Skip to content

Commit

Permalink
all: try hard removing directory with contents
Browse files Browse the repository at this point in the history
Fixes #61
  • Loading branch information
valyala committed Jun 10, 2019
1 parent cd1bc32 commit ac7b186
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 26 deletions.
2 changes: 1 addition & 1 deletion app/vmselect/netstorage/tmp_blocks_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func InitTmpBlocksDir(tmpDirPath string) {
tmpDirPath = os.TempDir()
}
tmpBlocksDir = tmpDirPath + "/searchResults"
if err := os.RemoveAll(tmpBlocksDir); err != nil {
if err := fs.RemoveAllHard(tmpBlocksDir); err != nil {
logger.Panicf("FATAL: cannot remove %q: %s", tmpBlocksDir, err)
}
if err := fs.MkdirAllIfNotExist(tmpBlocksDir); err != nil {
Expand Down
64 changes: 61 additions & 3 deletions lib/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"io"
"os"
"path/filepath"
"strings"
"time"

"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
Expand Down Expand Up @@ -159,7 +161,7 @@ func RemoveDirContents(dir string) {
continue
}
fullPath := dir + "/" + name
if err := os.RemoveAll(fullPath); err != nil {
if err := RemoveAllHard(fullPath); err != nil {
logger.Panicf("FATAL: cannot remove %q: %s", fullPath, err)
}
}
Expand Down Expand Up @@ -188,10 +190,66 @@ func IsPathExist(path string) bool {
// MustRemoveAllSynced removes path with all the contents
// and syncs the parent directory, so it no longer contains the path.
func MustRemoveAllSynced(path string) {
if err := os.RemoveAll(path); err != nil {
MustRemoveAll(path)
SyncPath(filepath.Dir(path))
}

// MustRemoveAll removes path with all the contents.
func MustRemoveAll(path string) {
if err := RemoveAllHard(path); err != nil {
logger.Panicf("FATAL: cannot remove %q: %s", path, err)
}
SyncPath(filepath.Dir(path))
}

// RemoveAllHard removes path with all the contents.
//
// It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
func RemoveAllHard(path string) error {
err := os.RemoveAll(path)
if err == nil {
return nil
}
if !strings.Contains(err.Error(), "directory not empty") {
return err
}
// This may be NFS-related issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
// Schedule for later directory removal.
select {
case removeDirCh <- path:
default:
return fmt.Errorf("cannot schedule %s for removal, since the removal queue is full (%d entries)", path, cap(removeDirCh))
}
return nil
}

var removeDirCh = make(chan string, 1024)

func dirRemover() {
for path := range removeDirCh {
attempts := 0
for {
err := os.RemoveAll(path)
if err == nil {
break
}
if !strings.Contains(err.Error(), "directory not empty") {
logger.Errorf("cannot remove %q: %s", path, err)
break
}
// NFS-related issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
// Sleep for a while and try again.
attempts++
if attempts > 50 {
logger.Errorf("cannot remove %q in %d attempts: %s", path, attempts, err)
break
}
time.Sleep(10 * time.Millisecond)
}
}
}

func init() {
go dirRemover()
}

// HardLinkFiles makes hard links for all the files from srcDir in dstDir.
Expand Down
9 changes: 4 additions & 5 deletions lib/mergeset/block_stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package mergeset

import (
"fmt"
"os"
"path/filepath"
"sync"

Expand Down Expand Up @@ -92,15 +91,15 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
metaindexPath := path + "/metaindex.bin"
metaindexFile, err := filestream.Create(metaindexPath, false)
if err != nil {
_ = os.RemoveAll(path)
fs.MustRemoveAll(path)
return fmt.Errorf("cannot create metaindex file: %s", err)
}

indexPath := path + "/index.bin"
indexFile, err := filestream.Create(indexPath, nocache)
if err != nil {
metaindexFile.MustClose()
_ = os.RemoveAll(path)
fs.MustRemoveAll(path)
return fmt.Errorf("cannot create index file: %s", err)
}

Expand All @@ -109,7 +108,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
if err != nil {
metaindexFile.MustClose()
indexFile.MustClose()
_ = os.RemoveAll(path)
fs.MustRemoveAll(path)
return fmt.Errorf("cannot create items file: %s", err)
}

Expand All @@ -119,7 +118,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
metaindexFile.MustClose()
indexFile.MustClose()
itemsFile.MustClose()
_ = os.RemoveAll(path)
fs.MustRemoveAll(path)
return fmt.Errorf("cannot create lens file: %s", err)
}

Expand Down
6 changes: 3 additions & 3 deletions lib/mergeset/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,15 +846,15 @@ func openParts(path string) ([]*partWrapper, error) {
}

txnDir := path + "/txn"
if err := os.RemoveAll(txnDir); err != nil {
if err := fs.RemoveAllHard(txnDir); err != nil {
return nil, fmt.Errorf("cannot remove %q: %s", txnDir, err)
}
if err := fs.MkdirAllFailIfExist(txnDir); err != nil {
return nil, fmt.Errorf("cannot create %q: %s", txnDir, err)
}

tmpDir := path + "/tmp"
if err := os.RemoveAll(tmpDir); err != nil {
if err := fs.RemoveAllHard(tmpDir); err != nil {
return nil, fmt.Errorf("cannot remove %q: %s", tmpDir, err)
}
if err := fs.MkdirAllFailIfExist(tmpDir); err != nil {
Expand Down Expand Up @@ -1033,7 +1033,7 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix, txnPath string) error {
if err != nil {
return fmt.Errorf("invalid path to remove: %s", err)
}
if err := os.RemoveAll(path); err != nil {
if err := fs.RemoveAllHard(path); err != nil {
return fmt.Errorf("cannot remove %q: %s", path, err)
}
}
Expand Down
9 changes: 4 additions & 5 deletions lib/storage/block_stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package storage

import (
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -85,15 +84,15 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
timestampsPath := path + "/timestamps.bin"
timestampsFile, err := filestream.Create(timestampsPath, nocache)
if err != nil {
_ = os.RemoveAll(path)
fs.MustRemoveAll(path)
return fmt.Errorf("cannot create timestamps file: %s", err)
}

valuesPath := path + "/values.bin"
valuesFile, err := filestream.Create(valuesPath, nocache)
if err != nil {
timestampsFile.MustClose()
_ = os.RemoveAll(path)
fs.MustRemoveAll(path)
return fmt.Errorf("cannot create values file: %s", err)
}

Expand All @@ -102,7 +101,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
if err != nil {
timestampsFile.MustClose()
valuesFile.MustClose()
_ = os.RemoveAll(path)
fs.MustRemoveAll(path)
return fmt.Errorf("cannot create index file: %s", err)
}

Expand All @@ -114,7 +113,7 @@ func (bsw *blockStreamWriter) InitFromFilePart(path string, nocache bool, compre
timestampsFile.MustClose()
valuesFile.MustClose()
indexFile.MustClose()
_ = os.RemoveAll(path)
fs.MustRemoveAll(path)
return fmt.Errorf("cannot create metaindex file: %s", err)
}

Expand Down
4 changes: 2 additions & 2 deletions lib/storage/index_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"sync"
Expand All @@ -15,6 +14,7 @@ import (

"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
Expand Down Expand Up @@ -266,7 +266,7 @@ func (db *indexDB) decRef() {
}

logger.Infof("dropping indexDB %q", tbPath)
if err := os.RemoveAll(tbPath); err != nil {
if err := fs.RemoveAllHard(tbPath); err != nil {
logger.Panicf("FATAL: cannot remove %q: %s", tbPath, err)
}
logger.Infof("indexDB %q has been dropped", tbPath)
Expand Down
12 changes: 6 additions & 6 deletions lib/storage/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,10 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str
func (pt *partition) Drop() {
logger.Infof("dropping partition %q at smallPartsPath=%q, bigPartsPath=%q", pt.name, pt.smallPartsPath, pt.bigPartsPath)

if err := os.RemoveAll(pt.smallPartsPath); err != nil {
if err := fs.RemoveAllHard(pt.smallPartsPath); err != nil {
logger.Panicf("FATAL: cannot remove small parts directory %q: %s", pt.smallPartsPath, err)
}
if err := os.RemoveAll(pt.bigPartsPath); err != nil {
if err := fs.RemoveAllHard(pt.bigPartsPath); err != nil {
logger.Panicf("FATAL: cannot remove big parts directory %q: %s", pt.bigPartsPath, err)
}

Expand Down Expand Up @@ -1223,11 +1223,11 @@ func openParts(pathPrefix1, pathPrefix2, path string) ([]*partWrapper, error) {
}

txnDir := path + "/txn"
if err := os.RemoveAll(txnDir); err != nil {
if err := fs.RemoveAllHard(txnDir); err != nil {
return nil, fmt.Errorf("cannot delete transaction directory %q: %s", txnDir, err)
}
tmpDir := path + "/tmp"
if err := os.RemoveAll(tmpDir); err != nil {
if err := fs.RemoveAllHard(tmpDir); err != nil {
return nil, fmt.Errorf("cannot remove temporary directory %q: %s", tmpDir, err)
}
if err := createPartitionDirs(path); err != nil {
Expand Down Expand Up @@ -1408,7 +1408,7 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str
if err != nil {
return fmt.Errorf("invalid path to remove: %s", err)
}
if err := os.RemoveAll(path); err != nil {
if err := fs.RemoveAllHard(path); err != nil {
return fmt.Errorf("cannot remove %q: %s", path, err)
}
}
Expand Down Expand Up @@ -1438,7 +1438,7 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str
}
} else {
// Just remove srcPath.
if err := os.RemoveAll(srcPath); err != nil {
if err := fs.RemoveAllHard(srcPath); err != nil {
return fmt.Errorf("cannot remove %q: %s", srcPath, err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,7 @@ func openIndexDBTables(path string, metricIDCache, metricNameCache *fastcache.Ca
for _, tn := range tableNames[:len(tableNames)-2] {
pathToRemove := path + "/" + tn
logger.Infof("removing obsolete indexdb dir %q...", pathToRemove)
if err := os.RemoveAll(pathToRemove); err != nil {
if err := fs.RemoveAllHard(pathToRemove); err != nil {
return nil, nil, fmt.Errorf("cannot remove obsolete indexdb dir %q: %s", pathToRemove, err)
}
logger.Infof("removed obsolete indexdb dir %q", pathToRemove)
Expand Down

0 comments on commit ac7b186

Please sign in to comment.