-
Notifications
You must be signed in to change notification settings - Fork 126
/
process.go
37 lines (29 loc) · 884 Bytes
/
process.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package util
import (
"sync"
"github.com/armadaproject/armada/internal/common/armadacontext"
commonUtil "github.com/armadaproject/armada/internal/common/util"
)
func ProcessItemsWithThreadPool[K any](ctx *armadacontext.Context, maxThreadCount int, itemsToProcess []K, processFunc func(K)) {
wg := &sync.WaitGroup{}
processChannel := make(chan K)
for i := 0; i < commonUtil.Min(len(itemsToProcess), maxThreadCount); i++ {
wg.Add(1)
go poolWorker(ctx, wg, processChannel, processFunc)
}
for _, item := range itemsToProcess {
processChannel <- item
}
close(processChannel)
wg.Wait()
}
func poolWorker[K any](ctx *armadacontext.Context, wg *sync.WaitGroup, podsToProcess chan K, processFunc func(K)) {
defer wg.Done()
for pod := range podsToProcess {
// Skip processing once context is finished
if ctx.Err() != nil {
continue
}
processFunc(pod)
}
}