Merge pull request #106186 from wenyihu6/backport22.2-105924
release-22.2: allocator: expose LeaseRebalanceThreshold as a cluster setting
wenyihu6 committed Jul 10, 2023
2 parents dac1313 + 5c8c402 commit 92136d9
Showing 7 changed files with 51 additions and 26 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -43,6 +43,7 @@
<tr><td><div id="setting-feature-schema-change-enabled" class="anchored"><code>feature.schema_change.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>set to true to enable schema changes, false to disable; default is true</td></tr>
<tr><td><div id="setting-feature-stats-enabled" class="anchored"><code>feature.stats.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>set to true to enable CREATE STATISTICS/ANALYZE, false to disable; default is true</td></tr>
<tr><td><div id="setting-jobs-retention-time" class="anchored"><code>jobs.retention_time</code></div></td><td>duration</td><td><code>336h0m0s</code></td><td>the amount of time to retain records for completed jobs before</td></tr>
<tr><td><div id="setting-kv-allocator-lease-rebalance-threshold" class="anchored"><code>kv.allocator.lease_rebalance_threshold</code></div></td><td>float</td><td><code>0.05</code></td><td>minimum fraction away from the mean a store&#39;s lease count can be before it is considered for lease-transfers</td></tr>
<tr><td><div id="setting-kv-allocator-load-based-lease-rebalancing-enabled" class="anchored"><code>kv.allocator.load_based_lease_rebalancing.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>set to enable rebalancing of range leases based on load and latency</td></tr>
<tr><td><div id="setting-kv-allocator-load-based-rebalancing" class="anchored"><code>kv.allocator.load_based_rebalancing</code></div></td><td>enumeration</td><td><code>leases and replicas</code></td><td>whether to rebalance based on the distribution of QPS across stores [off = 0, leases = 1, leases and replicas = 2]</td></tr>
<tr><td><div id="setting-kv-allocator-load-based-rebalancing-interval" class="anchored"><code>kv.allocator.load_based_rebalancing_interval</code></div></td><td>duration</td><td><code>1m0s</code></td><td>the rough interval at which each store will check for load-based lease / replica rebalancing opportunities</td></tr>
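The new row documents kv.allocator.lease_rebalance_threshold, the public float setting registered in the allocator change below (default 0.05). As a rough illustration of how such a setting is consumed, here is a minimal sketch, not part of this patch, that reads it through the same accessors the diff uses; the example-test placement and name are assumptions for illustration only:

package allocatorimpl_test // illustrative placement only, not from the patch

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

func ExampleLeaseRebalanceThreshold() {
	// A throwaway cluster.Settings, as used by the tests updated in this patch.
	st := cluster.MakeTestingClusterSettings()

	// The setting defaults to 0.05: a store's lease count must sit at least
	// 5% away from the mean before it is considered for lease transfers.
	threshold := allocatorimpl.LeaseRebalanceThreshold.Get(&st.SV)
	fmt.Println(threshold)
	// Output: 0.05
}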
36 changes: 24 additions & 12 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -36,23 +36,30 @@ import (
)

const (
// leaseRebalanceThreshold is the minimum ratio of a store's lease surplus
// to the mean range/lease count that permits lease-transfers away from that
// store.
leaseRebalanceThreshold = 0.05

// baseLoadBasedLeaseRebalanceThreshold is the equivalent of
// leaseRebalanceThreshold for load-based lease rebalance decisions (i.e.
// "follow-the-workload"). It's the base threshold for decisions that get
// adjusted based on the load and latency of the involved ranges/nodes.
baseLoadBasedLeaseRebalanceThreshold = 2 * leaseRebalanceThreshold

// minReplicaWeight sets a floor for how low a replica weight can be. This is
// needed because a weight of zero doesn't work in the current lease scoring
// algorithm.
minReplicaWeight = 0.001
)

// LeaseRebalanceThreshold is the minimum ratio of a store's lease surplus to
// the mean range/lease count that permits lease-transfers away from that store.
var LeaseRebalanceThreshold = settings.RegisterFloatSetting(
settings.SystemOnly,
"kv.allocator.lease_rebalance_threshold",
"minimum fraction away from the mean a store's lease count can be before "+
"it is considered for lease-transfers",
0.05,
).WithPublic()

// getBaseLoadBasedLeaseRebalanceThreshold returns the equivalent of
// LeaseRebalanceThreshold for load-based lease rebalance decisions (i.e.
// "follow-the-workload"). It's the base threshold for decisions that get
// adjusted based on the load and latency of the involved ranges/nodes.
func getBaseLoadBasedLeaseRebalanceThreshold(leaseRebalanceThreshold float64) float64 {
return 2 * leaseRebalanceThreshold
}

// MinLeaseTransferStatsDuration configures the minimum amount of time a
// replica must wait for stats about request counts to accumulate before
// making decisions based on them. The higher this is, the less likely
@@ -463,6 +470,7 @@ type AllocatorMetrics struct {
// Allocator tries to spread replicas as evenly as possible across the stores
// in the cluster.
type Allocator struct {
st *cluster.Settings
StorePool *storepool.StorePool
nodeLatencyFn func(addr string) (time.Duration, bool)
// TODO(aayush): Let's replace this with a *rand.Rand that has a rand.Source
@@ -494,6 +502,7 @@ func makeAllocatorMetrics() AllocatorMetrics {

// MakeAllocator creates a new allocator using the specified StorePool.
func MakeAllocator(
st *cluster.Settings,
storePool *storepool.StorePool,
nodeLatencyFn func(addr string) (time.Duration, bool),
knobs *allocator.TestingKnobs,
@@ -508,6 +517,7 @@ func MakeAllocator(
randSource = rand.NewSource(rand.Int63())
}
allocator := Allocator{
st: st,
StorePool: storePool,
nodeLatencyFn: nodeLatencyFn,
randGen: makeAllocatorRand(randSource),
@@ -2024,6 +2034,7 @@ func (a Allocator) FollowTheWorkloadPrefersLocal(
return false
}
adjustment := adjustments[candidate]
baseLoadBasedLeaseRebalanceThreshold := getBaseLoadBasedLeaseRebalanceThreshold(LeaseRebalanceThreshold.Get(&a.st.SV))
if adjustment > baseLoadBasedLeaseRebalanceThreshold {
log.KvDistribution.VEventf(ctx, 3,
"s%d is a better fit than s%d due to follow-the-workload (score: %.2f; threshold: %.2f)",
@@ -2198,7 +2209,7 @@ func loadBasedLeaseRebalanceScore(
// Start with twice the base rebalance threshold in order to fight more
// strongly against thrashing caused by small variances in the distribution
// of request weights.
rebalanceThreshold := baseLoadBasedLeaseRebalanceThreshold - rebalanceAdjustment
rebalanceThreshold := getBaseLoadBasedLeaseRebalanceThreshold(LeaseRebalanceThreshold.Get(&st.SV)) - rebalanceAdjustment

overfullLeaseThreshold := int32(math.Ceil(meanLeases * (1 + rebalanceThreshold)))
overfullScore := source.Capacity.LeaseCount - overfullLeaseThreshold
@@ -2232,6 +2243,7 @@ func (a Allocator) shouldTransferLeaseForLeaseCountConvergence(

// Allow lease transfer if we're above the overfull threshold, which is
// mean*(1+leaseRebalanceThreshold).
leaseRebalanceThreshold := LeaseRebalanceThreshold.Get(&a.st.SV)
overfullLeaseThreshold := int32(math.Ceil(sl.CandidateLeases.Mean * (1 + leaseRebalanceThreshold)))
minOverfullThreshold := int32(math.Ceil(sl.CandidateLeases.Mean + 5))
if overfullLeaseThreshold < minOverfullThreshold {
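To make the lease-count convergence check above concrete, the following is a small standalone sketch, not code from this patch, of the overfull computation it performs: with threshold t read from kv.allocator.lease_rebalance_threshold, a store only becomes a candidate to shed leases once its lease count exceeds ceil(mean*(1+t)), and never for fewer than roughly five leases above the mean.

package main

import (
	"fmt"
	"math"
)

// overfullLeaseThreshold mirrors the excerpt above: the configured
// kv.allocator.lease_rebalance_threshold widens or narrows the band around
// the mean lease count within which no transfers are attempted.
func overfullLeaseThreshold(meanLeases, threshold float64) int32 {
	overfull := int32(math.Ceil(meanLeases * (1 + threshold)))
	minOverfull := int32(math.Ceil(meanLeases + 5))
	if overfull < minOverfull {
		return minOverfull
	}
	return overfull
}

func main() {
	// With the 0.05 default and a mean of 200 leases per store, transfers
	// start above 210 leases; raising the setting to 0.10 moves that to 220.
	fmt.Println(overfullLeaseThreshold(200, 0.05)) // 210
	fmt.Println(overfullLeaseThreshold(200, 0.10)) // 220
}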
21 changes: 13 additions & 8 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
@@ -2029,11 +2029,12 @@ func TestAllocatorTransferLeaseTargetDraining(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
stopper, g, _, storePool, nl := storepool.CreateTestStorePool(ctx,
storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */
func() int { return 10 }, /* nodeCount */
livenesspb.NodeLivenessStatus_LIVE)
a := MakeAllocator(storePool, func(string) (time.Duration, bool) {
a := MakeAllocator(st, storePool, func(string) (time.Duration, bool) {
return 0, true
}, nil)
defer stopper.Stop(ctx)
@@ -2413,11 +2414,12 @@ func TestAllocatorShouldTransferLeaseDraining(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
stopper, g, _, storePool, nl := storepool.CreateTestStorePool(ctx,
storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */
func() int { return 10 }, /* nodeCount */
livenesspb.NodeLivenessStatus_LIVE)
a := MakeAllocator(storePool, func(string) (time.Duration, bool) {
a := MakeAllocator(st, storePool, func(string) (time.Duration, bool) {
return 0, true
}, nil)
defer stopper.Stop(context.Background())
@@ -2479,11 +2481,12 @@ func TestAllocatorShouldTransferSuspected(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
stopper, g, clock, storePool, nl := storepool.CreateTestStorePool(ctx,
storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */
func() int { return 10 }, /* nodeCount */
livenesspb.NodeLivenessStatus_LIVE)
a := MakeAllocator(storePool, func(string) (time.Duration, bool) {
a := MakeAllocator(st, storePool, func(string) (time.Duration, bool) {
return 0, true
}, nil)
defer stopper.Stop(context.Background())
@@ -5258,6 +5261,7 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
stopper, g, _, storePool, _ := storepool.CreateTestStorePool(ctx,
storepool.TestTimeUntilStoreDeadOff, true, /* deterministic */
func() int { return 10 }, /* nodeCount */
@@ -5403,7 +5407,7 @@

for _, c := range testCases {
t.Run("", func(t *testing.T) {
a := MakeAllocator(storePool, func(addr string) (time.Duration, bool) {
a := MakeAllocator(st, storePool, func(addr string) (time.Duration, bool) {
return c.latency[addr], true
}, nil)
target := a.TransferLeaseTarget(
@@ -6976,11 +6980,12 @@ func TestAllocatorComputeActionDynamicNumReplicas(t *testing.T) {

var numNodes int
ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
stopper, _, _, sp, _ := storepool.CreateTestStorePool(ctx,
storepool.TestTimeUntilStoreDeadOff, false, /* deterministic */
func() int { return numNodes },
livenesspb.NodeLivenessStatus_LIVE)
a := MakeAllocator(sp, func(string) (time.Duration, bool) {
a := MakeAllocator(st, sp, func(string) (time.Duration, bool) {
return 0, true
}, nil)

@@ -7086,7 +7091,7 @@ func TestAllocatorComputeActionNoStorePool(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

a := MakeAllocator(nil, nil, nil)
a := MakeAllocator(nil /* clusterSetting */, nil, nil, nil)
action, priority := a.ComputeAction(context.Background(), roachpb.SpanConfig{}, nil)
if action != AllocatorNoop {
t.Errorf("expected AllocatorNoop, but got %v", action)
@@ -7649,7 +7654,7 @@ func TestAllocatorFullDisks(t *testing.T) {
mockNodeLiveness.NodeLivenessFunc,
false, /* deterministic */
)
alloc := MakeAllocator(sp, func(string) (time.Duration, bool) {
alloc := MakeAllocator(st, sp, func(string) (time.Duration, bool) {
return 0, false
}, nil)

@@ -8095,7 +8100,7 @@ func exampleRebalancing(
storepool.NewMockNodeLiveness(livenesspb.NodeLivenessStatus_LIVE).NodeLivenessFunc,
/* deterministic */ true,
)
alloc := MakeAllocator(sp, func(string) (time.Duration, bool) {
alloc := MakeAllocator(st, sp, func(string) (time.Duration, bool) {
return 0, false
}, nil)

4 changes: 3 additions & 1 deletion pkg/kv/kvserver/allocator/allocatorimpl/test_helpers.go
@@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)
@@ -36,11 +37,12 @@ func CreateTestAllocator(
func CreateTestAllocatorWithKnobs(
ctx context.Context, numNodes int, deterministic bool, knobs *allocator.TestingKnobs,
) (*stop.Stopper, *gossip.Gossip, *storepool.StorePool, Allocator, *timeutil.ManualTime) {
st := cluster.MakeTestingClusterSettings()
stopper, g, manual, storePool, _ := storepool.CreateTestStorePool(ctx,
storepool.TestTimeUntilStoreDeadOff, deterministic,
func() int { return numNodes },
livenesspb.NodeLivenessStatus_LIVE)
a := MakeAllocator(storePool, func(string) (time.Duration, bool) {
a := MakeAllocator(st, storePool, func(string) (time.Duration, bool) {
return 0, true
}, knobs)
return stopper, g, storePool, a, manual
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/asim/state/helpers.go
@@ -41,7 +41,7 @@ func TestingWorkloadSeed() int64 {
// for configuration.
func NewStorePool(
nodeCountFn storepool.NodeCountFunc, nodeLivenessFn storepool.NodeLivenessFunc, hlc *hlc.Clock,
) *storepool.StorePool {
) (*storepool.StorePool, *cluster.Settings) {
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

@@ -59,7 +59,7 @@
nodeLivenessFn,
/* deterministic */ true,
)
return sp
return sp, st
}

// OffsetTick offsets start time by adding tick number of seconds to it.
7 changes: 6 additions & 1 deletion pkg/kv/kvserver/asim/state/impl.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/google/btree"
)
@@ -291,14 +292,16 @@ func (s *state) AddStore(nodeID NodeID) (Store, bool) {
return nil, false
}

sp, st := NewStorePool(s.NodeCountFn(), s.NodeLivenessFn(), hlc.NewClock(&s.clock, 0))
node := s.nodes[nodeID]
s.storeSeqGen++
storeID := s.storeSeqGen
store := &store{
storeID: storeID,
nodeID: nodeID,
desc: roachpb.StoreDescriptor{StoreID: roachpb.StoreID(storeID), Node: node.Descriptor()},
storepool: NewStorePool(s.NodeCountFn(), s.NodeLivenessFn(), hlc.NewClock(&s.clock, 0)),
storepool: sp,
settings: st,
replicas: make(map[RangeID]ReplicaID),
}

@@ -704,6 +707,7 @@ func (s *state) NodeCountFn() storepool.NodeCountFunc {
// populates the storepool with the current state.
func (s *state) MakeAllocator(storeID StoreID) allocatorimpl.Allocator {
return allocatorimpl.MakeAllocator(
s.stores[storeID].settings,
s.stores[storeID].storepool,
func(addr string) (time.Duration, bool) { return 0, true },
nil,
@@ -776,6 +780,7 @@ type store struct {
desc roachpb.StoreDescriptor

storepool *storepool.StorePool
settings *cluster.Settings
replicas map[RangeID]ReplicaID
}

4 changes: 2 additions & 2 deletions pkg/kv/kvserver/store.go
@@ -1233,9 +1233,9 @@ func NewStore(
}
s.ioThreshold.t = &admissionpb.IOThreshold{}
if cfg.RPCContext != nil {
s.allocator = allocatorimpl.MakeAllocator(cfg.StorePool, cfg.RPCContext.RemoteClocks.Latency, cfg.TestingKnobs.AllocatorKnobs)
s.allocator = allocatorimpl.MakeAllocator(cfg.Settings, cfg.StorePool, cfg.RPCContext.RemoteClocks.Latency, cfg.TestingKnobs.AllocatorKnobs)
} else {
s.allocator = allocatorimpl.MakeAllocator(cfg.StorePool, func(string) (time.Duration, bool) {
s.allocator = allocatorimpl.MakeAllocator(cfg.Settings, cfg.StorePool, func(string) (time.Duration, bool) {
return 0, false
}, cfg.TestingKnobs.AllocatorKnobs)
}
