From 9d75c37725ce47cc6cdf02f2ba8e301999594ef8 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Fri, 14 Jul 2023 11:36:12 +0100 Subject: [PATCH 1/6] kvserverpb: add MsgAppResp field to {Delegate,}SnapshotResponse This will be used to hand the MsgAppResp generated at snapshot application back to the sender. --- pkg/kv/kvserver/kvserverpb/raft.proto | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto index ac8030298d90..eccb862fa541 100644 --- a/pkg/kv/kvserver/kvserverpb/raft.proto +++ b/pkg/kv/kvserver/kvserverpb/raft.proto @@ -287,6 +287,14 @@ message SnapshotResponse { // // MIGRATION: only guaranteed to be set when the message field is no longer there. errorspb.EncodedError encoded_error = 5 [(gogoproto.nullable) = false]; + + // msg_app_resp stores an optional MsgAppResp the receiving RawNode may have + // generated in response to applying the snapshot. This message will also have + // been handed to the raft transport, but it is helpful to step it into the + // sender manually to avoid the race described in: + // + // https://github.com/cockroachdb/cockroach/issues/97971 + raftpb.Message msg_app_resp = 6; } // TODO(baptist): Extend this if necessary to separate out the request for the throttle. @@ -356,6 +364,14 @@ message DelegateSnapshotResponse { // collected_spans stores trace spans recorded during the execution of this // request. repeated util.tracing.tracingpb.RecordedSpan collected_spans = 3 [(gogoproto.nullable) = false]; + + // msg_app_resp stores an optional MsgAppResp the receiving RawNode may have + // generated in response to applying the snapshot. This message will also have + // been handed to the raft transport, but it is helpful to step it into the + // sender manually to avoid the race described in: + // + // https://github.com/cockroachdb/cockroach/issues/97971 + raftpb.Message msg_app_resp = 4; } // ConfChangeContext is encoded in the raftpb.ConfChange.Context field. From 378c91a0a9720e652ebe799695150a19804743c8 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Fri, 14 Jul 2023 11:40:42 +0100 Subject: [PATCH 2/6] kvserver: plumb MsgAppResp back to snapshot sender We want the initiator of a (potentially delegated) snapshot to be able to see the MsgAppResp that is generated on the recipient of the snapshot as a result of application. This commit does the plumbing, but the `*MsgAppResp` is always `nil`, i.e. no actual logic was added yet. --- pkg/kv/kvserver/raft_transport.go | 26 ++++++++++++--------- pkg/kv/kvserver/replica_command.go | 28 ++++++++++++++-------- pkg/kv/kvserver/store_raft.go | 13 ++++++++--- pkg/kv/kvserver/store_snapshot.go | 37 +++++++++++++++++------------- pkg/kv/kvserver/store_test.go | 9 ++++---- 5 files changed, 70 insertions(+), 43 deletions(-) diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 7923d84323aa..0db457c010d8 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -1099,6 +1099,10 @@ func (t *RaftTransport) dropFlowTokensForDisconnectedNodes() { // SendSnapshot streams the given outgoing snapshot. The caller is responsible // for closing the OutgoingSnapshot. +// +// The optional (but usually present) returned message is an MsgAppResp that +// results from the follower applying the snapshot, acking the log at the index +// of the snapshot. 
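+//
+// For illustration only (hypothetical caller names, not part of this change),
+// the returned message can be stepped into the sender's RawNode roughly as:
+//
+//	resp, err := transport.SendSnapshot(ctx, ...)
+//	if err == nil && resp.MsgAppResp != nil {
+//		_ = rawNode.Step(*resp.MsgAppResp)
+//	}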
func (t *RaftTransport) SendSnapshot( ctx context.Context, storePool *storepool.StorePool, @@ -1107,17 +1111,17 @@ func (t *RaftTransport) SendSnapshot( newWriteBatch func() storage.WriteBatch, sent func(), recordBytesSent snapshotRecordMetrics, -) error { +) (*kvserverpb.SnapshotResponse, error) { nodeID := header.RaftMessageRequest.ToReplica.NodeID conn, err := t.dialer.Dial(ctx, nodeID, rpc.DefaultClass) if err != nil { - return err + return nil, err } client := NewMultiRaftClient(conn) stream, err := client.RaftSnapshot(ctx) if err != nil { - return err + return nil, err } defer func() { @@ -1132,18 +1136,18 @@ func (t *RaftTransport) SendSnapshot( // and determines if it encountered any errors when sending the snapshot. func (t *RaftTransport) DelegateSnapshot( ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest, -) error { +) (*kvserverpb.DelegateSnapshotResponse, error) { nodeID := req.DelegatedSender.NodeID conn, err := t.dialer.Dial(ctx, nodeID, rpc.DefaultClass) if err != nil { - return errors.Mark(err, errMarkSnapshotError) + return nil, errors.Mark(err, errMarkSnapshotError) } client := NewMultiRaftClient(conn) // Creates an RPC stream between the leaseholder and sender. stream, err := client.DelegateRaftSnapshot(ctx) if err != nil { - return errors.Mark(err, errMarkSnapshotError) + return nil, errors.Mark(err, errMarkSnapshotError) } defer func() { if err := stream.CloseSend(); err != nil { @@ -1154,12 +1158,12 @@ func (t *RaftTransport) DelegateSnapshot( // Send the request. wrappedRequest := &kvserverpb.DelegateSnapshotRequest{Value: &kvserverpb.DelegateSnapshotRequest_Send{Send: req}} if err := stream.Send(wrappedRequest); err != nil { - return errors.Mark(err, errMarkSnapshotError) + return nil, errors.Mark(err, errMarkSnapshotError) } // Wait for response to see if the receiver successfully applied the snapshot. resp, err := stream.Recv() if err != nil { - return errors.Mark( + return nil, errors.Mark( errors.Wrapf(err, "%v: remote failed to send snapshot", req), errMarkSnapshotError, ) } @@ -1175,14 +1179,14 @@ func (t *RaftTransport) DelegateSnapshot( switch resp.Status { case kvserverpb.DelegateSnapshotResponse_ERROR: - return errors.Mark( + return nil, errors.Mark( errors.Wrapf(resp.Error(), "error sending couldn't accept %v", req), errMarkSnapshotError) case kvserverpb.DelegateSnapshotResponse_APPLIED: // This is the response we're expecting. Snapshot successfully applied. 
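// Return the full response to the caller: it may carry an MsgAppResp (see
// the msg_app_resp field comment in raft.proto) that the initiator steps
// into its own RawNode.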
log.VEventf(ctx, 3, "%s: delegated snapshot was successfully applied", resp) - return nil + return resp, nil default: - return err + return nil, err } } diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index c7fd8ccb2fde..b163fd210aa4 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -2872,7 +2872,8 @@ func (r *Replica) sendSnapshotUsingDelegate( retErr = timeutil.RunWithTimeout( ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error { // Sending snapshot - return r.store.cfg.Transport.DelegateSnapshot(ctx, delegateRequest) + _, err := r.store.cfg.Transport.DelegateSnapshot(ctx, delegateRequest) + return err }, ) if !selfDelegate { @@ -3053,7 +3054,7 @@ func (r *Replica) followerSendSnapshot( ctx context.Context, recipient roachpb.ReplicaDescriptor, req *kvserverpb.DelegateSendSnapshotRequest, -) error { +) (*raftpb.Message, error) { ctx = r.AnnotateCtx(ctx) sendThreshold := traceSnapshotThreshold.Get(&r.ClusterSettings().SV) if sendThreshold > 0 { @@ -3082,14 +3083,14 @@ func (r *Replica) followerSendSnapshot( // expensive to send. err := r.validateSnapshotDelegationRequest(ctx, req) if err != nil { - return err + return nil, err } // Throttle snapshot sending. Obtain the send semaphore and determine the rate limit. rangeSize := r.GetMVCCStats().Total() cleanup, err := r.store.reserveSendSnapshot(ctx, req, rangeSize) if err != nil { - return errors.Wrap(err, "Unable to reserve space for sending this snapshot") + return nil, errors.Wrap(err, "Unable to reserve space for sending this snapshot") } defer cleanup() @@ -3097,13 +3098,13 @@ func (r *Replica) followerSendSnapshot( // sent after we are done waiting. err = r.validateSnapshotDelegationRequest(ctx, req) if err != nil { - return err + return nil, err } snapType := req.Type snap, err := r.GetSnapshot(ctx, snapType, req.SnapId) if err != nil { - return errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType) + return nil, errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType) } defer snap.Close() log.Event(ctx, "generated snapshot") @@ -3174,9 +3175,10 @@ func (r *Replica) followerSendSnapshot( } } - return timeutil.RunWithTimeout( + var msgAppResp *raftpb.Message + if err := timeutil.RunWithTimeout( ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error { - return r.store.cfg.Transport.SendSnapshot( + resp, err := r.store.cfg.Transport.SendSnapshot( ctx, r.store.cfg.StorePool, header, @@ -3185,8 +3187,16 @@ func (r *Replica) followerSendSnapshot( sent, recordBytesSent, ) + if err != nil { + return err + } + msgAppResp = resp.MsgAppResp + return nil }, - ) + ); err != nil { + return nil, err + } + return msgAppResp, nil } // replicasCollocated is used in AdminMerge to ensure that the ranges are diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 82d2065730c3..2deb6426ca0e 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -183,7 +183,8 @@ func (s *Store) HandleDelegatedSnapshot( } // Pass the request to the sender replica. - if err := sender.followerSendSnapshot(ctx, req.RecipientReplica, req); err != nil { + msgAppResp, err := sender.followerSendSnapshot(ctx, req.RecipientReplica, req) + if err != nil { // If an error occurred during snapshot sending, send an error response. 
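// (On the initiator, DelegateSnapshot marks this ERROR response with
// errMarkSnapshotError, so the failure surfaces as retriable.)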
return &kvserverpb.DelegateSnapshotResponse{ Status: kvserverpb.DelegateSnapshotResponse_ERROR, @@ -195,6 +196,7 @@ func (s *Store) HandleDelegatedSnapshot( return &kvserverpb.DelegateSnapshotResponse{ Status: kvserverpb.DelegateSnapshotResponse_APPLIED, CollectedSpans: sp.GetConfiguredRecording(), + MsgAppResp: msgAppResp, } } @@ -426,8 +428,9 @@ func (s *Store) processRaftRequestWithReplica( // will have been removed. func (s *Store) processRaftSnapshotRequest( ctx context.Context, snapHeader *kvserverpb.SnapshotRequest_Header, inSnap IncomingSnapshot, -) *kvpb.Error { - return s.withReplicaForRequest(ctx, &snapHeader.RaftMessageRequest, func( +) (*raftpb.Message, *kvpb.Error) { + var msgAppResp *raftpb.Message + pErr := s.withReplicaForRequest(ctx, &snapHeader.RaftMessageRequest, func( ctx context.Context, r *Replica, ) (pErr *kvpb.Error) { ctx = r.AnnotateCtx(ctx) @@ -498,6 +501,10 @@ func (s *Store) processRaftSnapshotRequest( } return nil }) + if pErr != nil { + return nil, pErr + } + return msgAppResp, nil } // HandleRaftResponse implements the IncomingRaftMessageHandler interface. Per diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 930d686f1b78..5ed684fe3ff4 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -1140,7 +1140,8 @@ func (s *Store) receiveSnapshot( // already received the entire snapshot here, so there's no point in // abandoning application half-way through if the caller goes away. applyCtx := s.AnnotateCtx(context.Background()) - if pErr := s.processRaftSnapshotRequest(applyCtx, header, inSnap); pErr != nil { + msgAppResp, pErr := s.processRaftSnapshotRequest(applyCtx, header, inSnap) + if pErr != nil { err := pErr.GoError() // We mark this error as a snapshot error which will be interpreted by the // sender as this being a retriable error, see isSnapshotError(). @@ -1151,6 +1152,7 @@ func (s *Store) receiveSnapshot( return stream.Send(&kvserverpb.SnapshotResponse{ Status: kvserverpb.SnapshotResponse_APPLIED, CollectedSpans: tracing.SpanFromContext(ctx).GetConfiguredRecording(), + MsgAppResp: msgAppResp, }) } @@ -1482,7 +1484,7 @@ func SendEmptySnapshot( } }() - return sendSnapshot( + if _, err := sendSnapshot( ctx, st, tracer, @@ -1493,7 +1495,10 @@ func SendEmptySnapshot( eng.NewWriteBatch, func() {}, nil, /* recordBytesSent */ - ) + ); err != nil { + return err + } + return nil } // noopStorePool is a hollowed out StorePool that does not throttle. It's used in recovery scenarios. @@ -1513,7 +1518,7 @@ func sendSnapshot( newWriteBatch func() storage.WriteBatch, sent func(), recordBytesSent snapshotRecordMetrics, -) error { +) (*kvserverpb.SnapshotResponse, error) { if recordBytesSent == nil { // NB: Some tests and an offline tool (ResetQuorum) call into `sendSnapshotUsingDelegate` // with a nil metrics tracking function. We pass in a fake metrics tracking function here that isn't @@ -1526,7 +1531,7 @@ func sendSnapshot( start := timeutil.Now() to := header.RaftMessageRequest.ToReplica if err := stream.Send(&kvserverpb.SnapshotRequest{Header: &header}); err != nil { - return err + return nil, err } log.Event(ctx, "sent SNAPSHOT_REQUEST message to server") // Wait until we get a response from the server. 
The recipient may queue us @@ -1536,13 +1541,13 @@ func sendSnapshot( resp, err := stream.Recv() if err != nil { storePool.Throttle(storepool.ThrottleFailed, err.Error(), to.StoreID) - return err + return nil, err } switch resp.Status { case kvserverpb.SnapshotResponse_ERROR: sp.ImportRemoteRecording(resp.CollectedSpans) storePool.Throttle(storepool.ThrottleFailed, resp.DeprecatedMessage, to.StoreID) - return errors.Wrapf(maybeHandleDeprecatedSnapErr(resp.Error()), "%s: remote couldn't accept %s", to, snap) + return nil, errors.Wrapf(maybeHandleDeprecatedSnapErr(resp.Error()), "%s: remote couldn't accept %s", to, snap) case kvserverpb.SnapshotResponse_ACCEPTED: // This is the response we're expecting. Continue with snapshot sending. log.Event(ctx, "received SnapshotResponse_ACCEPTED message from server") @@ -1550,7 +1555,7 @@ func sendSnapshot( err := errors.Errorf("%s: server sent an invalid status while negotiating %s: %s", to, snap, resp.Status) storePool.Throttle(storepool.ThrottleFailed, err.Error(), to.StoreID) - return err + return nil, err } durQueued := timeutil.Since(start) @@ -1586,7 +1591,7 @@ func sendSnapshot( // Record timings for snapshot send if kv.trace.snapshot.enable_threshold is enabled numBytesSent, err := ss.Send(ctx, stream, header, snap, recordBytesSent) if err != nil { - return err + return nil, err } durSent := timeutil.Since(start) @@ -1595,7 +1600,7 @@ func sendSnapshot( // applied. sent() if err := stream.Send(&kvserverpb.SnapshotRequest{Final: true}); err != nil { - return err + return nil, err } log.KvDistribution.Infof( ctx, @@ -1612,7 +1617,7 @@ func sendSnapshot( resp, err = stream.Recv() if err != nil { - return errors.Wrapf(err, "%s: remote failed to apply snapshot", to) + return nil, errors.Wrapf(err, "%s: remote failed to apply snapshot", to) } sp.ImportRemoteRecording(resp.CollectedSpans) // NB: wait for EOF which ensures that all processing on the server side has // completed (such as defers that might be run after the previous message was // received). 
if unexpectedResp, err := stream.Recv(); err != io.EOF { if err != nil { - return errors.Wrapf(err, "%s: expected EOF, got resp=%v with error", to, unexpectedResp) + return nil, errors.Wrapf(err, "%s: expected EOF, got resp=%v with error", to, unexpectedResp) } - return errors.Newf("%s: expected EOF, got resp=%v", to, unexpectedResp) + return nil, errors.Newf("%s: expected EOF, got resp=%v", to, unexpectedResp) } switch resp.Status { case kvserverpb.SnapshotResponse_ERROR: - return errors.Wrapf( + return nil, errors.Wrapf( maybeHandleDeprecatedSnapErr(resp.Error()), "%s: remote failed to apply snapshot", to, ) case kvserverpb.SnapshotResponse_APPLIED: - return nil + return resp, nil default: - return errors.Errorf("%s: server sent an invalid status during finalization: %s", + return nil, errors.Errorf("%s: server sent an invalid status during finalization: %s", to, resp.Status, ) } diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 18621ab9a4e3..0810d6c385ee 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -3048,13 +3048,14 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) { require.NoError(t, err) } - require.NoError(t, s.processRaftSnapshotRequest(ctx, req, + _, pErr := s.processRaftSnapshotRequest(ctx, req, IncomingSnapshot{ SnapUUID: uuid.MakeV4(), Desc: desc, placeholder: placeholder, }, - ).GoError()) + ) + require.NoError(t, pErr.GoError()) testutils.SucceedsSoon(t, func() error { s.mu.Lock() @@ -3127,7 +3128,7 @@ func TestSendSnapshotThrottling(t *testing.T) { sp := &fakeStorePool{} expectedErr := errors.New("") c := fakeSnapshotStream{nil, expectedErr} - err := sendSnapshot( + _, err := sendSnapshot( ctx, st, tr, c, sp, header, nil /* snap */, newBatch, nil /* sent */, nil, /* recordBytesSent */ ) if sp.failedThrottles != 1 { @@ -3146,7 +3147,7 @@ func TestSendSnapshotThrottling(t *testing.T) { EncodedError: errors.EncodeError(ctx, errors.New("boom")), } c := fakeSnapshotStream{resp, nil} - err := sendSnapshot( + _, err := sendSnapshot( ctx, st, tr, c, sp, header, nil /* snap */, newBatch, nil /* sent */, nil, /* recordBytesSent */ ) if sp.failedThrottles != 1 { From e96860479b6597a3ea4811ab21c3d5d3393901f8 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Fri, 14 Jul 2023 12:31:22 +0100 Subject: [PATCH 3/6] kvserver: send MsgAppResp back to sender This addresses the following race: - n1 runs a ConfChange that adds n2 as a learner. - n1 sends MsgApp to the learner. - n1 starts the INITIAL snapshot, say at index 100. - n2 receives n1's MsgApp. Since it's an uninitialized Replica and its log is empty, it rejects this MsgApp. - n2 receives and applies the INITIAL snapshot, which prompts it to send an affirmative MsgAppResp to n1. - n1's RawNode now tracks n2 as StateProbe (via a call to ReportSnapshot(success)). - n1 receives the MsgApp rejection; n2 regresses to StateSnapshot because the rejection comes with a RejectHint (suggested next index to try) of zero, which is not in n1's log. In particular, the PendingSnapshot index (the leader's last index at that point, say 101) will likely be higher than the index of the snapshot actually sent (100). - n1 receives the affirmative MsgAppResp (for index 100). However, 100 < 101, so this is ignored and the follower remains in StateSnapshot. With this commit, the last two steps cannot happen: n2 transitions straight to StateReplicate because we step a copy of the affirmative MsgAppResp in. The later rejection will be dropped, since it is stale (you can't hint at index zero when you already have a positive index confirmed).
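In code, the fix amounts to roughly the following sketch on the initiating replica (names compressed; the real change is in sendSnapshotUsingDelegate in the diff below):

    resp, err := transport.DelegateSnapshot(ctx, delegateRequest)
    if err == nil && resp.MsgAppResp != nil {
        msg := *resp.MsgAppResp
        msg.To = rawNode.BasicStatus().ID // re-address from the delegate to us
        _ = rawNode.Step(msg)             // may move n2 straight to StateReplicate
    }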
I will add that there is no great testing for the above other than stressing the test with additional logging, noting the symptoms, and verifying that they disappear with this commit. Scripted testing of this code is within reach[^1] but is outside the scope of this PR. [^1]: https://github.com/cockroachdb/cockroach/issues/105177 There is an additional bit of brittleness that is silently suppressed by this commit, but which deserves to be fixed independently, because the way the problem gets avoided seems accidental and incomplete. When raft requests a snapshot, it notes its current LastIndex and uses it as the PendingSnapshot for the follower's Progress. At the time of writing, MsgAppResps that reconnect the follower to the log but whose index is not greater than or equal to PendingSnapshot are ignored. In effect, this means that perfectly good snapshots are thrown away if they happen to be a little bit stale. In the example above, the snapshot is stale: PendingSnapshot is 101, but the snapshot is at index 100. Then how does this commit (mostly) fix the problem, i.e. why isn't the snapshot discarded? The key is that when we synchronously step the MsgAppResp(100) into the leader's RawNode, the rejection hasn't arrived yet and so the follower transitions into StateReplicate with a Match of 100. This is then enough for raft to recognize the rejected MsgApp as stale (since accepting it would regress on durably stored entries). However, there is an alternative example where the rejection arrives earlier: after the snapshot index has been picked, but before the follower has been transitioned into StateReplicate. For this to have a negative effect, an entry has to be appended to the leader's log between generating the snapshot and handling the rejection. Without the combination of delegated snapshots and sustained write activity on the leader, this window is small. This combination is usually not present in tests, but it may well be relevant in "real" clusters. We track addressing this in #106813. Closes #87554. Closes #97971. Closes #84242. Epic: None Release note (bug fix): removed a source of unnecessary Raft snapshots during replica movement. --- pkg/kv/kvserver/replica_command.go | 19 +++++++++++++++++-- pkg/kv/kvserver/replica_raft.go | 16 +++++++++++++--- pkg/kv/kvserver/replica_raftstorage.go | 3 ++- pkg/kv/kvserver/store_raft.go | 12 ++++++++++++ pkg/kv/kvserver/store_snapshot.go | 1 + 5 files changed, 45 insertions(+), 6 deletions(-) diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index b163fd210aa4..155eff5736d2 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -2872,8 +2872,23 @@ func (r *Replica) sendSnapshotUsingDelegate( retErr = timeutil.RunWithTimeout( ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error { // Sending snapshot - _, err := r.store.cfg.Transport.DelegateSnapshot(ctx, delegateRequest) - return err + resp, err := r.store.cfg.Transport.DelegateSnapshot(ctx, delegateRequest) + if err != nil { + return err + } + if resp.MsgAppResp != nil { + _ = r.withRaftGroup(func(rn *raft.RawNode) (unquiesceAndWakeLeader bool, _ error) { + msg := *resp.MsgAppResp + // With a delegated snapshot, the recipient received the snapshot + // from another replica and will thus respond to it instead. But the + // message is valid for the actual originator of the send as well. + msg.To = rn.BasicStatus().ID + // We do want to unquiesce here - we wouldn't ever want state transitions + // on a quiesced replica. 
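+ // NB: the error from withRaftGroup (and hence from Step) is discarded
+ // deliberately: the recipient also handed this MsgAppResp to the raft
+ // transport, so it can still arrive by the regular route.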
+ return true, rn.Step(msg) }) } + return nil }, ) if !selfDelegate { diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index c8ee59bb8885..df990844beca 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -930,6 +930,18 @@ func (r *Replica) handleRaftReadyRaftMuLocked( if err := r.applySnapshot(ctx, inSnap, snap, hs, subsumedRepls); err != nil { return stats, errors.Wrap(err, "while applying snapshot") } + for _, msg := range msgStorageAppend.Responses { + // The caller would like to see the MsgAppResp that usually results from + // applying the snapshot synchronously, so fish it out. + if msg.To == uint64(inSnap.FromReplica.ReplicaID) && + msg.Type == raftpb.MsgAppResp && + !msg.Reject && + msg.Index == snap.Metadata.Index { + + inSnap.msgAppRespCh <- msg + break + } + } stats.tSnapEnd = timeutil.Now() stats.snap.applied = true @@ -1827,9 +1839,7 @@ func (r *Replica) reportSnapshotStatus(ctx context.Context, to roachpb.ReplicaID // index it requested is now actually durable on the follower. Note also that // the follower will generate an MsgAppResp reflecting the applied snapshot // which typically moves the follower to StateReplicate when (if) received - // by the leader. - // - // See: https://github.com/cockroachdb/cockroach/issues/87581 + // by the leader; as of #106793 this happens synchronously. if err := r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) { raftGroup.ReportSnapshot(uint64(to), snapStatus) return true, nil diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index bf7d374a2c9a..4865725f64ea 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -313,7 +313,8 @@ type IncomingSnapshot struct { DataSize int64 snapType kvserverpb.SnapshotRequest_Type placeholder *ReplicaPlaceholder - raftAppliedIndex kvpb.RaftIndex // logging only + raftAppliedIndex kvpb.RaftIndex // logging only + msgAppRespCh chan raftpb.Message // receives MsgAppResp if/when snap is applied } func (s IncomingSnapshot) String() string { diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 2deb6426ca0e..0c347ba7e020 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -499,6 +499,18 @@ func (s *Store) processRaftSnapshotRequest( log.Infof(ctx, "ignored stale snapshot at index %d", snapHeader.RaftMessageRequest.Message.Snapshot.Metadata.Index) s.metrics.RangeSnapshotRecvUnusable.Inc(1) } + // If the snapshot was applied and acked with an MsgAppResp, return that + // message up the stack. We're using msgAppRespCh as a shortcut to avoid + // plumbing return parameters through an additional few layers of raft + // handling. + // + // NB: in practice there's always an MsgAppResp here, but it is better not + // to rely on what is essentially discretionary raft behavior. 
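+ // (msgAppRespCh has capacity one and is written at most once, during the
+ // synchronous snapshot application above, so the non-blocking receive
+ // below cannot race with the send.)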
+ select { + case msg := <-inSnap.msgAppRespCh: + msgAppResp = &msg + default: + } return nil }) if pErr != nil { diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 5ed684fe3ff4..2c4f268cd04a 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -504,6 +504,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( DataSize: dataSize, snapType: header.Type, raftAppliedIndex: header.State.RaftAppliedIndex, + msgAppRespCh: make(chan raftpb.Message, 1), } timingTag.stop("totalTime") From 24428625a59bf0cb97d3019587249ecaf61e3b6c Mon Sep 17 00:00:00 2001 From: Andrew Baptist Date: Wed, 7 Sep 2022 20:47:56 -0400 Subject: [PATCH 4/6] kvserver: add TestAddVotersWithoutRaftQueue This commit introduces a simple test that creates a single range and then adds it to two followers. The expectation is that this should succeed using only the INITIAL snapshots sent as part of the replication changes, without recourse to the Raft snapshot queue. When run under `--stress`, in the past it would occasionally fail due to an incorrect state transition related to StateProbe. Release note: None Epic: none --- pkg/kv/kvserver/replica_learner_test.go | 50 +++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index 9cccd955400b..29f2d84283fd 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -2441,3 +2441,53 @@ func TestRebalancingAndCrossRegionZoneSnapshotMetrics(t *testing.T) { }) } + +// TestAddVotersWithoutRaftQueue verifies that in normal operation Raft +// snapshots are not required. This test creates a range with a single voter, +// then adds two additional voters. Most of the time this succeeds; however, it +// occasionally fails (today) because the addition of the first voter remains +// "incomplete", and the second voter then cannot be added because there is no +// quorum. +// +// Specifically, the following sequence of events happens when the leader adds +// the first voter: +// 1. AdminChangeReplicasRequest is processed on n1. +// a) Adds n2 as a LEARNER to raft. +// b) Sends an initial snapshot to n2. +// c) n2 receives and applies the snapshot. +// d) n2 responds that it successfully applied the snapshot. +// e) n1 receives the response and updates n2's tracked state to StateReplicate. +// 2. Before step 1c above, n1 sends an MsgApp to n2. +// a) The MsgApp contains entries up to and including the conf change. +// b) The MsgApp is received and REJECTED because n2's log is still empty. +// c) After 1e above, n1 receives the rejection. +// d) n1 updates n2 from StateReplicate to StateProbe and then StateSnapshot. +// +// From n2's perspective, it receives the MsgApp prior to the initial snapshot. +// This results in it responding with a rejected MsgApp. Later it receives the +// snapshot and correctly applies it. However, when n1 sees the rejected MsgApp, +// it moves n2's status to StateProbe and stops sending Raft updates to it as it +// plans to fix it with a Raft Snapshot. As the raft snapshot queue is disabled, +// this never happens, and n2 is stuck as a non-Learner in StateProbe. At +// this point, the Raft group is wedged since only 1 of its 2 voters is +// available for Raft consensus. +func TestAddVotersWithoutRaftQueue(t *testing.T) { + defer leaktest.AfterTest(t)() + skip.WithIssue(t, 87553) + ctx := context.Background() + + // Disable the raft snapshot queue to make sure we don't require a raft snapshot. 
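+ // With the queue disabled, both AddVoters calls below must succeed using
+ // only the initial snapshots sent as part of the replication changes.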
+ tc := testcluster.StartTestCluster( + t, 3, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{DisableRaftSnapshotQueue: true}}, + }, + ReplicationMode: base.ReplicationManual, + }, + ) + defer tc.Stopper().Stop(ctx) + + key := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, key, tc.Target(1)) + tc.AddVotersOrFatal(t, key, tc.Target(2)) +} From 5cb40f2889ae6c17f9feb405c76c13d98e02638b Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Fri, 14 Jul 2023 10:44:09 +0100 Subject: [PATCH 5/6] kvserver: unskip TestAddVotersWithoutRaftQueue --- pkg/kv/kvserver/replica_learner_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index 29f2d84283fd..83b85b9bdfcd 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -2473,7 +2473,6 @@ func TestRebalancingAndCrossRegionZoneSnapshotMetrics(t *testing.T) { // for Raft consensus. func TestAddVotersWithoutRaftQueue(t *testing.T) { defer leaktest.AfterTest(t)() - skip.WithIssue(t, 87553) ctx := context.Background() // Disable the raft snapshot queue to make sure we don't require a raft snapshot. From 0e3b84b1c903289cb652b8fd135c02683f7016c1 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Thu, 13 Jul 2023 22:56:58 +0100 Subject: [PATCH 6/6] kvserver: unskip TestAdminRelocateRange --- pkg/kv/kvserver/client_relocate_range_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/kv/kvserver/client_relocate_range_test.go b/pkg/kv/kvserver/client_relocate_range_test.go index a49646800fbf..eb1869986d98 100644 --- a/pkg/kv/kvserver/client_relocate_range_test.go +++ b/pkg/kv/kvserver/client_relocate_range_test.go @@ -27,7 +27,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -166,7 +165,6 @@ func usesAtomicReplicationChange(ops []kvpb.ReplicationChange) bool { func TestAdminRelocateRange(t *testing.T) { defer leaktest.AfterTest(t)() - skip.WithIssue(t, 84242, "flaky test") defer log.Scope(t).Close(t) ctx := context.Background()