diff --git a/pkg/kv/kvserver/client_relocate_range_test.go b/pkg/kv/kvserver/client_relocate_range_test.go
index a49646800fbf..eb1869986d98 100644
--- a/pkg/kv/kvserver/client_relocate_range_test.go
+++ b/pkg/kv/kvserver/client_relocate_range_test.go
@@ -27,7 +27,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/server"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
-	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
 	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -166,7 +165,6 @@ func usesAtomicReplicationChange(ops []kvpb.ReplicationChange) bool {
 
 func TestAdminRelocateRange(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	skip.WithIssue(t, 84242, "flaky test")
 	defer log.Scope(t).Close(t)
 
 	ctx := context.Background()
diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto
index ac8030298d90..eccb862fa541 100644
--- a/pkg/kv/kvserver/kvserverpb/raft.proto
+++ b/pkg/kv/kvserver/kvserverpb/raft.proto
@@ -287,6 +287,14 @@ message SnapshotResponse {
   //
   // MIGRATION: only guaranteed to be set when the message field is no longer there.
   errorspb.EncodedError encoded_error = 5 [(gogoproto.nullable) = false];
+
+  // msg_app_resp stores an optional MsgAppResp the receiving RawNode may have
+  // generated in response to applying the snapshot. This message will also have
+  // been handed to the raft transport, but it is helpful to step it into the
+  // sender manually to avoid the race described in:
+  //
+  // https://github.com/cockroachdb/cockroach/issues/97971
+  raftpb.Message msg_app_resp = 6;
 }
 
 // TODO(baptist): Extend this if necessary to separate out the request for the throttle.
@@ -356,6 +364,14 @@ message DelegateSnapshotResponse {
   // collected_spans stores trace spans recorded during the execution of this
   // request.
   repeated util.tracing.tracingpb.RecordedSpan collected_spans = 3 [(gogoproto.nullable) = false];
+
+  // msg_app_resp stores an optional MsgAppResp the receiving RawNode may have
+  // generated in response to applying the snapshot. This message will also have
+  // been handed to the raft transport, but it is helpful to step it into the
+  // sender manually to avoid the race described in:
+  //
+  // https://github.com/cockroachdb/cockroach/issues/97971
+  raftpb.Message msg_app_resp = 4;
 }
 
 // ConfChangeContext is encoded in the raftpb.ConfChange.Context field.
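The race in #97971, in short: after applying a snapshot, the recipient emits an MsgAppResp ack over the asynchronous raft transport and, separately, an APPLIED response on the snapshot RPC stream. If the RPC response wins, the sender can move the follower to StateProbe and queue a redundant (and expensive) raft snapshot before the ack arrives. Carrying the ack inside the response lets the sender step it synchronously. A sketch of what such an ack looks like, using the upstream raft module (CockroachDB vendors its own copy); all field values are illustrative, not taken from this PR:

package main

import (
	"fmt"

	"go.etcd.io/raft/v3/raftpb"
)

func main() {
	// Roughly the ack a follower produces after applying a snapshot at
	// index 100. In this PR the same message also rides back on the
	// SnapshotResponse so the sender can step it without racing the
	// raft transport.
	ack := raftpb.Message{
		Type:   raftpb.MsgAppResp,
		From:   2,   // recipient replica ID (illustrative)
		To:     1,   // snapshot sender's replica ID (illustrative)
		Term:   5,   // illustrative
		Index:  100, // log acked up to and including the snapshot index
		Reject: false,
	}
	fmt.Printf("piggybacked ack: %+v\n", ack)
}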
diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go
index 7923d84323aa..0db457c010d8 100644
--- a/pkg/kv/kvserver/raft_transport.go
+++ b/pkg/kv/kvserver/raft_transport.go
@@ -1099,6 +1099,10 @@ func (t *RaftTransport) dropFlowTokensForDisconnectedNodes() {
 
 // SendSnapshot streams the given outgoing snapshot. The caller is responsible
 // for closing the OutgoingSnapshot.
+//
+// The optional (but usually present) returned message is an MsgAppResp that
+// results from the follower applying the snapshot, acking the log at the index
+// of the snapshot.
 func (t *RaftTransport) SendSnapshot(
 	ctx context.Context,
 	storePool *storepool.StorePool,
@@ -1107,17 +1111,17 @@ func (t *RaftTransport) SendSnapshot(
 	newWriteBatch func() storage.WriteBatch,
 	sent func(),
 	recordBytesSent snapshotRecordMetrics,
-) error {
+) (*kvserverpb.SnapshotResponse, error) {
 	nodeID := header.RaftMessageRequest.ToReplica.NodeID
 
 	conn, err := t.dialer.Dial(ctx, nodeID, rpc.DefaultClass)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	client := NewMultiRaftClient(conn)
 	stream, err := client.RaftSnapshot(ctx)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	defer func() {
@@ -1132,18 +1136,18 @@ func (t *RaftTransport) SendSnapshot(
 // and determines if it encountered any errors when sending the snapshot.
 func (t *RaftTransport) DelegateSnapshot(
 	ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest,
-) error {
+) (*kvserverpb.DelegateSnapshotResponse, error) {
 	nodeID := req.DelegatedSender.NodeID
 	conn, err := t.dialer.Dial(ctx, nodeID, rpc.DefaultClass)
 	if err != nil {
-		return errors.Mark(err, errMarkSnapshotError)
+		return nil, errors.Mark(err, errMarkSnapshotError)
 	}
 	client := NewMultiRaftClient(conn)
 
 	// Creates a rpc stream between the leaseholder and sender.
 	stream, err := client.DelegateRaftSnapshot(ctx)
 	if err != nil {
-		return errors.Mark(err, errMarkSnapshotError)
+		return nil, errors.Mark(err, errMarkSnapshotError)
 	}
 	defer func() {
 		if err := stream.CloseSend(); err != nil {
@@ -1154,12 +1158,12 @@
 	// Send the request.
 	wrappedRequest := &kvserverpb.DelegateSnapshotRequest{Value: &kvserverpb.DelegateSnapshotRequest_Send{Send: req}}
 	if err := stream.Send(wrappedRequest); err != nil {
-		return errors.Mark(err, errMarkSnapshotError)
+		return nil, errors.Mark(err, errMarkSnapshotError)
 	}
 	// Wait for response to see if the receiver successfully applied the snapshot.
 	resp, err := stream.Recv()
 	if err != nil {
-		return errors.Mark(
+		return nil, errors.Mark(
 			errors.Wrapf(err, "%v: remote failed to send snapshot", req), errMarkSnapshotError,
 		)
 	}
@@ -1175,14 +1179,14 @@
 
 	switch resp.Status {
 	case kvserverpb.DelegateSnapshotResponse_ERROR:
-		return errors.Mark(
+		return nil, errors.Mark(
 			errors.Wrapf(resp.Error(), "error sending couldn't accept %v", req), errMarkSnapshotError)
 	case kvserverpb.DelegateSnapshotResponse_APPLIED:
 		// This is the response we're expecting. Snapshot successfully applied.
 		log.VEventf(ctx, 3, "%s: delegated snapshot was successfully applied", resp)
-		return nil
+		return resp, nil
 	default:
-		return err
+		return nil, err
 	}
 }
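Every DelegateSnapshot failure is wrapped with errors.Mark(..., errMarkSnapshotError) so that callers can classify it as retriable (see the isSnapshotError reference later in store_snapshot.go). A minimal sketch of that mark-and-test pattern with cockroachdb/errors; the sentinel here is a stand-in for kvserver's unexported variable, redeclared only for illustration:

package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

// Illustrative stand-in for kvserver's unexported errMarkSnapshotError.
var errMarkSnapshotError = errors.New("snapshot failed")

// isSnapshotError reports whether err was marked as a (retriable)
// snapshot failure anywhere in its chain.
func isSnapshotError(err error) bool {
	return errors.Is(err, errMarkSnapshotError)
}

func main() {
	err := errors.Mark(errors.New("remote failed to send snapshot"), errMarkSnapshotError)
	fmt.Println(isSnapshotError(err))                 // true: marked, hence retriable
	fmt.Println(isSnapshotError(errors.New("boom"))) // false
}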
diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go
index c7fd8ccb2fde..155eff5736d2 100644
--- a/pkg/kv/kvserver/replica_command.go
+++ b/pkg/kv/kvserver/replica_command.go
@@ -2872,7 +2872,23 @@ func (r *Replica) sendSnapshotUsingDelegate(
 	retErr = timeutil.RunWithTimeout(
 		ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error {
 			// Sending snapshot
-			return r.store.cfg.Transport.DelegateSnapshot(ctx, delegateRequest)
+			resp, err := r.store.cfg.Transport.DelegateSnapshot(ctx, delegateRequest)
+			if err != nil {
+				return err
+			}
+			if resp.MsgAppResp != nil {
+				_ = r.withRaftGroup(func(rn *raft.RawNode) (unquiesceAndWakeLeader bool, _ error) {
+					msg := *resp.MsgAppResp
+					// With a delegated snapshot, the recipient received the snapshot
+					// from another replica and will thus respond to it instead. But the
+					// message is valid for the actual originator of the send as well.
+					msg.To = rn.BasicStatus().ID
+					// We do want to unquiesce here - we wouldn't ever want state transitions
+					// on a quiesced replica.
+					return true, rn.Step(msg)
+				})
+			}
+			return nil
 		},
 	)
 	if !selfDelegate {
@@ -3053,7 +3069,7 @@ func (r *Replica) followerSendSnapshot(
 	ctx context.Context,
 	recipient roachpb.ReplicaDescriptor,
 	req *kvserverpb.DelegateSendSnapshotRequest,
-) error {
+) (*raftpb.Message, error) {
 	ctx = r.AnnotateCtx(ctx)
 	sendThreshold := traceSnapshotThreshold.Get(&r.ClusterSettings().SV)
 	if sendThreshold > 0 {
@@ -3082,14 +3098,14 @@
 	// expensive to send.
 	err := r.validateSnapshotDelegationRequest(ctx, req)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	// Throttle snapshot sending. Obtain the send semaphore and determine the rate limit.
 	rangeSize := r.GetMVCCStats().Total()
 	cleanup, err := r.store.reserveSendSnapshot(ctx, req, rangeSize)
 	if err != nil {
-		return errors.Wrap(err, "Unable to reserve space for sending this snapshot")
+		return nil, errors.Wrap(err, "Unable to reserve space for sending this snapshot")
 	}
 	defer cleanup()
 
@@ -3097,13 +3113,13 @@
 	// sent after we are done waiting.
 	err = r.validateSnapshotDelegationRequest(ctx, req)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	snapType := req.Type
 	snap, err := r.GetSnapshot(ctx, snapType, req.SnapId)
 	if err != nil {
-		return errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType)
+		return nil, errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType)
 	}
 	defer snap.Close()
 	log.Event(ctx, "generated snapshot")
@@ -3174,9 +3190,10 @@
 		}
 	}
 
-	return timeutil.RunWithTimeout(
+	var msgAppResp *raftpb.Message
+	if err := timeutil.RunWithTimeout(
 		ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error {
-			return r.store.cfg.Transport.SendSnapshot(
+			resp, err := r.store.cfg.Transport.SendSnapshot(
 				ctx,
 				r.store.cfg.StorePool,
 				header,
@@ -3185,8 +3202,16 @@
 				sent,
 				recordBytesSent,
 			)
+			if err != nil {
+				return err
+			}
+			msgAppResp = resp.MsgAppResp
+			return nil
 		},
-	)
+	); err != nil {
+		return nil, err
+	}
+	return msgAppResp, nil
 }
 
 // replicasCollocated is used in AdminMerge to ensure that the ranges are
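Note how followerSendSnapshot gets the MsgAppResp out of timeutil.RunWithTimeout: the helper's closure can only return an error, so the response is captured into a variable declared outside it. The same pattern in isolation — Result and doFetch are hypothetical placeholders standing in for the SnapshotResponse and the Transport.SendSnapshot call; only RunWithTimeout is the real helper:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// Result and doFetch are hypothetical.
type Result struct{ Index uint64 }

func doFetch(ctx context.Context) (*Result, error) { return &Result{Index: 100}, nil }

func fetchWithTimeout(ctx context.Context) (*Result, error) {
	var res *Result // captured by the closure below
	if err := timeutil.RunWithTimeout(ctx, "fetch", 5*time.Second,
		func(ctx context.Context) error {
			r, err := doFetch(ctx)
			if err != nil {
				return err
			}
			res = r
			return nil
		},
	); err != nil {
		return nil, err
	}
	return res, nil
}

func main() {
	res, err := fetchWithTimeout(context.Background())
	fmt.Println(res.Index, err)
}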
diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go
index 9cccd955400b..83b85b9bdfcd 100644
--- a/pkg/kv/kvserver/replica_learner_test.go
+++ b/pkg/kv/kvserver/replica_learner_test.go
@@ -2441,3 +2441,52 @@ func TestRebalancingAndCrossRegionZoneSnapshotMetrics(t *testing.T) {
 	})
 }
+
+// TestAddVotersWithoutRaftQueue verifies that in normal operations Raft
+// snapshots are not required. This test creates a range with a single voter,
+// then adds two additional voters. Most of the time this succeeds, however it
+// fails (today) occasionally because the addition of the first voter is
+// "incomplete", which leaves the second voter unable to be added because
+// there is no quorum.
+//
+// Specifically, the following sequence of events happens when the leader adds
+// the first voter:
+//  1. AdminChangeReplicasRequest is processed on n1.
+//     a) Adds n2 as a LEARNER to raft.
+//     b) Sends an initial snapshot to n2.
+//     c) n2 receives and applies the snapshot.
+//     d) n2 responds that it successfully applied the snapshot.
+//     e) n1 receives the response and updates state to Follower.
+//  2. Before step c above, n1 sends a MsgApp to n2.
+//     a) MsgApp - entries up to and including the conf change.
+//     b) The MsgApp is received and REJECTED because the term is wrong.
+//     c) After 1e above, n1 receives the rejection.
+//     d) n1 updates n2 from StateReplicate to StateProbe and then StateSnapshot.
+//
+// From n2's perspective, it receives the MsgApp prior to the initial snapshot.
+// This results in it responding with a rejected MsgApp. Later it receives the
+// snapshot and correctly applies it. However, when n1 sees the rejected MsgApp,
+// it moves n2's status to StateProbe and stops sending Raft updates to it, as it
+// plans to fix n2 with a Raft snapshot. As the Raft snapshot queue is disabled,
+// this never happens, and n2 is stuck as a non-Learner in StateProbe. At
+// this point, the Raft group is wedged since it only has 1/2 nodes available
+// for Raft consensus.
+func TestAddVotersWithoutRaftQueue(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	ctx := context.Background()
+
+	// Disable the raft snapshot queue to make sure we don't require a raft snapshot.
+	tc := testcluster.StartTestCluster(
+		t, 3, base.TestClusterArgs{
+			ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{
+				Store: &kvserver.StoreTestingKnobs{DisableRaftSnapshotQueue: true}},
+			},
+			ReplicationMode: base.ReplicationManual,
+		},
+	)
+	defer tc.Stopper().Stop(ctx)
+
+	key := tc.ScratchRange(t)
+	tc.AddVotersOrFatal(t, key, tc.Target(1))
+	tc.AddVotersOrFatal(t, key, tc.Target(2))
+}
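The StateProbe/StateReplicate/StateSnapshot progression in the comment above is the leader's per-follower progress state machine in raft. A self-contained sketch of reading those states through the RawNode status API, using the upstream go.etcd.io/raft/v3 module (CockroachDB vendors its own copy); the single-voter bootstrap is purely illustrative:

package main

import (
	"fmt"

	"go.etcd.io/raft/v3"
	"go.etcd.io/raft/v3/raftpb"
)

func main() {
	// Bootstrap a single-voter group so the node can elect itself leader.
	storage := raft.NewMemoryStorage()
	if err := storage.ApplySnapshot(raftpb.Snapshot{
		Metadata: raftpb.SnapshotMetadata{
			ConfState: raftpb.ConfState{Voters: []uint64{1}},
			Index:     1,
			Term:      1,
		},
	}); err != nil {
		panic(err)
	}
	rn, err := raft.NewRawNode(&raft.Config{
		ID:              1,
		ElectionTick:    10,
		HeartbeatTick:   1,
		Storage:         storage,
		MaxSizePerMsg:   1 << 20,
		MaxInflightMsgs: 256,
	})
	if err != nil {
		panic(err)
	}
	if err := rn.Campaign(); err != nil {
		panic(err)
	}
	// The leader keeps one progress entry per replica; its State is the
	// StateProbe/StateReplicate/StateSnapshot value the test refers to.
	for id, pr := range rn.Status().Progress {
		fmt.Printf("replica %d: state=%s match=%d\n", id, pr.State, pr.Match)
	}
}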
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index f384fa99f762..dceb3024973b 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -930,6 +930,18 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 		if err := r.applySnapshot(ctx, inSnap, snap, hs, subsumedRepls); err != nil {
 			return stats, errors.Wrap(err, "while applying snapshot")
 		}
+		for _, msg := range msgStorageAppend.Responses {
+			// The caller would like to see the MsgAppResp that usually results from
+			// applying the snapshot synchronously, so fish it out.
+			if msg.To == uint64(inSnap.FromReplica.ReplicaID) &&
+				msg.Type == raftpb.MsgAppResp &&
+				!msg.Reject &&
+				msg.Index == snap.Metadata.Index {
+
+				inSnap.msgAppRespCh <- msg
+				break
+			}
+		}
 		stats.tSnapEnd = timeutil.Now()
 		stats.snap.applied = true
 
@@ -1827,9 +1839,7 @@ func (r *Replica) reportSnapshotStatus(ctx context.Context, to roachpb.ReplicaID
 	// index it requested is now actually durable on the follower. Note also that
 	// the follower will generate an MsgAppResp reflecting the applied snapshot
 	// which typically moves the follower to StateReplicate when (if) received
-	// by the leader.
-	//
-	// See: https://github.com/cockroachdb/cockroach/issues/87581
+	// by the leader, which as of #106793 we do synchronously.
 	if err := r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) {
 		raftGroup.ReportSnapshot(uint64(to), snapStatus)
 		return true, nil
diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go
index bf7d374a2c9a..4865725f64ea 100644
--- a/pkg/kv/kvserver/replica_raftstorage.go
+++ b/pkg/kv/kvserver/replica_raftstorage.go
@@ -313,7 +313,8 @@ type IncomingSnapshot struct {
 	DataSize         int64
 	snapType         kvserverpb.SnapshotRequest_Type
 	placeholder      *ReplicaPlaceholder
-	raftAppliedIndex kvpb.RaftIndex // logging only
+	raftAppliedIndex kvpb.RaftIndex      // logging only
+	msgAppRespCh     chan raftpb.Message // receives MsgAppResp if/when snap is applied
 }
 
 func (s IncomingSnapshot) String() string {
diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go
index 82d2065730c3..0c347ba7e020 100644
--- a/pkg/kv/kvserver/store_raft.go
+++ b/pkg/kv/kvserver/store_raft.go
@@ -183,7 +183,8 @@ func (s *Store) HandleDelegatedSnapshot(
 	}
 
 	// Pass the request to the sender replica.
-	if err := sender.followerSendSnapshot(ctx, req.RecipientReplica, req); err != nil {
+	msgAppResp, err := sender.followerSendSnapshot(ctx, req.RecipientReplica, req)
+	if err != nil {
 		// If an error occurred during snapshot sending, send an error response.
 		return &kvserverpb.DelegateSnapshotResponse{
 			Status: kvserverpb.DelegateSnapshotResponse_ERROR,
@@ -195,6 +196,7 @@
 	return &kvserverpb.DelegateSnapshotResponse{
 		Status:         kvserverpb.DelegateSnapshotResponse_APPLIED,
 		CollectedSpans: sp.GetConfiguredRecording(),
+		MsgAppResp:     msgAppResp,
 	}
 }
 
@@ -426,8 +428,9 @@ func (s *Store) processRaftRequestWithReplica(
 // will have been removed.
 func (s *Store) processRaftSnapshotRequest(
 	ctx context.Context, snapHeader *kvserverpb.SnapshotRequest_Header, inSnap IncomingSnapshot,
-) *kvpb.Error {
-	return s.withReplicaForRequest(ctx, &snapHeader.RaftMessageRequest, func(
+) (*raftpb.Message, *kvpb.Error) {
+	var msgAppResp *raftpb.Message
+	pErr := s.withReplicaForRequest(ctx, &snapHeader.RaftMessageRequest, func(
 		ctx context.Context, r *Replica,
 	) (pErr *kvpb.Error) {
 		ctx = r.AnnotateCtx(ctx)
@@ -496,8 +499,24 @@
 			log.Infof(ctx, "ignored stale snapshot at index %d", snapHeader.RaftMessageRequest.Message.Snapshot.Metadata.Index)
 			s.metrics.RangeSnapshotRecvUnusable.Inc(1)
 		}
+		// If the snapshot was applied and acked with an MsgAppResp, return that
+		// message up the stack. We're using msgAppRespCh as a shortcut to avoid
+		// plumbing return parameters through an additional few layers of raft
+		// handling.
+		//
+		// NB: in practice there's always an MsgAppResp here, but it is better not
+		// to rely on what is essentially discretionary raft behavior.
+		select {
+		case msg := <-inSnap.msgAppRespCh:
+			msgAppResp = &msg
+		default:
+		}
 		return nil
 	})
+	if pErr != nil {
+		return nil, pErr
+	}
+	return msgAppResp, nil
 }
 
 // HandleRaftResponse implements the IncomingRaftMessageHandler interface. Per
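The msgAppRespCh handoff above is deliberately simple: the raft ready loop deposits at most one MsgAppResp per applied snapshot into a 1-buffered channel, and processRaftSnapshotRequest drains it with a non-blocking select after the apply returns, so a missing ack (discretionary raft behavior, per the NB) can never block the RPC goroutine. The pattern in isolation, under those assumptions:

package main

import (
	"fmt"

	"go.etcd.io/raft/v3/raftpb"
)

func main() {
	// Capacity 1: at most one MsgAppResp is produced per applied snapshot,
	// so the producer's plain send below can never block.
	ch := make(chan raftpb.Message, 1)

	// Producer (the raft ready loop, after applying the snapshot).
	ch <- raftpb.Message{Type: raftpb.MsgAppResp, Index: 100}

	// Consumer (processRaftSnapshotRequest): non-blocking receive, since
	// raft is not strictly guaranteed to have produced an ack.
	var msgAppResp *raftpb.Message
	select {
	case m := <-ch:
		msgAppResp = &m
	default:
	}
	fmt.Printf("got ack: %v\n", msgAppResp != nil)
}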
diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go
index 930d686f1b78..2c4f268cd04a 100644
--- a/pkg/kv/kvserver/store_snapshot.go
+++ b/pkg/kv/kvserver/store_snapshot.go
@@ -504,6 +504,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
 				DataSize:         dataSize,
 				snapType:         header.Type,
 				raftAppliedIndex: header.State.RaftAppliedIndex,
+				msgAppRespCh:     make(chan raftpb.Message, 1),
 			}
 
 			timingTag.stop("totalTime")
@@ -1140,7 +1141,8 @@
 	// already received the entire snapshot here, so there's no point in
 	// abandoning application half-way through if the caller goes away.
 	applyCtx := s.AnnotateCtx(context.Background())
-	if pErr := s.processRaftSnapshotRequest(applyCtx, header, inSnap); pErr != nil {
+	msgAppResp, pErr := s.processRaftSnapshotRequest(applyCtx, header, inSnap)
+	if pErr != nil {
 		err := pErr.GoError()
 		// We mark this error as a snapshot error which will be interpreted by the
 		// sender as this being a retriable error, see isSnapshotError().
@@ -1151,6 +1153,7 @@
 	return stream.Send(&kvserverpb.SnapshotResponse{
 		Status:         kvserverpb.SnapshotResponse_APPLIED,
 		CollectedSpans: tracing.SpanFromContext(ctx).GetConfiguredRecording(),
+		MsgAppResp:     msgAppResp,
 	})
 }
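For orientation, the snapshot stream that sendSnapshot (below) drives is a two-response protocol: send the header, wait for ACCEPTED, stream the data, send Final, wait for APPLIED — which, as of this change, also carries the MsgAppResp — then wait for EOF. A toy model of the message order only; the types here are stand-ins, not the real gRPC stream:

package main

import "fmt"

type response struct{ status string } // ACCEPTED, APPLIED, or ERROR

type stream struct{ responses []response }

func (s *stream) send(msg string) { fmt.Println("->", msg) }

func (s *stream) recv() response {
	r := s.responses[0]
	s.responses = s.responses[1:]
	fmt.Println("<-", r.status)
	return r
}

func main() {
	st := &stream{responses: []response{{"ACCEPTED"}, {"APPLIED"}}}
	st.send("SnapshotRequest{Header}") // negotiate; recipient may queue or reject us
	if st.recv().status != "ACCEPTED" {
		return
	}
	st.send("snapshot data batches") // the payload, possibly many messages
	st.send("SnapshotRequest{Final: true}")
	if st.recv().status == "APPLIED" {
		// As of this change the APPLIED response also carries the
		// MsgAppResp, which the sender steps into its raft group.
	}
	// The real sendSnapshot then waits for io.EOF so that server-side
	// processing (including defers) has finished before it returns.
}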
@@ -1482,7 +1485,7 @@
 		}
 	}()
 
-	return sendSnapshot(
+	if _, err := sendSnapshot(
 		ctx,
 		st,
 		tracer,
@@ -1493,7 +1496,10 @@
 		eng.NewWriteBatch,
 		func() {},
 		nil, /* recordBytesSent */
-	)
+	); err != nil {
+		return err
+	}
+	return nil
 }
 
 // noopStorePool is a hollowed out StorePool that does not throttle. It's used in recovery scenarios.
@@ -1513,7 +1519,7 @@ func sendSnapshot(
 	newWriteBatch func() storage.WriteBatch,
 	sent func(),
 	recordBytesSent snapshotRecordMetrics,
-) error {
+) (*kvserverpb.SnapshotResponse, error) {
 	if recordBytesSent == nil {
 		// NB: Some tests and an offline tool (ResetQuorum) call into `sendSnapshotUsingDelegate`
 		// with a nil metrics tracking function. We pass in a fake metrics tracking function here that isn't
@@ -1526,7 +1532,7 @@
 	start := timeutil.Now()
 	to := header.RaftMessageRequest.ToReplica
 	if err := stream.Send(&kvserverpb.SnapshotRequest{Header: &header}); err != nil {
-		return err
+		return nil, err
 	}
 	log.Event(ctx, "sent SNAPSHOT_REQUEST message to server")
 	// Wait until we get a response from the server. The recipient may queue us
@@ -1536,13 +1542,13 @@
 	resp, err := stream.Recv()
 	if err != nil {
 		storePool.Throttle(storepool.ThrottleFailed, err.Error(), to.StoreID)
-		return err
+		return nil, err
 	}
 	switch resp.Status {
 	case kvserverpb.SnapshotResponse_ERROR:
 		sp.ImportRemoteRecording(resp.CollectedSpans)
 		storePool.Throttle(storepool.ThrottleFailed, resp.DeprecatedMessage, to.StoreID)
-		return errors.Wrapf(maybeHandleDeprecatedSnapErr(resp.Error()), "%s: remote couldn't accept %s", to, snap)
+		return nil, errors.Wrapf(maybeHandleDeprecatedSnapErr(resp.Error()), "%s: remote couldn't accept %s", to, snap)
 	case kvserverpb.SnapshotResponse_ACCEPTED:
 		// This is the response we're expecting. Continue with snapshot sending.
 		log.Event(ctx, "received SnapshotResponse_ACCEPTED message from server")
@@ -1550,7 +1556,7 @@
 		err := errors.Errorf("%s: server sent an invalid status while negotiating %s: %s",
 			to, snap, resp.Status)
 		storePool.Throttle(storepool.ThrottleFailed, err.Error(), to.StoreID)
-		return err
+		return nil, err
 	}
 
 	durQueued := timeutil.Since(start)
@@ -1586,7 +1592,7 @@
 	// Record timings for snapshot send if kv.trace.snapshot.enable_threshold is enabled
 	numBytesSent, err := ss.Send(ctx, stream, header, snap, recordBytesSent)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	durSent := timeutil.Since(start)
 
@@ -1595,7 +1601,7 @@
 	// applied.
 	sent()
 	if err := stream.Send(&kvserverpb.SnapshotRequest{Final: true}); err != nil {
-		return err
+		return nil, err
 	}
 	log.KvDistribution.Infof(
 		ctx,
@@ -1612,7 +1618,7 @@
 	resp, err = stream.Recv()
 	if err != nil {
-		return errors.Wrapf(err, "%s: remote failed to apply snapshot", to)
+		return nil, errors.Wrapf(err, "%s: remote failed to apply snapshot", to)
 	}
 	sp.ImportRemoteRecording(resp.CollectedSpans)
 	// NB: wait for EOF which ensures that all processing on the server side has
 	// completed (such as defers that might be run after the previous message was
 	// received).
 	if unexpectedResp, err := stream.Recv(); err != io.EOF {
 		if err != nil {
-			return errors.Wrapf(err, "%s: expected EOF, got resp=%v with error", to, unexpectedResp)
+			return nil, errors.Wrapf(err, "%s: expected EOF, got resp=%v with error", to, unexpectedResp)
 		}
-		return errors.Newf("%s: expected EOF, got resp=%v", to, unexpectedResp)
+		return nil, errors.Newf("%s: expected EOF, got resp=%v", to, unexpectedResp)
 	}
 	switch resp.Status {
 	case kvserverpb.SnapshotResponse_ERROR:
-		return errors.Wrapf(
+		return nil, errors.Wrapf(
 			maybeHandleDeprecatedSnapErr(resp.Error()), "%s: remote failed to apply snapshot", to,
 		)
 	case kvserverpb.SnapshotResponse_APPLIED:
-		return nil
+		return resp, nil
 	default:
-		return errors.Errorf("%s: server sent an invalid status during finalization: %s",
+		return nil, errors.Errorf("%s: server sent an invalid status during finalization: %s",
 			to, resp.Status,
 		)
 	}
diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go
index 18621ab9a4e3..0810d6c385ee 100644
--- a/pkg/kv/kvserver/store_test.go
+++ b/pkg/kv/kvserver/store_test.go
@@ -3048,13 +3048,14 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) {
 		require.NoError(t, err)
 	}
 
-	require.NoError(t, s.processRaftSnapshotRequest(ctx, req,
+	_, pErr := s.processRaftSnapshotRequest(ctx, req,
 		IncomingSnapshot{
 			SnapUUID:    uuid.MakeV4(),
 			Desc:        desc,
 			placeholder: placeholder,
 		},
-	).GoError())
+	)
+	require.NoError(t, pErr.GoError())
 
 	testutils.SucceedsSoon(t, func() error {
 		s.mu.Lock()
@@ -3127,7 +3128,7 @@ func TestSendSnapshotThrottling(t *testing.T) {
 	sp := &fakeStorePool{}
 	expectedErr := errors.New("")
 	c := fakeSnapshotStream{nil, expectedErr}
-	err := sendSnapshot(
+	_, err := sendSnapshot(
 		ctx, st, tr, c, sp, header, nil /* snap */, newBatch, nil /* sent */, nil, /* recordBytesSent */
 	)
 	if sp.failedThrottles != 1 {
@@ -3146,7 +3147,7 @@ func TestSendSnapshotThrottling(t *testing.T) {
 		EncodedError: errors.EncodeError(ctx, errors.New("boom")),
 	}
 	c := fakeSnapshotStream{resp, nil}
-	err := sendSnapshot(
+	_, err := sendSnapshot(
 		ctx, st, tr, c, sp, header, nil /* snap */, newBatch, nil /* sent */, nil, /* recordBytesSent */
 	)
 	if sp.failedThrottles != 1 {