Skip to content

Commit

Permalink
storage: compute quorum commit index instead of using raft.Status.Commit
Browse files Browse the repository at this point in the history
Compute the index at which a quorum of nodes have committed instead of
using raft.Status.Commit. The latter can be in advance of the computed
quorum commit index just after a replica has been added to the
group. And by computing the value ourselves we can include the pending
snapshot index in the computation.

Fixes #10409
  • Loading branch information
petermattis committed Nov 6, 2016
1 parent f7f257f commit 567ef20
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 32 deletions.
51 changes: 42 additions & 9 deletions pkg/storage/raft_log_queue.go
Expand Up @@ -17,6 +17,7 @@
package storage

import (
"sort"
"time"

"github.com/coreos/etcd/raft"
Expand Down Expand Up @@ -136,35 +137,36 @@ func getTruncatableIndexes(ctx context.Context, r *Replica) (uint64, uint64, err
func computeTruncatableIndex(
raftStatus *raft.Status, raftLogSize, targetSize int64, firstIndex, pendingSnapshotIndex uint64,
) uint64 {
truncatableIndex := raftStatus.Commit
quorumIndex := getQuorumIndex(raftStatus, pendingSnapshotIndex)
truncatableIndex := quorumIndex
if raftLogSize <= targetSize {
// Only truncate to one of the behind indexes if the raft log is less than
// the target size. If the raft log is greater than the target size we
// always truncate to the quorum commit index.
truncatableIndex = getBehindIndex(raftStatus)
truncatableIndex = getBehindIndex(raftStatus, quorumIndex)
// The pending snapshot index acts as a placeholder for a replica that is
// about to be added to the range. We don't want to truncate the log in a
// way that will require that new replica to be caught up via a Raft
// snapshot.
if pendingSnapshotIndex > 0 && truncatableIndex > pendingSnapshotIndex {
truncatableIndex = pendingSnapshotIndex
}
if truncatableIndex < firstIndex {
truncatableIndex = firstIndex
}
}

if truncatableIndex < firstIndex {
truncatableIndex = firstIndex
}
// Never truncate past the quorum committed index.
if truncatableIndex > raftStatus.Commit {
truncatableIndex = raftStatus.Commit
if truncatableIndex > quorumIndex {
truncatableIndex = quorumIndex
}
return truncatableIndex
}

// getBehindIndex returns the raft log index of the oldest node or the quorum
// commit index if all nodes are caught up.
func getBehindIndex(raftStatus *raft.Status) uint64 {
behind := raftStatus.Commit
func getBehindIndex(raftStatus *raft.Status, quorumIndex uint64) uint64 {
behind := quorumIndex
for _, progress := range raftStatus.Progress {
index := progress.Match
if behind > index {
Expand All @@ -174,6 +176,23 @@ func getBehindIndex(raftStatus *raft.Status) uint64 {
return behind
}

// getQuorumIndex returns the index which a quorum of the nodes have
// committed. The pendingSnapshotIndex indicates the index of a pending
// snapshot which is considered part of the Raft group even though it hasn't
// been added yet.
func getQuorumIndex(raftStatus *raft.Status, pendingSnapshotIndex uint64) uint64 {
match := make([]uint64, 0, len(raftStatus.Progress)+1)
for _, progress := range raftStatus.Progress {
match = append(match, progress.Match)
}
if pendingSnapshotIndex != 0 {
match = append(match, pendingSnapshotIndex)
}
sort.Sort(uint64Slice(match))
quorum := computeQuorum(len(match))
return match[len(match)-quorum]
}

// shouldQueue determines whether a range should be queued for truncating. This
// is true only if the replica is the raft leader and if the total number of
// the range's raft log's stale entries exceeds RaftLogQueueStaleThreshold.
Expand Down Expand Up @@ -224,3 +243,17 @@ func (*raftLogQueue) timer() time.Duration {
func (*raftLogQueue) purgatoryChan() <-chan struct{} {
return nil
}

var _ sort.Interface = uint64Slice(nil)

// uint64Slice implements sort.Interface
type uint64Slice []uint64

// Len implements sort.Interface
func (a uint64Slice) Len() int { return len(a) }

// Swap implements sort.Interface
func (a uint64Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// Less implements sort.Interface
func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] }
80 changes: 57 additions & 23 deletions pkg/storage/raft_log_queue_test.go
Expand Up @@ -36,65 +36,99 @@ func TestGetBehindIndex(t *testing.T) {

testCases := []struct {
progress []uint64
commit uint64
expected uint64
}{
// Basic cases.
{[]uint64{1}, 1, 1},
{[]uint64{1, 2}, 2, 1},
{[]uint64{2, 3, 4}, 4, 2},
{[]uint64{1, 2, 3, 4, 5}, 3, 1},
{[]uint64{1}, 1},
{[]uint64{1, 2}, 1},
{[]uint64{2, 3, 4}, 2},
{[]uint64{1, 2, 3, 4, 5}, 1},
// sorting.
{[]uint64{5, 4, 3, 2, 1}, 3, 1},
{[]uint64{5, 4, 3, 2, 1}, 1},
}
for i, c := range testCases {
status := &raft.Status{
Progress: make(map[uint64]raft.Progress),
}
status.Commit = c.commit
for j, v := range c.progress {
status.Progress[uint64(j)] = raft.Progress{Match: v}
}
out := getBehindIndex(status)
out := getBehindIndex(status, getQuorumIndex(status, 0))
if !reflect.DeepEqual(c.expected, out) {
t.Errorf("%d: getBehindIndex(...) expected %d, but got %d", i, c.expected, out)
}
}
}

func TestGetQuorumIndex(t *testing.T) {
defer leaktest.AfterTest(t)()

testCases := []struct {
progress []uint64
pendingSnapshotIndex uint64
expected uint64
}{
// Basic cases.
{[]uint64{1}, 0, 1},
{[]uint64{2}, 1, 1},
{[]uint64{1, 2}, 0, 1},
{[]uint64{2, 3}, 1, 2},
{[]uint64{1, 2, 3}, 0, 2},
{[]uint64{2, 3, 4}, 1, 2},
{[]uint64{1, 2, 3, 4}, 0, 2},
{[]uint64{2, 3, 4, 5}, 1, 3},
{[]uint64{1, 2, 3, 4, 5}, 0, 3},
{[]uint64{2, 3, 4, 5, 6}, 1, 3},
// Sorting.
{[]uint64{5, 4, 3, 2, 1}, 0, 3},
}
for i, c := range testCases {
status := &raft.Status{
Progress: make(map[uint64]raft.Progress),
}
for j, v := range c.progress {
status.Progress[uint64(j)] = raft.Progress{Match: v}
}
quorumMatchedIndex := getQuorumIndex(status, c.pendingSnapshotIndex)
if c.expected != quorumMatchedIndex {
t.Fatalf("%d: expected %d, but got %d", i, c.expected, quorumMatchedIndex)
}
}
}

func TestComputeTruncatableIndex(t *testing.T) {
defer leaktest.AfterTest(t)()

const targetSize = 1000

testCases := []struct {
progress []uint64
commit uint64
raftLogSize int64
firstIndex uint64
pendingSnapshot uint64
expected uint64
}{
{[]uint64{1, 2}, 1, 100, 1, 0, 1},
{[]uint64{1, 5, 5}, 5, 100, 1, 0, 1},
{[]uint64{1, 5, 5}, 5, 100, 2, 0, 2},
{[]uint64{5, 5, 5}, 5, 100, 2, 0, 5},
{[]uint64{5, 5, 5}, 5, 100, 2, 1, 2},
{[]uint64{5, 5, 5}, 5, 100, 2, 3, 3},
{[]uint64{1, 2, 3, 4}, 3, 100, 1, 0, 1},
{[]uint64{1, 2, 3, 4}, 3, 100, 2, 0, 2},
{[]uint64{1, 2}, 100, 1, 0, 1},
{[]uint64{1, 5, 5}, 100, 1, 0, 1},
{[]uint64{1, 5, 5}, 100, 2, 0, 2},
{[]uint64{5, 5, 5}, 100, 2, 0, 5},
{[]uint64{5, 5, 5}, 100, 2, 1, 2},
{[]uint64{5, 5, 5}, 100, 2, 3, 3},
{[]uint64{1, 2, 3, 4}, 100, 1, 0, 1},
{[]uint64{1, 2, 3, 4}, 100, 2, 0, 2},
// If over targetSize, should truncate to quorum committed index.
{[]uint64{1, 2, 3, 4}, 3, 2000, 1, 0, 3},
{[]uint64{1, 2, 3, 4}, 3, 2000, 2, 0, 3},
{[]uint64{1, 2, 3, 4}, 3, 2000, 3, 0, 3},
// Never truncate past raftStatus.Commit.
{[]uint64{4, 5, 6}, 3, 100, 4, 0, 3},
{[]uint64{1, 3, 3, 4}, 2000, 1, 0, 3},
{[]uint64{1, 3, 3, 4}, 2000, 2, 0, 3},
{[]uint64{1, 3, 3, 4}, 2000, 3, 0, 3},
// The pending snapshot index affects the quorum commit index.
{[]uint64{4}, 2000, 1, 1, 1},
// Never truncate past the quorum commit index.
{[]uint64{3, 3, 6}, 100, 4, 0, 3},
}
for i, c := range testCases {
status := &raft.Status{
Progress: make(map[uint64]raft.Progress),
}
status.Commit = c.commit
for j, v := range c.progress {
status.Progress[uint64(j)] = raft.Progress{Match: v}
}
Expand Down

0 comments on commit 567ef20

Please sign in to comment.