diff --git a/registry/proxy/proxyblobstore.go b/registry/proxy/proxyblobstore.go index 0a082888e1..f482bb9eef 100644 --- a/registry/proxy/proxyblobstore.go +++ b/registry/proxy/proxyblobstore.go @@ -132,18 +132,18 @@ func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, return err } - blobRef, err := reference.WithDigest(pbs.repositoryName, dgst) - if err != nil { - dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err) - return err - } - - if pbs.scheduler != nil && pbs.ttl != nil { - if err := pbs.scheduler.AddBlob(blobRef, *pbs.ttl); err != nil { - dcontext.GetLogger(ctx).Errorf("Error adding blob: %s", err) - return err - } - } + // blobRef, err := reference.WithDigest(pbs.repositoryName, dgst) + // if err != nil { + // dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err) + // return err + // } + + // if pbs.scheduler != nil && pbs.ttl != nil { + // if err := pbs.scheduler.AddBlob(blobRef, *pbs.ttl); err != nil { + // dcontext.GetLogger(ctx).Errorf("Error adding blob: %s", err) + // return err + // } + // } return nil } diff --git a/registry/proxy/proxymanifeststore.go b/registry/proxy/proxymanifeststore.go index 29dcca9e71..f7a5623bce 100644 --- a/registry/proxy/proxymanifeststore.go +++ b/registry/proxy/proxymanifeststore.go @@ -55,6 +55,19 @@ func (pms proxyManifestStore) Get(ctx context.Context, dgst digest.Digest, optio fromRemote = true } + repoBlob, err := reference.WithDigest(pms.repositoryName, dgst) + if err != nil { + dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err) + return nil, err + } + + if pms.scheduler != nil && pms.ttl != nil { + if err := pms.scheduler.AddManifest(repoBlob, *pms.ttl); err != nil { + dcontext.GetLogger(ctx).Errorf("Error adding manifest: %s", err) + return nil, err + } + } + _, payload, err := manifest.Payload() if err != nil { return nil, err @@ -70,18 +83,18 @@ func (pms proxyManifestStore) Get(ctx context.Context, dgst digest.Digest, optio } // Schedule the manifest blob for removal - repoBlob, err := reference.WithDigest(pms.repositoryName, dgst) - if err != nil { - dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err) - return nil, err - } - - if pms.scheduler != nil && pms.ttl != nil { - if err := pms.scheduler.AddManifest(repoBlob, *pms.ttl); err != nil { - dcontext.GetLogger(ctx).Errorf("Error adding manifest: %s", err) - return nil, err - } - } + // repoBlob, err := reference.WithDigest(pms.repositoryName, dgst) + // if err != nil { + // dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err) + // return nil, err + // } + + // if pms.scheduler != nil && pms.ttl != nil { + // if err := pms.scheduler.AddManifest(repoBlob, *pms.ttl); err != nil { + // dcontext.GetLogger(ctx).Errorf("Error adding manifest: %s", err) + // return nil, err + // } + // } // Ensure the manifest blob is cleaned up // pms.scheduler.AddBlob(blobRef, repositoryTTL) diff --git a/registry/proxy/proxyregistry.go b/registry/proxy/proxyregistry.go index 33dcc4afa1..5fee7130fa 100644 --- a/registry/proxy/proxyregistry.go +++ b/registry/proxy/proxyregistry.go @@ -39,7 +39,7 @@ func NewRegistryPullThroughCache(ctx context.Context, registry distribution.Name return nil, err } - v := storage.NewVacuum(ctx, driver) + // v := storage.NewVacuum(ctx, driver) var s *scheduler.TTLExpirationScheduler var ttl *time.Duration @@ -54,34 +54,37 @@ func NewRegistryPullThroughCache(ctx context.Context, registry distribution.Name } if ttl != nil { - s = scheduler.New(ctx, driver, "/scheduler-state.json") - s.OnBlobExpire(func(ref reference.Reference) error { - var r reference.Canonical - var ok bool - if r, ok = ref.(reference.Canonical); !ok { - return fmt.Errorf("unexpected reference type : %T", ref) - } - - repo, err := registry.Repository(ctx, r) - if err != nil { - return err - } - - blobs := repo.Blobs(ctx) - - // Clear the repository reference and descriptor caches - err = blobs.Delete(ctx, r.Digest()) - if err != nil { - return err - } - - err = v.RemoveBlob(r.Digest().String()) - if err != nil { - return err - } - - return nil + s = scheduler.New(ctx, driver, "/scheduler-state.json", registry, storage.GCOpts{ + DryRun: true, + RemoveUntagged: false, }) + // s.OnBlobExpire(func(ref reference.Reference) error { + // var r reference.Canonical + // var ok bool + // if r, ok = ref.(reference.Canonical); !ok { + // return fmt.Errorf("unexpected reference type : %T", ref) + // } + + // repo, err := registry.Repository(ctx, r) + // if err != nil { + // return err + // } + + // blobs := repo.Blobs(ctx) + + // // Clear the repository reference and descriptor caches + // err = blobs.Delete(ctx, r.Digest()) + // if err != nil { + // return err + // } + + // err = v.RemoveBlob(r.Digest().String()) + // if err != nil { + // return err + // } + + // return nil + // }) s.OnManifestExpire(func(ref reference.Reference) error { var r reference.Canonical diff --git a/registry/proxy/scheduler/scheduler.go b/registry/proxy/scheduler/scheduler.go index ed1d9d4198..457914293d 100644 --- a/registry/proxy/scheduler/scheduler.go +++ b/registry/proxy/scheduler/scheduler.go @@ -7,9 +7,12 @@ import ( "sync" "time" + "github.com/distribution/distribution/v3" "github.com/distribution/distribution/v3/internal/dcontext" + "github.com/distribution/distribution/v3/registry/storage" "github.com/distribution/distribution/v3/registry/storage/driver" "github.com/distribution/reference" + "github.com/opencontainers/go-digest" ) // onTTLExpiryFunc is called when a repository's TTL expires @@ -18,7 +21,8 @@ type expiryFunc func(reference.Reference) error const ( entryTypeBlob = iota entryTypeManifest - indexSaveFrequency = 5 * time.Second + indexSaveFrequency = 5 * time.Second + garbageCollectFrequency = 1 * time.Minute ) // schedulerEntry represents an entry in the scheduler @@ -31,16 +35,24 @@ type schedulerEntry struct { timer *time.Timer } +func (se schedulerEntry) String() string { + return fmt.Sprintf("Expiry: %s, EntryType: %d", se.Expiry, se.EntryType) +} + // New returns a new instance of the scheduler -func New(ctx context.Context, driver driver.StorageDriver, path string) *TTLExpirationScheduler { +func New(ctx context.Context, driver driver.StorageDriver, path string, ttl *time.Duration, registry distribution.Namespace, opts storage.GCOpts) *TTLExpirationScheduler { return &TTLExpirationScheduler{ - entries: make(map[string]*schedulerEntry), - driver: driver, - pathToStateFile: path, - ctx: ctx, - stopped: true, - doneChan: make(chan struct{}), - saveTimer: time.NewTicker(indexSaveFrequency), + entries: make(map[string]*schedulerEntry), + driver: driver, + pathToStateFile: path, + ttl: ttl, + registry: registry, + opts: opts, + ctx: ctx, + stopped: true, + doneChan: make(chan struct{}), + saveTimer: time.NewTicker(indexSaveFrequency), + garbageCollectTimer: time.NewTicker(garbageCollectFrequency), } } @@ -54,15 +66,19 @@ type TTLExpirationScheduler struct { driver driver.StorageDriver ctx context.Context pathToStateFile string + ttl *time.Duration + registry distribution.Namespace + opts storage.GCOpts stopped bool onBlobExpire expiryFunc onManifestExpire expiryFunc - indexDirty bool - saveTimer *time.Ticker - doneChan chan struct{} + indexDirty bool + saveTimer *time.Ticker + garbageCollectTimer *time.Ticker + doneChan chan struct{} } // OnBlobExpire is called when a scheduled blob's TTL expires @@ -121,7 +137,7 @@ func (ttles *TTLExpirationScheduler) Start() error { return fmt.Errorf("scheduler already started") } - dcontext.GetLogger(ttles.ctx).Infof("Starting cached object TTL expiration scheduler...") + dcontext.GetLogger(ttles.ctx).Infof("Starting cached object TTL (cameron changed) expiration scheduler...") ttles.stopped = false // Start timer for each deserialized entry @@ -129,6 +145,11 @@ func (ttles *TTLExpirationScheduler) Start() error { entry.timer = ttles.startTimer(entry, time.Until(entry.Expiry)) } + err = ttles.BackfillManifests() + if err != nil { + return fmt.Errorf("failed to backfill manifests: %w", err) + } + // Start a ticker to periodically save the entries index go func() { @@ -140,6 +161,7 @@ func (ttles *TTLExpirationScheduler) Start() error { ttles.Unlock() continue } + dcontext.GetLogger(ttles.ctx).Debugf("Current state: \n %+v", ttles.entries) err := ttles.writeState() if err != nil { @@ -155,9 +177,25 @@ func (ttles *TTLExpirationScheduler) Start() error { } }() + // You could paraallize this, but you'd want to make sure work was not overlapping + go ttles.GarbageCollect() + return nil } +func (ttles *TTLExpirationScheduler) GarbageCollect() { + for { + select { + case <-ttles.garbageCollectTimer.C: + + storage.MarkAndSweep(ttles.ctx, ttles.driver, ttles.registry, ttles.opts) + + case <-ttles.doneChan: + return + } + } +} + func (ttles *TTLExpirationScheduler) add(r reference.Reference, ttl time.Duration, eType int) { entry := &schedulerEntry{ Key: r.String(), @@ -256,5 +294,76 @@ func (ttles *TTLExpirationScheduler) readState() error { if err != nil { return err } + dcontext.GetLogger(ttles.ctx).Infof("Start state: \n %+v", ttles.entries) + return nil } + +func (ttles *TTLExpirationScheduler) BackfillManifests() error { + repositoryEnumerator, ok := ttles.registry.(distribution.RepositoryEnumerator) + if !ok { + return fmt.Errorf("unable to convert Namespace to RepositoryEnumerator") + } + emit("backfilling manifests") + + // mark + err := repositoryEnumerator.Enumerate(ttles.ctx, func(repoName string) error { + emit("backfill for " + repoName) + + var err error + named, err := reference.WithName(repoName) + if err != nil { + return fmt.Errorf("failed to parse repo name %s: %v", repoName, err) + } + repository, err := ttles.registry.Repository(ttles.ctx, named) + if err != nil { + return fmt.Errorf("failed to construct repository: %v", err) + } + + manifestService, err := repository.Manifests(ttles.ctx) + if err != nil { + return fmt.Errorf("failed to construct manifest service: %v", err) + } + + manifestEnumerator, ok := manifestService.(distribution.ManifestEnumerator) + if !ok { + return fmt.Errorf("unable to convert ManifestService into ManifestEnumerator") + } + + err = manifestEnumerator.Enumerate(ttles.ctx, func(dgst digest.Digest) error { + // Mark the manifest's blob + emit("backfill for %s: adding ttl manifest %s ", repoName, dgst) + + // Skip if TTL exists for manifest + key := dgst.String() + if _, ok := ttles.entries[key]; !ok { + ttles.entries[key] = &schedulerEntry{ + Key: key, + + // TODO file created at is probably better + Expiry: time.Now().Add(*ttles.ttl), + EntryType: entryTypeManifest, + } + } + + return nil + }) + + // In certain situations such as unfinished uploads, deleting all + // tags in S3 or removing the _manifests folder manually, this + // error may be of type PathNotFound. + // + // In these cases we can continue marking other manifests safely. + if _, ok := err.(driver.PathNotFoundError); ok { + return nil + } + + return err + }) + + return err +} + +func emit(format string, a ...interface{}) { + fmt.Printf(format+"\n", a...) +} diff --git a/tests/conf-e2e-cloud-storage.yml b/tests/conf-e2e-cloud-storage.yml index 63a8778c70..fb551a4265 100644 --- a/tests/conf-e2e-cloud-storage.yml +++ b/tests/conf-e2e-cloud-storage.yml @@ -26,6 +26,9 @@ redis: idletimeout: 60s maxactive: 64 maxidle: 16 +proxy: + remoteurl: "https://registry-1.docker.io" + ttl: 1m storage: redirect: disable: true diff --git a/tests/docker-compose-e2e-cloud-storage.yml b/tests/docker-compose-e2e-cloud-storage.yml index 887c9db9f4..105712f1fc 100644 --- a/tests/docker-compose-e2e-cloud-storage.yml +++ b/tests/docker-compose-e2e-cloud-storage.yml @@ -55,7 +55,7 @@ services: minio-init: condition: service_completed_successfully ports: - - "5000:5000" - - "5001:5001" + - "5004:5000" + - "5005:5001" volumes: - ./conf-e2e-cloud-storage.yml:/etc/docker/registry/config-test.yml