122 changes: 102 additions & 20 deletions pkg/leeway/cache/remote/s3.go
@@ -28,8 +28,11 @@ import (
const (
// defaultS3PartSize is the default part size for S3 multipart operations
defaultS3PartSize = 5 * 1024 * 1024
// defaultWorkerCount is the default number of concurrent workers
// defaultWorkerCount is the default number of concurrent workers for general operations
defaultWorkerCount = 10
// defaultDownloadWorkerCount is the number of concurrent workers for download operations
// Higher than default to maximize download throughput
defaultDownloadWorkerCount = 30
// defaultRateLimit is the default rate limit for S3 API calls (requests per second)
defaultRateLimit = 100
// defaultBurstLimit is the default burst limit for S3 API calls
@@ -64,13 +67,14 @@ type S3Config struct {

// S3Cache implements RemoteCache using AWS S3
type S3Cache struct {
storage cache.ObjectStorage
cfg *cache.RemoteConfig
workerCount int
slsaVerifier slsa.VerifierInterface
cleanupMu sync.Mutex // Protects concurrent file cleanup operations
rateLimiter *rate.Limiter // Rate limiter for S3 API calls
semaphore chan struct{} // Semaphore for limiting concurrent operations
storage cache.ObjectStorage
cfg *cache.RemoteConfig
workerCount int
downloadWorkerCount int
slsaVerifier slsa.VerifierInterface
cleanupMu sync.Mutex // Protects concurrent file cleanup operations
rateLimiter *rate.Limiter // Rate limiter for S3 API calls
semaphore chan struct{} // Semaphore for limiting concurrent operations
}

// NewS3Cache creates a new S3 cache implementation
@@ -107,12 +111,13 @@ func NewS3Cache(cfg *cache.RemoteConfig) (*S3Cache, error) {
semaphore := make(chan struct{}, maxConcurrentOperations)

return &S3Cache{
storage: storage,
cfg: cfg,
workerCount: defaultWorkerCount,
slsaVerifier: slsaVerifier,
rateLimiter: rateLimiter,
semaphore: semaphore,
storage: storage,
cfg: cfg,
workerCount: defaultWorkerCount,
downloadWorkerCount: defaultDownloadWorkerCount,
slsaVerifier: slsaVerifier,
rateLimiter: rateLimiter,
semaphore: semaphore,
}, nil
}

@@ -144,14 +149,14 @@ func (s *S3Cache) releaseSemaphore() {
}
}

// processPackages processes packages using a worker pool
func (s *S3Cache) processPackages(ctx context.Context, pkgs []cache.Package, fn func(context.Context, cache.Package) error) error {
// processPackages processes packages using a worker pool with the specified worker count
func (s *S3Cache) processPackages(ctx context.Context, pkgs []cache.Package, workerCount int, fn func(context.Context, cache.Package) error) error {
jobs := make(chan cache.Package, len(pkgs))
results := make(chan error, len(pkgs))
var wg sync.WaitGroup

// Start workers
for i := 0; i < s.workerCount; i++ {
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
@@ -203,9 +208,84 @@ func (s *S3Cache) processPackages(ctx context.Context, pkgs []cache.Package, fn
// ExistingPackages implements RemoteCache
func (s *S3Cache) ExistingPackages(ctx context.Context, pkgs []cache.Package) (map[cache.Package]struct{}, error) {
result := make(map[cache.Package]struct{})

// Build a map of version -> package for quick lookup
versionToPackage := make(map[string]cache.Package, len(pkgs))
for _, p := range pkgs {
version, err := p.Version()
if err != nil {
log.WithError(err).WithField("package", p.FullName()).Debug("Failed to get version for package, skipping")
continue
}
versionToPackage[version] = p
}

if len(versionToPackage) == 0 {
return result, nil
}

// Use ListObjectsV2 to batch check all packages in 1-2 API calls
// We list all objects and check which packages exist
// This is much faster than 2N HeadObject calls (2 per package)
if err := s.waitForRateLimit(ctx); err != nil {
log.WithError(err).Debug("Rate limiter error during batch existence check")
// Fall back to sequential checks if rate limited
return s.existingPackagesSequential(ctx, pkgs)
}

timeoutCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()

// List all objects with empty prefix to get all cached artifacts
// In practice, this could be optimized with a common prefix if versions share one
objects, err := s.storage.ListObjects(timeoutCtx, "")
Contributor:

Do we need to worry about large buckets with so many objects that we run into scalability issues?

  • If the bucket has thousands of cached artifacts, this could return a very large list
  • Memory usage could spike with large buckets

Contributor Author:

The concern is valid for very large buckets, but:

  1. Most Leeway builds have 10-100 packages, and buckets typically hold a few thousand artifacts (old versions get cleaned up once a week).
  2. A fallback exists: if ListObjects fails or times out, the check falls back to sequential HeadObject calls.
  3. There is already a 60-second timeout that triggers the fallback for very slow listings.

It feels like a bucket would need 100k+ objects before this becomes problematic, and even then the 60s timeout provides a safety net.

Contributor Author:

If the S3 bucket lacks a retention policy and grows unbounded, there are two options.

Option 1 (recommended): Add a retention policy to the bucket.

This is the proper fix. Old cache artifacts should be cleaned up periodically.
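
(Illustrative sketch, not part of this PR: if a native S3 lifecycle rule can't be configured on the bucket, a small periodic cleanup job can emulate a retention policy. The sketch below calls aws-sdk-go-v2 directly; the bucket name and 30-day window are placeholder assumptions.)

// cleanup.go - hypothetical sketch only; emulates a retention policy by
// deleting cache artifacts older than maxAge. Bucket name and retention
// window are placeholders, not values from this repository.
package main

import (
	"context"
	"log"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

func main() {
	const bucket = "leeway-cache-example" // placeholder bucket name
	const maxAge = 30 * 24 * time.Hour    // placeholder retention window

	ctx := context.Background()
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}
	client := s3.NewFromConfig(cfg)

	// Page through the bucket and collect keys older than maxAge.
	var expired []types.ObjectIdentifier
	p := s3.NewListObjectsV2Paginator(client, &s3.ListObjectsV2Input{Bucket: aws.String(bucket)})
	for p.HasMorePages() {
		page, err := p.NextPage(ctx)
		if err != nil {
			log.Fatal(err)
		}
		for _, obj := range page.Contents {
			if obj.LastModified != nil && time.Since(*obj.LastModified) > maxAge {
				expired = append(expired, types.ObjectIdentifier{Key: obj.Key})
			}
		}
	}

	// DeleteObjects accepts at most 1000 keys per call, so delete in batches.
	for start := 0; start < len(expired); start += 1000 {
		end := start + 1000
		if end > len(expired) {
			end = len(expired)
		}
		_, err := client.DeleteObjects(ctx, &s3.DeleteObjectsInput{
			Bucket: aws.String(bucket),
			Delete: &types.Delete{Objects: expired[start:end]},
		})
		if err != nil {
			log.Fatal(err)
		}
	}
	log.Printf("deleted %d expired artifacts", len(expired))
}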

Option 2: Add a maxListPages safeguard.

If a retention policy isn't possible, we can limit pagination to avoid memory issues with very large buckets:

const maxListPages = 10  // ~10,000 objects (1000 per page default)

pageCount := 0
for paginator.HasMorePages() {
    if pageCount >= maxListPages {
        return nil, fmt.Errorf("bucket exceeds %d pages, falling back to sequential", maxListPages)
    }
    // ... existing pagination logic
    pageCount++
}

Behavior:

  • Buckets with <10k objects: Fast batch check (current optimization)
  • Buckets with >10k objects: Returns error, caller falls back to sequential HeadObject calls

For a build with 100 packages against a 50k object bucket:

  • Batch would need 50 pages → hits limit → triggers fallback
  • Sequential: 200 HeadObject calls (~2-5 seconds)

This caps memory at ~1MB while preserving the optimization for reasonably-sized buckets.


In any case, I feel this is a bit of a premature optimization (given that Option 1 is faster and simpler to apply) and would be better suited for a follow-up PR.

if err != nil {
log.WithError(err).Debug("Failed to list objects in remote cache, falling back to sequential checks")
// Fall back to sequential checks on error
return s.existingPackagesSequential(ctx, pkgs)
}

// Build a set of existing keys for O(1) lookup
existingKeys := make(map[string]bool, len(objects))
for _, key := range objects {
existingKeys[key] = true
}

// Check which packages exist by looking up their keys
for version, p := range versionToPackage {
gzKey := fmt.Sprintf("%s.tar.gz", version)
tarKey := fmt.Sprintf("%s.tar", version)

if existingKeys[gzKey] {
log.WithFields(log.Fields{
"package": p.FullName(),
"key": gzKey,
}).Debug("found package in remote cache (.tar.gz)")
result[p] = struct{}{}
} else if existingKeys[tarKey] {
log.WithFields(log.Fields{
"package": p.FullName(),
"key": tarKey,
}).Debug("found package in remote cache (.tar)")
result[p] = struct{}{}
} else {
log.WithFields(log.Fields{
"package": p.FullName(),
"version": version,
}).Debug("package not found in remote cache, will build locally")
}
}

return result, nil
}

// existingPackagesSequential is the fallback implementation using sequential HeadObject calls
// This is used when ListObjects fails or is rate limited
func (s *S3Cache) existingPackagesSequential(ctx context.Context, pkgs []cache.Package) (map[cache.Package]struct{}, error) {
result := make(map[cache.Package]struct{})
var mu sync.Mutex

err := s.processPackages(ctx, pkgs, func(ctx context.Context, p cache.Package) error {
err := s.processPackages(ctx, pkgs, s.workerCount, func(ctx context.Context, p cache.Package) error {
version, err := p.Version()
if err != nil {
return fmt.Errorf("failed to get version: %w", err)
@@ -313,7 +393,9 @@ func withRetry(maxRetries int, operation func() error) error {
func (s *S3Cache) Download(ctx context.Context, dst cache.LocalCache, pkgs []cache.Package) error {
var multiErr []error

err := s.processPackages(ctx, pkgs, func(ctx context.Context, p cache.Package) error {
// Use higher worker count for downloads to maximize throughput
// TODO: Implement dependency-aware scheduling to prioritize critical path packages
err := s.processPackages(ctx, pkgs, s.downloadWorkerCount, func(ctx context.Context, p cache.Package) error {
version, err := p.Version()
if err != nil {
log.WithError(err).WithField("package", p.FullName()).Warn("Failed to get version for package, skipping")
@@ -937,7 +1019,7 @@ func (s *S3Cache) downloadUnverified(ctx context.Context, p cache.Package, versi
func (s *S3Cache) Upload(ctx context.Context, src cache.LocalCache, pkgs []cache.Package) error {
var uploadErrors []error

err := s.processPackages(ctx, pkgs, func(ctx context.Context, p cache.Package) error {
err := s.processPackages(ctx, pkgs, s.workerCount, func(ctx context.Context, p cache.Package) error {
localPath, exists := src.Location(p)
if !exists {
log.WithField("package", p.FullName()).Warn("package not found in local cache - skipping upload")
9 changes: 5 additions & 4 deletions pkg/leeway/cache/remote/s3_download_test.go
@@ -174,10 +174,11 @@ func TestS3CacheDownload(t *testing.T) {
}

s3Cache := &S3Cache{
storage: mockStorage,
workerCount: 1,
rateLimiter: rate.NewLimiter(rate.Limit(defaultRateLimit), defaultBurstLimit),
semaphore: make(chan struct{}, maxConcurrentOperations),
storage: mockStorage,
workerCount: 1,
downloadWorkerCount: defaultDownloadWorkerCount,
rateLimiter: rate.NewLimiter(rate.Limit(defaultRateLimit), defaultBurstLimit),
semaphore: make(chan struct{}, maxConcurrentOperations),
}

err := s3Cache.Download(context.Background(), localCache, tt.packages)