From 1046709ff9ad0107e55c3e36ecda80bcb1dcde7e Mon Sep 17 00:00:00 2001
From: Ola Rozenfeld
Date: Wed, 31 Jan 2024 17:31:50 -0500
Subject: [PATCH 1/3] Local disk cache: part 1

---
 go/pkg/diskcache/BUILD.bazel       |  33 +++
 go/pkg/diskcache/atim_darwin.go    |  16 ++
 go/pkg/diskcache/atim_linux.go     |  16 ++
 go/pkg/diskcache/atim_windows.go   |  18 ++
 go/pkg/diskcache/diskcache.go      | 314 +++++++++++++++++++++++++++++
 go/pkg/diskcache/diskcache_test.go | 256 +++++++++++++++++++++++
 6 files changed, 653 insertions(+)
 create mode 100644 go/pkg/diskcache/BUILD.bazel
 create mode 100644 go/pkg/diskcache/atim_darwin.go
 create mode 100644 go/pkg/diskcache/atim_linux.go
 create mode 100644 go/pkg/diskcache/atim_windows.go
 create mode 100644 go/pkg/diskcache/diskcache.go
 create mode 100644 go/pkg/diskcache/diskcache_test.go

diff --git a/go/pkg/diskcache/BUILD.bazel b/go/pkg/diskcache/BUILD.bazel
new file mode 100644
index 00000000..e21bc258
--- /dev/null
+++ b/go/pkg/diskcache/BUILD.bazel
@@ -0,0 +1,33 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+    name = "diskcache",
+    srcs = [
+        "atim_darwin.go",
+        "atim_linux.go",
+        "atim_windows.go",
+        "diskcache.go",
+    ],
+    importpath = "github.com/bazelbuild/remote-apis-sdks/go/pkg/diskcache",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//go/pkg/digest",
+        "@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
+        "@com_github_golang_glog//:go_default_library",
+        "@org_golang_google_protobuf//proto:go_default_library",
+    ],
+)
+
+go_test(
+    name = "diskcache_test",
+    srcs = ["diskcache_test.go"],
+    embed = [":diskcache"],
+    deps = [
+        "//go/pkg/digest",
+        "//go/pkg/testutil",
+        "@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
+        "@com_github_google_go_cmp//cmp:go_default_library",
+        "@com_github_pborman_uuid//:go_default_library",
+        "@org_golang_x_sync//errgroup:go_default_library",
+    ],
+)
\ No newline at end of file
diff --git a/go/pkg/diskcache/atim_darwin.go b/go/pkg/diskcache/atim_darwin.go
new file mode 100644
index 00000000..82509e75
--- /dev/null
+++ b/go/pkg/diskcache/atim_darwin.go
@@ -0,0 +1,16 @@
+// Utility to get the last accessed time on Darwin.
+package diskcache
+
+import (
+	"os"
+	"syscall"
+	"time"
+)
+
+func GetLastAccessTime(path string) (time.Time, error) {
+	info, err := os.Stat(path)
+	if err != nil {
+		return time.Time{}, err
+	}
+	return time.Unix(info.Sys().(*syscall.Stat_t).Atimespec.Unix()), nil
+}
diff --git a/go/pkg/diskcache/atim_linux.go b/go/pkg/diskcache/atim_linux.go
new file mode 100644
index 00000000..c34df512
--- /dev/null
+++ b/go/pkg/diskcache/atim_linux.go
@@ -0,0 +1,16 @@
+// Utility to get the last accessed time on Linux.
+package diskcache
+
+import (
+	"os"
+	"syscall"
+	"time"
+)
+
+func GetLastAccessTime(path string) (time.Time, error) {
+	info, err := os.Stat(path)
+	if err != nil {
+		return time.Time{}, err
+	}
+	return time.Unix(info.Sys().(*syscall.Stat_t).Atim.Unix()), nil
+}
diff --git a/go/pkg/diskcache/atim_windows.go b/go/pkg/diskcache/atim_windows.go
new file mode 100644
index 00000000..c633346a
--- /dev/null
+++ b/go/pkg/diskcache/atim_windows.go
@@ -0,0 +1,18 @@
+// Utility to get the last accessed time on Windows.
+package diskcache
+
+import (
+	"os"
+	"syscall"
+	"time"
+)
+
+// This will return correct values only if last access time tracking is enabled via `fsutil behavior set disablelastaccess 0`.
+// Tracking of last access time is disabled by default on Windows.
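+// To check the current setting, run `fsutil behavior query disablelastaccess`
+// (on recent Windows versions, a value of 0 or 2 means tracking is enabled).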
+func GetLastAccessTime(path string) (time.Time, error) {
+	info, err := os.Stat(path)
+	if err != nil {
+		return time.Time{}, err
+	}
+	return time.Unix(0, info.Sys().(*syscall.Win32FileAttributeData).LastAccessTime.Nanoseconds()), nil
+}
diff --git a/go/pkg/diskcache/diskcache.go b/go/pkg/diskcache/diskcache.go
new file mode 100644
index 00000000..d6d5bb4f
--- /dev/null
+++ b/go/pkg/diskcache/diskcache.go
@@ -0,0 +1,314 @@
+// Package diskcache implements a local disk LRU CAS cache.
+package diskcache
+
+import (
+	"container/heap"
+	"context"
+	"fmt"
+	"io"
+	"io/fs"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
+	log "github.com/golang/glog"
+)
+
+type key struct {
+	digest digest.Digest
+}
+
+// A qitem is something we manage in a priority queue.
+type qitem struct {
+	key   key
+	lat   time.Time    // The last accessed time of the file.
+	index int          // The index of the item in the heap.
+	mu    sync.RWMutex // Protects the data-structure consistency for the given digest.
+}
+
+// A priorityQueue implements heap.Interface and holds qitems.
+type priorityQueue struct {
+	items []*qitem
+	n     int
+}
+
+func (q *priorityQueue) Len() int {
+	return q.n
+}
+
+func (q *priorityQueue) Less(i, j int) bool {
+	// We want Pop to give us the oldest item.
+	return q.items[i].lat.Before(q.items[j].lat)
+}
+
+func (q *priorityQueue) Swap(i, j int) {
+	q.items[i], q.items[j] = q.items[j], q.items[i]
+	q.items[i].index = i
+	q.items[j].index = j
+}
+
+func (q *priorityQueue) Push(x any) {
+	if q.n == cap(q.items) {
+		// Resize the queue.
+		old := q.items
+		q.items = make([]*qitem, 2*cap(old)) // Initial capacity needs to be > 0.
+		copy(q.items, old)
+	}
+	item := x.(*qitem)
+	item.index = q.n
+	q.items[item.index] = item
+	q.n++
+}
+
+func (q *priorityQueue) Pop() any {
+	item := q.items[q.n-1]
+	q.items[q.n-1] = nil // avoid memory leak
+	item.index = -1      // for safety
+	q.n--
+	return item
+}
+
+// Bump marks the item as most recently used, moving it to the back of the eviction order.
+func (q *priorityQueue) Bump(item *qitem) {
+	// Sanity check, necessary because of possible racing between Bump and GC:
+	if item.index < 0 || item.index >= q.n || q.items[item.index].key != item.key {
+		return
+	}
+	item.lat = time.Now()
+	heap.Fix(q, item.index)
+}
+
+const maxConcurrentRequests = 1000
+
+// DiskCache is a local disk LRU cache for CAS blobs and Action Cache entries.
+type DiskCache struct {
+	root             string         // path to the root directory of the disk cache.
+	maxCapacityBytes uint64         // if disk size exceeds this, old items will be evicted as needed.
+	mu               sync.Mutex     // protects the queue.
+	store            sync.Map       // map of keys to qitems.
+	queue            *priorityQueue // keys by last accessed time.
+	sizeBytes        int64          // total size.
+	ctx              context.Context
+	shutdown         chan bool
+	gcTick           uint64
+	gcReq            chan uint64
+	testGcTicks      chan uint64
+}
+
+func New(ctx context.Context, root string, maxCapacityBytes uint64) *DiskCache {
+	res := &DiskCache{
+		root:             root,
+		maxCapacityBytes: maxCapacityBytes,
+		ctx:              ctx,
+		queue: &priorityQueue{
+			items: make([]*qitem, 1000),
+		},
+		gcReq:    make(chan uint64, maxConcurrentRequests),
+		shutdown: make(chan bool),
+	}
+	heap.Init(res.queue)
+	_ = os.MkdirAll(root, os.ModePerm)
+	// We use Git's directory/file naming structure as inspiration:
+	// https://git-scm.com/book/en/v2/Git-Internals-Git-Objects#:~:text=The%20subdirectory%20is%20named%20with%20the%20first%202%20characters%20of%20the%20SHA%2D1%2C%20and%20the%20filename%20is%20the%20remaining%2038%20characters.
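+	// For example, a blob whose hash starts with "ab" is stored under <root>/ab/<remaining hash chars>.<size> (see getPath).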
+	for i := 0; i < 256; i++ {
+		_ = os.MkdirAll(filepath.Join(root, fmt.Sprintf("%02x", i)), os.ModePerm)
+	}
+	_ = filepath.WalkDir(root, func(path string, d fs.DirEntry, err error) error {
+		// We log and continue on all errors, because cache read errors are not critical.
+		if err != nil {
+			log.Errorf("Error reading cache directory: %v", err)
+			return nil
+		}
+		if d.IsDir() {
+			return nil
+		}
+		subdir := filepath.Base(filepath.Dir(path))
+		k, err := res.getKeyFromFileName(subdir + d.Name())
+		if err != nil {
+			log.Errorf("Error parsing cached file name %s: %v", path, err)
+			return nil
+		}
+		atime, err := GetLastAccessTime(path)
+		if err != nil {
+			log.Errorf("Error getting last accessed time of %s: %v", path, err)
+			return nil
+		}
+		it := &qitem{
+			key: k,
+			lat: atime,
+		}
+		size, err := res.getItemSize(k)
+		if err != nil {
+			log.Errorf("Error getting file size of %s: %v", path, err)
+			return nil
+		}
+		res.store.Store(k, it)
+		atomic.AddInt64(&res.sizeBytes, size)
+		heap.Push(res.queue, it)
+		return nil
+	})
+	go res.gc()
+	return res
+}
+
+func (d *DiskCache) getItemSize(k key) (int64, error) {
+	return k.digest.Size, nil
+}
+
+// Shutdown releases resources and terminates the GC daemon. It should be the last call to the DiskCache.
+func (d *DiskCache) Shutdown() {
+	d.shutdown <- true
+}
+
+func (d *DiskCache) TotalSizeBytes() uint64 {
+	return uint64(atomic.LoadInt64(&d.sizeBytes))
+}
+
+func (d *DiskCache) getKeyFromFileName(fname string) (key, error) {
+	pair := strings.Split(fname, ".")
+	if len(pair) != 2 {
+		return key{}, fmt.Errorf("expected file name in the form [ac_]hash.size, got %s", fname)
+	}
+	size, err := strconv.ParseInt(pair[1], 10, 64)
+	if err != nil {
+		return key{}, fmt.Errorf("invalid size in digest %s: %s", fname, err)
+	}
+	dg, err := digest.New(pair[0], size)
+	if err != nil {
+		return key{}, fmt.Errorf("invalid digest from file name %s: %v", fname, err)
+	}
+	return key{digest: dg}, nil
+}
+
+func (d *DiskCache) getPath(k key) string {
+	return filepath.Join(d.root, k.digest.Hash[:2], fmt.Sprintf("%s.%d", k.digest.Hash[2:], k.digest.Size))
+}
+
+func (d *DiskCache) StoreCas(dg digest.Digest, path string) error {
+	if dg.Size > int64(d.maxCapacityBytes) {
+		return fmt.Errorf("blob size %d exceeds DiskCache capacity %d", dg.Size, d.maxCapacityBytes)
+	}
+	it := &qitem{
+		key: key{digest: dg},
+		lat: time.Now(),
+	}
+	it.mu.Lock()
+	defer it.mu.Unlock()
+	_, exists := d.store.LoadOrStore(it.key, it)
+	if exists {
+		return nil
+	}
+	d.mu.Lock()
+	heap.Push(d.queue, it)
+	d.mu.Unlock()
+	if err := copyFile(path, d.getPath(it.key), dg.Size); err != nil {
+		return err
+	}
+	newSize := uint64(atomic.AddInt64(&d.sizeBytes, dg.Size))
+	if newSize > d.maxCapacityBytes {
+		select {
+		case d.gcReq <- atomic.AddUint64(&d.gcTick, 1):
+		default:
+		}
+	}
+	return nil
+}
+
+func (d *DiskCache) gc() {
+	for {
+		select {
+		case <-d.shutdown:
+			return
+		case <-d.ctx.Done():
+			return
+		case t := <-d.gcReq:
+			// Evict old entries until total size is below cap.
+			for uint64(atomic.LoadInt64(&d.sizeBytes)) > d.maxCapacityBytes {
+				d.mu.Lock()
+				it := heap.Pop(d.queue).(*qitem)
+				d.mu.Unlock()
+				size, err := d.getItemSize(it.key)
+				if err != nil {
+					log.Errorf("error getting item size for %v: %v", it.key, err)
+					size = 0
+				}
+				atomic.AddInt64(&d.sizeBytes, -size)
+				it.mu.Lock()
+				// We only delete the files, and not the prefix directories, because the prefixes are not worth worrying about.
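+				// Taking the item's write lock here serializes file removal with concurrent
+				// LoadCas calls, which hold the read lock while copying the file out.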
+				if err := os.Remove(d.getPath(it.key)); err != nil {
+					log.Errorf("Error removing file: %v", err)
+				}
+				d.store.Delete(it.key)
+				it.mu.Unlock()
+			}
+			if d.testGcTicks != nil {
+				select {
+				case d.testGcTicks <- t:
+				default:
+				}
+			}
+		}
+	}
+}
+
+// copyFile copies the file contents, retaining the source permissions.
+func copyFile(src, dst string, size int64) error {
+	srcInfo, err := os.Stat(src)
+	if err != nil {
+		return err
+	}
+	in, err := os.Open(src)
+	if err != nil {
+		return err
+	}
+	defer in.Close()
+	out, err := os.Create(dst)
+	if err != nil {
+		return err
+	}
+	defer out.Close()
+	if err := out.Chmod(srcInfo.Mode()); err != nil {
+		return err
+	}
+	_, err = io.Copy(out, in)
+	if err != nil {
+		return err
+	}
+	// Required sanity check: sometimes the copy pretends to succeed, but doesn't, if
+	// the file is being concurrently deleted.
+	dstInfo, err := os.Stat(dst)
+	if err != nil {
+		return err
+	}
+	if dstInfo.Size() != size {
+		return fmt.Errorf("copy of %s to %s failed: src/dst size mismatch: wanted %d, got %d", src, dst, size, dstInfo.Size())
+	}
+	return nil
+}
+
+// LoadCas copies the cached file contents to the given path if the digest exists
+// in the disk cache, returning whether it was found.
+func (d *DiskCache) LoadCas(dg digest.Digest, path string) bool {
+	k := key{digest: dg}
+	iUntyped, loaded := d.store.Load(k)
+	if !loaded {
+		return false
+	}
+	it := iUntyped.(*qitem)
+	it.mu.RLock()
+	if err := copyFile(d.getPath(k), path, dg.Size); err != nil {
+		// It is not possible to prevent a race with GC; hence, we return false on copy errors.
+		it.mu.RUnlock()
+		return false
+	}
+	it.mu.RUnlock()
+
+	d.mu.Lock()
+	d.queue.Bump(it)
+	d.mu.Unlock()
+	return true
+}
diff --git a/go/pkg/diskcache/diskcache_test.go b/go/pkg/diskcache/diskcache_test.go
new file mode 100644
index 00000000..5ac93c07
--- /dev/null
+++ b/go/pkg/diskcache/diskcache_test.go
@@ -0,0 +1,256 @@
+package diskcache
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"os"
+	"path/filepath"
+	"sync/atomic"
+	"testing"
+
+	"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
+	"github.com/bazelbuild/remote-apis-sdks/go/pkg/testutil"
+	"github.com/pborman/uuid"
+	"golang.org/x/sync/errgroup"
+)
+
+// waitForGc is a test-only utility. It assumes all modifications are done, and at least one GC is expected.
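+// It blocks until the GC pass for the most recently requested tick has run.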
+func waitForGc(d *DiskCache) {
+	for t := range d.testGcTicks {
+		if t == d.gcTick {
+			return
+		}
+	}
+}
+
+func TestStoreLoadCasPerm(t *testing.T) {
+	tests := []struct {
+		name       string
+		executable bool
+	}{
+		{
+			name:       "+X",
+			executable: true,
+		},
+		{
+			name:       "-X",
+			executable: false,
+		},
+	}
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			root := t.TempDir()
+			d := New(context.Background(), filepath.Join(root, "cache"), 20)
+			defer d.Shutdown()
+			fname, _ := testutil.CreateFile(t, tc.executable, "12345")
+			srcInfo, err := os.Stat(fname)
+			if err != nil {
+				t.Fatalf("os.Stat() failed: %v", err)
+			}
+			dg, err := digest.NewFromFile(fname)
+			if err != nil {
+				t.Fatalf("digest.NewFromFile failed: %v", err)
+			}
+			if err := d.StoreCas(dg, fname); err != nil {
+				t.Errorf("StoreCas(%s, %s) failed: %v", dg, fname, err)
+			}
+			newName := filepath.Join(root, "new")
+			if !d.LoadCas(dg, newName) {
+				t.Errorf("expected to load %s from the cache to %s", dg, newName)
+			}
+			fileInfo, err := os.Stat(newName)
+			if err != nil {
+				t.Fatalf("os.Stat(%s) failed: %v", newName, err)
+			}
+			if fileInfo.Mode() != srcInfo.Mode() {
+				t.Errorf("expected %s to have %v permissions, got: %v", newName, srcInfo.Mode(), fileInfo.Mode())
+			}
+			contents, err := os.ReadFile(newName)
+			if err != nil {
+				t.Errorf("error reading from %s: %v", newName, err)
+			}
+			if string(contents) != "12345" {
+				t.Errorf("Cached result did not match: want %q, got %q", "12345", string(contents))
+			}
+		})
+	}
+}
+
+func TestLoadCasNotFound(t *testing.T) {
+	root := t.TempDir()
+	d := New(context.Background(), filepath.Join(root, "cache"), 20)
+	defer d.Shutdown()
+	newName := filepath.Join(root, "new")
+	dg := digest.NewFromBlob([]byte("bla"))
+	if d.LoadCas(dg, newName) {
+		t.Errorf("expected to not load %s from the cache to %s", dg, newName)
+	}
+}
+
+func TestGcOldestCas(t *testing.T) {
+	root := t.TempDir()
+	d := New(context.Background(), filepath.Join(root, "cache"), 20)
+	defer d.Shutdown()
+	d.testGcTicks = make(chan uint64, 1)
+	for i := 0; i < 5; i++ {
+		fname, _ := testutil.CreateFile(t, false, fmt.Sprintf("aaa %d", i))
+		dg, err := digest.NewFromFile(fname)
+		if err != nil {
+			t.Fatalf("digest.NewFromFile failed: %v", err)
+		}
+		if err := d.StoreCas(dg, fname); err != nil {
+			t.Errorf("StoreCas(%s, %s) failed: %v", dg, fname, err)
+		}
+	}
+	waitForGc(d)
+	if d.TotalSizeBytes() != d.maxCapacityBytes {
+		t.Errorf("expected total size bytes to be %d, got %d", d.maxCapacityBytes, d.TotalSizeBytes())
+	}
+	newName := filepath.Join(root, "new")
+	for i := 0; i < 5; i++ {
+		dg := digest.NewFromBlob([]byte(fmt.Sprintf("aaa %d", i)))
+		if d.LoadCas(dg, newName) != (i > 0) {
+			t.Errorf("expected loaded to be %v for %s from the cache to %s", i > 0, dg, newName)
+		}
+	}
+}
+
+// We say that Last Access Time (LAT) behaves accurately on a system if reading from a file
+// bumps its LAT forward. From experience, macOS and Debian Linux are accurate, while Ubuntu is
+// not: even when the LAT does get modified on access there, it can be off by
+// several seconds (!).
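+// Tests that depend on access-time ordering use this check to skip on unreliable systems.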
+func isSystemLastAccessTimeAccurate(t *testing.T) bool {
+	t.Helper()
+	fname, _ := testutil.CreateFile(t, false, "foo")
+	lat, _ := GetLastAccessTime(fname)
+	if _, err := os.ReadFile(fname); err != nil {
+		t.Fatalf("%v", err)
+	}
+	newLat, _ := GetLastAccessTime(fname)
+	return lat.Before(newLat)
+}
+
+func TestInitFromExistingCas(t *testing.T) {
+	if !isSystemLastAccessTimeAccurate(t) {
+		// This effectively skips the test on Ubuntu, because to make the test work there,
+		// we would need to inject too many / too long time.Sleep statements to beat the system's
+		// inaccuracy.
+		t.Logf("Skipping TestInitFromExistingCas, because system Last Access Time is unreliable.")
+		return
+	}
+	root := t.TempDir()
+	d := New(context.Background(), filepath.Join(root, "cache"), 20)
+	for i := 0; i < 4; i++ {
+		fname, _ := testutil.CreateFile(t, false, fmt.Sprintf("aaa %d", i))
+		dg, err := digest.NewFromFile(fname)
+		if err != nil {
+			t.Fatalf("digest.NewFromFile failed: %v", err)
+		}
+		if err := d.StoreCas(dg, fname); err != nil {
+			t.Errorf("StoreCas(%s, %s) failed: %v", dg, fname, err)
+		}
+	}
+	newName := filepath.Join(root, "new")
+	dg := digest.NewFromBlob([]byte("aaa 0"))
+	if !d.LoadCas(dg, newName) { // Now 0 has been accessed, 1 is the oldest file.
+		t.Errorf("expected %s to be cached", dg)
+	}
+	d.Shutdown()
+
+	// Re-initialize from existing files.
+	d = New(context.Background(), filepath.Join(root, "cache"), 20)
+	defer d.Shutdown()
+	d.testGcTicks = make(chan uint64, 1)
+
+	// Check old files are cached:
+	dg = digest.NewFromBlob([]byte("aaa 1"))
+	if !d.LoadCas(dg, newName) { // Now 1 has been accessed, 2 is the oldest file.
+		t.Errorf("expected %s to be cached", dg)
+	}
+	fname, _ := testutil.CreateFile(t, false, "aaa 4")
+	dg, err := digest.NewFromFile(fname)
+	if err != nil {
+		t.Fatalf("digest.NewFromFile failed: %v", err)
+	}
+	if d.TotalSizeBytes() != d.maxCapacityBytes {
+		t.Errorf("expected total size bytes to be %d, got %d", d.maxCapacityBytes, d.TotalSizeBytes())
+	}
+	// Trigger a GC by adding a new file.
+	if err := d.StoreCas(dg, fname); err != nil {
+		t.Errorf("StoreCas(%s, %s) failed: %v", dg, fname, err)
+	}
+	waitForGc(d)
+	dg = digest.NewFromBlob([]byte("aaa 2"))
+	if d.LoadCas(dg, newName) {
+		t.Errorf("expected to not load %s from the cache to %s", dg, newName)
+	}
+}
+
+func TestThreadSafetyCas(t *testing.T) {
+	root := t.TempDir()
+	if err := os.MkdirAll(filepath.Join(root, "orig"), os.ModePerm); err != nil {
+		t.Fatalf("%v", err)
+	}
+	if err := os.MkdirAll(filepath.Join(root, "new"), os.ModePerm); err != nil {
+		t.Fatalf("%v", err)
+	}
+	nFiles := 10
+	attempts := 5000
+	// All blobs are size 5 exactly. We will have half the byte capacity we need.
+	d := New(context.Background(), filepath.Join(root, "cache"), uint64(nFiles*5)/2)
+	d.testGcTicks = make(chan uint64, attempts)
+	defer d.Shutdown()
+	var files []string
+	var dgs []digest.Digest
+	for i := 0; i < nFiles; i++ {
+		fname := filepath.Join(root, "orig", fmt.Sprintf("%d", i))
+		if err := os.WriteFile(fname, []byte(fmt.Sprintf("aa %02d", i)), 0644); err != nil {
+			t.Fatalf("os.WriteFile: %v", err)
+		}
+		files = append(files, fname)
+		dg, err := digest.NewFromFile(fname)
+		if err != nil {
+			t.Fatalf("digest.NewFromFile failed: %v", err)
+		}
+		dgs = append(dgs, dg)
+		if err := d.StoreCas(dg, fname); err != nil {
+			t.Errorf("StoreCas(%s, %s) failed: %v", dg, fname, err)
+		}
+	}
+	// Randomly access and store files from different threads.
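+	// Each goroutine either loads a random blob and verifies its contents on a hit, or re-stores it on a miss.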
+	eg, _ := errgroup.WithContext(context.Background())
+	var hits uint64
+	for k := 0; k < attempts; k++ {
+		eg.Go(func() error {
+			i := rand.Intn(nFiles)
+			newName := filepath.Join(root, "new", uuid.New())
+			if d.LoadCas(dgs[i], newName) {
+				atomic.AddUint64(&hits, 1)
+				contents, err := os.ReadFile(newName)
+				if err != nil {
+					return fmt.Errorf("os.ReadFile: %v", err)
+				}
+				want := fmt.Sprintf("aa %02d", i)
+				if string(contents) != want {
+					return fmt.Errorf("Cached result did not match: want %q, got %q for digest %v", want, string(contents), dgs[i])
+				}
+			} else if err := d.StoreCas(dgs[i], files[i]); err != nil {
+				return fmt.Errorf("StoreCas: %v", err)
+			}
+			return nil
+		})
+	}
+	if err := eg.Wait(); err != nil {
+		t.Error(err)
+	}
+	waitForGc(d)
+	if d.TotalSizeBytes() != d.maxCapacityBytes {
+		t.Errorf("expected total size bytes to be %d, got %d", d.maxCapacityBytes, d.TotalSizeBytes())
+	}
+	if int(hits) < attempts/2 {
+		t.Errorf("Unexpectedly low cache hits %d out of %d attempts", hits, attempts)
+	}
+}

From 21666d48ffd0d938b5bd067469861bd25dcd269a Mon Sep 17 00:00:00 2001
From: Ola Rozenfeld
Date: Thu, 1 Feb 2024 16:51:30 -0500
Subject: [PATCH 2/3] Addressing comments

---
 go/pkg/diskcache/BUILD.bazel                     |  6 +-
 go/pkg/diskcache/diskcache.go                    | 79 +++++++++++--------
 .../{atim_darwin.go => sys_darwin.go}            |  2 +-
 go/pkg/diskcache/{atim_linux.go => sys_linux.go} |  2 +-
 .../{atim_windows.go => sys_windows.go}          |  2 +-
 5 files changed, 50 insertions(+), 41 deletions(-)
 rename go/pkg/diskcache/{atim_darwin.go => sys_darwin.go} (81%)
 rename go/pkg/diskcache/{atim_linux.go => sys_linux.go} (80%)
 rename go/pkg/diskcache/{atim_windows.go => sys_windows.go} (88%)

diff --git a/go/pkg/diskcache/BUILD.bazel b/go/pkg/diskcache/BUILD.bazel
index e21bc258..f0a365b8 100644
--- a/go/pkg/diskcache/BUILD.bazel
+++ b/go/pkg/diskcache/BUILD.bazel
@@ -3,10 +3,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 go_library(
     name = "diskcache",
     srcs = [
-        "atim_darwin.go",
-        "atim_linux.go",
-        "atim_windows.go",
         "diskcache.go",
+        "sys_darwin.go",
+        "sys_linux.go",
+        "sys_windows.go",
     ],
     importpath = "github.com/bazelbuild/remote-apis-sdks/go/pkg/diskcache",
     visibility = ["//visibility:public"],
diff --git a/go/pkg/diskcache/diskcache.go b/go/pkg/diskcache/diskcache.go
index d6d5bb4f..196865f8 100644
--- a/go/pkg/diskcache/diskcache.go
+++ b/go/pkg/diskcache/diskcache.go
@@ -115,43 +115,52 @@ func New(ctx context.Context, root string, maxCapacityBytes uint64) *DiskCache {
 	_ = os.MkdirAll(root, os.ModePerm)
 	// We use Git's directory/file naming structure as inspiration:
 	// https://git-scm.com/book/en/v2/Git-Internals-Git-Objects#:~:text=The%20subdirectory%20is%20named%20with%20the%20first%202%20characters%20of%20the%20SHA%2D1%2C%20and%20the%20filename%20is%20the%20remaining%2038%20characters.
+	var wg sync.WaitGroup
+	wg.Add(256)
 	for i := 0; i < 256; i++ {
-		_ = os.MkdirAll(filepath.Join(root, fmt.Sprintf("%02x", i)), os.ModePerm)
+		prefixDir := filepath.Join(root, fmt.Sprintf("%02x", i))
+		go func() {
+			defer wg.Done()
+			_ = os.MkdirAll(prefixDir, os.ModePerm)
+			_ = filepath.WalkDir(prefixDir, func(path string, d fs.DirEntry, err error) error {
+				// We log and continue on all errors, because cache read errors are not critical.
+				if err != nil {
+					log.Errorf("Error reading cache directory: %v", err)
+					return nil
+				}
+				if d.IsDir() {
+					return nil
+				}
+				subdir := filepath.Base(filepath.Dir(path))
+				k, err := res.getKeyFromFileName(subdir + d.Name())
+				if err != nil {
+					log.Errorf("Error parsing cached file name %s: %v", path, err)
+					return nil
+				}
+				atime, err := GetLastAccessTime(path)
+				if err != nil {
+					log.Errorf("Error getting last accessed time of %s: %v", path, err)
+					return nil
+				}
+				it := &qitem{
+					key: k,
+					lat: atime,
+				}
+				size, err := res.getItemSize(k)
+				if err != nil {
+					log.Errorf("Error getting file size of %s: %v", path, err)
+					return nil
+				}
+				res.store.Store(k, it)
+				atomic.AddInt64(&res.sizeBytes, size)
+				res.mu.Lock()
+				heap.Push(res.queue, it)
+				res.mu.Unlock()
+				return nil
+			})
+		}()
 	}
-	_ = filepath.WalkDir(root, func(path string, d fs.DirEntry, err error) error {
-		// We log and continue on all errors, because cache read errors are not critical.
-		if err != nil {
-			log.Errorf("Error reading cache directory: %v", err)
-			return nil
-		}
-		if d.IsDir() {
-			return nil
-		}
-		subdir := filepath.Base(filepath.Dir(path))
-		k, err := res.getKeyFromFileName(subdir + d.Name())
-		if err != nil {
-			log.Errorf("Error parsing cached file name %s: %v", path, err)
-			return nil
-		}
-		atime, err := GetLastAccessTime(path)
-		if err != nil {
-			log.Errorf("Error getting last accessed time of %s: %v", path, err)
-			return nil
-		}
-		it := &qitem{
-			key: k,
-			lat: atime,
-		}
-		size, err := res.getItemSize(k)
-		if err != nil {
-			log.Errorf("Error getting file size of %s: %v", path, err)
-			return nil
-		}
-		res.store.Store(k, it)
-		atomic.AddInt64(&res.sizeBytes, size)
-		heap.Push(res.queue, it)
-		return nil
-	})
+	wg.Wait()
 	go res.gc()
 	return res
 }
diff --git a/go/pkg/diskcache/atim_darwin.go b/go/pkg/diskcache/sys_darwin.go
similarity index 81%
rename from go/pkg/diskcache/atim_darwin.go
rename to go/pkg/diskcache/sys_darwin.go
index 82509e75..076416af 100644
--- a/go/pkg/diskcache/atim_darwin.go
+++ b/go/pkg/diskcache/sys_darwin.go
@@ -1,4 +1,4 @@
-// Utility to get the last accessed time on Darwin.
+// System utilities that differ between OS implementations.
 package diskcache
 
 import (
diff --git a/go/pkg/diskcache/atim_linux.go b/go/pkg/diskcache/sys_linux.go
similarity index 80%
rename from go/pkg/diskcache/atim_linux.go
rename to go/pkg/diskcache/sys_linux.go
index c34df512..1c79836c 100644
--- a/go/pkg/diskcache/atim_linux.go
+++ b/go/pkg/diskcache/sys_linux.go
@@ -1,4 +1,4 @@
-// Utility to get the last accessed time on Linux.
+// System utilities that differ between OS implementations.
 package diskcache
 
 import (
diff --git a/go/pkg/diskcache/atim_windows.go b/go/pkg/diskcache/sys_windows.go
similarity index 88%
rename from go/pkg/diskcache/atim_windows.go
rename to go/pkg/diskcache/sys_windows.go
index c633346a..92f35d4a 100644
--- a/go/pkg/diskcache/atim_windows.go
+++ b/go/pkg/diskcache/sys_windows.go
@@ -1,4 +1,4 @@
-// Utility to get the last accessed time on Windows.
+// System utilities that differ between OS implementations.
 package diskcache
 
 import (

From df6b425b7ac1122b36ffc7fb564116f624a56f5b Mon Sep 17 00:00:00 2001
From: Ola Rozenfeld
Date: Sun, 11 Feb 2024 12:12:02 -0500
Subject: [PATCH 3/3] Addressing Marc Antoine's comments

---
 go/pkg/diskcache/BUILD.bazel       |  1 +
 go/pkg/diskcache/diskcache.go      | 72 +++++++++++++++++-------------
 go/pkg/diskcache/diskcache_test.go | 36 +++++++++++----
 go/pkg/diskcache/sys_darwin.go     | 10 ++---
 go/pkg/diskcache/sys_linux.go      | 10 ++---
 go/pkg/diskcache/sys_windows.go    | 10 ++---
 6 files changed, 77 insertions(+), 62 deletions(-)

diff --git a/go/pkg/diskcache/BUILD.bazel b/go/pkg/diskcache/BUILD.bazel
index f0a365b8..84832ba9 100644
--- a/go/pkg/diskcache/BUILD.bazel
+++ b/go/pkg/diskcache/BUILD.bazel
@@ -15,6 +15,7 @@ go_library(
         "@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
         "@com_github_golang_glog//:go_default_library",
         "@org_golang_google_protobuf//proto:go_default_library",
+        "@org_golang_x_sync//errgroup:go_default_library",
     ],
 )
 
diff --git a/go/pkg/diskcache/diskcache.go b/go/pkg/diskcache/diskcache.go
index 196865f8..23e3ea19 100644
--- a/go/pkg/diskcache/diskcache.go
+++ b/go/pkg/diskcache/diskcache.go
@@ -16,6 +16,8 @@ import (
 	"time"
 
 	"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
+	"golang.org/x/sync/errgroup"
+
 	log "github.com/golang/glog"
 )
 
@@ -100,7 +102,7 @@ type DiskCache struct {
 	testGcTicks chan uint64
 }
 
-func New(ctx context.Context, root string, maxCapacityBytes uint64) *DiskCache {
+func New(ctx context.Context, root string, maxCapacityBytes uint64) (*DiskCache, error) {
 	res := &DiskCache{
 		root:             root,
 		maxCapacityBytes: maxCapacityBytes,
@@ -112,21 +114,25 @@ func New(ctx context.Context, root string, maxCapacityBytes uint64) *DiskCache {
 		shutdown: make(chan bool),
 	}
 	heap.Init(res.queue)
-	_ = os.MkdirAll(root, os.ModePerm)
+	if err := os.MkdirAll(root, os.ModePerm); err != nil {
+		return nil, err
+	}
 	// We use Git's directory/file naming structure as inspiration:
 	// https://git-scm.com/book/en/v2/Git-Internals-Git-Objects#:~:text=The%20subdirectory%20is%20named%20with%20the%20first%202%20characters%20of%20the%20SHA%2D1%2C%20and%20the%20filename%20is%20the%20remaining%2038%20characters.
-	var wg sync.WaitGroup
-	wg.Add(256)
+	eg, eCtx := errgroup.WithContext(ctx)
 	for i := 0; i < 256; i++ {
 		prefixDir := filepath.Join(root, fmt.Sprintf("%02x", i))
-		go func() {
-			defer wg.Done()
-			_ = os.MkdirAll(prefixDir, os.ModePerm)
-			_ = filepath.WalkDir(prefixDir, func(path string, d fs.DirEntry, err error) error {
+		eg.Go(func() error {
+			if eCtx.Err() != nil {
+				return eCtx.Err()
+			}
+			if err := os.MkdirAll(prefixDir, os.ModePerm); err != nil {
+				return err
+			}
+			return filepath.WalkDir(prefixDir, func(path string, d fs.DirEntry, err error) error {
-				// We log and continue on all errors, because cache read errors are not critical.
+				// Errors during initialization are returned to the caller and abort cache creation.
 				if err != nil {
-					log.Errorf("Error reading cache directory: %v", err)
-					return nil
+					return fmt.Errorf("error reading cache directory: %v", err)
 				}
 				if d.IsDir() {
 					return nil
@@ -134,13 +140,11 @@ func New(ctx context.Context, root string, maxCapacityBytes uint64) *DiskCache {
 				subdir := filepath.Base(filepath.Dir(path))
 				k, err := res.getKeyFromFileName(subdir + d.Name())
 				if err != nil {
-					log.Errorf("Error parsing cached file name %s: %v", path, err)
-					return nil
+					return fmt.Errorf("error parsing cached file name %s: %v", path, err)
 				}
-				atime, err := GetLastAccessTime(path)
+				atime, err := getLastAccessTime(path)
 				if err != nil {
-					log.Errorf("Error getting last accessed time of %s: %v", path, err)
-					return nil
+					return fmt.Errorf("error getting last accessed time of %s: %v", path, err)
 				}
 				it := &qitem{
 					key: k,
@@ -148,8 +152,7 @@ func New(ctx context.Context, root string, maxCapacityBytes uint64) *DiskCache {
 				}
 				size, err := res.getItemSize(k)
 				if err != nil {
-					log.Errorf("Error getting file size of %s: %v", path, err)
-					return nil
+					return fmt.Errorf("error getting file size of %s: %v", path, err)
 				}
 				res.store.Store(k, it)
 				atomic.AddInt64(&res.sizeBytes, size)
@@ -158,11 +161,13 @@ func New(ctx context.Context, root string, maxCapacityBytes uint64) *DiskCache {
 				res.mu.Unlock()
 				return nil
 			})
-		}()
+		})
+	}
+	if err := eg.Wait(); err != nil {
+		return nil, err
 	}
-	wg.Wait()
 	go res.gc()
-	return res
+	return res, nil
 }
 
 func (d *DiskCache) getItemSize(k key) (int64, error) {
@@ -284,18 +289,13 @@ func copyFile(src, dst string, size int64) error {
 	if err := out.Chmod(srcInfo.Mode()); err != nil {
 		return err
 	}
-	_, err = io.Copy(out, in)
+	n, err := io.Copy(out, in)
 	if err != nil {
 		return err
 	}
-	// Required sanity check: sometimes the copy pretends to succeed, but doesn't, if
-	// the file is being concurrently deleted.
-	dstInfo, err := os.Stat(dst)
-	if err != nil {
-		return err
-	}
-	if dstInfo.Size() != size {
-		return fmt.Errorf("copy of %s to %s failed: src/dst size mismatch: wanted %d, got %d", src, dst, size, dstInfo.Size())
+	// Required sanity check: if the file is being concurrently deleted, we may not always copy everything.
+	if n != size {
+		return fmt.Errorf("copy of %s to %s failed: src/dst size mismatch: wanted %d, got %d", src, dst, size, n)
 	}
 	return nil
 }
@@ -309,15 +309,23 @@ func (d *DiskCache) LoadCas(dg digest.Digest, path string) bool {
 	}
 	it := iUntyped.(*qitem)
 	it.mu.RLock()
-	if err := copyFile(d.getPath(k), path, dg.Size); err != nil {
+	err := copyFile(d.getPath(k), path, dg.Size)
+	it.mu.RUnlock()
+	if err != nil {
 		// It is not possible to prevent a race with GC; hence, we return false on copy errors.
-		it.mu.RUnlock()
 		return false
 	}
-	it.mu.RUnlock()
 
 	d.mu.Lock()
 	d.queue.Bump(it)
 	d.mu.Unlock()
 	return true
 }
+
+func getLastAccessTime(path string) (time.Time, error) {
+	info, err := os.Stat(path)
+	if err != nil {
+		return time.Time{}, err
+	}
+	return FileInfoToAccessTime(info), nil
+}
diff --git a/go/pkg/diskcache/diskcache_test.go b/go/pkg/diskcache/diskcache_test.go
index 5ac93c07..3a704488 100644
--- a/go/pkg/diskcache/diskcache_test.go
+++ b/go/pkg/diskcache/diskcache_test.go
@@ -41,7 +41,10 @@ func TestStoreLoadCasPerm(t *testing.T) {
 	for _, tc := range tests {
 		t.Run(tc.name, func(t *testing.T) {
 			root := t.TempDir()
-			d := New(context.Background(), filepath.Join(root, "cache"), 20)
+			d, err := New(context.Background(), filepath.Join(root, "cache"), 20)
+			if err != nil {
+				t.Fatalf("New: %v", err)
+			}
 			defer d.Shutdown()
 			fname, _ := testutil.CreateFile(t, tc.executable, "12345")
 			srcInfo, err := os.Stat(fname)
@@ -79,7 +82,10 @@ func TestStoreLoadCasPerm(t *testing.T) {
 
 func TestLoadCasNotFound(t *testing.T) {
 	root := t.TempDir()
-	d := New(context.Background(), filepath.Join(root, "cache"), 20)
+	d, err := New(context.Background(), filepath.Join(root, "cache"), 20)
+	if err != nil {
+		t.Fatalf("New: %v", err)
+	}
 	defer d.Shutdown()
 	newName := filepath.Join(root, "new")
 	dg := digest.NewFromBlob([]byte("bla"))
@@ -90,7 +96,10 @@ func TestLoadCasNotFound(t *testing.T) {
 
 func TestGcOldestCas(t *testing.T) {
 	root := t.TempDir()
-	d := New(context.Background(), filepath.Join(root, "cache"), 20)
+	d, err := New(context.Background(), filepath.Join(root, "cache"), 20)
+	if err != nil {
+		t.Fatalf("New: %v", err)
+	}
 	defer d.Shutdown()
 	d.testGcTicks = make(chan uint64, 1)
 	for i := 0; i < 5; i++ {
@@ -123,11 +132,11 @@ func TestGcOldestCas(t *testing.T) {
 func isSystemLastAccessTimeAccurate(t *testing.T) bool {
 	t.Helper()
 	fname, _ := testutil.CreateFile(t, false, "foo")
-	lat, _ := GetLastAccessTime(fname)
+	lat, _ := getLastAccessTime(fname)
 	if _, err := os.ReadFile(fname); err != nil {
 		t.Fatalf("%v", err)
 	}
-	newLat, _ := GetLastAccessTime(fname)
+	newLat, _ := getLastAccessTime(fname)
 	return lat.Before(newLat)
 }
 
@@ -140,7 +149,10 @@ func TestInitFromExistingCas(t *testing.T) {
 		return
 	}
 	root := t.TempDir()
-	d := New(context.Background(), filepath.Join(root, "cache"), 20)
+	d, err := New(context.Background(), filepath.Join(root, "cache"), 20)
+	if err != nil {
+		t.Fatalf("New: %v", err)
+	}
 	for i := 0; i < 4; i++ {
 		fname, _ := testutil.CreateFile(t, false, fmt.Sprintf("aaa %d", i))
 		dg, err := digest.NewFromFile(fname)
@@ -159,7 +171,10 @@ func TestInitFromExistingCas(t *testing.T) {
 	d.Shutdown()
 
 	// Re-initialize from existing files.
-	d = New(context.Background(), filepath.Join(root, "cache"), 20)
+	d, err = New(context.Background(), filepath.Join(root, "cache"), 20)
+	if err != nil {
+		t.Fatalf("New: %v", err)
+	}
 	defer d.Shutdown()
 	d.testGcTicks = make(chan uint64, 1)
 
@@ -169,7 +184,7 @@ func TestInitFromExistingCas(t *testing.T) {
 		t.Errorf("expected %s to be cached", dg)
 	}
 	fname, _ := testutil.CreateFile(t, false, "aaa 4")
-	dg, err := digest.NewFromFile(fname)
+	dg, err = digest.NewFromFile(fname)
 	if err != nil {
 		t.Fatalf("digest.NewFromFile failed: %v", err)
 	}
@@ -198,7 +213,10 @@ func TestThreadSafetyCas(t *testing.T) {
 	nFiles := 10
 	attempts := 5000
 	// All blobs are size 5 exactly. We will have half the byte capacity we need.
-	d := New(context.Background(), filepath.Join(root, "cache"), uint64(nFiles*5)/2)
+	d, err := New(context.Background(), filepath.Join(root, "cache"), uint64(nFiles*5)/2)
+	if err != nil {
+		t.Fatalf("New: %v", err)
+	}
 	d.testGcTicks = make(chan uint64, attempts)
 	defer d.Shutdown()
 	var files []string
diff --git a/go/pkg/diskcache/sys_darwin.go b/go/pkg/diskcache/sys_darwin.go
index 076416af..1d51d7cd 100644
--- a/go/pkg/diskcache/sys_darwin.go
+++ b/go/pkg/diskcache/sys_darwin.go
@@ -2,15 +2,11 @@
 package diskcache
 
 import (
-	"os"
+	"io/fs"
 	"syscall"
 	"time"
 )
 
-func GetLastAccessTime(path string) (time.Time, error) {
-	info, err := os.Stat(path)
-	if err != nil {
-		return time.Time{}, err
-	}
-	return time.Unix(info.Sys().(*syscall.Stat_t).Atimespec.Unix()), nil
+func FileInfoToAccessTime(info fs.FileInfo) time.Time {
+	return time.Unix(info.Sys().(*syscall.Stat_t).Atimespec.Unix())
 }
diff --git a/go/pkg/diskcache/sys_linux.go b/go/pkg/diskcache/sys_linux.go
index 1c79836c..7414d4b5 100644
--- a/go/pkg/diskcache/sys_linux.go
+++ b/go/pkg/diskcache/sys_linux.go
@@ -2,15 +2,11 @@
 package diskcache
 
 import (
-	"os"
+	"io/fs"
 	"syscall"
 	"time"
 )
 
-func GetLastAccessTime(path string) (time.Time, error) {
-	info, err := os.Stat(path)
-	if err != nil {
-		return time.Time{}, err
-	}
-	return time.Unix(info.Sys().(*syscall.Stat_t).Atim.Unix()), nil
+func FileInfoToAccessTime(info fs.FileInfo) time.Time {
+	return time.Unix(info.Sys().(*syscall.Stat_t).Atim.Unix())
 }
diff --git a/go/pkg/diskcache/sys_windows.go b/go/pkg/diskcache/sys_windows.go
index 92f35d4a..319a7988 100644
--- a/go/pkg/diskcache/sys_windows.go
+++ b/go/pkg/diskcache/sys_windows.go
@@ -2,17 +2,13 @@
 package diskcache
 
 import (
-	"os"
+	"io/fs"
 	"syscall"
 	"time"
 )
 
 // This will return correct values only if last access time tracking is enabled via `fsutil behavior set disablelastaccess 0`.
 // Tracking of last access time is disabled by default on Windows.
-func GetLastAccessTime(path string) (time.Time, error) {
-	info, err := os.Stat(path)
-	if err != nil {
-		return time.Time{}, err
-	}
-	return time.Unix(0, info.Sys().(*syscall.Win32FileAttributeData).LastAccessTime.Nanoseconds()), nil
+func FileInfoToAccessTime(info fs.FileInfo) time.Time {
+	return time.Unix(0, info.Sys().(*syscall.Win32FileAttributeData).LastAccessTime.Nanoseconds())
 }
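
For anyone trying the series locally, a minimal usage sketch of the package API as it stands after patch 3 (the cache root, capacity, and file paths below are illustrative, not values taken from the patches):

	package main

	import (
		"context"
		"log"

		"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
		"github.com/bazelbuild/remote-apis-sdks/go/pkg/diskcache"
	)

	func main() {
		// Illustrative root and capacity (1 GiB).
		dc, err := diskcache.New(context.Background(), "/tmp/diskcache", 1<<30)
		if err != nil {
			log.Fatalf("diskcache.New: %v", err)
		}
		defer dc.Shutdown()

		dg, err := digest.NewFromFile("/tmp/blob")
		if err != nil {
			log.Fatalf("digest.NewFromFile: %v", err)
		}
		// Store the file under its digest, then copy it back out to a new path.
		if err := dc.StoreCas(dg, "/tmp/blob"); err != nil {
			log.Fatalf("StoreCas: %v", err)
		}
		if !dc.LoadCas(dg, "/tmp/blob.copy") {
			log.Printf("%s not found in cache (possibly evicted)", dg)
		}
	}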