Local disk cache: part 1 #530

Open: wants to merge 3 commits into base: master
Changes from 2 commits
33 changes: 33 additions & 0 deletions go/pkg/diskcache/BUILD.bazel
@@ -0,0 +1,33 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "diskcache",
srcs = [
"diskcache.go",
"sys_darwin.go",
"sys_linux.go",
"sys_windows.go",
],
importpath = "github.com/bazelbuild/remote-apis-sdks/go/pkg/diskcache",
visibility = ["//visibility:public"],
deps = [
"//go/pkg/digest",
"@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@com_github_golang_glog//:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
],
)

go_test(
name = "diskcache_test",
srcs = ["diskcache_test.go"],
embed = [":diskcache"],
deps = [
"//go/pkg/digest",
"//go/pkg/testutil",
"@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@com_github_google_go_cmp//cmp:go_default_library",
"@com_github_pborman_uuid//:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
],
)
323 changes: 323 additions & 0 deletions go/pkg/diskcache/diskcache.go
@@ -0,0 +1,323 @@
// Package diskcache implements a local disk LRU CAS cache.
package diskcache

import (
"container/heap"
"context"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
log "github.com/golang/glog"
)

type key struct {
digest digest.Digest
}

// A qitem is something we manage in a priority queue.
type qitem struct {
key key
lat time.Time // The last accessed time of the file.
index int // The index of the item in the heap.
mu sync.RWMutex // Protects the data-structure consistency for the given digest.
}

// A priorityQueue implements heap.Interface and holds qitems.
type priorityQueue struct {
items []*qitem
n int
}

func (q *priorityQueue) Len() int {
return q.n
}

func (q *priorityQueue) Less(i, j int) bool {
// We want Pop to give us the oldest item.
return q.items[i].lat.Before(q.items[j].lat)
}

func (q *priorityQueue) Swap(i, j int) {
q.items[i], q.items[j] = q.items[j], q.items[i]
q.items[i].index = i
q.items[j].index = j
}

func (q *priorityQueue) Push(x any) {
if q.n == cap(q.items) {
// Resize the queue
old := q.items
q.items = make([]*qitem, 2*cap(old)) // Initial capacity needs to be > 0.
copy(q.items, old)
}
item := x.(*qitem)
item.index = q.n
q.items[item.index] = item
q.n++
}

func (q *priorityQueue) Pop() any {
item := q.items[q.n-1]
q.items[q.n-1] = nil // avoid memory leak
item.index = -1 // for safety
q.n--
return item
}

// Bump marks the item as most recently used and fixes its position in the queue.
func (q *priorityQueue) Bump(item *qitem) {
// Sanity check, necessary because of possible racing between Bump and GC:
if item.index < 0 || item.index >= q.n || q.items[item.index].key != item.key {
return
}
item.lat = time.Now()
heap.Fix(q, item.index)
}

const maxConcurrentRequests = 1000

// DiskCache is a local disk LRU cache for CAS and Action Cache entries.
type DiskCache struct {
root string // path to the root directory of the disk cache.
maxCapacityBytes uint64 // if disk size exceeds this, old items will be evicted as needed.
mu sync.Mutex // protects the queue.
store sync.Map // map of keys to qitems.
queue *priorityQueue // keys by last accessed time.
sizeBytes int64 // total size.
ctx context.Context
shutdown chan bool
gcTick uint64
gcReq chan uint64
testGcTicks chan uint64
}

// New initializes a DiskCache rooted at root with the given maximum capacity,
// creates the two-character prefix directories, and indexes any files already
// present on disk.
func New(ctx context.Context, root string, maxCapacityBytes uint64) *DiskCache {
res := &DiskCache{
root: root,
maxCapacityBytes: maxCapacityBytes,
ctx: ctx,
queue: &priorityQueue{
items: make([]*qitem, 1000),
},
gcReq: make(chan uint64, maxConcurrentRequests),
shutdown: make(chan bool),
}
heap.Init(res.queue)
_ = os.MkdirAll(root, os.ModePerm)
// We use Git's directory/file naming structure as inspiration:
// https://git-scm.com/book/en/v2/Git-Internals-Git-Objects#:~:text=The%20subdirectory%20is%20named%20with%20the%20first%202%20characters%20of%20the%20SHA%2D1%2C%20and%20the%20filename%20is%20the%20remaining%2038%20characters.
var wg sync.WaitGroup
wg.Add(256)
for i := 0; i < 256; i++ {
prefixDir := filepath.Join(root, fmt.Sprintf("%02x", i))
go func() {
defer wg.Done()
_ = os.MkdirAll(prefixDir, os.ModePerm)
Review comment: There are extreme cases where this could happen (e.g. disk full). I'd recommend using https://pkg.go.dev/golang.org/x/sync/errgroup#WithContext for error management instead of logging as a silent failure.

Contributor Author: Yeah, basically my main problem here was that I didn't want to return an error from New, because that would mean that the client Opt can return an error on Apply, and the code currently doesn't support that.

To be fair, that is the right thing to do, imo -- options can, in general, fail. So I'd propose a separate independent PR to add an error return type to Apply, and then I can propagate all the errors properly from here. @mrahs does that SGTY?

Another idea (pushing the problem further down the line) would be to return an error from New now, but then log + ignore it in Apply. Which is what I'll do for now, until @mrahs weighs in on whether he's okay with me changing the Apply type.

Thank you!

Collaborator: I'd push error handling on the user by making the client.Opt accept a *DiskCache instance.

_ = filepath.WalkDir(prefixDir, func(path string, d fs.DirEntry, err error) error {
// We log and continue on all errors, because cache read errors are not critical.
if err != nil {
log.Errorf("Error reading cache directory: %v", err)
return nil
}
if d.IsDir() {
return nil
}
subdir := filepath.Base(filepath.Dir(path))
k, err := res.getKeyFromFileName(subdir + d.Name())
if err != nil {
log.Errorf("Error parsing cached file name %s: %v", path, err)
return nil
}
atime, err := GetLastAccessTime(path)
if err != nil {
log.Errorf("Error getting last accessed time of %s: %v", path, err)
return nil
}
it := &qitem{
key: k,
lat: atime,
}
size, err := res.getItemSize(k)
if err != nil {
log.Errorf("Error getting file size of %s: %v", path, err)
return nil
}
res.store.Store(k, it)
atomic.AddInt64(&res.sizeBytes, size)
res.mu.Lock()
heap.Push(res.queue, it)
res.mu.Unlock()
return nil
})
}()
}
wg.Wait()
go res.gc()
Review comment: I think gc should only be called if len(res.queue) > X && res.sizeBytes > Y && res.queue[0].lat < time.Now().Sub(Z) to reduce unnecessary processing.

Contributor Author: Not sure I understand -- gc is a daemon thread; it blocks on the GC request channel (gcReq) and only works if there are things to do (total size is over capacity). And we only send a GC request to the channel if we reach capacity. The current line only starts the daemon thread. Did you mean changes to the gc thread itself, or to the places where a GC request is added to the channel?

return res
}
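Following the review suggestion about error handling during directory creation, here is a minimal sketch (not part of this PR) of how the prefix-directory setup could surface failures via golang.org/x/sync/errgroup instead of silently ignoring them; it assumes New is changed to return an error:

// Hypothetical sketch: create the 256 prefix directories and fail fast on the
// first error, using errgroup.WithContext as suggested in the review.
func makePrefixDirs(ctx context.Context, root string) error {
    eg, _ := errgroup.WithContext(ctx)
    for i := 0; i < 256; i++ {
        prefixDir := filepath.Join(root, fmt.Sprintf("%02x", i))
        eg.Go(func() error {
            if err := os.MkdirAll(prefixDir, os.ModePerm); err != nil {
                return fmt.Errorf("failed to create cache directory %s: %w", prefixDir, err)
            }
            return nil
        })
    }
    return eg.Wait() // New would then return this error to the caller.
}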

func (d *DiskCache) getItemSize(k key) (int64, error) {
return k.digest.Size, nil
}

// Shutdown releases resources and terminates the GC daemon. It should be the last call to the DiskCache.
func (d *DiskCache) Shutdown() {
d.shutdown <- true
}

// TotalSizeBytes returns the total size in bytes of the blobs currently stored in the cache.
func (d *DiskCache) TotalSizeBytes() uint64 {
return uint64(atomic.LoadInt64(&d.sizeBytes))
}

func (d *DiskCache) getKeyFromFileName(fname string) (key, error) {
Review comment: This doesn't need to be a member function.

pair := strings.Split(fname, ".")
if len(pair) != 2 {
return key{}, fmt.Errorf("expected file name in the form [ac_]hash/size, got %s", fname)
}
size, err := strconv.ParseInt(pair[1], 10, 64)
if err != nil {
return key{}, fmt.Errorf("invalid size in digest %s: %s", fname, err)
}
dg, err := digest.New(pair[0], size)
if err != nil {
return key{}, fmt.Errorf("invalid digest from file name %s: %v", fname, err)
}
return key{digest: dg}, nil
}

func (d *DiskCache) getPath(k key) string {
return filepath.Join(d.root, k.digest.Hash[:2], fmt.Sprintf("%s.%d", k.digest.Hash[2:], k.digest.Size))
}
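For illustration, a hypothetical example of the resulting git-style layout (the digest below is the well-known SHA-256 of the empty blob, used only as an example):

// Hypothetical example of the on-disk layout produced by getPath:
//
//   dg, _ := digest.New("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", 0)
//   d := &DiskCache{root: "/tmp/cache"}
//   d.getPath(key{digest: dg})
//   // -> /tmp/cache/e3/b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855.0
//
// The startup scan in New reverses this mapping by concatenating the prefix
// directory name with the file name before calling getKeyFromFileName.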

// StoreCas copies the file at path into the cache, keyed by dg.
func (d *DiskCache) StoreCas(dg digest.Digest, path string) error {
Collaborator: I think generalizing this method to Write(dg digest.Digest, io.Reader) error provides several advantages:

We should also document that the cache does not enforce any relationship between dg and the reader's bytes.

if dg.Size > int64(d.maxCapacityBytes) {
return fmt.Errorf("blob size %d exceeds DiskCache capacity %d", dg.Size, d.maxCapacityBytes)
Review comment: A more graceful behavior is to try to trigger a synchronous GC first before erroring out.

Contributor Author: Why? This will fail for sure regardless of GC. Note that we compare with maxCapacityBytes, not the current sizeBytes.

}
it := &qitem{
key: key{digest: dg},
lat: time.Now(),
}
it.mu.Lock()
defer it.mu.Unlock()
_, exists := d.store.LoadOrStore(it.key, it)
Review comment: d.store is already synchronized by it.mu. I'd vote to make store a normal map[digest]*qitem instead; it would be faster.

Contributor Author: Oh, no, store cannot be synchronized by it.mu, because it.mu is new (that would be a race condition). It could, in theory, be a normal map synchronized by mu instead (the DiskCache class variable that we use to protect queue), but that would be slower than using a sync map:

"The Map type is optimized for two common use cases: (1) when the entry for a given key is only ever written once but read many times, as in caches that only grow, or (2) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys. In these two cases, use of a Map may significantly reduce lock contention compared to a Go map paired with a separate Mutex or RWMutex." (from https://pkg.go.dev/sync#Map)

We have a classical case 1 here, where almost always a key is only written once but read many times from concurrent threads. A sync map should greatly reduce lock contention.

if exists {
return nil
}
d.mu.Lock()
heap.Push(d.queue, it)
d.mu.Unlock()
if err := copyFile(path, d.getPath(it.key), dg.Size); err != nil {
return err
}
newSize := uint64(atomic.AddInt64(&d.sizeBytes, dg.Size))
if newSize > d.maxCapacityBytes {
select {
case d.gcReq <- atomic.AddUint64(&d.gcTick, 1):
default:
}
}
return nil
}
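A minimal sketch of the Write(dg, io.Reader) generalization suggested in the review thread above (hypothetical, not part of this PR); it reuses StoreCas's bookkeeping and, as the reviewer notes, does not verify that the reader's bytes actually match dg:

func (d *DiskCache) Write(dg digest.Digest, r io.Reader) error {
    if dg.Size > int64(d.maxCapacityBytes) {
        return fmt.Errorf("blob size %d exceeds DiskCache capacity %d", dg.Size, d.maxCapacityBytes)
    }
    it := &qitem{key: key{digest: dg}, lat: time.Now()}
    it.mu.Lock()
    defer it.mu.Unlock()
    if _, exists := d.store.LoadOrStore(it.key, it); exists {
        return nil
    }
    d.mu.Lock()
    heap.Push(d.queue, it)
    d.mu.Unlock()
    out, err := os.Create(d.getPath(it.key))
    if err != nil {
        return err
    }
    defer out.Close()
    n, err := io.Copy(out, r)
    if err != nil {
        return err
    }
    if n != dg.Size {
        return fmt.Errorf("wrote %d bytes for digest %s, expected %d", n, dg.Hash, dg.Size)
    }
    newSize := uint64(atomic.AddInt64(&d.sizeBytes, dg.Size))
    if newSize > d.maxCapacityBytes {
        // Trigger an asynchronous GC pass, exactly as StoreCas does.
        select {
        case d.gcReq <- atomic.AddUint64(&d.gcTick, 1):
        default:
        }
    }
    return nil
}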

func (d *DiskCache) gc() {
Collaborator: I think this implementation may have the following issues:

  • if the capacity is not sufficient for the workload, eviction becomes part of every update.
  • having three different locks being acquired and released independently is prone to races; e.g. attempting to load an item after it's been deleted from the ordered list but before it's been deleted from the map will fail to update the index in the list.
  • the client must remember to Shutdown the cache to avoid resource leaks.

The capacity issue can be mitigated by evicting a percentage of the cache rather than just enough to meet the threshold. This accommodates several new items before triggering another eviction.

The synchronization issue can be traded off with a simpler implementation that uses a single lock. Updating the list and the map should be an atomic operation, in which case the entire list is shared and therefore is the bottleneck. Actually, hits are bounded by IO anyways. Misses, on the other hand, can be faster with less locking. Then again, misses are always followed by updates, which are bounded by IO. I think IO is more of a bottleneck than synchronization, so it's worth going for a simple locking flow.

How bad is it if eviction events were blocking? I think if eviction saturates IO bandwidth, it should be the same whether it's async or sync. Maybe blocking would be faster if we consider disk locality. I also think that most users would prefer using a large cache to avoid eviction anyway. Without a background eviction task, the cache would be simpler to work with.

for {
select {
case <-d.shutdown:
return
case <-d.ctx.Done():
return
case t := <-d.gcReq:
// Evict old entries until total size is below cap.
for uint64(atomic.LoadInt64(&d.sizeBytes)) > d.maxCapacityBytes {
d.mu.Lock()
it := heap.Pop(d.queue).(*qitem)
d.mu.Unlock()
size, err := d.getItemSize(it.key)
if err != nil {
log.Errorf("error getting item size for %v: %v", it.key, err)
size = 0
}
atomic.AddInt64(&d.sizeBytes, -size)
it.mu.Lock()
// We only delete the files, and not the prefix directories, because the prefixes are not worth worrying about.
if err := os.Remove(d.getPath(it.key)); err != nil {
log.Errorf("Error removing file: %v", err)
}
d.store.Delete(it.key)
it.mu.Unlock()
}
if d.testGcTicks != nil {
select {
case d.testGcTicks <- t:
default:
}
}
}
}
}
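As a sketch of the percentage-based eviction the collaborator suggests above, gc could evict down to a lower watermark (for example 90% of capacity, a hypothetical tuning knob) so that several new items fit before the next pass; the body reuses the same pop/delete logic as the loop above:

// Hypothetical variant, not part of this PR: evict until total size drops below
// the given watermark (e.g. uint64(0.9 * float64(d.maxCapacityBytes))) instead
// of stopping exactly at maxCapacityBytes.
func (d *DiskCache) evictToWatermark(watermark uint64) {
    for uint64(atomic.LoadInt64(&d.sizeBytes)) > watermark {
        d.mu.Lock()
        it := heap.Pop(d.queue).(*qitem)
        d.mu.Unlock()
        size, err := d.getItemSize(it.key)
        if err != nil {
            log.Errorf("error getting item size for %v: %v", it.key, err)
            size = 0
        }
        atomic.AddInt64(&d.sizeBytes, -size)
        it.mu.Lock()
        if err := os.Remove(d.getPath(it.key)); err != nil {
            log.Errorf("Error removing file: %v", err)
        }
        d.store.Delete(it.key)
        it.mu.Unlock()
    }
}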

// copyFile copies the file contents from src to dst, retaining the source permissions.
func copyFile(src, dst string, size int64) error {
srcInfo, err := os.Stat(src)
if err != nil {
return err

Check failure (GitHub Actions / lint) on line 272 in go/pkg/diskcache/diskcache.go: error returned from external package is unwrapped: sig: func os.Stat(name string) (io/fs.FileInfo, error) (wrapcheck)
}
in, err := os.Open(src)
if err != nil {
return err

Check failure (GitHub Actions / lint) on line 276 in go/pkg/diskcache/diskcache.go: error returned from external package is unwrapped: sig: func os.Open(name string) (*os.File, error) (wrapcheck)
}
defer in.Close()
out, err := os.Create(dst)
if err != nil {
return err

Check failure (GitHub Actions / lint) on line 281 in go/pkg/diskcache/diskcache.go: error returned from external package is unwrapped: sig: func os.Create(name string) (*os.File, error) (wrapcheck)
}
if err := out.Chmod(srcInfo.Mode()); err != nil {
return err

Check failure (GitHub Actions / lint) on line 284 in go/pkg/diskcache/diskcache.go: error returned from external package is unwrapped: sig: func (*os.File).Chmod(mode io/fs.FileMode) error (wrapcheck)
}
defer out.Close()
_, err = io.Copy(out, in)
@goodov (Feb 1, 2024): Can we move the file copy function into a separate source (similar to atim_*) so it can be altered for darwin and windows to make use of clonefile and the upcoming Windows DevDrive CoW?

clonefile does preserve the chmod, so it can be a single call. We also don't need to manually do a buffer copy here. Just let the OS perform the file copy action and update chmod if it's required; it should be much faster.

CloneFile utils can be found here as an example:
https://github.com/git-lfs/git-lfs/blob/main/tools/util_darwin.go
https://github.com/git-lfs/git-lfs/blob/main/tools/util_linux.go

Contributor Author: Great idea, thank you! I didn't know of clonefile. I renamed the system-specific files to sys_* and changed the header to match, so that this change can be nicely done in a followup PR. It is a very contained optimization which doesn't affect the cache layout, so it's backward-compatible, and hence imo it belongs in a followup.

if err != nil {
return err

Check failure (GitHub Actions / lint) on line 289 in go/pkg/diskcache/diskcache.go: error returned from external package is unwrapped: sig: func io.Copy(dst io.Writer, src io.Reader) (written int64, err error) (wrapcheck)
}
// Required sanity check: sometimes the copy pretends to succeed, but doesn't, if
Review comment: Can you expand on that? That's news to me. Or do you mean in the general case of a race condition?

Contributor Author: In general, if we write to a file while concurrently deleting it, the Copy might not return an error (even if no bytes were actually copied), and neither might the Stat. But the Stat will return a different size than expected (usually 0).
The comment made me notice that io.Copy actually returns the number of bytes copied and seems to be correct, so I amended the sanity check to only verify the copy return value, to be faster.

// the file is being concurrently deleted.
dstInfo, err := os.Stat(dst)
if err != nil {
return err

Check failure (GitHub Actions / lint) on line 295 in go/pkg/diskcache/diskcache.go: error returned from external package is unwrapped: sig: func os.Stat(name string) (io/fs.FileInfo, error) (wrapcheck)
}
if dstInfo.Size() != size {
return fmt.Errorf("copy of %s to %s failed: src/dst size mismatch: wanted %d, got %d", src, dst, size, dstInfo.Size())
}
return nil
}
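As a follow-up to the clonefile discussion above, one possible shape for a darwin-specific variant in sys_darwin.go (a sketch only; it assumes golang.org/x/sys/unix exposes Clonefile on darwin and falls back to the plain copy when cloning is not possible, e.g. on non-APFS volumes or across filesystems):

//go:build darwin

package diskcache

import "golang.org/x/sys/unix"

// cloneOrCopyFile prefers an APFS copy-on-write clone (a single syscall that
// preserves permissions) and falls back to the regular copy otherwise.
// Hypothetical sketch, not part of this PR.
func cloneOrCopyFile(src, dst string, size int64) error {
    // Clonefile fails if dst already exists, the volume is not APFS, or src
    // and dst live on different filesystems; fall back to a plain copy then.
    if err := unix.Clonefile(src, dst, 0); err == nil {
        return nil
    }
    return copyFile(src, dst, size)
}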

// LoadCas copies the file contents to the given path if the digest exists in the disk cache, and returns whether it was found.
func (d *DiskCache) LoadCas(dg digest.Digest, path string) bool {
Review comment: I'd propose to rename from LoadCas to CopyFromCache.

Review comment (follow-up): I realize the sister function is called StoreCas. I don't know if it's a nomenclature reused in this code base; if so, ignore this comment.

Contributor Author: Yeah, Load* and Store* are consistent with sync.Map's naming; we also sometimes use Get/Update. I'll leave the naming decisions to @mrahs.

Collaborator @mrahs (Feb 23, 2024): I think the cache can have three main methods: putting bytes, getting bytes, copying bytes to a file path. Since the cache deals with bytes, I'm inclined towards Write(key, Reader), Read(key, Writer), Copy(key, filepath), but Store, Load, LoadInto/CopyTo also sound reasonable. I made another comment on generalizing this method which would avoid having to choose suffixes.

k := key{digest: dg}
iUntyped, loaded := d.store.Load(k)
if !loaded {
return false
}
it := iUntyped.(*qitem)
it.mu.RLock()
if err := copyFile(d.getPath(k), path, dg.Size); err != nil {
Review comment: This simplifies the control flow:

it.mu.RLock()
err := copyFile(d.getPath(k), path, dg.Size)
it.mu.RUnlock()
if err != nil {

Contributor Author: Yep, thank you.

// It is not possible to prevent a race with GC; hence, we return false on copy errors.
it.mu.RUnlock()
return false
}
it.mu.RUnlock()

d.mu.Lock()
d.queue.Bump(it)
d.mu.Unlock()
return true
}
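For context, a minimal usage sketch of the API introduced in this PR (paths and capacity are made up for illustration; digest.NewFromFile is assumed to be the existing helper in the digest package):

package main

import (
    "context"

    "github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
    "github.com/bazelbuild/remote-apis-sdks/go/pkg/diskcache"
    log "github.com/golang/glog"
)

func main() {
    ctx := context.Background()
    dc := diskcache.New(ctx, "/tmp/remote-cache", 10<<30) // 10 GiB capacity.
    defer dc.Shutdown()

    dg, err := digest.NewFromFile("/tmp/outputs/foo.o")
    if err != nil {
        log.Exitf("computing digest: %v", err)
    }
    if err := dc.StoreCas(dg, "/tmp/outputs/foo.o"); err != nil {
        log.Errorf("storing in disk cache: %v", err)
    }
    // On a later action, try the local cache before hitting the remote CAS.
    if !dc.LoadCas(dg, "/tmp/other/foo.o") {
        log.Infof("cache miss for %v, falling back to the remote CAS", dg)
    }
}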