Improve file read performance #105
@@ -17,9 +17,9 @@
 package cache

 import (
+	"bytes"
 	"fmt"
 	"io"
-	"io/ioutil"
 	"os"
 	"path/filepath"
 	"sync"
@@ -28,117 +28,211 @@ import (
 	"github.com/pkg/errors"
 )

+const (
+	defaultMaxLRUCacheEntry = 10
+	defaultMaxCacheFds      = 10
+)
+
+type DirectoryCacheConfig struct {
+	MaxLRUCacheEntry int  `toml:"max_lru_cache_entry"`
+	MaxCacheFds      int  `toml:"max_cache_fds"`
+	SyncAdd          bool `toml:"sync_add"`
+}
+
 // TODO: contents validation.

 type BlobCache interface {
-	Fetch(blobHash string) ([]byte, error)
-	Add(blobHash string, p []byte)
+	Add(key string, p []byte, opts ...Option)
+	FetchAt(key string, offset int64, p []byte, opts ...Option) (n int, err error)
 }
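To make the new contract concrete, here is a hypothetical caller (not part of this diff; the helper name, import path, and arguments are assumptions for illustration):

package example

import "github.com/ktock/stargz-snapshotter/cache"

// readRegion stores a chunk under key, then reads length bytes back starting
// at offset, without materializing the whole chunk as Fetch used to.
func readRegion(bc cache.BlobCache, key string, chunk []byte, offset int64, length int) ([]byte, error) {
	bc.Add(key, chunk) // may be persisted to disk asynchronously (see SyncAdd)
	buf := make([]byte, length)
	n, err := bc.FetchAt(key, offset, buf) // copies at most len(buf) bytes
	if err != nil {
		return nil, err
	}
	return buf[:n], nil
}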
-type dirOpt struct {
-	syncAdd bool
+type cacheOpt struct {
+	direct bool
 }

-type DirOption func(o *dirOpt) *dirOpt
+type Option func(o *cacheOpt) *cacheOpt

-func SyncAdd() DirOption {
-	return func(o *dirOpt) *dirOpt {
-		o.syncAdd = true
+// When the Direct option is specified for the FetchAt and Add methods, these
+// operations won't use the on-memory caches. When you know that the target
+// value won't be used immediately, you can prevent the limited space of the
+// on-memory caches from being polluted by such unimportant values.
+func Direct() Option {
+	return func(o *cacheOpt) *cacheOpt {
+		o.direct = true
 		return o
 	}
 }
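As a usage sketch (illustrative; the prefetch helper and its arguments are assumptions), a filesystem that fetches layer contents in the background could mark those writes as low priority so they cannot evict chunks that FUSE-driven reads are actively using:

func prefetch(bc cache.BlobCache, keys []string, chunks [][]byte) {
	for i, key := range keys {
		// Direct() bypasses the on-memory LRU (and the fd cache on reads),
		// keeping both reserved for chunks that are needed right away.
		bc.Add(key, chunks[i], cache.Direct())
	}
}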
-func NewDirectoryCache(directory string, memCacheSize int, opts ...DirOption) (BlobCache, error) {
-	opt := &dirOpt{}
-	for _, o := range opts {
-		opt = o(opt)
+func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache, error) {
+	maxEntry := config.MaxLRUCacheEntry
+	if maxEntry == 0 {
+		maxEntry = defaultMaxLRUCacheEntry
+	}
+	maxFds := config.MaxCacheFds
+	if maxFds == 0 {
+		maxFds = defaultMaxCacheFds
 	}
 	if err := os.MkdirAll(directory, os.ModePerm); err != nil {
 		return nil, err
 	}
 	dc := &directoryCache{
-		cache:     lru.New(memCacheSize),
+		cache:     newObjectCache(maxEntry),
+		fileCache: newObjectCache(maxFds),
 		directory: directory,
+		bufPool: sync.Pool{
+			New: func() interface{} {
+				return new(bytes.Buffer)
+			},
+		},
 	}
-	if opt.syncAdd {
-		dc.syncAdd = true
+	dc.cache.finalize = func(value interface{}) {
+		dc.bufPool.Put(value)
 	}
+	dc.fileCache.finalize = func(value interface{}) {
+		value.(*os.File).Close()
+	}
+	dc.syncAdd = config.SyncAdd
 	return dc, nil
 }
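A hypothetical construction call (the directory path and limits are made up); note that zero-valued fields fall back to the defaults declared above:

func newCache() (cache.BlobCache, error) {
	// MaxLRUCacheEntry/MaxCacheFds of 0 become 10 inside NewDirectoryCache.
	return cache.NewDirectoryCache("/var/cache/example-blobs", cache.DirectoryCacheConfig{
		MaxLRUCacheEntry: 100,   // chunk buffers kept in memory
		MaxCacheFds:      50,    // cache files kept open
		SyncAdd:          false, // persist to disk in the background
	})
}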
 // directoryCache is a cache implementation whose backend is a directory.
 type directoryCache struct {
-	cache     *lru.Cache
-	cacheMu   sync.Mutex
+	cache     *objectCache
+	fileCache *objectCache
 	directory string
-	syncAdd   bool
+	fileMu    sync.Mutex
+
+	bufPool sync.Pool
+
+	syncAdd bool
 }
-func (dc *directoryCache) Fetch(blobHash string) (p []byte, err error) {
-	dc.cacheMu.Lock()
-	defer dc.cacheMu.Unlock()
-
-	if cache, ok := dc.cache.Get(blobHash); ok {
-		p, ok := cache.([]byte)
-		if ok {
-			return p, nil
-		}
-	}
-
-	c := filepath.Join(dc.directory, blobHash[:2], blobHash)
-	if _, err := os.Stat(c); err != nil {
-		return nil, errors.Wrapf(err, "Missed cache %q", c)
-	}
-
-	file, err := os.Open(c)
-	if err != nil {
-		return nil, errors.Wrapf(err, "Failed to Open cached blob file %q", c)
-	}
-	defer file.Close()
-
-	if p, err = ioutil.ReadAll(file); err != nil && err != io.EOF {
-		return nil, errors.Wrapf(err, "failed to read cached data %q", c)
-	}
-	dc.cache.Add(blobHash, p)
-
-	return
-}
+func (dc *directoryCache) FetchAt(key string, offset int64, p []byte, opts ...Option) (n int, err error) {
+	opt := &cacheOpt{}
+	for _, o := range opts {
+		opt = o(opt)
+	}
+
+	if !opt.direct {
+		// Get data from memory
+		if b, done, ok := dc.cache.get(key); ok {
+			defer done()
+			data := b.(*bytes.Buffer).Bytes()
+			if int64(len(data)) < offset {
+				return 0, fmt.Errorf("invalid offset %d exceeds chunk size %d",
+					offset, len(data))
+			}
+			return copy(p, data[offset:]), nil
+		}
+
+		// Get data from disk. If the file is already opened, use it.
+		if f, done, ok := dc.fileCache.get(key); ok {
+			defer done()
+			return f.(*os.File).ReadAt(p, offset)
+		}
+	}
+
+	// Open the cache file and read the target region.
+	// TODO: If the target cache is write-in-progress, should we wait for the
+	//       completion or simply report a cache miss?
+	file, err := os.Open(dc.cachePath(key))
+	if err != nil {
+		return 0, errors.Wrapf(err, "failed to open blob file for %q", key)
+	}
+	if n, err = file.ReadAt(p, offset); err == io.EOF {
+		err = nil
+	}
+
+	// Cache the opened file for future use. If the "direct" option is specified,
+	// this won't be done. This option is useful for preventing the file cache
+	// from being polluted by data that won't be accessed immediately.
+	if opt.direct || !dc.fileCache.add(key, file) {
+		file.Close()
+	}
+
+	// TODO: Should we cache the entire file data in memory? Doing (possibly
+	//       huge) I/O on every fetch might be costly.
+
+	return n, err
+}

Review comment (on the TODO above): I think we should cache everything to disk and use memory (with a limited size) as a cache of the disk. A cache is supposed to be layered, just like CPU L1-L2-L3 caches -> memory -> disk -> remote server.

Reply: The problem here is that copying the entire chunk file from disk to memory on every access can be very costly. (NOTE: the client of the cache doesn't always query the entire chunk file. From this perspective, the new ...) Either way, I think we can keep this out of scope for this PR and tackle it in another PR (contributions are welcome 😄).
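The err == io.EOF special case follows the io.ReaderAt contract: reading a range that extends past the end of the file returns the bytes that were available together with io.EOF, so the data is valid and only the sentinel error needs to be cleared. A small sketch (the helper and its arguments are assumptions):

// readTail reads up to 4096 bytes at offset; a short read at the end of the
// file yields n > 0 plus io.EOF, which is success for our purposes.
func readTail(f *os.File, offset int64) ([]byte, error) {
	buf := make([]byte, 4096)
	n, err := f.ReadAt(buf, offset)
	if err == io.EOF {
		err = nil // buf[:n] still holds valid bytes from the file tail
	}
	return buf[:n], err
}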
-func (dc *directoryCache) Add(blobHash string, p []byte) {
-	// Copy the original data for avoiding the cached contents to be edited accidentally
-	p2 := make([]byte, len(p))
-	copy(p2, p)
-	p = p2
-
-	dc.cacheMu.Lock()
-	dc.cache.Add(blobHash, p)
-	dc.cacheMu.Unlock()
+func (dc *directoryCache) Add(key string, p []byte, opts ...Option) {
+	opt := &cacheOpt{}
+	for _, o := range opts {
+		opt = o(opt)
+	}
+
+	if !opt.direct {
+		// Cache the passed data in memory. This makes it possible to serve the
+		// data even while it is still being written to disk. If the "direct"
+		// option is specified, this won't be done. This option is useful for
+		// preventing the memory cache from being polluted by data that won't
+		// be accessed immediately.
+		b := dc.bufPool.Get().(*bytes.Buffer)
+		b.Reset()
+		b.Write(p)
+		if !dc.cache.add(key, b) {
+			dc.bufPool.Put(b) // Already exists. No need to cache.
+		}
+	}
+
+	// Cache the passed data to disk.
+	b2 := dc.bufPool.Get().(*bytes.Buffer)
+	b2.Reset()
+	b2.Write(p)
 	addFunc := func() {
 		dc.fileMu.Lock()
 		defer dc.fileMu.Unlock()
+		defer dc.bufPool.Put(b2)

 		// Check if cache exists.
-		c := filepath.Join(dc.directory, blobHash[:2], blobHash)
+		var (
+			c   = dc.cachePath(key)
+			wip = dc.wipPath(key)
+		)
+		if _, err := os.Stat(wip); err == nil {
+			return // Write in progress
+		}
 		if _, err := os.Stat(c); err == nil {
 			return // Already exists.
 		}
+
+		// Write the contents to a temporary file
+		if err := os.MkdirAll(filepath.Dir(wip), os.ModePerm); err != nil {
+			fmt.Printf("Warning: failed to create blob cache directory %q: %v\n", c, err)
+			return
+		}
+		wipfile, err := os.Create(wip)
+		if err != nil {
+			fmt.Printf("Warning: failed to prepare temp file for storing cache %q: %v\n", key, err)
+			return
+		}
+		defer func() {
+			wipfile.Close()
+			os.Remove(wipfile.Name())
+		}()
+		want := b2.Len()
+		if _, err := io.CopyN(wipfile, b2, int64(want)); err != nil {
+			fmt.Printf("Warning: failed to write cache: %v\n", err)
+			return
+		}

-		// Create cache file
+		// Commit the cache contents
 		if err := os.MkdirAll(filepath.Dir(c), os.ModePerm); err != nil {
-			fmt.Printf("Warning: Failed to Create blob cache directory %q: %v\n", c, err)
+			fmt.Printf("Warning: failed to create blob cache directory %q: %v\n", c, err)
 			return
 		}
-		f, err := os.Create(c)
+		if err := os.Rename(wipfile.Name(), c); err != nil {
+			fmt.Printf("Warning: failed to commit cache to %q: %v\n", c, err)
+			return
+		}
+		file, err := os.Open(c)
 		if err != nil {
-			fmt.Printf("Warning: could not create a cache file at %q: %v\n", c, err)
+			fmt.Printf("Warning: failed to open cache on %q: %v\n", c, err)
 			return
 		}
-		defer f.Close()
-		if n, err := f.Write(p); err != nil || n != len(p) {
-			fmt.Printf("Warning: failed to write cache: %d(wrote)/%d(expected): %v\n",
-				n, len(p), err)
-		}
+
+		// Cache the opened file for future use. If the "direct" option is
+		// specified, this won't be done. This option is useful for preventing
+		// the file cache from being polluted by data that won't be accessed
+		// immediately.
+		if opt.direct || !dc.fileCache.add(key, file) {
+			file.Close()
+		}
 	}
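The write path above is a write-ahead temp file followed by an os.Rename commit, so readers never observe a partially written cache file. A minimal standalone sketch of the same pattern (function and paths are illustrative; os.Rename is atomic only within a single filesystem, which is why the "w" directory lives under the cache directory itself):

func atomicWrite(wip, final string, data []byte) error {
	f, err := os.Create(wip)
	if err != nil {
		return err
	}
	defer os.Remove(wip) // cleanup; becomes a no-op after a successful rename
	if _, err := f.Write(data); err != nil {
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	return os.Rename(wip, final) // atomically publish the complete file
}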
@@ -149,6 +243,81 @@ func (dc *directoryCache) Add(blobHash string, p []byte) {
 	}
 }

+func (dc *directoryCache) cachePath(key string) string {
+	return filepath.Join(dc.directory, key[:2], key)
+}
+
+func (dc *directoryCache) wipPath(key string) string {
+	return filepath.Join(dc.directory, key[:2], "w", key)
+}
+
+func newObjectCache(maxEntries int) *objectCache {
+	oc := &objectCache{
+		cache: lru.New(maxEntries),
+	}
+	oc.cache.OnEvicted = func(key lru.Key, value interface{}) {
+		value.(*object).release() // Decrease ref count incremented in add operation.
+	}
+	return oc
+}
+
+type objectCache struct {
+	cache    *lru.Cache
+	cacheMu  sync.Mutex
+	finalize func(interface{})
+}
+
+func (oc *objectCache) get(key string) (value interface{}, done func(), ok bool) {
+	oc.cacheMu.Lock()
+	defer oc.cacheMu.Unlock()
+	o, ok := oc.cache.Get(key)
+	if !ok {
+		return nil, nil, false
+	}
+	o.(*object).use()
+	return o.(*object).v, func() { o.(*object).release() }, true
+}
+
+func (oc *objectCache) add(key string, value interface{}) bool {
+	oc.cacheMu.Lock()
+	defer oc.cacheMu.Unlock()
+	if _, ok := oc.cache.Get(key); ok {
+		return false // TODO: should we swap the object?
+	}
+	o := &object{
+		v:        value,
+		finalize: oc.finalize,
+	}
+	o.use() // Keep this object having at least 1 ref count (will be decreased on eviction)
+	oc.cache.Add(key, o)
+	return true
+}
+
+type object struct {
+	v interface{}
+
+	refCounts int64
+	finalize  func(interface{})
+
+	mu sync.Mutex
+}
+
+func (o *object) use() {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	o.refCounts++
+}
+
+func (o *object) release() {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	o.refCounts--
+	if o.refCounts <= 0 && o.finalize != nil {
+		// nobody will refer this object
+		o.finalize(o.v)
+	}
+}
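To summarize the reference-counting scheme, here is a lifecycle sketch using the package-internal names (the file path is made up): the cache holds one reference from add until eviction, each reader takes another between get and done, and finalize runs only once the count reaches zero, so an evicted file stays usable until its last reader finishes.

func lifecycleSketch() {
	oc := newObjectCache(10)
	oc.finalize = func(v interface{}) { v.(*os.File).Close() }

	f, _ := os.Open("/tmp/blob") // hypothetical file
	oc.add("key", f)             // refCounts == 1, held until LRU eviction

	if v, done, ok := oc.get("key"); ok { // refCounts == 2
		_ = v.(*os.File) // safe to use; cannot be finalized while referenced
		done()           // refCounts == 1 again
	}
	// When the LRU evicts "key", OnEvicted calls release(): the count reaches
	// 0 and finalize closes the file, even if eviction raced with a reader.
}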
 func NewMemoryCache() BlobCache {
 	return &memoryCache{
 		membuf: map[string]string{},

@@ -161,19 +330,19 @@ type memoryCache struct {
 	mu sync.Mutex
 }

-func (mc *memoryCache) Fetch(blobHash string) ([]byte, error) {
+func (mc *memoryCache) FetchAt(key string, offset int64, p []byte, opts ...Option) (n int, err error) {
 	mc.mu.Lock()
 	defer mc.mu.Unlock()

-	cache, ok := mc.membuf[blobHash]
+	cache, ok := mc.membuf[key]
 	if !ok {
-		return nil, fmt.Errorf("Missed cache: %q", blobHash)
+		return 0, fmt.Errorf("Missed cache: %q", key)
 	}
-	return []byte(cache), nil
+	return copy(p, cache[offset:]), nil
 }

-func (mc *memoryCache) Add(blobHash string, p []byte) {
+func (mc *memoryCache) Add(key string, p []byte, opts ...Option) {
 	mc.mu.Lock()
 	defer mc.mu.Unlock()
-	mc.membuf[blobHash] = string(p)
+	mc.membuf[key] = string(p)
 }
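A quick behavioral sketch of the in-memory variant (values made up). One caveat visible in the code: unlike directoryCache.FetchAt, an offset beyond the stored data makes cache[offset:] panic here rather than return an error.

func memoryCacheExample() {
	mc := cache.NewMemoryCache()
	mc.Add("key", []byte("hello world"))

	buf := make([]byte, 5)
	n, _ := mc.FetchAt("key", 6, buf) // copies "world" into buf
	fmt.Println(n, string(buf[:n]))   // prints: 5 world
}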
Review comment (on DirectoryCacheConfig): A snapshotter, as basic infrastructure, cannot tell the difference between images, and users have little willingness to tune these parameters. We should let the image speak: keeping the most recently used data in memory (LRU or a ring buffer) should be enough.

Reply: This option isn't for snapshotter users but for the filesystem (so it is used only internally).
https://github.com/ktock/stargz-snapshotter/blob/33f7d6fed1279b668af21e15dbf1573451c88221/stargz/fs.go#L294
https://github.com/ktock/stargz-snapshotter/blob/33f7d6fed1279b668af21e15dbf1573451c88221/stargz/fs.go#L299
The filesystem fetches layer contents in the background and stores all chunks of layer data in the cache. This is good from the availability perspective, but are these chunks accessed again immediately? Maybe not; chunks accessed via FUSE reads are more likely to be accessed again soon. So there are priorities among cached contents, and because the on-memory LRU cache has limited space, we don't want prioritized chunks (e.g. ones accessed via FUSE reads) to be evicted to make room for less important ones (e.g. ones fetched in the background). This patch therefore lets the filesystem mark a chunk it is adding as less important via the Direct option, which leads to the performance improvement shown above.