Improve file read performance #105

Merged · 2 commits · Jun 4, 2020
295 changes: 232 additions & 63 deletions cache/cache.go
@@ -17,9 +17,9 @@
 package cache
 
 import (
+	"bytes"
 	"fmt"
 	"io"
-	"io/ioutil"
 	"os"
 	"path/filepath"
 	"sync"
@@ -28,117 +28,211 @@ import (
 	"github.com/pkg/errors"
 )

+const (
+	defaultMaxLRUCacheEntry = 10
+	defaultMaxCacheFds      = 10
+)
+
+type DirectoryCacheConfig struct {
+	MaxLRUCacheEntry int  `toml:"max_lru_cache_entry"`
+	MaxCacheFds      int  `toml:"max_cache_fds"`
+	SyncAdd          bool `toml:"sync_add"`
+}
+
 // TODO: contents validation.
 
 type BlobCache interface {
-	Fetch(blobHash string) ([]byte, error)
-	Add(blobHash string, p []byte)
+	Add(key string, p []byte, opts ...Option)
+	FetchAt(key string, offset int64, p []byte, opts ...Option) (n int, err error)
 }
 
-type dirOpt struct {
-	syncAdd bool
+type cacheOpt struct {
+	direct bool
 }
 
-type DirOption func(o *dirOpt) *dirOpt
+type Option func(o *cacheOpt) *cacheOpt
 
-func SyncAdd() DirOption {
-	return func(o *dirOpt) *dirOpt {
-		o.syncAdd = true
+// When the Direct option is specified for the FetchAt and Add methods, these
+// operations won't use the on-memory caches. When you know that the target
+// value won't be used immediately, you can prevent the limited space of the
+// on-memory caches from being polluted by these unimportant values.
[Review comment]

The snapshotter, as a piece of basic infrastructure, cannot tell the difference between images, and users are rarely willing to tune these parameters. We should let the image speak for itself; just keeping the most-recently-used data in memory should be enough (an LRU or ring buffer).

[Member Author]

This option isn't for snapshotter users but for the filesystem (so it is used only internally).

https://github.com/ktock/stargz-snapshotter/blob/33f7d6fed1279b668af21e15dbf1573451c88221/stargz/fs.go#L294
https://github.com/ktock/stargz-snapshotter/blob/33f7d6fed1279b668af21e15dbf1573451c88221/stargz/fs.go#L299

The filesystem fetches layer contents in the background and stores all chunks of layer data in the cache. This is good from an availability perspective, but are these chunks accessed again immediately? Probably not. Chunks accessed via FUSE reads are more likely to be accessed again soon, so we have priorities among cached contents. The LRU memory cache has limited space, and we don't want high-priority chunks (e.g. ones accessed via FUSE reads) to be evicted to make room for less important ones (e.g. ones fetched in the background). This patch therefore lets the filesystem mark a chunk it is adding as less important using the Direct option, which leads to the performance improvement shown above.
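To make that call pattern concrete, here is a minimal sketch of how a filesystem might use the two priorities. The BlobCache, Add, and Direct names come from this diff; the import path (inferred from the repository linked above) and the storeChunk helper are illustrative assumptions, not part of the patch:

package example

import (
	// Assumed import path, based on the repository linked above.
	"github.com/ktock/stargz-snapshotter/cache"
)

// storeChunk is a hypothetical helper showing the two cache priorities.
func storeChunk(bc cache.BlobCache, key string, data []byte, prefetched bool) {
	if prefetched {
		// Background-fetched chunk: unlikely to be read again soon, so
		// bypass the on-memory caches and write it straight to disk.
		bc.Add(key, data, cache.Direct())
		return
	}
	// Chunk accessed via a FUSE read: keep it in the on-memory cache,
	// since it is likely to be accessed again shortly.
	bc.Add(key, data)
}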

+func Direct() Option {
+	return func(o *cacheOpt) *cacheOpt {
+		o.direct = true
 		return o
 	}
 }
 
-func NewDirectoryCache(directory string, memCacheSize int, opts ...DirOption) (BlobCache, error) {
-	opt := &dirOpt{}
-	for _, o := range opts {
-		opt = o(opt)
+func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache, error) {
+	maxEntry := config.MaxLRUCacheEntry
+	if maxEntry == 0 {
+		maxEntry = defaultMaxLRUCacheEntry
 	}
+	maxFds := config.MaxCacheFds
+	if maxFds == 0 {
+		maxFds = defaultMaxCacheFds
+	}
 	if err := os.MkdirAll(directory, os.ModePerm); err != nil {
 		return nil, err
 	}
 	dc := &directoryCache{
-		cache:     lru.New(memCacheSize),
+		cache:     newObjectCache(maxEntry),
+		fileCache: newObjectCache(maxFds),
 		directory: directory,
+		bufPool: sync.Pool{
+			New: func() interface{} {
+				return new(bytes.Buffer)
+			},
+		},
 	}
-	if opt.syncAdd {
-		dc.syncAdd = true
+	dc.cache.finalize = func(value interface{}) {
+		dc.bufPool.Put(value)
 	}
+	dc.fileCache.finalize = func(value interface{}) {
+		value.(*os.File).Close()
+	}
+	dc.syncAdd = config.SyncAdd
 	return dc, nil
 }

 // directoryCache is a cache implementation whose backend is a directory.
 type directoryCache struct {
-	cache     *lru.Cache
-	cacheMu   sync.Mutex
+	cache     *objectCache
+	fileCache *objectCache
 	directory string
-	syncAdd   bool
 	fileMu    sync.Mutex
+
+	bufPool sync.Pool
+
+	syncAdd bool
 }

-func (dc *directoryCache) Fetch(blobHash string) (p []byte, err error) {
-	dc.cacheMu.Lock()
-	defer dc.cacheMu.Unlock()
+func (dc *directoryCache) FetchAt(key string, offset int64, p []byte, opts ...Option) (n int, err error) {
+	opt := &cacheOpt{}
+	for _, o := range opts {
+		opt = o(opt)
+	}
 
-	if cache, ok := dc.cache.Get(blobHash); ok {
-		p, ok := cache.([]byte)
-		if ok {
-			return p, nil
-		}
-	}
+	if !opt.direct {
+		// Get data from memory
+		if b, done, ok := dc.cache.get(key); ok {
+			defer done()
+			data := b.(*bytes.Buffer).Bytes()
+			if int64(len(data)) < offset {
+				return 0, fmt.Errorf("invalid offset %d exceeds chunk size %d",
+					offset, len(data))
+			}
+			return copy(p, data[offset:]), nil
+		}
 
-	c := filepath.Join(dc.directory, blobHash[:2], blobHash)
-	if _, err := os.Stat(c); err != nil {
-		return nil, errors.Wrapf(err, "Missed cache %q", c)
-	}
+		// Get data from disk. If the file is already opened, use it.
+		if f, done, ok := dc.fileCache.get(key); ok {
+			defer done()
+			return f.(*os.File).ReadAt(p, offset)
+		}
+	}
 
-	file, err := os.Open(c)
+	// Open the cache file and read the target region.
+	// TODO: If the target cache is write-in-progress, should we wait for the
+	//       completion or simply report the cache miss?
+	file, err := os.Open(dc.cachePath(key))
 	if err != nil {
-		return nil, errors.Wrapf(err, "Failed to Open cached blob file %q", c)
+		return 0, errors.Wrapf(err, "failed to open blob file for %q", key)
 	}
-	defer file.Close()
+	if n, err = file.ReadAt(p, offset); err == io.EOF {
+		err = nil
+	}
 
-	if p, err = ioutil.ReadAll(file); err != nil && err != io.EOF {
-		return nil, errors.Wrapf(err, "failed to read cached data %q", c)
-	}
-	dc.cache.Add(blobHash, p)
-
-	return
+	// Cache the opened file for future use. If the "direct" option is specified,
+	// this won't be done. This option is useful for preventing the file cache
+	// from being polluted by data that won't be accessed immediately.
+	if opt.direct || !dc.fileCache.add(key, file) {
+		file.Close()
+	}
+
+	// TODO: should we cache the entire file data on memory?
+	//       but making I/O (possibly huge) on every fetch
+	//       might be costly.
// might be costly.
[Review comment]

I think we should cache everything to disk and use memory (with a limited size) as a cache of the disk (caching is supposed to be layered, just like the CPU's L1-L2-L3 caches -> memory -> disk -> remote server). This is easily achieved in the prefetch scenario, but not in the normal scenario, where a write-back is needed.

[Member Author]

The problem here is that copying an entire chunk file from disk to memory on every access can be very costly. (Note: the client of the cache doesn't always query the entire chunk file. From this perspective, the new FetchAt API in this patch allows clients to request partial contents, which reduces unnecessary I/O and leads to the performance improvement shown above.) While writing this patch, I tried tackling this TODO by copying chunk file contents to memory in the background (in a goroutine), but that produced noisy I/O and resulted in lower performance.

Either way, I think we can keep this out of scope for this PR and tackle it in another one (contributions are welcome 😄).


+	return n, err
 }
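As a rough sketch of why FetchAt reduces I/O compared with the old Fetch (continuing the assumed import from the earlier sketch; the readRegion helper and its region size are illustrative):

// readRegion reads only the requested region of a chunk into a
// caller-sized buffer, instead of materializing the whole chunk as
// the old Fetch(key) ([]byte, error) API had to.
func readRegion(bc cache.BlobCache, key string, offset int64, size int) ([]byte, error) {
	p := make([]byte, size) // only as large as the region we need
	n, err := bc.FetchAt(key, offset, p)
	if err != nil {
		return nil, err // cache miss or I/O error
	}
	return p[:n], nil
}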

-func (dc *directoryCache) Add(blobHash string, p []byte) {
-	// Copy the original data for avoiding the cached contents to be edited accidentally
-	p2 := make([]byte, len(p))
-	copy(p2, p)
-	p = p2
+func (dc *directoryCache) Add(key string, p []byte, opts ...Option) {
+	opt := &cacheOpt{}
+	for _, o := range opts {
+		opt = o(opt)
+	}
 
-	dc.cacheMu.Lock()
-	dc.cache.Add(blobHash, p)
-	dc.cacheMu.Unlock()
+	if !opt.direct {
+		// Cache the passed data in memory. This enables serving it even while
+		// it is still being written to disk. If the "direct" option is specified,
+		// this won't be done. This option is useful for preventing the memory
+		// cache from being polluted by data that won't be accessed immediately.
+		b := dc.bufPool.Get().(*bytes.Buffer)
+		b.Reset()
+		b.Write(p)
+		if !dc.cache.add(key, b) {
+			dc.bufPool.Put(b) // Already exists. No need to cache.
+		}
+	}
 
+	// Cache the passed data to disk.
+	b2 := dc.bufPool.Get().(*bytes.Buffer)
+	b2.Reset()
+	b2.Write(p)
 	addFunc := func() {
 		dc.fileMu.Lock()
 		defer dc.fileMu.Unlock()
+		defer dc.bufPool.Put(b2)
 
 		// Check if cache exists.
-		c := filepath.Join(dc.directory, blobHash[:2], blobHash)
+		var (
+			c   = dc.cachePath(key)
+			wip = dc.wipPath(key)
+		)
+		if _, err := os.Stat(wip); err == nil {
+			return // Write in progress
+		}
 		if _, err := os.Stat(c); err == nil {
 			return // Already exists.
 		}
 
+		// Write the contents to a temporary file
+		if err := os.MkdirAll(filepath.Dir(wip), os.ModePerm); err != nil {
+			fmt.Printf("Warning: failed to create blob cache directory %q: %v\n", c, err)
+			return
+		}
+		wipfile, err := os.Create(wip)
+		if err != nil {
+			fmt.Printf("Warning: failed to prepare temp file for storing cache %q: %v\n", key, err)
+			return
+		}
+		defer func() {
+			wipfile.Close()
+			os.Remove(wipfile.Name())
+		}()
+		want := b2.Len()
+		if _, err := io.CopyN(wipfile, b2, int64(want)); err != nil {
+			fmt.Printf("Warning: failed to write cache: %v\n", err)
+			return
+		}
 
-		// Create cache file
+		// Commit the cache contents
 		if err := os.MkdirAll(filepath.Dir(c), os.ModePerm); err != nil {
-			fmt.Printf("Warning: Failed to Create blob cache directory %q: %v\n", c, err)
+			fmt.Printf("Warning: failed to create blob cache directory %q: %v\n", c, err)
 			return
 		}
-		f, err := os.Create(c)
+		if err := os.Rename(wipfile.Name(), c); err != nil {
+			fmt.Printf("Warning: failed to commit cache to %q: %v\n", c, err)
+			return
+		}
+		file, err := os.Open(c)
 		if err != nil {
-			fmt.Printf("Warning: could not create a cache file at %q: %v\n", c, err)
+			fmt.Printf("Warning: failed to open cache on %q: %v\n", c, err)
 			return
 		}
-		defer f.Close()
-		if n, err := f.Write(p); err != nil || n != len(p) {
-			fmt.Printf("Warning: failed to write cache: %d(wrote)/%d(expected): %v\n",
-				n, len(p), err)
-		}
+
+		// Cache the opened file for future use. If the "direct" option is specified,
+		// this won't be done. This option is useful for preventing the file cache
+		// from being polluted by data that won't be accessed immediately.
+		if opt.direct || !dc.fileCache.add(key, file) {
+			file.Close()
+		}
 	}

@@ -149,6 +243,81 @@ func (dc *directoryCache) Add(blobHash string, p []byte) {
 	}
 }

+func (dc *directoryCache) cachePath(key string) string {
+	return filepath.Join(dc.directory, key[:2], key)
+}
+
+func (dc *directoryCache) wipPath(key string) string {
+	return filepath.Join(dc.directory, key[:2], "w", key)
+}
+
+func newObjectCache(maxEntries int) *objectCache {
+	oc := &objectCache{
+		cache: lru.New(maxEntries),
+	}
+	oc.cache.OnEvicted = func(key lru.Key, value interface{}) {
+		value.(*object).release() // Decrease the ref count incremented by the add operation.
+	}
+	return oc
+}
+
+type objectCache struct {
+	cache    *lru.Cache
+	cacheMu  sync.Mutex
+	finalize func(interface{})
+}
+
+func (oc *objectCache) get(key string) (value interface{}, done func(), ok bool) {
+	oc.cacheMu.Lock()
+	defer oc.cacheMu.Unlock()
+	o, ok := oc.cache.Get(key)
+	if !ok {
+		return nil, nil, false
+	}
+	o.(*object).use()
+	return o.(*object).v, func() { o.(*object).release() }, true
+}
+
+func (oc *objectCache) add(key string, value interface{}) bool {
+	oc.cacheMu.Lock()
+	defer oc.cacheMu.Unlock()
+	if _, ok := oc.cache.Get(key); ok {
+		return false // TODO: should we swap the object?
+	}
+	o := &object{
+		v:        value,
+		finalize: oc.finalize,
+	}
+	o.use() // Keep at least 1 ref count on this object (decreased on eviction).
+	oc.cache.Add(key, o)
+	return true
+}
+
+type object struct {
+	v interface{}
+
+	refCounts int64
+	finalize  func(interface{})
+
+	mu sync.Mutex
+}
+
+func (o *object) use() {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	o.refCounts++
+}
+
+func (o *object) release() {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	o.refCounts--
+	if o.refCounts <= 0 && o.finalize != nil {
+		// Nobody refers to this object anymore.
+		o.finalize(o.v)
+	}
+}

 func NewMemoryCache() BlobCache {
 	return &memoryCache{
 		membuf: map[string]string{},
@@ -161,19 +330,19 @@ type memoryCache struct {
 	mu sync.Mutex
 }
 
-func (mc *memoryCache) Fetch(blobHash string) ([]byte, error) {
+func (mc *memoryCache) FetchAt(key string, offset int64, p []byte, opts ...Option) (n int, err error) {
 	mc.mu.Lock()
 	defer mc.mu.Unlock()
 
-	cache, ok := mc.membuf[blobHash]
+	cache, ok := mc.membuf[key]
 	if !ok {
-		return nil, fmt.Errorf("Missed cache: %q", blobHash)
+		return 0, fmt.Errorf("Missed cache: %q", key)
 	}
-	return []byte(cache), nil
+	return copy(p, cache[offset:]), nil
 }
 
-func (mc *memoryCache) Add(blobHash string, p []byte) {
+func (mc *memoryCache) Add(key string, p []byte, opts ...Option) {
 	mc.mu.Lock()
 	defer mc.mu.Unlock()
-	mc.membuf[blobHash] = string(p)
+	mc.membuf[key] = string(p)
 }
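Taken together, a minimal usage sketch of the new constructor and read/write path, assuming the same import path as above (the directory, key, and data here are illustrative; zero-valued config fields fall back to the defaults defined in this diff):

package main

import (
	"fmt"

	"github.com/ktock/stargz-snapshotter/cache" // assumed import path
)

func main() {
	bc, err := cache.NewDirectoryCache("/tmp/blobcache", cache.DirectoryCacheConfig{
		MaxLRUCacheEntry: 100,  // on-memory chunk buffers to keep
		MaxCacheFds:      50,   // opened cache files to keep
		SyncAdd:          true, // write to disk before Add returns
	})
	if err != nil {
		panic(err)
	}

	bc.Add("deadbeef", []byte("chunk contents"))

	// Read 5 bytes starting at offset 6 without fetching the whole chunk.
	p := make([]byte, 5)
	n, err := bc.FetchAt("deadbeef", 6, p)
	fmt.Println(n, err, string(p[:n])) // 5 <nil> conte
}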