Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/raft-pre-vote' into raft-pre-vote
Browse files Browse the repository at this point in the history
# Conflicts:
#	raft.go
  • Loading branch information
dhiaayachi committed Oct 16, 2023
2 parents 785b127 + 63a9333 commit 0b1811d
Show file tree
Hide file tree
Showing 18 changed files with 76 additions and 51 deletions.
22 changes: 20 additions & 2 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ type Raft struct {
// candidate because the leader tries to transfer leadership. This flag is
// used in RequestVoteRequest to express that a leadership transfer is going
// on.
candidateFromLeadershipTransfer bool
candidateFromLeadershipTransfer atomic.Bool

// Stores our local server ID, used to avoid sending RPCs to ourself
localID ServerID
Expand Down Expand Up @@ -792,12 +792,23 @@ func (r *Raft) LeaderWithID() (ServerAddress, ServerID) {
// An optional timeout can be provided to limit the amount of time we wait
// for the command to be started. This must be run on the leader or it
// will fail.
//
// If the node discovers it is no longer the leader while applying the command,
// it will return ErrLeadershipLost. There is no way to guarantee whether the
// write succeeded or failed in this case. For example, if the leader is
// partitioned it can't know if a quorum of followers wrote the log to disk. If
// at least one did, it may survive into the next leader's term.
//
// If a user snapshot is restored while the command is in-flight, an
// ErrAbortedByRestore is returned. In this case the write effectively failed
// since its effects will not be present in the FSM after the restore.
func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture {
return r.ApplyLog(Log{Data: cmd}, timeout)
}

// ApplyLog performs Apply but takes in a Log directly. The only values
// currently taken from the submitted Log are Data and Extensions.
// currently taken from the submitted Log are Data and Extensions. See
// Apply for details on error cases.
func (r *Raft) ApplyLog(log Log, timeout time.Duration) ApplyFuture {
metrics.IncrCounter([]string{"raft", "apply"}, 1)

Expand Down Expand Up @@ -1198,6 +1209,13 @@ func (r *Raft) LastIndex() uint64 {
return r.getLastIndex()
}

// CommitIndex returns the committed index.
// This API maybe helpful for server to implement the read index optimization
// as described in the Raft paper.
func (r *Raft) CommitIndex() uint64 {
return r.getCommitIndex()
}

// AppliedIndex returns the last index applied to the FSM. This is generally
// lagging behind the last index, especially for indexes that are persisted but
// have not yet been considered committed by the leader. NOTE - this reflects
Expand Down
1 change: 0 additions & 1 deletion commitment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ func TestCommitment_noVoterSanity(t *testing.T) {
if drainNotifyCh(commitCh) {
t.Fatalf("unexpected commit notify")
}

}

// Single voter commits immediately.
Expand Down
4 changes: 2 additions & 2 deletions file_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func NewFileSnapshotStoreWithLogger(base string, retain int, logger hclog.Logger

// Ensure our path exists
path := filepath.Join(base, snapPath)
if err := os.MkdirAll(path, 0755); err != nil && !os.IsExist(err) {
if err := os.MkdirAll(path, 0o755); err != nil && !os.IsExist(err) {
return nil, fmt.Errorf("snapshot path not accessible: %v", err)
}

Expand Down Expand Up @@ -170,7 +170,7 @@ func (f *FileSnapshotStore) Create(version SnapshotVersion, index, term uint64,
f.logger.Info("creating new snapshot", "path", path)

// Make the directory
if err := os.MkdirAll(path, 0755); err != nil {
if err := os.MkdirAll(path, 0o755); err != nil {
f.logger.Error("failed to make snapshot directly", "error", err)
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions file_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func TestFileSS_CreateSnapshotMissingParentDir(t *testing.T) {
if err != nil {
t.Fatalf("should not fail when using non existing parent")
}

}

func TestFileSS_CreateSnapshot(t *testing.T) {
// Create a test dir
dir, err := os.MkdirTemp("", "raft")
Expand Down Expand Up @@ -264,7 +264,7 @@ func TestFileSS_BadPerm(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
if err = os.Chmod(dir2, 000); err != nil {
if err = os.Chmod(dir2, 0o00); err != nil {
t.Fatalf("err: %s", err)
}
defer os.Chmod(dir2, 777) // Set perms back for delete
Expand Down
4 changes: 2 additions & 2 deletions fuzzy/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func (c *cluster) RecordState(t *testing.T) {
copyDir(td, sd)
dump := func(n *raftNode) {
nt := filepath.Join(td, n.name)
os.Mkdir(nt, 0777)
os.Mkdir(nt, 0o777)
n.fsm.WriteTo(filepath.Join(nt, "fsm.txt"))
n.transport.DumpLog(nt)
}
Expand All @@ -315,7 +315,7 @@ func copyDir(target, src string) {
filepath.Walk(src, func(path string, info os.FileInfo, err error) error {
relPath := path[len(src):]
if info.IsDir() {
return os.MkdirAll(filepath.Join(target, relPath), 0777)
return os.MkdirAll(filepath.Join(target, relPath), 0o777)
}
return copyFile(filepath.Join(target, relPath), path)
})
Expand Down
1 change: 1 addition & 0 deletions fuzzy/fsm_batch.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

//go:build batchtest
// +build batchtest

package fuzzy
Expand Down
2 changes: 1 addition & 1 deletion fuzzy/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func resolveDirectory(dir string, create bool) (string, error) {
}
if create {
if _, err := os.Stat(resolved); os.IsNotExist(err) {
if err := os.MkdirAll(resolved, 0744); err != nil {
if err := os.MkdirAll(resolved, 0o744); err != nil {
return "", err
}
}
Expand Down
6 changes: 2 additions & 4 deletions fuzzy/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,18 @@ import (
"bytes"
"errors"
"fmt"
"github.com/hashicorp/go-hclog"
"io"
"os"
"path/filepath"
"sync"
"time"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/raft"
)

var (
codecHandle codec.MsgpackHandle
)
var codecHandle codec.MsgpackHandle

type appendEntries struct {
source string
Expand Down
14 changes: 13 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/hashicorp/raft

go 1.16
go 1.17

retract v1.1.3 // Deleted original tag; module checksum may not be accurate.

Expand All @@ -10,3 +10,15 @@ require (
github.com/hashicorp/go-msgpack v0.5.5
github.com/stretchr/testify v1.8.4
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
github.com/hashicorp/golang-lru v0.5.0 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
3 changes: 2 additions & 1 deletion inmem_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
package raft

import (
"github.com/stretchr/testify/require"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestInmemTransportImpl(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion integ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func TestRaft_RestartFollower_LongInitialHeartbeat(t *testing.T) {
seeNewLeader := func(o *Observation) bool { _, ok := o.Data.(LeaderObservation); return ok }
leaderCh := make(chan Observation)
// TODO Closing this channel results in panics, even though we're calling Release.
//defer close(leaderCh)
// defer close(leaderCh)
leaderChanges := new(uint32)
go func() {
for range leaderCh {
Expand Down
12 changes: 6 additions & 6 deletions log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ func TestOldestLog(t *testing.T) {
{
Name: "simple case",
Logs: []*Log{
&Log{
{
Index: 1234,
Term: 1,
},
&Log{
{
Index: 1235,
Term: 1,
},
&Log{
{
Index: 1236,
Term: 2,
},
Expand Down Expand Up @@ -76,16 +76,16 @@ func TestEmitsLogStoreMetrics(t *testing.T) {

s := NewInmemStore()
logs := []*Log{
&Log{
{
Index: 1234,
Term: 1,
AppendedAt: time.Now(),
},
&Log{
{
Index: 1235,
Term: 1,
},
&Log{
{
Index: 1236,
Term: 2,
},
Expand Down
5 changes: 0 additions & 5 deletions net_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,6 @@ func makeAppendRPCResponse() AppendEntriesResponse {
}

func TestNetworkTransport_AppendEntries(t *testing.T) {

for _, useAddrProvider := range []bool{true, false} {
// Transport 1 is consumer
trans1, err := makeTransport(t, useAddrProvider, "localhost:0")
Expand Down Expand Up @@ -279,7 +278,6 @@ func TestNetworkTransport_AppendEntries(t *testing.T) {
}

func TestNetworkTransport_AppendEntriesPipeline(t *testing.T) {

for _, useAddrProvider := range []bool{true, false} {
// Transport 1 is consumer
trans1, err := makeTransport(t, useAddrProvider, "localhost:0")
Expand Down Expand Up @@ -539,7 +537,6 @@ func TestNetworkTransport_AppendEntriesPipeline_MaxRPCsInFlight(t *testing.T) {
}

func TestNetworkTransport_RequestVote(t *testing.T) {

for _, useAddrProvider := range []bool{true, false} {
// Transport 1 is consumer
trans1, err := makeTransport(t, useAddrProvider, "localhost:0")
Expand Down Expand Up @@ -601,7 +598,6 @@ func TestNetworkTransport_RequestVote(t *testing.T) {
}

func TestNetworkTransport_InstallSnapshot(t *testing.T) {

for _, useAddrProvider := range []bool{true, false} {
// Transport 1 is consumer
trans1, err := makeTransport(t, useAddrProvider, "localhost:0")
Expand Down Expand Up @@ -865,7 +861,6 @@ func (sl testCountingStreamLayer) Dial(address ServerAddress, timeout time.Durat
// do not result in a tight loop and spam the log. We verify this here by counting the number
// of calls against Accept() and the logger
func TestNetworkTransport_ListenBackoff(t *testing.T) {

// testTime is the amount of time we will allow NetworkTransport#listen() to run
// This needs to be long enough that to verify that maxDelay is in force,
// but not so long as to be obnoxious when running the test suite.
Expand Down
6 changes: 3 additions & 3 deletions peersjson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestPeersJSON_BadConfiguration(t *testing.T) {
defer os.RemoveAll(base)

peers := filepath.Join(base, "peers.json")
if err = os.WriteFile(peers, []byte("null"), 0666); err != nil {
if err = os.WriteFile(peers, []byte("null"), 0o666); err != nil {
t.Fatalf("err: %v", err)
}

Expand All @@ -46,7 +46,7 @@ func TestPeersJSON_ReadPeersJSON(t *testing.T) {
"127.0.0.3:123"]
`)
peers := filepath.Join(base, "peers.json")
if err = os.WriteFile(peers, content, 0666); err != nil {
if err = os.WriteFile(peers, content, 0o666); err != nil {
t.Fatalf("err: %v", err)
}
var configuration Configuration
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestPeersJSON_ReadConfigJSON(t *testing.T) {
]
`)
peers := filepath.Join(base, "peers.json")
if err = os.WriteFile(peers, content, 0666); err != nil {
if err = os.WriteFile(peers, content, 0o666); err != nil {
t.Fatalf("err: %v", err)
}

Expand Down
18 changes: 11 additions & 7 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (r *Raft) runCandidate() {
// which will make other servers vote even though they have a leader already.
// It is important to reset that flag, because this priviledge could be abused
// otherwise.
defer func() { r.candidateFromLeadershipTransfer = false }()
defer func() { r.candidateFromLeadershipTransfer.Store(false) }()

electionTimeout := r.config().ElectionTimeout
electionTimer := randomTimeout(electionTimeout)
Expand Down Expand Up @@ -491,6 +491,11 @@ func (r *Raft) runLeader() {
select {
case notify <- true:
case <-r.shutdownCh:
// make sure push to the notify channel ( if given )
select {
case notify <- true:
default:
}
}
}

Expand Down Expand Up @@ -792,7 +797,7 @@ func (r *Raft) leaderLoop() {

start := time.Now()
var groupReady []*list.Element
var groupFutures = make(map[uint64]*logFuture)
groupFutures := make(map[uint64]*logFuture)
var lastIdxInGroup uint64

// Pull all inflight logs that are committed off the queue.
Expand Down Expand Up @@ -841,7 +846,6 @@ func (r *Raft) leaderLoop() {
if v.quorumSize == 0 {
// Just dispatched, start the verification
r.verifyLeader(v)

} else if v.votes < v.quorumSize {
// Early return, means there must be a new leader
r.logger.Warn("new leader elected, stepping down")
Expand Down Expand Up @@ -1448,7 +1452,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {

// Increase the term if we see a newer one, also transition to follower
// if we ever get an appendEntries call
if a.Term > r.getCurrentTerm() || (r.getState() != Follower && !r.candidateFromLeadershipTransfer) {
if a.Term > r.getCurrentTerm() || (r.getState() != Follower && !r.candidateFromLeadershipTransfer.Load()) {
// Ensure transition to follower
r.setState(Follower)
r.setCurrentTerm(a.Term)
Expand All @@ -1468,7 +1472,6 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
var prevLogTerm uint64
if a.PrevLogEntry == lastIdx {
prevLogTerm = lastTerm

} else {
var prevLog Log
if err := r.logs.GetLog(a.PrevLogEntry, &prevLog); err != nil {
Expand Down Expand Up @@ -1907,7 +1910,8 @@ func (r *Raft) electSelf(preVote bool) <-chan *voteResult {
Candidate: r.trans.EncodePeer(r.localID, r.localAddr),
LastLogIndex: lastIdx,
LastLogTerm: lastTerm,
LeadershipTransfer: r.candidateFromLeadershipTransfer,
LeadershipTransfer: r.candidateFromLeadershipTransfer.Load(),
PreVote: preVote,
PreVote: preVote,
}

Expand Down Expand Up @@ -2045,7 +2049,7 @@ func (r *Raft) initiateLeadershipTransfer(id *ServerID, address *ServerAddress)
func (r *Raft) timeoutNow(rpc RPC, req *TimeoutNowRequest) {
r.setLeader("", "")
r.setState(Candidate)
r.candidateFromLeadershipTransfer = true
r.candidateFromLeadershipTransfer.Store(true)
rpc.Respond(&TimeoutNowResponse{}, nil)
}

Expand Down
Loading

0 comments on commit 0b1811d

Please sign in to comment.