-
Notifications
You must be signed in to change notification settings - Fork 67
/
parallel.go
99 lines (91 loc) · 2.77 KB
/
parallel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package validate
import (
"context"
"fmt"
"github.com/armory/spinnaker-operator/pkg/apis/spinnaker/interfaces"
"k8s.io/apimachinery/pkg/util/wait"
)
// ParallelValidator groups a set of SpinnakerValidators so that they can be
// executed concurrently by its Validate method.
type ParallelValidator struct {
	// runInParallel holds the validators that are started concurrently.
	runInParallel []SpinnakerValidator
}
// Validate runs every validator in runInParallel concurrently and merges the
// results they produce into a single ValidationResult.
//
// Each validator is called with a copy of options whose Ctx is derived from the
// caller's context. When a merged result has fatal errors and the service's
// validation settings have FailFast set, the derived context is canceled; this
// only shortens the run for validators that actually honor the context.
// Validate returns once every started validator has delivered its result.
func (p *ParallelValidator) Validate(spinSvc interfaces.SpinnakerService, options Options) ValidationResult {
	var result ValidationResult

	ctx, cancel := context.WithCancel(options.Ctx)
	// Release the derived context on every return path, not only on fail-fast.
	// Calling cancel more than once is harmless.
	defer cancel()
	// Replace context (options is a by-value copy, the caller's is untouched).
	options.Ctx = ctx

	resCh := make(chan ValidationResult)
	valGrp := wait.Group{}
	for _, v := range p.runInParallel {
		v := v // per-iteration copy captured by the closure below
		valGrp.StartWithContext(ctx, func(ctx context.Context) {
			options.Log.Info(fmt.Sprintf("Running validator %T", v))
			res := v.Validate(spinSvc, options)
			resCh <- res
			if res.HasFatalErrors() {
				options.Log.Info(fmt.Sprintf("Validator %T detected a fatal error", v))
			}
		})
	}

	// Close the result channel once all validators have returned. resCh is
	// unbuffered, so by the time Wait returns every result has been received.
	go func() {
		valGrp.Wait()
		close(resCh)
	}()

	for res := range resCh {
		result.Merge(res)
		// Cancel the context if a fatal error is detected.
		// This will effectively abort the validation AS LONG AS the validation
		// correctly uses the context.
		if res.HasFatalErrors() && spinSvc.GetSpinnakerValidation().FailFast {
			cancel()
		}
	}
	return result
}
// validateAccountsInParallel calls f once per account, each call in its own
// goroutine, and merges the results it collects into one ValidationResult.
//
// Collection stops as soon as a result with fatal errors has been merged;
// results delivered after that point are not merged. When accounts is empty an
// empty ValidationResult is returned without calling f.
func (p *ParallelValidator) validateAccountsInParallel(accounts []Account, options Options, f func(Account, Options) ValidationResult) ValidationResult {
	options.Log.Info(fmt.Sprintf("Running validation of %d accounts in parallel", len(accounts)))
	if len(accounts) == 0 {
		return ValidationResult{}
	}

	// Buffered to len(accounts) so a worker's send can never block, even after
	// the collector has stopped reading. The channel is deliberately never
	// closed: closing it while workers may still send (or closing it twice)
	// panics, and the collector counts results instead of relying on a close.
	resultsChannel := make(chan ValidationResult, len(accounts))
	// abortSignal is closed by the collector only, at most once, to tell
	// workers that are still running that their result is no longer wanted.
	abortSignal := make(chan struct{})

	for _, a := range accounts {
		go func(acc Account, o Options) {
			o.Log.Info(fmt.Sprintf("Running account validator in parallel for account: %s", acc.GetName()))
			r := f(acc, o)
			select {
			case <-abortSignal:
				o.Log.Info(fmt.Sprintf("Validator %s finished but abort signal found, not saving result", acc.GetName()))
				return
			default:
			}
			o.Log.Info(fmt.Sprintf("Validator %s finished, saving result", acc.GetName()))
			resultsChannel <- r
			if r.HasFatalErrors() {
				o.Log.Info(fmt.Sprintf("Validator %s detected a fatal error, aborting", acc.GetName()))
			}
		}(a, options)
	}

	i := 0
	result := ValidationResult{}
	for i < len(accounts) {
		r := <-resultsChannel
		result.Merge(r)
		i++
		if r.HasFatalErrors() {
			// First fatal result: stop collecting and let remaining workers
			// know. Workers only skip their send after this close, at which
			// point nobody is waiting on the channel anymore.
			close(abortSignal)
			break
		}
	}
	options.Log.Info(fmt.Sprintf("Finished validation of %d accounts in parallel with %d results", len(accounts), i))
	return result
}