Local disk cache: part 1 #530

Open
wants to merge 3 commits into master
34 changes: 34 additions & 0 deletions go/pkg/diskcache/BUILD.bazel
@@ -0,0 +1,34 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "diskcache",
srcs = [
"diskcache.go",
"sys_darwin.go",
"sys_linux.go",
"sys_windows.go",
],
importpath = "github.com/bazelbuild/remote-apis-sdks/go/pkg/diskcache",
visibility = ["//visibility:public"],
deps = [
"//go/pkg/digest",
"@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@com_github_golang_glog//:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
],
)

go_test(
name = "diskcache_test",
srcs = ["diskcache_test.go"],
embed = [":diskcache"],
deps = [
"//go/pkg/digest",
"//go/pkg/testutil",
"@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@com_github_google_go_cmp//cmp:go_default_library",
"@com_github_pborman_uuid//:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
],
)
331 changes: 331 additions & 0 deletions go/pkg/diskcache/diskcache.go
@@ -0,0 +1,331 @@
// Package diskcache implements a local disk LRU CAS cache.
package diskcache

import (
"container/heap"
"context"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
"golang.org/x/sync/errgroup"

log "github.com/golang/glog"
)

type key struct {
digest digest.Digest
}

// A qitem is something we manage in a priority queue.
type qitem struct {
key key
lat time.Time // The last accessed time of the file.
index int // The index of the item in the heap.
mu sync.RWMutex // Protects the data-structure consistency for the given digest.
}

// A priorityQueue implements heap.Interface and holds qitems.
type priorityQueue struct {
items []*qitem
n int
}

func (q *priorityQueue) Len() int {
return q.n
}

func (q *priorityQueue) Less(i, j int) bool {
// We want Pop to give us the oldest item.
return q.items[i].lat.Before(q.items[j].lat)
}

func (q *priorityQueue) Swap(i, j int) {
q.items[i], q.items[j] = q.items[j], q.items[i]
q.items[i].index = i
q.items[j].index = j
}

func (q *priorityQueue) Push(x any) {
if q.n == cap(q.items) {
// Resize the queue
old := q.items
q.items = make([]*qitem, 2*cap(old)) // Initial capacity needs to be > 0.
copy(q.items, old)
}
item := x.(*qitem)
item.index = q.n
q.items[item.index] = item
q.n++
}

func (q *priorityQueue) Pop() any {
item := q.items[q.n-1]
q.items[q.n-1] = nil // avoid memory leak
item.index = -1 // for safety
q.n--
return item
}

// Bump moves the item to the most-recently-used position in the queue.
func (q *priorityQueue) Bump(item *qitem) {
// Sanity check, necessary because of possible racing between Bump and GC:
if item.index < 0 || item.index >= q.n || q.items[item.index].key != item.key {
return
}
item.lat = time.Now()
heap.Fix(q, item.index)
}

const maxConcurrentRequests = 1000

// DiskCache is a local disk LRU CAS and action cache.
type DiskCache struct {
root string // path to the root directory of the disk cache.
maxCapacityBytes uint64 // if disk size exceeds this, old items will be evicted as needed.
mu sync.Mutex // protects the queue.
store sync.Map // map of keys to qitems.
queue *priorityQueue // keys by last accessed time.
sizeBytes int64 // total size.
ctx context.Context
shutdown chan bool
gcTick uint64
gcReq chan uint64
testGcTicks chan uint64
}

func New(ctx context.Context, root string, maxCapacityBytes uint64) (*DiskCache, error) {
Collaborator
I think walking the disk on initialization is going to introduce a performance hit proportional to the size of the cache. I like the idea of persisting the state to a file and reloading that single file on startup. This has two main advantages:

  • cheaper startup cost
  • access times are persisted as known by the cache, regardless of the nuances of the underlying filesystem (e.g. may be mounted with noatime)
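
To make this concrete, here is a minimal sketch of what a persisted index could look like; the index file name, the indexEntry type, and the saveIndex/loadIndex helpers are illustrative assumptions, not part of this PR. New would call loadIndex first and fall back to the directory walk below only if the file is missing or corrupt, and Shutdown would call saveIndex.

// Illustrative sketch only: persist the LRU index to a single file so startup
// avoids walking every cached file. All names here are hypothetical.
package diskcache

import (
	"bufio"
	"fmt"
	"os"
	"path/filepath"
)

// indexEntry is one persisted record: digest hash, blob size, and the
// last-access time as tracked by the cache itself (independent of filesystem
// atime, which may be unavailable, e.g. under noatime mounts).
type indexEntry struct {
	Hash            string
	Size            int64
	LastAccessNanos int64
}

// saveIndex writes one "<hash> <size> <lastAccessNanos>" line per entry.
func saveIndex(root string, entries []indexEntry) error {
	f, err := os.Create(filepath.Join(root, "index"))
	if err != nil {
		return fmt.Errorf("creating index file: %w", err)
	}
	defer f.Close()
	w := bufio.NewWriter(f)
	for _, e := range entries {
		fmt.Fprintf(w, "%s %d %d\n", e.Hash, e.Size, e.LastAccessNanos)
	}
	return w.Flush()
}

// loadIndex reads the persisted entries; the caller rebuilds the heap and the
// key-to-qitem map from the result.
func loadIndex(root string) ([]indexEntry, error) {
	f, err := os.Open(filepath.Join(root, "index"))
	if err != nil {
		return nil, err // caller falls back to walking the cache directories
	}
	defer f.Close()
	var entries []indexEntry
	s := bufio.NewScanner(f)
	for s.Scan() {
		var e indexEntry
		if _, err := fmt.Sscanf(s.Text(), "%s %d %d", &e.Hash, &e.Size, &e.LastAccessNanos); err != nil {
			return nil, fmt.Errorf("corrupt index line %q: %w", s.Text(), err)
		}
		entries = append(entries, e)
	}
	return entries, s.Err()
}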

res := &DiskCache{
root: root,
maxCapacityBytes: maxCapacityBytes,
ctx: ctx,
queue: &priorityQueue{
items: make([]*qitem, 1000),
},
gcReq: make(chan uint64, maxConcurrentRequests),
shutdown: make(chan bool),
}
heap.Init(res.queue)
if err := os.MkdirAll(root, os.ModePerm); err != nil {
return nil, err

Check failure on line 118 in go/pkg/diskcache/diskcache.go (GitHub Actions / lint): error returned from external package is unwrapped: sig: func os.MkdirAll(path string, perm io/fs.FileMode) error (wrapcheck)
}
// We use Git's directory/file naming structure as inspiration:
// https://git-scm.com/book/en/v2/Git-Internals-Git-Objects#:~:text=The%20subdirectory%20is%20named%20with%20the%20first%202%20characters%20of%20the%20SHA%2D1%2C%20and%20the%20filename%20is%20the%20remaining%2038%20characters.
eg, eCtx := errgroup.WithContext(ctx)
for i := 0; i < 256; i++ {
prefixDir := filepath.Join(root, fmt.Sprintf("%02x", i))
eg.Go(func() error {
if eCtx.Err() != nil {
return eCtx.Err()

Check failure on line 127 in go/pkg/diskcache/diskcache.go (GitHub Actions / lint): error returned from interface method should be wrapped: sig: func (context.Context).Err() error (wrapcheck)
}
if err := os.MkdirAll(prefixDir, os.ModePerm); err != nil {
return err

Check failure on line 130 in go/pkg/diskcache/diskcache.go (GitHub Actions / lint): error returned from external package is unwrapped: sig: func os.MkdirAll(path string, perm io/fs.FileMode) error (wrapcheck)
}
return filepath.WalkDir(prefixDir, func(path string, d fs.DirEntry, err error) error {

Check failure on line 132 in go/pkg/diskcache/diskcache.go (GitHub Actions / lint): error returned from external package is unwrapped: sig: func path/filepath.WalkDir(root string, fn io/fs.WalkDirFunc) error (wrapcheck)
// We log and continue on all errors, because cache read errors are not critical.
if err != nil {
return fmt.Errorf("error reading cache directory: %v", err)
s/%v/%w/g ?
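
For context on the suggestion: %w wraps the underlying error so callers can still match it with errors.Is/errors.As, while %v only folds its text into the message. A small standalone illustration (not part of this PR):

package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
)

func main() {
	_, statErr := os.Stat("/no/such/path")

	wrapped := fmt.Errorf("error reading cache directory: %w", statErr)
	formatted := fmt.Errorf("error reading cache directory: %v", statErr)

	fmt.Println(errors.Is(wrapped, fs.ErrNotExist))   // true: the error chain is preserved
	fmt.Println(errors.Is(formatted, fs.ErrNotExist)) // false: only the message text was kept
}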

}
if d.IsDir() {
return nil
}
subdir := filepath.Base(filepath.Dir(path))
k, err := res.getKeyFromFileName(subdir + d.Name())
if err != nil {
return fmt.Errorf("error parsing cached file name %s: %v", path, err)
}
atime, err := getLastAccessTime(path)
It's likely already cached in d.Info(), I'd inline this function here to save one stat per file.
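
A rough sketch of that suggestion, reusing the PR's FileInfoToAccessTime helper inside the WalkDir callback (surrounding lines elided; illustrative only, not a change made in this PR):

// Inside the WalkDir callback, d is the fs.DirEntry for the current file.
// Per the comment above, d.Info() may already carry the stat result, so the
// extra os.Stat inside getLastAccessTime can be skipped.
info, err := d.Info()
if err != nil {
	return fmt.Errorf("error getting file info of %s: %v", path, err)
}
atime := FileInfoToAccessTime(info) // same platform-specific helper the PR defines in sys_*.go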

if err != nil {
return fmt.Errorf("error getting last accessed time of %s: %v", path, err)
}
it := &qitem{
key: k,
lat: atime,
}
size, err := res.getItemSize(k)
if err != nil {
return fmt.Errorf("error getting file size of %s: %v", path, err)
}
res.store.Store(k, it)
atomic.AddInt64(&res.sizeBytes, size)
res.mu.Lock()
heap.Push(res.queue, it)
res.mu.Unlock()
return nil
})
})
}
if err := eg.Wait(); err != nil {
return nil, err

Check failure on line 167 in go/pkg/diskcache/diskcache.go (GitHub Actions / lint): error returned from external package is unwrapped: sig: func (*golang.org/x/sync/errgroup.Group).Wait() error (wrapcheck)
}
go res.gc()
I think gc should only be called if len(res.queue) > X && res.sizeBytes > Y && res.queue[0].lat < time.Now().Sub(Z) to reduce unnecessary processing.

Contributor Author
Not sure I understand -- gc is a daemon thread; it blocks on the GC request channel (gcReq) and only does work when there is something to do (total size is over capacity). And we only send a GC request to the channel when we reach capacity. The current line only starts the daemon thread. Did you mean changes to the gc thread itself, or to the places where a GC request is added to the channel?

return res, nil
}

func (d *DiskCache) getItemSize(k key) (int64, error) {
return k.digest.Size, nil
}

// Shutdown releases resources and terminates the GC daemon. It should be the last call to the DiskCache.
func (d *DiskCache) Shutdown() {
d.shutdown <- true
}

func (d *DiskCache) TotalSizeBytes() uint64 {
return uint64(atomic.LoadInt64(&d.sizeBytes))
}

func (d *DiskCache) getKeyFromFileName(fname string) (key, error) {
This doesn't need to be a member function.

pair := strings.Split(fname, ".")
if len(pair) != 2 {
return key{}, fmt.Errorf("expected file name in the form [ac_]hash/size, got %s", fname)
}
size, err := strconv.ParseInt(pair[1], 10, 64)
if err != nil {
return key{}, fmt.Errorf("invalid size in digest %s: %s", fname, err)
}
dg, err := digest.New(pair[0], size)
if err != nil {
return key{}, fmt.Errorf("invalid digest from file name %s: %v", fname, err)
}
return key{digest: dg}, nil
}

func (d *DiskCache) getPath(k key) string {
return filepath.Join(d.root, k.digest.Hash[:2], fmt.Sprintf("%s.%d", k.digest.Hash[2:], k.digest.Size))
}

func (d *DiskCache) StoreCas(dg digest.Digest, path string) error {
Collaborator
I think generalizing this method to Write(dg digest.Digest, io.Reader) error provides several advantages:

We should also document that the cache does not enforce any relationship between dg and the reader's bytes.
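
A hedged sketch of what that generalization could look like, reusing the fields and helpers already in this PR (the method name Write and its error messages are illustrative, not part of the change):

// Write stores the reader's bytes under dg. Note: the cache does not verify
// that the bytes actually hash to dg; that is the caller's responsibility.
func (d *DiskCache) Write(dg digest.Digest, r io.Reader) error {
	if dg.Size > int64(d.maxCapacityBytes) {
		return fmt.Errorf("blob size %d exceeds DiskCache capacity %d", dg.Size, d.maxCapacityBytes)
	}
	it := &qitem{key: key{digest: dg}, lat: time.Now()}
	it.mu.Lock()
	defer it.mu.Unlock()
	if _, exists := d.store.LoadOrStore(it.key, it); exists {
		return nil
	}
	d.mu.Lock()
	heap.Push(d.queue, it)
	d.mu.Unlock()
	out, err := os.Create(d.getPath(it.key))
	if err != nil {
		return fmt.Errorf("creating cache file: %w", err)
	}
	defer out.Close()
	n, err := io.Copy(out, r)
	if err != nil {
		return fmt.Errorf("writing cache file: %w", err)
	}
	if n != dg.Size {
		return fmt.Errorf("short write for %s: wanted %d bytes, got %d", dg.Hash, dg.Size, n)
	}
	newSize := uint64(atomic.AddInt64(&d.sizeBytes, dg.Size))
	if newSize > d.maxCapacityBytes {
		select {
		case d.gcReq <- atomic.AddUint64(&d.gcTick, 1):
		default:
		}
	}
	return nil
}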

if dg.Size > int64(d.maxCapacityBytes) {
return fmt.Errorf("blob size %d exceeds DiskCache capacity %d", dg.Size, d.maxCapacityBytes)
A more graceful behavior is to try to trigger a synchronous GC first before erroring out.

Contributor Author
Why? This will fail for sure regardless of GC. Note that we compare with maxCapacityBytes, not the current sizeBytes.

}
it := &qitem{
key: key{digest: dg},
lat: time.Now(),
}
it.mu.Lock()
defer it.mu.Unlock()
_, exists := d.store.LoadOrStore(it.key, it)
d.store is already synchronized by it.mu. I'd vote to make store a normal map[digest]*qitem instead; it would be faster.

Contributor Author
Oh, no, store cannot be synchronized by it.mu because it.mu is new (that would be a race condition). It could, in theory, be a normal map synchronized by mu instead (the DiskCache class variable that we use to protect queue), but that would be slower than using a sync map:

The Map type is optimized for two common use cases: (1) when the entry for a given key is only ever written once but read many times, as in caches that only grow, or (2) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys. In these two cases, use of a Map may significantly reduce lock contention compared to a Go map paired with a separate Mutex or RWMutex.

(from https://pkg.go.dev/sync#Map)

We have a classical case 1 here, where almost always a key is only written once but read many times from concurrent threads. A sync map should greatly reduce lock contention.

if exists {
return nil
}
d.mu.Lock()
heap.Push(d.queue, it)
d.mu.Unlock()
if err := copyFile(path, d.getPath(it.key), dg.Size); err != nil {
return err
}
newSize := uint64(atomic.AddInt64(&d.sizeBytes, dg.Size))
if newSize > d.maxCapacityBytes {
select {
case d.gcReq <- atomic.AddUint64(&d.gcTick, 1):
default:
}
}
return nil
}

func (d *DiskCache) gc() {
Collaborator
I think this implementation may have the following issues:

  • if the capacity is not sufficient for the workload, eviction becomes part of every update.
  • having three different locks being acquired and released independently is prone to races; e.g. attempting to load an item after it's been deleted from the ordered list but before it's been deleted from the map will fail to update the index in the list.
  • the client must remember to Shutdown the cache to avoid resource leaks.

The capacity issue can be mitigated by evicting a percentage of the cache rather than just enough to meet the threshold. This accommodates several new items before triggering another eviction.

The synchronization issue can be traded off with a simpler implementation that uses a single lock. Updating the list and the map should be an atomic operation in which the entire list is shared and therefore is the bottleneck. Actually, hits are bounded by IO anyways. Misses on the other hand can be faster with less locking. Then again, misses are always followed by updates which are bounded by IO. I think IO is more of a bottleneck than synchronization so it's worth going for a simple locking flow.

How bad is it if eviction events were blocking? I think if eviction saturates IO bandwidth it should be the same whether it's async or sync. Maybe blocking would be faster if we consider disk locality. I also think that most users would prefer using a large cache to avoid eviction anyways. Without a background eviction task, the cache would be simpler to work with.
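
To illustrate the percentage-based idea, a minimal sketch built from the eviction loop below (the helper name evictUntilBelow and the 90% target are illustrative assumptions, not values proposed in the PR):

// evictUntilBelow pops least-recently-used items until the total size drops
// below targetBytes, e.g. 90% of maxCapacityBytes, so several new writes fit
// before the next eviction round.
func (d *DiskCache) evictUntilBelow(targetBytes uint64) {
	for uint64(atomic.LoadInt64(&d.sizeBytes)) > targetBytes {
		d.mu.Lock()
		if d.queue.Len() == 0 {
			d.mu.Unlock()
			return
		}
		it := heap.Pop(d.queue).(*qitem)
		d.mu.Unlock()
		atomic.AddInt64(&d.sizeBytes, -it.key.digest.Size)
		it.mu.Lock()
		if err := os.Remove(d.getPath(it.key)); err != nil {
			log.Errorf("Error removing file: %v", err)
		}
		d.store.Delete(it.key)
		it.mu.Unlock()
	}
}

// A synchronous caller (or the gc loop) would then invoke, for example:
//   d.evictUntilBelow(uint64(0.9 * float64(d.maxCapacityBytes)))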

for {
select {
case <-d.shutdown:
return
case <-d.ctx.Done():
return
case t := <-d.gcReq:
// Evict old entries until total size is below cap.
for uint64(atomic.LoadInt64(&d.sizeBytes)) > d.maxCapacityBytes {
d.mu.Lock()
it := heap.Pop(d.queue).(*qitem)
d.mu.Unlock()
size, err := d.getItemSize(it.key)
if err != nil {
log.Errorf("error getting item size for %v: %v", it.key, err)
size = 0
}
atomic.AddInt64(&d.sizeBytes, -size)
it.mu.Lock()
// We only delete the files, and not the prefix directories, because the prefixes are not worth worrying about.
if err := os.Remove(d.getPath(it.key)); err != nil {
log.Errorf("Error removing file: %v", err)
}
d.store.Delete(it.key)
it.mu.Unlock()
}
if d.testGcTicks != nil {
select {
case d.testGcTicks <- t:
default:
}
}
}
}
}

// copyFile copies the file contents, retaining the source permissions.
func copyFile(src, dst string, size int64) error {
srcInfo, err := os.Stat(src)
if err != nil {
return err

Check failure on line 277 in go/pkg/diskcache/diskcache.go (GitHub Actions / lint): error returned from external package is unwrapped: sig: func os.Stat(name string) (io/fs.FileInfo, error) (wrapcheck)
}
in, err := os.Open(src)
if err != nil {
return err

Check failure on line 281 in go/pkg/diskcache/diskcache.go (GitHub Actions / lint): error returned from external package is unwrapped: sig: func os.Open(name string) (*os.File, error) (wrapcheck)
}
defer in.Close()
out, err := os.Create(dst)
if err != nil {
return err

Check failure on line 286 in go/pkg/diskcache/diskcache.go (GitHub Actions / lint): error returned from external package is unwrapped: sig: func os.Create(name string) (*os.File, error) (wrapcheck)
}
if err := out.Chmod(srcInfo.Mode()); err != nil {
return err

Check failure on line 289 in go/pkg/diskcache/diskcache.go (GitHub Actions / lint): error returned from external package is unwrapped: sig: func (*os.File).Chmod(mode io/fs.FileMode) error (wrapcheck)
}
defer out.Close()
n, err := io.Copy(out, in)
if err != nil {
return err

Check failure on line 294 in go/pkg/diskcache/diskcache.go (GitHub Actions / lint): error returned from external package is unwrapped: sig: func io.Copy(dst io.Writer, src io.Reader) (written int64, err error) (wrapcheck)
}
// Required sanity check: if the file is being concurrently deleted, we may not always copy everything.
if n != size {
return fmt.Errorf("copy of %s to %s failed: src/dst size mismatch: wanted %d, got %d", src, dst, size, n)
}
return nil
}

// LoadCas copies the file contents to the given path if the digest exists in the disk cache.
func (d *DiskCache) LoadCas(dg digest.Digest, path string) bool {
I'd propose to rename from LoadCas to CopyFromCache

I realize the sister function is called StoreCas. I don't know if that nomenclature is reused in this code base; if so, ignore this comment.

Contributor Author
Yeah, Load* and Store* are consistent with sync.Map's naming; we also sometimes use Get/Update. I'll leave the naming decisions to @mrahs.

Collaborator @mrahs Feb 23, 2024
I think the cache can have three main methods: putting bytes, getting bytes, copying bytes to a file path. Since the cache deals with bytes, I'm inclined towards Write(key, Reader), Read(key, Writer), Copy(key, filepath), but Store, Load, LoadInto/CopyTo also sound reasonable. I made another comment on generalizing this method which would avoid having to choose suffixes.
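
For reference, a sketch of the three-method shape mentioned here (purely illustrative; this interface is not defined anywhere in the PR):

// CAS is a hypothetical byte-oriented view of the cache.
type CAS interface {
	// Write stores the reader's bytes under dg.
	Write(dg digest.Digest, r io.Reader) error
	// Read streams the bytes stored under dg to w.
	Read(dg digest.Digest, w io.Writer) error
	// Copy writes the bytes stored under dg to the file at path.
	Copy(dg digest.Digest, path string) error
}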

k := key{digest: dg}
iUntyped, loaded := d.store.Load(k)
if !loaded {
return false
}
it := iUntyped.(*qitem)
it.mu.RLock()
err := copyFile(d.getPath(k), path, dg.Size)
it.mu.RUnlock()
if err != nil {
// It is not possible to prevent a race with GC; hence, we return false on copy errors.
return false
}

d.mu.Lock()
d.queue.Bump(it)
d.mu.Unlock()
return true
}

func getLastAccessTime(path string) (time.Time, error) {
info, err := os.Stat(path)
if err != nil {
return time.Time{}, err
}
return FileInfoToAccessTime(info), nil
}