Skip to content

Commit

Permalink
Make snapshot timing and trailing logs hot-reloadable in raft (#444)
Browse files Browse the repository at this point in the history
* Make snapshot timing and trailing logs hot-reloadable in raft

* Update comments

* Update config.go

* Update snapshot.go
  • Loading branch information
banks committed Mar 31, 2021
1 parent e6764bc commit f52bb63
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 51 deletions.
47 changes: 40 additions & 7 deletions api.go
Expand Up @@ -81,8 +81,15 @@ type Raft struct {
// be committed and applied to the FSM.
applyCh chan *logFuture

// Configuration provided at Raft initialization
conf Config
// conf stores the current configuration to use. This is the most recent one
// provided. All reads of config values should use the config() helper method
// to read this safely.
conf atomic.Value

// confReloadMu ensures that only one thread can reload config at once since
// we need to read-modify-write the atomic. It is NOT necessary to hold this
// for any other operation e.g. reading config using config().
confReloadMu sync.Mutex

// FSM is the client state machine to apply commands to
fsm FSM
Expand Down Expand Up @@ -385,9 +392,9 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
return nil
}

// GetConfiguration returns the configuration of the Raft cluster without
// starting a Raft instance or connecting to the cluster
// This function has identical behavior to Raft.GetConfiguration
// GetConfiguration returns the persisted configuration of the Raft cluster
// without starting a Raft instance or connecting to the cluster. This function
// has identical behavior to Raft.GetConfiguration.
func GetConfiguration(conf *Config, fsm FSM, logs LogStore, stable StableStore,
snaps SnapshotStore, trans Transport) (Configuration, error) {
conf.skipStartup = true
Expand Down Expand Up @@ -505,7 +512,6 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
r := &Raft{
protocolVersion: protocolVersion,
applyCh: applyCh,
conf: *conf,
fsm: fsm,
fsmMutateCh: make(chan interface{}, 128),
fsmSnapshotCh: make(chan *reqSnapshotFuture),
Expand All @@ -530,6 +536,8 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
leadershipTransferCh: make(chan *leadershipTransferFuture, 1),
}

r.conf.Store(*conf)

// Initialize as a follower.
r.setState(Follower)

Expand Down Expand Up @@ -583,7 +591,7 @@ func (r *Raft) restoreSnapshot() error {

// Try to load in order of newest to oldest
for _, snapshot := range snapshots {
if !r.conf.NoSnapshotRestoreOnStart {
if !r.config().NoSnapshotRestoreOnStart {
_, source, err := r.snapshots.Open(snapshot.ID)
if err != nil {
r.logger.Error("failed to open snapshot", "id", snapshot.ID, "error", err)
Expand Down Expand Up @@ -630,6 +638,31 @@ func (r *Raft) restoreSnapshot() error {
return nil
}

func (r *Raft) config() Config {
return r.conf.Load().(Config)
}

// ReloadConfig updates the configuration of a running raft node. If the new
// configuration is invalid an error is returned and no changes made to the
// instance.
func (r *Raft) ReloadConfig(rc *ReloadableConfig) error {
r.confReloadMu.Lock()
defer r.confReloadMu.Unlock()

// Load the current config (note we are under a lock so it can't be changed
// between this read and a later Store).
oldCfg := r.config()

// Set the reloadable fields
newCfg := rc.apply(oldCfg)

if err := ValidateConfig(&newCfg); err != nil {
return err
}
r.conf.Store(newCfg)
return nil
}

// BootstrapCluster is equivalent to non-member BootstrapCluster but can be
// called on an un-bootstrapped Raft instance after it has been created. This
// should only be called at the beginning of time for the cluster with an
Expand Down
52 changes: 45 additions & 7 deletions config.go
Expand Up @@ -164,19 +164,23 @@ type Config struct {
// we can become a leader of a cluster containing only this node.
ShutdownOnRemove bool

// TrailingLogs controls how many logs we leave after a snapshot. This is
// used so that we can quickly replay logs on a follower instead of being
// forced to send an entire snapshot.
// TrailingLogs controls how many logs we leave after a snapshot. This is used
// so that we can quickly replay logs on a follower instead of being forced to
// send an entire snapshot. The value passed here is the initial setting used.
// This can be tuned during operation using ReloadConfig.
TrailingLogs uint64

// SnapshotInterval controls how often we check if we should perform a snapshot.
// We randomly stagger between this value and 2x this value to avoid the entire
// cluster from performing a snapshot at once.
// SnapshotInterval controls how often we check if we should perform a
// snapshot. We randomly stagger between this value and 2x this value to avoid
// the entire cluster from performing a snapshot at once. The value passed
// here is the initial setting used. This can be tuned during operation using
// ReloadConfig.
SnapshotInterval time.Duration

// SnapshotThreshold controls how many outstanding logs there must be before
// we perform a snapshot. This is to prevent excessive snapshots when we can
// just replay a small set of logs.
// just replay a small set of logs. The value passed here is the initial
// setting used. This can be tuned during operation using ReloadConfig.
SnapshotThreshold uint64

// LeaderLeaseTimeout is used to control how long the "lease" lasts
Expand Down Expand Up @@ -218,6 +222,40 @@ type Config struct {
skipStartup bool
}

// ReloadableConfig is the subset of Config that may be reconfigured during
// runtime using raft.ReloadConfig. We choose to duplicate fields over embedding
// or accepting a Config but only using specific fields to keep the API clear.
// Reconfiguring some fields is potentially dangerous so we should only
// selectively enable it for fields where that is allowed.
type ReloadableConfig struct {
// TrailingLogs controls how many logs we leave after a snapshot. This is used
// so that we can quickly replay logs on a follower instead of being forced to
// send an entire snapshot. The value passed here updates the setting at runtime
// which will take effect as soon as the next snapshot completes and truncation
// occurs.
TrailingLogs uint64

// SnapshotInterval controls how often we check if we should perform a snapshot.
// We randomly stagger between this value and 2x this value to avoid the entire
// cluster from performing a snapshot at once.
SnapshotInterval time.Duration

// SnapshotThreshold controls how many outstanding logs there must be before
// we perform a snapshot. This is to prevent excessive snapshots when we can
// just replay a small set of logs.
SnapshotThreshold uint64
}

// apply sets the reloadable fields on the passed Config to the values in
// `ReloadableConfig`. It returns a copy of Config with the fields from this
// ReloadableConfig set.
func (rc *ReloadableConfig) apply(to Config) Config {
to.TrailingLogs = rc.TrailingLogs
to.SnapshotInterval = rc.SnapshotInterval
to.SnapshotThreshold = rc.SnapshotThreshold
return to
}

// DefaultConfig returns a Config with usable defaults.
func DefaultConfig() *Config {
return &Config{
Expand Down
56 changes: 37 additions & 19 deletions raft.go
Expand Up @@ -29,7 +29,7 @@ var (
// responses.
func (r *Raft) getRPCHeader() RPCHeader {
return RPCHeader{
ProtocolVersion: r.conf.ProtocolVersion,
ProtocolVersion: r.config().ProtocolVersion,
}
}

Expand All @@ -56,7 +56,7 @@ func (r *Raft) checkRPCHeader(rpc RPC) error {
// currently what we want, and in general support one version back. We
// may need to revisit this policy depending on how future protocol
// changes evolve.
if header.ProtocolVersion < r.conf.ProtocolVersion-1 {
if header.ProtocolVersion < r.config().ProtocolVersion-1 {
return ErrUnsupportedProtocol
}

Expand Down Expand Up @@ -151,7 +151,7 @@ func (r *Raft) runFollower() {
didWarn := false
r.logger.Info("entering follower state", "follower", r, "leader", r.Leader())
metrics.IncrCounter([]string{"raft", "state", "follower"}, 1)
heartbeatTimer := randomTimeout(r.conf.HeartbeatTimeout)
heartbeatTimer := randomTimeout(r.config().HeartbeatTimeout)

for r.getState() == Follower {
select {
Expand Down Expand Up @@ -187,11 +187,12 @@ func (r *Raft) runFollower() {

case <-heartbeatTimer:
// Restart the heartbeat timer
heartbeatTimer = randomTimeout(r.conf.HeartbeatTimeout)
hbTimeout := r.config().HeartbeatTimeout
heartbeatTimer = randomTimeout(hbTimeout)

// Check if we have had a successful contact
lastContact := r.LastContact()
if time.Now().Sub(lastContact) < r.conf.HeartbeatTimeout {
if time.Now().Sub(lastContact) < hbTimeout {
continue
}

Expand Down Expand Up @@ -228,7 +229,8 @@ func (r *Raft) runFollower() {
// called on the main thread, and only makes sense in the follower state.
func (r *Raft) liveBootstrap(configuration Configuration) error {
// Use the pre-init API to make the static updates.
err := BootstrapCluster(&r.conf, r.logs, r.stable, r.snapshots,
cfg := r.config()
err := BootstrapCluster(&cfg, r.logs, r.stable, r.snapshots,
r.trans, configuration)
if err != nil {
return err
Expand Down Expand Up @@ -260,7 +262,7 @@ func (r *Raft) runCandidate() {
// otherwise.
defer func() { r.candidateFromLeadershipTransfer = false }()

electionTimer := randomTimeout(r.conf.ElectionTimeout)
electionTimer := randomTimeout(r.config().ElectionTimeout)

// Tally the votes, need a simple majority
grantedVotes := 0
Expand Down Expand Up @@ -370,8 +372,13 @@ func (r *Raft) runLeader() {
// Notify that we are the leader
overrideNotifyBool(r.leaderCh, true)

// Store the notify chan. It's not reloadable so shouldn't change before the
// defer below runs, but this makes sure we always notify the same chan if
// ever for both gaining and loosing leadership.
notify := r.config().NotifyCh

// Push to the notify channel if given
if notify := r.conf.NotifyCh; notify != nil {
if notify != nil {
select {
case notify <- true:
case <-r.shutdownCh:
Expand Down Expand Up @@ -427,7 +434,7 @@ func (r *Raft) runLeader() {
overrideNotifyBool(r.leaderCh, false)

// Push to the notify channel if given
if notify := r.conf.NotifyCh; notify != nil {
if notify != nil {
select {
case notify <- false:
case <-r.shutdownCh:
Expand Down Expand Up @@ -548,7 +555,9 @@ func (r *Raft) leaderLoop() {
// only a single peer (ourself) and replicating to an undefined set
// of peers.
stepDown := false
lease := time.After(r.conf.LeaderLeaseTimeout)
// This is only used for the first lease check, we reload lease below
// based on the current config value.
lease := time.After(r.config().LeaderLeaseTimeout)

for r.getState() == Leader {
select {
Expand Down Expand Up @@ -583,7 +592,7 @@ func (r *Raft) leaderLoop() {
// the stopCh and doneCh.
go func() {
select {
case <-time.After(r.conf.ElectionTimeout):
case <-time.After(r.config().ElectionTimeout):
close(stopCh)
err := fmt.Errorf("leadership transfer timeout")
r.logger.Debug(err.Error())
Expand Down Expand Up @@ -680,7 +689,7 @@ func (r *Raft) leaderLoop() {
metrics.SetGauge([]string{"raft", "commitNumLogs"}, float32(len(groupReady)))

if stepDown {
if r.conf.ShutdownOnRemove {
if r.config().ShutdownOnRemove {
r.logger.Info("removed ourself, shutting down")
r.Shutdown()
} else {
Expand Down Expand Up @@ -751,7 +760,7 @@ func (r *Raft) leaderLoop() {
// Group commit, gather all the ready commits
ready := []*logFuture{newLog}
GROUP_COMMIT_LOOP:
for i := 0; i < r.conf.MaxAppendEntries; i++ {
for i := 0; i < r.config().MaxAppendEntries; i++ {
select {
case newLog := <-r.applyCh:
ready = append(ready, newLog)
Expand All @@ -776,7 +785,7 @@ func (r *Raft) leaderLoop() {

// Next check interval should adjust for the last node we've
// contacted, without going negative
checkInterval := r.conf.LeaderLeaseTimeout - maxDiff
checkInterval := r.config().LeaderLeaseTimeout - maxDiff
if checkInterval < minCheckInterval {
checkInterval = minCheckInterval
}
Expand Down Expand Up @@ -872,6 +881,11 @@ func (r *Raft) checkLeaderLease() time.Duration {
// Track contacted nodes, we can always contact ourself
contacted := 0

// Store lease timeout for this one check invocation as we need to refer to it
// in the loop and would be confusing if it ever becomes reloadable and
// changes between iterations below.
leaseTimeout := r.config().LeaderLeaseTimeout

// Check each follower
var maxDiff time.Duration
now := time.Now()
Expand All @@ -883,14 +897,14 @@ func (r *Raft) checkLeaderLease() time.Duration {
}
f := r.leaderState.replState[server.ID]
diff := now.Sub(f.LastContact())
if diff <= r.conf.LeaderLeaseTimeout {
if diff <= leaseTimeout {
contacted++
if diff > maxDiff {
maxDiff = diff
}
} else {
// Log at least once at high value, then debug. Otherwise it gets very verbose.
if diff <= 3*r.conf.LeaderLeaseTimeout {
if diff <= 3*leaseTimeout {
r.logger.Warn("failed to contact", "server-id", server.ID, "time", diff)
} else {
r.logger.Debug("failed to contact", "server-id", server.ID, "time", diff)
Expand Down Expand Up @@ -1131,7 +1145,11 @@ func (r *Raft) processLogs(index uint64, futures map[uint64]*logFuture) {
}
}

batch := make([]*commitTuple, 0, r.conf.MaxAppendEntries)
// Store maxAppendEntries for this call in case it ever becomes reloadable. We
// need to use the same value for all lines here to get the expected result.
maxAppendEntries := r.config().MaxAppendEntries

batch := make([]*commitTuple, 0, maxAppendEntries)

// Apply all the preceding logs
for idx := lastApplied + 1; idx <= index; idx++ {
Expand All @@ -1156,9 +1174,9 @@ func (r *Raft) processLogs(index uint64, futures map[uint64]*logFuture) {
batch = append(batch, preparedLog)

// If we have filled up a batch, send it to the FSM
if len(batch) >= r.conf.MaxAppendEntries {
if len(batch) >= maxAppendEntries {
applyBatch(batch)
batch = make([]*commitTuple, 0, r.conf.MaxAppendEntries)
batch = make([]*commitTuple, 0, maxAppendEntries)
}

case futureOk:
Expand Down

0 comments on commit f52bb63

Please sign in to comment.