Skip to content

Commit

Permalink
Make resolving timeout configurable
Browse files Browse the repository at this point in the history
Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
  • Loading branch information
ktock committed Jan 26, 2022
1 parent 38baee4 commit 83260da
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 29 deletions.
2 changes: 2 additions & 0 deletions fs/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type Config struct {
ResolveResultEntry int `toml:"resolve_result_entry"` // deprecated
PrefetchSize int64 `toml:"prefetch_size"`
PrefetchTimeoutSec int64 `toml:"prefetch_timeout_sec"`
ResolveTimeoutSec int64 `toml:"resolve_timeout_sec"`
ResolveRequestTimeoutSec int64 `toml:"resolve_request_timeout_sec"`
NoPrefetch bool `toml:"noprefetch"`
NoBackgroundFetch bool `toml:"no_background_fetch"`
Debug bool `toml:"debug"`
Expand Down
18 changes: 13 additions & 5 deletions fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,10 @@ import (
)

const (
defaultFuseTimeout = time.Second
defaultMaxConcurrency = 2
fusermountBin = "fusermount"
defaultFuseTimeout = time.Second
defaultResolveTimeoutSec = 60
defaultMaxConcurrency = 2
fusermountBin = "fusermount"
)

type Option func(*options)
Expand Down Expand Up @@ -122,6 +123,11 @@ func NewFilesystem(root string, cfg config.Config, opts ...Option) (_ snapshot.F
entryTimeout = defaultFuseTimeout
}

resolveTimeout := time.Duration(cfg.ResolveTimeoutSec) * time.Second
if resolveTimeout == 0 {
resolveTimeout = time.Duration(defaultResolveTimeoutSec) * time.Second
}

metadataStore := fsOpts.metadataStore
if metadataStore == nil {
metadataStore = memorymetadata.NewReader
Expand Down Expand Up @@ -163,6 +169,7 @@ func NewFilesystem(root string, cfg config.Config, opts ...Option) (_ snapshot.F
metricsController: c,
attrTimeout: attrTimeout,
entryTimeout: entryTimeout,
resolveTimeout: resolveTimeout,
}, nil
}

Expand All @@ -181,6 +188,7 @@ type filesystem struct {
metricsController *layermetrics.Controller
attrTimeout time.Duration
entryTimeout time.Duration
resolveTimeout time.Duration
}

func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[string]string) (retErr error) {
Expand Down Expand Up @@ -255,8 +263,8 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
case err := <-errChan:
log.G(ctx).WithError(err).Debug("failed to resolve layer")
return errors.Wrapf(err, "failed to resolve layer")
case <-time.After(30 * time.Second):
log.G(ctx).Debug("failed to resolve layer (timeout)")
case <-time.After(fs.resolveTimeout):
log.G(ctx).WithField("timeout(sec)", fs.resolveTimeout.Seconds()).Debug("failed to resolve layer (timeout)")
return fmt.Errorf("failed to resolve layer (timeout)")
}
defer func() {
Expand Down
87 changes: 66 additions & 21 deletions fs/layer/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ const (
defaultMaxLRUCacheEntry = 10
defaultMaxCacheFds = 10
defaultPrefetchTimeoutSec = 10
defaultResolveTimeoutSec = 60
defaultResolveRequestTimeoutSec = 30
memoryCacheType = "memory"
)

Expand Down Expand Up @@ -117,6 +119,8 @@ type Resolver struct {
rootDir string
resolver *remote.Resolver
prefetchTimeout time.Duration
resolveTimeout time.Duration
resolveRequestTimeout time.Duration
layerCache *cacheutil.TTLCache
layerCacheMu sync.Mutex
blobCache *cacheutil.TTLCache
Expand All @@ -137,6 +141,14 @@ func NewResolver(root string, backgroundTaskManager *task.BackgroundTaskManager,
if prefetchTimeout == 0 {
prefetchTimeout = defaultPrefetchTimeoutSec * time.Second
}
resolveTimeout := time.Duration(cfg.ResolveTimeoutSec) * time.Second
if resolveTimeout == 0 {
resolveTimeout = time.Duration(defaultResolveTimeoutSec) * time.Second
}
resolveRequestTimeout := time.Duration(cfg.ResolveRequestTimeoutSec) * time.Second
if resolveRequestTimeout == 0 {
resolveRequestTimeout = time.Duration(defaultResolveRequestTimeoutSec) * time.Second
}

// layerCache caches resolved layers for future use. This is useful in a use-case where
// the filesystem resolves and caches all layers in an image (not only queried one) in parallel,
Expand Down Expand Up @@ -171,6 +183,8 @@ func NewResolver(root string, backgroundTaskManager *task.BackgroundTaskManager,
layerCache: layerCache,
blobCache: blobCache,
prefetchTimeout: prefetchTimeout,
resolveTimeout: resolveTimeout,
resolveRequestTimeout: resolveRequestTimeout,
backgroundTaskManager: backgroundTaskManager,
config: cfg,
resolveLock: new(namedmutex.NamedMutex),
Expand Down Expand Up @@ -236,6 +250,8 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs
defer r.resolveLock.Unlock(name)

ctx = log.WithLogger(ctx, log.G(ctx).WithField("src", name))
ctx, cancel := context.WithTimeout(ctx, r.resolveTimeout)
defer cancel()

// First, try to retrieve this layer from the underlying cache.
r.layerCacheMu.Lock()
Expand All @@ -256,7 +272,7 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs
log.G(ctx).Debugf("resolving")

// Resolve the blob.
blobR, err := r.resolveBlob(ctx, hosts, refspec, desc)
blobR, err := r.resolveBlob(ctx, hosts, refspec, desc, name)
if err != nil {
return nil, errors.Wrapf(err, "failed to resolve the blob")
}
Expand All @@ -280,11 +296,52 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs
// Each file's read operation is a prioritized task and all background tasks
// will be stopped during the execution so this can avoid being disturbed for
// NW traffic by background tasks.
var (
// Use context during resolving, to make it cancellable
curR = readerAtFunc(func(p []byte, offset int64) (n int, err error) {
ctx, cancel := context.WithTimeout(ctx, r.resolveRequestTimeout)
defer cancel()
return blobR.ReadAt(p, offset, remote.WithContext(ctx))
})
curRMu sync.Mutex
)
sr := io.NewSectionReader(readerAtFunc(func(p []byte, offset int64) (n int, err error) {
r.backgroundTaskManager.DoPrioritizedTask()
defer r.backgroundTaskManager.DonePrioritizedTask()
return blobR.ReadAt(p, offset)
curRMu.Lock()
br := curR
curRMu.Unlock()
return br.ReadAt(p, offset)
}), 0, blobR.Size())
vr, err := r.newReader(sr, desc, fsCache, esgzOpts...)
if err != nil {
cErr := ctx.Err()
if errors.Is(cErr, context.DeadlineExceeded) {
r.blobCacheMu.Lock()
r.blobCache.Remove(name)
r.blobCacheMu.Unlock()
}
return nil, errors.Wrap(err, "failed to read layer")
}
// do not propagate context after resolve is done
curRMu.Lock()
curR = readerAtFunc(func(p []byte, offset int64) (n int, err error) { return blobR.ReadAt(p, offset) })
curRMu.Unlock()

// Combine layer information together and cache it.
l := newLayer(r, desc, blobR, vr)
r.layerCacheMu.Lock()
cachedL, done2, added := r.layerCache.Add(name, l)
r.layerCacheMu.Unlock()
if !added {
l.close() // layer already exists in the cache. discrad this.
}

log.G(ctx).Debugf("resolved")
return &layerRef{cachedL.(*layer), done2}, nil
}

func (r *Resolver) newReader(sr *io.SectionReader, desc ocispec.Descriptor, fsCache cache.BlobCache, esgzOpts ...metadata.Option) (*reader.VerifiableReader, error) {
// define telemetry hooks to measure latency metrics inside estargz package
telemetry := metadata.Telemetry{
GetFooterLatency: func(start time.Time) {
Expand All @@ -306,36 +363,24 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs
if err != nil {
return nil, errors.Wrap(err, "failed to read layer")
}

// Combine layer information together and cache it.
l := newLayer(r, desc, blobR, vr)
r.layerCacheMu.Lock()
cachedL, done2, added := r.layerCache.Add(name, l)
r.layerCacheMu.Unlock()
if !added {
l.close() // layer already exists in the cache. discrad this.
}

log.G(ctx).Debugf("resolved")
return &layerRef{cachedL.(*layer), done2}, nil
return vr, nil
}

// resolveBlob resolves a blob based on the passed layer blob information.
func (r *Resolver) resolveBlob(ctx context.Context, hosts source.RegistryHosts, refspec reference.Spec, desc ocispec.Descriptor) (_ *blobRef, retErr error) {
name := refspec.String() + "/" + desc.Digest.String()

func (r *Resolver) resolveBlob(ctx context.Context, hosts source.RegistryHosts, refspec reference.Spec, desc ocispec.Descriptor, cacheKey string) (_ *blobRef, retErr error) {
// Try to retrieve the blob from the underlying cache.
r.blobCacheMu.Lock()
c, done, ok := r.blobCache.Get(name)
c, done, ok := r.blobCache.Get(cacheKey)
r.blobCacheMu.Unlock()
if ok {
if blob := c.(remote.Blob); blob.Check() == nil {
blob := c.(remote.Blob)
if err := blob.Check(); err == nil {
return &blobRef{blob, done}, nil
}
// invalid blob. discard this.
done()
r.blobCacheMu.Lock()
r.blobCache.Remove(name)
r.blobCache.Remove(cacheKey)
r.blobCacheMu.Unlock()
}

Expand All @@ -355,7 +400,7 @@ func (r *Resolver) resolveBlob(ctx context.Context, hosts source.RegistryHosts,
return nil, errors.Wrap(err, "failed to resolve the source")
}
r.blobCacheMu.Lock()
cachedB, done, added := r.blobCache.Add(name, b)
cachedB, done, added := r.blobCache.Add(cacheKey, b)
r.blobCacheMu.Unlock()
if !added {
b.Close() // blob already exists in the cache. discard this.
Expand Down
13 changes: 10 additions & 3 deletions store/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ const (
prepareSucceeded = "true"
prepareFailed = "false"

defaultMaxConcurrency = 2
defaultMaxConcurrency = 2
defaultResolveTimeoutSec = 60
)

func NewLayerManager(ctx context.Context, root string, hosts source.RegistryHosts, metadataStore metadata.Store, cfg config.Config) (*LayerManager, error) {
Expand All @@ -71,6 +72,10 @@ func NewLayerManager(ctx context.Context, root string, hosts source.RegistryHost
if ns != nil {
metrics.Register(ns)
}
resolveTimeout := time.Duration(cfg.ResolveTimeoutSec) * time.Second
if resolveTimeout == 0 {
resolveTimeout = time.Duration(defaultResolveTimeoutSec) * time.Second
}
return &LayerManager{
refPool: refPool,
hosts: hosts,
Expand All @@ -85,6 +90,7 @@ func NewLayerManager(ctx context.Context, root string, hosts source.RegistryHost
resolveLock: new(namedmutex.NamedMutex),
layer: make(map[string]map[string]layer.Layer),
refcounter: make(map[string]map[string]int),
resolveTimeout: resolveTimeout,
}, nil
}

Expand All @@ -102,6 +108,7 @@ type LayerManager struct {
disableVerification bool
metricsController *layermetrics.Controller
resolveLock *namedmutex.NamedMutex
resolveTimeout time.Duration

layer map[string]map[string]layer.Layer
refcounter map[string]map[string]int
Expand Down Expand Up @@ -220,8 +227,8 @@ func (r *LayerManager) getLayer(ctx context.Context, refspec reference.Spec, dgs
case err := <-errChan:
log.G(ctx).WithError(err).Debug("failed to resolve layer")
return nil, errors.Wrapf(err, "failed to resolve layer")
case <-time.After(30 * time.Second):
log.G(ctx).Debug("failed to resolve layer (timeout)")
case <-time.After(r.resolveTimeout):
log.G(ctx).WithField("timeout(sec)", r.resolveTimeout.Seconds()).Debug("failed to resolve layer (timeout)")
return nil, fmt.Errorf("failed to resolve layer (timeout)")
}

Expand Down

0 comments on commit 83260da

Please sign in to comment.