diff --git a/log.go b/log.go
index f75e5d8c..4ae21932 100644
--- a/log.go
+++ b/log.go
@@ -122,13 +122,26 @@ type LogStore interface {
 	// StoreLog stores a log entry.
 	StoreLog(log *Log) error
 
-	// StoreLogs stores multiple log entries.
+	// StoreLogs stores multiple log entries. By default the logs stored may
+	// not be contiguous with previous logs (i.e. may have a gap in Index
+	// since the last log written). If an implementation can't tolerate this
+	// it may optionally implement `MonotonicLogStore` to indicate that this
+	// is not allowed. This changes Raft's behaviour after restoring a user
+	// snapshot to remove all previous logs instead of relying on a "gap" to
+	// signal the discontinuity between logs before the snapshot and logs
+	// after.
 	StoreLogs(logs []*Log) error
 
 	// DeleteRange deletes a range of log entries. The range is inclusive.
 	DeleteRange(min, max uint64) error
 }
 
+// MonotonicLogStore is an optional interface for LogStore implementations
+// that cannot tolerate gaps between the Index values of consecutive log
+// entries. For example, this may allow more efficient indexing because the
+// Index values are densely populated. If true is returned, Raft will avoid
+// relying on gaps to trigger re-syncing logs on followers after a snapshot
+// is restored. The LogStore must have an efficient implementation of
+// DeleteRange for the case where all logs are removed, as this must be
+// called after snapshot restore when gaps are not allowed.
+// We avoid deleting all records for LogStores that do not implement
+// MonotonicLogStore because although it's always correct to do so, it has a
+// major negative performance impact on the BoltDB store that is currently
+// the most widely used.
+type MonotonicLogStore interface {
+	IsMonotonic() bool
+}
+
 func oldestLog(s LogStore) (Log, error) {
 	var l Log
diff --git a/log_cache.go b/log_cache.go
index 4c067e29..2cc3885a 100644
--- a/log_cache.go
+++ b/log_cache.go
@@ -33,6 +33,16 @@ func NewLogCache(capacity int, store LogStore) (*LogCache, error) {
 	return c, nil
 }
 
+// IsMonotonic implements the MonotonicLogStore interface. This is a shim to
+// expose the underlying store as monotonically indexed or not.
+func (c *LogCache) IsMonotonic() bool {
+	if store, ok := c.store.(MonotonicLogStore); ok {
+		return store.IsMonotonic()
+	}
+
+	return false
+}
+
 func (c *LogCache) GetLog(idx uint64, log *Log) error {
 	// Check the buffer for an entry
 	c.l.RLock()
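For log store authors, opting in only requires implementing the one-method interface above. A minimal sketch of an external implementation follows; the GaplessStore wrapper, its package name, and its gap check are hypothetical illustrations, not part of this change:

package gapless

import (
	"fmt"

	"github.com/hashicorp/raft"
)

// GaplessStore wraps any raft.LogStore and advertises, via IsMonotonic, that
// it cannot tolerate gaps between the Index values of consecutive entries.
type GaplessStore struct {
	raft.LogStore
}

// IsMonotonic implements raft.MonotonicLogStore, telling Raft to clear all
// old logs after a snapshot restore instead of leaving an index gap.
func (g *GaplessStore) IsMonotonic() bool { return true }

// StoreLogs enforces the no-gap invariant that IsMonotonic advertises.
func (g *GaplessStore) StoreLogs(logs []*raft.Log) error {
	last, err := g.LogStore.LastIndex()
	if err != nil {
		return err
	}
	if last != 0 && len(logs) > 0 && logs[0].Index != last+1 {
		return fmt.Errorf("index gap: store ends at %d, batch starts at %d", last, logs[0].Index)
	}
	return g.LogStore.StoreLogs(logs)
}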
diff --git a/raft.go b/raft.go
index 8017b370..8594f442 100644
--- a/raft.go
+++ b/raft.go
@@ -1122,7 +1122,14 @@ func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error {
 	r.setLastApplied(lastIndex)
 	r.setLastSnapshot(lastIndex, term)
 
-	r.logger.Info("restored user snapshot", "index", latestIndex)
+	// Remove old logs if r.logs is a MonotonicLogStore. Log any errors and
+	// continue.
+	if logs, ok := r.logs.(MonotonicLogStore); ok && logs.IsMonotonic() {
+		if err := r.removeOldLogs(); err != nil {
+			r.logger.Error("failed to remove old logs", "error", err)
+		}
+	}
+
+	r.logger.Info("restored user snapshot", "index", lastIndex)
 	return nil
 }
 
@@ -1790,15 +1797,19 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
 	r.setLatestConfiguration(reqConfiguration, reqConfigurationIndex)
 	r.setCommittedConfiguration(reqConfiguration, reqConfigurationIndex)
 
-	// Compact logs, continue even if this fails
-	if err := r.compactLogs(req.LastLogIndex); err != nil {
+	// Clear old logs if r.logs is a MonotonicLogStore. Otherwise compact the
+	// logs. In both cases, log any errors and continue.
+	if mlogs, ok := r.logs.(MonotonicLogStore); ok && mlogs.IsMonotonic() {
+		if err := r.removeOldLogs(); err != nil {
+			r.logger.Error("failed to reset logs", "error", err)
+		}
+	} else if err := r.compactLogs(req.LastLogIndex); err != nil {
 		r.logger.Error("failed to compact logs", "error", err)
 	}
 
 	r.logger.Info("Installed remote snapshot")
 	resp.Success = true
 	r.setLastContact()
-	return
 }
 
 // setLastContact is used to set the last contact time to now
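Both call sites above use the same capability-detection idiom: a type assertion to see whether the store exposes the optional interface, then a call to IsMonotonic itself, so that pass-through wrappers such as LogCache can still answer false for a non-monotonic underlying store. Restated as a standalone helper; the package and function name here are illustrative only:

package raftutil

import "github.com/hashicorp/raft"

// storeIsMonotonic reports whether s both implements MonotonicLogStore and
// actually requires monotonic indexes; the type assertion alone is not enough.
func storeIsMonotonic(s raft.LogStore) bool {
	m, ok := s.(raft.MonotonicLogStore)
	return ok && m.IsMonotonic()
}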
diff --git a/raft_test.go b/raft_test.go
index f2d6ef73..d941c5a2 100644
--- a/raft_test.go
+++ b/raft_test.go
@@ -20,6 +20,7 @@ import (
 	"time"
 
 	"github.com/hashicorp/go-hclog"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
@@ -1019,6 +1020,88 @@ func TestRaft_SnapshotRestore(t *testing.T) {
 	}
 }
 
+func TestRaft_RestoreSnapshotOnStartup_Monotonic(t *testing.T) {
+	// Make the cluster
+	conf := inmemConfig(t)
+	conf.TrailingLogs = 10
+	opts := &MakeClusterOpts{
+		Peers:         1,
+		Bootstrap:     true,
+		Conf:          conf,
+		MonotonicLogs: true,
+	}
+	c := MakeClusterCustom(t, opts)
+	defer c.Close()
+
+	leader := c.Leader()
+
+	// Commit a lot of things
+	var future Future
+	for i := 0; i < 100; i++ {
+		future = leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0)
+	}
+
+	// Wait for the last future to apply
+	if err := future.Error(); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Take a snapshot
+	snapFuture := leader.Snapshot()
+	if err := snapFuture.Error(); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Check for snapshot
+	snaps, _ := leader.snapshots.List()
+	if len(snaps) != 1 {
+		t.Fatalf("should have a snapshot")
+	}
+	snap := snaps[0]
+
+	// Logs should be trimmed
+	firstIdx, err := leader.logs.FirstIndex()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	lastIdx, err := leader.logs.LastIndex()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	if firstIdx != snap.Index-conf.TrailingLogs+1 {
+		t.Fatalf("should trim logs to %d: but is %d", snap.Index-conf.TrailingLogs+1, firstIdx)
+	}
+
+	// Shutdown
+	shutdown := leader.Shutdown()
+	if err := shutdown.Error(); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Restart the Raft
+	r := leader
+	// Can't just reuse the old transport as it will be closed
+	_, trans2 := NewInmemTransport(r.trans.LocalAddr())
+	cfg := r.config()
+	r, err = NewRaft(&cfg, r.fsm, r.logs, r.stable, r.snapshots, trans2)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	c.rafts[0] = r
+
+	// We should have restored from the snapshot!
+	if last := r.getLastApplied(); last != snap.Index {
+		t.Fatalf("bad last index: %d, expecting %d", last, snap.Index)
+	}
+
+	// Verify that logs have not been reset
+	first, _ := r.logs.FirstIndex()
+	last, _ := r.logs.LastIndex()
+	assert.Equal(t, firstIdx, first)
+	assert.Equal(t, lastIdx, last)
+}
+
 func TestRaft_SnapshotRestore_Progress(t *testing.T) {
 	// Make the cluster
 	conf := inmemConfig(t)
@@ -1342,7 +1425,9 @@ func TestRaft_UserSnapshot(t *testing.T) {
 
 // snapshotAndRestore does a snapshot and restore sequence and applies the given
 // offset to the snapshot index, so we can try out different situations.
-func snapshotAndRestore(t *testing.T, offset uint64) {
+func snapshotAndRestore(t *testing.T, offset uint64, monotonicLogStore bool, restoreNewCluster bool) {
+	t.Helper()
+
 	// Make the cluster.
 	conf := inmemConfig(t)
@@ -1352,7 +1437,19 @@ func snapshotAndRestore(t *testing.T, offset uint64) {
 	conf.ElectionTimeout = 500 * time.Millisecond
 	conf.LeaderLeaseTimeout = 500 * time.Millisecond
 
-	c := MakeCluster(3, t, conf)
+	var c *cluster
+	numPeers := 3
+	optsMonotonic := &MakeClusterOpts{
+		Peers:         numPeers,
+		Bootstrap:     true,
+		Conf:          conf,
+		MonotonicLogs: true,
+	}
+	if monotonicLogStore {
+		c = MakeClusterCustom(t, optsMonotonic)
+	} else {
+		c = MakeCluster(numPeers, t, conf)
+	}
 	defer c.Close()
 
 	// Wait for things to get stable and commit some things.
@@ -1382,6 +1479,17 @@ func snapshotAndRestore(t *testing.T, offset uint64) {
 	// Get the last index before the restore.
 	preIndex := leader.getLastIndex()
 
+	if restoreNewCluster {
+		var c2 *cluster
+		if monotonicLogStore {
+			c2 = MakeClusterCustom(t, optsMonotonic)
+		} else {
+			c2 = MakeCluster(numPeers, t, conf)
+		}
+		c = c2
+		leader = c.Leader()
+	}
+
 	// Restore the snapshot, twiddling the index with the offset.
 	meta, reader, err := snap.Open()
 	meta.Index += offset
@@ -1397,17 +1505,40 @@ func snapshotAndRestore(t *testing.T, offset uint64) {
 	// an index to create a hole, and then we apply a no-op after the
 	// restore.
 	var expected uint64
-	if meta.Index < preIndex {
+	if !restoreNewCluster && meta.Index < preIndex {
 		expected = preIndex + 2
 	} else {
+		// restoring onto a new cluster should always have a last index based
+		// off of the snapshot meta index
 		expected = meta.Index + 2
 	}
+
 	lastIndex := leader.getLastIndex()
 	if lastIndex != expected {
 		t.Fatalf("Index was not updated correctly: %d vs. %d", lastIndex, expected)
 	}
 
-	// Ensure all the logs are the same and that we have everything that was
+	// Ensure raft logs are removed for monotonic log stores but remain
+	// untouched for non-monotonic (BoltDB) log stores.
+	// When the first index is 1, the logs have remained untouched. When the
+	// first index is set to the next commit index / last index, the logs
+	// have been removed.
+	raftNodes := make([]*Raft, 0, numPeers+1)
+	raftNodes = append(raftNodes, leader)
+	raftNodes = append(raftNodes, c.Followers()...)
+	for _, raftNode := range raftNodes {
+		firstLogIndex, err := raftNode.logs.FirstIndex()
+		require.NoError(t, err)
+		lastLogIndex, err := raftNode.logs.LastIndex()
+		require.NoError(t, err)
+
+		if monotonicLogStore {
+			require.Equal(t, expected, firstLogIndex)
+		} else {
+			require.Equal(t, uint64(1), firstLogIndex)
+		}
+		require.Equal(t, expected, lastLogIndex)
+	}
+	// Ensure all the fsm logs are the same and that we have everything that was
 	// part of the original snapshot, and that the contents after were
 	// reverted.
 	c.EnsureSame(t)
@@ -1418,7 +1549,7 @@ func snapshotAndRestore(t *testing.T, offset uint64) {
 	}
 	for i, entry := range fsm.logs {
 		expected := []byte(fmt.Sprintf("test %d", i))
-		if bytes.Compare(entry, expected) != 0 {
+		if !bytes.Equal(entry, expected) {
 			t.Fatalf("Log entry bad: %v", entry)
 		}
 	}
@@ -1446,10 +1577,17 @@ func TestRaft_UserRestore(t *testing.T) {
 		10000,
 	}
 
+	restoreToNewClusterCases := []bool{false, true}
+
 	for _, c := range cases {
-		t.Run(fmt.Sprintf("case %v", c), func(t *testing.T) {
-			snapshotAndRestore(t, c)
-		})
+		for _, restoreNewCluster := range restoreToNewClusterCases {
+			t.Run(fmt.Sprintf("case %v | restored to new cluster: %t", c, restoreNewCluster), func(t *testing.T) {
+				snapshotAndRestore(t, c, false, restoreNewCluster)
+			})
+			t.Run(fmt.Sprintf("monotonic case %v | restored to new cluster: %t", c, restoreNewCluster), func(t *testing.T) {
+				snapshotAndRestore(t, c, true, restoreNewCluster)
+			})
+		}
 	}
 }
 
@@ -2380,6 +2518,7 @@ func TestRaft_LeadershipTransferStopRightAway(t *testing.T) {
 		t.Errorf("leadership shouldn't have started, but instead it error with: %v", err)
 	}
 }
+
 func TestRaft_GetConfigurationNoBootstrap(t *testing.T) {
 	c := MakeCluster(2, t, nil)
 	defer c.Close()
@@ -2417,6 +2556,41 @@ func TestRaft_GetConfigurationNoBootstrap(t *testing.T) {
 	}
 }
 
+func TestRaft_LogStoreIsMonotonic(t *testing.T) {
+	c := MakeCluster(1, t, nil)
+	defer c.Close()
+
+	// Should be one leader
+	leader := c.Leader()
+	c.EnsureLeader(t, leader.localAddr)
+
+	// Test the monotonic type assertion on the InmemStore.
+	_, ok := leader.logs.(MonotonicLogStore)
+	assert.False(t, ok)
+
+	var log LogStore
+
+	// Wrapping the non-monotonic store as a LogCache should make it pass the
+	// type assertion, but the underlying store is still non-monotonic.
+	log, _ = NewLogCache(100, leader.logs)
+	mcast, ok := log.(MonotonicLogStore)
+	require.True(t, ok)
+	assert.False(t, mcast.IsMonotonic())
+
+	// Now create a new MockMonotonicLogStore using the leader logs and expect
+	// it to work.
+	log = &MockMonotonicLogStore{s: leader.logs}
+	mcast, ok = log.(MonotonicLogStore)
+	require.True(t, ok)
+	assert.True(t, mcast.IsMonotonic())
+
+	// Wrap the mock logstore in a LogCache and check again.
+	log, _ = NewLogCache(100, log)
+	mcast, ok = log.(MonotonicLogStore)
+	require.True(t, ok)
+	assert.True(t, mcast.IsMonotonic())
+}
+
 func TestRaft_CacheLogWithStoreError(t *testing.T) {
 	c := MakeCluster(2, t, nil)
 	defer c.Close()
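The `expected` arithmetic in snapshotAndRestore follows from the restore sequence described in its comment: the snapshot sits at meta.Index, one index is burned to create a hole, and a no-op is applied after the restore, so the last index lands two past meta.Index unless an existing cluster was already ahead of it. A hypothetical helper restating the test's own logic:

package raft

// expectedLastIndex mirrors the branch in snapshotAndRestore: restoring onto
// a new cluster (or past the old last index) always yields meta.Index + 2,
// while an existing cluster that was already ahead keeps its own index + 2.
func expectedLastIndex(metaIndex, preIndex uint64, restoreNewCluster bool) uint64 {
	if !restoreNewCluster && metaIndex < preIndex {
		return preIndex + 2
	}
	return metaIndex + 2
}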
diff --git a/snapshot.go b/snapshot.go
index d0d9934b..89d11fda 100644
--- a/snapshot.go
+++ b/snapshot.go
@@ -210,10 +210,10 @@ func (r *Raft) takeSnapshot() (string, error) {
 	return sink.ID(), nil
 }
 
-// compactLogs takes the last inclusive index of a snapshot
-// and trims the logs that are no longer needed.
-func (r *Raft) compactLogs(snapIdx uint64) error {
-	defer metrics.MeasureSince([]string{"raft", "compactLogs"}, time.Now())
+// compactLogsWithTrailing takes the last inclusive index of a snapshot,
+// the lastLogIdx, and the trailingLogs, and trims the logs that are no
+// longer needed.
+func (r *Raft) compactLogsWithTrailing(snapIdx uint64, lastLogIdx uint64, trailingLogs uint64) error {
 	// Determine log ranges to compact
 	minLog, err := r.logs.FirstIndex()
 	if err != nil {
@@ -221,11 +221,8 @@ func (r *Raft) compactLogs(snapIdx uint64) error {
 	}
 
 	// Check if we have enough logs to truncate
-	lastLogIdx, _ := r.getLastLog()
-
 	// Use a consistent value for trailingLogs for the duration of this method
 	// call to avoid surprising behaviour.
-	trailingLogs := r.config().TrailingLogs
 	if lastLogIdx <= trailingLogs {
 		return nil
 	}
@@ -249,3 +246,33 @@ func (r *Raft) compactLogs(snapIdx uint64) error {
 	}
 	return nil
 }
+
+// compactLogs takes the last inclusive index of a snapshot
+// and trims the logs that are no longer needed.
+func (r *Raft) compactLogs(snapIdx uint64) error {
+	defer metrics.MeasureSince([]string{"raft", "compactLogs"}, time.Now())
+
+	lastLogIdx, _ := r.getLastLog()
+	trailingLogs := r.config().TrailingLogs
+
+	return r.compactLogsWithTrailing(snapIdx, lastLogIdx, trailingLogs)
+}
+
+// removeOldLogs removes all old logs from the store. This is used for
+// MonotonicLogStores after restore. Callers should verify that the store
+// implementation is monotonic prior to calling.
+func (r *Raft) removeOldLogs() error {
+	defer metrics.MeasureSince([]string{"raft", "removeOldLogs"}, time.Now())
+
+	lastLogIdx, err := r.logs.LastIndex()
+	if err != nil {
+		return fmt.Errorf("failed to get last log index: %w", err)
+	}
+
+	r.logger.Info("removing all old logs from log store")
+
+	// Call compactLogsWithTrailing with lastLogIdx for snapIdx, since it
+	// will take the lesser of lastLogIdx and snapIdx to figure out the end
+	// for which to apply trailingLogs.
+	return r.compactLogsWithTrailing(lastLogIdx, lastLogIdx, 0)
+}
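Based on the comments in compactLogsWithTrailing and removeOldLogs above, the deletion range being computed can be sketched as follows; this is a simplified restatement under those assumptions, not the actual implementation. With snapIdx == lastLogIdx and trailingLogs == 0, as removeOldLogs passes, the range covers every stored log:

package raft

// compactionRange sketches the bounds compactLogsWithTrailing works out: keep
// at least trailingLogs entries and never delete past the snapshot index.
func compactionRange(minLog, snapIdx, lastLogIdx, trailingLogs uint64) (lo, hi uint64, ok bool) {
	if lastLogIdx <= trailingLogs {
		return 0, 0, false // not enough logs to truncate
	}
	hi = lastLogIdx - trailingLogs
	if snapIdx < hi {
		hi = snapIdx // don't remove logs newer than the snapshot
	}
	if minLog > hi {
		return 0, 0, false // nothing to delete
	}
	return minLog, hi, true
}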
diff --git a/testing.go b/testing.go
index 4f59a1d0..3eb0ac59 100644
--- a/testing.go
+++ b/testing.go
@@ -133,6 +133,47 @@ func (m *MockSnapshot) Persist(sink SnapshotSink) error {
 func (m *MockSnapshot) Release() {
 }
 
+// MockMonotonicLogStore is a LogStore wrapper for testing the
+// MonotonicLogStore interface.
+type MockMonotonicLogStore struct {
+	s LogStore
+}
+
+// IsMonotonic implements the MonotonicLogStore interface.
+func (m *MockMonotonicLogStore) IsMonotonic() bool {
+	return true
+}
+
+// FirstIndex implements the LogStore interface.
+func (m *MockMonotonicLogStore) FirstIndex() (uint64, error) {
+	return m.s.FirstIndex()
+}
+
+// LastIndex implements the LogStore interface.
+func (m *MockMonotonicLogStore) LastIndex() (uint64, error) {
+	return m.s.LastIndex()
+}
+
+// GetLog implements the LogStore interface.
+func (m *MockMonotonicLogStore) GetLog(index uint64, log *Log) error {
+	return m.s.GetLog(index, log)
+}
+
+// StoreLog implements the LogStore interface.
+func (m *MockMonotonicLogStore) StoreLog(log *Log) error {
+	return m.s.StoreLog(log)
+}
+
+// StoreLogs implements the LogStore interface.
+func (m *MockMonotonicLogStore) StoreLogs(logs []*Log) error {
+	return m.s.StoreLogs(logs)
+}
+
+// DeleteRange implements the LogStore interface.
+func (m *MockMonotonicLogStore) DeleteRange(min uint64, max uint64) error {
+	return m.s.DeleteRange(min, max)
+}
+
 // This can be used as the destination for a logger and it'll
 // map them into calls to testing.T.Log, so that you only see
 // the logging for failed tests.
@@ -673,6 +714,7 @@ type MakeClusterOpts struct {
 	ConfigStoreFSM  bool
 	MakeFSMFunc     func() FSM
 	LongstopTimeout time.Duration
+	MonotonicLogs   bool
 }
 
 // makeCluster will return a cluster with the given config and number of peers.
@@ -748,11 +790,16 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster {
 	// Create all the rafts
 	c.startTime = time.Now()
 	for i := 0; i < opts.Peers; i++ {
-		logs := c.stores[i]
+		var logs LogStore
+		logs = c.stores[i]
 		store := c.stores[i]
 		snap := c.snaps[i]
 		trans := c.trans[i]
 
+		if opts.MonotonicLogs {
+			logs = &MockMonotonicLogStore{s: logs}
+		}
+
 		peerConf := opts.Conf
 		peerConf.LocalID = configuration.Servers[i].ID
 		peerConf.Logger = newTestLoggerWithPrefix(t, string(configuration.Servers[i].ID))
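Putting the testing pieces together, a cluster test opts into the mock monotonic store through the new MonotonicLogs option. A sketch in the style of the tests above; the test name and its assertion are illustrative, not part of this change:

package raft

import "testing"

func TestExample_MonotonicCluster(t *testing.T) {
	opts := &MakeClusterOpts{
		Peers:         3,
		Bootstrap:     true,
		Conf:          inmemConfig(t),
		MonotonicLogs: true,
	}
	c := MakeClusterCustom(t, opts)
	defer c.Close()

	// Every node's log store is wrapped in MockMonotonicLogStore, so Raft
	// will clear all old logs on snapshot restore instead of leaving a gap.
	m, ok := c.rafts[0].logs.(MonotonicLogStore)
	if !ok || !m.IsMonotonic() {
		t.Fatal("expected a monotonic log store")
	}
}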