Skip to content

Commit

Permalink
Improve file read performance
Browse files Browse the repository at this point in the history
Throughout the benchmarking in the community, it turned out the file read
performance is low especially on random and parallel reads.

This commit solves this by the following fixes:
- minimizing the occurrence of slice allocation in the execution path of file
  read, leveraging sync.Pool,
- minimizing the memory copy and disk I/O by allowing to fetch a partials range
  of blobs from the cache, and
- minimizing the locked region in the cache.

Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
  • Loading branch information
ktock committed Jun 4, 2020
1 parent d8b1b19 commit b3c5173
Show file tree
Hide file tree
Showing 9 changed files with 584 additions and 262 deletions.
295 changes: 232 additions & 63 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package cache

import (
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"sync"
Expand All @@ -28,117 +28,211 @@ import (
"github.com/pkg/errors"
)

const (
defaultMaxLRUCacheEntry = 10
defaultMaxCacheFds = 10
)

type DirectoryCacheConfig struct {
MaxLRUCacheEntry int `toml:"max_lru_cache_entry"`
MaxCacheFds int `toml:"max_cache_fds"`
SyncAdd bool `toml:"sync_add"`
}

// TODO: contents validation.

type BlobCache interface {
Fetch(blobHash string) ([]byte, error)
Add(blobHash string, p []byte)
Add(key string, p []byte, opts ...Option)
FetchAt(key string, offset int64, p []byte, opts ...Option) (n int, err error)
}

type dirOpt struct {
syncAdd bool
type cacheOpt struct {
direct bool
}

type DirOption func(o *dirOpt) *dirOpt
type Option func(o *cacheOpt) *cacheOpt

func SyncAdd() DirOption {
return func(o *dirOpt) *dirOpt {
o.syncAdd = true
// When Direct option is specified for FetchAt and Add methods, these operation
// won't use on-memory caches. When you know that the targeting value won't be
// used immediately, you can prevent the limited space of on-memory caches from
// being polluted by these unimportant values.
func Direct() Option {
return func(o *cacheOpt) *cacheOpt {
o.direct = true
return o
}
}

func NewDirectoryCache(directory string, memCacheSize int, opts ...DirOption) (BlobCache, error) {
opt := &dirOpt{}
for _, o := range opts {
opt = o(opt)
func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache, error) {
maxEntry := config.MaxLRUCacheEntry
if maxEntry == 0 {
maxEntry = defaultMaxLRUCacheEntry
}
maxFds := config.MaxCacheFds
if maxFds == 0 {
maxFds = defaultMaxCacheFds
}
if err := os.MkdirAll(directory, os.ModePerm); err != nil {
return nil, err
}
dc := &directoryCache{
cache: lru.New(memCacheSize),
cache: newObjectCache(maxEntry),
fileCache: newObjectCache(maxFds),
directory: directory,
bufPool: sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
},
}
if opt.syncAdd {
dc.syncAdd = true
dc.cache.finalize = func(value interface{}) {
dc.bufPool.Put(value)
}
dc.fileCache.finalize = func(value interface{}) {
value.(*os.File).Close()
}
dc.syncAdd = config.SyncAdd
return dc, nil
}

// directoryCache is a cache implementation which backend is a directory.
type directoryCache struct {
cache *lru.Cache
cacheMu sync.Mutex
cache *objectCache
fileCache *objectCache
directory string
syncAdd bool
fileMu sync.Mutex

bufPool sync.Pool

syncAdd bool
}

func (dc *directoryCache) Fetch(blobHash string) (p []byte, err error) {
dc.cacheMu.Lock()
defer dc.cacheMu.Unlock()
func (dc *directoryCache) FetchAt(key string, offset int64, p []byte, opts ...Option) (n int, err error) {
opt := &cacheOpt{}
for _, o := range opts {
opt = o(opt)
}

if cache, ok := dc.cache.Get(blobHash); ok {
p, ok := cache.([]byte)
if ok {
return p, nil
if !opt.direct {
// Get data from memory
if b, done, ok := dc.cache.get(key); ok {
defer done()
data := b.(*bytes.Buffer).Bytes()
if int64(len(data)) < offset {
return 0, fmt.Errorf("invalid offset %d exceeds chunk size %d",
offset, len(data))
}
return copy(p, data[offset:]), nil
}
}

c := filepath.Join(dc.directory, blobHash[:2], blobHash)
if _, err := os.Stat(c); err != nil {
return nil, errors.Wrapf(err, "Missed cache %q", c)
// Get data from disk. If the file is already opened, use it.
if f, done, ok := dc.fileCache.get(key); ok {
defer done()
return f.(*os.File).ReadAt(p, offset)
}
}

file, err := os.Open(c)
// Open the cache file and read the target region
// TODO: If the target cache is write-in-progress, should we wait for the completion
// or simply report the cache miss?
file, err := os.Open(dc.cachePath(key))
if err != nil {
return nil, errors.Wrapf(err, "Failed to Open cached blob file %q", c)
return 0, errors.Wrapf(err, "failed to open blob file for %q", key)
}
if n, err = file.ReadAt(p, offset); err == io.EOF {
err = nil
}
defer file.Close()

if p, err = ioutil.ReadAll(file); err != nil && err != io.EOF {
return nil, errors.Wrapf(err, "failed to read cached data %q", c)
// Cache the opened file for future use. If "direct" option is specified, this
// won't be done. This option is useful for preventing file cache from being
// polluted by data that won't be accessed immediately.
if opt.direct || !dc.fileCache.add(key, file) {
file.Close()
}
dc.cache.Add(blobHash, p)

return
// TODO: should we cache the entire file data on memory?
// but making I/O (possibly huge) on every fetching
// might be costly.

return n, err
}

func (dc *directoryCache) Add(blobHash string, p []byte) {
// Copy the original data for avoiding the cached contents to be edited accidentally
p2 := make([]byte, len(p))
copy(p2, p)
p = p2
func (dc *directoryCache) Add(key string, p []byte, opts ...Option) {
opt := &cacheOpt{}
for _, o := range opts {
opt = o(opt)
}

dc.cacheMu.Lock()
dc.cache.Add(blobHash, p)
dc.cacheMu.Unlock()
if !opt.direct {
// Cache the passed data on memory. This enables to serve this data even
// during writing it to the disk. If "direct" option is specified, this
// won't be done. This option is useful for preventing memory cache from being
// polluted by data that won't be accessed immediately.
b := dc.bufPool.Get().(*bytes.Buffer)
b.Reset()
b.Write(p)
if !dc.cache.add(key, b) {
dc.bufPool.Put(b) // Already exists. No need to cache.
}
}

// Cache the passed data to disk.
b2 := dc.bufPool.Get().(*bytes.Buffer)
b2.Reset()
b2.Write(p)
addFunc := func() {
dc.fileMu.Lock()
defer dc.fileMu.Unlock()
defer dc.bufPool.Put(b2)

// Check if cache exists.
c := filepath.Join(dc.directory, blobHash[:2], blobHash)
var (
c = dc.cachePath(key)
wip = dc.wipPath(key)
)
if _, err := os.Stat(wip); err == nil {
return // Write in progress
}
if _, err := os.Stat(c); err == nil {
return // Already exists.
}

// Write the contents to a temporary file
if err := os.MkdirAll(filepath.Dir(wip), os.ModePerm); err != nil {
fmt.Printf("Warning: Failed to Create blob cache directory %q: %v\n", c, err)
return
}
wipfile, err := os.Create(wip)
if err != nil {
fmt.Printf("Warning: failed to prepare temp file for storing cache %q", key)
return
}
defer func() {
wipfile.Close()
os.Remove(wipfile.Name())
}()
want := b2.Len()
if _, err := io.CopyN(wipfile, b2, int64(want)); err != nil {
fmt.Printf("Warning: failed to write cache: %v\n", err)
return
}

// Create cache file
// Commit the cache contents
if err := os.MkdirAll(filepath.Dir(c), os.ModePerm); err != nil {
fmt.Printf("Warning: Failed to Create blob cache directory %q: %v\n", c, err)
return
}
f, err := os.Create(c)
if err := os.Rename(wipfile.Name(), c); err != nil {
fmt.Printf("Warning: failed to commit cache to %q: %v\n", c, err)
return
}
file, err := os.Open(c)
if err != nil {
fmt.Printf("Warning: could not create a cache file at %q: %v\n", c, err)
fmt.Printf("Warning: failed to open cache on %q: %v\n", c, err)
return
}
defer f.Close()
if n, err := f.Write(p); err != nil || n != len(p) {
fmt.Printf("Warning: failed to write cache: %d(wrote)/%d(expected): %v\n",
n, len(p), err)

// Cache the opened file for future use. If "direct" option is specified, this
// won't be done. This option is useful for preventing file cache from being
// polluted by data that won't be accessed immediately.
if opt.direct || !dc.fileCache.add(key, file) {
file.Close()
}
}

Expand All @@ -149,6 +243,81 @@ func (dc *directoryCache) Add(blobHash string, p []byte) {
}
}

func (dc *directoryCache) cachePath(key string) string {
return filepath.Join(dc.directory, key[:2], key)
}

func (dc *directoryCache) wipPath(key string) string {
return filepath.Join(dc.directory, key[:2], "w", key)
}

func newObjectCache(maxEntries int) *objectCache {
oc := &objectCache{
cache: lru.New(maxEntries),
}
oc.cache.OnEvicted = func(key lru.Key, value interface{}) {
value.(*object).release() // Decrease ref count incremented in add operation.
}
return oc
}

type objectCache struct {
cache *lru.Cache
cacheMu sync.Mutex
finalize func(interface{})
}

func (oc *objectCache) get(key string) (value interface{}, done func(), ok bool) {
oc.cacheMu.Lock()
defer oc.cacheMu.Unlock()
o, ok := oc.cache.Get(key)
if !ok {
return nil, nil, false
}
o.(*object).use()
return o.(*object).v, func() { o.(*object).release() }, true
}

func (oc *objectCache) add(key string, value interface{}) bool {
oc.cacheMu.Lock()
defer oc.cacheMu.Unlock()
if _, ok := oc.cache.Get(key); ok {
return false // TODO: should we swap the object?
}
o := &object{
v: value,
finalize: oc.finalize,
}
o.use() // Keep this object having at least 1 ref count (will be decreased on eviction)
oc.cache.Add(key, o)
return true
}

type object struct {
v interface{}

refCounts int64
finalize func(interface{})

mu sync.Mutex
}

func (o *object) use() {
o.mu.Lock()
defer o.mu.Unlock()
o.refCounts++
}

func (o *object) release() {
o.mu.Lock()
defer o.mu.Unlock()
o.refCounts--
if o.refCounts <= 0 && o.finalize != nil {
// nobody will refer this object
o.finalize(o.v)
}
}

func NewMemoryCache() BlobCache {
return &memoryCache{
membuf: map[string]string{},
Expand All @@ -161,19 +330,19 @@ type memoryCache struct {
mu sync.Mutex
}

func (mc *memoryCache) Fetch(blobHash string) ([]byte, error) {
func (mc *memoryCache) FetchAt(key string, offset int64, p []byte, opts ...Option) (n int, err error) {
mc.mu.Lock()
defer mc.mu.Unlock()

cache, ok := mc.membuf[blobHash]
cache, ok := mc.membuf[key]
if !ok {
return nil, fmt.Errorf("Missed cache: %q", blobHash)
return 0, fmt.Errorf("Missed cache: %q", key)
}
return []byte(cache), nil
return copy(p, cache[offset:]), nil
}

func (mc *memoryCache) Add(blobHash string, p []byte) {
func (mc *memoryCache) Add(key string, p []byte, opts ...Option) {
mc.mu.Lock()
defer mc.mu.Unlock()
mc.membuf[blobHash] = string(p)
mc.membuf[key] = string(p)
}
Loading

0 comments on commit b3c5173

Please sign in to comment.