/
wait.go
161 lines (137 loc) · 3.91 KB
/
wait.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package wait
import (
"bytes"
"context"
"io"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/go-connections/nat"
"github.com/lestrrat-go/option"
)
// Waiter polls a container with a CheckFunc until the check reports success.
// Values are built by New and driven by the Wait method.
type Waiter struct {
// interval is the pause Wait inserts between two consecutive checks.
interval time.Duration
// timeout bounds the total time a single Wait call may take.
timeout time.Duration
// check is the readiness criterion evaluated on every poll.
check CheckFunc
}
// Fetcher provides several ways to access the state of the container.
// It is the only view of the container that a CheckFunc receives.
type Fetcher interface {
// ContainerID returns a string identifying the container
// (presumably the Docker container ID; confirm against the implementation).
ContainerID() string
// Status returns the container state. CheckHealthy reads its Health field.
Status(ctx context.Context) (*types.ContainerState, error)
// Ports returns the container's port map
// (presumably the published port bindings; confirm against the implementation).
Ports() nat.PortMap
// Log returns a reader over the container log. CheckLogOccurrence reads it
// to the end and then closes it, so the caller owns closing the reader.
Log(ctx context.Context) (io.ReadCloser, error)
// Exec is given a command and returns output bytes plus an error.
// CheckCommandSucceeds treats a nil error as success.
// NOTE(review): presumably the command runs inside the container; confirm.
Exec(ctx context.Context, cmd ...string) ([]byte, error)
}
type (
// Option configures a Waiter created by New. It embeds option.Interface
// and adds the unexported wait method, so the option values handled by
// this package are the ones produced by its own constructors
// (WithInterval, WithTimeout).
Option interface {
option.Interface
wait() Option
}
// identOptionInterval is the identifier New matches to pick up the value
// set by WithInterval.
identOptionInterval struct{}
// identOptionTimeout is the identifier New matches to pick up the value
// set by WithTimeout.
identOptionTimeout struct{}
// waitOption wraps an option.Interface and implements Option.
waitOption struct{ option.Interface }
)
// wait hands the receiver back typed as Option. Its presence is what makes
// waitOption satisfy the Option interface.
func (o waitOption) wait() Option {
	var asOption Option = o
	return asOption
}
// WithInterval returns an Option that overrides the pause between two
// consecutive readiness checks performed by a Waiter.
func WithInterval(d time.Duration) Option {
	wrapped := waitOption{Interface: option.New(identOptionInterval{}, d)}
	return wrapped.wait()
}
// WithTimeout returns an Option that overrides how long a Waiter keeps
// polling before it gives up.
func WithTimeout(d time.Duration) Option {
	wrapped := waitOption{Interface: option.New(identOptionTimeout{}, d)}
	return wrapped.wait()
}
// defaultInterval is the pause between readiness checks that New uses when
// no WithInterval option is supplied.
const defaultInterval time.Duration = 500 * time.Millisecond

// defaultTimeout is the overall waiting limit that New uses when no
// WithTimeout option is supplied.
const defaultTimeout time.Duration = 30 * time.Second
// CheckFunc is a single readiness probe. It receives the Fetcher for the
// container and reports whether the container is ready. Waiter.Wait stops
// polling as soon as a CheckFunc returns true or a non-nil error; a
// (false, nil) result makes it try again after the configured interval.
type CheckFunc func(ctx context.Context, f Fetcher) (bool, error)
// New builds a Waiter around the given readiness criterion. The check
// function receives a Fetcher through which it can inspect the container.
//
// The returned Waiter keeps invoking check until it succeeds for the first
// time. The pause between attempts and the overall time limit can be tuned
// with WithInterval and WithTimeout; without them the interval is 500ms and
// the timeout is 30sec. Options with an unknown identifier are ignored.
func New(check CheckFunc, opts ...Option) *Waiter {
	waiter := Waiter{
		check:    check,
		interval: defaultInterval,
		timeout:  defaultTimeout,
	}
	for i := range opts {
		o := opts[i]
		// The identifiers are empty structs, so matching on the dynamic type
		// is the same as comparing against their zero value.
		switch o.Ident().(type) {
		case identOptionInterval:
			waiter.interval = o.Value().(time.Duration)
		case identOptionTimeout:
			waiter.timeout = o.Value().(time.Duration)
		}
	}
	return &waiter
}
// LogContains returns a Waiter that is satisfied once the container log
// contains message at least occurrence times. It is New combined with
// CheckLogOccurrence.
func LogContains(message string, occurrence int, opts ...Option) *Waiter {
	check := CheckLogOccurrence(message, occurrence)
	return New(check, opts...)
}
// CheckLogOccurrence builds the CheckFunc behind LogContains. On every call
// the returned function reads the whole container log obtained from the
// Fetcher and reports true when message appears in it at least occurrence
// times. Errors from opening or reading the log are returned unchanged.
func CheckLogOccurrence(message string, occurrence int) CheckFunc {
	// Convert once; the closure reuses the byte form on every poll.
	needle := []byte(message)

	check := func(ctx context.Context, f Fetcher) (bool, error) {
		logs, err := f.Log(ctx)
		if err != nil {
			return false, err
		}
		// A close failure is deliberately ignored; it does not affect the result.
		defer func() { _ = logs.Close() }()

		content, err := io.ReadAll(logs)
		if err != nil {
			return false, err
		}

		found := bytes.Count(content, needle)
		return found >= occurrence, nil
	}
	return check
}
// Healthy returns a Waiter that is satisfied once the container's health
// status is reported as healthy. It is New combined with CheckHealthy.
func Healthy(opts ...Option) *Waiter {
	var check CheckFunc = CheckHealthy
	return New(check, opts...)
}
// CheckHealthy is a CheckFunc. See Healthy.
//
// It asks the Fetcher for the container state and reports true only when
// the state carries health information whose status is "healthy". An error
// from Status is returned unchanged. A missing state or missing health
// section counts as "not ready yet" rather than as an error, so the Waiter
// keeps polling.
func CheckHealthy(ctx context.Context, f Fetcher) (bool, error) {
	status, err := f.Status(ctx)
	if err != nil {
		return false, err
	}
	// Guard both pointers: a Fetcher may return (nil, nil), and Health is a
	// pointer that can be unset (presumably when the container defines no
	// health check; confirm against the Docker API).
	if status == nil || status.Health == nil {
		return false, nil
	}
	return status.Health.Status == "healthy", nil
}
// CommandSucceeds returns a Waiter that is satisfied once running cmd through
// the Fetcher finishes without an error. It is New combined with
// CheckCommandSucceeds.
func CommandSucceeds(cmd []string, opts ...Option) *Waiter {
	check := CheckCommandSucceeds(cmd)
	return New(check, opts...)
}
// CheckCommandSucceeds builds the CheckFunc behind CommandSucceeds. The
// returned function passes cmd to the Fetcher's Exec and reports true when
// Exec returns no error. The command output is discarded.
func CheckCommandSucceeds(cmd []string) CheckFunc {
	return func(ctx context.Context, f Fetcher) (bool, error) {
		if _, execErr := f.Exec(ctx, cmd...); execErr != nil {
			// A failing command means "not ready yet", so the error is
			// deliberately not propagated and polling continues.
			return false, nil
		}
		return true, nil
	}
}
// Wait calls CheckFunc with given Fetcher repeatedly until the first success.
//
// The whole call is bounded by the Waiter's timeout, applied on top of any
// deadline already carried by ctx. The check always runs at least once.
// Wait returns nil as soon as the check reports true, returns the check's
// error as soon as it reports one, and returns ctx.Err() when the context is
// cancelled or the timeout expires while waiting for the next attempt.
func (w *Waiter) Wait(ctx context.Context, f Fetcher) error {
	ctx, cancel := context.WithTimeout(ctx, w.timeout)
	defer cancel()

	for {
		ok, err := w.check(ctx, f)
		if err != nil {
			return err
		}
		if ok {
			return nil
		}

		// Use an explicit timer instead of time.After so it can be released
		// right away when the context wins the race; otherwise the timer
		// would stay pending for up to a full interval after Wait returns.
		timer := time.NewTimer(w.interval)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
	}
}