Skip to content

Commit

Permalink
atc: Consider image volumes in volume-locality strategy
Browse files Browse the repository at this point in the history
Often the largest volume that may need streaming is the image that the
container executes in. By considering this volume too, this may allow
for slightly better placement/reduced streaming for some builds (in
particular builds with no inputs or caches will now prefer to be on the
same worker as their image volume rather than a random one).

Signed-off-by: Andy Paine <andy.paine@engineerbetter.com>
  • Loading branch information
andy-paine committed Feb 10, 2022
1 parent c336a05 commit 413f8e9
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 8 deletions.
29 changes: 21 additions & 8 deletions atc/worker/placement.go
Expand Up @@ -140,33 +140,33 @@ type volumeLocalityStrategy struct{}
func (strategy volumeLocalityStrategy) Order(logger lager.Logger, pool Pool, workers []db.Worker, spec runtime.ContainerSpec) ([]db.Worker, error) {
counts := make(map[string]int, len(workers))

for _, input := range spec.Inputs {
volume, ok := input.Artifact.(runtime.Volume)
updateCountsForArtifact := func(artifact runtime.Artifact, destinationPath string) error {
volume, ok := artifact.(runtime.Volume)
if !ok {
// Non-volume artifacts don't live on workers, so don't affect
// volume locality decisions.
continue
return nil
}
logger := logger.WithData(lager.Data{
"handle": volume.Handle(),
"path": input.DestinationPath,
"path": destinationPath,
})
srcWorker := volume.DBVolume().WorkerName()
counts[srcWorker]++

resourceCacheID := volume.DBVolume().GetResourceCacheID()
if resourceCacheID == 0 {
logger.Debug("resource-not-cached")
continue
return nil
}
resourceCache, found, err := pool.db.ResourceCacheFactory.FindResourceCacheByID(resourceCacheID)
if err != nil {
logger.Error("failed-to-find-resource-cache", err)
return nil, err
return err
}
if !found {
logger.Debug("resource-cache-not-found")
continue
return nil
}
for _, worker := range workers {
if worker.Name() == srcWorker {
Expand All @@ -175,12 +175,25 @@ func (strategy volumeLocalityStrategy) Order(logger lager.Logger, pool Pool, wor
_, found, err := pool.db.VolumeRepo.FindResourceCacheVolume(worker.Name(), resourceCache)
if err != nil {
logger.Error("failed-to-find-resource-cache-volume", err)
return nil, err
return err
}
if found {
counts[worker.Name()]++
}
}
return nil
}

err := updateCountsForArtifact(spec.ImageSpec.ImageArtifact, "/")
if err != nil {
return nil, err
}

for _, input := range spec.Inputs {
err := updateCountsForArtifact(input.Artifact, input.DestinationPath)
if err != nil {
return nil, err
}
}

for _, cachePath := range spec.Caches {
Expand Down
27 changes: 27 additions & 0 deletions atc/worker/placement_test.go
Expand Up @@ -22,6 +22,33 @@ var _ = Describe("Container Placement Strategies", func() {
return strategy
}

// Verifies that the volume-locality strategy considers the container's
// image volume when ordering workers: with no inputs or caches in the
// spec, the worker that already holds the image volume should come first.
Test("sorts the workers based on image volume", func() {
// Two workers, each holding one volume in both the DB and baggageclaim;
// only worker2's volume will be used as the image artifact below.
scenario := Setup(
workertest.WithBasicJob(),
workertest.WithWorkers(
grt.NewWorker("worker1").
WithVolumesCreatedInDBAndBaggageclaim(
grt.NewVolume("image-volume1"),
),
grt.NewWorker("worker2").
WithVolumesCreatedInDBAndBaggageclaim(
grt.NewVolume("image-volume2"),
),
),
)

// The spec has no Inputs or Caches — the image artifact is the only
// volume that can influence locality, so ordering proves the strategy
// counts it.
workers, err := volumeLocalityStrategy().Order(logger, scenario.Pool, scenario.DB.Workers, runtime.ContainerSpec{
TeamID: scenario.TeamID,
JobID: scenario.JobID,
StepName: scenario.StepName,
ImageSpec: runtime.ImageSpec{
ImageArtifact: scenario.WorkerVolume("worker2", "image-volume2"),
},
})
Expect(err).ToNot(HaveOccurred())
// worker2 holds the image volume, so it must be ordered ahead of worker1.
Expect(workerNames(workers)).To(Equal([]string{"worker2", "worker1"}))
})

Test("sorts the workers by number of local inputs", func() {
scenario := Setup(
workertest.WithBasicJob(),
Expand Down

0 comments on commit 413f8e9

Please sign in to comment.