-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
localstorage_cached.go
135 lines (109 loc) · 2.93 KB
/
localstorage_cached.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package paths
import (
"os"
"sync"
"time"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
var StatTimeout = 5 * time.Second
var MaxDiskUsageDuration = time.Second
type cachedLocalStorage struct {
base LocalStorage
statLk sync.Mutex
stats *lru.Cache[string, statEntry]
pathDUs *lru.Cache[string, *diskUsageEntry]
}
func newCachedLocalStorage(ls LocalStorage) *cachedLocalStorage {
statCache, _ := lru.New[string, statEntry](1024)
duCache, _ := lru.New[string, *diskUsageEntry](1024)
return &cachedLocalStorage{
base: ls,
stats: statCache,
pathDUs: duCache,
}
}
type statEntry struct {
stat fsutil.FsStat
time time.Time
}
type diskUsageEntry struct {
last diskUsageResult
usagePromise <-chan diskUsageResult
}
type diskUsageResult struct {
usage int64
time time.Time
}
func (c *cachedLocalStorage) GetStorage() (storiface.StorageConfig, error) {
return c.base.GetStorage()
}
func (c *cachedLocalStorage) SetStorage(f func(*storiface.StorageConfig)) error {
return c.base.SetStorage(f)
}
func (c *cachedLocalStorage) Stat(path string) (fsutil.FsStat, error) {
c.statLk.Lock()
defer c.statLk.Unlock()
if v, ok := c.stats.Get(path); ok && time.Now().Sub(v.time) < StatTimeout {
return v.stat, nil
}
// if we don't, get the stat
st, err := c.base.Stat(path)
if err == nil {
c.stats.Add(path, statEntry{
stat: st,
time: time.Now(),
})
}
return st, err
}
func (c *cachedLocalStorage) DiskUsage(path string) (int64, error) {
c.statLk.Lock()
defer c.statLk.Unlock()
var entry *diskUsageEntry
if v, ok := c.pathDUs.Get(path); ok {
entry = v
// if we have recent cached entry, use that
if time.Now().Sub(entry.last.time) < StatTimeout {
return entry.last.usage, nil
}
} else {
entry = new(diskUsageEntry)
c.pathDUs.Add(path, entry)
}
// start a new disk usage request; this can take a while so start a
// goroutine, and if it doesn't return quickly, return either the
// previous value, or zero - that's better than potentially blocking
// here for a long time.
if entry.usagePromise == nil {
resCh := make(chan diskUsageResult, 1)
go func() {
du, err := c.base.DiskUsage(path)
if err != nil {
if !os.IsNotExist(err) {
log.Errorw("error getting disk usage", "path", path, "error", err)
}
}
resCh <- diskUsageResult{
usage: du,
time: time.Now(),
}
}()
entry.usagePromise = resCh
}
// wait for the disk usage result; if it doesn't come, fallback on
// previous usage
select {
case du := <-entry.usagePromise:
entry.usagePromise = nil
entry.last = du
case <-time.After(MaxDiskUsageDuration):
log.Warnw("getting usage is slow, falling back to previous usage",
"path", path,
"fallback", entry.last.usage,
"age", time.Now().Sub(entry.last.time))
}
return entry.last.usage, nil
}
var _ LocalStorage = &cachedLocalStorage{}