Skip to content

Commit

Permalink
Introduce disk_size_limit (part 2)
Browse files Browse the repository at this point in the history
Introduce a disk_size_limit for the total disk space of:

 - Files currently in the cache.
 - Reserved space for files currently being uploaded.
 - Evicted files not yet removed.

Setting this limit is optional (at least for now).

Reservations for Put requests are rejected when
disk_size_limit is exceeded.

The prometheus gauge bazel_remote_disk_cache_size_bytes is
updated to report the maximum value observed during the
previous 30-second period, so that short spikes are visible
when tuning the disk_size_limit configuration.

There is also a new prometheus gauge,
bazel_remote_disk_cache_size_bytes_limit, that shows the
currently configured limits. It helps visualize whether the
current size is approaching the limit, and aids in tuning
the disk_size_limit.

Change-Id: Iaec29af9a2e02796c29f294b993989783d575c4b
  • Loading branch information
ulrfa committed Sep 1, 2023
1 parent 62b1b16 commit 8196189
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 12 deletions.
25 changes: 18 additions & 7 deletions cache/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ func badReqErr(format string, a ...interface{}) *cache.Error {
}

var ErrOverloaded = &cache.Error{
Code: http.StatusServiceUnavailable, // Too many requests/disk overloaded.
Text: "Too many requests/disk overloaded (please try again later)",
Code: http.StatusInsufficientStorage,
Text: "Out of disk space, due to too large or too many concurrent cache requests. Please try again later.",
}

// Non-test users must call this to expose metrics.
Expand All @@ -120,6 +120,21 @@ func (c *diskCache) RegisterMetrics() {
// but since the updater func must lock the cache mu, it was deemed
// necessary to have greater control of when to get the cache age
go c.pollCacheAge()

go c.shiftMetricPeriodContinuously()
}

// Shift to new period for metrics every 30 seconds. A period of
// 30 seconds should give margin to catch all peaks (with for example
// a 10 second scrape interval) even in cases of delayed or missed
// scrapes from prometheus.
// shiftMetricPeriodContinuously rolls the disk-size metrics over to a new
// 30-second period, forever. The 30 second window leaves margin to capture
// every peak (e.g. with a 10 second scrape interval) even when prometheus
// scrapes are delayed or missed. Runs for the lifetime of the process; the
// ticker is intentionally never stopped.
func (c *diskCache) shiftMetricPeriodContinuously() {
	ticker := time.NewTicker(30 * time.Second)
	for {
		// Shift immediately on startup, then once per tick.
		c.mu.Lock()
		c.lru.shiftToNextMetricPeriod()
		c.mu.Unlock()
		<-ticker.C
	}
}

// Update metric every minute with the idle time of the least recently used item in the cache
Expand Down Expand Up @@ -293,11 +308,7 @@ func (c *diskCache) Put(ctx context.Context, kind cache.EntryKind, hash string,
}
if !ok {
c.mu.Unlock()
return &cache.Error{
Code: http.StatusInsufficientStorage,
Text: fmt.Sprintf("The item (%d) + reserved space is larger than the cache's maximum size (%d).",
size, c.lru.MaxSize()),
}
return ErrOverloaded
}
c.mu.Unlock()
unreserve = true
Expand Down
25 changes: 23 additions & 2 deletions cache/disk/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func New(dir string, maxSizeBytes int64, opts ...Option) (Cache, error) {
if err != nil {
return nil, fmt.Errorf("Attempting to migrate the old directory structure failed: %w", err)
}
err = c.loadExistingFiles(maxSizeBytes)
err = c.loadExistingFiles(maxSizeBytes, cc)
if err != nil {
return nil, fmt.Errorf("Loading of existing cache entries failed due to error: %w", err)
}
Expand Down Expand Up @@ -537,7 +537,7 @@ func (c *diskCache) scanDir() (scanResult, error) {
// loadExistingFiles lists all files in the cache directory, and adds them to the
// LRU index so that they can be served. Files are sorted by access time first,
// so that the eviction behavior is preserved across server restarts.
func (c *diskCache) loadExistingFiles(maxSizeBytes int64) error {
func (c *diskCache) loadExistingFiles(maxSizeBytes int64, cc CacheConfig) error {
log.Printf("Loading existing files in %s.\n", c.dir)

result, err := c.scanDir()
Expand All @@ -561,6 +561,15 @@ func (c *diskCache) loadExistingFiles(maxSizeBytes int64) error {

c.lru = NewSizedLRU(maxSizeBytes, onEvict, len(result.item))

log.Printf("Will evict at max_size: %.2f GB", bytesToGigaBytes(maxSizeBytes))

if cc.diskSizeLimit > 0 {
// Only set and print if optional limit is enabled.
c.lru.diskSizeLimit = cc.diskSizeLimit
log.Printf("Will reject at disk_size_limit: %.2f GB",
bytesToGigaBytes(c.lru.diskSizeLimit))
}

// Start one single goroutine running in background, continuously
// waiting for files to be removed and removing them. Benchmarks on
// Linux with XFS file system have surprisingly shown that removal
Expand All @@ -585,7 +594,19 @@ func (c *diskCache) loadExistingFiles(maxSizeBytes int64) error {
}
}

// Printing progress gives awareness about slow operations.
// And waiting for evictions to complete before accepting client
// connection reduce risk for confusing overload errors at runtime.
log.Println("Waiting for evictions...")
for c.lru.queuedEvictionsSize.Load() > 0 {
time.Sleep(200 * time.Millisecond)
}

log.Println("Finished loading disk cache files.")

return nil
}

// bytesToGigaBytes converts a byte count into gigabytes (1024^3 bytes)
// as a float, for human-readable log output.
func bytesToGigaBytes(bytes int64) float64 {
	const bytesPerGigaByte = 1024.0 * 1024.0 * 1024.0
	return float64(bytes) / bytesPerGigaByte
}
99 changes: 96 additions & 3 deletions cache/disk/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"container/list"
"errors"
"fmt"
"sync/atomic"

"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -63,9 +64,28 @@ type SizedLRU struct {
onEvict EvictCallback

gaugeCacheSizeBytes prometheus.Gauge
gaugeCacheSizeBytesLimit *prometheus.GaugeVec
gaugeCacheLogicalBytes prometheus.Gauge
counterEvictedBytes prometheus.Counter
counterOverwrittenBytes prometheus.Counter

// Peak value of: currentSize + currentlyEvictingSize
// Type is uint64 instead of int64 in order to allow representing also
// large, rejected reservations that would have resulted in values above
// the int64 diskSizeLimit.
totalDiskSizePeak uint64

// Configured max allowed bytes on disk for the cache, including files
// queued for eviction but not yet removed. Value <= 0 means no
// limit. The diskSizeLimit is expected to be configured higher than
// maxSize (e.g., 5% higher) to allow the asynchronous removal to catch
// up after peaks of file writes.
diskSizeLimit int64

// Number of bytes currently being evicted (removed from lru but not
// yet removed from disk). Is allowed to be accessed and changed
// without holding the diskCache.mu lock.
queuedEvictionsSize atomic.Int64
}

type entry struct {
Expand All @@ -87,8 +107,15 @@ func NewSizedLRU(maxSize int64, onEvict EvictCallback, initialCapacity int) Size

gaugeCacheSizeBytes: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "bazel_remote_disk_cache_size_bytes",
Help: "The current number of bytes in the disk backend",
Help: "The peak number of bytes in the disk backend for the previous 30 second period.",
}),
gaugeCacheSizeBytesLimit: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "bazel_remote_disk_cache_size_bytes_limit",
Help: "The currently configured limits of different types, e.g. for when disk cache evicts data or rejects requests.",
},
[]string{"type"},
),
gaugeCacheLogicalBytes: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "bazel_remote_disk_cache_logical_bytes",
Help: "The current number of bytes in the disk backend if they were uncompressed",
Expand All @@ -108,9 +135,19 @@ func NewSizedLRU(maxSize int64, onEvict EvictCallback, initialCapacity int) Size

func (c *SizedLRU) RegisterMetrics() {
prometheus.MustRegister(c.gaugeCacheSizeBytes)
prometheus.MustRegister(c.gaugeCacheSizeBytesLimit)
prometheus.MustRegister(c.gaugeCacheLogicalBytes)
prometheus.MustRegister(c.counterEvictedBytes)
prometheus.MustRegister(c.counterOverwrittenBytes)

// Set gauges to constant configured values to help visualize configured limits
// and in particular help tuning disk_size_limit configuration by comparing it
// against peak values of the bazel_remote_disk_cache_size_bytes prometheus gauge
// and give awareness about if getting close to rejecting requests.
c.gaugeCacheSizeBytesLimit.WithLabelValues("evict").Set(float64(c.maxSize))
if c.diskSizeLimit > 0 {
c.gaugeCacheSizeBytesLimit.WithLabelValues("reject").Set(float64(c.diskSizeLimit))
}
}

// Add adds a (key, value) to the cache, evicting items as necessary.
Expand All @@ -129,6 +166,16 @@ func (c *SizedLRU) Add(key Key, value lruItem) (ok bool) {
return false
}

// The files are already stored on disk when Add is invoked, therefore
// it is not motivated to reject based on diskSizeLimit. The check
// against maxSize is considered sufficient. However, invoke
// calcTotalDiskSizeAndUpdatePeak to update the peak value. The peak
// value is updated BEFORE triggering new evictions, to make the
// metrics reflect that both the new file and the files it
// evicts/replaces exists at disk at same time for a short period of
// time (unless Reserve method was used and evicted them).
c.calcTotalDiskSizeAndUpdatePeak(roundedUpSizeOnDisk)

var sizeDelta, uncompressedSizeDelta int64
if ee, ok := c.cache[key]; ok {
sizeDelta = roundedUpSizeOnDisk - roundUp4k(ee.Value.(*entry).value.sizeOnDisk)
Expand Down Expand Up @@ -166,7 +213,6 @@ func (c *SizedLRU) Add(key Key, value lruItem) (ok bool) {
c.currentSize += sizeDelta
c.uncompressedSize += uncompressedSizeDelta

c.gaugeCacheSizeBytes.Set(float64(c.currentSize))
c.gaugeCacheLogicalBytes.Set(float64(c.uncompressedSize))

return true
Expand All @@ -186,7 +232,6 @@ func (c *SizedLRU) Get(key Key) (value lruItem, ok bool) {
func (c *SizedLRU) Remove(key Key) {
if ele, hit := c.cache[key]; hit {
c.removeElement(ele)
c.gaugeCacheSizeBytes.Set(float64(c.currentSize))
c.gaugeCacheLogicalBytes.Set(float64(c.uncompressedSize))
}
}
Expand Down Expand Up @@ -245,6 +290,35 @@ func (c *SizedLRU) Reserve(size int64) (bool, error) {
return false, nil
}

// Note that the calculated value and the potentially updated peak
// value, includes the value tried to be reserved. In other words,
// the peak value is updated even if the limit is exceeded, and the
// reservation rejected. That is on purpose to allow using the
// prometheus gague of the peak value to understand why reservations
// are rejected. That gauge is an aid for tuning disk size limit,
// and it is therefore beneficial that the same calculated
// value (returned as totalDiskSizeNow) is used both for the metrics
// gauge and the logic for deciding about rejection.
totalDiskSizeNow := c.calcTotalDiskSizeAndUpdatePeak(size)

if c.diskSizeLimit > 0 && totalDiskSizeNow > (uint64(c.diskSizeLimit)) {

// Reject and let the client decide about retries. E.g., a bazel
// client either building locally or with
// --remote_local_fallback, can choose to have minimal number
// of retries since uploading the build result is not
// critical. And a client depending on remote execution
// where upload is critical can choose a large number of
// retries. Retrying only critical writes increases the chance
// for bazel-remote to recover from the overload quicker.
// Note that bazel-remote can continue serving reads even when
// overloaded by writes, e.g., when SSD's write IOPS capacity
// is overloaded but reads can be served from operating
// system's file system cache in RAM.

return false, nil
}

// Evict elements until we are able to reserve enough space.
for sumLargerThan(size, c.currentSize, c.maxSize) {
ele := c.ll.Back()
Expand Down Expand Up @@ -314,6 +388,7 @@ func (c *SizedLRU) getTailItem() (Key, lruItem) {
// it is invoked only when holding the diskCache.mu mutex and no one else tries
// to send to queuedEvictionsChan concurrently.
func (c *SizedLRU) appendEvictionToQueue(e *entry) {
c.queuedEvictionsSize.Add(e.value.sizeOnDisk)
select {
case queuedEvictions := <-c.queuedEvictionsChan:
c.queuedEvictionsChan <- append(queuedEvictions, e)
Expand All @@ -329,6 +404,7 @@ func (c *SizedLRU) appendEvictionToQueue(e *entry) {
// performQueuedEvictions receives one batch of evicted entries from
// queuedEvictionsChan (blocking until a batch is available), invokes the
// eviction callback for each entry, and decrements queuedEvictionsSize as
// each entry's disk space is released.
func (c *SizedLRU) performQueuedEvictions() {
	batch := <-c.queuedEvictionsChan
	for _, kv := range batch {
		c.onEvict(kv.key, kv.value)
		c.queuedEvictionsSize.Add(-kv.value.sizeOnDisk)
	}
}

Expand All @@ -338,3 +414,20 @@ func (c *SizedLRU) performQueuedEvictionsContinuously() {
c.performQueuedEvictions()
}
}

// Note that this function only needs to be called when the disk size usage
// can grow (e.g., from Reserve and Add, but not from Remove).
// Note that diskCache.mu mutex must be held when invoking this method.
// calcTotalDiskSizeAndUpdatePeak returns the total disk usage if a file of
// sizeOfNewFile bytes were added now (current cache size + bytes queued for
// eviction + the new file), and raises totalDiskSizePeak when this total
// exceeds it.
//
// Only needs to be called when disk usage can grow (e.g. from Reserve and
// Add, but not from Remove).
//
// The caller must hold the diskCache.mu mutex.
func (c *SizedLRU) calcTotalDiskSizeAndUpdatePeak(sizeOfNewFile int64) uint64 {
	queued := uint64(c.queuedEvictionsSize.Load())
	total := uint64(c.currentSize) + queued + uint64(sizeOfNewFile)
	if c.totalDiskSizePeak < total {
		c.totalDiskSizePeak = total
	}
	return total
}

// Note that diskCache.mu mutex must be held when invoking this method.
// shiftToNextMetricPeriod publishes the peak disk usage observed during the
// period that just ended to the cache-size gauge, then seeds the new period's
// peak with the current on-disk total (cache size plus bytes still queued
// for eviction).
//
// The caller must hold the diskCache.mu mutex.
func (c *SizedLRU) shiftToNextMetricPeriod() {
	c.gaugeCacheSizeBytes.Set(float64(c.totalDiskSizePeak))
	onDiskNow := uint64(c.currentSize) + uint64(c.queuedEvictionsSize.Load())
	c.totalDiskSizePeak = onDiskNow
}
8 changes: 8 additions & 0 deletions cache/disk/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Option func(*CacheConfig) error
type CacheConfig struct {
diskCache *diskCache // Assumed to be non-nil.
metrics *metricsDecorator // May be nil.
diskSizeLimit int64
}

func WithStorageMode(mode string) Option {
Expand Down Expand Up @@ -101,3 +102,10 @@ func WithEndpointMetrics() Option {
return nil
}
}

// WithDiskSizeLimit returns an Option that sets an optional upper bound, in
// bytes, on the cache's total disk footprint (including files queued for
// eviction but not yet removed). A value <= 0 leaves the limit disabled.
func WithDiskSizeLimit(diskSizeLimit int64) Option {
	return func(c *CacheConfig) error {
		c.diskSizeLimit = diskSizeLimit
		return nil
	}
}
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Config struct {
ProfileAddress string `yaml:"profile_address"`
Dir string `yaml:"dir"`
MaxSize int `yaml:"max_size"`
DiskSizeLimit int `yaml:"disk_size_limit"`
StorageMode string `yaml:"storage_mode"`
ZstdImplementation string `yaml:"zstd_implementation"`
HtpasswdFile string `yaml:"htpasswd_file"`
Expand Down Expand Up @@ -119,6 +120,7 @@ func newFromArgs(dir string, maxSize int, storageMode string, zstdImplementation
httpWriteTimeout time.Duration,
accessLogLevel string,
logTimezone string,
diskSizeLimit int,
maxBlobSize int64,
maxProxyBlobSize int64) (*Config, error) {

Expand All @@ -128,6 +130,7 @@ func newFromArgs(dir string, maxSize int, storageMode string, zstdImplementation
ProfileAddress: profileAddress,
Dir: dir,
MaxSize: maxSize,
DiskSizeLimit: diskSizeLimit,
StorageMode: storageMode,
ZstdImplementation: zstdImplementation,
HtpasswdFile: htpasswdFile,
Expand Down Expand Up @@ -521,6 +524,7 @@ func get(ctx *cli.Context) (*Config, error) {
ctx.Duration("http_write_timeout"),
ctx.String("access_log_level"),
ctx.String("log_timezone"),
ctx.Int("disk_size_limit"),
ctx.Int64("max_blob_size"),
ctx.Int64("max_proxy_blob_size"),
)
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func run(ctx *cli.Context) error {
disk.WithZstdImplementation(c.ZstdImplementation),
disk.WithMaxBlobSize(c.MaxBlobSize),
disk.WithProxyMaxBlobSize(c.MaxProxyBlobSize),
disk.WithDiskSizeLimit(int64(c.DiskSizeLimit) * 1024 * 1024 * 1024),
disk.WithAccessLogger(c.AccessLogger),
}
if c.ProxyBackend != nil {
Expand Down
6 changes: 6 additions & 0 deletions utils/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ func GetCliFlags() []cli.Flag {
Usage: "The maximum size of bazel-remote's disk cache in GiB. This flag is required.",
EnvVars: []string{"BAZEL_REMOTE_MAX_SIZE"},
},
&cli.Int64Flag{
Name: "disk_size_limit",
Value: -1,
Usage: "The maximum size of bazel-remote's disk cache, including files queue for eviction. in GiB. Limit is disabled by default.",
EnvVars: []string{"BAZEL_REMOTE_DISK_SIZE_LIMIT"},
},
&cli.StringFlag{
Name: "storage_mode",
Value: "zstd",
Expand Down

0 comments on commit 8196189

Please sign in to comment.