/
config.go
321 lines (282 loc) · 7.38 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
package config
import (
"fmt"
"io/ioutil"
"os"
"sync"
"syscall"
"time"
"gopkg.in/yaml.v2"
"github.com/30x/changeagent/hooks"
)
const (
// DefaultElectionTimeout is the election timeout out of the box, written
// as a duration string of the kind Load parses with time.ParseDuration.
// It may be modified using UpdateRaftConfig
DefaultElectionTimeout = "10s"
// defaultElectionDuration is the same default expressed as a
// time.Duration; GetDefaultConfig installs it. Keep it in sync with
// DefaultElectionTimeout.
defaultElectionDuration = 10 * time.Second
// DefaultHeartbeatTimeout is the default heartbeat timeout. It may be
// modified using UpdateRaftConfig
DefaultHeartbeatTimeout = "2s"
// defaultHeartbeatDuration is the same default expressed as a
// time.Duration; GetDefaultConfig installs it. Keep it in sync with
// DefaultHeartbeatTimeout.
defaultHeartbeatDuration = 2 * time.Second
// minHeartbeatTimeout is the smallest heartbeat timeout that Validate
// accepts.
minHeartbeatTimeout = 100 * time.Millisecond
// minHeartbeatRatio is the factor by which the election timeout must be
// at least as large as the heartbeat timeout for Validate to accept it.
minHeartbeatRatio = 2
)
/*
State describes configuration information about the raft service
that is shared around the cluster.

Automatic purging is only considered enabled when BOTH the minimum purge
record count and the minimum purge duration are greater than zero (see
ShouldPurgeRecords). When it is enabled the leader is expected to send a
purge request to the quorum every so often; that logic is not part of this
file.

A State wraps a pointer to an RWMutex (the "latch") and exposes Lock, Unlock,
RLock and RUnlock so that it can (and should) be locked and unlocked on each
go-around. The getters and setters below take the latch themselves.

In this file the latch is only initialized by GetDefaultConfig, so a
zero-value State is not usable: its locking methods would dereference a nil
latch.
*/
type State struct {
// latch guards every field below.
latch *sync.RWMutex
// purgeDuration is parsed from StoredState.MinPurgeDuration
purgeDuration time.Duration
// electionDuration is parsed from StoredState.ElectionTimeout
electionDuration time.Duration
// heartbeatDuration is parsed from StoredState.HeartbeatTimeout
heartbeatDuration time.Duration
// minPurgeRecords comes straight from YAML
minPurgeRecords uint32
// webHooks comes straight from YAML; the slice is stored and handed out
// by reference, not copied.
webHooks []hooks.WebHook
}
// StoredState is the structure that is read from and written to YAML.
// It is the serialized counterpart of State: Load decodes into it and Store
// encodes from it.
type StoredState struct {
// MinPurgeRecords is copied to and from State unchanged.
MinPurgeRecords uint32 `yaml:"minPurgeRecords"`
// MinPurgeDuration, ElectionTimeout and HeartbeatTimeout are duration
// strings. Load parses each with time.ParseDuration and leaves the
// current value in place when the string is empty; Store writes each
// using the Duration's string form.
MinPurgeDuration string `yaml:"minPurgeDuration"`
ElectionTimeout string `yaml:"electionTimeout"`
HeartbeatTimeout string `yaml:"heartbeatTimeout"`
// WebHooks is omitted from the YAML output when empty.
WebHooks []hooks.WebHook `yaml:"webHooks,omitempty"`
}
/*
MinPurgeRecords returns the minimum number of records that must be
retained on a purge. It is zero unless configured, and zero means that
no purging takes place.

The value is read while holding the read lock.
*/
func (c *State) MinPurgeRecords() uint32 {
	c.RLock()
	records := c.minPurgeRecords
	c.RUnlock()
	return records
}
/*
SetMinPurgeRecords replaces the minimum purge record count with "v".
The write happens while holding the write lock, so it is safe to call
concurrently with the other accessors.
*/
func (c *State) SetMinPurgeRecords(v uint32) {
	c.Lock()
	defer c.Unlock()

	c.minPurgeRecords = v
}
/*
MinPurgeDuration returns the minimum amount of time that a record must
remain on the change list before being purged. It is zero unless
configured, and zero means no purging.

The value is read while holding the read lock.
*/
func (c *State) MinPurgeDuration() time.Duration {
	c.RLock()
	age := c.purgeDuration
	c.RUnlock()
	return age
}
/*
SetMinPurgeDuration replaces the minimum purge duration with "d".
The write happens while holding the write lock.
*/
func (c *State) SetMinPurgeDuration(d time.Duration) {
	c.Lock()
	defer c.Unlock()

	c.purgeDuration = d
}
/*
ElectionTimeout returns the election timeout: the amount of time a node
will wait after hearing from the current leader before it declares itself
a candidate.

Validate requires it to be at least minHeartbeatRatio times the heartbeat
timeout. The value is read while holding the read lock.
*/
func (c *State) ElectionTimeout() time.Duration {
	c.RLock()
	election := c.electionDuration
	c.RUnlock()
	return election
}
/*
SetElectionTimeout replaces the election timeout with "d" while holding
the write lock. The value is stored as given; Validate is not called here.
*/
func (c *State) SetElectionTimeout(d time.Duration) {
	c.Lock()
	defer c.Unlock()

	c.electionDuration = d
}
/*
HeartbeatTimeout returns the heartbeat timeout: the amount of time between
heartbeat messages from the leader to other nodes.

The value is read while holding the read lock.
*/
func (c *State) HeartbeatTimeout() time.Duration {
	c.RLock()
	heartbeat := c.heartbeatDuration
	c.RUnlock()
	return heartbeat
}
/*
SetHeartbeatTimeout replaces the heartbeat timeout with "d" while holding
the write lock. The value is stored as given; Validate is not called here.
*/
func (c *State) SetHeartbeatTimeout(d time.Duration) {
	c.Lock()
	defer c.Unlock()

	c.heartbeatDuration = d
}
/*
Timeouts fetches both timeouts under a single read lock, so the pair is
read consistently. The heartbeat timeout is returned first and the
election timeout second.
*/
func (c *State) Timeouts() (time.Duration, time.Duration) {
	c.RLock()
	heartbeat, election := c.heartbeatDuration, c.electionDuration
	c.RUnlock()
	return heartbeat, election
}
/*
WebHooks returns the list of hooks that we might invoke before persisting
a change.

The slice header is read under the read lock, but the slice itself is not
copied: the caller receives the same backing array that the State holds.
*/
func (c *State) WebHooks() []hooks.WebHook {
	c.RLock()
	list := c.webHooks
	c.RUnlock()
	return list
}
/*
SetWebHooks replaces the list of web hooks with "h" while holding the
write lock. The slice is stored as given, without copying it.
*/
func (c *State) SetWebHooks(h []hooks.WebHook) {
	c.Lock()
	defer c.Unlock()

	c.webHooks = h
}
/*
GetDefaultConfig should be used as the basis for any configuration changes.

It returns a new State with an initialized latch and the default election
and heartbeat timeouts. Every other setting is left at its zero value.
*/
func GetDefaultConfig() *State {
	return &State{
		latch:             &sync.RWMutex{},
		electionDuration:  defaultElectionDuration,
		heartbeatDuration: defaultHeartbeatDuration,
	}
}
/*
Load replaces the current configuration with the settings decoded from the
YAML in "buf".

The three duration settings (minPurgeDuration, electionTimeout and
heartbeatTimeout) are parsed with time.ParseDuration; one that is absent or
empty in the YAML keeps its current value. minPurgeRecords and webHooks are
always taken from the YAML.

The update is all-or-nothing. If the YAML cannot be decoded, a duration
cannot be parsed, or the resulting timeouts fail Validate, an error is
returned and the State is left exactly as it was.
*/
func (c *State) Load(buf []byte) error {
	var stored StoredState
	if err := yaml.Unmarshal(buf, &stored); err != nil {
		return err
	}

	c.Lock()
	defer c.Unlock()

	// Stage the new settings in a scratch State so that a parse or validation
	// failure cannot leave the live configuration half-updated.
	next := State{
		purgeDuration:     c.purgeDuration,
		electionDuration:  c.electionDuration,
		heartbeatDuration: c.heartbeatDuration,
		minPurgeRecords:   stored.MinPurgeRecords,
		webHooks:          stored.WebHooks,
	}

	var err error
	if stored.MinPurgeDuration != "" {
		next.purgeDuration, err = time.ParseDuration(stored.MinPurgeDuration)
		if err != nil {
			return fmt.Errorf("Error parsing minPurgeDuration: %s", err)
		}
	}
	if stored.ElectionTimeout != "" {
		next.electionDuration, err = time.ParseDuration(stored.ElectionTimeout)
		if err != nil {
			return fmt.Errorf("Error parsing electionTimeout: %s", err)
		}
	}
	if stored.HeartbeatTimeout != "" {
		next.heartbeatDuration, err = time.ParseDuration(stored.HeartbeatTimeout)
		if err != nil {
			return fmt.Errorf("Error parsing heartbeatTimeout: %s", err)
		}
	}

	// Validate only reads the duration fields and never touches the latch,
	// so it can run on the scratch value (whose latch is nil).
	if err = next.Validate(); err != nil {
		return err
	}

	// Everything checked out: commit in one go while still holding the lock.
	c.purgeDuration = next.purgeDuration
	c.electionDuration = next.electionDuration
	c.heartbeatDuration = next.heartbeatDuration
	c.minPurgeRecords = next.minPurgeRecords
	c.webHooks = next.webHooks
	return nil
}
/*
LoadFile reads the whole of the file named "fileName" and hands its
contents to Load. An error from reading the file or from Load is returned
unchanged.
*/
func (c *State) LoadFile(fileName string) error {
	contents, err := ioutil.ReadFile(fileName)
	if err != nil {
		return err
	}
	return c.Load(contents)
}
/*
Store returns the configuration encoded as YAML in a byte slice.

The three durations are written in their string form, and the web hooks
are passed through as they are. The read lock is held until encoding has
finished.
*/
func (c *State) Store() ([]byte, error) {
	var out StoredState

	c.RLock()
	defer c.RUnlock()

	out.MinPurgeRecords = c.minPurgeRecords
	out.MinPurgeDuration = c.purgeDuration.String()
	out.ElectionTimeout = c.electionDuration.String()
	out.HeartbeatTimeout = c.heartbeatDuration.String()
	out.WebHooks = c.webHooks

	return yaml.Marshal(&out)
}
/*
StoreFile encodes the configuration with Store and writes it to the file
named "fileName". The file is created with mode 0666 if it does not exist;
if it does exist, its previous contents are discarded.

It returns the first error encountered while encoding, opening, writing or
closing the file.
*/
func (c *State) StoreFile(fileName string) error {
	buf, err := c.Store()
	if err != nil {
		return err
	}

	// O_TRUNC matters: without it, overwriting a longer existing file would
	// leave stale bytes after the new YAML and corrupt the document.
	cf, err := os.OpenFile(fileName,
		syscall.O_WRONLY|syscall.O_CREAT|syscall.O_TRUNC, 0666)
	if err != nil {
		return err
	}

	_, err = cf.Write(buf)
	// A failed Close can mean the data never reached the file, so report it
	// unless the write itself already failed.
	if closeErr := cf.Close(); err == nil {
		err = closeErr
	}
	return err
}
/*
ShouldPurgeRecords is a convenience that reports whether automatic purging
should happen. It returns true only when both the minimum purge record
count and the minimum purge duration are greater than zero.
*/
func (c *State) ShouldPurgeRecords() bool {
	c.RLock()
	records, age := c.minPurgeRecords, c.purgeDuration
	c.RUnlock()

	if records == 0 || age <= 0 {
		return false
	}
	return true
}
/*
Validate checks that the timeouts meet the basic requirements and returns
an error describing the first one that is not met:

  - the heartbeat timeout must be at least minHeartbeatTimeout
  - the election timeout must be at least minHeartbeatRatio times the
    heartbeat timeout

Validate does not take the latch itself. Load calls it while already
holding the write lock; any other caller has to arrange its own locking.
*/
func (c *State) Validate() error {
	heartbeat := c.heartbeatDuration
	if heartbeat < minHeartbeatTimeout {
		return fmt.Errorf("Heartbeat timeout must be at least %s", minHeartbeatTimeout)
	}

	electionFloor := heartbeat * minHeartbeatRatio
	if c.electionDuration >= electionFloor {
		return nil
	}
	return fmt.Errorf("Election timeout %s must be at least %s",
		c.electionDuration, electionFloor)
}
// Lock locks the internal latch for writing, just like a Mutex.
// It delegates to c.latch, so the latch must have been initialized
// (GetDefaultConfig does this).
func (c *State) Lock() {
c.latch.Lock()
}
// Unlock releases the write lock on the internal latch, just like a Mutex.
// It delegates to c.latch.Unlock.
func (c *State) Unlock() {
c.latch.Unlock()
}
// RLock locks the internal latch for reading, just like an RWMutex.
// It delegates to c.latch, so the latch must have been initialized
// (GetDefaultConfig does this).
func (c *State) RLock() {
c.latch.RLock()
}
// RUnlock releases a read lock on the internal latch, just like an RWMutex.
// It delegates to c.latch.RUnlock.
func (c *State) RUnlock() {
c.latch.RUnlock()
}