diff --git a/Gopkg.lock b/Gopkg.lock
index a814715fdc71..d421371c1723 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -1339,14 +1339,14 @@
 
 [[projects]]
   branch = "master"
-  digest = "1:2750995dfc0b02de879216bf6cab6e57c494df4048e58543fc3798bf6aefe44d"
+  digest = "1:b66436e3c460ee4a9fab4f7dc473c36c51d3263e04838756378e72cc90058b6c"
   name = "go.etcd.io/etcd"
   packages = [
     "raft",
     "raft/raftpb",
   ]
   pruneopts = "UT"
-  revision = "f32bc507658e287a69d89e0e1a4d083a01d9da3e"
+  revision = "dac8c6fcc05ba42a8032d5b720f6c1704965c269"
 
 [[projects]]
   digest = "1:f163a34487229f36dfdb298191d8e17c0e3e6a899aa2cddb020f2ac61ca364ab"
diff --git a/pkg/base/config.go b/pkg/base/config.go
index 24af703d1cc7..0c79f28b028f 100644
--- a/pkg/base/config.go
+++ b/pkg/base/config.go
@@ -90,8 +90,28 @@ const (
 	DefaultTableDescriptorLeaseRenewalTimeout = time.Minute
 )
 
-var defaultRaftElectionTimeoutTicks = envutil.EnvOrDefaultInt(
-	"COCKROACH_RAFT_ELECTION_TIMEOUT_TICKS", 15)
+var (
+	// defaultRaftElectionTimeoutTicks specifies the number of Raft Tick
+	// invocations that must pass between elections.
+	defaultRaftElectionTimeoutTicks = envutil.EnvOrDefaultInt(
+		"COCKROACH_RAFT_ELECTION_TIMEOUT_TICKS", 15)
+
+	// defaultRaftLogTruncationThreshold specifies the upper bound that a single
+	// Range's Raft log can grow to before log truncations are triggered, even
+	// if that means a snapshot will be required for a straggling follower.
+	defaultRaftLogTruncationThreshold = envutil.EnvOrDefaultInt64(
+		"COCKROACH_RAFT_LOG_TRUNCATION_THRESHOLD", 4<<20 /* 4 MB */)
+
+	// defaultRaftMaxSizePerMsg specifies the maximum aggregate byte size of
+	// Raft log entries that a leader will send to followers in a single MsgApp.
+	defaultRaftMaxSizePerMsg = envutil.EnvOrDefaultInt(
+		"COCKROACH_RAFT_MAX_SIZE_PER_MSG", 16<<10 /* 16 KB */)
+
+	// defaultRaftMaxInflightMsgs specifies how many "inflight" messages a leader
+	// will send to a follower without hearing a response.
+	defaultRaftMaxInflightMsgs = envutil.EnvOrDefaultInt(
+		"COCKROACH_RAFT_MAX_INFLIGHT_MSGS", 64)
+)
 
 type lazyHTTPClient struct {
 	once       sync.Once
@@ -421,6 +441,41 @@ type RaftConfig struct {
 	// RangeLeaseRaftElectionTimeoutMultiplier specifies what multiple the leader
 	// lease active duration should be of the raft election timeout.
 	RangeLeaseRaftElectionTimeoutMultiplier float64
+
+	// RaftLogTruncationThreshold controls how large a single Range's Raft log
+	// can grow. When a Range's Raft log grows above this size, the Range will
+	// begin performing log truncations.
+	RaftLogTruncationThreshold int64
+
+	// RaftProposalQuota controls the maximum aggregate size of Raft commands
+	// that a leader is allowed to propose concurrently.
+	//
+	// By default, the quota is set to a fraction of the Raft log truncation
+	// threshold. In doing so, we ensure all replicas have sufficiently up to
+	// date logs so that when the log gets truncated, the followers do not need
+	// non-preemptive snapshots. Changing this deserves care. Too low and
+	// everything comes to a grinding halt, too high and we're not really
+	// throttling anything (we'll still generate snapshots).
+	RaftProposalQuota int64
+
+	// RaftMaxUncommittedEntriesSize controls how large the uncommitted tail of
+	// the Raft log can grow. The limit is meant to provide protection against
+	// unbounded Raft log growth when quorum is lost and entries stop being
+	// committed but continue to be proposed.
+	RaftMaxUncommittedEntriesSize uint64
+
+	// RaftMaxSizePerMsg controls the maximum aggregate byte size of Raft log
+	// entries the leader will send to followers in a single MsgApp.
+	RaftMaxSizePerMsg uint64
+
+	// RaftMaxInflightMsgs controls how many "inflight" messages Raft will send
+	// to a follower without hearing a response. The total amount of in-flight
+	// Raft log data is bounded by this setting together with RaftMaxSizePerMsg.
+	// The current default settings provide for up to 1 MB of raft log to be sent
+	// without acknowledgement. With an average entry size of 1 KB that
+	// translates to ~1024 commands that might be executed in the handling of a
+	// single raft.Ready operation.
+	RaftMaxInflightMsgs int
 }
 
 // SetDefaults initializes unset fields.
@@ -434,6 +489,27 @@ func (cfg *RaftConfig) SetDefaults() {
 	if cfg.RangeLeaseRaftElectionTimeoutMultiplier == 0 {
 		cfg.RangeLeaseRaftElectionTimeoutMultiplier = defaultRangeLeaseRaftElectionTimeoutMultiplier
 	}
+	if cfg.RaftLogTruncationThreshold == 0 {
+		cfg.RaftLogTruncationThreshold = defaultRaftLogTruncationThreshold
+	}
+	if cfg.RaftProposalQuota == 0 {
+		// By default, set this to a fraction of RaftLogTruncationThreshold. See
+		// the comment on the field for the tradeoffs of setting this higher or lower.
+		cfg.RaftProposalQuota = cfg.RaftLogTruncationThreshold / 4
+	}
+	if cfg.RaftMaxUncommittedEntriesSize == 0 {
+		// By default, set this to twice the RaftProposalQuota. The logic here
+		// is that the quotaPool should be responsible for throttling proposals
+		// in all cases except for unbounded Raft re-proposals because it queues
+		// efficiently instead of dropping proposals on the floor indiscriminately.
+		cfg.RaftMaxUncommittedEntriesSize = uint64(2 * cfg.RaftProposalQuota)
+	}
+	if cfg.RaftMaxSizePerMsg == 0 {
+		cfg.RaftMaxSizePerMsg = uint64(defaultRaftMaxSizePerMsg)
+	}
+	if cfg.RaftMaxInflightMsgs == 0 {
+		cfg.RaftMaxInflightMsgs = defaultRaftMaxInflightMsgs
+	}
 }
 
 // RaftElectionTimeout returns the raft election timeout, as computed from the
diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go
index fb0ad9b08c98..fce86f5e7beb 100644
--- a/pkg/storage/client_raft_test.go
+++ b/pkg/storage/client_raft_test.go
@@ -1155,6 +1155,8 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
 	sc.RaftTickInterval = 10 * time.Millisecond
 	// Don't timeout raft leader. We don't want leadership moving.
 	sc.RaftElectionTimeoutTicks = 1000000
+	// Reduce the max uncommitted entry size.
+	sc.RaftMaxUncommittedEntriesSize = 64 << 10 // 64 KB
 	// Disable leader transfers during leaseholder changes so that we
 	// can easily create leader-not-leaseholder scenarios.
 	sc.TestingKnobs.DisableLeaderFollowsLeaseholder = true
@@ -1233,7 +1235,7 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
 	// While a majority nodes are down, write some data.
 	putRes := make(chan *roachpb.Error)
 	go func() {
-		putArgs := putArgs([]byte("b"), make([]byte, 8<<10 /* 8 KB */))
+		putArgs := putArgs([]byte("b"), make([]byte, sc.RaftMaxUncommittedEntriesSize/8))
 		_, err := client.SendWrapped(context.Background(), propNode, putArgs)
 		putRes <- err
 	}()
@@ -1254,11 +1256,10 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
 	}
 
 	// Check raft log size.
- const logSizeLimit = 64 << 10 // 64 KB curlogSize := leaderRepl.GetRaftLogSize() logSize := curlogSize - initLogSize logSizeStr := humanizeutil.IBytes(logSize) - if logSize > logSizeLimit { + if uint64(logSize) > sc.RaftMaxUncommittedEntriesSize { t.Fatalf("raft log size grew to %s", logSizeStr) } t.Logf("raft log size grew to %s", logSizeStr) diff --git a/pkg/storage/raft_log_queue.go b/pkg/storage/raft_log_queue.go index 2178dde8065b..4a361fbdadd6 100644 --- a/pkg/storage/raft_log_queue.go +++ b/pkg/storage/raft_log_queue.go @@ -26,7 +26,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" ) @@ -49,9 +48,6 @@ const ( raftLogQueueConcurrency = 4 ) -// raftLogMaxSize limits the maximum size of the Raft log. -var raftLogMaxSize = envutil.EnvOrDefaultInt64("COCKROACH_RAFT_LOG_MAX_SIZE", 4<<20 /* 4 MB */) - // raftLogQueue manages a queue of replicas slated to have their raft logs // truncated by removing unneeded entries. type raftLogQueue struct { @@ -118,8 +114,8 @@ func getTruncatableIndexes(ctx context.Context, r *Replica) (uint64, uint64, int if targetSize > *r.mu.zone.RangeMaxBytes { targetSize = *r.mu.zone.RangeMaxBytes } - if targetSize > raftLogMaxSize { - targetSize = raftLogMaxSize + if targetSize > r.store.cfg.RaftLogTruncationThreshold { + targetSize = r.store.cfg.RaftLogTruncationThreshold } firstIndex, err := r.raftFirstIndexLocked() pendingSnapshotIndex := r.mu.pendingSnapshotIndex diff --git a/pkg/storage/raft_transport.go b/pkg/storage/raft_transport.go index 96d7f6573c30..8d858e47b5b5 100644 --- a/pkg/storage/raft_transport.go +++ b/pkg/storage/raft_transport.go @@ -29,6 +29,7 @@ import ( "go.etcd.io/etcd/raft/raftpb" "google.golang.org/grpc" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -615,6 +616,7 @@ func (t *RaftTransport) startProcessNewQueue( // for closing the OutgoingSnapshot. func (t *RaftTransport) SendSnapshot( ctx context.Context, + raftCfg *base.RaftConfig, storePool *StorePool, header SnapshotRequest_Header, snap *OutgoingSnapshot, @@ -640,5 +642,5 @@ func (t *RaftTransport) SendSnapshot( log.Warningf(ctx, "failed to close snapshot stream: %s", err) } }() - return sendSnapshot(ctx, t.st, stream, storePool, header, snap, newBatch, sent) + return sendSnapshot(ctx, raftCfg, t.st, stream, storePool, header, snap, newBatch, sent) } diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index b0a2acac555d..a49aa8a5b14f 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -93,19 +93,8 @@ const ( defaultReplicaRaftMuWarnThreshold = 500 * time.Millisecond ) -var raftLogTooLargeSize = 4 * raftLogMaxSize - var testingDisableQuiescence = envutil.EnvOrDefaultBool("COCKROACH_DISABLE_QUIESCENCE", false) -// TODO(irfansharif, peter): What's a good default? Too low and everything comes -// to a grinding halt, too high and we're not really throttling anything -// (we'll still generate snapshots). Should it be adjusted dynamically? 
-// -// We set the defaultProposalQuota to be less than raftLogMaxSize, in doing so -// we ensure all replicas have sufficiently up to date logs so that when the -// log gets truncated, the followers do not need non-preemptive snapshots. -var defaultProposalQuota = raftLogMaxSize / 4 - var syncRaftLog = settings.RegisterBoolSetting( "kv.raft_log.synchronize", "set to true to synchronize on Raft log writes to persistent storage ('false' risks data loss)", @@ -390,7 +379,7 @@ type Replica struct { minLeaseProposedTS hlc.Timestamp // A pointer to the zone config for this replica. zone *config.ZoneConfig - // localProposals stores the Raft in-flight commands which originated at + // proposals stores the Raft in-flight commands which originated at // this Replica, i.e. all commands for which propose has been called, // but which have not yet applied. // @@ -398,12 +387,7 @@ type Replica struct { // map must only be referenced while Replica.mu is held, except if the // element is removed from the map first. The notable exception is the // contained RaftCommand, which we treat as immutable. - localProposals map[storagebase.CmdIDKey]*ProposalData - // remoteProposals is maintained by Raft leaders and stores in-flight - // commands that were forwarded to the leader during its current term. - // The set allows leaders to detect duplicate forwarded commands and - // avoid re-proposing the same forwarded command multiple times. - remoteProposals map[storagebase.CmdIDKey]struct{} + proposals map[storagebase.CmdIDKey]*ProposalData internalRaftGroup *raft.RawNode // The ID of the replica within the Raft group. May be 0 if the replica has // been created from a preemptive snapshot (i.e. before being added to the @@ -719,7 +703,7 @@ func (r *Replica) initRaftMuLockedReplicaMuLocked( r.cmdQMu.queues[spanset.SpanLocal] = NewCommandQueue(false /* optimizeOverlap */) r.cmdQMu.Unlock() - r.mu.localProposals = map[storagebase.CmdIDKey]*ProposalData{} + r.mu.proposals = map[storagebase.CmdIDKey]*ProposalData{} r.mu.checksums = map[uuid.UUID]ReplicaChecksum{} // Clear the internal raft group in case we're being reset. Since we're // reloading the raft state below, it isn't safe to use the existing raft @@ -890,11 +874,10 @@ func (r *Replica) cancelPendingCommandsLocked() { Err: roachpb.NewError(roachpb.NewAmbiguousResultError("removing replica")), ProposalRetry: proposalRangeNoLongerExists, } - for _, p := range r.mu.localProposals { + for _, p := range r.mu.proposals { r.cleanupFailedProposalLocked(p) p.finishApplication(pr) } - r.mu.remoteProposals = nil } // cleanupFailedProposalLocked cleans up after a proposal that has failed. It @@ -902,7 +885,7 @@ func (r *Replica) cancelPendingCommandsLocked() { func (r *Replica) cleanupFailedProposalLocked(p *ProposalData) { // Clear the proposal from the proposals map. May be a no-op if the // proposal has not yet been inserted into the map. - delete(r.mu.localProposals, p.idKey) + delete(r.mu.proposals, p.idKey) // Release associated quota pool resources if we have been tracking // this command. // @@ -1134,7 +1117,7 @@ func (r *Replica) updateProposalQuotaRaftMuLocked( // through the code paths where we acquire quota from the pool. To // offset this we reset the quota pool whenever leadership changes // hands. 
- r.mu.proposalQuota = newQuotaPool(defaultProposalQuota) + r.mu.proposalQuota = newQuotaPool(r.store.cfg.RaftProposalQuota) r.mu.lastUpdateTimes = make(map[roachpb.ReplicaID]time.Time) r.mu.commandSizes = make(map[storagebase.CmdIDKey]int) } else if r.mu.proposalQuota != nil { @@ -1913,8 +1896,7 @@ func (r *Replica) State() storagepb.RangeInfo { var ri storagepb.RangeInfo ri.ReplicaState = *(protoutil.Clone(&r.mu.state)).(*storagepb.ReplicaState) ri.LastIndex = r.mu.lastIndex - ri.NumPending = uint64(len(r.mu.localProposals)) - ri.NumRemotePending = uint64(len(r.mu.remoteProposals)) + ri.NumPending = uint64(len(r.mu.proposals)) ri.RaftLogSize = r.mu.raftLogSize ri.NumDropped = uint64(r.mu.droppedMessages) if r.mu.proposalQuota != nil { @@ -3602,11 +3584,11 @@ func (r *Replica) insertProposalLocked( proposal.idKey, proposal.command.MaxLeaseIndex) } - if _, ok := r.mu.localProposals[proposal.idKey]; ok { + if _, ok := r.mu.proposals[proposal.idKey]; ok { ctx := r.AnnotateCtx(context.TODO()) log.Fatalf(ctx, "pending command already exists for %s", proposal.idKey) } - r.mu.localProposals[proposal.idKey] = proposal + r.mu.proposals[proposal.idKey] = proposal if isLease { // For lease requests, we return zero because no real MaxLeaseIndex is assigned. // We could also return the lastAssignedIndex but this invites confusion. @@ -3830,7 +3812,7 @@ func (r *Replica) propose( } // Must not use `proposal` in the closure below as a proposal which is not - // present in r.mu.localProposals is no longer protected by the mutex. Abandoning + // present in r.mu.proposals is no longer protected by the mutex. Abandoning // a command only abandons the associated context. As soon as we propose a // command to Raft, ownership passes to the "below Raft" machinery. In // particular, endCmds will be invoked when the command is applied. There are @@ -3839,7 +3821,7 @@ func (r *Replica) propose( // range. tryAbandon := func() bool { r.mu.Lock() - p, ok := r.mu.localProposals[idKey] + p, ok := r.mu.proposals[idKey] if ok { // TODO(radu): Should this context be created via tracer.ForkCtxSpan? // We'd need to make sure the span is finished eventually. @@ -3851,7 +3833,7 @@ func (r *Replica) propose( return proposalCh, tryAbandon, maxLeaseIndex, nil } -// submitProposalLocked proposes or re-proposes a command in r.mu.localProposals. +// submitProposalLocked proposes or re-proposes a command in r.mu.proposals. // The replica lock must be held. func (r *Replica) submitProposalLocked(p *ProposalData) error { p.proposedAtTicks = r.mu.ticks @@ -3966,9 +3948,9 @@ func (r *Replica) quiesce() bool { func (r *Replica) quiesceLocked() bool { ctx := r.AnnotateCtx(context.TODO()) - if len(r.mu.localProposals) != 0 { + if len(r.mu.proposals) != 0 { if log.V(3) { - log.Infof(ctx, "not quiescing: %d pending commands", len(r.mu.localProposals)) + log.Infof(ctx, "not quiescing: %d pending commands", len(r.mu.proposals)) } return false } @@ -4043,20 +4025,7 @@ func (r *Replica) stepRaftGroup(req *RaftMessageRequest) error { // we expect the originator to campaign instead. r.unquiesceWithOptionsLocked(false /* campaignOnWake */) r.refreshLastUpdateTimeForReplicaLocked(req.FromReplica.ReplicaID) - - // Check if the message is a proposal that should be dropped. - if r.shouldDropForwardedProposalLocked(req) { - // If we could signal to the sender that its proposal was accepted - // or dropped then we wouldn't need to track anything. 
- return false /* unquiesceAndWakeLeader */, nil - } - err := raftGroup.Step(req.Message) - if err == nil { - // If we stepped successfully and the request is a proposal, consider - // tracking it so that we can ignore identical proposals in the future. - r.maybeTrackForwardedProposalLocked(raftGroup, req) - } if err == raft.ErrProposalDropped { // A proposal was forwarded to this replica but we couldn't propose it. // Swallow the error since we don't have an effective way of signaling @@ -4069,68 +4038,6 @@ func (r *Replica) stepRaftGroup(req *RaftMessageRequest) error { }) } -func (r *Replica) shouldDropForwardedProposalLocked(req *RaftMessageRequest) bool { - if req.Message.Type != raftpb.MsgProp { - // Not a proposal. - return false - } - - for _, e := range req.Message.Entries { - switch e.Type { - case raftpb.EntryNormal: - cmdID, _ := DecodeRaftCommand(e.Data) - if _, ok := r.mu.remoteProposals[cmdID]; !ok { - // Untracked remote proposal. Don't drop. - return false - } - case raftpb.EntryConfChange: - // Never drop EntryConfChange proposals. - return false - default: - log.Fatalf(context.TODO(), "unexpected Raft entry: %v", e) - } - } - // All entries tracked. - return true -} - -func (r *Replica) maybeTrackForwardedProposalLocked(rg *raft.RawNode, req *RaftMessageRequest) { - if req.Message.Type != raftpb.MsgProp { - // Not a proposal. - return - } - - if rg.Status().RaftState != raft.StateLeader { - // We're not the leader. We can't be sure that the proposal made it into - // the Raft log, so don't track it. - return - } - - // Record that each of the proposal's entries was seen and appended. This - // allows us to catch duplicate forwarded proposals in the future and - // prevent them from being repeatedly appended to a leader's raft log. - for _, e := range req.Message.Entries { - switch e.Type { - case raftpb.EntryNormal: - cmdID, data := DecodeRaftCommand(e.Data) - if len(data) == 0 { - // An empty command is proposed to unquiesce a range and - // wake the leader. Don't keep track of these forwarded - // proposals because they will never be cleaned up. - } else { - if r.mu.remoteProposals == nil { - r.mu.remoteProposals = map[storagebase.CmdIDKey]struct{}{} - } - r.mu.remoteProposals[cmdID] = struct{}{} - } - case raftpb.EntryConfChange: - // Don't track EntryConfChanges. - default: - log.Fatalf(context.TODO(), "unexpected Raft entry: %v", e) - } - } -} - type handleRaftReadyStats struct { processed int } @@ -4395,7 +4302,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked( r.mu.leaderID = leaderID // Clear the remote proposal set. Would have been nil already if not // previously the leader. - r.mu.remoteProposals = nil becameLeader = r.mu.leaderID == r.mu.replicaID } r.mu.Unlock() @@ -4600,22 +4506,13 @@ func (r *Replica) tick(livenessMap IsLiveMap) (bool, error) { if knob := r.store.TestingKnobs().RefreshReasonTicksPeriod; knob > 0 { refreshAtDelta = knob } - if !r.store.TestingKnobs().DisableRefreshReasonTicks && - r.mu.replicaID != r.mu.leaderID && - r.mu.ticks%refreshAtDelta == 0 { + if !r.store.TestingKnobs().DisableRefreshReasonTicks && r.mu.ticks%refreshAtDelta == 0 { // RaftElectionTimeoutTicks is a reasonable approximation of how long we // should wait before deciding that our previous proposal didn't go // through. Note that the combination of the above condition and passing // RaftElectionTimeoutTicks to refreshProposalsLocked means that commands // will be refreshed when they have been pending for 1 to 2 election // cycles. 
- // - // However, we don't refresh proposals if we are the leader because - // doing so would be useless. The commands tracked by a leader replica - // were either all proposed when the replica was a leader or were - // re-proposed when the replica became a leader. Either way, they are - // guaranteed to be in the leader's Raft log so re-proposing won't do - // anything. r.refreshProposalsLocked(refreshAtDelta, reasonTicks) } return true, nil @@ -4680,7 +4577,7 @@ func (r *Replica) tick(livenessMap IsLiveMap) (bool, error) { // correctness issues. func (r *Replica) maybeQuiesceLocked(livenessMap IsLiveMap) bool { ctx := r.AnnotateCtx(context.TODO()) - status, ok := shouldReplicaQuiesce(ctx, r, r.store.Clock().Now(), len(r.mu.localProposals), livenessMap) + status, ok := shouldReplicaQuiesce(ctx, r, r.store.Clock().Now(), len(r.mu.proposals), livenessMap) if !ok { return false } @@ -4931,7 +4828,7 @@ func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftR numShouldRetry := 0 var reproposals pendingCmdSlice - for _, p := range r.mu.localProposals { + for _, p := range r.mu.proposals { if p.command.MaxLeaseIndex == 0 { // Commands without a MaxLeaseIndex cannot be reproposed, as they might // apply twice. We also don't want to ask the proposer to retry these @@ -4946,7 +4843,7 @@ func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftR } else if cannotApplyAnyMore := !p.command.ReplicatedEvalResult.IsLeaseRequest && p.command.MaxLeaseIndex <= r.mu.state.LeaseAppliedIndex; cannotApplyAnyMore { // The command's designated lease index slot was filled up. We got to - // LeaseAppliedIndex and p is still pending in r.mu.localProposals; generally + // LeaseAppliedIndex and p is still pending in r.mu.proposals; generally // this means that proposal p didn't commit, and it will be sent back to // the proposer for a retry - the request needs to be re-evaluated and the // command re-proposed with a new MaxLeaseIndex. Note that this branch is not @@ -4957,7 +4854,7 @@ func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftR // reasonSnapshotApplied - in that case we don't know if p or some other // command filled the p.command.MaxLeaseIndex slot (i.e. p might have been // applied, but we're not watching for every proposal when applying a - // snapshot, so nobody removed p from r.mu.localProposals). In this + // snapshot, so nobody removed p from r.mu.proposals). In this // ambiguous case, we'll also send the command back to the proposer for a // retry, but the proposer needs to be aware that, if the retry fails, an // AmbiguousResultError needs to be returned to the higher layers. @@ -5018,7 +4915,7 @@ func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftR // that they can make it in the right place. Reproposing in order is // definitely required, however. // - // TODO(tschottdorf): evaluate whether `r.mu.localProposals` should + // TODO(tschottdorf): evaluate whether `r.mu.proposals` should // be a list/slice. sort.Sort(reproposals) for _, p := range reproposals { @@ -5398,19 +5295,16 @@ func (r *Replica) processRaftCommand( } r.mu.Lock() - proposal, proposedLocally := r.mu.localProposals[idKey] + proposal, proposedLocally := r.mu.proposals[idKey] // TODO(tschottdorf): consider the Trace situation here. if proposedLocally { // We initiated this command, so use the caller-supplied context. 
ctx = proposal.ctx proposal.ctx = nil // avoid confusion - delete(r.mu.localProposals, idKey) + delete(r.mu.proposals, idKey) } - // Delete the entry for a forwarded proposal set. - delete(r.mu.remoteProposals, idKey) - leaseIndex, proposalRetry, forcedErr := r.checkForcedErrLocked(ctx, idKey, raftCmd, proposal, proposedLocally) r.mu.Unlock() @@ -6967,6 +6861,7 @@ func (r *Replica) Metrics( return calcReplicaMetrics( ctx, now, + &r.store.cfg.RaftConfig, zone, livenessMap, availableNodes, @@ -6994,6 +6889,7 @@ func HasRaftLeader(raftStatus *raft.Status) bool { func calcReplicaMetrics( ctx context.Context, now hlc.Timestamp, + raftCfg *base.RaftConfig, zone *config.ZoneConfig, livenessMap IsLiveMap, availableNodes int, @@ -7033,7 +6929,8 @@ func calcReplicaMetrics( m.CmdQMetricsLocal = cmdQMetricsLocal m.CmdQMetricsGlobal = cmdQMetricsGlobal - m.RaftLogTooLarge = raftLogSize > raftLogTooLargeSize + const raftLogTooLargeMultiple = 4 + m.RaftLogTooLarge = raftLogSize > (raftLogTooLargeMultiple * raftCfg.RaftLogTruncationThreshold) return m } diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index 75467eb00c50..9809313a62d9 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -944,7 +944,14 @@ func (r *Replica) sendSnapshot( r.store.metrics.RangeSnapshotsGenerated.Inc(1) } if err := r.store.cfg.Transport.SendSnapshot( - ctx, r.store.allocator.storePool, req, snap, r.store.Engine().NewBatch, sent); err != nil { + ctx, + &r.store.cfg.RaftConfig, + r.store.allocator.storePool, + req, + snap, + r.store.Engine().NewBatch, + sent, + ); err != nil { return &snapshotError{err} } return nil diff --git a/pkg/storage/replica_sideload.go b/pkg/storage/replica_sideload.go index d06913ea963d..43383ab4ca12 100644 --- a/pkg/storage/replica_sideload.go +++ b/pkg/storage/replica_sideload.go @@ -71,7 +71,7 @@ func (r *Replica) maybeSideloadEntriesRaftMuLocked( maybeRaftCommand := func(cmdID storagebase.CmdIDKey) (storagepb.RaftCommand, bool) { r.mu.Lock() defer r.mu.Unlock() - cmd, ok := r.mu.localProposals[cmdID] + cmd, ok := r.mu.proposals[cmdID] if ok { return *cmd.command, true } diff --git a/pkg/storage/replica_sideload_test.go b/pkg/storage/replica_sideload_test.go index c8d06846f0f8..b31d5bc9483c 100644 --- a/pkg/storage/replica_sideload_test.go +++ b/pkg/storage/replica_sideload_test.go @@ -783,6 +783,7 @@ func TestRaftSSTableSideloadingSnapshot(t *testing.T) { mockSender := &mockSender{} if err := sendSnapshot( ctx, + &tc.store.cfg.RaftConfig, tc.store.cfg.Settings, mockSender, &fakeStorePool{}, @@ -904,6 +905,7 @@ func TestRaftSSTableSideloadingSnapshot(t *testing.T) { mockSender := &mockSender{} err = sendSnapshot( ctx, + &tc.store.cfg.RaftConfig, tc.store.cfg.Settings, mockSender, &fakeStorePool{}, diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index cdaa28e771b7..3a7fb0a108ee 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -7698,7 +7698,7 @@ func TestReplicaTryAbandon(t *testing.T) { func() { tc.repl.mu.Lock() defer tc.repl.mu.Unlock() - if len(tc.repl.mu.localProposals) == 0 { + if len(tc.repl.mu.proposals) == 0 { t.Fatal("expected non-empty proposals map") } }() @@ -8254,7 +8254,7 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) { } tc.repl.mu.Lock() - for _, p := range tc.repl.mu.localProposals { + for _, p := range tc.repl.mu.proposals { if v := p.ctx.Value(magicKey{}); v != nil { origIndexes = append(origIndexes, int(p.command.MaxLeaseIndex)) } @@ -8286,13 +8286,13 @@ 
func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) { tc.repl.mu.Lock() defer tc.repl.mu.Unlock() - nonePending := len(tc.repl.mu.localProposals) == 0 + nonePending := len(tc.repl.mu.proposals) == 0 c := int(tc.repl.mu.lastAssignedLeaseIndex) - int(tc.repl.mu.state.LeaseAppliedIndex) if nonePending && c > 0 { t.Errorf("no pending cmds, but have required index offset %d", c) } if !nonePending { - t.Fatalf("still pending commands: %+v", tc.repl.mu.localProposals) + t.Fatalf("still pending commands: %+v", tc.repl.mu.proposals) } } @@ -8450,7 +8450,7 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) { } // Build the map of expected reproposals at this stage. m := map[storagebase.CmdIDKey]int{} - for id, p := range r.mu.localProposals { + for id, p := range r.mu.proposals { m[id] = p.proposedAtTicks } r.mu.Unlock() @@ -8489,151 +8489,6 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) { } } -func TestReplicaShouldDropForwardedProposal(t *testing.T) { - defer leaktest.AfterTest(t)() - - cmdSeen, cmdNotSeen := makeIDKey(), makeIDKey() - data, noData := []byte("data"), []byte("") - - testCases := []struct { - name string - leader bool - msg raftpb.Message - expDrop bool - expRemotePropsAfter int - }{ - { - name: "new proposal", - leader: true, - msg: raftpb.Message{ - Type: raftpb.MsgProp, - Entries: []raftpb.Entry{ - {Type: raftpb.EntryNormal, Data: encodeRaftCommandV1(cmdNotSeen, data)}, - }, - }, - expDrop: false, - expRemotePropsAfter: 2, - }, - { - name: "duplicate proposal", - leader: true, - msg: raftpb.Message{ - Type: raftpb.MsgProp, - Entries: []raftpb.Entry{ - {Type: raftpb.EntryNormal, Data: encodeRaftCommandV1(cmdSeen, data)}, - }, - }, - expDrop: true, - expRemotePropsAfter: 1, - }, - { - name: "partially new proposal", - leader: true, - msg: raftpb.Message{ - Type: raftpb.MsgProp, - Entries: []raftpb.Entry{ - {Type: raftpb.EntryNormal, Data: encodeRaftCommandV1(cmdNotSeen, data)}, - {Type: raftpb.EntryNormal, Data: encodeRaftCommandV1(cmdSeen, data)}, - }, - }, - expDrop: false, - expRemotePropsAfter: 2, - }, - { - name: "empty proposal", - leader: true, - msg: raftpb.Message{ - Type: raftpb.MsgProp, - Entries: []raftpb.Entry{ - {Type: raftpb.EntryNormal, Data: encodeRaftCommandV1(cmdNotSeen, noData)}, - }, - }, - expDrop: false, - expRemotePropsAfter: 1, - }, - { - name: "conf change", - leader: true, - msg: raftpb.Message{ - Type: raftpb.MsgProp, - Entries: []raftpb.Entry{ - {Type: raftpb.EntryConfChange, Data: encodeRaftCommandV1(cmdNotSeen, data)}, - }, - }, - expDrop: false, - expRemotePropsAfter: 1, - }, - { - name: "non proposal", - leader: true, - msg: raftpb.Message{ - Type: raftpb.MsgApp, - }, - expDrop: false, - expRemotePropsAfter: 1, - }, - { - name: "not leader", - leader: false, - msg: raftpb.Message{ - Type: raftpb.MsgProp, - Entries: []raftpb.Entry{ - {Type: raftpb.EntryNormal, Data: encodeRaftCommandV1(cmdNotSeen, data)}, - }, - }, - expDrop: false, - expRemotePropsAfter: 0, - }, - } - for _, c := range testCases { - t.Run(c.name, func(t *testing.T) { - var tc testContext - stopper := stop.NewStopper() - defer stopper.Stop(context.TODO()) - tc.Start(t, stopper) - tc.repl.mu.Lock() - defer tc.repl.mu.Unlock() - - rg := tc.repl.mu.internalRaftGroup - if c.leader { - // Set the remoteProposals map to only contain cmdSeen. - tc.repl.mu.remoteProposals = map[storagebase.CmdIDKey]struct{}{ - cmdSeen: {}, - } - // Make sure the replica is the leader. 
- if s := rg.Status(); s.RaftState != raft.StateLeader { - t.Errorf("Replica not leader: %v", s) - } - } else { - // Clear the remoteProposals map. - tc.repl.mu.remoteProposals = nil - // Force the replica to step down as the leader by sending it a - // heartbeat at a high term. - if err := rg.Step(raftpb.Message{ - Type: raftpb.MsgHeartbeat, - Term: 999, - }); err != nil { - t.Error(err) - } - if s := rg.Status(); s.RaftState != raft.StateFollower { - t.Errorf("Replica not follower: %v", s) - } - } - - req := &RaftMessageRequest{Message: c.msg} - drop := tc.repl.shouldDropForwardedProposalLocked(req) - if c.expDrop != drop { - t.Errorf("expected drop=%t, found %t", c.expDrop, drop) - } - - tc.repl.maybeTrackForwardedProposalLocked(rg, req) - if l := len(tc.repl.mu.remoteProposals); c.expRemotePropsAfter != l { - t.Errorf("expected %d tracked remote proposals, found %d", c.expRemotePropsAfter, l) - } - }) - } -} - // checkValue asserts that the value for a key is the expected one. // The function will attempt to resolve the intent present on the key, if any. func checkValue(ctx context.Context, tc *testContext, key []byte, expectedVal []byte) error { @@ -9272,7 +9127,7 @@ func TestReplicaMetrics(t *testing.T) { Underreplicated: false, }}, // The leader of a 1-replica range is up and raft log is too large. - {1, 1, desc(1), status(1, progress(2)), live(1), 5 * raftLogMaxSize, + {1, 1, desc(1), status(1, progress(2)), live(1), 5 * cfg.RaftLogTruncationThreshold, ReplicaMetrics{ Leader: true, RangeCounter: true, @@ -9294,7 +9149,7 @@ func TestReplicaMetrics(t *testing.T) { c.expected.Quiescent = i%2 == 0 c.expected.Ticking = !c.expected.Quiescent metrics := calcReplicaMetrics( - context.Background(), hlc.Timestamp{}, &zoneConfig, + context.Background(), hlc.Timestamp{}, &cfg.RaftConfig, &zoneConfig, c.liveness, 0, &c.desc, c.raftStatus, storagepb.LeaseStatus{}, c.storeID, c.expected.Quiescent, c.expected.Ticking, CommandQueueMetrics{}, CommandQueueMetrics{}, c.raftLogSize) diff --git a/pkg/storage/storagepb/state.pb.go b/pkg/storage/storagepb/state.pb.go index 9b54b101a2df..b33d3885397b 100644 --- a/pkg/storage/storagepb/state.pb.go +++ b/pkg/storage/storagepb/state.pb.go @@ -84,10 +84,9 @@ func (*ReplicaState) Descriptor() ([]byte, []int) { return fileDescriptorState, type RangeInfo struct { ReplicaState `protobuf:"bytes,1,opt,name=state,embedded=state" json:"state"` // The highest (and last) index in the Raft log. - LastIndex uint64 `protobuf:"varint,2,opt,name=last_index,json=lastIndex,proto3" json:"last_index,omitempty"` - NumPending uint64 `protobuf:"varint,3,opt,name=num_pending,json=numPending,proto3" json:"num_pending,omitempty"` - NumRemotePending uint64 `protobuf:"varint,9,opt,name=num_remote_pending,json=numRemotePending,proto3" json:"num_remote_pending,omitempty"` - NumDropped uint64 `protobuf:"varint,5,opt,name=num_dropped,json=numDropped,proto3" json:"num_dropped,omitempty"` + LastIndex uint64 `protobuf:"varint,2,opt,name=last_index,json=lastIndex,proto3" json:"last_index,omitempty"` + NumPending uint64 `protobuf:"varint,3,opt,name=num_pending,json=numPending,proto3" json:"num_pending,omitempty"` + NumDropped uint64 `protobuf:"varint,5,opt,name=num_dropped,json=numDropped,proto3" json:"num_dropped,omitempty"` // raft_log_size may be initially inaccurate after a server restart. // See storage.Replica.mu.raftLogSize. 
RaftLogSize int64 `protobuf:"varint,6,opt,name=raft_log_size,json=raftLogSize,proto3" json:"raft_log_size,omitempty"` @@ -216,9 +215,6 @@ func (this *RangeInfo) Equal(that interface{}) bool { if this.NumPending != that1.NumPending { return false } - if this.NumRemotePending != that1.NumRemotePending { - return false - } if this.NumDropped != that1.NumDropped { return false } @@ -384,11 +380,6 @@ func (m *RangeInfo) MarshalTo(dAtA []byte) (int, error) { i++ i = encodeVarintState(dAtA, i, uint64(m.RangeMaxBytes)) } - if m.NumRemotePending != 0 { - dAtA[i] = 0x48 - i++ - i = encodeVarintState(dAtA, i, uint64(m.NumRemotePending)) - } return i, nil } @@ -620,9 +611,6 @@ func (m *RangeInfo) Size() (n int) { if m.RangeMaxBytes != 0 { n += 1 + sovState(uint64(m.RangeMaxBytes)) } - if m.NumRemotePending != 0 { - n += 1 + sovState(uint64(m.NumRemotePending)) - } return n } @@ -1173,25 +1161,6 @@ func (m *RangeInfo) Unmarshal(dAtA []byte) error { break } } - case 9: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field NumRemotePending", wireType) - } - m.NumRemotePending = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowState - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.NumRemotePending |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } default: iNdEx = preIndex skippy, err := skipState(dAtA[iNdEx:]) @@ -1864,64 +1833,64 @@ var ( func init() { proto.RegisterFile("storage/storagepb/state.proto", fileDescriptorState) } var fileDescriptorState = []byte{ - // 944 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x55, 0x3f, 0x6f, 0xdb, 0x46, - 0x1c, 0x35, 0x45, 0xc9, 0x96, 0x8e, 0x49, 0xac, 0x5e, 0x93, 0x86, 0x50, 0x60, 0x49, 0x50, 0xd1, - 0x42, 0x05, 0x52, 0x0a, 0x70, 0xff, 0xa0, 0x30, 0xba, 0x44, 0x4a, 0x11, 0x38, 0x71, 0x0a, 0xe7, - 0x64, 0x74, 0xe8, 0x42, 0x9c, 0xc8, 0x33, 0x45, 0xf8, 0x78, 0x77, 0x39, 0x1e, 0x03, 0xc9, 0x1f, - 0xa0, 0x73, 0x3f, 0x42, 0xf7, 0x7e, 0x86, 0xee, 0x1e, 0x33, 0x76, 0x12, 0x5a, 0x75, 0xc9, 0xdc, - 0xb5, 0x4b, 0x71, 0x47, 0x52, 0x96, 0x5c, 0x23, 0x0d, 0x3c, 0x64, 0x12, 0xf9, 0x7e, 0xef, 0xf7, - 0xef, 0xdd, 0x3b, 0x0a, 0xec, 0xa5, 0x8a, 0x4b, 0x1c, 0x91, 0x41, 0xf1, 0x2b, 0x26, 0x83, 0x54, - 0x61, 0x45, 0x3c, 0x21, 0xb9, 0xe2, 0xf0, 0x41, 0xc0, 0x83, 0x33, 0xc9, 0x71, 0x30, 0xf5, 0x0a, - 0x82, 0xb7, 0x22, 0xb6, 0x7a, 0x65, 0x2e, 0x61, 0x51, 0xcc, 0xca, 0x1f, 0x31, 0x19, 0x24, 0xaf, - 0x82, 0x20, 0x2f, 0xd0, 0x7a, 0x60, 0x92, 0xc5, 0x64, 0x10, 0x33, 0x45, 0x24, 0xc3, 0xd4, 0x97, - 0xf8, 0x54, 0x15, 0xc1, 0x8f, 0xca, 0x60, 0x42, 0x14, 0x0e, 0xb1, 0xc2, 0x05, 0x0e, 0x4b, 0x7c, - 0x0d, 0x73, 0x33, 0x15, 0xd3, 0xc1, 0x94, 0x06, 0x03, 0x15, 0x27, 0x24, 0x55, 0x38, 0x11, 0x45, - 0xe4, 0x6e, 0xc4, 0x23, 0x6e, 0x1e, 0x07, 0xfa, 0x29, 0x47, 0x7b, 0xbf, 0xd6, 0xc0, 0x2d, 0x44, - 0x04, 0x8d, 0x03, 0x3c, 0xd6, 0x0b, 0xc1, 0x87, 0x00, 0xea, 0xd6, 0x3e, 0x16, 0x82, 0xc6, 0x24, - 0xf4, 0x63, 0x16, 0x92, 0x99, 0x6b, 0x75, 0xad, 0x7e, 0x15, 0x35, 0x75, 0xe4, 0x51, 0x1e, 0x38, - 0xd4, 0x38, 0xf4, 0xc0, 0x87, 0x94, 0xe0, 0x94, 0x5c, 0xa1, 0x57, 0x0c, 0xfd, 0x03, 0x13, 0xda, - 0xe0, 0x7f, 0x0d, 0xaa, 0x21, 0x49, 0x03, 0xd7, 0xee, 0x5a, 0x7d, 0x67, 0xbf, 0xe7, 0x5d, 0xea, - 0x56, 0xec, 0xe2, 0x21, 0xcc, 0x22, 0xf2, 0x98, 0xa4, 0x81, 0x8c, 0x85, 0xe2, 0x12, 0x19, 0x3e, - 0xf4, 0x40, 0xcd, 0x14, 0x73, 0xab, 0x26, 0xd1, 0xbd, 0x26, 0xf1, 0x48, 0xc7, 0x51, 0x4e, 0x83, - 0xdf, 0x83, 0x5d, 0x25, 0x33, 0x16, 0x60, 0x45, 0x42, 0xdf, 
0x9c, 0x94, 0x5b, 0x33, 0x99, 0x9f, - 0x5c, 0xdb, 0xf2, 0x54, 0x9d, 0x94, 0x6c, 0xa3, 0x02, 0xba, 0xa3, 0x36, 0xde, 0x21, 0x07, 0xb7, - 0xa2, 0xc0, 0x57, 0x53, 0x49, 0xd2, 0x29, 0xa7, 0xa1, 0xbb, 0x6d, 0x8a, 0xed, 0xad, 0x15, 0xd3, - 0xba, 0x7b, 0x53, 0x1a, 0x78, 0x27, 0xa5, 0xee, 0xc3, 0xcf, 0x97, 0x8b, 0x8e, 0xf3, 0x64, 0x74, - 0x52, 0x66, 0xfd, 0xbd, 0xe8, 0xb4, 0x56, 0x09, 0xe1, 0xe4, 0xa0, 0x27, 0x31, 0x0b, 0x59, 0x46, - 0x29, 0x9e, 0x50, 0xd2, 0x43, 0x4e, 0x14, 0xac, 0xa8, 0x70, 0x08, 0x6a, 0x7a, 0xec, 0xd4, 0xdd, - 0x31, 0x9d, 0x1e, 0x7a, 0xff, 0x75, 0x58, 0xee, 0x23, 0xaf, 0xb4, 0x93, 0xf7, 0xfc, 0x87, 0xd1, - 0x48, 0x4f, 0x9b, 0xa2, 0x3c, 0x15, 0xfe, 0x64, 0x81, 0x7b, 0x6a, 0xc6, 0xfc, 0x54, 0x60, 0xe6, - 0x6f, 0x8c, 0xdf, 0x78, 0x97, 0xf1, 0xbf, 0x5c, 0x2e, 0x3a, 0xf0, 0x64, 0xc6, 0xc6, 0x02, 0xb3, - 0x77, 0xdf, 0x02, 0xaa, 0x22, 0x63, 0x6d, 0x99, 0xaf, 0xc0, 0xfd, 0x2c, 0x8d, 0x59, 0xb4, 0x72, - 0x89, 0x39, 0x11, 0xff, 0x8c, 0xcc, 0x5d, 0xa7, 0x6b, 0xf5, 0xeb, 0xe8, 0xae, 0x09, 0x17, 0x4e, - 0x31, 0x8a, 0x3f, 0x23, 0xf3, 0x83, 0xea, 0x9b, 0x5f, 0x3a, 0xd6, 0xd3, 0x6a, 0xbd, 0xde, 0x6c, - 0x3c, 0xad, 0xd6, 0x41, 0xd3, 0xe9, 0xfd, 0x53, 0x01, 0x0d, 0x63, 0x90, 0x43, 0x76, 0xca, 0xe1, - 0x61, 0xae, 0x11, 0x31, 0xee, 0x74, 0xf6, 0x3f, 0xf3, 0xde, 0x72, 0x0b, 0xbd, 0x75, 0x93, 0x0f, - 0xeb, 0x17, 0x8b, 0xce, 0xd6, 0xeb, 0x45, 0xc7, 0xca, 0xa5, 0x22, 0x70, 0x0f, 0x00, 0x8a, 0x53, - 0xb5, 0x61, 0xdf, 0x86, 0x46, 0x72, 0xdb, 0x76, 0x80, 0xc3, 0xb2, 0xc4, 0x17, 0x84, 0x85, 0x31, - 0x8b, 0x8c, 0x7b, 0xab, 0x08, 0xb0, 0x2c, 0x39, 0xce, 0x91, 0x92, 0x10, 0x4a, 0x2e, 0x04, 0x09, - 0x8d, 0xd7, 0x72, 0xc2, 0xe3, 0x1c, 0x81, 0x3d, 0x70, 0xdb, 0x5c, 0x2b, 0xca, 0x23, 0x3f, 0x8d, - 0xcf, 0x89, 0x71, 0x90, 0x8d, 0x1c, 0x0d, 0x1e, 0xf1, 0x68, 0x1c, 0x9f, 0x13, 0xf8, 0x2d, 0x68, - 0x61, 0x21, 0x24, 0x9f, 0xc5, 0x89, 0x96, 0x47, 0x48, 0x2e, 0x78, 0x8a, 0xa9, 0xff, 0x32, 0xe3, - 0x0a, 0x1b, 0x23, 0xd8, 0xc8, 0x5d, 0x63, 0x1c, 0x17, 0x84, 0x17, 0x3a, 0x0e, 0x3f, 0x05, 0xbb, - 0x52, 0x4b, 0xe3, 0x27, 0x78, 0xe6, 0x4f, 0xe6, 0x8a, 0xa4, 0x6e, 0xdd, 0xa4, 0xdc, 0x36, 0xf0, - 0x73, 0x3c, 0x1b, 0x6a, 0x50, 0x5f, 0x70, 0x3d, 0xaa, 0x24, 0x09, 0xd7, 0x4d, 0x8a, 0x95, 0x1a, - 0xf9, 0x05, 0x67, 0x59, 0x82, 0x4c, 0xa0, 0x58, 0x6c, 0x75, 0x06, 0xd5, 0x66, 0xad, 0xf7, 0xa6, - 0x06, 0xee, 0x8d, 0x78, 0x92, 0x60, 0x16, 0xbe, 0xc8, 0x48, 0x46, 0xd2, 0x31, 0xc3, 0x22, 0x9d, - 0x72, 0x05, 0x1f, 0x81, 0xc6, 0xea, 0x73, 0x53, 0x9c, 0xc6, 0xff, 0x98, 0xab, 0xaa, 0x4f, 0x00, - 0x5d, 0x66, 0xc1, 0x29, 0x00, 0x94, 0x07, 0x98, 0x8e, 0x03, 0x2e, 0x88, 0x5b, 0xe9, 0xda, 0x7d, - 0x67, 0x7f, 0xf8, 0xd6, 0x13, 0xbd, 0x76, 0x14, 0xef, 0x68, 0x55, 0xe4, 0x3b, 0xa6, 0xe4, 0xbc, - 0x68, 0xb4, 0x56, 0x1b, 0x9e, 0x01, 0x27, 0xa2, 0x7c, 0x52, 0xb6, 0xb2, 0x4d, 0xab, 0xd1, 0x0d, - 0x5a, 0x3d, 0xb9, 0xac, 0xb2, 0xde, 0x6b, 0xbd, 0x7a, 0xeb, 0x37, 0x0b, 0xec, 0x14, 0xd9, 0xf0, - 0x0e, 0xa8, 0xc4, 0xa1, 0x91, 0xc7, 0x46, 0x95, 0x38, 0x84, 0x4d, 0x60, 0xeb, 0x2b, 0xa0, 0xdd, - 0xd6, 0x40, 0xfa, 0x11, 0xde, 0x07, 0x3b, 0x84, 0x85, 0xe6, 0x62, 0xd8, 0x06, 0xdd, 0x26, 0x2c, - 0x7c, 0x46, 0xe6, 0xb0, 0x05, 0xea, 0x92, 0xe0, 0x90, 0x33, 0x3a, 0x37, 0x9f, 0xc0, 0x3a, 0x5a, - 0xbd, 0x6f, 0x8a, 0x5f, 0xbb, 0x91, 0xf8, 0x2e, 0xd8, 0x11, 0x92, 0x48, 0xf2, 0x32, 0x75, 0xb7, - 0xbb, 0x76, 0xdf, 0x46, 0xe5, 0x6b, 0x6b, 0x0e, 0x76, 0xaf, 0x28, 0x5a, 0x8e, 0x9d, 0xef, 0x61, - 0xc6, 0x3e, 0x06, 0xb5, 0x57, 0x98, 0x66, 0xc4, 0xac, 0xe2, 0xec, 0x1f, 0xdc, 0x40, 0xcb, 0x02, - 0x45, 0x79, 0xa1, 0x83, 0xca, 0x37, 0x56, 0xeb, 0x1c, 0x34, 0xaf, 0x2a, 0xfc, 0xbe, 
0x7a, 0x0f, - 0x3f, 0xbe, 0xf8, 0xb3, 0xbd, 0x75, 0xb1, 0x6c, 0x5b, 0xaf, 0x97, 0x6d, 0xeb, 0xf7, 0x65, 0xdb, - 0xfa, 0x63, 0xd9, 0xb6, 0x7e, 0xfe, 0xab, 0xbd, 0xf5, 0x63, 0x63, 0x55, 0x6d, 0xb2, 0x6d, 0xfe, - 0x42, 0xbf, 0xf8, 0x37, 0x00, 0x00, 0xff, 0xff, 0xf9, 0xc1, 0x7f, 0xb8, 0x1d, 0x08, 0x00, 0x00, + // 931 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x55, 0xcd, 0x6e, 0xdb, 0x46, + 0x17, 0x35, 0x45, 0xca, 0x96, 0x86, 0x49, 0xac, 0x6f, 0xbe, 0xa4, 0x21, 0x14, 0x58, 0x12, 0x54, + 0xb4, 0x50, 0x81, 0x94, 0x02, 0xdc, 0x1f, 0x14, 0x46, 0x37, 0x91, 0x52, 0x04, 0x76, 0x9c, 0xc2, + 0xa1, 0x8c, 0x2e, 0xba, 0x21, 0x46, 0xe4, 0x98, 0x22, 0x3c, 0x9c, 0x99, 0x70, 0x86, 0x81, 0xe4, + 0x07, 0xe8, 0xba, 0x8f, 0xd0, 0x7d, 0x9f, 0xa1, 0x7b, 0x2f, 0x03, 0x74, 0xd3, 0x95, 0xd0, 0xaa, + 0x9b, 0xac, 0xfb, 0x04, 0xc5, 0x0c, 0x7f, 0x2c, 0xb9, 0x46, 0x1a, 0x78, 0xd1, 0x95, 0xc8, 0x73, + 0xcf, 0xb9, 0xf7, 0xce, 0xe1, 0x21, 0x05, 0xf6, 0x84, 0x64, 0x29, 0x8a, 0xf0, 0xb0, 0xf8, 0xe5, + 0xd3, 0xa1, 0x90, 0x48, 0x62, 0x97, 0xa7, 0x4c, 0x32, 0xf8, 0x28, 0x60, 0xc1, 0x79, 0xca, 0x50, + 0x30, 0x73, 0x0b, 0x82, 0x5b, 0x11, 0xdb, 0xfd, 0x52, 0x8b, 0x69, 0x14, 0xd3, 0xf2, 0x87, 0x4f, + 0x87, 0xc9, 0xeb, 0x20, 0xc8, 0x1b, 0xb4, 0x1f, 0x69, 0x31, 0x9f, 0x0e, 0x63, 0x2a, 0x71, 0x4a, + 0x11, 0xf1, 0x53, 0x74, 0x26, 0x8b, 0xe2, 0x07, 0x65, 0x31, 0xc1, 0x12, 0x85, 0x48, 0xa2, 0x02, + 0x87, 0x25, 0xbe, 0x86, 0x39, 0x99, 0x8c, 0xc9, 0x70, 0x46, 0x82, 0xa1, 0x8c, 0x13, 0x2c, 0x24, + 0x4a, 0x78, 0x51, 0xb9, 0x1f, 0xb1, 0x88, 0xe9, 0xcb, 0xa1, 0xba, 0xca, 0xd1, 0xfe, 0xcf, 0x75, + 0x70, 0xc7, 0xc3, 0x9c, 0xc4, 0x01, 0x9a, 0xa8, 0x03, 0xc1, 0xc7, 0x00, 0xaa, 0xd1, 0x3e, 0xe2, + 0x9c, 0xc4, 0x38, 0xf4, 0x63, 0x1a, 0xe2, 0xb9, 0x63, 0xf4, 0x8c, 0x81, 0xe5, 0xb5, 0x54, 0xe5, + 0x49, 0x5e, 0x38, 0x54, 0x38, 0x74, 0xc1, 0xff, 0x09, 0x46, 0x02, 0x5f, 0xa3, 0xd7, 0x34, 0xfd, + 0x7f, 0xba, 0xb4, 0xc1, 0xff, 0x12, 0x58, 0x21, 0x16, 0x81, 0x63, 0xf6, 0x8c, 0x81, 0xbd, 0xdf, + 0x77, 0xaf, 0x7c, 0x2b, 0xce, 0xe2, 0x7a, 0x88, 0x46, 0xf8, 0x29, 0x16, 0x41, 0x1a, 0x73, 0xc9, + 0x52, 0x4f, 0xf3, 0xa1, 0x0b, 0xea, 0xba, 0x99, 0x63, 0x69, 0xa1, 0x73, 0x83, 0xf0, 0x58, 0xd5, + 0xbd, 0x9c, 0x06, 0xbf, 0x05, 0xbb, 0x32, 0xcd, 0x68, 0x80, 0x24, 0x0e, 0x7d, 0xfd, 0xa4, 0x9c, + 0xba, 0x56, 0x7e, 0x74, 0xe3, 0xc8, 0x33, 0x79, 0x5a, 0xb2, 0xb5, 0x0b, 0xde, 0x3d, 0xb9, 0x71, + 0x0f, 0x19, 0xb8, 0x13, 0x05, 0xbe, 0x9c, 0xa5, 0x58, 0xcc, 0x18, 0x09, 0x9d, 0x6d, 0xdd, 0x6c, + 0x6f, 0xad, 0x99, 0xf2, 0xdd, 0x9d, 0x91, 0xc0, 0x3d, 0x2d, 0x7d, 0x1f, 0x7d, 0xba, 0x5a, 0x76, + 0xed, 0x67, 0xe3, 0xd3, 0x52, 0xf5, 0xd7, 0xb2, 0xdb, 0xae, 0x04, 0xe1, 0xf4, 0xa0, 0x9f, 0x22, + 0x1a, 0xd2, 0x8c, 0x10, 0x34, 0x25, 0xb8, 0xef, 0xd9, 0x51, 0x50, 0x51, 0xe1, 0x08, 0xd4, 0xd5, + 0xda, 0xc2, 0xd9, 0xd1, 0x93, 0x1e, 0xbb, 0xff, 0x4c, 0x58, 0x9e, 0x23, 0xb7, 0x8c, 0x93, 0xfb, + 0xe2, 0xbb, 0xf1, 0x58, 0x6d, 0x2b, 0xbc, 0x5c, 0x0a, 0x7f, 0x30, 0xc0, 0x03, 0x39, 0xa7, 0xbe, + 0xe0, 0x88, 0xfa, 0x1b, 0xeb, 0x37, 0xdf, 0x67, 0xfd, 0xcf, 0x57, 0xcb, 0x2e, 0x3c, 0x9d, 0xd3, + 0x09, 0x47, 0xf4, 0xfd, 0x4f, 0x01, 0x65, 0xa1, 0x58, 0x3b, 0xcc, 0x17, 0xe0, 0x61, 0x26, 0x62, + 0x1a, 0x55, 0x29, 0xd1, 0x4f, 0xc4, 0x3f, 0xc7, 0x0b, 0xc7, 0xee, 0x19, 0x83, 0x86, 0x77, 0x5f, + 0x97, 0x8b, 0xa4, 0x68, 0xc7, 0x9f, 0xe3, 0xc5, 0x81, 0xf5, 0xf6, 0xa7, 0xae, 0x71, 0x64, 0x35, + 0x1a, 0xad, 0xe6, 0x91, 0xd5, 0x00, 0x2d, 0xbb, 0xff, 0x6b, 0x0d, 0x34, 0x75, 0x40, 0x0e, 0xe9, + 0x19, 0x83, 0x87, 0xb9, 0x47, 0x58, 0xa7, 0xd3, 0xde, 0xff, 
0xc4, 0x7d, 0xc7, 0x5b, 0xe8, 0xae, + 0x87, 0x7c, 0xd4, 0xb8, 0x5c, 0x76, 0xb7, 0xde, 0x2c, 0xbb, 0x46, 0x6e, 0x15, 0x86, 0x7b, 0x00, + 0x10, 0x24, 0xe4, 0x46, 0x7c, 0x9b, 0x0a, 0xc9, 0x63, 0xdb, 0x05, 0x36, 0xcd, 0x12, 0x9f, 0x63, + 0x1a, 0xc6, 0x34, 0xd2, 0xe9, 0xb5, 0x3c, 0x40, 0xb3, 0xe4, 0x24, 0x47, 0x4a, 0x42, 0x98, 0x32, + 0xce, 0x71, 0xa8, 0xb3, 0x96, 0x13, 0x9e, 0xe6, 0x08, 0xec, 0x83, 0xbb, 0xfa, 0xb5, 0x22, 0x2c, + 0xf2, 0x45, 0x7c, 0x81, 0x75, 0x82, 0x4c, 0xcf, 0x56, 0xe0, 0x31, 0x8b, 0x26, 0xf1, 0x05, 0x86, + 0x5f, 0x83, 0x36, 0xe2, 0x3c, 0x65, 0xf3, 0x38, 0x51, 0xf6, 0xf0, 0x94, 0x71, 0x26, 0x10, 0xf1, + 0x5f, 0x65, 0x4c, 0x22, 0x1d, 0x04, 0xd3, 0x73, 0xd6, 0x18, 0x27, 0x05, 0xe1, 0xa5, 0xaa, 0xc3, + 0x8f, 0xc1, 0x6e, 0xaa, 0xac, 0xf1, 0x13, 0x34, 0xf7, 0xa7, 0x0b, 0x89, 0x85, 0xd3, 0xd0, 0x92, + 0xbb, 0x1a, 0x7e, 0x81, 0xe6, 0x23, 0x05, 0x56, 0xae, 0x5a, 0xad, 0xfa, 0x91, 0xd5, 0x68, 0xb6, + 0x40, 0xff, 0x6d, 0x1d, 0x3c, 0x18, 0xb3, 0x24, 0x41, 0x34, 0x7c, 0x99, 0xe1, 0x0c, 0x8b, 0x09, + 0x45, 0x5c, 0xcc, 0x98, 0x84, 0x4f, 0x40, 0xb3, 0xfa, 0x8c, 0x14, 0x2e, 0xff, 0x4b, 0x68, 0x2c, + 0xe5, 0xac, 0x77, 0xa5, 0x82, 0x33, 0x00, 0x08, 0x0b, 0x10, 0x99, 0x04, 0x8c, 0x63, 0xa7, 0xd6, + 0x33, 0x07, 0xf6, 0xfe, 0xe8, 0x9d, 0x4f, 0xea, 0xc6, 0x55, 0xdc, 0xe3, 0xaa, 0xc9, 0x37, 0x54, + 0xa6, 0x8b, 0x62, 0xd0, 0x5a, 0x6f, 0x78, 0x0e, 0xec, 0x88, 0xb0, 0x69, 0x39, 0xca, 0xd4, 0xa3, + 0xc6, 0xb7, 0x18, 0xf5, 0xec, 0xaa, 0xcb, 0xfa, 0xac, 0xf5, 0xee, 0xed, 0x5f, 0x0c, 0xb0, 0x53, + 0xa8, 0xe1, 0x3d, 0x50, 0x8b, 0x43, 0x6d, 0x8f, 0xe9, 0xd5, 0xe2, 0x10, 0xb6, 0x80, 0xa9, 0xa2, + 0xad, 0x52, 0xd4, 0xf4, 0xd4, 0x25, 0x7c, 0x08, 0x76, 0x30, 0x0d, 0x75, 0xe0, 0x4d, 0x8d, 0x6e, + 0x63, 0x1a, 0x3e, 0xc7, 0x0b, 0xd8, 0x06, 0x8d, 0x14, 0xa3, 0x90, 0x51, 0xb2, 0xd0, 0x9f, 0xb6, + 0x86, 0x57, 0xdd, 0x6f, 0x9a, 0x5f, 0xbf, 0x95, 0xf9, 0x0e, 0xd8, 0xe1, 0x29, 0x4e, 0xf1, 0x2b, + 0xe1, 0x6c, 0xf7, 0xcc, 0x81, 0xe9, 0x95, 0xb7, 0xed, 0x05, 0xd8, 0xbd, 0xe6, 0x68, 0xb9, 0x76, + 0x7e, 0x0e, 0xbd, 0xf6, 0x09, 0xa8, 0xbf, 0x46, 0x24, 0xc3, 0xfa, 0x28, 0xf6, 0xfe, 0xc1, 0x2d, + 0xbc, 0x2c, 0x50, 0x2f, 0x6f, 0x74, 0x50, 0xfb, 0xca, 0x68, 0x5f, 0x80, 0xd6, 0x75, 0x87, 0xff, + 0xab, 0xd9, 0xa3, 0x0f, 0x2f, 0xff, 0xe8, 0x6c, 0x5d, 0xae, 0x3a, 0xc6, 0x9b, 0x55, 0xc7, 0xf8, + 0x6d, 0xd5, 0x31, 0x7e, 0x5f, 0x75, 0x8c, 0x1f, 0xff, 0xec, 0x6c, 0x7d, 0xdf, 0xac, 0xba, 0x4d, + 0xb7, 0xf5, 0x5f, 0xe3, 0x67, 0x7f, 0x07, 0x00, 0x00, 0xff, 0xff, 0x24, 0x81, 0x28, 0xbf, 0xf5, + 0x07, 0x00, 0x00, } diff --git a/pkg/storage/storagepb/state.proto b/pkg/storage/storagepb/state.proto index 3d74ecac6332..326c1270195d 100644 --- a/pkg/storage/storagepb/state.proto +++ b/pkg/storage/storagepb/state.proto @@ -90,7 +90,6 @@ message RangeInfo { // The highest (and last) index in the Raft log. uint64 last_index = 2; uint64 num_pending = 3; - uint64 num_remote_pending = 9; reserved 4; // previously last verification timestamp for verify queue. uint64 num_dropped = 5; // raft_log_size may be initially inaccurate after a server restart. @@ -100,6 +99,7 @@ message RangeInfo { int64 approximate_proposal_quota = 7; // The max size the range can grow to before it will be split. 
int64 range_max_bytes = 8; + reserved 9; } // CommandQueueSnapshot is a snapshot of the command queue graph for rendering diff --git a/pkg/storage/store.go b/pkg/storage/store.go index ef9ebdeba1a3..b15472a0be18 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -163,35 +163,21 @@ func TestStoreConfig(clock *hlc.Clock) StoreConfig { return sc } -var ( - raftMaxSizePerMsg = envutil.EnvOrDefaultInt("COCKROACH_RAFT_MAX_SIZE_PER_MSG", 16*1024) - raftMaxInflightMsgs = envutil.EnvOrDefaultInt("COCKROACH_RAFT_MAX_INFLIGHT_MSGS", 64) -) - func newRaftConfig( strg raft.Storage, id uint64, appliedIndex uint64, storeCfg StoreConfig, logger raft.Logger, ) *raft.Config { return &raft.Config{ - ID: id, - Applied: appliedIndex, - ElectionTick: storeCfg.RaftElectionTimeoutTicks, - HeartbeatTick: storeCfg.RaftHeartbeatIntervalTicks, - Storage: strg, - Logger: logger, + ID: id, + Applied: appliedIndex, + ElectionTick: storeCfg.RaftElectionTimeoutTicks, + HeartbeatTick: storeCfg.RaftHeartbeatIntervalTicks, + MaxUncommittedEntriesSize: storeCfg.RaftMaxUncommittedEntriesSize, + MaxSizePerMsg: storeCfg.RaftMaxSizePerMsg, + MaxInflightMsgs: storeCfg.RaftMaxInflightMsgs, + Storage: strg, + Logger: logger, PreVote: true, - - // MaxSizePerMsg controls how many Raft log entries the leader will send to - // followers in a single MsgApp. - MaxSizePerMsg: uint64(raftMaxSizePerMsg), - // MaxInflightMsgs controls how many "inflight" messages Raft will send to - // a follower without hearing a response. The total number of Raft log - // entries is a combination of this setting and MaxSizePerMsg. The current - // settings provide for up to 1 MB of raft log to be sent without - // acknowledgement. With an average entry size of 1 KB that translates to - // ~1024 commands that might be executed in the handling of a single - // raft.Ready operation. - MaxInflightMsgs: raftMaxInflightMsgs, } } diff --git a/pkg/storage/store_snapshot.go b/pkg/storage/store_snapshot.go index 26d076d81135..a48e58ffbac4 100644 --- a/pkg/storage/store_snapshot.go +++ b/pkg/storage/store_snapshot.go @@ -24,6 +24,7 @@ import ( "go.etcd.io/etcd/raft/raftpb" "golang.org/x/time/rate" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -98,7 +99,8 @@ func assertStrategy( // kvBatchSnapshotStrategy is an implementation of snapshotStrategy that streams // batches of KV pairs in the BatchRepr format. type kvBatchSnapshotStrategy struct { - status string + raftCfg *base.RaftConfig + status string // Fields used when sending snapshots. batchSize int64 @@ -228,7 +230,8 @@ func (kvSS *kvBatchSnapshotStrategy) Send( if err == nil { logEntries = append(logEntries, bytes) raftLogBytes += int64(len(bytes)) - if snap.snapType == snapTypePreemptive && raftLogBytes > 4*raftLogMaxSize { + if snap.snapType == snapTypePreemptive && + raftLogBytes > 4*kvSS.raftCfg.RaftLogTruncationThreshold { // If the raft log is too large, abort the snapshot instead of // potentially running out of memory. 
However, if this is a // raft-initiated snapshot (instead of a preemptive one), we @@ -492,7 +495,9 @@ func (s *Store) receiveSnapshot( var ss snapshotStrategy switch header.Strategy { case SnapshotRequest_KV_BATCH: - ss = &kvBatchSnapshotStrategy{} + ss = &kvBatchSnapshotStrategy{ + raftCfg: &s.cfg.RaftConfig, + } default: return sendSnapshotError(stream, errors.Errorf("%s,r%d: unknown snapshot strategy: %s", @@ -568,6 +573,7 @@ func (e *errMustRetrySnapshotDueToTruncation) Error() string { // sendSnapshot sends an outgoing snapshot via a pre-opened GRPC stream. func sendSnapshot( ctx context.Context, + raftCfg *base.RaftConfig, st *cluster.Settings, stream outgoingSnapshotStream, storePool SnapshotStorePool, @@ -635,6 +641,7 @@ func sendSnapshot( switch header.Strategy { case SnapshotRequest_KV_BATCH: ss = &kvBatchSnapshotStrategy{ + raftCfg: raftCfg, batchSize: batchSize, limiter: limiter, newBatch: newBatch, diff --git a/pkg/storage/store_snapshot_test.go b/pkg/storage/store_snapshot_test.go index c6a951349385..afaf11f667f6 100644 --- a/pkg/storage/store_snapshot_test.go +++ b/pkg/storage/store_snapshot_test.go @@ -45,7 +45,7 @@ func TestSnapshotRaftLogLimit(t *testing.T) { var bytesWritten int64 blob := []byte(strings.Repeat("a", 1024*1024)) - for i := 0; bytesWritten < 5*raftLogMaxSize; i++ { + for i := 0; bytesWritten < 5*store.cfg.RaftLogTruncationThreshold; i++ { pArgs := putArgs(roachpb.Key("a"), blob) _, pErr := client.SendWrappedWith(ctx, store, roachpb.Header{RangeID: 1}, &pArgs) if pErr != nil { @@ -65,6 +65,7 @@ func TestSnapshotRaftLogLimit(t *testing.T) { defer snap.Close() ss := kvBatchSnapshotStrategy{ + raftCfg: &store.cfg.RaftConfig, limiter: rate.NewLimiter(1<<10, 1), newBatch: eng.NewBatch, } diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index ae3d5b046b13..81718e42f597 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -3051,6 +3051,8 @@ func TestSendSnapshotThrottling(t *testing.T) { defer e.Close() ctx := context.Background() + var cfg base.RaftConfig + cfg.SetDefaults() st := cluster.MakeTestingClusterSettings() header := SnapshotRequest_Header{ @@ -3066,7 +3068,7 @@ func TestSendSnapshotThrottling(t *testing.T) { sp := &fakeStorePool{} expectedErr := errors.New("") c := fakeSnapshotStream{nil, expectedErr} - err := sendSnapshot(ctx, st, c, sp, header, nil, newBatch, nil) + err := sendSnapshot(ctx, &cfg, st, c, sp, header, nil, newBatch, nil) if sp.failedThrottles != 1 { t.Fatalf("expected 1 failed throttle, but found %d", sp.failedThrottles) } @@ -3082,7 +3084,7 @@ func TestSendSnapshotThrottling(t *testing.T) { Status: SnapshotResponse_DECLINED, } c := fakeSnapshotStream{resp, nil} - err := sendSnapshot(ctx, st, c, sp, header, nil, newBatch, nil) + err := sendSnapshot(ctx, &cfg, st, c, sp, header, nil, newBatch, nil) if sp.declinedThrottles != 1 { t.Fatalf("expected 1 declined throttle, but found %d", sp.declinedThrottles) } @@ -3099,7 +3101,7 @@ func TestSendSnapshotThrottling(t *testing.T) { Status: SnapshotResponse_DECLINED, } c := fakeSnapshotStream{resp, nil} - err := sendSnapshot(ctx, st, c, sp, header, nil, newBatch, nil) + err := sendSnapshot(ctx, &cfg, st, c, sp, header, nil, newBatch, nil) if sp.failedThrottles != 1 { t.Fatalf("expected 1 failed throttle, but found %d", sp.failedThrottles) } @@ -3115,7 +3117,7 @@ func TestSendSnapshotThrottling(t *testing.T) { Status: SnapshotResponse_ERROR, } c := fakeSnapshotStream{resp, nil} - err := sendSnapshot(ctx, st, c, sp, header, nil, newBatch, nil) + err := 
sendSnapshot(ctx, &cfg, st, c, sp, header, nil, newBatch, nil) if sp.failedThrottles != 1 { t.Fatalf("expected 1 failed throttle, but found %d", sp.failedThrottles) } diff --git a/pkg/ui/src/views/reports/containers/range/rangeTable.tsx b/pkg/ui/src/views/reports/containers/range/rangeTable.tsx index 185d9aa949a4..7f205e38c6f2 100644 --- a/pkg/ui/src/views/reports/containers/range/rangeTable.tsx +++ b/pkg/ui/src/views/reports/containers/range/rangeTable.tsx @@ -74,8 +74,7 @@ const rangeTableDisplayList: RangeTableRow[] = [ { variable: "leaseHolderQPS", display: "Lease Holder QPS", compareToLeader: false }, { variable: "keysWrittenPS", display: "Average Keys Written Per Second", compareToLeader: false }, { variable: "approxProposalQuota", display: "Approx Proposal Quota", compareToLeader: false }, - { variable: "pendingCommands", display: "Pending Local Commands", compareToLeader: false }, - { variable: "remoteCommands", display: "Pending Remote Commands", compareToLeader: false }, + { variable: "pendingCommands", display: "Pending Commands", compareToLeader: false }, { variable: "droppedCommands", display: "Dropped Commands", compareToLeader: false }, { variable: "truncatedIndex", display: "Truncated Index", compareToLeader: true }, { variable: "truncatedTerm", display: "Truncated Term", compareToLeader: true }, @@ -506,7 +505,6 @@ export default class RangeTable extends React.Component { keysWrittenPS: this.createContent(info.stats.writes_per_second.toFixed(4)), approxProposalQuota: raftLeader ? this.createContent(FixLong(info.state.approximate_proposal_quota)) : rangeTableEmptyContent, pendingCommands: this.createContent(FixLong(info.state.num_pending)), - remoteCommands: raftLeader ? this.createContent(FixLong(info.state.num_remote_pending)) : rangeTableEmptyContent, droppedCommands: this.createContent( FixLong(info.state.num_dropped), FixLong(info.state.num_dropped).greaterThan(0) ? "range-table__cell--warning" : "", diff --git a/vendor b/vendor index 9ea837c41565..ca4276c48c28 160000 --- a/vendor +++ b/vendor @@ -1 +1 @@ -Subproject commit 9ea837c41565a41f28f23ab543725666fcf6758e +Subproject commit ca4276c48c28ef08550bc0014dd021a0081e9278
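For reference, the new RaftConfig defaults introduced in pkg/base/config.go above are derived from one another: the proposal quota defaults to a quarter of the log truncation threshold, the uncommitted-entries cap to twice the quota, and RaftMaxSizePerMsg times RaftMaxInflightMsgs bounds the per-follower in-flight log data at roughly 1 MB. A minimal standalone Go sketch of that arithmetic follows; the constants mirror the defaults added by this patch, while the variable names are illustrative only and are not CockroachDB APIs.

package main

import "fmt"

func main() {
	// Defaults introduced in pkg/base/config.go, each overridable through the
	// corresponding COCKROACH_RAFT_* environment variable.
	const (
		raftLogTruncationThreshold = 4 << 20  // 4 MB
		raftMaxSizePerMsg          = 16 << 10 // 16 KB per MsgApp
		raftMaxInflightMsgs        = 64
	)

	// SetDefaults derives the proposal quota as a quarter of the truncation
	// threshold so that followers stay current enough to avoid needing
	// non-preemptive snapshots after a truncation.
	raftProposalQuota := raftLogTruncationThreshold / 4

	// The uncommitted-entries cap is twice the quota: the quota pool handles
	// normal throttling, and this limit only guards against unbounded
	// re-proposals while quorum is lost.
	raftMaxUncommittedEntriesSize := 2 * raftProposalQuota

	// MaxSizePerMsg * MaxInflightMsgs bounds the Raft log data that can be
	// outstanding to a single follower without acknowledgement.
	inflightWindow := raftMaxSizePerMsg * raftMaxInflightMsgs

	fmt.Printf("proposal quota:              %d bytes\n", raftProposalQuota)             // 1 MB
	fmt.Printf("max uncommitted entries:     %d bytes\n", raftMaxUncommittedEntriesSize) // 2 MB
	fmt.Printf("per-follower in-flight data: %d bytes\n", inflightWindow)                // 1 MB
}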