forked from cloudfoundry/bosh-cli
-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker_pool.go
74 lines (62 loc) · 1.57 KB
/
worker_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package cmd
import bosherr "github.com/cloudfoundry/bosh-utils/errors"
// WorkerPool runs batches of tasks concurrently on a bounded set of worker
// goroutines. See ParallelDo.
type WorkerPool struct {
// WorkerCount is the configured number of worker goroutines that ParallelDo
// uses to run tasks.
WorkerCount int
}
// ParallelDo runs the given tasks concurrently on the pool's worker goroutines
// and blocks until every task that was started has finished.
//
// As soon as a task error is observed, no further tasks are handed out; tasks
// that are already in flight are allowed to finish. A worker also stops
// pulling tasks once one of its own tasks has failed (see spawnWorker).
//
// On success it returns the task results in the order the tasks completed,
// which is not necessarily the order in which they were passed in. If any task
// failed it returns nil results and a single error: the first collected error,
// with the messages of any further errors folded in via bosherr.WrapError.
//
// A WorkerCount below 1 is treated as 1, because a pool without workers could
// never drain the job queue and the call would block forever.
func (w WorkerPool) ParallelDo(tasks ...func() (interface{}, error)) ([]interface{}, error) {
	workerCount := w.WorkerCount
	if workerCount < 1 {
		workerCount = 1
	}

	jobs := make(chan func() (interface{}, error))
	// Both channels can hold one entry per task, so workers never block while
	// reporting an outcome, even after dispatching has stopped.
	results := make(chan interface{}, len(tasks))
	errs := make(chan error, len(tasks))
	done := make(chan bool)

	for i := 0; i < workerCount; i++ {
		w.spawnWorker(jobs, results, errs, done)
	}

dispatch:
	for _, task := range tasks {
		select {
		case jobs <- task:
			// Handed to an idle worker; move on to the next task.
		case err := <-errs:
			// A task failed. Put the error back so it is reported below (there
			// is room, since one slot was just freed) and stop dispatching. The
			// label is required: a bare break would only leave the select and
			// the loop would keep handing out tasks.
			errs <- err
			break dispatch
		}
	}
	close(jobs)

	// Every worker signals exactly once when it stops.
	for i := 0; i < workerCount; i++ {
		<-done
	}
	close(results)
	close(errs)

	combinedResults := make([]interface{}, 0, len(results))
	for result := range results {
		combinedResults = append(combinedResults, result)
	}

	var combinedErr error
	for err := range errs {
		if combinedErr == nil {
			combinedErr = err
		} else {
			combinedErr = bosherr.WrapError(combinedErr, err.Error())
		}
	}
	if combinedErr != nil {
		return nil, combinedErr
	}

	return combinedResults, nil
}
// spawnWorker starts one goroutine that receives tasks from the tasks channel
// and runs them one at a time. The result of each successful task is sent on
// results. The first failing task has its error sent on errs and ends the
// worker, leaving any remaining tasks unreceived. The worker sends true on
// done exactly once when it stops, whether because tasks was closed and
// drained or because a task failed.
func (w WorkerPool) spawnWorker(tasks <-chan func() (interface{}, error), results chan<- interface{}, errs chan<- error, done chan<- bool) {
	worker := func() {
		for {
			task, open := <-tasks
			if !open {
				// Channel closed and drained: nothing left to do.
				break
			}

			value, taskErr := task()
			if taskErr != nil {
				errs <- taskErr
				break
			}
			results <- value
		}
		done <- true
	}

	go worker()
}