Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: simplify and track entire set of gossiped IOThresholds #85739

Merged
merged 1 commit into from
Aug 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ type Replica struct {
// Followers to which replication traffic is currently dropped.
//
// Never mutated in place (always replaced wholesale), so can be leaked
// outside of the surrounding mutex.
// outside the surrounding mutex.
pausedFollowers map[roachpb.ReplicaID]struct{}
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2601,12 +2601,15 @@ func (r *Replica) sendSnapshot(
return err
}

if ioThresh := r.store.ioOverloadedStores.Load()[recipient.StoreID]; ioThresh != nil && destPaused {
if destPaused {
// If the destination is paused, be more hesitant to send snapshots. The destination being
// paused implies that we have recently checked that it's not required for quorum, and that
// we wish to conserve I/O on that store, which sending a snapshot counteracts. So hold back on
// the snapshot as well.
return errors.Errorf("skipping snapshot; %s is overloaded: %s", recipient, ioThresh)
return errors.Errorf(
"skipping snapshot; %s is overloaded: %s",
recipient, r.store.ioThresholds.Current().IOThreshold(recipient.StoreID),
)
}

// Check follower snapshots cluster setting.
Expand Down
7 changes: 2 additions & 5 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
Expand Down Expand Up @@ -1164,9 +1163,7 @@ func maybeFatalOnRaftReadyErr(ctx context.Context, expl string, err error) (remo
// tick the Raft group, returning true if the raft group exists and should
// be queued for Ready processing; false otherwise.
func (r *Replica) tick(
ctx context.Context,
livenessMap liveness.IsLiveMap,
ioOverloadMap map[roachpb.StoreID]*admissionpb.IOThreshold,
ctx context.Context, livenessMap liveness.IsLiveMap, ioThresholdMap *ioThresholdMap,
) (bool, error) {
r.unreachablesMu.Lock()
remotes := r.unreachablesMu.remotes
Expand All @@ -1191,7 +1188,7 @@ func (r *Replica) tick(
return false, nil
}

r.updatePausedFollowersLocked(ctx, ioOverloadMap)
r.updatePausedFollowersLocked(ctx, ioThresholdMap)

now := r.store.Clock().NowAsClockTimestamp()
if r.maybeQuiesceRaftMuLockedReplicaMuLocked(ctx, now, livenessMap) {
Expand Down
144 changes: 120 additions & 24 deletions pkg/kv/kvserver/replica_raft_overload.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,23 @@ import (
"context"
"math/rand"
"sort"
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"go.etcd.io/etcd/raft/v3/tracker"
)

var pauseReplicationIOThreshold = settings.RegisterFloatSetting(
settings.SystemOnly,
"admission.kv.pause_replication_io_threshold",
"pause replication to non-essential followers when their I/O admission control score exceeds the given threshold (zero to disable)",
// TODO(tbg): set a nonzero default.
// TODO(tbg): set a sub-one default.
// See: https://github.com/cockroachdb/cockroach/issues/83920
0.0,
10000,
func(v float64) error {
if v == 0 {
return nil
Expand All @@ -42,12 +43,16 @@ var pauseReplicationIOThreshold = settings.RegisterFloatSetting(
},
)

// ioThresholdMapI is the narrow view of per-store IO thresholds consumed by
// computeExpendableOverloadedFollowers (via the ioOverloadMap field of its
// input struct). *ioThresholdMap implements it.
type ioThresholdMapI interface {
// AbovePauseThreshold returns true if the store's score exceeds the threshold
// set for trying to pause replication traffic to followers on it.
AbovePauseThreshold(_ roachpb.StoreID) bool
}

type computeExpendableOverloadedFollowersInput struct {
self roachpb.ReplicaID
replDescs roachpb.ReplicaSet
// TODO(tbg): all entries are overloaded, so consdier removing the IOThreshold here
// because it's confusing.
ioOverloadMap map[roachpb.StoreID]*admissionpb.IOThreshold
self roachpb.ReplicaID
replDescs roachpb.ReplicaSet
ioOverloadMap ioThresholdMapI
// getProgressMap returns Raft's view of the progress map. This is only called
// when needed, and at most once.
getProgressMap func(context.Context) map[uint64]tracker.Progress
Expand Down Expand Up @@ -86,7 +91,7 @@ const (
// overload), this method does very little work.
//
// If at least one follower is (close to being) overloaded, we determine the
// maximum set of such followers that we can afford not replicating to without
// maximum set of such followers that we can afford not to replicate to without
// losing quorum by successively reducing the set of overloaded followers by one
// randomly selected overloaded voter. The randomness makes it more likely that
// when there are multiple overloaded stores in the system that cannot be
Expand All @@ -108,7 +113,7 @@ func computeExpendableOverloadedFollowers(
var prs map[uint64]tracker.Progress

for _, replDesc := range d.replDescs.AsProto() {
if _, overloaded := d.ioOverloadMap[replDesc.StoreID]; !overloaded || replDesc.ReplicaID == d.self {
if pausable := d.ioOverloadMap.AbovePauseThreshold(replDesc.StoreID); !pausable || replDesc.ReplicaID == d.self {
continue
}
// There's at least one overloaded follower, so initialize
Expand Down Expand Up @@ -193,26 +198,117 @@ func computeExpendableOverloadedFollowers(
return liveOverloadedVoterCandidates, nonLive
}

type overloadedStoresMap atomic.Value // map[roachpb.StoreID]*admissionpb.IOThreshold
// ioThresholdMap bundles a set of per-store IOThresholds with the score
// threshold used to decide which of those stores count as pausable, plus a
// sequence number exposed through Sequence.
type ioThresholdMap struct {
threshold float64 // threshold at which the score indicates pausability
seq int // bumped on creation if pausable set changed
// m holds the tracked IOThreshold for each store, keyed by store ID.
m map[roachpb.StoreID]*admissionpb.IOThreshold
}

// String renders the map through redact.StringWithoutMarkers, which formats
// the value via its SafeFormat method and returns the result without redaction
// markers.
func (osm ioThresholdMap) String() string {
return redact.StringWithoutMarkers(osm)
}

func (osm *overloadedStoresMap) Load() map[roachpb.StoreID]*admissionpb.IOThreshold {
v, _ := (*atomic.Value)(osm).Load().(map[roachpb.StoreID]*admissionpb.IOThreshold)
return v
var _ redact.SafeFormatter = (*ioThresholdMap)(nil)

// SafeFormat implements redact.SafeFormatter.
//
// Stores are printed as a comma-separated list of "s<id>: <IOThreshold>" in
// ascending score order, followed by the pausable threshold. Stores with equal
// scores are ordered by store ID: sort.Slice is not stable and the keys are
// collected from a map, so without the tie-break the output could differ from
// one call to the next. Nothing is printed for an empty map.
//
// The verb is ignored.
func (osm ioThresholdMap) SafeFormat(s redact.SafePrinter, verb rune) {
	sl := make([]roachpb.StoreID, 0, len(osm.m))
	for id := range osm.m {
		sl = append(sl, id)
	}
	sort.Slice(sl, func(i, j int) bool {
		a, _ := osm.m[sl[i]].Score()
		b, _ := osm.m[sl[j]].Score()
		if a != b {
			return a < b
		}
		// Equal scores: fall back to the store ID for a deterministic order.
		return sl[i] < sl[j]
	})
	for i, id := range sl {
		if i > 0 {
			s.SafeString(", ")
		}
		s.Printf("s%d: %s", id, osm.m[id])
	}
	if len(sl) > 0 {
		s.Printf(" [pausable-threshold=%.2f]", osm.threshold)
	}
}

var _ ioThresholdMapI = (*ioThresholdMap)(nil)

// AbovePauseThreshold implements ioThresholdMapI. It reports whether the score
// of the IOThreshold tracked for the given store is strictly greater than
// osm.threshold.
func (osm *ioThresholdMap) AbovePauseThreshold(id roachpb.StoreID) bool {
// For a store that is not in the map, osm.m[id] is a nil *IOThreshold; this
// relies on Score tolerating a nil receiver — TODO confirm in admissionpb.
// The second return value of Score is deliberately ignored here.
sc, _ := osm.m[id].Score()
return sc > osm.threshold
}

// NumAbovePauseThreshold counts the tracked stores for which
// AbovePauseThreshold reports true.
func (osm *ioThresholdMap) NumAbovePauseThreshold() int {
	count := 0
	for storeID := range osm.m {
		if !osm.AbovePauseThreshold(storeID) {
			continue
		}
		count++
	}
	return count
}

func (osm *overloadedStoresMap) Swap(
m map[roachpb.StoreID]*admissionpb.IOThreshold,
) map[roachpb.StoreID]*admissionpb.IOThreshold {
v, _ := (*atomic.Value)(osm).Swap(m).(map[roachpb.StoreID]*admissionpb.IOThreshold)
return v
// IOThreshold returns the IOThreshold tracked for the given store, or nil if
// the store is not present in the map.
func (osm *ioThresholdMap) IOThreshold(id roachpb.StoreID) *admissionpb.IOThreshold {
return osm.m[id]
}

// Sequence allows distinguishing sets of overloaded stores. Whenever an
// ioThresholdMap is created, it inherits the sequence of its predecessor,
// incrementing only when the set of pausable stores has changed in the
// transition.
//
// The inheritance and increment are performed by (*ioThresholds).Replace; this
// accessor only returns the stored value.
func (osm *ioThresholdMap) Sequence() int {
return osm.seq
}

// ioThresholds holds the current *ioThresholdMap behind a mutex. The map is
// read through Current and swapped for a new one through Replace.
type ioThresholds struct {
mu struct {
syncutil.Mutex
inner *ioThresholdMap // always replaced wholesale, so can leak out of mu
}
}

// Current returns the ioThresholdMap most recently installed by Replace, or
// nil if none has been installed yet. The pointer is read under the mutex and
// handed out after the mutex is released.
func (osm *ioThresholds) Current() *ioThresholdMap {
	osm.mu.Lock()
	cur := osm.mu.inner
	osm.mu.Unlock()
	return cur
}

// Replace replaces the stored view of stores for which we track IOThresholds.
// If the set of pausable stores (i.e. those whose score is strictly greater
// than the applicable threshold, as reported by AbovePauseThreshold) changes in
// the process, the updated view will have an incremented Sequence().
//
// Each view is evaluated against its own threshold: the previous view against
// the threshold it was created with, the new view against seqThreshold.
//
// The returned prev is never nil: if no view had been installed yet, an empty
// ioThresholdMap stands in for it. cur is the newly installed view.
func (osm *ioThresholds) Replace(
	m map[roachpb.StoreID]*admissionpb.IOThreshold, seqThreshold float64,
) (prev, cur *ioThresholdMap) {
	osm.mu.Lock()
	defer osm.mu.Unlock()

	last := osm.mu.inner
	if last == nil {
		last = &ioThresholdMap{}
	}
	next := &ioThresholdMap{threshold: seqThreshold, seq: last.seq, m: m}

	// pausabilityChanged reports whether any store in ids is pausable in
	// exactly one of the two views.
	pausabilityChanged := func(ids map[roachpb.StoreID]*admissionpb.IOThreshold) bool {
		for id := range ids {
			if last.AbovePauseThreshold(id) != next.AbovePauseThreshold(id) {
				return true
			}
		}
		return false
	}

	// Both key sets are checked since a store may be present in only one of
	// the views. The second scan is skipped once a difference is found.
	if pausabilityChanged(last.m) || pausabilityChanged(next.m) {
		next.seq++
	}

	osm.mu.inner = next
	return last, next
}

func (r *Replica) updatePausedFollowersLocked(
ctx context.Context, ioOverloadMap map[roachpb.StoreID]*admissionpb.IOThreshold,
) {
func (r *Replica) updatePausedFollowersLocked(ctx context.Context, ioThresholdMap *ioThresholdMap) {
r.mu.pausedFollowers = nil

if len(ioOverloadMap) == 0 {
if ioThresholdMap.NumAbovePauseThreshold() == 0 {
return
}

Expand Down Expand Up @@ -254,7 +350,7 @@ func (r *Replica) updatePausedFollowersLocked(
d := computeExpendableOverloadedFollowersInput{
self: r.replicaID,
replDescs: r.descRLocked().Replicas(),
ioOverloadMap: ioOverloadMap,
ioOverloadMap: ioThresholdMap,
getProgressMap: func(_ context.Context) map[uint64]tracker.Progress {
prs := r.mu.internalRaftGroup.Status().Progress
updateRaftProgressFromActivity(ctx, prs, r.descRLocked().Replicas().AsProto(), func(id roachpb.ReplicaID) bool {
Expand Down
31 changes: 29 additions & 2 deletions pkg/kv/kvserver/replica_raft_overload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import (

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/echotest"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/redact"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/raft/v3/tracker"
)
Expand All @@ -47,7 +49,7 @@ func TestReplicaRaftOverload_computeExpendableOverloadedFollowers(t *testing.T)
var seed uint64
var replDescs roachpb.ReplicaSet
var self roachpb.ReplicaID
ioOverloadMap := map[roachpb.StoreID]*admissionpb.IOThreshold{}
ioOverloadMap := &ioThresholdMap{threshold: 1.0, m: map[roachpb.StoreID]*admissionpb.IOThreshold{}}
snapshotMap := map[roachpb.ReplicaID]struct{}{}
downMap := map[roachpb.ReplicaID]struct{}{}
match := map[roachpb.ReplicaID]uint64{}
Expand Down Expand Up @@ -87,7 +89,7 @@ func TestReplicaRaftOverload_computeExpendableOverloadedFollowers(t *testing.T)
}
replDescs.AddReplica(replDesc)
case "overloaded":
ioOverloadMap[roachpb.StoreID(id)] = &admissionpb.IOThreshold{
ioOverloadMap.m[roachpb.StoreID(id)] = &admissionpb.IOThreshold{
L0NumSubLevels: 1000,
L0NumSubLevelsThreshold: 20,
L0NumFiles: 1,
Expand Down Expand Up @@ -156,3 +158,28 @@ func TestReplicaRaftOverload_computeExpendableOverloadedFollowers(t *testing.T)
})
})
}

// TestIoThresholdMap_SafeFormat compares the redact.Sprint rendering of an
// ioThresholdMap holding three stores against the io_threshold_map.txt test
// data file.
func TestIoThresholdMap_SafeFormat(t *testing.T) {
	defer leaktest.AfterTest(t)()

	thresholds := map[roachpb.StoreID]*admissionpb.IOThreshold{
		1: { // score 0.7
			L0NumSubLevels:          100,
			L0NumSubLevelsThreshold: 1000,
			L0NumFiles:              700,
			L0NumFilesThreshold:     1000,
		},
		7: { // score 0.9
			L0NumSubLevels:          90,
			L0NumSubLevelsThreshold: 100,
			L0NumFiles:              100,
			L0NumFilesThreshold:     1000,
		},
		9: { // score 1.1
			L0NumSubLevels:          110,
			L0NumSubLevelsThreshold: 100,
			L0NumFiles:              100,
			L0NumFilesThreshold:     1000,
		},
	}
	m := ioThresholdMap{threshold: 0.8, seq: 1, m: thresholds}

	rendered := string(redact.Sprint(m))
	echotest.Require(t, rendered, testutils.TestDataPath(t, "io_threshold_map.txt"))
}
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/echotest"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -8117,7 +8118,8 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
r.mu.Unlock()

// Tick raft.
if _, err := r.tick(ctx, nil, nil); err != nil {
iot := ioThresholdMap{m: map[roachpb.StoreID]*admissionpb.IOThreshold{}}
if _, err := r.tick(ctx, nil, &iot); err != nil {
t.Fatal(err)
}

Expand Down
21 changes: 12 additions & 9 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,10 +932,10 @@ type Store struct {
// liveness. It is updated periodically in raftTickLoop()
// and reactively in nodeIsLiveCallback() on liveness updates.
livenessMap atomic.Value
// ioOverloadedStores is analogous to livenessMap, but stores a
// map[StoreID]*IOThreshold. It is gossip-backed but is not updated
// ioThresholds is analogous to livenessMap, but stores the *IOThresholds for
// the stores in the cluster. It is gossip-backed but is not updated
// reactively, i.e. will refresh on each tick loop iteration only.
ioOverloadedStores overloadedStoresMap
ioThresholds *ioThresholds

// cachedCapacity caches information on store capacity to prevent
// expensive recomputations in case leases or replicas are rapidly
Expand Down Expand Up @@ -1177,13 +1177,16 @@ func NewStore(
if !cfg.Valid() {
log.Fatalf(ctx, "invalid store configuration: %+v", &cfg)
}
iot := ioThresholds{}
iot.Replace(nil, 1.0) // init as empty
s := &Store{
cfg: cfg,
db: cfg.DB, // TODO(tschottdorf): remove redundancy.
engine: eng,
nodeDesc: nodeDesc,
metrics: newStoreMetrics(cfg.HistogramWindowInterval),
ctSender: cfg.ClosedTimestampSender,
cfg: cfg,
db: cfg.DB, // TODO(tschottdorf): remove redundancy.
engine: eng,
nodeDesc: nodeDesc,
metrics: newStoreMetrics(cfg.HistogramWindowInterval),
ctSender: cfg.ClosedTimestampSender,
ioThresholds: &iot,
}
s.ioThreshold.t = &admissionpb.IOThreshold{}
if cfg.RPCContext != nil {
Expand Down
Loading