package raft
import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
)
// NodeConfig is the mandatory configuration for the local node. Every field in NodeConfig must be provided by
// the application using the raft package. NodeConfig is passed in when starting up the node using MakeNode.
// (Optional configuration can be passed in using WithX NodeOption options.)
type NodeConfig struct {
//
// Raft cluster node addresses. Nodes should include all the node addresses including the local one, in the
// form address:port. This makes configuration easy in that all nodes can share the same configuration.
//
// The order of the nodes (ignoring the local node) is also interpreted as the order of preference when
// transferring leadership. Note that this tie-breaker only kicks in amongst nodes whose log is matched to ours.
Nodes []string
//
// Application provides a channel over which committed log commands are published for the application to consume.
// The log commands are opaque to the raft package.
LogCmds chan []byte
//
// LogDB names the file location at which the node persists its log database (metadata and log entries).
LogDB string
//
// Private fields below are set through the WithX options passed in to MakeNode.
//
// clientDialOptionsFn provides the dial options to use when connecting as a gRPC client to the other nodes.
// Exposing this configuration allows the application to determine whether, for example, to use TLS in raft
// exchanges. The callback receives the local and remote node addresses (as in Nodes above) for which the
// client connection is being set up.
clientDialOptionsFn func(local, remote string) []grpc.DialOption
//
// serverOptionsFn provides server-side gRPC options. These are merged with the default options, overriding
// defaults where both are provided. The callback receives the local node address.
serverOptionsFn func(local string) []grpc.ServerOption
//
// Channel depths (optional). If not set, depths will default to sensible values.
channelDepth struct {
clientEvents int32
serverEvents int32
}
//
// Configurable raft timers.
timers struct {
// leaderTimeout determines the maximum period which may elapse without receiving AppendEntry
// messages from the leader. On the follower side, a random value between leaderTimeout and 2*leaderTimeout is
// used by the raft package to determine when to force an election. On the leader side, the leader will attempt
// to send at least one AppendEntry (possibly empty if necessary) within every leaderTimeout period. Randomized
// election timeouts minimise the probability of split votes and resolve them quickly when they happen, as
// described in Section 3.4 of CBTP. leaderTimeout defaults to 2s if not set.
leaderTimeout time.Duration
// gRPCTimeout is used to determine when we should give up on an attempt to invoke an RPC.
gRPCTimeout time.Duration
}
//
// Maximum size of batch of commands sent in AppendEntry. Defaults to 32 if not set.
logCmdBatchSize int32
}
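// As an illustrative sketch (the addresses, channel depth and path below are examples, not package
// defaults), a minimal NodeConfig might look like:
//
//	cfg := raft.NodeConfig{
//		Nodes:   []string{"n1.example.com:443", "n2.example.com:443", "n3.example.com:443"},
//		LogCmds: make(chan []byte, 32),
//		LogDB:   "/var/lib/myapp/raft.db", // hypothetical path
//	}
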
const minNodesInCluster = 3
// NodeConfig.validate: validates the configuration presented by the user, and sets defaults where necessary.
func (cfg *NodeConfig) validate(localNodeIndex int32) error {
if len(cfg.Nodes) < minNodesInCluster {
return raftErrorf(
RaftErrorMissingNodeConfig,
"not enough endpoints specified in Nodes %s, expect at least %d "+
"e.g. 'n1.example.com:443','n3.example.com:443','n3.example.com:443'",
cfg.Nodes, minNodesInCluster)
}
if cfg.LogCmds == nil {
return raftErrorf(
RaftErrorMissingNodeConfig,
"missing LogCmds, a channel over which raft package will publish committed log commands")
}
if localNodeIndex < 0 || int32(len(cfg.Nodes)) <= localNodeIndex {
return raftErrorf(
RaftErrorBadLocalNodeIndex,
"localNodeIndex specified %d is out of bounds for the number of endpoints specified in Nodes %d",
localNodeIndex, len(cfg.Nodes))
}
if cfg.timers.leaderTimeout == 0 {
cfg.timers.leaderTimeout = 2 * time.Second
}
if cfg.timers.gRPCTimeout == 0 {
// No use waiting for much longer than this. Timeouts will be firing all over the place soon enough.
cfg.timers.gRPCTimeout = cfg.timers.leaderTimeout >> 1
}
if cfg.channelDepth.clientEvents == 0 {
cfg.channelDepth.clientEvents = 32
}
if cfg.channelDepth.serverEvents == 0 {
cfg.channelDepth.serverEvents = 32
}
if cfg.logCmdBatchSize == 0 {
cfg.logCmdBatchSize = 32
}
// Default to insecure dial options (i.e. no underlying TLS), with reasonable dial backoff and timeout options.
// (A WithClientDialOptionsFn option, applied later in MakeNode, overrides this default.)
cfg.clientDialOptionsFn = func(l, r string) []grpc.DialOption {
return []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithBackoffMaxDelay(cfg.timers.leaderTimeout >> 2), // allow roughly 4 connect attempts per leader timeout period
}
}
if cfg.LogDB == "" {
return raftErrorf(
RaftErrorMissingNodeConfig,
"missing LogDB, a file name where local node persists both metadata and log entries")
}
return nil
}
// Node tracks the state and configuration of this local node. Public access to services provided by Node is
// concurrency safe. The Node structure carries the state of the locally running raft instance.
type Node struct {
// My index in the cluster. This attribute is set once and is immutable (i.e. can be accessed from anywhere).
index int32
// Readonly state provided when the Node is created.
config *NodeConfig
// raftEngine implements the raft state machine.
engine *raftEngine
// Server and client side state for messaging (independent of role). Messaging is implemented largely in raft_grpc.go.
messaging *raftMessaging
// fatalErrorFeedback feeds back fatal errors to the client.
// Do not push into channel directly; use signalFatalError().
fatalErrorFeedback chan error
// We also remember that we have taken a fatal error, in order to avoid attempting a graceful shutdown.
fatalErrorCount *atomic.Int32
// Track rootCancel function used to clean up autonomously on fatal errors.
cancel context.CancelFunc
// metrics structure associated with this node.
metrics *metricsHolder
// logger for Node, configurable through WithLogger, or WithPreConfiguredLogger options.
logger *zap.SugaredLogger
// Very low level debugging.
verboseLogging bool
}
// FatalErrorChannel returns an error channel which is used by the raft Node to signal an unrecoverable failure
// asynchronously to the application. Such errors are expected to occur with vanishingly small probability.
// An example of such an error would be if the dial options or gRPC server options provided make it impossible
// for the client to successfully connect with the server (RaftErrorClientConnectionUnrecoverable). When a fatal
// error is registered, raft package will stop operating, and will mark the root wait group done.
func (n *Node) FatalErrorChannel() chan error {
return n.fatalErrorFeedback
}
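// As a sketch, an application might consume the channel like this (appCancel and appLogger are assumed
// to be the application's own shutdown function and logger, not names from this package):
//
//	go func() {
//		err := <-node.FatalErrorChannel()
//		appLogger.Errorw("raft fatal error, shutting down", "err", err)
//		appCancel() // cancel the context passed to MakeNode, then wait on the wait group
//	}()
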
func (n *Node) logKV() []interface{} {
kv := []interface{}{"obj", "Node"}
if n.messaging.server != nil {
kv = append(kv, n.messaging.server.logKV()...)
}
kv = append(kv, "obj", "localNode", "clients", len(n.messaging.clients),
"fatalErrorCount", n.fatalErrorCount.Load())
return kv
}
// signalFatalError allows the package to indicate a fatal error to the user. This will typically be followed by
// the client shutting down by cancelling the context. If the buffered channel is full, we simply skip signalling again.
func (n *Node) signalFatalError(err error) {
n.fatalErrorCount.Inc()
select {
case n.fatalErrorFeedback <- err:
n.logger.Errorw("raft, signalling fatal error", raftErrKeyword, err.Error())
n.cancel()
default:
// If pushing to fatalErrorFeedback would block, then we don't bother. Because we are using a buffered channel,
// if we get here it means that the channel is busy already - one fatal error is as good as many.
n.logger.Errorw("raft, skipped signalling fatal error, signalled already", raftErrKeyword, err.Error())
}
}
func (n *Node) mustGetClientFromId(clientIndex int32) *raftClient {
var rc *raftClient
defer func() {
if rc == nil {
n.signalFatalError(raftErrorf(
RaftErrorMustFailed, "fetching client %d expected to exist", clientIndex))
}
}()
if n != nil && n.messaging != nil {
rc = n.messaging.clients[clientIndex]
}
return rc
}
// NodeOption operates on a Node to apply optional configuration.
type NodeOption func(*Node) error
// WithLogger option is invoked by the application to provide a customised zap logger option, or to disable logging.
// The NodeOption returned by WithLogger is passed in to MakeNode to control logging; e.g. to provide a preconfigured
// application logger. If logger passed in is nil, raft will disable logging.
//
// If WithLogger generated NodeOption is not passed in, package uses its own configured zap logger.
//
// Finally, if application wishes to derive its logger as some variant of the default raft logger, application can invoke
// DefaultZapLoggerConfig() to fetch a default logger configuration. It can use that configuration (modified as necessary)
// to build a new logger directly through zap library. That new logger can then be passed into WithLogger to generate
// the appropriate node option. An example of exactly this use case is available in the godoc examples. In the example,
// the logger configuration is set up to allow for on-the-fly changes to logging level.
//
// verboseLogging controls whether the raft package redirects underlying gRPC middleware logging to the zap log and
// includes ultra low level debugging messages, including keepalives. This makes debug very noisy; unless in-depth
// low level message troubleshooting is required, verboseLogging should be set to false.
func WithLogger(logger *zap.Logger, verboseLogging bool) NodeOption {
return func(n *Node) error {
if logger != nil {
n.logger = logger.Sugar()
} else {
n.logger = zap.NewNop().Sugar()
}
n.verboseLogging = verboseLogging
return nil
}
}
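// As a sketch, an application might hand raft a logger built from the package's default configuration, or
// disable logging entirely (ctx, wg and cfg are assumed to be set up as described at MakeNode):
//
//	logger, err := raft.DefaultZapLoggerConfig().Build()
//	if err != nil {
//		// handle error
//	}
//	node, err := raft.MakeNode(ctx, &wg, cfg, 0, raft.WithLogger(logger, false))
//
//	// or, to disable logging altogether:
//	node, err := raft.MakeNode(ctx, &wg, cfg, 0, raft.WithLogger(nil, false))
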
// WithMetrics option is used with MakeNode to specify the metrics registry to register against. Argument namespace
// specifies the namespace for the metrics. This is useful if the application prefixes all its metrics with a common
// prefix; e.g. if namespace is 'foo', then all raft package metrics will be prefixed with 'foo.raft.'. Argument
// detailed controls whether detailed (and more expensive) metrics are tracked (e.g. gRPC latency distribution).
// If nil is passed in for the registry, the default registry prometheus.DefaultRegisterer is used. Do note that
// the package does not set up serving of metrics; that is up to the application. Examples are included to show how
// to set up a custom metrics registry to register against. If the WithMetrics NodeOption is NOT passed in to
// MakeNode, metrics collection is disabled.
func WithMetrics(registry *prometheus.Registry, namespace string, detailed bool) NodeOption {
return func(n *Node) error {
n.metrics = initMetrics(registry, namespace, detailed, n.index)
return nil
}
}
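// As a sketch, an application using its own registry might wire metrics up as follows (serving the
// registry, e.g. via promhttp, remains the application's responsibility):
//
//	registry := prometheus.NewRegistry()
//	node, err := raft.MakeNode(ctx, &wg, cfg, 0,
//		raft.WithMetrics(registry, "myapp", true))
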
// WithLeaderTimeout is used with MakeNode to specify the leader timeout: the maximum period which can elapse
// without getting AppendEntry messages from the leader before forcing a new election. On the follower side, a
// random value between leaderTimeout and 2*leaderTimeout is used by the raft package to determine when to force an
// election. On the leader side, the leader will attempt to send at least one AppendEntry (possibly empty if
// necessary) within every leaderTimeout period. Randomized election timeouts minimise the probability of split
// votes and resolve them quickly when they happen, as described in Section 3.4 of CBTP. LeaderTimeout defaults to
// 2s if not set.
func WithLeaderTimeout(leaderTimeout time.Duration) NodeOption {
return func(n *Node) error {
n.config.timers.leaderTimeout = leaderTimeout
return nil
}
}
// WithUnaryGRPCTimeout is used with MakeNode to specify the timeout for gRPC RPC invocations. If not set, the
// gRPC timeout defaults to half the leader timeout (i.e. 1s with the default 2s leader timeout).
func WithUnaryGRPCTimeout(d time.Duration) NodeOption {
return func(n *Node) error {
n.config.timers.gRPCTimeout = d
return nil
}
}
// WithChannelDepthToClientOffload allows the application to override the depth of the channel between the raft
// core engine and the goroutine which handles messaging to the remote nodes. A sensible default (32) is used if
// this option is not specified.
func WithChannelDepthToClientOffload(depth int32) NodeOption {
return func(n *Node) error {
if depth <= 0 {
return errors.New("bad channel depth in WithChannelDepthToClientOffload, must be > 0")
}
n.config.channelDepth.clientEvents = depth
return nil
}
}
// WithLogCommandBatchSize sets the maximum number of log commands which can be sent in one AppendEntryRequest RPC.
// Defaults to a sensible value (32) if not set.
func WithLogCommandBatchSize(depth int32) NodeOption {
return func(n *Node) error {
if depth <= 0 {
return errors.New("bad batch size in WithLogCommandBatchSize, must be > 0")
}
n.config.logCmdBatchSize = depth
return nil
}
}
// WithServerOptionsFn sets up a callback used to allow the application to specify server-side gRPC options for the
// local raft node. These server options are merged with the default options, overriding defaults where both are
// provided. The callback receives the local node address as specified in the Nodes configuration. Server-side
// options could be used, for example, to set up mutually authenticated TLS protection of exchanges with other nodes.
func WithServerOptionsFn(fn func(local string) []grpc.ServerOption) NodeOption {
return func(n *Node) error {
n.config.serverOptionsFn = fn
return nil
}
}
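// As a sketch, assuming the application has built a *tls.Config of its own (serverTLSConfig below is
// hypothetical), it might pass server-side TLS credentials like this, where credentials refers to
// google.golang.org/grpc/credentials:
//
//	raft.WithServerOptionsFn(func(local string) []grpc.ServerOption {
//		return []grpc.ServerOption{grpc.Creds(credentials.NewTLS(serverTLSConfig))}
//	})
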
// WithClientDialOptionsFn sets up a callback used to allow the application to specify client-side gRPC dial options
// for the local raft node. These client options are merged with the default options, overriding defaults where both
// are provided. The callback receives the local/remote node pair in the form specified in the Nodes configuration.
// Client-side options could be used, for example, to set up mutually authenticated TLS protection of exchanges
// with other nodes.
func WithClientDialOptionsFn(fn func(local, remote string) []grpc.DialOption) NodeOption {
return func(n *Node) error {
n.config.clientDialOptionsFn = fn
return nil
}
}
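// As a sketch, mirroring the server-side example above and assuming an application-built *tls.Config
// (clientTLSConfig below is hypothetical):
//
//	raft.WithClientDialOptionsFn(func(local, remote string) []grpc.DialOption {
//		return []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(clientTLSConfig))}
//	})
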
// MakeNode starts the raft node according to configuration provided.
//
// Node is returned, and public methods associated with Node can be used to interact with Node from multiple
// goroutines, e.g. specifically in order to access the replicated log.
//
// Context can be cancelled to signal exit. WaitGroup wg should have 1 added to it prior to calling MakeNode and
// should be waited on by the caller before exiting following cancellation. Whether MakeNode returns successfully or
// not, the WaitGroup will be marked Done() by the time the Node has cleaned up.
//
// The configuration block NodeConfig, along with localNodeIndex, determines the configuration required to join the
// cluster. The localNodeIndex determines the identity of the local node as an index into the list of nodes in
// the cluster as specified in the NodeConfig Nodes field.
//
// If MakeNode returns without error, then over its lifetime it will be striving to maintain
// the node as a raft member in the raft cluster, and maintaining its replica of the replicated
// log.
//
// If a fatal error is encountered at any point in the life of the node after MakeNode has returned, the error will
// be signalled over the fatal error channel. A buffered channel of errors is provided to allow the raft package to
// signal fatal errors upstream and let the client determine the best course of action; typically cancelling the
// context to shut down. As in the normal shutdown case, following receipt of a fatal error, the caller should
// cancel the context and wait for the wait group before exiting. The FatalErrorChannel method on the returned Node
// returns the error channel the application should consume.
//
// MakeNode also accepts various options including gRPC server and dial options, logging and metrics (see functions
// returning NodeOption like WithMetrics, WithLogger etc).
//
// For logging, we would like to support structured logging. This makes specifying a useful
// logger interface a little messier. Instead we depend on Uber zap and its interface, but allow the user to
// either provide its own configured zap logger, let raft use its default logging setup,
// or disable logging altogether. Customisation is achieved through the WithLogger option.
//
// For metrics, raft package tries to adhere to the USE method as described here:
// (http://www.brendangregg.com/usemethod.html) - in summary 'For every resource, track utilization, saturation
// and errors.'. On top of that we expect to be handed a metrics registry, and if one is provided, then we register
// our metrics against that. Metrics registry and customisation can be provided through the WithMetrics option.
//
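// As a sketch, a typical startup and shutdown sequence might look like this (cfg and localNodeIndex as
// described above):
//
//	ctx, cancel := context.WithCancel(context.Background())
//	var wg sync.WaitGroup
//	wg.Add(1)
//	node, err := raft.MakeNode(ctx, &wg, cfg, localNodeIndex)
//	if err != nil {
//		// handle error
//	}
//	// ... consume cfg.LogCmds, watch node.FatalErrorChannel() ...
//	cancel()  // signal shutdown
//	wg.Wait() // wait for raft to clean up (e.g. leadership transfer)
//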
func MakeNode(
ctx context.Context,
wg *sync.WaitGroup,
cfg NodeConfig,
localNodeIndex int32,
opts ...NodeOption) (*Node, error) {
defer wg.Done()
err := cfg.validate(localNodeIndex)
if err != nil {
// Configuration validation failed. Logging is not set up yet so we cannot log; we simply return the error
// and bail.
return nil, err
}
n := &Node{
index: localNodeIndex,
config: &cfg,
messaging: &raftMessaging{},
// A single fatal error is sufficient to do the job. Create a buffered channel of depth 1. This matters
// because when we signal, were we to block, we skip enqueuing the signal on the basis that we know at
// least one signal is pending. And one signal is enough.
fatalErrorFeedback: make(chan error, 1),
fatalErrorCount: atomic.NewInt32(0),
}
for _, opt := range opts {
err := opt(n)
if err != nil {
// It is too early and logging may not be set up yet. Simply return the error.
return nil, raftErrorf(RaftErrorBadMakeNodeOption, "applied option err [%v]", err)
}
}
//
// Set up logging so that n.logger always points at something (possibly a no-op discard logger), avoiding a
// nil test on every log call.
err = initLogging(n)
if err != nil {
// We failed to initialise logging. We cannot log (obviously), so we simply return the error and
// bail.
return nil, raftErrorf(err, "init logging failed")
}
n.logger.Info("raft package, hello (logging can be customised or disabled using WithLogger options)")
err = initMessaging(ctx, n)
if err != nil {
return nil, err
}
err = initRaftEngine(ctx, n)
if err != nil {
return nil, err
}
//
// We are ready to run. We will allocate our own context. We do this in order to handle the owner shutdown
// gracefully; specifically to orchestrate leadership transfer if we are leader on shutdown. Section 3.10 CBTP.
messagingCtx, cancel := context.WithCancel(context.Background())
n.cancel = cancel
// Kick off messaging, remembering to add to the wait group. This ensures that as long as the client
// honours the wait group (i.e. waits on the wait group on exit), then when exiting gracefully, the caller will
// wait on the raft package to shut down (e.g. including leader transfer).
wg.Add(1)
// Use an internal wait group we can wait on so we can clean up (e.g. flush the logger) on exit.
var messagingWg sync.WaitGroup
messagingWg.Add(1)
runMessaging(messagingCtx, &messagingWg, n)
engineCtx, engineCancel := context.WithCancel(context.Background())
var engineWg sync.WaitGroup
engineWg.Add(1)
go n.engine.run(engineCtx, &engineWg, n)
// Wait for owner shutdown, wait for clean shutdown, then return.
go func() {
select {
case <-messagingCtx.Done():
n.logger.Info("raft package internal shutdown triggered")
case <-ctx.Done():
n.logger.Info("application is requesting a shutdown")
}
fec := n.fatalErrorCount.Load()
if fec == 0 {
n.blockOnGracefulShutdown()
} else {
n.logger.Debug("raft skipping graceful shutdown because we have taken fatal errors already")
}
// cancel() will signal exit to all the goroutines spawned by raft package. These will in turn mark wait group done
// and let the owner eventually proceed.
cancel()
messagingWg.Wait()
// With messaging shut down, we can now cancel the engine and wait for it to exit.
engineCancel()
engineWg.Wait()
// Flush the logger to make sure we emit all pending logs.
n.logger.Info("raft package, goodbye")
_ = n.logger.Sync()
wg.Done()
}()
return n, nil
}
// Trigger graceful shutdown, and wait until this is complete.
func (n *Node) blockOnGracefulShutdown() {
//
// TODO: send message to trigger graceful shutdown to the main state machine.
n.logger.Debugw("graceful shutdown (triggering RequestTimeout) is not implemented yet")
}
func (n *Node) keepalivePeriod() time.Duration {
return n.config.timers.leaderTimeout / 3
}
//
// DefaultZapLoggerConfig provides a production logger configuration (logs Info and above, JSON to stderr, with
// stacktrace, caller and sampling disabled) which can be customised by application to produce its own logger based
// on the raft configuration. Any logger provided by the application will also have its name extended by the raft
// package to clearly identify that log message comes from raft. For example, if the application log is named
// "foo", then the raft logs will be labelled with key "logger" value "foo.raft".
func DefaultZapLoggerConfig() zap.Config {
lcfg := zap.NewProductionConfig()
lcfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
lcfg.DisableStacktrace = true
lcfg.DisableCaller = true
lcfg.Sampling = nil
return lcfg
}
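// As a sketch, an application wanting on-the-fly log level changes might customise the configuration
// before building; the AtomicLevel wiring and the trigger for changing level are the application's own:
//
//	lcfg := raft.DefaultZapLoggerConfig()
//	lcfg.Level = zap.NewAtomicLevelAt(zapcore.InfoLevel)
//	logger, err := lcfg.Build()
//	// later, e.g. on an admin request:
//	lcfg.Level.SetLevel(zapcore.DebugLevel)
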
// initLogging ensures that node.logger points at something even if it is pointing to a noop logger.
// By default, we log to an opinionated pre-configured log. The WithLogger option can override configuration
// or disable logging completely.
func initLogging(n *Node) error {
if n.logger == nil {
logger, err := DefaultZapLoggerConfig().Build()
if err != nil {
return raftErrorf(err, "failed to set up logging")
}
n.logger = logger.Sugar()
}
// We must, absolutely must, never return with both a nil logger and a nil error.
if n.logger == nil {
return raftErrorf(
RaftErrorMissingLogger, "tried to set up a logger, but failed, zap did not indicate why")
}
// Set logger name. This will end up being concatenated to any preexisting logger name. E.g. if the application
// provides its logger named 'myapp', then the logger field in logs from raft will be 'myapp.raft'.
n.logger = n.logger.Named(fmt.Sprintf("raft.NODE%d", n.index))
//
// Logger set up already - nothing more we need to do.
return nil
}