-
Notifications
You must be signed in to change notification settings - Fork 2k
/
retry_join.go
154 lines (129 loc) · 4.67 KB
/
retry_join.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package agent
import (
"fmt"
"log"
"strings"
"time"
)
// DiscoverInterface mirrors the method set of the Discover type from the
// go-discover library. Depending on the interface rather than the concrete
// type lets tests substitute their own implementation.
type DiscoverInterface interface {
	// Addrs looks up the IP addresses of the nodes that satisfy the filter
	// described by cfg.
	//
	// cfg must be formatted as 'provider=xxx key=val key=val ...'. The
	// available keys and values depend on the provider, and values are URL
	// encoded.
	Addrs(cfg string, logger *log.Logger) ([]string, error)

	// Help returns text explaining the configuration string format used for
	// address discovery, including the provider specific options.
	Help() string

	// Names lists the names of the providers that have been configured.
	Names() []string
}
// retryJoiner bundles everything RetryJoin needs in order to keep attempting
// a join until one succeeds or the configured number of retries runs out.
type retryJoiner struct {
	// serverJoin is called with the resolved addresses when serverEnabled is
	// true. It returns the number of servers joined; the original note
	// describes it as adding the servers to the serf cluster.
	serverJoin func(addrs []string) (int, error)

	// serverEnabled reports whether the nomad agent runs in server mode.
	serverEnabled bool

	// clientJoin is called with the resolved addresses when clientEnabled is
	// true. It returns the number of servers joined.
	// NOTE(review): the original comment also says "serf cluster" here, which
	// looks copied from serverJoin; confirm what the client-side join does.
	clientJoin func(addrs []string) (int, error)

	// clientEnabled reports whether the nomad agent runs in client mode.
	clientEnabled bool

	// discover resolves "provider=..." entries into addresses. It is either
	// the go-discover implementation or a mock supplied by tests.
	discover DiscoverInterface

	// errCh is closed to tell the agent that the maximum number of retry
	// attempts has been reached.
	errCh chan struct{}

	// logger is the agent logger.
	logger *log.Logger
}
// Validate checks the join related settings of config for conflicting
// options. It returns an operator-facing error describing the first conflict
// it finds, or nil when the configuration is acceptable.
//
// Server rules apply only when the server's server_join stanza lists at least
// one retry_join address. In that case none of the deprecated top-level
// server fields (retry_join, start_join, retry_max, retry_interval) may be
// set, and the stanza itself may not also list start_join addresses.
//
// Client rule: a client server_join stanza must leave start_join unset (nil).
func (r *retryJoiner) Validate(config *Config) error {
	if srv := config.Server; srv != nil && srv.ServerJoin != nil && len(srv.ServerJoin.RetryJoin) > 0 {
		// Cases are evaluated top to bottom, so the first conflict wins.
		switch {
		case len(srv.RetryJoin) > 0:
			return fmt.Errorf("server_join and retry_join cannot both be defined; prefer setting the server_join stanza")
		case len(srv.StartJoin) > 0:
			return fmt.Errorf("server_join and start_join cannot both be defined; prefer setting the server_join stanza")
		case srv.RetryMaxAttempts != 0:
			return fmt.Errorf("server_join and retry_max cannot both be defined; prefer setting the server_join stanza")
		case srv.RetryInterval != 0:
			return fmt.Errorf("server_join and retry_interval cannot both be defined; prefer setting the server_join stanza")
		case len(srv.ServerJoin.StartJoin) > 0:
			return fmt.Errorf("retry_join and start_join cannot both be defined")
		}
	}

	// start_join is a server-only setting. Note this is a nil check, so an
	// explicitly provided empty list is rejected as well.
	if cl := config.Client; cl != nil && cl.ServerJoin != nil && cl.ServerJoin.StartJoin != nil {
		return fmt.Errorf("start_join is not supported for Nomad clients")
	}

	return nil
}
// RetryJoin keeps attempting to join the addresses listed in
// serverJoin.RetryJoin until a join succeeds or the retry budget is spent.
//
// It returns immediately when serverJoin is nil or lists no addresses.
// Otherwise each attempt:
//
//  1. Builds the address list. Entries starting with "provider=" are resolved
//     through r.discover; a discovery error is logged and that entry is
//     skipped for this attempt. All other entries are used verbatim.
//  2. If any addresses were collected, calls serverJoin (when server mode is
//     enabled) and then clientJoin (when client mode is enabled). The first
//     successful join ends the method.
//  3. On failure, sleeps for serverJoin.RetryInterval and tries again.
//
// When serverJoin.RetryMaxAttempts is greater than zero, the initial attempt
// plus RetryMaxAttempts retries are made; after that r.errCh is closed (if it
// is non-nil) to signal the agent and the method returns. A value of zero or
// less retries without limit.
//
// The method blocks for as long as it is retrying.
func (r *retryJoiner) RetryJoin(serverJoin *ServerJoin) {
	// Nothing to do without a stanza or without any addresses.
	if serverJoin == nil || len(serverJoin.RetryJoin) == 0 {
		return
	}

	attempt := 0

	addrsToJoin := strings.Join(serverJoin.RetryJoin, " ")
	r.logger.Printf("[INFO] agent: Joining cluster... %s", addrsToJoin)

	for {
		var addrs []string
		var n int
		var err error

		// Discovery is repeated on every attempt so that nodes which appear
		// later are picked up.
		for _, addr := range serverJoin.RetryJoin {
			if !strings.HasPrefix(addr, "provider=") {
				addrs = append(addrs, addr)
				continue
			}

			// A distinct name keeps the join error below from being shadowed.
			servers, discoverErr := r.discover.Addrs(addr, r.logger)
			if discoverErr != nil {
				r.logger.Printf("[ERR] agent: Join error %s", discoverErr)
				continue
			}
			addrs = append(addrs, servers...)
		}

		if len(addrs) > 0 {
			if r.serverEnabled && r.serverJoin != nil {
				n, err = r.serverJoin(addrs)
				if err == nil {
					r.logger.Printf("[INFO] agent: Join completed. Server synced with %d initial servers", n)
					return
				}
			}
			if r.clientEnabled && r.clientJoin != nil {
				n, err = r.clientJoin(addrs)
				if err == nil {
					r.logger.Printf("[INFO] agent: Join completed. Client synced with %d initial servers", n)
					return
				}
			}
		}

		attempt++
		if serverJoin.RetryMaxAttempts > 0 && attempt > serverJoin.RetryMaxAttempts {
			r.logger.Printf("[ERR] agent: max join retry exhausted, exiting")
			// Closing a nil channel panics, so only signal when a channel
			// was actually provided.
			if r.errCh != nil {
				close(r.errCh)
			}
			return
		}

		if err != nil {
			r.logger.Printf("[WARN] agent: Join failed: %q, retrying in %v", err,
				serverJoin.RetryInterval)
		}
		time.Sleep(serverJoin.RetryInterval)
	}
}