Post restore reset #545

Merged 11 commits on Mar 17, 2023
15 changes: 14 additions & 1 deletion log.go
@@ -122,13 +122,26 @@ type LogStore interface {
// StoreLog stores a log entry.
StoreLog(log *Log) error

// StoreLogs stores multiple log entries.
// StoreLogs stores multiple log entries. By default the logs stored may not be
// contiguous with previous logs (i.e. may have a gap in Index since the last
// log written). If an implementation can't tolerate this it may optionally
// implement `MonotonicLogStore` to indicate that this is not allowed. This
// changes Raft's behaviour after restoring a user snapshot to remove all
// previous logs instead of relying on a "gap" to signal the discontinuity
// between logs before the snapshot and logs after.
StoreLogs(logs []*Log) error

// DeleteRange deletes a range of log entries. The range is inclusive.
DeleteRange(min, max uint64) error
}

// MonotonicLogStore is an optional interface for LogStore implementations that
// cannot tolerate gaps between the Index values of consecutive log entries. For
// example, such a store may allow more efficient indexing because the Index
// values are densely populated. If IsMonotonic returns true, Raft will avoid
// relying on gaps to trigger re-syncing of logs on followers after a snapshot
// is restored. The LogStore must have an efficient implementation of
// DeleteRange, as this is called after every snapshot restore when gaps are
// not allowed.
//
// We avoid deleting all records for LogStores that do not implement
// MonotonicLogStore because this has a major negative performance impact on the
// BoltDB store that is currently the most widely used.
type MonotonicLogStore interface {
	IsMonotonic() bool
}

func oldestLog(s LogStore) (Log, error) {
var l Log

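As an illustration of the opt-in above (not part of this diff; the wrapper type and its name are hypothetical), a store that cannot represent index gaps could embed the library's InmemStore, reject non-contiguous writes, and report itself as monotonic:

package example

import (
	"fmt"

	"github.com/hashicorp/raft"
)

// MonotonicInmemStore is a hypothetical LogStore that refuses index gaps and
// therefore opts in to the MonotonicLogStore behaviour added in this change.
type MonotonicInmemStore struct {
	*raft.InmemStore
}

// NewMonotonicInmemStore builds the wrapper around a fresh in-memory store.
func NewMonotonicInmemStore() *MonotonicInmemStore {
	return &MonotonicInmemStore{InmemStore: raft.NewInmemStore()}
}

// IsMonotonic tells Raft to clear the whole log (via DeleteRange) after a
// snapshot restore instead of leaving a gap in the index sequence.
func (s *MonotonicInmemStore) IsMonotonic() bool { return true }

// StoreLogs rejects writes that would leave a gap after the last stored index.
func (s *MonotonicInmemStore) StoreLogs(logs []*raft.Log) error {
	last, err := s.LastIndex()
	if err != nil {
		return err
	}
	if last != 0 && len(logs) > 0 && logs[0].Index > last+1 {
		return fmt.Errorf("refusing write: last stored index %d, next index %d leaves a gap", last, logs[0].Index)
	}
	return s.InmemStore.StoreLogs(logs)
}

A real implementation would guard StoreLog the same way and, per the comment above, keep DeleteRange cheap for full-log deletes.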
10 changes: 10 additions & 0 deletions log_cache.go
@@ -33,6 +33,16 @@ func NewLogCache(capacity int, store LogStore) (*LogCache, error) {
return c, nil
}

// IsMonotonic implements the MonotonicLogStore interface. This is a shim to
// expose the underlying store as monotonically indexed or not.
func (c *LogCache) IsMonotonic() bool {
if store, ok := c.store.(MonotonicLogStore); ok {
return store.IsMonotonic()
}

return false
}

func (c *LogCache) GetLog(idx uint64, log *Log) error {
// Check the buffer for an entry
c.l.RLock()
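For reference, a consumer-side sketch of what the shim yields when the cache wraps a store that is not monotonic (it uses the library's InmemStore and is illustrative, not part of this diff):

package main

import (
	"fmt"

	"github.com/hashicorp/raft"
)

func main() {
	// Wrap a non-monotonic store in the cache.
	cache, err := raft.NewLogCache(512, raft.NewInmemStore())
	if err != nil {
		panic(err)
	}

	// *LogCache now always satisfies the type assertion, but it only reports
	// true when the store it wraps is itself monotonic.
	var logs raft.LogStore = cache
	if m, ok := logs.(raft.MonotonicLogStore); ok {
		fmt.Println(m.IsMonotonic()) // prints: false
	}
}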
19 changes: 15 additions & 4 deletions raft.go
@@ -1122,7 +1122,14 @@ func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error {
r.setLastApplied(lastIndex)
r.setLastSnapshot(lastIndex, term)

r.logger.Info("restored user snapshot", "index", latestIndex)
// Remove old logs if r.logs is a MonotonicLogStore. Log any errors and continue.
if logs, ok := r.logs.(MonotonicLogStore); ok && logs.IsMonotonic() {
if err := r.removeOldLogs(); err != nil {
r.logger.Error("failed to remove old logs", "error", err)
}
}

r.logger.Info("restored user snapshot", "index", lastIndex)
return nil
}

@@ -1790,15 +1797,19 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
r.setLatestConfiguration(reqConfiguration, reqConfigurationIndex)
r.setCommittedConfiguration(reqConfiguration, reqConfigurationIndex)

// Compact logs, continue even if this fails
if err := r.compactLogs(req.LastLogIndex); err != nil {
// Clear old logs if r.logs is a MonotonicLogStore. Otherwise compact the
// logs. In both cases, log any errors and continue.
if mlogs, ok := r.logs.(MonotonicLogStore); ok && mlogs.IsMonotonic() {
if err := r.removeOldLogs(); err != nil {
r.logger.Error("failed to reset logs", "error", err)
}
} else if err := r.compactLogs(req.LastLogIndex); err != nil {
r.logger.Error("failed to compact logs", "error", err)
}

r.logger.Info("Installed remote snapshot")
resp.Success = true
r.setLastContact()
return
}

// setLastContact is used to set the last contact time to now
190 changes: 182 additions & 8 deletions raft_test.go
@@ -20,6 +20,7 @@ import (
"time"

"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

@@ -1019,6 +1020,88 @@ func TestRaft_SnapshotRestore(t *testing.T) {
}
}

func TestRaft_SnapshotRestore_Monotonic(t *testing.T) {
// Make the cluster
conf := inmemConfig(t)
conf.TrailingLogs = 10
opts := &MakeClusterOpts{
Peers: 1,
Bootstrap: true,
Conf: conf,
MonotonicLogs: true,
}
c := MakeClusterCustom(t, opts)
defer c.Close()

leader := c.Leader()

// Commit a lot of things
var future Future
for i := 0; i < 100; i++ {
future = leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0)
}

// Wait for the last future to apply
if err := future.Error(); err != nil {
t.Fatalf("err: %v", err)
}

// Take a snapshot
snapFuture := leader.Snapshot()
if err := snapFuture.Error(); err != nil {
t.Fatalf("err: %v", err)
}

// Check for snapshot
snaps, _ := leader.snapshots.List()
if len(snaps) != 1 {
t.Fatalf("should have a snapshot")
}
snap := snaps[0]

// Logs should be trimmed
firstIdx, err := leader.logs.FirstIndex()
if err != nil {
t.Fatalf("err: %v", err)
}
lastIdx, err := leader.logs.LastIndex()
if err != nil {
t.Fatalf("err: %v", err)
}

if firstIdx != snap.Index-conf.TrailingLogs+1 {
t.Fatalf("should trim logs to %d: but is %d", snap.Index-conf.TrailingLogs+1, firstIdx)
}

// Shutdown
shutdown := leader.Shutdown()
if err := shutdown.Error(); err != nil {
t.Fatalf("err: %v", err)
}

// Restart the Raft
r := leader
// Can't just reuse the old transport as it will be closed
_, trans2 := NewInmemTransport(r.trans.LocalAddr())
cfg := r.config()
r, err = NewRaft(&cfg, r.fsm, r.logs, r.stable, r.snapshots, trans2)
if err != nil {
t.Fatalf("err: %v", err)
}
c.rafts[0] = r

// We should have restored from the snapshot!
if last := r.getLastApplied(); last != snap.Index {
t.Fatalf("bad last index: %d, expecting %d", last, snap.Index)
}

// Verify that logs have not been reset
first, _ := r.logs.FirstIndex()
last, _ := r.logs.LastIndex()
assert.Equal(t, firstIdx, first)
assert.Equal(t, lastIdx, last)
}

func TestRaft_SnapshotRestore_Progress(t *testing.T) {
// Make the cluster
conf := inmemConfig(t)
@@ -1342,7 +1425,9 @@ func TestRaft_UserSnapshot(t *testing.T) {

// snapshotAndRestore does a snapshot and restore sequence and applies the given
// offset to the snapshot index, so we can try out different situations.
func snapshotAndRestore(t *testing.T, offset uint64) {
func snapshotAndRestore(t *testing.T, offset uint64, monotonicLogStore bool, restoreNewCluster bool) {
t.Helper()

// Make the cluster.
conf := inmemConfig(t)

@@ -1352,7 +1437,19 @@ func snapshotAndRestore(t *testing.T, offset uint64) {
conf.ElectionTimeout = 500 * time.Millisecond
conf.LeaderLeaseTimeout = 500 * time.Millisecond

c := MakeCluster(3, t, conf)
var c *cluster
numPeers := 3
optsMonotonic := &MakeClusterOpts{
Peers: numPeers,
Bootstrap: true,
Conf: conf,
MonotonicLogs: true,
}
if monotonicLogStore {
c = MakeClusterCustom(t, optsMonotonic)
} else {
c = MakeCluster(numPeers, t, conf)
}
defer c.Close()

// Wait for things to get stable and commit some things.
@@ -1382,6 +1479,17 @@ func snapshotAndRestore(t *testing.T, offset uint64) {
// Get the last index before the restore.
preIndex := leader.getLastIndex()

if restoreNewCluster {
var c2 *cluster
if monotonicLogStore {
c2 = MakeClusterCustom(t, optsMonotonic)
} else {
c2 = MakeCluster(numPeers, t, conf)
}
c = c2
leader = c.Leader()
}

// Restore the snapshot, twiddling the index with the offset.
meta, reader, err := snap.Open()
meta.Index += offset
@@ -1397,17 +1505,40 @@ func snapshotAndRestore(t *testing.T, offset uint64) {
// an index to create a hole, and then we apply a no-op after the
// restore.
var expected uint64
if meta.Index < preIndex {
if !restoreNewCluster && meta.Index < preIndex {
expected = preIndex + 2
} else {
// restoring onto a new cluster should always have a last index based
// off of the snapshot meta index
expected = meta.Index + 2
}

lastIndex := leader.getLastIndex()
if lastIndex != expected {
t.Fatalf("Index was not updated correctly: %d vs. %d", lastIndex, expected)
}

// Ensure all the logs are the same and that we have everything that was
// Ensure raft logs are removed for monotonic log stores but remain
// untouched for non-monotonic (BoltDB) log stores.
// When the first index is 1, the logs have been left untouched.
// When the first index equals the next commit index / last index, the
// logs have been removed.
raftNodes := make([]*Raft, 0, numPeers+1)
raftNodes = append(raftNodes, leader)
raftNodes = append(raftNodes, c.Followers()...)
for _, raftNode := range raftNodes {
firstLogIndex, err := raftNode.logs.FirstIndex()
require.NoError(t, err)
lastLogIndex, err := raftNode.logs.LastIndex()
require.NoError(t, err)
if monotonicLogStore {
require.Equal(t, expected, firstLogIndex)
} else {
require.Equal(t, uint64(1), firstLogIndex)
}
require.Equal(t, expected, lastLogIndex)
}
// Ensure all the fsm logs are the same and that we have everything that was
// part of the original snapshot, and that the contents after were
// reverted.
c.EnsureSame(t)
@@ -1418,7 +1549,7 @@ func snapshotAndRestore(t *testing.T, offset uint64) {
}
for i, entry := range fsm.logs {
expected := []byte(fmt.Sprintf("test %d", i))
if bytes.Compare(entry, expected) != 0 {
if !bytes.Equal(entry, expected) {
t.Fatalf("Log entry bad: %v", entry)
}
}
@@ -1446,10 +1577,17 @@ func TestRaft_UserRestore(t *testing.T) {
10000,
}

restoreToNewClusterCases := []bool{false, true}

for _, c := range cases {
t.Run(fmt.Sprintf("case %v", c), func(t *testing.T) {
snapshotAndRestore(t, c)
})
for _, restoreNewCluster := range restoreToNewClusterCases {
t.Run(fmt.Sprintf("case %v | restored to new cluster: %t", c, restoreNewCluster), func(t *testing.T) {
snapshotAndRestore(t, c, false, restoreNewCluster)
})
t.Run(fmt.Sprintf("monotonic case %v | restored to new cluster: %t", c, restoreNewCluster), func(t *testing.T) {
snapshotAndRestore(t, c, true, restoreNewCluster)
})
}
}
}

@@ -2380,6 +2518,7 @@ func TestRaft_LeadershipTransferStopRightAway(t *testing.T) {
t.Errorf("leadership shouldn't have started, but instead it error with: %v", err)
}
}

func TestRaft_GetConfigurationNoBootstrap(t *testing.T) {
c := MakeCluster(2, t, nil)
defer c.Close()
@@ -2417,6 +2556,41 @@ func TestRaft_GetConfigurationNoBootstrap(t *testing.T) {
}
}

func TestRaft_LogStoreIsMonotonic(t *testing.T) {
c := MakeCluster(1, t, nil)
defer c.Close()

// Should be one leader
leader := c.Leader()
c.EnsureLeader(t, leader.localAddr)

// Test the monotonic type assertion on the InmemStore.
_, ok := leader.logs.(MonotonicLogStore)
assert.False(t, ok)

var log LogStore

// Wrapping the non-monotonic store as a LogCache should make it pass the
// type assertion, but the underlying store is still non-monotonic.
log, _ = NewLogCache(100, leader.logs)
mcast, ok := log.(MonotonicLogStore)
require.True(t, ok)
assert.False(t, mcast.IsMonotonic())

// Now create a new MockMonotonicLogStore using the leader logs and expect
// it to work.
log = &MockMonotonicLogStore{s: leader.logs}
mcast, ok = log.(MonotonicLogStore)
require.True(t, ok)
assert.True(t, mcast.IsMonotonic())

// Wrap the mock logstore in a LogCache and check again.
log, _ = NewLogCache(100, log)
mcast, ok = log.(MonotonicLogStore)
require.True(t, ok)
assert.True(t, mcast.IsMonotonic())
}

func TestRaft_CacheLogWithStoreError(t *testing.T) {
c := MakeCluster(2, t, nil)
defer c.Close()
26 changes: 26 additions & 0 deletions snapshot.go
@@ -249,3 +249,29 @@ func (r *Raft) compactLogs(snapIdx uint64) error {
}
return nil
}

// removeOldLogs removes all old logs from the store. This is used for
// MonotonicLogStores after restore. Callers should verify that the store
// implementation is monotonic prior to calling.
func (r *Raft) removeOldLogs() error {
defer metrics.MeasureSince([]string{"raft", "removeOldLogs"}, time.Now())

// Determine log ranges to truncate
firstLogIdx, err := r.logs.FirstIndex()
if err != nil {
return fmt.Errorf("failed to get first log index: %w", err)
}

lastLogIdx, err := r.logs.LastIndex()
if err != nil {
return fmt.Errorf("failed to get last log index: %w", err)
}

r.logger.Info("removing all old logs from log store", "first", firstLogIdx, "last", lastLogIdx)

if err := r.logs.DeleteRange(firstLogIdx, lastLogIdx); err != nil {
return fmt.Errorf("log truncation failed: %v", err)
}

return nil
}