From d8b1b19f9ba0be50d8fe712ba5b7e1c292f0f79c Mon Sep 17 00:00:00 2001 From: ktock Date: Mon, 1 Jun 2020 13:30:59 +0900 Subject: [PATCH 1/2] Prevent the prefetch from making a big HTTP request header During prefetch, the current filesystem implementation splits the prefetch range into many small, mostly neighbouring chunks and lists them in a single HTTP Range Request header without enough squashing. Sometimes this leads to an HTTP header that is too large, and the request fails. This commit solves this by aggressively squashing neighbouring/overlapping chunks in HTTP headers, as much as possible. Signed-off-by: Kohei Tokunaga --- stargz/remote/util.go | 57 ++++++++++++++++++++++++++++---------- stargz/remote/util_test.go | 14 +++++++++- 2 files changed, 55 insertions(+), 16 deletions(-) diff --git a/stargz/remote/util.go b/stargz/remote/util.go index 3a7c7e1a0..1fd3899bf 100644 --- a/stargz/remote/util.go +++ b/stargz/remote/util.go @@ -46,31 +46,58 @@ func superRegion(regs []region) region { // regionSet is a set of regions type regionSet struct { - rs []region + rs []region // must be kept sorted } -// add attempts to merge r to rs.rs +// add attempts to merge r into rs.rs, squashing the regions into as +// few entries as possible. This operation takes O(n). +// TODO: more efficient way to do it. func (rs *regionSet) add(r region) { - for i := range rs.rs { - f := &rs.rs[i] - if r.b <= f.b && f.b <= r.e+1 && r.e <= f.e { - f.b = r.b + // Iterate over the sorted region slice from the tail. + // a) When an overlap occurs, adjust `r` to fully contain the current region + // `l` and remove `l` from the region slice. + // b) Once l.e becomes less than r.b, no more overlaps can occur. So immediately + // insert `r`, which now fully contains all overlapped regions, into the region slice. + // Here, `r` is inserted into the region slice keeping it sorted, without + // overlapping any region. + // *) If any `l` contains `r`, we don't need to do anything, so return immediately. + for i := len(rs.rs) - 1; i >= 0; i-- { + l := &rs.rs[i] + + // *) l contains r + if l.b <= r.b && r.e <= l.e { return } - if f.b <= r.b && r.e <= f.e { - return + + // a) r overlaps l, so adjust r to fully contain l and remove l + // from the region slice. + if l.b <= r.b && r.b <= l.e+1 && l.e <= r.e { + r.b = l.b + rs.rs = append(rs.rs[:i], rs.rs[i+1:]...) + continue } - if f.b <= r.b && r.b <= f.e+1 && f.e <= r.e { - f.e = r.e - return + if r.b <= l.b && l.b <= r.e+1 && r.e <= l.e { + r.e = l.e + rs.rs = append(rs.rs[:i], rs.rs[i+1:]...) + continue } - if r.b <= f.b && f.e <= r.e { - f.b = r.b - f.e = r.e + if r.b <= l.b && l.e <= r.e { + rs.rs = append(rs.rs[:i], rs.rs[i+1:]...) + continue + } + + // b) No overlap will occur after this iteration. Insert r into the + // region slice immediately. + if l.e < r.b { + rs.rs = append(rs.rs[:i+1], append([]region{r}, rs.rs[i+1:]...)...) return } + + // No overlap yet. Check the next region. } - rs.rs = append(rs.rs, r) + + // r precedes every region remaining in the slice. + rs.rs = append([]region{r}, rs.rs...)
} func (rs *regionSet) totalSize() int64 { diff --git a/stargz/remote/util_test.go b/stargz/remote/util_test.go index 8058971c6..827c53438 100644 --- a/stargz/remote/util_test.go +++ b/stargz/remote/util_test.go @@ -46,7 +46,7 @@ func TestRegionSet(t *testing.T) { }, { input: []region{{2, 4}, {6, 8}, {1, 5}}, - expected: []region{{1, 5}, {6, 8}}, + expected: []region{{1, 8}}, }, { input: []region{{1, 2}, {1, 2}}, @@ -72,6 +72,18 @@ func TestRegionSet(t *testing.T) { input: []region{{4, 6}, {1, 3}}, // region.e is inclusive expected: []region{{1, 6}}, }, + { + input: []region{{4, 6}, {1, 3}, {7, 9}, {2, 8}}, + expected: []region{{1, 9}}, + }, + { + input: []region{{4, 6}, {1, 5}, {7, 9}, {4, 8}}, + expected: []region{{1, 9}}, + }, + { + input: []region{{7, 8}, {1, 2}, {5, 6}}, + expected: []region{{1, 2}, {5, 8}}, + }, } for i, tt := range tests { var rs regionSet From b3c5173d33fa15f1259431e5eec65f72a3224742 Mon Sep 17 00:00:00 2001 From: ktock Date: Mon, 1 Jun 2020 13:31:41 +0900 Subject: [PATCH 2/2] Improve file read performance Benchmarking in the community revealed that file read performance is low, especially on random and parallel reads. This commit solves this by the following fixes: - minimizing slice allocations in the file read path by leveraging sync.Pool, - minimizing memory copies and disk I/O by allowing partial ranges of blobs to be fetched from the cache, and - minimizing the locked regions in the cache. Signed-off-by: Kohei Tokunaga --- cache/cache.go | 295 +++++++++++++++++++++++++++-------- cache/cache_test.go | 49 +++--- stargz/fs.go | 104 +++++++----- stargz/fs_test.go | 25 +-- stargz/reader/reader.go | 134 ++++++++++------ stargz/reader/reader_test.go | 25 +-- stargz/remote/blob.go | 148 ++++++++++++------ stargz/remote/blob_test.go | 28 ++-- stargz/remote/resolver.go | 38 +++-- 9 files changed, 584 insertions(+), 262 deletions(-) diff --git a/cache/cache.go b/cache/cache.go index 47c49a384..11641d6f9 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -17,9 +17,9 @@ package cache import ( + "bytes" "fmt" "io" - "io/ioutil" "os" "path/filepath" "sync" @@ -28,117 +28,211 @@ import ( "github.com/pkg/errors" ) +const ( + defaultMaxLRUCacheEntry = 10 + defaultMaxCacheFds = 10 +) + +type DirectoryCacheConfig struct { + MaxLRUCacheEntry int `toml:"max_lru_cache_entry"` + MaxCacheFds int `toml:"max_cache_fds"` + SyncAdd bool `toml:"sync_add"` +} + // TODO: contents validation. type BlobCache interface { - Fetch(blobHash string) ([]byte, error) - Add(blobHash string, p []byte) + Add(key string, p []byte, opts ...Option) + FetchAt(key string, offset int64, p []byte, opts ...Option) (n int, err error) } -type dirOpt struct { - syncAdd bool +type cacheOpt struct { + direct bool } -type DirOption func(o *dirOpt) *dirOpt +type Option func(o *cacheOpt) *cacheOpt -func SyncAdd() DirOption { - return func(o *dirOpt) *dirOpt { - o.syncAdd = true +// When the Direct option is specified for the FetchAt and Add methods, these +// operations won't use on-memory caches. When you know that the target value won't be +// used immediately, you can prevent the limited space of on-memory caches from +// being polluted by these unimportant values.
+func Direct() Option { + return func(o *cacheOpt) *cacheOpt { + o.direct = true return o } } -func NewDirectoryCache(directory string, memCacheSize int, opts ...DirOption) (BlobCache, error) { - opt := &dirOpt{} - for _, o := range opts { - opt = o(opt) +func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache, error) { + maxEntry := config.MaxLRUCacheEntry + if maxEntry == 0 { + maxEntry = defaultMaxLRUCacheEntry + } + maxFds := config.MaxCacheFds + if maxFds == 0 { + maxFds = defaultMaxCacheFds } if err := os.MkdirAll(directory, os.ModePerm); err != nil { return nil, err } dc := &directoryCache{ - cache: lru.New(memCacheSize), + cache: newObjectCache(maxEntry), + fileCache: newObjectCache(maxFds), directory: directory, + bufPool: sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, + }, } - if opt.syncAdd { - dc.syncAdd = true + dc.cache.finalize = func(value interface{}) { + dc.bufPool.Put(value) } + dc.fileCache.finalize = func(value interface{}) { + value.(*os.File).Close() + } + dc.syncAdd = config.SyncAdd return dc, nil } // directoryCache is a cache implementation which backend is a directory. type directoryCache struct { - cache *lru.Cache - cacheMu sync.Mutex + cache *objectCache + fileCache *objectCache directory string - syncAdd bool - fileMu sync.Mutex + + bufPool sync.Pool + + syncAdd bool } -func (dc *directoryCache) Fetch(blobHash string) (p []byte, err error) { - dc.cacheMu.Lock() - defer dc.cacheMu.Unlock() +func (dc *directoryCache) FetchAt(key string, offset int64, p []byte, opts ...Option) (n int, err error) { + opt := &cacheOpt{} + for _, o := range opts { + opt = o(opt) + } - if cache, ok := dc.cache.Get(blobHash); ok { - p, ok := cache.([]byte) - if ok { - return p, nil + if !opt.direct { + // Get data from memory + if b, done, ok := dc.cache.get(key); ok { + defer done() + data := b.(*bytes.Buffer).Bytes() + if int64(len(data)) < offset { + return 0, fmt.Errorf("invalid offset %d exceeds chunk size %d", + offset, len(data)) + } + return copy(p, data[offset:]), nil } - } - c := filepath.Join(dc.directory, blobHash[:2], blobHash) - if _, err := os.Stat(c); err != nil { - return nil, errors.Wrapf(err, "Missed cache %q", c) + // Get data from disk. If the file is already opened, use it. + if f, done, ok := dc.fileCache.get(key); ok { + defer done() + return f.(*os.File).ReadAt(p, offset) + } } - file, err := os.Open(c) + // Open the cache file and read the target region + // TODO: If the target cache is write-in-progress, should we wait for the completion + // or simply report the cache miss? + file, err := os.Open(dc.cachePath(key)) if err != nil { - return nil, errors.Wrapf(err, "Failed to Open cached blob file %q", c) + return 0, errors.Wrapf(err, "failed to open blob file for %q", key) + } + if n, err = file.ReadAt(p, offset); err == io.EOF { + err = nil } - defer file.Close() - if p, err = ioutil.ReadAll(file); err != nil && err != io.EOF { - return nil, errors.Wrapf(err, "failed to read cached data %q", c) + // Cache the opened file for future use. If "direct" option is specified, this + // won't be done. This option is useful for preventing file cache from being + // polluted by data that won't be accessed immediately. + if opt.direct || !dc.fileCache.add(key, file) { + file.Close() } - dc.cache.Add(blobHash, p) - return + // TODO: should we cache the entire file data on memory? + // but making I/O (possibly huge) on every fetching + // might be costly. 
+ + return n, err } -func (dc *directoryCache) Add(blobHash string, p []byte) { - // Copy the original data for avoiding the cached contents to be edited accidentally - p2 := make([]byte, len(p)) - copy(p2, p) - p = p2 +func (dc *directoryCache) Add(key string, p []byte, opts ...Option) { + opt := &cacheOpt{} + for _, o := range opts { + opt = o(opt) + } - dc.cacheMu.Lock() - dc.cache.Add(blobHash, p) - dc.cacheMu.Unlock() + if !opt.direct { + // Cache the passed data on memory. This makes it possible to serve this data + // even while it is being written to the disk. If the "direct" option is specified, this + // won't be done. This option is useful for preventing the memory cache from being + // polluted by data that won't be accessed immediately. + b := dc.bufPool.Get().(*bytes.Buffer) + b.Reset() + b.Write(p) + if !dc.cache.add(key, b) { + dc.bufPool.Put(b) // Already exists. No need to cache. + } + } + // Cache the passed data to disk. + b2 := dc.bufPool.Get().(*bytes.Buffer) + b2.Reset() + b2.Write(p) addFunc := func() { - dc.fileMu.Lock() - defer dc.fileMu.Unlock() + defer dc.bufPool.Put(b2) - // Check if cache exists. - c := filepath.Join(dc.directory, blobHash[:2], blobHash) + var ( + c = dc.cachePath(key) + wip = dc.wipPath(key) + ) + if _, err := os.Stat(wip); err == nil { + return // Write in progress + } if _, err := os.Stat(c); err == nil { + return // Already exists. + } + + // Write the contents to a temporary file + if err := os.MkdirAll(filepath.Dir(wip), os.ModePerm); err != nil { + fmt.Printf("Warning: failed to create blob cache directory %q: %v\n", wip, err) + return + } + wipfile, err := os.Create(wip) + if err != nil { + fmt.Printf("Warning: failed to prepare temp file for storing cache %q: %v\n", key, err) + return + } + defer func() { + wipfile.Close() + os.Remove(wipfile.Name()) + }() + want := b2.Len() + if _, err := io.CopyN(wipfile, b2, int64(want)); err != nil { + fmt.Printf("Warning: failed to write cache: %v\n", err) return } - // Create cache file + // Commit the cache contents if err := os.MkdirAll(filepath.Dir(c), os.ModePerm); err != nil { fmt.Printf("Warning: Failed to Create blob cache directory %q: %v\n", c, err) return } - f, err := os.Create(c) + if err := os.Rename(wipfile.Name(), c); err != nil { + fmt.Printf("Warning: failed to commit cache to %q: %v\n", c, err) + return + } + file, err := os.Open(c) if err != nil { - fmt.Printf("Warning: could not create a cache file at %q: %v\n", c, err) + fmt.Printf("Warning: failed to open cache on %q: %v\n", c, err) return } - defer f.Close() - if n, err := f.Write(p); err != nil || n != len(p) { - fmt.Printf("Warning: failed to write cache: %d(wrote)/%d(expected): %v\n", - n, len(p), err) + + // Cache the opened file for future use. If the "direct" option is specified, this + // won't be done. This option is useful for preventing the file cache from being + // polluted by data that won't be accessed immediately. + if opt.direct || !dc.fileCache.add(key, file) { + file.Close() } } @@ -149,6 +243,81 @@ func (dc *directoryCache) Add(blobHash string, p []byte) { } } +func (dc *directoryCache) cachePath(key string) string { + return filepath.Join(dc.directory, key[:2], key) +} + +func (dc *directoryCache) wipPath(key string) string { + return filepath.Join(dc.directory, key[:2], "w", key) +} + +func newObjectCache(maxEntries int) *objectCache { + oc := &objectCache{ + cache: lru.New(maxEntries), + } + oc.cache.OnEvicted = func(key lru.Key, value interface{}) { + value.(*object).release() // Decrease ref count incremented in add operation.
+ } + return oc +} + +type objectCache struct { + cache *lru.Cache + cacheMu sync.Mutex + finalize func(interface{}) +} + +func (oc *objectCache) get(key string) (value interface{}, done func(), ok bool) { + oc.cacheMu.Lock() + defer oc.cacheMu.Unlock() + o, ok := oc.cache.Get(key) + if !ok { + return nil, nil, false + } + o.(*object).use() + return o.(*object).v, func() { o.(*object).release() }, true +} + +func (oc *objectCache) add(key string, value interface{}) bool { + oc.cacheMu.Lock() + defer oc.cacheMu.Unlock() + if _, ok := oc.cache.Get(key); ok { + return false // TODO: should we swap the object? + } + o := &object{ + v: value, + finalize: oc.finalize, + } + o.use() // Keep this object with at least 1 ref count (decreased on eviction) + oc.cache.Add(key, o) + return true +} + +type object struct { + v interface{} + + refCounts int64 + finalize func(interface{}) + + mu sync.Mutex +} + +func (o *object) use() { + o.mu.Lock() + defer o.mu.Unlock() + o.refCounts++ +} + +func (o *object) release() { + o.mu.Lock() + defer o.mu.Unlock() + o.refCounts-- + if o.refCounts <= 0 && o.finalize != nil { + // nobody refers to this object anymore + o.finalize(o.v) + } +} + func NewMemoryCache() BlobCache { return &memoryCache{ membuf: map[string]string{}, @@ -161,19 +330,19 @@ type memoryCache struct { mu sync.Mutex } -func (mc *memoryCache) Fetch(blobHash string) ([]byte, error) { +func (mc *memoryCache) FetchAt(key string, offset int64, p []byte, opts ...Option) (n int, err error) { mc.mu.Lock() defer mc.mu.Unlock() - cache, ok := mc.membuf[blobHash] + cache, ok := mc.membuf[key] if !ok { - return nil, fmt.Errorf("Missed cache: %q", blobHash) + return 0, fmt.Errorf("Missed cache: %q", key) } - return []byte(cache), nil + return copy(p, cache[offset:]), nil } -func (mc *memoryCache) Add(blobHash string, p []byte) { +func (mc *memoryCache) Add(key string, p []byte, opts ...Option) { mc.mu.Lock() defer mc.mu.Unlock() - mc.membuf[blobHash] = string(p) + mc.membuf[key] = string(p) } diff --git a/cache/cache_test.go b/cache/cache_test.go index 362597ee5..8aa5df308 100644 --- a/cache/cache_test.go +++ b/cache/cache_test.go @@ -42,7 +42,10 @@ func TestDirectoryCache(t *testing.T) { if err != nil { t.Fatalf("failed to make tempdir: %v", err) } - c, err := NewDirectoryCache(tmp, 10, SyncAdd()) + c, err := NewDirectoryCache(tmp, DirectoryCacheConfig{ + MaxLRUCacheEntry: 10, + SyncAdd: true, + }) if err != nil { t.Fatalf("failed to make cache: %v", err) } @@ -56,7 +59,10 @@ func TestDirectoryCache(t *testing.T) { if err != nil { t.Fatalf("failed to make tempdir: %v", err) } - c, err := NewDirectoryCache(tmp, 1, SyncAdd()) + c, err := NewDirectoryCache(tmp, DirectoryCacheConfig{ + MaxLRUCacheEntry: 1, + SyncAdd: true, + }) if err != nil { t.Fatalf("failed to make cache: %v", err) } @@ -144,28 +150,35 @@ func digestFor(content string) string { func hit(sample string) check { return func(t *testing.T, c BlobCache) { - d := digestFor(sample) - p, err := c.Fetch(d) - if err != nil { - t.Errorf("failed to fetch blob %q: %v", d, err) - return - } - if len(p) != len(sample) { - t.Errorf("fetched size %d; want %d", len(p), len(sample)) - return - } - df := digestFor(string(p)) - if df != d { - t.Errorf("fetched digest %q(%q); want %q(%q)", - df, string(p), d, sample) - } + // test whole blob + key := digestFor(sample) + testChunk(t, c, key, 0, sample) + + // test a chunk + chunk := len(sample) / 3 + testChunk(t, c, key, int64(chunk), sample[chunk:2*chunk]) + } +} + +func testChunk(t *testing.T, c BlobCache, key
string, offset int64, sample string) { + p := make([]byte, len(sample)) + if n, err := c.FetchAt(key, offset, p); err != nil { + t.Errorf("failed to fetch blob %q: %v", key, err) + return + } else if n != len(sample) { + t.Errorf("fetched size %d; want %d", n, len(sample)) + return + } + if digestFor(sample) != digestFor(string(p)) { + t.Errorf("fetched %q; want %q", string(p), sample) } } func miss(sample string) check { return func(t *testing.T, c BlobCache) { d := digestFor(sample) - _, err := c.Fetch(d) + p := make([]byte, len(sample)) + _, err := c.FetchAt(d, 0, p) if err == nil { t.Errorf("hit blob %q but must be missed: %v", d, err) return diff --git a/stargz/fs.go b/stargz/fs.go index 2f12f7bfc..c50c66eda 100644 --- a/stargz/fs.go +++ b/stargz/fs.go @@ -71,15 +71,15 @@ import ( ) const ( - blockSize = 512 + blockSize = 4096 memoryCacheType = "memory" whiteoutPrefix = ".wh." whiteoutOpaqueDir = whiteoutPrefix + whiteoutPrefix + ".opq" opaqueXattr = "trusted.overlay.opaque" opaqueXattrValue = "y" stateDirName = ".stargz-snapshotter" - defaultLRUCacheEntry = 100 defaultResolveResultEntry = 100 + defaultPrefetchTimeoutSec = 10 statFileMode = syscall.S_IFREG | 0400 // -r-------- stateDirMode = syscall.S_IFDIR | 0500 // dr-x------ @@ -104,27 +104,38 @@ type Config struct { remote.ResolverConfig `toml:"resolver"` remote.BlobConfig `toml:"blob"` keychain.KubeconfigKeychainConfig `toml:"kubeconfig_keychain"` + cache.DirectoryCacheConfig `toml:"directory_cache"` HTTPCacheType string `toml:"http_cache_type"` FSCacheType string `toml:"filesystem_cache_type"` - LRUCacheEntry int `toml:"lru_max_entry"` ResolveResultEntry int `toml:"resolve_result_entry"` PrefetchSize int64 `toml:"prefetch_size"` + PrefetchTimeoutSec int64 `toml:"prefetch_timeout_sec"` NoPrefetch bool `toml:"noprefetch"` Debug bool `toml:"debug"` } -func NewFilesystem(ctx context.Context, root string, config *Config) (snbase.FileSystem, error) { - maxEntry := config.LRUCacheEntry - if maxEntry == 0 { - maxEntry = defaultLRUCacheEntry - } - httpCache, err := getCache(config.HTTPCacheType, filepath.Join(root, "httpcache"), maxEntry) - if err != nil { - return nil, err +func NewFilesystem(ctx context.Context, root string, config *Config) (_ snbase.FileSystem, err error) { + var httpCache cache.BlobCache + if config.HTTPCacheType == memoryCacheType { + httpCache = cache.NewMemoryCache() + } else { + if httpCache, err = cache.NewDirectoryCache( + filepath.Join(root, "http"), + config.DirectoryCacheConfig, + ); err != nil { + return nil, errors.Wrap(err, "failed to prepare HTTP cache") + } } - fsCache, err := getCache(config.FSCacheType, filepath.Join(root, "fscache"), maxEntry) - if err != nil { - return nil, err + var fsCache cache.BlobCache + if config.FSCacheType == memoryCacheType { + fsCache = cache.NewMemoryCache() + } else { + if fsCache, err = cache.NewDirectoryCache( + filepath.Join(root, "fscache"), + config.DirectoryCacheConfig, + ); err != nil { + return nil, errors.Wrap(err, "failed to prepare filesystem cache") + } } keychain := authn.NewMultiKeychain( authn.DefaultKeychain, @@ -134,12 +145,17 @@ func NewFilesystem(ctx context.Context, root string, config *Config) (snbase.Fil if resolveResultEntry == 0 { resolveResultEntry = defaultResolveResultEntry } + prefetchTimeout := time.Duration(config.PrefetchTimeoutSec) * time.Second + if prefetchTimeout == 0 { + prefetchTimeout = defaultPrefetchTimeoutSec * time.Second + } return &filesystem{ resolver: remote.NewResolver(keychain, config.ResolverConfig), blobConfig:
config.BlobConfig, httpCache: httpCache, fsCache: fsCache, prefetchSize: config.PrefetchSize, + prefetchTimeout: prefetchTimeout, noprefetch: config.NoPrefetch, debug: config.Debug, layer: make(map[string]*layer), @@ -148,20 +164,13 @@ func NewFilesystem(ctx context.Context, root string, config *Config) (snbase.Fil }, nil } -// getCache gets a cache corresponding to specified type. -func getCache(ctype, dir string, maxEntry int) (cache.BlobCache, error) { - if ctype == memoryCacheType { - return cache.NewMemoryCache(), nil - } - return cache.NewDirectoryCache(dir, maxEntry) -} - type filesystem struct { resolver *remote.Resolver blobConfig remote.BlobConfig httpCache cache.BlobCache fsCache cache.BlobCache prefetchSize int64 + prefetchTimeout time.Duration noprefetch bool debug bool layer map[string]*layer @@ -245,7 +254,9 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s // Check() for this layer waits for the prefetch completion. We recreate // RoundTripper to avoid disturbing other NW-related operations. if !fs.noprefetch { + l.doPrefetch() go func() { + defer l.donePrefetch() fs.backgroundTaskManager.DoPrioritizedTask() defer fs.backgroundTaskManager.DonePrioritizedTask() tr, err := fetchTr() @@ -275,11 +286,17 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s retN, retErr = 0, err return } - retN, retErr = l.blob.ReadAt(p, offset, remote.WithContext(ctx), remote.WithRoundTripper(tr)) + retN, retErr = l.blob.ReadAt( + p, + offset, + remote.WithContext(ctx), // Make cancellable + remote.WithRoundTripper(tr), // Use dedicated Transport + remote.WithCacheOpts(cache.Direct()), // Do not pollute mem cache + ) }, 120*time.Second) return }), 0, l.blob.Size()) - if err := l.reader.CacheTarGzWithReader(br); err != nil { + if err := l.reader.CacheTarGzWithReader(br, cache.Direct()); err != nil { logCtx.WithError(err).Debug("failed to fetch whole layer") return } @@ -349,7 +366,7 @@ func (fs *filesystem) resolve(ctx context.Context, ref, digest string) *resolveR return nil, errors.Wrap(err, "failed to read layer") } - return newLayer(blob, gr, root), nil + return newLayer(blob, gr, root, fs.prefetchTimeout), nil }) } @@ -371,7 +388,7 @@ func (fs *filesystem) Check(ctx context.Context, mountpoint string) error { } // Wait for prefetch compeletion - if err := l.waitForPrefetchCompletion(10 * time.Second); err != nil { + if err := l.waitForPrefetchCompletion(); err != nil { logCtx.WithError(err).Warn("failed to sync with prefetch completion") } @@ -507,26 +524,33 @@ func (rr *resolveResult) isInProgress() bool { return rr.progress.isInProgress() } -func newLayer(blob remote.Blob, r reader.Reader, root *stargz.TOCEntry) *layer { +func newLayer(blob remote.Blob, r reader.Reader, root *stargz.TOCEntry, prefetchTimeout time.Duration) *layer { return &layer{ - blob: blob, - reader: r, - root: root, - prefetchWaiter: newWaiter(), + blob: blob, + reader: r, + root: root, + prefetchWaiter: newWaiter(), + prefetchTimeout: prefetchTimeout, } } type layer struct { - blob remote.Blob - reader reader.Reader - root *stargz.TOCEntry - prefetchWaiter *waiter + blob remote.Blob + reader reader.Reader + root *stargz.TOCEntry + prefetchWaiter *waiter + prefetchTimeout time.Duration } -func (l *layer) prefetch(prefetchSize int64, opts ...remote.Option) error { +func (l *layer) doPrefetch() { l.prefetchWaiter.start() - defer l.prefetchWaiter.done() +} + +func (l *layer) donePrefetch() { + l.prefetchWaiter.done() +} +func (l *layer) prefetch(prefetchSize 
int64, opts ...remote.Option) error { if _, ok := l.reader.Lookup(NoPrefetchLandmark); ok { // do not prefetch this layer return nil @@ -545,15 +569,15 @@ func (l *layer) prefetch(prefetchSize int64, opts ...remote.Option) error { return l.blob.ReadAt(p, off, opts...) }), 0, prefetchSize) err := l.reader.CacheTarGzWithReader(pr) - if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { + if err != nil && errors.Cause(err) != io.EOF && errors.Cause(err) != io.ErrUnexpectedEOF { return errors.Wrap(err, "failed to cache prefetched layer") } return nil } -func (l *layer) waitForPrefetchCompletion(timeout time.Duration) error { - return l.prefetchWaiter.wait(timeout) +func (l *layer) waitForPrefetchCompletion() error { + return l.prefetchWaiter.wait(l.prefetchTimeout) } func newWaiter() *waiter { diff --git a/stargz/fs_test.go b/stargz/fs_test.go index f4156b0c5..8266c4946 100644 --- a/stargz/fs_test.go +++ b/stargz/fs_test.go @@ -41,6 +41,7 @@ import ( "testing" "time" + "github.com/containerd/stargz-snapshotter/cache" "github.com/containerd/stargz-snapshotter/stargz/reader" "github.com/containerd/stargz-snapshotter/stargz/remote" "github.com/containerd/stargz-snapshotter/task" @@ -62,7 +63,7 @@ func TestCheck(t *testing.T) { bb := &breakBlob{} fs := &filesystem{ layer: map[string]*layer{ - "test": newLayer(bb, nopreader{}, nil), + "test": newLayer(bb, nopreader{}, nil, time.Second), }, backgroundTaskManager: task.NewBackgroundTaskManager(1, time.Millisecond), } @@ -79,9 +80,9 @@ func TestCheck(t *testing.T) { type nopreader struct{} -func (r nopreader) OpenFile(name string) (io.ReaderAt, error) { return nil, nil } -func (r nopreader) Lookup(name string) (*stargz.TOCEntry, bool) { return nil, false } -func (r nopreader) CacheTarGzWithReader(ir io.Reader) error { return nil } +func (r nopreader) OpenFile(name string) (io.ReaderAt, error) { return nil, nil } +func (r nopreader) Lookup(name string) (*stargz.TOCEntry, bool) { return nil, false } +func (r nopreader) CacheTarGzWithReader(ir io.Reader, opts ...cache.Option) error { return nil } type breakBlob struct { success bool @@ -885,7 +886,7 @@ func TestPrefetch(t *testing.T) { if err != nil { t.Fatalf("failed to make stargz reader: %v", err) } - l := newLayer(blob, gr, nil) + l := newLayer(blob, gr, nil, time.Second) prefetchSize := int64(0) if tt.prefetchSize != nil { prefetchSize = tt.prefetchSize(l) @@ -966,22 +967,22 @@ type testCache struct { mu sync.Mutex } -func (tc *testCache) Fetch(blobHash string) ([]byte, error) { +func (tc *testCache) FetchAt(key string, offset int64, p []byte, opts ...cache.Option) (int, error) { tc.mu.Lock() defer tc.mu.Unlock() - cache, ok := tc.membuf[blobHash] + cache, ok := tc.membuf[key] if !ok { - return nil, fmt.Errorf("Missed cache: %q", blobHash) + return 0, fmt.Errorf("Missed cache: %q", key) } - return []byte(cache), nil + return copy(p, cache[offset:]), nil } -func (tc *testCache) Add(blobHash string, p []byte) { +func (tc *testCache) Add(key string, p []byte, opts ...cache.Option) { tc.mu.Lock() defer tc.mu.Unlock() - tc.membuf[blobHash] = string(p) - tc.t.Logf(" cached [%s...]: %q", blobHash[:8], string(p)) + tc.membuf[key] = string(p) + tc.t.Logf(" cached [%s...]: %q", key[:8], string(p)) } func TestWaiter(t *testing.T) { diff --git a/stargz/reader/reader.go b/stargz/reader/reader.go index d4a64c45c..f2a35103e 100644 --- a/stargz/reader/reader.go +++ b/stargz/reader/reader.go @@ -24,11 +24,14 @@ package reader import ( "archive/tar" + "bytes" "compress/gzip" "crypto/sha256" "fmt" "io" + 
"io/ioutil" "strings" + "sync" "github.com/containerd/stargz-snapshotter/cache" "github.com/google/crfs/stargz" @@ -38,7 +41,7 @@ import ( type Reader interface { OpenFile(name string) (io.ReaderAt, error) Lookup(name string) (*stargz.TOCEntry, bool) - CacheTarGzWithReader(r io.Reader) error + CacheTarGzWithReader(r io.Reader, opts ...cache.Option) error } func NewReader(sr *io.SectionReader, cache cache.BlobCache) (Reader, *stargz.TOCEntry, error) { @@ -56,13 +59,19 @@ func NewReader(sr *io.SectionReader, cache cache.BlobCache) (Reader, *stargz.TOC r: r, sr: sr, cache: cache, + bufPool: sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, + }, }, root, nil } type reader struct { - r *stargz.Reader - sr *io.SectionReader - cache cache.BlobCache + r *stargz.Reader + sr *io.SectionReader + cache cache.BlobCache + bufPool sync.Pool } func (gr *reader) OpenFile(name string) (io.ReaderAt, error) { @@ -80,6 +89,7 @@ func (gr *reader) OpenFile(name string) (io.ReaderAt, error) { r: gr.r, cache: gr.cache, ra: sr, + gr: gr, }, nil } @@ -87,10 +97,10 @@ func (gr *reader) Lookup(name string) (*stargz.TOCEntry, bool) { return gr.r.Lookup(name) } -func (gr *reader) CacheTarGzWithReader(r io.Reader) error { +func (gr *reader) CacheTarGzWithReader(r io.Reader, opts ...cache.Option) error { gzr, err := gzip.NewReader(r) if err != nil { - return err + return errors.Wrapf(err, "failed to get gzip reader") } defer gzr.Close() tr := tar.NewReader(gzr) @@ -98,7 +108,7 @@ func (gr *reader) CacheTarGzWithReader(r io.Reader) error { h, err := tr.Next() if err != nil { if err != io.EOF { - return err + return errors.Wrapf(err, "failed to read next tar entry") } break } @@ -116,20 +126,42 @@ func (gr *reader) CacheTarGzWithReader(r io.Reader) error { if !ok { break } - id := genID(fe.Digest, ce.ChunkOffset, ce.ChunkSize) - if cacheData, err := gr.cache.Fetch(id); err != nil || len(cacheData) != int(ce.ChunkSize) { - // make sure that this range is at ce.ChunkOffset for ce.ChunkSize - if nr != ce.ChunkOffset { - return fmt.Errorf("invalid offset %d != %d", nr, ce.ChunkOffset) - } - data := make([]byte, int(ce.ChunkSize)) + // make sure that this range is at ce.ChunkOffset for ce.ChunkSize + if nr != ce.ChunkOffset { + return fmt.Errorf("invalid offset %d != %d", nr, ce.ChunkOffset) + } - // Cache this chunk (offset: ce.ChunkOffset, size: ce.ChunkSize) - if _, err := io.ReadFull(tr, data); err != nil && err != io.EOF { - return err + // Check if the target chunks exists in the cache + id := genID(fe.Digest, ce.ChunkOffset, ce.ChunkSize) + if _, err := gr.cache.FetchAt(id, 0, nil, opts...); err != nil { + // missed cache, needs to fetch and add it to the cache + b := gr.bufPool.Get().(*bytes.Buffer) + b.Reset() + b.Grow(int(ce.ChunkSize)) + if _, err := io.CopyN(b, tr, ce.ChunkSize); err != nil { + gr.bufPool.Put(b) + return errors.Wrapf(err, + "failed to read file payload of %q (offset:%d,size:%d)", + h.Name, ce.ChunkOffset, ce.ChunkSize) + } + if int64(b.Len()) != ce.ChunkSize { + gr.bufPool.Put(b) + return fmt.Errorf("unexpected copied data size %d; want %d", + b.Len(), ce.ChunkSize) } - gr.cache.Add(id, data) + gr.cache.Add(id, b.Bytes()[:ce.ChunkSize], opts...) 
+ gr.bufPool.Put(b) + + nr += ce.ChunkSize + continue + } + + // Discard the target chunk as it is already cached + if _, err := io.CopyN(ioutil.Discard, tr, ce.ChunkSize); err != nil { + return errors.Wrapf(err, + "failed to discard file payload of %q (offset:%d,size:%d)", + h.Name, ce.ChunkOffset, ce.ChunkSize) } nr += ce.ChunkSize } @@ -143,6 +175,7 @@ type file struct { ra io.ReaderAt r *stargz.Reader cache cache.BlobCache + gr *reader } // ReadAt reads chunks from the stargz file with trying to fetch as many chunks @@ -154,40 +187,51 @@ func (sf *file) ReadAt(p []byte, offset int64) (int, error) { if !ok { break } - id := genID(sf.digest, ce.ChunkOffset, ce.ChunkSize) - if cached, err := sf.cache.Fetch(id); err == nil && int64(len(cached)) == ce.ChunkSize { - nr += copy(p[nr:], cached[offset+int64(nr)-ce.ChunkOffset:]) - } else { - var ( - ip []byte - tmp bool - lowerUnread = positive(offset - ce.ChunkOffset) - upperUnread = positive(ce.ChunkOffset + ce.ChunkSize - (offset + int64(len(p)))) - ) - if lowerUnread == 0 && upperUnread == 0 { - ip = p[nr : int64(nr)+ce.ChunkSize] - } else { - // Use temporally buffer for aligning this chunk - ip = make([]byte, ce.ChunkSize) - tmp = true - } + var ( + id = genID(sf.digest, ce.ChunkOffset, ce.ChunkSize) + lowerDiscard = positive(offset - ce.ChunkOffset) + upperDiscard = positive(ce.ChunkOffset + ce.ChunkSize - (offset + int64(len(p)))) + expectedSize = ce.ChunkSize - upperDiscard - lowerDiscard + ) + + // Check if the content exists in the cache + n, err := sf.cache.FetchAt(id, lowerDiscard, p[nr:int64(nr)+expectedSize]) + if err == nil && int64(n) == expectedSize { + nr += n + continue + } + + // We missed the cache. Take it from the underlying reader. + // We read the whole chunk here and add it to the cache so that following + // reads against neighboring chunks can take the data without decompression.
+ if lowerDiscard == 0 && upperDiscard == 0 { + // We can directly store the result in the given buffer + ip := p[nr : int64(nr)+ce.ChunkSize] n, err := sf.ra.ReadAt(ip, ce.ChunkOffset) if err != nil && err != io.EOF { return 0, errors.Wrap(err, "failed to read data") - } else if int64(n) != ce.ChunkSize { - return 0, fmt.Errorf("invalid chunk size %d; want %d", n, ce.ChunkSize) - } - if tmp { - // Write temporally buffer to resulting slice - n = copy(p[nr:], ip[lowerUnread:ce.ChunkSize-upperUnread]) - if int64(n) != ce.ChunkSize-upperUnread-lowerUnread { - return 0, fmt.Errorf("unexpected final data size %d; want %d", - n, ce.ChunkSize-upperUnread-lowerUnread) - } } sf.cache.Add(id, ip) nr += n + continue + } + + // Use a temporary buffer for aligning this chunk + b := sf.gr.bufPool.Get().(*bytes.Buffer) + b.Reset() + b.Grow(int(ce.ChunkSize)) + ip := b.Bytes()[:ce.ChunkSize] + if _, err := sf.ra.ReadAt(ip, ce.ChunkOffset); err != nil && err != io.EOF { + sf.gr.bufPool.Put(b) + return 0, errors.Wrap(err, "failed to read data") + } + sf.cache.Add(id, ip) + n = copy(p[nr:], ip[lowerDiscard:ce.ChunkSize-upperDiscard]) + sf.gr.bufPool.Put(b) + if int64(n) != expectedSize { + return 0, fmt.Errorf("unexpected final data size %d; want %d", n, expectedSize) } + nr += n } return nr, nil diff --git a/stargz/reader/reader_test.go b/stargz/reader/reader_test.go index af3b6f05a..ee9936e6e 100644 --- a/stargz/reader/reader_test.go +++ b/stargz/reader/reader_test.go @@ -101,11 +101,11 @@ func (br *breakReaderAt) ReadAt(p []byte, off int64) (int, error) { type nopCache struct{} -func (nc *nopCache) Fetch(blobHash string) ([]byte, error) { - return nil, fmt.Errorf("Missed cache: %q", blobHash) +func (nc *nopCache) FetchAt(key string, offset int64, p []byte, opts ...cache.Option) (int, error) { + return 0, fmt.Errorf("Missed cache: %q", key) } -func (nc *nopCache) Add(blobHash string, p []byte) {} +func (nc *nopCache) Add(key string, p []byte, opts ...cache.Option) {} type testCache struct { membuf map[string]string @@ -113,22 +113,22 @@ type testCache struct { mu sync.Mutex } -func (tc *testCache) Fetch(blobHash string) ([]byte, error) { +func (tc *testCache) FetchAt(key string, offset int64, p []byte, opts ...cache.Option) (int, error) { tc.mu.Lock() defer tc.mu.Unlock() - cache, ok := tc.membuf[blobHash] + cache, ok := tc.membuf[key] if !ok { - return nil, fmt.Errorf("Missed cache: %q", blobHash) + return 0, fmt.Errorf("Missed cache: %q", key) } - return []byte(cache), nil + return copy(p, cache[offset:]), nil } -func (tc *testCache) Add(blobHash string, p []byte) { +func (tc *testCache) Add(key string, p []byte, opts ...cache.Option) { tc.mu.Lock() defer tc.mu.Unlock() - tc.membuf[blobHash] = string(p) - tc.t.Logf(" cached [%s...]: %q", blobHash[:8], string(p)) + tc.membuf[key] = string(p) + tc.t.Logf(" cached [%s...]: %q", key[:8], string(p)) } type region struct{ b, e int64 } @@ -220,8 +220,9 @@ func TestFileReadAt(t *testing.T) { if !ok { break } - data, err := f.cache.Fetch(genID(f.digest, ce.ChunkOffset, ce.ChunkSize)) - if err != nil || len(data) != int(ce.ChunkSize) { + data := make([]byte, ce.ChunkSize) + n, err := f.cache.FetchAt(genID(f.digest, ce.ChunkOffset, ce.ChunkSize), 0, data) + if err != nil || n != int(ce.ChunkSize) { t.Errorf("missed cache of offset=%d, size=%d: %v(got size=%d)", ce.ChunkOffset, ce.ChunkSize, err, n) return } diff --git a/stargz/remote/blob.go b/stargz/remote/blob.go index b2b8e1a15..d4b1e8be0 100644 --- a/stargz/remote/blob.go +++ b/stargz/remote/blob.go @@ -62,6
+62,8 @@ type blob struct { fetchedRegionSet regionSet fetchedRegionSetMu sync.Mutex + + resolver *Resolver } func (b *blob) Authn(tr http.RoundTripper) (http.RoundTripper, error) { @@ -96,13 +98,24 @@ func (b *blob) FetchedSize() int64 { } func (b *blob) Cache(offset int64, size int64, opts ...Option) error { + var cacheOpts options + for _, o := range opts { + o(&cacheOpts) + } + + b.fetcherMu.Lock() + fr := b.fetcher + b.fetcherMu.Unlock() + fetchReg := region{floor(offset, b.chunkSize), ceil(offset+size-1, b.chunkSize) - 1} discard := make(map[region]io.Writer) b.walkChunks(fetchReg, func(reg region) error { - discard[reg] = ioutil.Discard // do not read chunks (only cached) + if _, err := b.cache.FetchAt(fr.genID(reg), 0, nil, cacheOpts.cacheOpts...); err != nil { + discard[reg] = ioutil.Discard + } return nil }) - if err := b.fetchRange(discard, opts...); err != nil { + if err := b.fetchRange(discard, &cacheOpts); err != nil { return err } @@ -120,36 +133,75 @@ func (b *blob) ReadAt(p []byte, offset int64, opts ...Option) (int, error) { // Make the buffer chunk aligned allRegion := region{floor(offset, b.chunkSize), ceil(offset+int64(len(p))-1, b.chunkSize) - 1} allData := make(map[region]io.Writer) + var putBufs []*bytes.Buffer + defer func() { + for _, bf := range putBufs { + b.resolver.bufPool.Put(bf) + } + }() + + var readAtOpts options + for _, o := range opts { + o(&readAtOpts) + } + + // Fetcher can be suddenly updated so we take and use a snapshot of it for + // consistency. + b.fetcherMu.Lock() + fr := b.fetcher + b.fetcherMu.Unlock() + var commits []func() error b.walkChunks(allRegion, func(chunk region) error { var ( - ip []byte - base = positive(chunk.b - offset) - lowerUnread = positive(offset - chunk.b) - upperUnread = positive(chunk.e + 1 - (offset + int64(len(p)))) + base = positive(chunk.b - offset) + lowerUnread = positive(offset - chunk.b) + upperUnread = positive(chunk.e + 1 - (offset + int64(len(p)))) + expectedSize = chunk.size() - upperUnread - lowerUnread ) + + // Check if the content exists in the cache + n, err := b.cache.FetchAt(fr.genID(chunk), lowerUnread, p[base:base+expectedSize], readAtOpts.cacheOpts...) + if err == nil && n == int(expectedSize) { + return nil + } + + // We missed the cache. Take it from the remote registry. + // We get the whole chunk here and add it to the cache so that following + // reads against neighboring chunks can take the data without making HTTP requests. if lowerUnread == 0 && upperUnread == 0 { - ip = p[base : base+chunk.size()] + // We can directly store the result in the given buffer + allData[chunk] = &byteWriter{ + p: p[base : base+chunk.size()], + } } else { // Use temporally buffer for aligning this chunk - ip = make([]byte, chunk.size()) + bf := b.resolver.bufPool.Get().(*bytes.Buffer) + putBufs = append(putBufs, bf) + bf.Reset() + bf.Grow(int(chunk.size())) + allData[chunk] = bf + + // Function for committing the buffered chunk into the result slice.
commits = append(commits, func() error { - n := copy(p[base:], ip[lowerUnread:chunk.size()-upperUnread]) - if int64(n) != chunk.size()-upperUnread-lowerUnread { + if int64(bf.Len()) != chunk.size() { return fmt.Errorf("unexpected data size %d; want %d", - n, chunk.size()-upperUnread-lowerUnread) + bf.Len(), chunk.size()) + } + bb := bf.Bytes()[:chunk.size()] + n := copy(p[base:], bb[lowerUnread:chunk.size()-upperUnread]) + if int64(n) != expectedSize { + return fmt.Errorf("invalid copied data size %d; want %d", + n, expectedSize) } return nil }) } - allData[chunk] = &byteWriter{ - p: ip, - } return nil }) // Read required data - if err := b.fetchRange(allData, opts...); err != nil { + if err := b.fetchRange(allData, &readAtOpts); err != nil { return 0, err } @@ -172,40 +224,27 @@ func (b *blob) ReadAt(p []byte, offset int64, opts ...Option) (int, error) { } // fetchRange fetches all specified chunks from local cache and remote blob. -func (b *blob) fetchRange(allData map[region]io.Writer, opts ...Option) error { +func (b *blob) fetchRange(allData map[region]io.Writer, opts *options) error { + if len(allData) == 0 { + return nil + } + // Fetcher can be suddenly updated so we take and use the snapshot of it for // consistency. b.fetcherMu.Lock() fr := b.fetcher b.fetcherMu.Unlock() - // Read data from cache - fetched := make(map[region]bool) - for chunk := range allData { - data, err := b.cache.Fetch(fr.genID(chunk)) - if err != nil || int64(len(data)) != chunk.size() { - fetched[chunk] = false // missed cache, needs to fetch remotely. - continue - } - if n, err := io.Copy(allData[chunk], bytes.NewReader(data)); err != nil { - return err - } else if n != chunk.size() { - return fmt.Errorf("unexpected cached data size %d; want %d", n, chunk.size()) - } - } - if len(fetched) == 0 { - // We successfully served whole range from cache - return nil - } - // request missed regions var req []region - for reg := range fetched { + fetched := make(map[region]bool) + for reg := range allData { req = append(req, reg) + fetched[reg] = false } ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - mr, err := fr.fetch(ctx, req, opts...) + mr, err := fr.fetch(ctx, req, opts) if err != nil { return err } @@ -224,23 +263,34 @@ func (b *blob) fetchRange(allData map[region]io.Writer, opts ...Option) error { return errors.Wrapf(err, "failed to read multipart resp") } if err := b.walkChunks(reg, func(chunk region) error { + + // Prepare the temporary buffer + bf := b.resolver.bufPool.Get().(*bytes.Buffer) + defer b.resolver.bufPool.Put(bf) + bf.Reset() + bf.Grow(int(chunk.size())) + w := io.Writer(bf) + + // If this chunk is one of the targets, write the content to the + // passed writer too. + if _, ok := fetched[chunk]; ok { + w = io.MultiWriter(bf, allData[chunk]) + } + + // Copy the target chunk + if _, err := io.CopyN(w, p, chunk.size()); err != nil { return err + } else if int64(bf.Len()) != chunk.size() { + return fmt.Errorf("unexpected fetched data size %d; want %d", + bf.Len(), chunk.size()) } - data := make([]byte, chunk.size()) - if _, err := io.ReadFull(p, data); err != nil { - return err - } - b.cache.Add(fr.genID(chunk), data) + + // Add the target chunk to the cache + b.cache.Add(fr.genID(chunk), bf.Bytes()[:chunk.size()], opts.cacheOpts...)
b.fetchedRegionSetMu.Lock() b.fetchedRegionSet.add(chunk) b.fetchedRegionSetMu.Unlock() - if _, ok := fetched[chunk]; ok { - fetched[chunk] = true - if n, err := io.Copy(allData[chunk], bytes.NewReader(data)); err != nil { - return errors.Wrap(err, "failed to write chunk to buffer") - } else if n != chunk.size() { - return fmt.Errorf("unexpected fetched data size %d; want %d", - n, chunk.size()) - } - } + fetched[chunk] = true return nil }); err != nil { return errors.Wrapf(err, "failed to get chunks") diff --git a/stargz/remote/blob_test.go b/stargz/remote/blob_test.go index 28cfd38bc..25ca1ec25 100644 --- a/stargz/remote/blob_test.go +++ b/stargz/remote/blob_test.go @@ -37,6 +37,8 @@ import ( "sync" "testing" "time" + + "github.com/containerd/stargz-snapshotter/cache" ) const ( @@ -213,8 +215,9 @@ func checkAllCached(t *testing.T, r *blob, offset, size int64) { cn := 0 whole := region{floor(offset, r.chunkSize), ceil(offset+size-1, r.chunkSize) - 1} if err := r.walkChunks(whole, func(reg region) error { - data, err := r.cache.Fetch(r.fetcher.genID(reg)) - if err != nil || int64(len(data)) != reg.size() { + data := make([]byte, reg.size()) + n, err := r.cache.FetchAt(r.fetcher.genID(reg), 0, data) + if err != nil || int64(n) != reg.size() { return fmt.Errorf("missed cache of region={%d,%d}(size=%d): %v", reg.b, reg.e, reg.size(), err) } @@ -279,6 +282,13 @@ func makeBlob(t *testing.T, size int64, chunkSize int64, fn RoundTripFunc) *blob size: size, chunkSize: chunkSize, cache: &testCache{membuf: map[string]string{}, t: t}, + resolver: &Resolver{ + bufPool: sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, + }, + }, } } @@ -288,22 +298,22 @@ type testCache struct { mu sync.Mutex } -func (tc *testCache) Fetch(blobHash string) ([]byte, error) { +func (tc *testCache) FetchAt(key string, offset int64, p []byte, opts ...cache.Option) (int, error) { tc.mu.Lock() defer tc.mu.Unlock() - cache, ok := tc.membuf[blobHash] + cache, ok := tc.membuf[key] if !ok { - return nil, fmt.Errorf("Missed cache: %q", blobHash) + return 0, fmt.Errorf("Missed cache: %q", key) } - return []byte(cache), nil + return copy(p, cache[offset:]), nil } -func (tc *testCache) Add(blobHash string, p []byte) { +func (tc *testCache) Add(key string, p []byte, opts ...cache.Option) { tc.mu.Lock() defer tc.mu.Unlock() - tc.membuf[blobHash] = string(p) - tc.t.Logf(" cached [%s...]: %q", blobHash[:8], string(p)) + tc.membuf[key] = string(p) + tc.t.Logf(" cached [%s...]: %q", key[:8], string(p)) } func TestCheckInterval(t *testing.T) { diff --git a/stargz/remote/resolver.go b/stargz/remote/resolver.go index c28c08714..b8d36cec4 100644 --- a/stargz/remote/resolver.go +++ b/stargz/remote/resolver.go @@ -23,6 +23,7 @@ package remote import ( + "bytes" "context" "crypto/sha256" "fmt" @@ -84,6 +85,11 @@ func NewResolver(keychain authn.Keychain, config ResolverConfig) *Resolver { trPool: lru.New(poolEntry), keychain: keychain, config: config, + bufPool: sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, + }, } } @@ -93,6 +99,7 @@ type Resolver struct { trPoolMu sync.Mutex keychain authn.Keychain config ResolverConfig + bufPool sync.Pool } func (r *Resolver) Resolve(ref, digest string, cache cache.BlobCache, config BlobConfig) (Blob, error) { @@ -124,6 +131,7 @@ func (r *Resolver) Resolve(ref, digest string, cache cache.BlobCache, config Blo cache: cache, lastCheck: time.Now(), checkInterval: checkInterval, + resolver: r, }, nil } @@ -352,7 +360,7 @@ type multipartReadCloser interface { Close() error 
} -func (f *fetcher) fetch(ctx context.Context, rs []region, opts ...Option) (multipartReadCloser, error) { +func (f *fetcher) fetch(ctx context.Context, rs []region, opts *options) (multipartReadCloser, error) { if len(rs) == 0 { return nil, fmt.Errorf("no request queried") } var ( tr = f.tr singleRangeMode = f.isSingleRangeMode() ) - // Parse options - var opt options - for _, o := range opts { - o(&opt) + if opts.ctx != nil { + ctx = opts.ctx } - if opt.ctx != nil { - ctx = opt.ctx - } - if opt.tr != nil { - tr = opt.tr + if opts.tr != nil { + tr = opts.tr } // squash requesting chunks for reducing the total size of request header @@ -428,8 +431,8 @@ func (f *fetcher) fetch(ctx context.Context, rs []region, opts ...Option) (multi } return singlePartReader(reg, res.Body), nil } else if !singleRangeMode { - f.singleRangeMode() // fallbacks to singe range request mode - return f.fetch(ctx, rs, opts...) // retries with the single range mode + f.singleRangeMode() // fall back to single range request mode + return f.fetch(ctx, rs, opts) // retries with the single range mode } return nil, fmt.Errorf("unexpected status code on %q: %v", f.url, res.Status) @@ -548,8 +551,9 @@ func parseRange(header string) (region, error) { type Option func(*options) type options struct { - ctx context.Context - tr http.RoundTripper + ctx context.Context + tr http.RoundTripper + cacheOpts []cache.Option } func WithContext(ctx context.Context) Option { @@ -563,3 +567,9 @@ func WithRoundTripper(tr http.RoundTripper) Option { opts.tr = tr } } + +func WithCacheOpts(cacheOpts ...cache.Option) Option { + return func(opts *options) { + opts.cacheOpts = cacheOpts + } +}
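For illustration, the squashing behaviour introduced by PATCH 1/2 can be sanity-checked with a small test like the sketch below. The test name is hypothetical and it would have to live in stargz/remote (e.g. util_test.go), since region and regionSet are unexported; the input/expected pair mirrors one of the cases added to TestRegionSet above.

    func TestRegionSetSquash(t *testing.T) {
            // Regions added in arbitrary order should collapse into a single
            // entry, keeping the resulting HTTP Range Request header small.
            var rs regionSet
            for _, r := range []region{{4, 6}, {1, 3}, {7, 9}, {2, 8}} {
                    rs.add(r)
            }
            if len(rs.rs) != 1 || rs.rs[0] != (region{1, 9}) {
                    t.Errorf("got %v; want [{1 9}]", rs.rs)
            }
    }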
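Likewise, a minimal sketch of how a caller might drive the reworked cache API from PATCH 2/2: Add with cache.Direct() keeps a one-shot value out of the on-memory LRU, and FetchAt reads a partial range of the cached blob into a caller-provided buffer. The directory path, key, and payload below are hypothetical, and SyncAdd is enabled so the write is visible to the FetchAt that follows.

    package main

    import (
            "fmt"
            "log"

            "github.com/containerd/stargz-snapshotter/cache"
    )

    func main() {
            dc, err := cache.NewDirectoryCache("/tmp/blobcache", cache.DirectoryCacheConfig{
                    MaxLRUCacheEntry: 10,
                    MaxCacheFds:      10,
                    SyncAdd:          true, // write synchronously so FetchAt below can hit
            })
            if err != nil {
                    log.Fatal(err)
            }

            // Direct() bypasses the on-memory cache for this one-shot value.
            dc.Add("deadbeef", []byte("example chunk"), cache.Direct())

            // Read a 5-byte partial range at offset 8 without fetching the whole blob.
            p := make([]byte, 5)
            if n, err := dc.FetchAt("deadbeef", 8, p); err == nil {
                    fmt.Printf("read %d bytes: %q\n", n, p[:n]) // read 5 bytes: "chunk"
            }
    }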