Skip to content

Commit ed84b9a

Browse files
authored
perf: uses a semaphore to fetch containers faster in parallel (#3161)
1 parent c906b80 commit ed84b9a

File tree

3 files changed

+26
-8
lines changed

3 files changed

+26
-8
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ require (
3333
github.com/samber/lo v1.46.0
3434
github.com/wk8/go-ordered-map/v2 v2.1.8
3535
github.com/yuin/goldmark v1.7.4
36+
golang.org/x/sync v0.7.0
3637
google.golang.org/grpc v1.65.0
3738
google.golang.org/protobuf v1.34.2
3839
)

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
170170
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
171171
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
172172
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
173+
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
174+
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
173175
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
174176
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
175177
golang.org/x/sys v0.0.0-20200831180312-196b9ba8737a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

internal/docker/container_store.go

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ import (
88

99
"github.com/puzpuzpuz/xsync/v3"
1010
"github.com/samber/lo"
11-
lop "github.com/samber/lo/parallel"
1211
log "github.com/sirupsen/logrus"
12+
"golang.org/x/sync/semaphore"
1313
)
1414

1515
type ContainerStore struct {
@@ -43,7 +43,10 @@ func NewContainerStore(ctx context.Context, client Client) *ContainerStore {
4343
return s
4444
}
4545

46-
var ErrContainerNotFound = errors.New("container not found")
46+
var (
47+
ErrContainerNotFound = errors.New("container not found")
48+
maxFetchParallelism = int64(30)
49+
)
4750

4851
func (s *ContainerStore) checkConnectivity() error {
4952
if s.connected.CompareAndSwap(false, true) {
@@ -69,14 +72,26 @@ func (s *ContainerStore) checkConnectivity() error {
6972
return item.State == "running"
7073
})
7174

72-
chunks := lo.Chunk(running, 100)
75+
sem := semaphore.NewWeighted(maxFetchParallelism)
7376

74-
for _, chunk := range chunks {
75-
lop.ForEach(chunk, func(c Container, _ int) {
76-
container, _ := s.client.FindContainer(c.ID)
77-
s.containers.Store(c.ID, &container)
78-
})
77+
for i, c := range running {
78+
if err := sem.Acquire(s.ctx, 1); err != nil {
79+
log.Errorf("failed to acquire semaphore: %v", err)
80+
break
81+
}
82+
go func(c Container, i int) {
83+
defer sem.Release(1)
84+
if container, err := s.client.FindContainer(c.ID); err == nil {
85+
s.containers.Store(c.ID, &container)
86+
}
87+
}(c, i)
7988
}
89+
90+
if err := sem.Acquire(s.ctx, maxFetchParallelism); err != nil {
91+
log.Errorf("failed to acquire semaphore: %v", err)
92+
}
93+
94+
log.Debugf("finished initializing container store with %d containers", len(containers))
8095
}
8196
}
8297

0 commit comments

Comments
 (0)