forked from hashicorp/serf
/
config.go
433 lines (370 loc) · 12.5 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
package agent
import (
"encoding/base64"
"encoding/json"
"fmt"
"github.com/hashicorp/serf/serf"
"github.com/mitchellh/mapstructure"
"io"
"net"
"os"
"path/filepath"
"sort"
"strings"
"time"
)
// DefaultBindPort is the default port that we use for Serf communication.
// AddrParts falls back to it when an address is given without a port.
const DefaultBindPort int = 7946
// DefaultConfig returns a freshly allocated Config populated with the
// agent's default settings: an empty (non-nil) tag set, a wildcard bind
// address, INFO logging, the loopback RPC address, the newest Serf protocol
// version and the "lan" timing profile.
func DefaultConfig() *Config {
	cfg := new(Config)
	cfg.Tags = map[string]string{}
	cfg.BindAddr = "0.0.0.0"
	cfg.AdvertiseAddr = ""
	cfg.LogLevel = "INFO"
	cfg.RPCAddr = "127.0.0.1:7373"
	cfg.Protocol = serf.ProtocolVersionMax
	cfg.ReplayOnJoin = false
	cfg.Profile = "lan"
	return cfg
}
// dirEnts is a list of directory entries. It implements sort.Interface
// (Len, Less, Swap) so that ReadConfigPaths can order entries by file name.
type dirEnts []os.FileInfo
// Config holds every setting that can be applied to an Agent. A subset of
// these settings is exposed through `serf agent` command-line flags; many of
// the more advanced ones can only be set through a configuration file.
type Config struct {
	// NodeName and Role are identical to their Serf counterparts. See the
	// documentation for Serf.Config for more information.
	NodeName string `mapstructure:"node_name"`
	Role     string `mapstructure:"role"`

	// Tags attaches key/value metadata to a node. Tags have replaced Role as
	// the more flexible metadata mechanism; the 'role' key is special and is
	// kept for backwards compatibility.
	Tags map[string]string `mapstructure:"tags"`

	// BindAddr is the address the Serf agent's communication ports bind to,
	// for both TCP and UDP. If the address carries no port, the default port
	// is used.
	BindAddr string `mapstructure:"bind"`

	// AdvertiseAddr is the address the Serf agent advertises to the other
	// members of the cluster. It can be used for basic NAT traversal when
	// both the internal ip:port and the external ip:port are known.
	AdvertiseAddr string `mapstructure:"advertise"`

	// EncryptKey is the secret key used to encrypt Serf communication
	// traffic. The key must be exactly 16 bytes, base64 encoded; on Unix
	// machines "head -c16 /dev/urandom | base64" produces one. When it is
	// left empty, traffic is not encrypted.
	EncryptKey string `mapstructure:"encrypt_key"`

	// LogLevel is the level of the logs to output. It can be updated during
	// a reload.
	LogLevel string `mapstructure:"log_level"`

	// RPCAddr is the address and port the agent's RPC interface listens on.
	RPCAddr string `mapstructure:"rpc_addr"`

	// RPCAuthKey optionally requires RPC clients to present an
	// authentication key. It is meant as a very simple access control.
	RPCAuthKey string `mapstructure:"rpc_auth"`

	// Protocol is the Serf protocol version to use.
	Protocol int `mapstructure:"protocol"`

	// ReplayOnJoin tells Serf to replay past user events when joining based
	// on a `StartJoin`.
	ReplayOnJoin bool `mapstructure:"replay_on_join"`

	// StartJoin lists the addresses to attempt to join when the agent
	// starts. If Serf cannot communicate with any of them, the agent errors
	// and exits.
	StartJoin []string `mapstructure:"start_join"`

	// EventHandlers lists the event handlers that will be invoked. They can
	// be updated during a reload.
	EventHandlers []string `mapstructure:"event_handlers"`

	// Profile selects a timing profile for Serf. The supported choices are
	// "wan", "lan", and "local"; the default is "lan".
	Profile string `mapstructure:"profile"`

	// SnapshotPath lets Serf snapshot important transactional state so that
	// recovery is more graceful. It enables automatic re-joining of a
	// cluster after a failure and avoids replaying old messages.
	SnapshotPath string `mapstructure:"snapshot_path"`

	// LeaveOnTerm controls whether Serf leaves gracefully on the TERM
	// signal. Defaults to false. It can be changed on reload.
	LeaveOnTerm bool `mapstructure:"leave_on_terminate"`

	// SkipLeaveOnInt controls whether Serf skips the graceful leave on the
	// INT signal. Defaults to false. It can be changed on reload.
	SkipLeaveOnInt bool `mapstructure:"skip_leave_on_interrupt"`

	// Discover sets up an mDNS discovery name. When set, the agent starts an
	// mDNS responder and periodically runs an mDNS query to look for peers,
	// which lets agents on a multicast-capable network join each other with
	// zero configuration.
	Discover string `mapstructure:"discover"`

	// Interface names the network interface to bind to. It can be given
	// instead of a bind address, in which case Serf discovers the address of
	// that interface. It also selects the multicast device used with
	// `-discover`.
	Interface string `mapstructure:"interface"`

	// ReconnectIntervalRaw is the reconnect interval in string form; the
	// interval controls how often we attempt to connect to a failed node.
	// ReconnectInterval is the parsed value and is never decoded directly.
	ReconnectIntervalRaw string        `mapstructure:"reconnect_interval"`
	ReconnectInterval    time.Duration `mapstructure:"-"`

	// ReconnectTimeoutRaw is the reconnect timeout in string form; the
	// timeout controls for how long we attempt to connect to a failed node
	// before removing it from the cluster. ReconnectTimeout is the parsed
	// value and is never decoded directly.
	ReconnectTimeoutRaw string        `mapstructure:"reconnect_timeout"`
	ReconnectTimeout    time.Duration `mapstructure:"-"`

	// TombstoneTimeoutRaw is the tombstone timeout in string form; the
	// timeout controls for how long we remember a left node before removing
	// it from the cluster. TombstoneTimeout is the parsed value and is never
	// decoded directly.
	TombstoneTimeoutRaw string        `mapstructure:"tombstone_timeout"`
	TombstoneTimeout    time.Duration `mapstructure:"-"`

	// DisableNameResolution turns off Serf's name-conflict resolution. By
	// default Serf determines which node the majority believes to be the
	// proper one and has the minority node shut down; set this to true to
	// disable that behavior.
	DisableNameResolution bool `mapstructure:"disable_name_resolution"`
}
// AddrParts splits address into the IP and port that should be used to
// configure Serf. When address carries no port, DefaultBindPort is used.
//
// The address is resolved with net.ResolveTCPAddr and the returned string is
// the textual form of the resolved IP. An error is returned when address is
// malformed (with or without the default port appended) or cannot be
// resolved.
func (c *Config) AddrParts(address string) (string, int, error) {
	checkAddr := address

	if _, _, err := net.SplitHostPort(checkAddr); err != nil {
		// The net package exposes no sentinel for this condition, so the
		// only way to recognise it is to match the AddrError message.
		ae, ok := err.(*net.AddrError)
		if !ok || ae.Err != "missing port in address" {
			return "", 0, err
		}

		// Append the default port exactly once. Some malformed inputs (for
		// example "[::1]x") keep reporting a missing port no matter how many
		// ports are appended, so retrying in a loop would never terminate.
		checkAddr = fmt.Sprintf("%s:%d", address, DefaultBindPort)
		if _, _, err := net.SplitHostPort(checkAddr); err != nil {
			return "", 0, err
		}
	}

	// Get the address
	addr, err := net.ResolveTCPAddr("tcp", checkAddr)
	if err != nil {
		return "", 0, err
	}
	return addr.IP.String(), addr.Port, nil
}
// EncryptBytes decodes the configured EncryptKey from standard base64 and
// returns the raw key bytes, or the decoding error if the key is not valid
// base64.
func (c *Config) EncryptBytes() ([]byte, error) {
	encoded := c.EncryptKey
	return base64.StdEncoding.DecodeString(encoded)
}
// EventScripts parses every entry of the "event_handlers" configuration
// with ParseEventScript and returns all resulting EventScripts as one flat
// list, in configuration order.
func (c *Config) EventScripts() []EventScript {
	scripts := make([]EventScript, 0, len(c.EventHandlers))
	for i := range c.EventHandlers {
		scripts = append(scripts, ParseEventScript(c.EventHandlers[i])...)
	}
	return scripts
}
// NetworkInterface looks up the network interface named by the Interface
// setting. It returns (nil, nil) when no interface is configured, and
// otherwise whatever net.InterfaceByName reports for that name.
func (c *Config) NetworkInterface() (*net.Interface, error) {
	if name := c.Interface; name != "" {
		return net.InterfaceByName(name)
	}
	return nil, nil
}
// DecodeConfig reads a JSON document from r and decodes it into a Config.
//
// Decoding happens in two passes: the JSON is first read into an untyped
// value, which mapstructure then maps onto Config using the struct's
// `mapstructure` tags. Afterwards every non-empty raw duration string
// (reconnect interval, reconnect timeout, tombstone timeout) is parsed with
// time.ParseDuration into its time.Duration counterpart. The first error
// from any step is returned together with a nil Config.
func DecodeConfig(r io.Reader) (*Config, error) {
	// First pass: plain JSON into an untyped value.
	var generic interface{}
	if err := json.NewDecoder(r).Decode(&generic); err != nil {
		return nil, err
	}

	// Second pass: map the untyped value onto the Config struct.
	var (
		meta mapstructure.Metadata
		cfg  Config
	)
	decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
		Metadata: &meta,
		Result:   &cfg,
	})
	if err != nil {
		return nil, err
	}
	if err := decoder.Decode(generic); err != nil {
		return nil, err
	}

	// Convert each raw duration string that was supplied, in field order.
	rawDurations := []struct {
		text   string
		target *time.Duration
	}{
		{cfg.ReconnectIntervalRaw, &cfg.ReconnectInterval},
		{cfg.ReconnectTimeoutRaw, &cfg.ReconnectTimeout},
		{cfg.TombstoneTimeoutRaw, &cfg.TombstoneTimeout},
	}
	for _, d := range rawDurations {
		if d.text == "" {
			continue
		}
		parsed, err := time.ParseDuration(d.text)
		if err != nil {
			return nil, err
		}
		*d.target = parsed
	}

	return &cfg, nil
}
// containsKey reports whether key is present in keys. A nil or empty slice
// never contains anything.
func containsKey(keys []string, key string) bool {
	for i := 0; i < len(keys); i++ {
		if keys[i] == key {
			return true
		}
	}
	return false
}
// MergeConfig merges two configurations together to make a single new
// configuration. Neither a nor b is modified; both must be non-nil.
//
// The result starts as a copy of a, and values from b are layered on top:
//   - string, integer and duration settings from b win when they are set
//     (non-empty / greater than zero / non-zero);
//   - boolean settings can only be switched on by b, never off;
//   - Tags are combined, with b's value winning for a key present in both;
//   - EventHandlers and StartJoin are concatenated, a's entries first.
//
// Fields not listed in the body (such as the raw duration strings) keep the
// value they have in a.
func MergeConfig(a, b *Config) *Config {
	result := *a

	// Copy the strings if they're set
	if b.NodeName != "" {
		result.NodeName = b.NodeName
	}
	if b.Role != "" {
		result.Role = b.Role
	}

	// result is a shallow copy of a, so result.Tags still points at a's
	// map. Build a fresh map so that merging b's tags (or a later write by
	// the caller) never leaks into a. The map stays nil only when both
	// inputs have nil Tags.
	if a.Tags != nil || b.Tags != nil {
		tags := make(map[string]string, len(a.Tags)+len(b.Tags))
		for name, value := range a.Tags {
			tags[name] = value
		}
		for name, value := range b.Tags {
			tags[name] = value
		}
		result.Tags = tags
	}

	if b.BindAddr != "" {
		result.BindAddr = b.BindAddr
	}
	if b.AdvertiseAddr != "" {
		result.AdvertiseAddr = b.AdvertiseAddr
	}
	if b.EncryptKey != "" {
		result.EncryptKey = b.EncryptKey
	}
	if b.LogLevel != "" {
		result.LogLevel = b.LogLevel
	}
	if b.Protocol > 0 {
		result.Protocol = b.Protocol
	}
	if b.RPCAddr != "" {
		result.RPCAddr = b.RPCAddr
	}
	if b.RPCAuthKey != "" {
		result.RPCAuthKey = b.RPCAuthKey
	}
	if b.ReplayOnJoin {
		result.ReplayOnJoin = true
	}
	if b.Profile != "" {
		result.Profile = b.Profile
	}
	if b.SnapshotPath != "" {
		result.SnapshotPath = b.SnapshotPath
	}
	if b.LeaveOnTerm {
		result.LeaveOnTerm = true
	}
	if b.SkipLeaveOnInt {
		result.SkipLeaveOnInt = true
	}
	if b.Discover != "" {
		result.Discover = b.Discover
	}
	if b.Interface != "" {
		result.Interface = b.Interface
	}
	if b.ReconnectInterval != 0 {
		result.ReconnectInterval = b.ReconnectInterval
	}
	if b.ReconnectTimeout != 0 {
		result.ReconnectTimeout = b.ReconnectTimeout
	}
	if b.TombstoneTimeout != 0 {
		result.TombstoneTimeout = b.TombstoneTimeout
	}
	if b.DisableNameResolution {
		result.DisableNameResolution = true
	}

	// Copy the event handlers into a new slice so the result shares no
	// backing array with either input.
	result.EventHandlers = make([]string, 0, len(a.EventHandlers)+len(b.EventHandlers))
	result.EventHandlers = append(result.EventHandlers, a.EventHandlers...)
	result.EventHandlers = append(result.EventHandlers, b.EventHandlers...)

	// Copy the start join addresses the same way.
	result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin))
	result.StartJoin = append(result.StartJoin, a.StartJoin...)
	result.StartJoin = append(result.StartJoin, b.StartJoin...)

	return &result
}
// ReadConfigPaths loads configuration from each path in the order given and
// merges the results with MergeConfig, so later sources are layered on top
// of earlier ones.
//
// A path may name a file or a directory. A file is decoded directly with
// DecodeConfig. A directory is read one level deep: its entries are sorted
// by name, sub-directories are skipped, and only files ending in ".json"
// are decoded. The first open, stat, read or decode failure aborts the
// whole load and is returned with a nil Config.
func ReadConfigPaths(paths []string) (*Config, error) {
	merged := new(Config)

	// decodeAndMerge decodes an already opened file, closes it, and folds
	// the decoded configuration into merged.
	decodeAndMerge := func(name string, fh *os.File) error {
		cfg, err := DecodeConfig(fh)
		fh.Close()
		if err != nil {
			return fmt.Errorf("Error decoding '%s': %s", name, err)
		}
		merged = MergeConfig(merged, cfg)
		return nil
	}

	for _, p := range paths {
		fh, err := os.Open(p)
		if err != nil {
			return nil, fmt.Errorf("Error reading '%s': %s", p, err)
		}

		info, err := fh.Stat()
		if err != nil {
			fh.Close()
			return nil, fmt.Errorf("Error reading '%s': %s", p, err)
		}

		// A plain file is a configuration in its own right.
		if !info.IsDir() {
			if err := decodeAndMerge(p, fh); err != nil {
				return nil, err
			}
			continue
		}

		entries, err := fh.Readdir(-1)
		fh.Close()
		if err != nil {
			return nil, fmt.Errorf("Error reading '%s': %s", p, err)
		}

		// Sorting by name makes the merge order lexical.
		sort.Sort(dirEnts(entries))

		for _, entry := range entries {
			// Skip nested directories and anything that is not a JSON file.
			if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".json") {
				continue
			}

			child := filepath.Join(p, entry.Name())
			cf, err := os.Open(child)
			if err != nil {
				return nil, fmt.Errorf("Error reading '%s': %s", child, err)
			}
			if err := decodeAndMerge(child, cf); err != nil {
				return nil, err
			}
		}
	}

	return merged, nil
}
// The methods below make dirEnts satisfy sort.Interface, ordering entries
// by ascending file name.

// Len returns the number of entries.
func (ents dirEnts) Len() int {
	return len(ents)
}

// Less reports whether the name of entry a sorts before the name of entry b.
func (ents dirEnts) Less(a, b int) bool {
	left, right := ents[a].Name(), ents[b].Name()
	return left < right
}

// Swap exchanges the entries at positions a and b.
func (ents dirEnts) Swap(a, b int) {
	ents[b], ents[a] = ents[a], ents[b]
}