From b0b7fb8c236425d931a52623fcfb0a0ca273f2cc Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Fri, 14 Jul 2023 12:31:22 +0100 Subject: [PATCH] kvserver: send MsgAppResp back to sender This addresses the following race: - n1 runs a ConfChange that adds n2 as a learner. - n1 sends MsgApp to the learner. - n1 starts the INITIAL snapshot, say at index 100. - n2 receives n1's MsgApp. Since it's an uninitialized Replica and its log is empty, it rejects this MsgApp. - n2 receives and applies the INITIAL snapshot, which prompts it to send an affirmative MsgAppResp to n1. - n1's RawNode now tracks n2 as StateProbe (via call to ReportSnapshot(success)) - n1 receives the MsgApp rejection; n2 regresses to StateSnapshot because the rejection comes with a RejectHint (suggested next index to try) of zero, which is not in n1's log. In particular, the SnapshotIndex will likely be higher than the index of the snapshot actually sent, say 101. - n1 receives the affirmative MsgAppResp (for index 100). However, 100 < 101 so this is ignored and the follower remains in StateSnapshot. With this commit, the last two steps cannot happen: n2 transitions straight to StateReplicate because we step a copy of the affirmative MsgAppResp in. The later rejection will be dropped, since it is stale (you can't hint at index zero when you already have a positive index confirmed). I will add that there is no great testing for the above other than stressing the test with additional logging, noting the symptoms, and noting that they disappear with this commit. Scripted testing of this code is within reach[^1] but is outside of the scope of this PR. [^1]: https://github.com/cockroachdb/cockroach/issues/105177 There is an additional bit of brittleness that is silently suppressed by this commit, but which deserves to be fixed independently because how the problem gets avoided seems accidental and incomplete. When raft requests a snapshot, it notes its current LastIndex and uses it as the PendingSnapshot for the follower's Progress. At the time of writing, MsgAppResp that reconnect the follower to the log but which are not greater than or equal to PendingSnapshot are ignored. In effect, this means that perfectly good snapshots are thrown away if they happen to be a little bit stale. In the example above, the snapshot is stale: PendingSnapshot is 101, but the snapshot is at index 100. Then how does this commit (mostly) fix the problem, i.e. why isn't the snapshot discarded? The key is that when we synchronously step the MsgAppResp(100) into the leader's RawNode, the rejection hasn't arrived yet and so the follower transitions into StateReplicate with a Match of 100. This is then enough so that raft recognizes the rejected MsgApp as stale (since it would regress on durably stored entries). However, there is an alternative example where the rejection arrives earlier: after the snapshot index has been picked, but before the follower has been transitioned into StateReplicate. For this to have a negative effect, an entry has to be appended to the leader's log between generating the snapshot and handling the rejection. Without the combination of delegated snapshots and sustained write activity on the leader, this window is small, and this combination is usually not present in tests but it may well be relevant in "real" clusters. We track addressing this in #106813. Closes #87554. Closes #97971. Closes #84242. Epic: None Release note (bug fix): removed a source of unnecessary Raft snapshots during replica movement. --- pkg/kv/kvserver/replica_command.go | 19 +++++++++++++++++-- pkg/kv/kvserver/replica_raft.go | 16 +++++++++++++--- pkg/kv/kvserver/replica_raftstorage.go | 3 ++- pkg/kv/kvserver/store_raft.go | 12 ++++++++++++ pkg/kv/kvserver/store_snapshot.go | 1 + 5 files changed, 45 insertions(+), 6 deletions(-) diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index b163fd210aa4..155eff5736d2 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -2872,8 +2872,23 @@ func (r *Replica) sendSnapshotUsingDelegate( retErr = timeutil.RunWithTimeout( ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error { // Sending snapshot - _, err := r.store.cfg.Transport.DelegateSnapshot(ctx, delegateRequest) - return err + resp, err := r.store.cfg.Transport.DelegateSnapshot(ctx, delegateRequest) + if err != nil { + return err + } + if resp.MsgAppResp != nil { + _ = r.withRaftGroup(func(rn *raft.RawNode) (unquiesceAndWakeLeader bool, _ error) { + msg := *resp.MsgAppResp + // With a delegated snapshot, the recipient received the snapshot + // from another replica and will thus respond to it instead. But the + // message is valid for the actual originator of the send as well. + msg.To = rn.BasicStatus().ID + // We do want to unquiesce here - we wouldn't ever want state transitions + // on a quiesced replica. + return true, rn.Step(*resp.MsgAppResp) + }) + } + return nil }, ) if !selfDelegate { diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index f384fa99f762..dceb3024973b 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -930,6 +930,18 @@ func (r *Replica) handleRaftReadyRaftMuLocked( if err := r.applySnapshot(ctx, inSnap, snap, hs, subsumedRepls); err != nil { return stats, errors.Wrap(err, "while applying snapshot") } + for _, msg := range msgStorageAppend.Responses { + // The caller would like to see the MsgAppResp that usually results from + // applying the snapshot synchronously, so fish it out. + if msg.To == uint64(inSnap.FromReplica.ReplicaID) && + msg.Type == raftpb.MsgAppResp && + !msg.Reject && + msg.Index == snap.Metadata.Index { + + inSnap.msgAppRespCh <- msg + break + } + } stats.tSnapEnd = timeutil.Now() stats.snap.applied = true @@ -1827,9 +1839,7 @@ func (r *Replica) reportSnapshotStatus(ctx context.Context, to roachpb.ReplicaID // index it requested is now actually durable on the follower. Note also that // the follower will generate an MsgAppResp reflecting the applied snapshot // which typically moves the follower to StateReplicate when (if) received - // by the leader. - // - // See: https://github.com/cockroachdb/cockroach/issues/87581 + // by the leader, which as of #106793 we do synchronously. if err := r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) { raftGroup.ReportSnapshot(uint64(to), snapStatus) return true, nil diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index bf7d374a2c9a..4865725f64ea 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -313,7 +313,8 @@ type IncomingSnapshot struct { DataSize int64 snapType kvserverpb.SnapshotRequest_Type placeholder *ReplicaPlaceholder - raftAppliedIndex kvpb.RaftIndex // logging only + raftAppliedIndex kvpb.RaftIndex // logging only + msgAppRespCh chan raftpb.Message // receives MsgAppResp if/when snap is applied } func (s IncomingSnapshot) String() string { diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 2deb6426ca0e..0c347ba7e020 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -499,6 +499,18 @@ func (s *Store) processRaftSnapshotRequest( log.Infof(ctx, "ignored stale snapshot at index %d", snapHeader.RaftMessageRequest.Message.Snapshot.Metadata.Index) s.metrics.RangeSnapshotRecvUnusable.Inc(1) } + // If the snapshot was applied and acked with an MsgAppResp, return that + // message up the stack. We're using msgAppRespCh as a shortcut to avoid + // plumbing return parameters through an additional few layers of raft + // handling. + // + // NB: in practice there's always an MsgAppResp here, but it is better not + // to rely on what is essentially discretionary raft behavior. + select { + case msg := <-inSnap.msgAppRespCh: + msgAppResp = &msg + default: + } return nil }) if pErr != nil { diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 5ed684fe3ff4..2c4f268cd04a 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -504,6 +504,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( DataSize: dataSize, snapType: header.Type, raftAppliedIndex: header.State.RaftAppliedIndex, + msgAppRespCh: make(chan raftpb.Message, 1), } timingTag.stop("totalTime")