Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 25 additions & 20 deletions pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,10 +772,9 @@ type storeState struct {
storeLoad
StoreAttributesAndLocality
adjusted struct {
// TODO: these adjusted load values can become negative due to applying
// pending changes. We need to let them be negative to retain the ability
// to undo pending changes. Audit the mean computation and rebalancing
// code to ensure that we bump up to a lower bound of zero.
// NB: these load values can become negative due to applying pending
// changes. We need to let them be negative to retain the ability to undo
// pending changes.
load LoadVector
secondaryLoad SecondaryLoadVector
// Pending changes for computing loadReplicas and load.
Expand Down Expand Up @@ -977,6 +976,7 @@ func newStoreState() *storeState {
type nodeState struct {
stores []roachpb.StoreID
NodeLoad
// NB: adjustedCPU can be negative.
adjustedCPU LoadValue
}

Expand Down Expand Up @@ -1780,10 +1780,14 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal(
// will start shedding replicas, so this is just a heuristic.
fraction = minLeaseLoadFraction
}
threshold := LoadValue(float64(ss.adjusted.load[topk.dim]) * fraction)
// The max(0, ...) is defensive, in case the adjusted load is negative.
// Given that this is a most overloaded dim, the likelihood of the
// adjusted load being negative is very low.
adjustedStoreLoadValue := max(0, ss.adjusted.load[topk.dim])
threshold := LoadValue(float64(adjustedStoreLoadValue) * fraction)
if ss.reportedSecondaryLoad[ReplicaCount] > 0 {
// Allow all ranges above 90% of the mean. This is quite arbitrary.
meanLoad := (ss.adjusted.load[topk.dim] * 9) / (ss.reportedSecondaryLoad[ReplicaCount] * 10)
meanLoad := (adjustedStoreLoadValue * 9) / (ss.reportedSecondaryLoad[ReplicaCount] * 10)
threshold = min(meanLoad, threshold)
}
topk.threshold = threshold
Expand Down Expand Up @@ -2209,11 +2213,6 @@ func (cs *clusterState) undoReplicaChange(change ReplicaChange) {
cs.undoChangeLoadDelta(change)
}

// TODO(kvoli,sumeerbhola): The load of the store and node can become negative
// when applying or undoing load adjustments. For load adjustments to be
// reversible quickly, we aren't able to zero out the value when negative. We
// should handle the negative values when using them.

// applyChangeLoadDelta adds the change load delta to the adjusted load of the
// store and node affected.
func (cs *clusterState) applyChangeLoadDelta(change ReplicaChange) {
Expand Down Expand Up @@ -2416,6 +2415,11 @@ func (cs *clusterState) canShedAndAddLoad(
if targetSS.adjusted.load[overloadedDim] > 0 {
overloadedDimFractionIncrease = float64(deltaToAdd[overloadedDim]) /
float64(targetSS.adjusted.load[overloadedDim])
} else {
// Else, the adjusted load on the overloadedDim is zero or negative, which
// is possible, but extremely rare in practice. We arbitrarily set the
// fraction increase to 1.0 in this case.
overloadedDimFractionIncrease = 1.0
}
otherDimensionsBecameWorseInTarget := false
for i := range targetSLS.dimSummary {
Expand All @@ -2428,16 +2432,18 @@ func (cs *clusterState) canShedAndAddLoad(
}
// This is an overloaded dimension in the target. Only allow small
// increases along this dimension.
dimFractionIncrease := math.MaxFloat64
if targetSS.adjusted.load[dim] > 0 {
dimFractionIncrease = float64(deltaToAdd[dim]) / float64(targetSS.adjusted.load[dim])
}
// The use of 33% is arbitrary.
if dimFractionIncrease > overloadedDimFractionIncrease/3 {
log.KvDistribution.Infof(ctx, "%v: %f > %f/3", dim, dimFractionIncrease, overloadedDimFractionIncrease)
otherDimensionsBecameWorseInTarget = true
break
dimFractionIncrease := float64(deltaToAdd[dim]) / float64(targetSS.adjusted.load[dim])
// The use of 33% is arbitrary.
if dimFractionIncrease > overloadedDimFractionIncrease/3 {
log.KvDistribution.Infof(ctx, "%v: %f > %f/3", dim, dimFractionIncrease, overloadedDimFractionIncrease)
otherDimensionsBecameWorseInTarget = true
break
}
}
// Else the adjusted load in dimension dim is zero or negative, which is
// possible, but extremely rare in practice. We ignore this dimension in
// that case.
}
canAddLoad = overloadedDimPermitsChange && !otherDimensionsBecameWorseInTarget &&
targetSLS.maxFractionPendingIncrease < epsilon &&
Expand Down Expand Up @@ -2527,7 +2533,6 @@ func computeLoadSummary(
var dimSummary [NumLoadDimensions]loadSummary
var worstDim LoadDimension
for i := range msl.load {
// TODO(kvoli,sumeerbhola): Handle negative adjusted store/node loads.
const nodeIDForLogging = 0
ls := loadSummaryForDimension(ctx, ss.StoreID, nodeIDForLogging, LoadDimension(i), ss.adjusted.load[i], ss.capacity[i],
msl.load[i], msl.util[i])
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/allocator/mmaprototype/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,8 @@ func computeMeansForStoreSet(
clear(scratchStores)
n := 0
for _, storeID := range stores {
// NB: using reported load and not adjusted load, so cannot be
// negative.
nodeID, sload := loadProvider.getStoreReportedLoad(storeID)
if _, ok := scratchStores[storeID]; ok {
continue
Expand All @@ -389,6 +391,8 @@ func computeMeansForStoreSet(
}
nLoad := scratchNodes[nodeID]
if nLoad == nil {
// NB: using reported load and not adjusted load, so cannot be
// negative.
scratchNodes[nodeID] = loadProvider.getNodeReportedLoad(nodeID)
}
}
Expand Down Expand Up @@ -467,6 +471,8 @@ func (ls loadSummary) SafeFormat(w redact.SafePrinter, _ rune) {
}

// Computes the loadSummary for a particular load dimension.
//
// NB: load can be negative since it may be adjusted load.
func loadSummaryForDimension(
ctx context.Context,
storeID roachpb.StoreID,
Expand Down