forked from cloudfoundry-attic/app-manager
-
Notifications
You must be signed in to change notification settings - Fork 0
/
members.go
152 lines (116 loc) · 2.65 KB
/
members.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package grouper
import (
"fmt"
"os"
"github.com/tedsuo/ifrit"
)
// Loader is an optional interface for a member's runner. When a member that
// is allowed to restart exits, Members.Run checks whether its Runner
// implements Loader and, if so, asks it for the runner to start next.
type Loader interface {
// Load is called by Members.Run with the value the exited process delivered
// on its Wait channel (nil or an error). It returns the runner to start in
// place of the exited one; a false second result tells Run not to restart.
Load(error) (ifrit.Runner, bool)
}
// Members is a list of Member values that are run together as one group.
// Its Load method returns the list itself as an ifrit.Runner, so a Members
// value is usable wherever a runner is expected.
type Members []Member
// Member is one named entry of a Members group.
type Member struct {
// Name identifies the member; Members.Run includes it in the error it
// reports when the member exits with an error.
Name string
// Runner is what gets passed to ifrit.Envoke to start the member.
Runner ifrit.Runner
// Restart is the policy Members.Run consults when this member exits. The
// Restart type is not declared in the visible part of this file; Run reads
// its Signal and AttemptRestart fields.
Restart Restart
}
// Load implements Loader for Members. The exit error is ignored: the same
// group is always handed back as the runner to start, and the second result
// is always true.
func (members Members) Load(err error) (ifrit.Runner, bool) {
	var next ifrit.Runner = members
	return next, true
}
// Run starts every member concurrently and supervises the resulting
// processes until none are left.
//
// ready, when non-nil, is closed once every member has been passed to
// ifrit.Envoke and its process has been recorded in the group.
//
// A signal received on sig is forwarded to every process currently in the
// group and switches the group into stopping mode: from then on exits are
// only counted, no error is recorded and nothing is restarted.
//
// When a member exits, its Restart policy is applied:
//   - if Restart.Signal is not Continue, that signal is sent to the members
//     still in the group;
//   - if Restart.AttemptRestart is set and the member's Runner implements
//     Loader, Load is asked for a replacement runner, which is then started
//     under the same name and policy.
//
// Run returns when no member is running or being started. The result is the
// error of the most recent member that exited with a non-nil error while the
// group was not in stopping mode, or nil if there was none.
func (members Members) Run(sig <-chan os.Signal, ready chan<- struct{}) error {
	group := make(pGroup, len(members))

	// Start all members and block until each process is registered in group.
	startedChan := make(pMemberChan, len(members))
	startedChan.envokeGroup(members, group)

	// One waiter goroutine per process reports its exit on exitedChan.
	exitedChan := make(exitedChannel, len(group))
	exitedChan.waitForGroup(group)

	var errToReturn error
	// desiredCount counts members that are running or currently being
	// restarted; the loop ends when it drops to zero.
	desiredCount := len(group)
	signaledToStop := false

	if ready != nil {
		close(ready)
	}

	for desiredCount > 0 {
		select {
		case signal := <-sig:
			signaledToStop = true
			group.Signal(signal)

		case pm := <-startedChan:
			// A restarted member finished starting: track it and watch for
			// its exit.
			// NOTE(review): a member that arrives here after a stop signal
			// was already forwarded is not signaled by this branch — confirm
			// whether that is intended.
			group[pm.Process] = pm
			go exitedChan.waitForProcess(pm.Process)

		case e := <-exitedChan:
			member, ok := group[e.Process]
			if !ok {
				// Invariant violation: every waited-on process must be in
				// group. Both the exit error and the process are reported;
				// %v is used so a nil error prints cleanly.
				panic(fmt.Errorf("Exit for missing process! \nExit: \nErr:%v \n Process: %#v", e.error, e.Process))
			}

			delete(group, e.Process)
			desiredCount--

			restart := member.Restart
			if restart.Signal != Continue {
				// The exited member is already removed, so only the
				// remaining members are signaled.
				group.Signal(restart.Signal)
			}

			if signaledToStop {
				continue
			}

			if e.error != nil {
				errToReturn = fmt.Errorf("%s exited with error: %s", member.Name, e.error)
			}

			if !restart.AttemptRestart {
				if restart.Signal != Continue {
					// This exit signaled the rest of the group and will not
					// be replaced, so treat the group as stopping.
					signaledToStop = true
				}
				continue
			}

			loader, ok := member.Runner.(Loader)
			if !ok {
				continue
			}

			nextRunner, ok := loader.Load(e.error)
			if !ok {
				continue
			}

			// Count the replacement immediately so the loop keeps running
			// while it is being started in the background.
			desiredCount++
			newMember := Member{member.Name, nextRunner, member.Restart}
			go startedChan.envokeMember(newMember)
		}
	}

	return errToReturn
}
// exit records that a process has finished: the process itself together with
// the value received from its Wait channel.
type exit struct {
ifrit.Process
error
}
// exitedChannel carries exit notifications from the per-process waiter
// goroutines to the supervising loop in Members.Run.
type exitedChannel chan exit
// waitForGroup launches one waitForProcess goroutine for each process that
// is a key of group, and returns without waiting for any of them.
func (exitedChan exitedChannel) waitForGroup(group pGroup) {
	for process := range group {
		go exitedChan.waitForProcess(process)
	}
}
// waitForProcess blocks until p delivers a value on its Wait channel, then
// sends that value, paired with p, on exitedChan.
func (exitedChan exitedChannel) waitForProcess(p ifrit.Process) {
	exitedChan <- exit{Process: p, error: <-p.Wait()}
}
// pGroup is the set of members currently tracked by Members.Run, keyed by
// each member's process.
type pGroup map[ifrit.Process]pMember
// Signal forwards signal to every process that is currently a key of the
// group. Map iteration order is unspecified, so no delivery order is implied.
func (group pGroup) Signal(signal os.Signal) {
	for process := range group {
		process.Signal(signal)
	}
}
// pMember pairs a Member with the process obtained by envoking its Runner.
type pMember struct {
ifrit.Process
Member
}
// pMemberChan delivers members that have just been started, together with
// their processes, to whoever is collecting them (envokeGroup or Members.Run).
type pMemberChan chan pMember
// envokeMember passes the member's Runner to ifrit.Envoke and then sends the
// resulting process, paired with the member, on pmChan.
func (pmChan pMemberChan) envokeMember(member Member) {
	pmChan <- pMember{
		Process: ifrit.Envoke(member.Runner),
		Member:  member,
	}
}
// envokeGroup starts every member of group in its own goroutine and then
// blocks until it has received one started member per entry from pmChan,
// recording each in p under its process.
func (pmChan pMemberChan) envokeGroup(group Members, p pGroup) {
	for i := range group {
		go pmChan.envokeMember(group[i])
	}

	// Collect exactly as many results as members were launched.
	for pending := len(group); pending > 0; pending-- {
		started := <-pmChan
		p[started.Process] = started
	}
}