forked from tsuru/tsuru
-
Notifications
You must be signed in to change notification settings - Fork 0
/
flow.go
105 lines (95 loc) · 2.02 KB
/
flow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
// Copyright 2017 tsuru authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package integration
import (
"strconv"
"sync"
"gopkg.in/check.v1"
)
// hookFunc is the signature of the forward and backward hooks stored on an
// ExecFlow. It receives the gocheck test context and the Environment the hook
// should operate on.
type hookFunc func(c *check.C, env *Environment)
// CmdWithExp bundles a Command with a list of Expected values.
// NOTE(review): presumably the expectations are checked against the result of
// running the command; Command and Expected are declared elsewhere — confirm
// against their definitions.
type CmdWithExp struct {
// C is the command half of the pair.
C *Command
// E holds the expectations associated with C.
E []Expected
}
// ExecFlow describes one step of an integration flow: a forward hook executed
// by Run and an optional backward hook executed by Rollback. Both hooks are
// invoked once per environment produced by expanding matrix.
type ExecFlow struct {
// provides is not read by the methods in this file.
// NOTE(review): presumably the environment keys this flow sets — confirm
// against the code that consumes it.
provides []string
// requires lists environment keys that must be present (Environment.Has)
// in an expanded environment; expansions missing any of them are skipped.
requires []string
// matrix maps a local key name to an environment key. Every value returned
// by Environment.All for that environment key yields one expansion in which
// the local key is set to that value.
matrix map[string]string
// parallel makes the hooks for the expanded environments run in separate
// goroutines instead of sequentially.
parallel bool
// forward is the hook executed by Run; nil makes Run a no-op.
forward hookFunc
// backward is the hook executed by Rollback; nil makes Rollback a no-op.
backward hookFunc
}
// Rollback executes the flow's backward hook once for every environment
// expanded from env. It does nothing when the flow has no backward hook.
func (f *ExecFlow) Rollback(c *check.C, env *Environment) {
	if f.backward != nil {
		f.forExpanded(env, func(expanded *Environment) {
			f.backward(c, expanded)
		})
	}
}
// Run executes the flow's forward hook once for every environment expanded
// from env. It does nothing when the flow has no forward hook. After all
// invocations have finished, the test is aborted via FailNow if any of them
// marked it as failed.
func (f *ExecFlow) Run(c *check.C, env *Environment) {
	if f.forward == nil {
		return
	}
	step := func(expanded *Environment) {
		f.forward(c, expanded)
	}
	f.forExpanded(env, step)
	if !c.Failed() {
		return
	}
	c.FailNow()
}
// expandMatrix builds the cartesian product of the flow's matrix. For each
// matrix entry (local key -> environment key) every value returned by env.All
// for the environment key is combined with every combination built so far.
//
// With an empty matrix the result is a single nil map, so callers still run
// exactly once. If any environment key has no values the result is empty.
// The order of the returned combinations follows map iteration order and is
// therefore not stable when the matrix has more than one key.
func (f *ExecFlow) expandMatrix(env *Environment) []map[string]string {
	// Seed with one empty combination for the first key to multiply against.
	combos := []map[string]string{nil}
	for localKey, envKey := range f.matrix {
		candidates := env.All(envKey)
		next := make([]map[string]string, 0, len(combos)*len(candidates))
		for _, base := range combos {
			for _, candidate := range candidates {
				// Copy the partial combination so siblings never share a map.
				combo := make(map[string]string, len(base)+1)
				for name, value := range base {
					combo[name] = value
				}
				combo[localKey] = candidate
				next = append(next, combo)
			}
		}
		combos = next
	}
	return combos
}
// forExpanded calls fn once for every environment obtained by expanding the
// flow's matrix against env.
//
// Each expansion is applied to a clone of env through SetLocal, so env itself
// is not modified here. Expansions whose environment lacks any key listed in
// f.requires are skipped. When f.parallel is set, every call to fn runs in its
// own goroutine, with at most "maxconcurrency" of them executing fn at the
// same time; otherwise the calls happen sequentially in the calling goroutine.
// forExpanded returns only after all calls to fn have finished.
func (f *ExecFlow) forExpanded(env *Environment, fn func(env *Environment)) {
	expanded := f.expandMatrix(env)

	// The Atoi error is dropped on purpose: a missing or malformed
	// "maxconcurrency" yields 0 and selects the default below. Negative
	// values must be rejected as well, because make panics on a negative
	// channel capacity.
	maxConcurrency, _ := strconv.Atoi(env.Get("maxconcurrency"))
	if maxConcurrency <= 0 {
		maxConcurrency = 100
	}
	// limiter is a counting semaphore bounding concurrent fn executions.
	limiter := make(chan struct{}, maxConcurrency)

	var wg sync.WaitGroup
expandedloop:
	for _, entry := range expanded {
		newEnv := env.Clone()
		for k, v := range entry {
			newEnv.SetLocal(k, v)
		}
		for _, req := range f.requires {
			if !newEnv.Has(req) {
				continue expandedloop
			}
		}
		if !f.parallel {
			fn(newEnv)
			continue
		}
		wg.Add(1)
		go func() {
			// Done is deferred first so it also runs if fn exits the
			// goroutine early (for example through runtime.Goexit).
			defer wg.Done()
			limiter <- struct{}{}
			defer func() { <-limiter }()
			fn(newEnv)
		}()
	}
	wg.Wait()
}