diff --git a/pkg/leeway/cache/remote/s3.go b/pkg/leeway/cache/remote/s3.go index 0c9486c..50f260c 100644 --- a/pkg/leeway/cache/remote/s3.go +++ b/pkg/leeway/cache/remote/s3.go @@ -28,8 +28,11 @@ import ( const ( // defaultS3PartSize is the default part size for S3 multipart operations defaultS3PartSize = 5 * 1024 * 1024 - // defaultWorkerCount is the default number of concurrent workers + // defaultWorkerCount is the default number of concurrent workers for general operations defaultWorkerCount = 10 + // defaultDownloadWorkerCount is the number of concurrent workers for download operations + // Higher than default to maximize download throughput + defaultDownloadWorkerCount = 30 // defaultRateLimit is the default rate limit for S3 API calls (requests per second) defaultRateLimit = 100 // defaultBurstLimit is the default burst limit for S3 API calls @@ -64,13 +67,14 @@ type S3Config struct { // S3Cache implements RemoteCache using AWS S3 type S3Cache struct { - storage cache.ObjectStorage - cfg *cache.RemoteConfig - workerCount int - slsaVerifier slsa.VerifierInterface - cleanupMu sync.Mutex // Protects concurrent file cleanup operations - rateLimiter *rate.Limiter // Rate limiter for S3 API calls - semaphore chan struct{} // Semaphore for limiting concurrent operations + storage cache.ObjectStorage + cfg *cache.RemoteConfig + workerCount int + downloadWorkerCount int + slsaVerifier slsa.VerifierInterface + cleanupMu sync.Mutex // Protects concurrent file cleanup operations + rateLimiter *rate.Limiter // Rate limiter for S3 API calls + semaphore chan struct{} // Semaphore for limiting concurrent operations } // NewS3Cache creates a new S3 cache implementation @@ -107,12 +111,13 @@ func NewS3Cache(cfg *cache.RemoteConfig) (*S3Cache, error) { semaphore := make(chan struct{}, maxConcurrentOperations) return &S3Cache{ - storage: storage, - cfg: cfg, - workerCount: defaultWorkerCount, - slsaVerifier: slsaVerifier, - rateLimiter: rateLimiter, - semaphore: semaphore, + storage: storage, + cfg: cfg, + workerCount: defaultWorkerCount, + downloadWorkerCount: defaultDownloadWorkerCount, + slsaVerifier: slsaVerifier, + rateLimiter: rateLimiter, + semaphore: semaphore, }, nil } @@ -144,14 +149,14 @@ func (s *S3Cache) releaseSemaphore() { } } -// processPackages processes packages using a worker pool -func (s *S3Cache) processPackages(ctx context.Context, pkgs []cache.Package, fn func(context.Context, cache.Package) error) error { +// processPackages processes packages using a worker pool with the specified worker count +func (s *S3Cache) processPackages(ctx context.Context, pkgs []cache.Package, workerCount int, fn func(context.Context, cache.Package) error) error { jobs := make(chan cache.Package, len(pkgs)) results := make(chan error, len(pkgs)) var wg sync.WaitGroup // Start workers - for i := 0; i < s.workerCount; i++ { + for i := 0; i < workerCount; i++ { wg.Add(1) go func() { defer wg.Done() @@ -203,9 +208,84 @@ func (s *S3Cache) processPackages(ctx context.Context, pkgs []cache.Package, fn // ExistingPackages implements RemoteCache func (s *S3Cache) ExistingPackages(ctx context.Context, pkgs []cache.Package) (map[cache.Package]struct{}, error) { result := make(map[cache.Package]struct{}) + + // Build a map of version -> package for quick lookup + versionToPackage := make(map[string]cache.Package, len(pkgs)) + for _, p := range pkgs { + version, err := p.Version() + if err != nil { + log.WithError(err).WithField("package", p.FullName()).Debug("Failed to get version for package, skipping") + continue + } + versionToPackage[version] = p + } + + if len(versionToPackage) == 0 { + return result, nil + } + + // Use ListObjectsV2 to batch check all packages in 1-2 API calls + // We list all objects and check which packages exist + // This is much faster than 2N HeadObject calls (2 per package) + if err := s.waitForRateLimit(ctx); err != nil { + log.WithError(err).Debug("Rate limiter error during batch existence check") + // Fall back to sequential checks if rate limited + return s.existingPackagesSequential(ctx, pkgs) + } + + timeoutCtx, cancel := context.WithTimeout(ctx, 60*time.Second) + defer cancel() + + // List all objects with empty prefix to get all cached artifacts + // In practice, this could be optimized with a common prefix if versions share one + objects, err := s.storage.ListObjects(timeoutCtx, "") + if err != nil { + log.WithError(err).Debug("Failed to list objects in remote cache, falling back to sequential checks") + // Fall back to sequential checks on error + return s.existingPackagesSequential(ctx, pkgs) + } + + // Build a set of existing keys for O(1) lookup + existingKeys := make(map[string]bool, len(objects)) + for _, key := range objects { + existingKeys[key] = true + } + + // Check which packages exist by looking up their keys + for version, p := range versionToPackage { + gzKey := fmt.Sprintf("%s.tar.gz", version) + tarKey := fmt.Sprintf("%s.tar", version) + + if existingKeys[gzKey] { + log.WithFields(log.Fields{ + "package": p.FullName(), + "key": gzKey, + }).Debug("found package in remote cache (.tar.gz)") + result[p] = struct{}{} + } else if existingKeys[tarKey] { + log.WithFields(log.Fields{ + "package": p.FullName(), + "key": tarKey, + }).Debug("found package in remote cache (.tar)") + result[p] = struct{}{} + } else { + log.WithFields(log.Fields{ + "package": p.FullName(), + "version": version, + }).Debug("package not found in remote cache, will build locally") + } + } + + return result, nil +} + +// existingPackagesSequential is the fallback implementation using sequential HeadObject calls +// This is used when ListObjects fails or is rate limited +func (s *S3Cache) existingPackagesSequential(ctx context.Context, pkgs []cache.Package) (map[cache.Package]struct{}, error) { + result := make(map[cache.Package]struct{}) var mu sync.Mutex - err := s.processPackages(ctx, pkgs, func(ctx context.Context, p cache.Package) error { + err := s.processPackages(ctx, pkgs, s.workerCount, func(ctx context.Context, p cache.Package) error { version, err := p.Version() if err != nil { return fmt.Errorf("failed to get version: %w", err) @@ -313,7 +393,9 @@ func withRetry(maxRetries int, operation func() error) error { func (s *S3Cache) Download(ctx context.Context, dst cache.LocalCache, pkgs []cache.Package) error { var multiErr []error - err := s.processPackages(ctx, pkgs, func(ctx context.Context, p cache.Package) error { + // Use higher worker count for downloads to maximize throughput + // TODO: Implement dependency-aware scheduling to prioritize critical path packages + err := s.processPackages(ctx, pkgs, s.downloadWorkerCount, func(ctx context.Context, p cache.Package) error { version, err := p.Version() if err != nil { log.WithError(err).WithField("package", p.FullName()).Warn("Failed to get version for package, skipping") @@ -937,7 +1019,7 @@ func (s *S3Cache) downloadUnverified(ctx context.Context, p cache.Package, versi func (s *S3Cache) Upload(ctx context.Context, src cache.LocalCache, pkgs []cache.Package) error { var uploadErrors []error - err := s.processPackages(ctx, pkgs, func(ctx context.Context, p cache.Package) error { + err := s.processPackages(ctx, pkgs, s.workerCount, func(ctx context.Context, p cache.Package) error { localPath, exists := src.Location(p) if !exists { log.WithField("package", p.FullName()).Warn("package not found in local cache - skipping upload") diff --git a/pkg/leeway/cache/remote/s3_download_test.go b/pkg/leeway/cache/remote/s3_download_test.go index 6c80fc9..dcbc34e 100644 --- a/pkg/leeway/cache/remote/s3_download_test.go +++ b/pkg/leeway/cache/remote/s3_download_test.go @@ -174,10 +174,11 @@ func TestS3CacheDownload(t *testing.T) { } s3Cache := &S3Cache{ - storage: mockStorage, - workerCount: 1, - rateLimiter: rate.NewLimiter(rate.Limit(defaultRateLimit), defaultBurstLimit), - semaphore: make(chan struct{}, maxConcurrentOperations), + storage: mockStorage, + workerCount: 1, + downloadWorkerCount: defaultDownloadWorkerCount, + rateLimiter: rate.NewLimiter(rate.Limit(defaultRateLimit), defaultBurstLimit), + semaphore: make(chan struct{}, maxConcurrentOperations), } err := s3Cache.Download(context.Background(), localCache, tt.packages) diff --git a/pkg/leeway/cache/remote/s3_performance_test.go b/pkg/leeway/cache/remote/s3_performance_test.go index bf00f16..386a756 100644 --- a/pkg/leeway/cache/remote/s3_performance_test.go +++ b/pkg/leeway/cache/remote/s3_performance_test.go @@ -14,11 +14,13 @@ import ( "github.com/gitpod-io/leeway/pkg/leeway/cache" "github.com/gitpod-io/leeway/pkg/leeway/cache/local" "github.com/stretchr/testify/require" + "golang.org/x/time/rate" ) // Realistic constants based on production observations const ( - s3Latency = 50 * time.Millisecond // Network round-trip + s3Latency = 50 * time.Millisecond // Network round-trip (production) + s3LatencyTest = 1 * time.Millisecond // Reduced latency for fast tests s3ThroughputMBs = 100 // MB/s download speed verifyTimeEd255 = 100 * time.Microsecond // Ed25519 signature verify attestationSize = 5 * 1024 // ~5KB attestation @@ -57,11 +59,16 @@ func createMockAttestation(t testing.TB) []byte { // realisticMockS3Storage implements realistic S3 performance characteristics type realisticMockS3Storage struct { objects map[string][]byte + latency time.Duration // Configurable latency for testing } func (m *realisticMockS3Storage) HasObject(ctx context.Context, key string) (bool, error) { // Simulate network latency for metadata check - time.Sleep(s3Latency / 2) // Metadata operations are faster + latency := m.latency + if latency == 0 { + latency = s3Latency // Default to realistic latency + } + time.Sleep(latency / 2) // Metadata operations are faster _, exists := m.objects[key] return exists, nil @@ -74,7 +81,11 @@ func (m *realisticMockS3Storage) GetObject(ctx context.Context, key string, dest } // Simulate network latency - time.Sleep(s3Latency) + latency := m.latency + if latency == 0 { + latency = s3Latency // Default to realistic latency + } + time.Sleep(latency) // Simulate download time based on size and throughput sizeInMB := float64(len(data)) / (1024 * 1024) @@ -92,7 +103,11 @@ func (m *realisticMockS3Storage) UploadObject(ctx context.Context, key string, s } // Simulate upload latency and throughput - time.Sleep(s3Latency) + latency := m.latency + if latency == 0 { + latency = s3Latency // Default to realistic latency + } + time.Sleep(latency) sizeInMB := float64(len(data)) / (1024 * 1024) uploadTime := time.Duration(sizeInMB / float64(s3ThroughputMBs) * float64(time.Second)) time.Sleep(uploadTime) @@ -103,7 +118,11 @@ func (m *realisticMockS3Storage) UploadObject(ctx context.Context, key string, s func (m *realisticMockS3Storage) ListObjects(ctx context.Context, prefix string) ([]string, error) { // Simulate network latency for list operation - time.Sleep(s3Latency / 2) + latency := m.latency + if latency == 0 { + latency = s3Latency // Default to realistic latency + } + time.Sleep(latency / 2) var keys []string for key := range m.objects { @@ -339,18 +358,13 @@ func BenchmarkS3Cache_ParallelDownloads(b *testing.B) { // TestS3Cache_ParallelVerificationScaling tests scalability func TestS3Cache_ParallelVerificationScaling(t *testing.T) { - if testing.Short() { - t.Skip("skipping scaling test in short mode") - } - + // Use reduced latency and minimal packages for fast tests tests := []struct { packages int workers int }{ - {1, 1}, + {2, 1}, {5, 2}, - {10, 4}, - {20, 8}, } for _, tt := range tests { @@ -366,8 +380,9 @@ func TestS3Cache_ParallelVerificationScaling(t *testing.T) { } } - // Setup cache + // Setup cache with fast latency mockStorage := createRealisticMockS3StorageMultiple(t, tt.packages) + mockStorage.latency = s3LatencyTest // Use fast latency for tests mockVerifier := &realisticMockVerifier{} config := &cache.RemoteConfig{ @@ -379,9 +394,13 @@ func TestS3Cache_ParallelVerificationScaling(t *testing.T) { } s3Cache := &S3Cache{ - storage: mockStorage, - cfg: config, - slsaVerifier: mockVerifier, + storage: mockStorage, + cfg: config, + slsaVerifier: mockVerifier, + workerCount: defaultWorkerCount, + downloadWorkerCount: defaultDownloadWorkerCount, + rateLimiter: rate.NewLimiter(rate.Limit(defaultRateLimit), defaultBurstLimit), + semaphore: make(chan struct{}, maxConcurrentOperations), } tmpDir := t.TempDir() @@ -398,6 +417,149 @@ func TestS3Cache_ParallelVerificationScaling(t *testing.T) { } } +// TestS3Cache_ExistingPackagesBatchOptimization tests the ListObjects optimization +func TestS3Cache_ExistingPackagesBatchOptimization(t *testing.T) { + // Use reduced latency for fast tests + packageCounts := []int{10, 50, 100} + + for _, count := range packageCounts { + t.Run(fmt.Sprintf("%d-packages", count), func(t *testing.T) { + // Create packages + packages := make([]cache.Package, count) + for i := 0; i < count; i++ { + packages[i] = &mockPackagePerf{ + version: fmt.Sprintf("package%d:v%d", i, i), + fullName: fmt.Sprintf("package%d", i), + } + } + + // Setup mock storage with all packages and fast latency + mockStorage := createRealisticMockS3StorageMultiple(t, count) + mockStorage.latency = s3LatencyTest // Use fast latency for tests + + config := &cache.RemoteConfig{ + BucketName: "test-bucket", + } + + s3Cache := &S3Cache{ + storage: mockStorage, + cfg: config, + workerCount: defaultWorkerCount, + downloadWorkerCount: defaultDownloadWorkerCount, + rateLimiter: rate.NewLimiter(rate.Limit(defaultRateLimit), defaultBurstLimit), + } + + // Measure time for batch check (using ListObjects) + start := time.Now() + existing, err := s3Cache.ExistingPackages(context.Background(), packages) + batchDuration := time.Since(start) + require.NoError(t, err) + require.Equal(t, count, len(existing), "All packages should be found") + + // Measure time for sequential check (fallback method) + start = time.Now() + existingSeq, err := s3Cache.existingPackagesSequential(context.Background(), packages) + seqDuration := time.Since(start) + require.NoError(t, err) + require.Equal(t, count, len(existingSeq), "All packages should be found") + + // Calculate speedup + speedup := float64(seqDuration) / float64(batchDuration) + + t.Logf("Package count: %d", count) + t.Logf("Batch (ListObjects): %v", batchDuration) + t.Logf("Sequential (HeadObject): %v", seqDuration) + t.Logf("Speedup: %.2fx", speedup) + + // For larger package counts, batch should be significantly faster + if count >= 50 { + require.Greater(t, speedup, 3.0, "Batch optimization should be at least 3x faster for 50+ packages") + } else { + require.Greater(t, speedup, 1.0, "Batch optimization should be faster than sequential") + } + }) + } +} + +// BenchmarkS3Cache_ExistingPackages benchmarks the optimized ExistingPackages method +func BenchmarkS3Cache_ExistingPackages(b *testing.B) { + if testing.Short() { + b.Skip("skipping benchmark in short mode") + } + + packageCounts := []int{10, 50, 100, 200} + + for _, count := range packageCounts { + b.Run(fmt.Sprintf("%d-packages-batch", count), func(b *testing.B) { + // Create packages + packages := make([]cache.Package, count) + for i := 0; i < count; i++ { + packages[i] = &mockPackagePerf{ + version: fmt.Sprintf("package%d:v%d", i, i), + fullName: fmt.Sprintf("package%d", i), + } + } + + // Setup mock storage + mockStorage := createRealisticMockS3StorageMultiple(b, count) + + config := &cache.RemoteConfig{ + BucketName: "test-bucket", + } + + s3Cache := &S3Cache{ + storage: mockStorage, + cfg: config, + workerCount: defaultWorkerCount, + downloadWorkerCount: defaultDownloadWorkerCount, + rateLimiter: rate.NewLimiter(rate.Limit(defaultRateLimit), defaultBurstLimit), + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := s3Cache.ExistingPackages(context.Background(), packages) + if err != nil { + b.Fatal(err) + } + } + }) + + b.Run(fmt.Sprintf("%d-packages-sequential", count), func(b *testing.B) { + // Create packages + packages := make([]cache.Package, count) + for i := 0; i < count; i++ { + packages[i] = &mockPackagePerf{ + version: fmt.Sprintf("package%d:v%d", i, i), + fullName: fmt.Sprintf("package%d", i), + } + } + + // Setup mock storage + mockStorage := createRealisticMockS3StorageMultiple(b, count) + + config := &cache.RemoteConfig{ + BucketName: "test-bucket", + } + + s3Cache := &S3Cache{ + storage: mockStorage, + cfg: config, + workerCount: defaultWorkerCount, + downloadWorkerCount: defaultDownloadWorkerCount, + rateLimiter: rate.NewLimiter(rate.Limit(defaultRateLimit), defaultBurstLimit), + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := s3Cache.existingPackagesSequential(context.Background(), packages) + if err != nil { + b.Fatal(err) + } + } + }) + } +} + // BenchmarkS3Cache_ThroughputComparison compares baseline vs verified throughput func BenchmarkS3Cache_ThroughputComparison(b *testing.B) { if testing.Short() { diff --git a/pkg/leeway/cache/remote/s3_slsa_test.go b/pkg/leeway/cache/remote/s3_slsa_test.go index e310028..0e84d91 100644 --- a/pkg/leeway/cache/remote/s3_slsa_test.go +++ b/pkg/leeway/cache/remote/s3_slsa_test.go @@ -252,11 +252,12 @@ func TestS3Cache_DownloadWithSLSAVerification(t *testing.T) { // Create S3Cache with test configuration s3Cache := &S3Cache{ - storage: mockStorage, - cfg: &tt.config, - workerCount: 1, - rateLimiter: rate.NewLimiter(rate.Limit(defaultRateLimit), defaultBurstLimit), - semaphore: make(chan struct{}, maxConcurrentOperations), + storage: mockStorage, + cfg: &tt.config, + workerCount: 1, + downloadWorkerCount: defaultDownloadWorkerCount, + rateLimiter: rate.NewLimiter(rate.Limit(defaultRateLimit), defaultBurstLimit), + semaphore: make(chan struct{}, maxConcurrentOperations), } // Initialize SLSA verifier if enabled @@ -337,11 +338,12 @@ func TestS3Cache_BackwardCompatibility(t *testing.T) { } s3Cache := &S3Cache{ - storage: mockStorage, - cfg: config, - workerCount: 1, - rateLimiter: rate.NewLimiter(rate.Limit(defaultRateLimit), defaultBurstLimit), - semaphore: make(chan struct{}, maxConcurrentOperations), + storage: mockStorage, + cfg: config, + workerCount: 1, + downloadWorkerCount: defaultDownloadWorkerCount, + rateLimiter: rate.NewLimiter(rate.Limit(defaultRateLimit), defaultBurstLimit), + semaphore: make(chan struct{}, maxConcurrentOperations), } packages := []cache.Package{ diff --git a/pkg/leeway/cache/remote/s3_test.go b/pkg/leeway/cache/remote/s3_test.go index 1a13d8f..5446908 100644 --- a/pkg/leeway/cache/remote/s3_test.go +++ b/pkg/leeway/cache/remote/s3_test.go @@ -23,9 +23,10 @@ import ( // mockS3Client implements a mock S3 client for testing type mockS3Client struct { - headObjectFunc func(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) - getObjectFunc func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) - putObjectFunc func(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) + headObjectFunc func(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) + getObjectFunc func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) + putObjectFunc func(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) + listObjectsV2Func func(ctx context.Context, params *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) } func (m *mockS3Client) HeadObject(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { @@ -66,6 +67,9 @@ func (m *mockS3Client) UploadPart(ctx context.Context, params *s3.UploadPartInpu } func (m *mockS3Client) ListObjectsV2(ctx context.Context, params *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) { + if m.listObjectsV2Func != nil { + return m.listObjectsV2Func(ctx, params, optFns...) + } return &s3.ListObjectsV2Output{}, nil } @@ -74,11 +78,12 @@ func TestS3Cache_ExistingPackages(t *testing.T) { defer cancel() tests := []struct { - name string - packages []cache.Package - mockHeadObject func(key string) (*s3.HeadObjectOutput, error) - expectedResults map[string]struct{} - expectError bool + name string + packages []cache.Package + mockHeadObject func(key string) (*s3.HeadObjectOutput, error) + mockListObjects func(prefix string) (*s3.ListObjectsV2Output, error) + expectedResults map[string]struct{} + expectError bool }{ { name: "finds tar.gz package", @@ -91,6 +96,14 @@ func TestS3Cache_ExistingPackages(t *testing.T) { } return nil, &types.NoSuchKey{} }, + mockListObjects: func(prefix string) (*s3.ListObjectsV2Output, error) { + key := "v1.tar.gz" + return &s3.ListObjectsV2Output{ + Contents: []types.Object{ + {Key: &key}, + }, + }, nil + }, expectedResults: map[string]struct{}{ "v1": {}, }, @@ -109,6 +122,14 @@ func TestS3Cache_ExistingPackages(t *testing.T) { } return nil, &types.NoSuchKey{} }, + mockListObjects: func(prefix string) (*s3.ListObjectsV2Output, error) { + key := "v1.tar" + return &s3.ListObjectsV2Output{ + Contents: []types.Object{ + {Key: &key}, + }, + }, nil + }, expectedResults: map[string]struct{}{ "v1": {}, }, @@ -121,6 +142,11 @@ func TestS3Cache_ExistingPackages(t *testing.T) { mockHeadObject: func(key string) (*s3.HeadObjectOutput, error) { return nil, &types.NoSuchKey{} }, + mockListObjects: func(prefix string) (*s3.ListObjectsV2Output, error) { + return &s3.ListObjectsV2Output{ + Contents: []types.Object{}, + }, nil + }, expectedResults: map[string]struct{}{}, }, { @@ -131,6 +157,14 @@ func TestS3Cache_ExistingPackages(t *testing.T) { mockHeadObject: func(key string) (*s3.HeadObjectOutput, error) { return &s3.HeadObjectOutput{}, nil }, + mockListObjects: func(prefix string) (*s3.ListObjectsV2Output, error) { + key := "v1.tar.gz" + return &s3.ListObjectsV2Output{ + Contents: []types.Object{ + {Key: &key}, + }, + }, nil + }, expectedResults: map[string]struct{}{}, expectError: false, }, @@ -142,6 +176,12 @@ func TestS3Cache_ExistingPackages(t *testing.T) { headObjectFunc: func(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { return tt.mockHeadObject(*params.Key) }, + listObjectsV2Func: func(ctx context.Context, params *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) { + if tt.mockListObjects != nil { + return tt.mockListObjects(*params.Prefix) + } + return &s3.ListObjectsV2Output{}, nil + }, } s3Cache := &S3Cache{ @@ -149,9 +189,10 @@ func TestS3Cache_ExistingPackages(t *testing.T) { client: mockClient, bucketName: "test-bucket", }, - workerCount: 1, - rateLimiter: rate.NewLimiter(rate.Limit(defaultRateLimit), defaultBurstLimit), - semaphore: make(chan struct{}, maxConcurrentOperations), + workerCount: 1, + downloadWorkerCount: defaultDownloadWorkerCount, + rateLimiter: rate.NewLimiter(rate.Limit(defaultRateLimit), defaultBurstLimit), + semaphore: make(chan struct{}, maxConcurrentOperations), } results, err := s3Cache.ExistingPackages(ctx, tt.packages) @@ -293,9 +334,10 @@ func TestS3Cache_Download(t *testing.T) { client: mockClient, bucketName: "test-bucket", }, - workerCount: 1, - rateLimiter: rate.NewLimiter(rate.Limit(defaultRateLimit), defaultBurstLimit), - semaphore: make(chan struct{}, maxConcurrentOperations), + workerCount: 1, + downloadWorkerCount: defaultDownloadWorkerCount, + rateLimiter: rate.NewLimiter(rate.Limit(defaultRateLimit), defaultBurstLimit), + semaphore: make(chan struct{}, maxConcurrentOperations), } err := s3Cache.Download(ctx, tt.localCache, tt.packages) @@ -424,9 +466,10 @@ func TestS3Cache_Upload(t *testing.T) { client: mockClient, bucketName: "test-bucket", }, - workerCount: 1, - rateLimiter: rate.NewLimiter(rate.Limit(defaultRateLimit), defaultBurstLimit), - semaphore: make(chan struct{}, maxConcurrentOperations), + workerCount: 1, + downloadWorkerCount: defaultDownloadWorkerCount, + rateLimiter: rate.NewLimiter(rate.Limit(defaultRateLimit), defaultBurstLimit), + semaphore: make(chan struct{}, maxConcurrentOperations), } err := s3Cache.Upload(ctx, tt.localCache, tt.packages)