forked from kubernetes-sigs/controller-runtime
-
Notifications
You must be signed in to change notification settings - Fork 0
/
process.go
225 lines (197 loc) · 5.32 KB
/
process.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
package internal
import (
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"os/exec"
"path"
"strconv"
"time"
"github.com/onsi/gomega/gbytes"
"github.com/onsi/gomega/gexec"
"github.com/binoue/controller-runtime/pkg/internal/testing/integration/addr"
)
// ProcessState define the state of the process.
type ProcessState struct {
DefaultedProcessInput
Session *gexec.Session
// Healthcheck Endpoint. If we get http.StatusOK from this endpoint, we
// assume the process is ready to operate. E.g. "/healthz". If this is set,
// we ignore StartMessage.
HealthCheckEndpoint string
// HealthCheckPollInterval is the interval which will be used for polling the
// HealthCheckEndpoint.
// If left empty it will default to 100 Milliseconds.
HealthCheckPollInterval time.Duration
// StartMessage is the message to wait for on stderr. If we receive this
// message, we assume the process is ready to operate. Ignored if
// HealthCheckEndpoint is specified.
//
// The usage of StartMessage is discouraged, favour HealthCheckEndpoint
// instead!
//
// Deprecated: Use HealthCheckEndpoint in favour of StartMessage
StartMessage string
Args []string
// ready holds wether the process is currently in ready state (hit the ready condition) or not.
// It will be set to true on a successful `Start()` and set to false on a successful `Stop()`
ready bool
}
// DefaultedProcessInput defines the default process input required to perform the test.
type DefaultedProcessInput struct {
URL url.URL
Dir string
DirNeedsCleaning bool
Path string
StopTimeout time.Duration
StartTimeout time.Duration
}
// DoDefaulting sets the default configuration according to the data informed and return an DefaultedProcessInput
// and an error if some requirement was not informed.
func DoDefaulting(
name string,
listenURL *url.URL,
dir string,
path string,
startTimeout time.Duration,
stopTimeout time.Duration,
) (DefaultedProcessInput, error) {
defaults := DefaultedProcessInput{
Dir: dir,
Path: path,
StartTimeout: startTimeout,
StopTimeout: stopTimeout,
}
if listenURL == nil {
port, host, err := addr.Suggest("")
if err != nil {
return DefaultedProcessInput{}, err
}
defaults.URL = url.URL{
Scheme: "http",
Host: net.JoinHostPort(host, strconv.Itoa(port)),
}
} else {
defaults.URL = *listenURL
}
if dir == "" {
newDir, err := ioutil.TempDir("", "k8s_test_framework_")
if err != nil {
return DefaultedProcessInput{}, err
}
defaults.Dir = newDir
defaults.DirNeedsCleaning = true
}
if path == "" {
if name == "" {
return DefaultedProcessInput{}, fmt.Errorf("must have at least one of name or path")
}
defaults.Path = BinPathFinder(name)
}
if startTimeout == 0 {
defaults.StartTimeout = 20 * time.Second
}
if stopTimeout == 0 {
defaults.StopTimeout = 20 * time.Second
}
return defaults, nil
}
type stopChannel chan struct{}
// Start starts the apiserver, waits for it to come up, and returns an error,
// if occurred.
func (ps *ProcessState) Start(stdout, stderr io.Writer) (err error) {
if ps.ready {
return nil
}
command := exec.Command(ps.Path, ps.Args...)
ready := make(chan bool)
timedOut := time.After(ps.StartTimeout)
var pollerStopCh stopChannel
if ps.HealthCheckEndpoint != "" {
healthCheckURL := ps.URL
healthCheckURL.Path = ps.HealthCheckEndpoint
pollerStopCh = make(stopChannel)
go pollURLUntilOK(healthCheckURL, ps.HealthCheckPollInterval, ready, pollerStopCh)
} else {
startDetectStream := gbytes.NewBuffer()
ready = startDetectStream.Detect(ps.StartMessage)
stderr = safeMultiWriter(stderr, startDetectStream)
}
ps.Session, err = gexec.Start(command, stdout, stderr)
if err != nil {
return err
}
select {
case <-ready:
ps.ready = true
return nil
case <-timedOut:
if pollerStopCh != nil {
close(pollerStopCh)
}
if ps.Session != nil {
ps.Session.Terminate()
}
return fmt.Errorf("timeout waiting for process %s to start", path.Base(ps.Path))
}
}
func safeMultiWriter(writers ...io.Writer) io.Writer {
safeWriters := []io.Writer{}
for _, w := range writers {
if w != nil {
safeWriters = append(safeWriters, w)
}
}
return io.MultiWriter(safeWriters...)
}
func pollURLUntilOK(url url.URL, interval time.Duration, ready chan bool, stopCh stopChannel) {
if interval <= 0 {
interval = 100 * time.Millisecond
}
for {
res, err := http.Get(url.String())
if err == nil {
res.Body.Close()
if res.StatusCode == http.StatusOK {
ready <- true
return
}
}
select {
case <-stopCh:
return
default:
time.Sleep(interval)
}
}
}
// Stop stops this process gracefully, waits for its termination, and cleans up
// the CertDir if necessary.
func (ps *ProcessState) Stop() error {
if ps.Session == nil {
return nil
}
// gexec's Session methods (Signal, Kill, ...) do not check if the Process is
// nil, so we are doing this here for now.
// This should probably be fixed in gexec.
if ps.Session.Command.Process == nil {
return nil
}
detectedStop := ps.Session.Terminate().Exited
timedOut := time.After(ps.StopTimeout)
select {
case <-detectedStop:
break
case <-timedOut:
return fmt.Errorf("timeout waiting for process %s to stop", path.Base(ps.Path))
}
ps.ready = false
if ps.DirNeedsCleaning {
return os.RemoveAll(ps.Dir)
}
return nil
}