forked from liamjbennett/sous
-
Notifications
You must be signed in to change notification settings - Fork 0
/
until_ready.go
105 lines (90 loc) · 2.11 KB
/
until_ready.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package test_with_docker
import (
"fmt"
"time"
)
type (
// A ReadyFn is a high-order function that returns a predicate to test for
// readiness and a deferred cleanup function
ReadyFn func() (desc string, test func() bool, defr func())
blank struct{}
// A ReadyError is returned when UntilReady doesn't get a universal ready result
ReadyError struct {
message string
Errs []error
}
)
func (re *ReadyError) Error() string {
str := re.message
for _, e := range re.Errs {
str = str + "\n" + e.Error()
}
return str
}
// UntilReady waits for a series of conditions to be true, with optional cleanup
func UntilReady(d, max time.Duration, fns ...ReadyFn) error {
readies := make(chan error, len(fns))
done := make(chan blank)
re := new(ReadyError)
for _, fn := range fns {
go loopUntilReady(done, readies, d, fn)
}
waitCount := len(fns)
waitCount = fanInReady(waitCount, readies, max, re, fmt.Sprintf("Still unready after %s ", max))
close(done)
fanInReady(waitCount, readies, 3*d, re, "Still waiting on shutdown")
close(readies)
if len(re.Errs) > 0 {
return re
}
return nil
}
func sendPanicAsError(out chan error) {
rec := recover()
if rec == nil {
out <- nil
return
}
if err, ok := rec.(error); ok {
out <- err
return
}
out <- fmt.Errorf("%v", rec)
}
func loopUntilReady(done chan blank, out chan error, pause time.Duration, rfn ReadyFn) {
desc, test, defr := rfn()
defer func() { recover() }()
defer sendPanicAsError(out)
defer defr()
for {
select {
case <-done:
out <- fmt.Errorf("Not ready: %s", desc)
return
default:
if test() {
return
}
}
time.Sleep(pause)
}
}
func fanInReady(waitCount int, readies chan error, to time.Duration, re *ReadyError, exMsg string) int {
timeout := time.After(to)
for waitCount > 0 {
select {
case err := <-readies:
waitCount--
// log.Printf("Got %v %T. Now waiting for %d checks", err, err, waitCount)
if err != nil {
// log.Print(re.Errs)
re.Errs = append(re.Errs, err)
}
case <-timeout:
//log.Printf("timeout")
re.message = re.message + exMsg
return waitCount
}
}
return waitCount
}