Merge pull request #600 from ktock/cachettl
layer resolver: Avoid frequent cache misses when many image pulls happen at once
AkihiroSuda committed Jan 25, 2022
2 parents e3a38fc + 16166d7 commit 38baee4
Showing 8 changed files with 376 additions and 94 deletions.
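
The gist of the change: the layer resolver used to keep resolve results in a fixed-size LRU cache (resolve_result_entry, default 30 entries), so pulling many images in parallel could evict entries before they were ever used and force layers to be resolved again. This commit moves the shared util/lrucache package to util/cacheutil (renaming Cache to LRUCache) and switches the resolver's layer and blob caches to a new time-based cache, cacheutil.TTLCache, whose entries expire after resolve_result_entry_ttl_sec (default 120s) instead of being evicted by count. The TTL cache implementation itself is among the changed files not shown in this excerpt; the Go sketch below only illustrates the idea of a TTL cache with an OnEvicted hook, with simplified Get/Add signatures and without the reference counting the real cacheutil caches perform.

```go
// Illustrative only: a minimal TTL cache with an OnEvicted hook. The real
// implementation (util/cacheutil, not shown in this excerpt) additionally
// tracks reference counts like the LRUCache further down in this diff.
package cachesketch

import (
	"sync"
	"time"
)

type TTLCache struct {
	mu      sync.Mutex
	ttl     time.Duration
	entries map[string]*entry

	// OnEvicted is called when an entry expires or is explicitly removed.
	OnEvicted func(key string, value interface{})
}

type entry struct {
	value interface{}
	timer *time.Timer
}

func NewTTLCache(ttl time.Duration) *TTLCache {
	return &TTLCache{ttl: ttl, entries: map[string]*entry{}}
}

// Add stores value unless key is already cached; the entry expires after the TTL.
func (c *TTLCache) Add(key string, value interface{}) (interface{}, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if e, ok := c.entries[key]; ok {
		return e.value, false // keep the existing entry
	}
	e := &entry{value: value}
	e.timer = time.AfterFunc(c.ttl, func() { c.Remove(key) })
	c.entries[key] = e
	return value, true
}

// Get returns the cached value if it has not expired yet.
func (c *TTLCache) Get(key string) (interface{}, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if e, ok := c.entries[key]; ok {
		return e.value, true
	}
	return nil, false
}

// Remove drops the entry (if any) and fires OnEvicted outside the lock.
func (c *TTLCache) Remove(key string) {
	c.mu.Lock()
	e, ok := c.entries[key]
	if ok {
		e.timer.Stop()
		delete(c.entries, key)
	}
	cb := c.OnEvicted
	c.mu.Unlock()
	if ok && cb != nil {
		cb(key, e.value)
	}
}
```

Expiring by age rather than by entry count lets the cache absorb however many layers are being resolved concurrently, without evicting entries that are still about to be used.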
14 changes: 7 additions & 7 deletions cache/cache.go
@@ -25,7 +25,7 @@ import (
"path/filepath"
"sync"

"github.com/containerd/stargz-snapshotter/util/lrucache"
"github.com/containerd/stargz-snapshotter/util/cacheutil"
"github.com/containerd/stargz-snapshotter/util/namedmutex"
"github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
@@ -51,11 +51,11 @@ type DirectoryCacheConfig struct {

// DataCache is an on-memory cache of the data.
// OnEvicted will be overridden and replaced for internal use.
- DataCache *lrucache.Cache
+ DataCache *cacheutil.LRUCache

// FdCache is a cache for opened file descriptors.
// OnEvicted will be overridden and replaced for internal use.
- FdCache *lrucache.Cache
+ FdCache *cacheutil.LRUCache

// BufPool will be used for pooling bytes.Buffer.
BufPool *sync.Pool
@@ -130,7 +130,7 @@ func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache
if maxEntry == 0 {
maxEntry = defaultMaxLRUCacheEntry
}
- dataCache = lrucache.New(maxEntry)
+ dataCache = cacheutil.NewLRUCache(maxEntry)
dataCache.OnEvicted = func(key string, value interface{}) {
value.(*bytes.Buffer).Reset()
bufPool.Put(value)
@@ -142,7 +142,7 @@ func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache
if maxEntry == 0 {
maxEntry = defaultMaxCacheFds
}
- fdCache = lrucache.New(maxEntry)
+ fdCache = cacheutil.NewLRUCache(maxEntry)
fdCache.OnEvicted = func(key string, value interface{}) {
value.(*os.File).Close()
}
@@ -169,8 +169,8 @@ func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache

// directoryCache is a cache implementation which backend is a directory.
type directoryCache struct {
- cache *lrucache.Cache
- fileCache *lrucache.Cache
+ cache *cacheutil.LRUCache
+ fileCache *cacheutil.LRUCache
wipDirectory string
directory string
wipLock *namedmutex.NamedMutex
27 changes: 15 additions & 12 deletions fs/config/config.go
@@ -34,18 +34,21 @@ const (
)

type Config struct {
- HTTPCacheType string `toml:"http_cache_type"`
- FSCacheType string `toml:"filesystem_cache_type"`
- ResolveResultEntry int `toml:"resolve_result_entry"`
- PrefetchSize int64 `toml:"prefetch_size"`
- PrefetchTimeoutSec int64 `toml:"prefetch_timeout_sec"`
- NoPrefetch bool `toml:"noprefetch"`
- NoBackgroundFetch bool `toml:"no_background_fetch"`
- Debug bool `toml:"debug"`
- AllowNoVerification bool `toml:"allow_no_verification"`
- DisableVerification bool `toml:"disable_verification"`
- MaxConcurrency int64 `toml:"max_concurrency"`
- NoPrometheus bool `toml:"no_prometheus"`
+ HTTPCacheType string `toml:"http_cache_type"`
+ FSCacheType string `toml:"filesystem_cache_type"`
+ // ResolveResultEntryTTLSec is TTL (in sec) to cache resolved layers for
+ // future use. (default 120s)
+ ResolveResultEntryTTLSec int `toml:"resolve_result_entry_ttl_sec"`
+ ResolveResultEntry int `toml:"resolve_result_entry"` // deprecated
+ PrefetchSize int64 `toml:"prefetch_size"`
+ PrefetchTimeoutSec int64 `toml:"prefetch_timeout_sec"`
+ NoPrefetch bool `toml:"noprefetch"`
+ NoBackgroundFetch bool `toml:"no_background_fetch"`
+ Debug bool `toml:"debug"`
+ AllowNoVerification bool `toml:"allow_no_verification"`
+ DisableVerification bool `toml:"disable_verification"`
+ MaxConcurrency int64 `toml:"max_concurrency"`
+ NoPrometheus bool `toml:"no_prometheus"`

// BlobConfig is config for layer blob management.
BlobConfig `toml:"blob"`
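
For operators, the visible effect of this change is a new TOML knob. A minimal config fragment, assuming the fields sit at the top level of the snapshotter's TOML config as the struct tags above suggest (the surrounding file layout is not shown in this diff):

```toml
# Keep resolved layers/blobs cached for 5 minutes (default is 120 seconds).
resolve_result_entry_ttl_sec = 300

# Deprecated: the old entry-count knob, superseded by the TTL above.
# resolve_result_entry = 30
```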
34 changes: 17 additions & 17 deletions fs/layer/layer.go
@@ -45,7 +45,7 @@ import (
"github.com/containerd/stargz-snapshotter/fs/source"
"github.com/containerd/stargz-snapshotter/metadata"
"github.com/containerd/stargz-snapshotter/task"
"github.com/containerd/stargz-snapshotter/util/lrucache"
"github.com/containerd/stargz-snapshotter/util/cacheutil"
"github.com/containerd/stargz-snapshotter/util/namedmutex"
fusefs "github.com/hanwen/go-fuse/v2/fs"
digest "github.com/opencontainers/go-digest"
@@ -55,11 +55,11 @@ import (
)

const (
- defaultResolveResultEntry = 30
- defaultMaxLRUCacheEntry = 10
- defaultMaxCacheFds = 10
- defaultPrefetchTimeoutSec = 10
- memoryCacheType = "memory"
+ defaultResolveResultEntryTTLSec = 120
+ defaultMaxLRUCacheEntry = 10
+ defaultMaxCacheFds = 10
+ defaultPrefetchTimeoutSec = 10
+ memoryCacheType = "memory"
)

// Layer represents a layer.
@@ -117,9 +117,9 @@ type Resolver struct {
rootDir string
resolver *remote.Resolver
prefetchTimeout time.Duration
- layerCache *lrucache.Cache
+ layerCache *cacheutil.TTLCache
layerCacheMu sync.Mutex
- blobCache *lrucache.Cache
+ blobCache *cacheutil.TTLCache
blobCacheMu sync.Mutex
backgroundTaskManager *task.BackgroundTaskManager
resolveLock *namedmutex.NamedMutex
@@ -129,9 +129,9 @@ type Resolver struct {

// NewResolver returns a new layer resolver.
func NewResolver(root string, backgroundTaskManager *task.BackgroundTaskManager, cfg config.Config, resolveHandlers map[string]remote.Handler, metadataStore metadata.Store) (*Resolver, error) {
- resolveResultEntry := cfg.ResolveResultEntry
- if resolveResultEntry == 0 {
- resolveResultEntry = defaultResolveResultEntry
+ resolveResultEntryTTL := time.Duration(cfg.ResolveResultEntryTTLSec) * time.Second
+ if resolveResultEntryTTL == 0 {
+ resolveResultEntryTTL = defaultResolveResultEntryTTLSec * time.Second
}
prefetchTimeout := time.Duration(cfg.PrefetchTimeoutSec) * time.Second
if prefetchTimeout == 0 {
@@ -141,7 +141,7 @@ func NewResolver(root string, backgroundTaskManager *task.BackgroundTaskManager,
// layerCache caches resolved layers for future use. This is useful in a use-case where
// the filesystem resolves and caches all layers in an image (not only queried one) in parallel,
// before they are actually queried.
- layerCache := lrucache.New(resolveResultEntry)
+ layerCache := cacheutil.NewTTLCache(resolveResultEntryTTL)
layerCache.OnEvicted = func(key string, value interface{}) {
if err := value.(*layer).close(); err != nil {
logrus.WithField("key", key).WithError(err).Warnf("failed to clean up layer")
@@ -152,7 +152,7 @@

// blobCache caches resolved blobs for future use. This is especially useful when a layer
// isn't eStargz/stargz (the *layer object won't be created/cached in this case).
- blobCache := lrucache.New(resolveResultEntry)
+ blobCache := cacheutil.NewTTLCache(resolveResultEntryTTL)
blobCache.OnEvicted = func(key string, value interface{}) {
if err := value.(remote.Blob).Close(); err != nil {
logrus.WithField("key", key).WithError(err).Warnf("failed to clean up blob")
@@ -198,7 +198,7 @@ func newCache(root string, cacheType string, cfg config.Config) (cache.BlobCache
return new(bytes.Buffer)
},
}
- dCache, fCache := lrucache.New(maxDataEntry), lrucache.New(maxFdEntry)
+ dCache, fCache := cacheutil.NewLRUCache(maxDataEntry), cacheutil.NewLRUCache(maxFdEntry)
dCache.OnEvicted = func(key string, value interface{}) {
value.(*bytes.Buffer).Reset()
bufPool.Put(value)
@@ -231,13 +231,13 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs
name := refspec.String() + "/" + desc.Digest.String()

// Wait if resolving this layer is already running. The result
- // can hopefully get from the LRU cache.
+ // can hopefully get from the cache.
r.resolveLock.Lock(name)
defer r.resolveLock.Unlock(name)

ctx = log.WithLogger(ctx, log.G(ctx).WithField("src", name))

- // First, try to retrieve this layer from the underlying LRU cache.
+ // First, try to retrieve this layer from the underlying cache.
r.layerCacheMu.Lock()
c, done, ok := r.layerCache.Get(name)
r.layerCacheMu.Unlock()
@@ -324,7 +324,7 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs
func (r *Resolver) resolveBlob(ctx context.Context, hosts source.RegistryHosts, refspec reference.Spec, desc ocispec.Descriptor) (_ *blobRef, retErr error) {
name := refspec.String() + "/" + desc.Digest.String()

- // Try to retrieve the blob from the underlying LRU cache.
+ // Try to retrieve the blob from the underlying cache.
r.blobCacheMu.Lock()
c, done, ok := r.blobCache.Get(name)
r.blobCacheMu.Unlock()
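
The Resolve hunks above show the pattern used for both the layer and blob caches: take a per-name lock, try the cache, and only resolve on a miss, publishing the result so concurrent pulls of the same image share it. The standalone sketch below is hypothetical rather than code from this repository; it reproduces that flow with the LRUCache API shown later in this diff (the resolver itself now uses the TTL variant), assumes a Go module that depends on github.com/containerd/stargz-snapshotter at a version containing this change, and substitutes a single sync.Mutex for the per-name namedmutex.

```go
package main

import (
	"fmt"
	"sync"

	"github.com/containerd/stargz-snapshotter/util/cacheutil"
)

type resolver struct {
	mu    sync.Mutex // stands in for the per-name namedmutex used by the real Resolver
	cache *cacheutil.LRUCache
}

// get returns the cached result for name, resolving it at most once.
// The returned release func must be called when the value is no longer used.
func (r *resolver) get(name string) (string, func()) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// First, try the cache.
	if v, done, ok := r.cache.Get(name); ok {
		return v.(string), done
	}

	// Miss: do the expensive work once and publish it for later callers.
	v := resolveExpensive(name)
	cached, done, _ := r.cache.Add(name, v)
	return cached.(string), done
}

// resolveExpensive is a hypothetical stand-in for the real layer resolution.
func resolveExpensive(name string) string { return "resolved:" + name }

func main() {
	r := &resolver{cache: cacheutil.NewLRUCache(10)}

	v1, done1 := r.get("example.com/foo:latest/sha256:aaaa") // miss: resolves
	v2, done2 := r.get("example.com/foo:latest/sha256:aaaa") // hit: shares the entry
	fmt.Println(v1, v2)
	done1()
	done2()
}
```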
6 changes: 3 additions & 3 deletions store/refs.go
@@ -33,8 +33,8 @@ import (
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/stargz-snapshotter/fs/source"
"github.com/containerd/stargz-snapshotter/util/cacheutil"
"github.com/containerd/stargz-snapshotter/util/containerdutil"
"github.com/containerd/stargz-snapshotter/util/lrucache"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
@@ -54,7 +54,7 @@ func newRefPool(ctx context.Context, root string, hosts source.RegistryHosts) (*
hosts: hosts,
refcounter: make(map[string]*releaser),
}
- p.cache = lrucache.New(refCacheEntry)
+ p.cache = cacheutil.NewLRUCache(refCacheEntry)
p.cache.OnEvicted = func(key string, value interface{}) {
refspec := value.(reference.Spec)
if err := os.RemoveAll(p.metadataDir(refspec)); err != nil {
@@ -71,7 +71,7 @@ type refPool struct {
hosts source.RegistryHosts

refcounter map[string]*releaser
- cache *lrucache.Cache
+ cache *cacheutil.LRUCache
mu sync.Mutex
}

21 changes: 10 additions & 11 deletions util/lrucache/lrucache.go → util/cacheutil/lrucache.go
@@ -14,19 +14,18 @@
limitations under the License.
*/

- // Package lrucache provides reference-count-aware lru cache.
- package lrucache
+ package cacheutil

import (
"sync"

"github.com/golang/groupcache/lru"
)

- // Cache is "groupcache/lru"-like cache. The difference is that "groupcache/lru" immediately
+ // LRUCache is "groupcache/lru"-like cache. The difference is that "groupcache/lru" immediately
// finalizes the evicted contents using OnEvicted callback but our version strictly tracks the
// reference counts of contents and calls OnEvicted when nobody refers to the evicted contents.
- type Cache struct {
+ type LRUCache struct {
cache *lru.Cache
mu sync.Mutex

@@ -35,23 +34,23 @@ type Cache struct {
OnEvicted func(key string, value interface{})
}

- // New creates new cache.
- func New(maxEntries int) *Cache {
+ // NewLRUCache creates new lru cache.
+ func NewLRUCache(maxEntries int) *LRUCache {
inner := lru.New(maxEntries)
inner.OnEvicted = func(key lru.Key, value interface{}) {
// Decrease the ref count incremented in Add().
// When nobody refers to this value, this value will be finalized via refCounter.
value.(*refCounter).finalize()
}
- return &Cache{
+ return &LRUCache{
cache: inner,
}
}

// Get retrieves the specified object from the cache and increments the reference counter of the
// target content. Client must call `done` callback to decrease the reference count when the value
// will no longer be used.
- func (c *Cache) Get(key string) (value interface{}, done func(), ok bool) {
+ func (c *LRUCache) Get(key string) (value interface{}, done func(), ok bool) {
c.mu.Lock()
defer c.mu.Unlock()
o, ok := c.cache.Get(key)
@@ -67,7 +66,7 @@ func (c *Cache) Get(key string) (value interface{}, done func(), ok bool) {
// If the specified content already exists in the cache, this sets `added` to false and returns
// "already cached" content (i.e. doesn't replace the content with the new one). Client must call
// `done` callback to decrease the counter when the value will no longer be used.
- func (c *Cache) Add(key string, value interface{}) (cachedValue interface{}, done func(), added bool) {
+ func (c *LRUCache) Add(key string, value interface{}) (cachedValue interface{}, done func(), added bool) {
c.mu.Lock()
defer c.mu.Unlock()
if o, ok := c.cache.Get(key); ok {
@@ -88,13 +87,13 @@ func (c *Cache) Add(key string, value interface{}) (cachedValue interface{}, don

// Remove removes the specified contents from the cache. OnEvicted callback will be called when
// nobody refers to the removed content.
- func (c *Cache) Remove(key string) {
+ func (c *LRUCache) Remove(key string) {
c.mu.Lock()
defer c.mu.Unlock()
c.cache.Remove(key)
}

- func (c *Cache) decreaseOnceFunc(rc *refCounter) func() {
+ func (c *LRUCache) decreaseOnceFunc(rc *refCounter) func() {
var once sync.Once
return func() {
c.mu.Lock()
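
The comments above spell out the one behavioral difference from groupcache/lru: finalization is deferred until nobody holds a reference. A small hypothetical program (same module assumption as the earlier sketch) showing that OnEvicted fires only once the last done callback has been called:

```go
package main

import (
	"fmt"

	"github.com/containerd/stargz-snapshotter/util/cacheutil"
)

func main() {
	c := cacheutil.NewLRUCache(10)
	c.OnEvicted = func(key string, value interface{}) {
		fmt.Println("evicted:", key, value) // finalizer: close files, return buffers, etc.
	}

	_, doneAdd, _ := c.Add("k", "v")
	_, doneGet, _ := c.Get("k")

	c.Remove("k") // nothing printed yet: two references are still held
	doneAdd()     // still one reference left
	doneGet()     // last reference released, so "evicted: k v" is printed here
}
```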
(The remaining 3 changed files are not displayed in this excerpt.)
