Create configurable eviction channel #695

Draft
wants to merge 4 commits into base: master
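At a high level, these commits replace the semaphore-gated file removals with a bounded eviction queue drained by a fixed pool of worker goroutines, and export Prometheus metrics for queued, completed, and dropped evictions. Below is a minimal, self-contained sketch of that pattern; `evictionTask`, `queue`, and `enqueue` are illustrative stand-ins, not the identifiers used in the diff, and the real code lives in cache/disk/load.go further down.

```go
// A minimal sketch of a bounded eviction queue drained by a fixed worker
// pool. All names and limits here are illustrative stand-ins.
package main

import (
	"fmt"
	"os"
	"sync"
)

type evictionTask struct {
	path       string // file to delete once the entry leaves the LRU
	sizeOnDisk int64  // bytes freed when the removal succeeds
}

func main() {
	const queueSize = 1000 // cf. maxQueuedEvictions
	const workers = 10     // cf. maxConcurrentEvictions

	queue := make(chan evictionTask, queueSize)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range queue {
				// Each worker deletes one file at a time, so at most
				// `workers` removals run concurrently.
				if err := os.Remove(t.path); err != nil {
					fmt.Println("eviction failed:", err)
				}
			}
		}()
	}

	// The LRU eviction callback only enqueues. If the queue is full it
	// fails fast instead of blocking while the cache lock is held.
	enqueue := func(t evictionTask) error {
		select {
		case queue <- t:
			return nil
		default:
			return fmt.Errorf("eviction queue full, cannot evict %s", t.path)
		}
	}

	if err := enqueue(evictionTask{path: "/tmp/example-blob", sizeOnDisk: 4096}); err != nil {
		fmt.Println(err)
	}

	close(queue)
	wg.Wait()
}
```

The non-blocking enqueue matters because the eviction callback runs while the LRU mutex is held; the old code released the lock quickly by spawning one goroutine per removal and throttling with a semaphore, while this design bounds both the queue depth and the number of concurrent os.Remove calls.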
3 changes: 2 additions & 1 deletion cache/disk/BUILD.bazel
@@ -25,7 +25,6 @@ go_library(
"@org_golang_google_grpc//status:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
"@org_golang_x_sync//semaphore:go_default_library",
],
)

@@ -44,8 +43,10 @@ go_test(
"//cache/httpproxy:go_default_library",
"//genproto/build/bazel/remote/execution/v2:go_default_library",
"//utils:go_default_library",
"//utils/tempfile:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/testutil:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
"@org_golang_x_sync//semaphore:go_default_library",
],
)
34 changes: 10 additions & 24 deletions cache/disk/disk.go
@@ -28,8 +28,6 @@ import (
"google.golang.org/protobuf/proto"

"github.com/prometheus/client_golang/prometheus"

"golang.org/x/sync/semaphore"
)

var tfc = tempfile.NewCreator()
@@ -77,13 +75,16 @@ type diskCache struct {
accessLogger *log.Logger
containsQueue chan proxyCheck

// Limit the number of simultaneous file removals.
fileRemovalSem *semaphore.Weighted

mu sync.Mutex
lru SizedLRU

gaugeCacheAge prometheus.Gauge
gaugeCacheAge prometheus.Gauge
evictionCounter *prometheus.CounterVec
evictionGauge prometheus.Gauge
evictionBytesGauge prometheus.Gauge

maxQueuedEvictions int
maxConcurrentEvictions int
}

const sha256HashStrSize = sha256.Size * 2 // Two hex characters per byte.
@@ -107,7 +108,7 @@ func badReqErr(format string, a ...interface{}) *cache.Error {
func (c *diskCache) RegisterMetrics() {
c.lru.RegisterMetrics()

prometheus.MustRegister(c.gaugeCacheAge)
prometheus.MustRegister(c.gaugeCacheAge, c.evictionCounter, c.evictionGauge, c.evictionBytesGauge)

// Update the cache age metric on a static interval
// Note: this could be modeled as a GuageFunc that updates as needed
@@ -166,19 +167,6 @@ func (c *diskCache) getElementPath(key Key, value lruItem) string {
return filepath.Join(c.dir, c.FileLocation(kind, value.legacy, hash, value.size, value.random))
}

func (c *diskCache) removeFile(f string) {
if err := c.fileRemovalSem.Acquire(context.Background(), 1); err != nil {
log.Printf("ERROR: failed to aquire semaphore: %v, unable to remove %s", err, f)
return
}
defer c.fileRemovalSem.Release(1)

err := os.Remove(f)
if err != nil {
log.Printf("ERROR: failed to remove evicted cache file: %s", f)
}
}

func (c *diskCache) FileLocationBase(kind cache.EntryKind, legacy bool, hash string, size int64) string {
if kind == cache.RAW {
return path.Join("raw.v2", hash[:2], hash)
@@ -405,10 +393,8 @@ func (c *diskCache) commit(key string, legacy bool, tempfile string, reservedSiz
random: random,
}

if !c.lru.Add(key, newItem) {
err = fmt.Errorf("INTERNAL ERROR: failed to add: %s, size %d (on disk: %d)",
key, logicalSize, sizeOnDisk)
log.Println(err.Error())
if err := c.lru.Add(key, newItem); err != nil {
log.Println(err)
return unreserve, removeTempfile, err
}

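The commit() hunk above now treats SizedLRU.Add as returning an error rather than a bool, and (as the test and load.go changes below show) the onEvict callback can itself fail. cache/disk/lru.go is not part of this diff, so the following is a stripped-down, hypothetical stand-in rather than the real implementation; it only illustrates how an error-returning eviction callback propagates out of Add.

```go
// Hypothetical stand-in for SizedLRU with error-returning Add and onEvict.
// The real type in cache/disk/lru.go differs.
package main

import (
	"errors"
	"fmt"
)

type Key = string

type sizedLRU struct {
	maxSize  int64
	currSize int64
	order    []Key
	sizes    map[Key]int64
	onEvict  func(key Key, size int64) error
}

// Add evicts least-recently-used entries until the new item fits. If the
// eviction callback fails (e.g. the eviction queue is full), the error is
// returned to the caller instead of the old boolean false.
func (l *sizedLRU) Add(key Key, size int64) error {
	for l.currSize+size > l.maxSize {
		if len(l.order) == 0 {
			return fmt.Errorf("item %q (%d bytes) does not fit in the cache", key, size)
		}
		oldest := l.order[0]
		if err := l.onEvict(oldest, l.sizes[oldest]); err != nil {
			return err
		}
		l.currSize -= l.sizes[oldest]
		delete(l.sizes, oldest)
		l.order = l.order[1:]
	}
	l.order = append(l.order, key)
	l.sizes[key] = size
	l.currSize += size
	return nil
}

func main() {
	queueFull := errors.New("eviction queue full")
	lru := &sizedLRU{
		maxSize: 10,
		sizes:   map[Key]int64{},
		onEvict: func(Key, int64) error { return queueFull },
	}
	fmt.Println(lru.Add("a", 6)) // <nil>
	fmt.Println(lru.Add("b", 6)) // eviction queue full
}
```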
4 changes: 2 additions & 2 deletions cache/disk/disk_test.go
@@ -455,9 +455,9 @@ func TestCacheExistingFiles(t *testing.T) {

evicted := []Key{}
origOnEvict := testCache.lru.onEvict
testCache.lru.onEvict = func(key Key, value lruItem) {
testCache.lru.onEvict = func(key Key, value lruItem) error {
evicted = append(evicted, key.(string))
origOnEvict(key, value)
return origOnEvict(key, value)
}

if testCache.lru.Len() != 4 {
115 changes: 71 additions & 44 deletions cache/disk/load.go
@@ -25,7 +25,6 @@ import (
"github.com/prometheus/client_golang/prometheus"

"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)

const lowercaseDSStoreFile = ".ds_store"
@@ -44,20 +43,6 @@ func New(dir string, maxSizeBytes int64, opts ...Option) (Cache, error) {
return nil, err
}

// Go defaults to a limit of 10,000 operating system threads.
// We probably don't need half of those for file removals at
// any given point in time, unless the disk/fs can't keep up.
// I suppose it's better to slow down processing than to crash
// when hitting the 10k limit or to run out of disk space.
semaphoreWeight := int64(5000)

if strings.HasPrefix(runtime.GOOS, "darwin") {
// Mac seems to fail to create os threads when removing
// lots of files, so allow fewer than linux.
semaphoreWeight = 3000
}
log.Printf("Limiting concurrent file removals to %d\n", semaphoreWeight)

zi, err := zstdimpl.Get("go")
if err != nil {
return nil, err
@@ -72,12 +57,28 @@ func New(dir string, maxSizeBytes int64, opts ...Option) (Cache, error) {
maxBlobSize: math.MaxInt64,
maxProxyBlobSize: math.MaxInt64,

fileRemovalSem: semaphore.NewWeighted(semaphoreWeight),

gaugeCacheAge: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "bazel_remote_disk_cache_longest_item_idle_time_seconds",
Help: "The idle time (now - atime) of the last item in the LRU cache, updated once per minute. Depending on filesystem mount options (e.g. relatime), the resolution may be measured in 'days' and not accurate to the second. If using noatime this will be 0.",
}),
evictionCounter: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "bazel_remote_disk_cache_evictions_total",
Help: "The total number of evictions events.",
},
[]string{"status"},
),
evictionGauge: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "bazel_remote_disk_cache_evictions_enqueued",
Help: "The total number of entries enqueue for evictions.",
}),
evictionBytesGauge: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "bazel_remote_disk_cache_evictions_bytes_enqueued",
Help: "The number of bytes enqueue for evictions.",
}),

maxQueuedEvictions: 1000,
maxConcurrentEvictions: 10,
}

cc := CacheConfig{diskCache: &c}
@@ -129,30 +130,30 @@ func New(dir string, maxSizeBytes int64, opts ...Option) (Cache, error) {
}

func (c *diskCache) migrateDirectories() error {
err := migrateDirectory(c.dir, cache.AC)
err := migrateDirectory(c.dir, cache.AC, c.accessLogger)
if err != nil {
return err
}
err = migrateDirectory(c.dir, cache.CAS)
err = migrateDirectory(c.dir, cache.CAS, c.accessLogger)
if err != nil {
return err
}
err = migrateDirectory(c.dir, cache.RAW)
err = migrateDirectory(c.dir, cache.RAW, c.accessLogger)
if err != nil {
return err
}
return nil
}

func migrateDirectory(baseDir string, kind cache.EntryKind) error {
func migrateDirectory(baseDir string, kind cache.EntryKind, accessLogger *log.Logger) error {
sourceDir := path.Join(baseDir, kind.String())

_, err := os.Stat(sourceDir)
if os.IsNotExist(err) {
return nil
}

log.Println("Migrating files (if any) to new directory structure:", sourceDir)
accessLogger.Println("Migrating files (if any) to new directory structure:", sourceDir)

listing, err := os.ReadDir(sourceDir)
if err != nil {
Expand Down Expand Up @@ -185,13 +186,13 @@ func migrateDirectory(baseDir string, kind cache.EntryKind) error {
if item.IsDir() {
if !v1DirRegex.MatchString(oldName) {
// Warn about non-v1 subdirectories.
log.Println("Warning: unexpected directory", oldNamePath)
accessLogger.Println("Warning: unexpected directory", oldNamePath)
}

destDir := filepath.Join(targetDir, oldName[:2])
err := migrateV1Subdir(oldNamePath, destDir, kind)
if err != nil {
log.Printf("Warning: failed to read subdir %q: %s",
accessLogger.Printf("Warning: failed to read subdir %q: %s",
oldNamePath, err)
continue
}
Expand All @@ -200,12 +201,12 @@ func migrateDirectory(baseDir string, kind cache.EntryKind) error {
}

if !item.Type().IsRegular() {
log.Println("Warning: skipping non-regular file:", oldNamePath)
accessLogger.Println("Warning: skipping non-regular file:", oldNamePath)
continue
}

if !validate.HashKeyRegex.MatchString(oldName) {
log.Println("Warning: skipping unexpected file:", oldNamePath)
accessLogger.Println("Warning: skipping unexpected file:", oldNamePath)
continue
}

@@ -234,10 +235,10 @@ func migrateDirectory(baseDir string, kind cache.EntryKind) error {
for _, item := range listing {
select {
case itemChan <- item:
log.Printf("Migrating %s item(s) %d/%d, %s\n", sourceDir, i, numItems, item.Name())
accessLogger.Printf("Migrating %s item(s) %d/%d, %s\n", sourceDir, i, numItems, item.Name())
i++
case err = <-errChan:
log.Println("Encountered error while migrating files:", err)
accessLogger.Println("Encountered error while migrating files:", err)
close(itemChan)
}
}
@@ -352,7 +353,7 @@ func (c *diskCache) scanDir() (scanResult, error) {
} else if numWorkers > 16 {
numWorkers = 16 // Consider increasing the upper limit after more testing.
}
log.Println("Scanning cache directory with", numWorkers, "goroutines")
c.accessLogger.Println("Scanning cache directory with", numWorkers, "goroutines")

dc := make(chan string, numWorkers) // Feed directory names to workers.
dcClosed := false
@@ -562,41 +563,67 @@ func (c *diskCache) scanDir() (scanResult, error) {
// LRU index so that they can be served. Files are sorted by access time first,
// so that the eviction behavior is preserved across server restarts.
func (c *diskCache) loadExistingFiles(maxSizeBytes int64) error {
log.Printf("Loading existing files in %s.\n", c.dir)
c.accessLogger.Printf("Loading existing files in %s.\n", c.dir)

result, err := c.scanDir()
if err != nil {
log.Printf("Failed to scan cache dir: %s", err.Error())
c.accessLogger.Printf("Failed to scan cache dir: %s", err.Error())
return err
}

log.Println("Sorting cache files by atime.")
c.accessLogger.Println("Sorting cache files by atime.")
sort.Sort(result)

type EvictionTask struct {
Key
lruItem
}
evictionQueue := make(chan EvictionTask, c.maxQueuedEvictions)
for i := 0; i < c.maxConcurrentEvictions; i++ {
go func() {
for pair := range evictionQueue {
f := c.getElementPath(pair.Key, pair.lruItem)
c.evictionGauge.Dec()
c.evictionBytesGauge.Sub(float64(pair.lruItem.sizeOnDisk))
err := os.Remove(f)
if err != nil {
c.evictionCounter.WithLabelValues("fail").Inc()
log.Printf("ERROR: failed to remove evicted cache file: %s", err)
} else {
c.evictionCounter.WithLabelValues("success").Inc()
}
}
}()
}

// The eviction callback deletes the file from disk.
// This function is only called while the lock is held
// by the current goroutine.
onEvict := func(key Key, value lruItem) {
f := c.getElementPath(key, value)
// Run in a goroutine so we can release the lock sooner.
go c.removeFile(f)
onEvict := func(key Key, value lruItem) error {
select {
case evictionQueue <- EvictionTask{Key: key, lruItem: value}:
c.evictionGauge.Inc()
c.evictionBytesGauge.Add(float64(value.sizeOnDisk))
return nil
default:
c.evictionCounter.WithLabelValues("full").Inc()
return fmt.Errorf("Too many enqueued evictions, could not evict %s", key)
}
}

log.Println("Building LRU index.")
c.accessLogger.Println("Building LRU index.")

c.lru = NewSizedLRU(maxSizeBytes, onEvict, len(result.item))

for i := 0; i < len(result.item); i++ {
ok := c.lru.Add(result.metadata[i].lookupKey, *result.item[i])
if !ok {
err = os.Remove(filepath.Join(c.dir, result.metadata[i].lookupKey))
if err != nil {
return err
}
err := c.lru.Add(result.metadata[i].lookupKey, *result.item[i])
if err != nil {
_ = os.Remove(filepath.Join(c.dir, result.metadata[i].lookupKey))
log.Println(err)
}
}

log.Println("Finished loading disk cache files.")
c.accessLogger.Println("Finished loading disk cache files.")

return nil
}
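maxQueuedEvictions and maxConcurrentEvictions are hard-coded to 1000 and 10 in this diff; presumably making them configurable means wiring them into the existing opts ...Option parameter of New. The sketch below is hypothetical and not part of this PR; the stand-in Option, CacheConfig, and diskCache types only mimic the shapes visible in the diff.

```go
// Hypothetical functional options for the new eviction limits. The real
// option plumbing in cache/disk may look different.
package main

import "fmt"

type diskCache struct {
	maxQueuedEvictions     int
	maxConcurrentEvictions int
}

type CacheConfig struct{ diskCache *diskCache }

type Option func(*CacheConfig) error

func WithMaxQueuedEvictions(n int) Option {
	return func(c *CacheConfig) error {
		if n <= 0 {
			return fmt.Errorf("maxQueuedEvictions must be positive, got %d", n)
		}
		c.diskCache.maxQueuedEvictions = n
		return nil
	}
}

func WithMaxConcurrentEvictions(n int) Option {
	return func(c *CacheConfig) error {
		if n <= 0 {
			return fmt.Errorf("maxConcurrentEvictions must be positive, got %d", n)
		}
		c.diskCache.maxConcurrentEvictions = n
		return nil
	}
}

func main() {
	// Mirrors the defaults set in load.go, then overrides them via options.
	c := diskCache{maxQueuedEvictions: 1000, maxConcurrentEvictions: 10}
	cc := CacheConfig{diskCache: &c}
	for _, opt := range []Option{WithMaxQueuedEvictions(5000), WithMaxConcurrentEvictions(32)} {
		if err := opt(&cc); err != nil {
			fmt.Println(err)
		}
	}
	fmt.Println(c.maxQueuedEvictions, c.maxConcurrentEvictions) // 5000 32
}
```

If the real Option type differs (for example, taking *diskCache directly), only the closure signatures would change.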