Skip to content

Commit

Permalink
fix(blooms): Clean block directories recursively on startup (#12895)
Browse files Browse the repository at this point in the history
Any empty directories in the block directory cache directory should recursively be removed to avoid a lot of dangling, empty directories.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum authored May 6, 2024
1 parent 738c274 commit 7b77e31
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 12 deletions.
32 changes: 32 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,29 @@ func LoadBlocksDirIntoCache(paths []string, c Cache, logger log.Logger) error {
return err.Err()
}

func removeRecursively(root, path string) error {
if path == root {
// stop when reached root directory
return nil
}

entries, err := os.ReadDir(path)
if err != nil {
// stop in case of error
return err
}

if len(entries) == 0 {
base := filepath.Dir(path)
if err := os.RemoveAll(path); err != nil {
return err
}
return removeRecursively(root, base)
}

return nil
}

func loadBlockDirectories(root string, logger log.Logger) (keys []string, values []BlockDirectory) {
resolver := NewPrefixedResolver(root, defaultKeyResolver{})
_ = filepath.WalkDir(root, func(path string, dirEntry fs.DirEntry, e error) error {
Expand All @@ -57,6 +80,15 @@ func loadBlockDirectories(root string, logger log.Logger) (keys []string, values
return nil
}

// Remove empty directories recursively
// filepath.WalkDir() does not support depth-first traversal,
// so this is not very efficient
err := removeRecursively(root, path)
if err != nil {
level.Warn(logger).Log("msg", "failed to remove directory", "path", path, "err", err)
return nil
}

ref, err := resolver.ParseBlockKey(key(path))
if err != nil {
return nil
Expand Down
49 changes: 37 additions & 12 deletions pkg/storage/stores/shipper/bloomshipper/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bloomshipper

import (
"context"
"io/fs"
"os"
"path/filepath"
"sync"
Expand Down Expand Up @@ -66,19 +67,24 @@ func Test_LoadBlocksDirIntoCache(t *testing.T) {
fp.Close()

// invalid directory
_ = os.MkdirAll(filepath.Join(wd, "not/a/valid/blockdir"), 0o755)
invalidDir := "not/a/valid/blockdir"
_ = os.MkdirAll(filepath.Join(wd, invalidDir), 0o755)

// empty block directory
fn1 := "bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-abcd"
_ = os.MkdirAll(filepath.Join(wd, fn1), 0o755)
// empty block directories
emptyDir1 := "bloom/table_1/tenant/blocks/0000000000000000-000000000000ffff/0-3600000-abcd"
_ = os.MkdirAll(filepath.Join(wd, emptyDir1), 0o755)
emptyDir2 := "bloom/table_1/tenant/blocks/0000000000010000-000000000001ffff/0-3600000-ef01"
_ = os.MkdirAll(filepath.Join(wd, emptyDir2), 0o755)
emptyDir3 := "bloom/table_1/tenant/blocks/0000000000020000-000000000002ffff/0-3600000-2345"
_ = os.MkdirAll(filepath.Join(wd, emptyDir3), 0o755)

// valid block directory
fn2 := "bloom/table_2/tenant/blocks/0000000000010000-000000000001ffff/0-3600000-abcd"
_ = os.MkdirAll(filepath.Join(wd, fn2), 0o755)
fp, _ = os.Create(filepath.Join(wd, fn2, "bloom"))
fp.Close()
fp, _ = os.Create(filepath.Join(wd, fn2, "series"))
fp.Close()
validDir := "bloom/table_2/tenant/blocks/0000000000010000-000000000001ffff/0-3600000-abcd"
_ = os.MkdirAll(filepath.Join(wd, validDir), 0o755)
for _, fn := range []string{"bloom", "series"} {
fp, _ = os.Create(filepath.Join(wd, validDir, fn))
fp.Close()
}

cfg := config.BlocksCacheConfig{
SoftLimit: 1 << 20,
Expand All @@ -93,9 +99,28 @@ func Test_LoadBlocksDirIntoCache(t *testing.T) {

require.Equal(t, 1, len(c.entries))

key := filepath.Join(wd, fn2) + ".tar.gz"
key := filepath.Join(wd, validDir) + ".tar.gz"
elem, found := c.entries[key]
require.True(t, found)
blockDir := elem.Value.(*Entry).Value
require.Equal(t, filepath.Join(wd, fn2), blockDir.Path)
require.Equal(t, filepath.Join(wd, validDir), blockDir.Path)

// check cleaned directories
dirs := make([]string, 0, 6)
_ = filepath.WalkDir(wd, func(path string, dirEntry fs.DirEntry, _ error) error {
if !dirEntry.IsDir() {
return nil
}
dirs = append(dirs, path)
return nil
})
require.Equal(t, []string{
filepath.Join(wd),
filepath.Join(wd, "bloom/"),
filepath.Join(wd, "bloom/table_2/"),
filepath.Join(wd, "bloom/table_2/tenant/"),
filepath.Join(wd, "bloom/table_2/tenant/blocks/"),
filepath.Join(wd, "bloom/table_2/tenant/blocks/0000000000010000-000000000001ffff"),
filepath.Join(wd, "bloom/table_2/tenant/blocks/0000000000010000-000000000001ffff/0-3600000-abcd"),
}, dirs)
}

0 comments on commit 7b77e31

Please sign in to comment.