From 83260da5f1095d88866935ef031e9a4e7f91b428 Mon Sep 17 00:00:00 2001 From: Kohei Tokunaga Date: Tue, 25 Jan 2022 11:32:33 +0900 Subject: [PATCH] Make resolving timeout configurable Signed-off-by: Kohei Tokunaga --- fs/config/config.go | 2 ++ fs/fs.go | 18 +++++++--- fs/layer/layer.go | 87 ++++++++++++++++++++++++++++++++++----------- store/manager.go | 13 +++++-- 4 files changed, 91 insertions(+), 29 deletions(-) diff --git a/fs/config/config.go b/fs/config/config.go index 5550c2fd5..fd1d06680 100644 --- a/fs/config/config.go +++ b/fs/config/config.go @@ -42,6 +42,8 @@ type Config struct { ResolveResultEntry int `toml:"resolve_result_entry"` // deprecated PrefetchSize int64 `toml:"prefetch_size"` PrefetchTimeoutSec int64 `toml:"prefetch_timeout_sec"` + ResolveTimeoutSec int64 `toml:"resolve_timeout_sec"` + ResolveRequestTimeoutSec int64 `toml:"resolve_request_timeout_sec"` NoPrefetch bool `toml:"noprefetch"` NoBackgroundFetch bool `toml:"no_background_fetch"` Debug bool `toml:"debug"` diff --git a/fs/fs.go b/fs/fs.go index b27f19119..a44fd7758 100644 --- a/fs/fs.go +++ b/fs/fs.go @@ -68,9 +68,10 @@ import ( ) const ( - defaultFuseTimeout = time.Second - defaultMaxConcurrency = 2 - fusermountBin = "fusermount" + defaultFuseTimeout = time.Second + defaultResolveTimeoutSec = 60 + defaultMaxConcurrency = 2 + fusermountBin = "fusermount" ) type Option func(*options) @@ -122,6 +123,11 @@ func NewFilesystem(root string, cfg config.Config, opts ...Option) (_ snapshot.F entryTimeout = defaultFuseTimeout } + resolveTimeout := time.Duration(cfg.ResolveTimeoutSec) * time.Second + if resolveTimeout == 0 { + resolveTimeout = time.Duration(defaultResolveTimeoutSec) * time.Second + } + metadataStore := fsOpts.metadataStore if metadataStore == nil { metadataStore = memorymetadata.NewReader @@ -163,6 +169,7 @@ func NewFilesystem(root string, cfg config.Config, opts ...Option) (_ snapshot.F metricsController: c, attrTimeout: attrTimeout, entryTimeout: entryTimeout, + 
resolveTimeout: resolveTimeout, }, nil } @@ -181,6 +188,7 @@ type filesystem struct { metricsController *layermetrics.Controller attrTimeout time.Duration entryTimeout time.Duration + resolveTimeout time.Duration } func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[string]string) (retErr error) { @@ -255,8 +263,8 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s case err := <-errChan: log.G(ctx).WithError(err).Debug("failed to resolve layer") return errors.Wrapf(err, "failed to resolve layer") - case <-time.After(30 * time.Second): - log.G(ctx).Debug("failed to resolve layer (timeout)") + case <-time.After(fs.resolveTimeout): + log.G(ctx).WithField("timeout(sec)", fs.resolveTimeout.Seconds()).Debug("failed to resolve layer (timeout)") return fmt.Errorf("failed to resolve layer (timeout)") } defer func() { diff --git a/fs/layer/layer.go b/fs/layer/layer.go index fa0e5ccf2..8e2b82a30 100644 --- a/fs/layer/layer.go +++ b/fs/layer/layer.go @@ -59,6 +59,8 @@ const ( defaultMaxLRUCacheEntry = 10 defaultMaxCacheFds = 10 defaultPrefetchTimeoutSec = 10 + defaultResolveTimeoutSec = 60 + defaultResolveRequestTimeoutSec = 30 memoryCacheType = "memory" ) @@ -117,6 +119,8 @@ type Resolver struct { rootDir string resolver *remote.Resolver prefetchTimeout time.Duration + resolveTimeout time.Duration + resolveRequestTimeout time.Duration layerCache *cacheutil.TTLCache layerCacheMu sync.Mutex blobCache *cacheutil.TTLCache @@ -137,6 +141,14 @@ func NewResolver(root string, backgroundTaskManager *task.BackgroundTaskManager, if prefetchTimeout == 0 { prefetchTimeout = defaultPrefetchTimeoutSec * time.Second } + resolveTimeout := time.Duration(cfg.ResolveTimeoutSec) * time.Second + if resolveTimeout == 0 { + resolveTimeout = time.Duration(defaultResolveTimeoutSec) * time.Second + } + resolveRequestTimeout := time.Duration(cfg.ResolveRequestTimeoutSec) * time.Second + if resolveRequestTimeout == 0 { + resolveRequestTimeout = 
time.Duration(defaultResolveRequestTimeoutSec) * time.Second + } // layerCache caches resolved layers for future use. This is useful in a use-case where // the filesystem resolves and caches all layers in an image (not only queried one) in parallel, @@ -171,6 +183,8 @@ func NewResolver(root string, backgroundTaskManager *task.BackgroundTaskManager, layerCache: layerCache, blobCache: blobCache, prefetchTimeout: prefetchTimeout, + resolveTimeout: resolveTimeout, + resolveRequestTimeout: resolveRequestTimeout, backgroundTaskManager: backgroundTaskManager, config: cfg, resolveLock: new(namedmutex.NamedMutex), @@ -236,6 +250,8 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs defer r.resolveLock.Unlock(name) ctx = log.WithLogger(ctx, log.G(ctx).WithField("src", name)) + ctx, cancel := context.WithTimeout(ctx, r.resolveTimeout) + defer cancel() // First, try to retrieve this layer from the underlying cache. r.layerCacheMu.Lock() @@ -256,7 +272,7 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs log.G(ctx).Debugf("resolving") // Resolve the blob. - blobR, err := r.resolveBlob(ctx, hosts, refspec, desc) + blobR, err := r.resolveBlob(ctx, hosts, refspec, desc, name) if err != nil { return nil, errors.Wrapf(err, "failed to resolve the blob") } @@ -280,11 +296,52 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs // Each file's read operation is a prioritized task and all background tasks // will be stopped during the execution so this can avoid being disturbed for // NW traffic by background tasks. 
+ var ( + // Use context during resolving, to make it cancellable + curR = readerAtFunc(func(p []byte, offset int64) (n int, err error) { + ctx, cancel := context.WithTimeout(ctx, r.resolveRequestTimeout) + defer cancel() + return blobR.ReadAt(p, offset, remote.WithContext(ctx)) + }) + curRMu sync.Mutex + ) sr := io.NewSectionReader(readerAtFunc(func(p []byte, offset int64) (n int, err error) { r.backgroundTaskManager.DoPrioritizedTask() defer r.backgroundTaskManager.DonePrioritizedTask() - return blobR.ReadAt(p, offset) + curRMu.Lock() + br := curR + curRMu.Unlock() + return br.ReadAt(p, offset) }), 0, blobR.Size()) + vr, err := r.newReader(sr, desc, fsCache, esgzOpts...) + if err != nil { + cErr := ctx.Err() + if errors.Is(cErr, context.DeadlineExceeded) { + r.blobCacheMu.Lock() + r.blobCache.Remove(name) + r.blobCacheMu.Unlock() + } + return nil, errors.Wrap(err, "failed to read layer") + } + // do not propagate context after resolve is done + curRMu.Lock() + curR = readerAtFunc(func(p []byte, offset int64) (n int, err error) { return blobR.ReadAt(p, offset) }) + curRMu.Unlock() + + // Combine layer information together and cache it. + l := newLayer(r, desc, blobR, vr) + r.layerCacheMu.Lock() + cachedL, done2, added := r.layerCache.Add(name, l) + r.layerCacheMu.Unlock() + if !added { + l.close() // layer already exists in the cache. discard this. 
+ } + + log.G(ctx).Debugf("resolved") + return &layerRef{cachedL.(*layer), done2}, nil +} + +func (r *Resolver) newReader(sr *io.SectionReader, desc ocispec.Descriptor, fsCache cache.BlobCache, esgzOpts ...metadata.Option) (*reader.VerifiableReader, error) { // define telemetry hooks to measure latency metrics inside estargz package telemetry := metadata.Telemetry{ GetFooterLatency: func(start time.Time) { @@ -306,36 +363,24 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs if err != nil { return nil, errors.Wrap(err, "failed to read layer") } - - // Combine layer information together and cache it. - l := newLayer(r, desc, blobR, vr) - r.layerCacheMu.Lock() - cachedL, done2, added := r.layerCache.Add(name, l) - r.layerCacheMu.Unlock() - if !added { - l.close() // layer already exists in the cache. discrad this. - } - - log.G(ctx).Debugf("resolved") - return &layerRef{cachedL.(*layer), done2}, nil + return vr, nil } // resolveBlob resolves a blob based on the passed layer blob information. -func (r *Resolver) resolveBlob(ctx context.Context, hosts source.RegistryHosts, refspec reference.Spec, desc ocispec.Descriptor) (_ *blobRef, retErr error) { - name := refspec.String() + "/" + desc.Digest.String() - +func (r *Resolver) resolveBlob(ctx context.Context, hosts source.RegistryHosts, refspec reference.Spec, desc ocispec.Descriptor, cacheKey string) (_ *blobRef, retErr error) { // Try to retrieve the blob from the underlying cache. r.blobCacheMu.Lock() - c, done, ok := r.blobCache.Get(name) + c, done, ok := r.blobCache.Get(cacheKey) r.blobCacheMu.Unlock() if ok { - if blob := c.(remote.Blob); blob.Check() == nil { + blob := c.(remote.Blob) + if err := blob.Check(); err == nil { return &blobRef{blob, done}, nil } // invalid blob. discard this. 
done() r.blobCacheMu.Lock() - r.blobCache.Remove(name) + r.blobCache.Remove(cacheKey) r.blobCacheMu.Unlock() } @@ -355,7 +400,7 @@ func (r *Resolver) resolveBlob(ctx context.Context, hosts source.RegistryHosts, return nil, errors.Wrap(err, "failed to resolve the source") } r.blobCacheMu.Lock() - cachedB, done, added := r.blobCache.Add(name, b) + cachedB, done, added := r.blobCache.Add(cacheKey, b) r.blobCacheMu.Unlock() if !added { b.Close() // blob already exists in the cache. discard this. diff --git a/store/manager.go b/store/manager.go index 1397aaa96..8fc23ae01 100644 --- a/store/manager.go +++ b/store/manager.go @@ -46,7 +46,8 @@ const ( prepareSucceeded = "true" prepareFailed = "false" - defaultMaxConcurrency = 2 + defaultMaxConcurrency = 2 + defaultResolveTimeoutSec = 60 ) func NewLayerManager(ctx context.Context, root string, hosts source.RegistryHosts, metadataStore metadata.Store, cfg config.Config) (*LayerManager, error) { @@ -71,6 +72,10 @@ func NewLayerManager(ctx context.Context, root string, hosts source.RegistryHost if ns != nil { metrics.Register(ns) } + resolveTimeout := time.Duration(cfg.ResolveTimeoutSec) * time.Second + if resolveTimeout == 0 { + resolveTimeout = time.Duration(defaultResolveTimeoutSec) * time.Second + } return &LayerManager{ refPool: refPool, hosts: hosts, @@ -85,6 +90,7 @@ func NewLayerManager(ctx context.Context, root string, hosts source.RegistryHost resolveLock: new(namedmutex.NamedMutex), layer: make(map[string]map[string]layer.Layer), refcounter: make(map[string]map[string]int), + resolveTimeout: resolveTimeout, }, nil } @@ -102,6 +108,7 @@ type LayerManager struct { disableVerification bool metricsController *layermetrics.Controller resolveLock *namedmutex.NamedMutex + resolveTimeout time.Duration layer map[string]map[string]layer.Layer refcounter map[string]map[string]int @@ -220,8 +227,8 @@ func (r *LayerManager) getLayer(ctx context.Context, refspec reference.Spec, dgs case err := <-errChan: 
log.G(ctx).WithError(err).Debug("failed to resolve layer") return nil, errors.Wrapf(err, "failed to resolve layer") - case <-time.After(30 * time.Second): - log.G(ctx).Debug("failed to resolve layer (timeout)") + case <-time.After(r.resolveTimeout): + log.G(ctx).WithField("timeout(sec)", r.resolveTimeout.Seconds()).Debug("failed to resolve layer (timeout)") return nil, fmt.Errorf("failed to resolve layer (timeout)") }