Skip to content

Commit

Permalink
kvserver: limit the frequency at which system config updates enqueue …
Browse files Browse the repository at this point in the history
…replicas

This has a shockingly profound impact on the runtime of ORM tests.

This PR is a fancier implementation of #53603 that permits bursting and makes
setting a relatively low rate feel less risky in the face of a small burst of
changes.

Release justification: low risk, high benefit changes to existing functionality

Release note (performance improvement): Limited the frequency of an expensive
operation due to schema changes making workload which perform schema changes
at a high rate less resource intensive.
  • Loading branch information
ajwerner committed Aug 31, 2020
1 parent 7823fc8 commit ab3f53d
Showing 1 changed file with 40 additions and 7 deletions.
47 changes: 40 additions & 7 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,21 @@ var concurrentRangefeedItersLimit = settings.RegisterPositiveIntSetting(
64,
)

// Minimum time interval between system config updates which will lead to
// enqueuing replicas.
var queueAdditionOnSystemConfigUpdateRate = settings.RegisterNonNegativeFloatSetting(
"kv.store.system_config_update.queue_add_rate",
"the rate (per second) at which the store will add all replicas to the split and merge queue due to system config gossip",
.5)

// Minimum time interval between system config updates which will lead to
// enqueuing replicas. The default is relatively high to deal with startup
// scenarios.
var queueAdditionOnSystemConfigUpdateBurst = settings.RegisterNonNegativeIntSetting(
"kv.store.system_config_update.queue_add_burst",
"the burst rate at which the store will add all replicas to the split and merge queue due to system config gossip",
32)

// raftLeadershipTransferTimeout limits the amount of time a drain command
// waits for lease transfers.
var raftLeadershipTransferWait = func() *settings.DurationSetting {
Expand Down Expand Up @@ -610,7 +625,8 @@ type Store struct {
// tenantRateLimiters manages tenantrate.Limiters
tenantRateLimiters *tenantrate.LimiterFactory

computeInitialMetrics sync.Once
computeInitialMetrics sync.Once
systemConfigUpdateQueueRateLimiter *quotapool.RateLimiter
}

var _ kv.Sender = &Store{}
Expand Down Expand Up @@ -905,6 +921,20 @@ func NewStore(
s.tenantRateLimiters = tenantrate.NewLimiterFactory(cfg.Settings, &cfg.TestingKnobs.TenantRateKnobs)
s.metrics.registry.AddMetricStruct(s.tenantRateLimiters.Metrics())

s.systemConfigUpdateQueueRateLimiter = quotapool.NewRateLimiter(
"SystemConfigUpdateQueue",
quotapool.Limit(queueAdditionOnSystemConfigUpdateRate.Get(&cfg.Settings.SV)),
queueAdditionOnSystemConfigUpdateBurst.Get(&cfg.Settings.SV))
updateSystemConfigUpdateQueueLimits := func() {
s.systemConfigUpdateQueueRateLimiter.UpdateLimit(
quotapool.Limit(queueAdditionOnSystemConfigUpdateRate.Get(&cfg.Settings.SV)),
queueAdditionOnSystemConfigUpdateBurst.Get(&cfg.Settings.SV))
}
queueAdditionOnSystemConfigUpdateRate.SetOnChange(&cfg.Settings.SV,
updateSystemConfigUpdateQueueLimits)
queueAdditionOnSystemConfigUpdateBurst.SetOnChange(&cfg.Settings.SV,
updateSystemConfigUpdateQueueLimits)

if s.cfg.Gossip != nil {
// Add range scanner and configure with queues.
s.scanner = newReplicaScanner(
Expand Down Expand Up @@ -1826,6 +1856,7 @@ func (s *Store) systemGossipUpdate(sysCfg *config.SystemConfig) {
// For every range, update its zone config and check if it needs to
// be split or merged.
now := s.cfg.Clock.Now()
shouldQueue := s.systemConfigUpdateQueueRateLimiter.AdmitN(1)
newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool {
key := repl.Desc().StartKey
zone, err := sysCfg.GetZoneConfigForKey(key)
Expand All @@ -1836,12 +1867,14 @@ func (s *Store) systemGossipUpdate(sysCfg *config.SystemConfig) {
zone = s.cfg.DefaultZoneConfig
}
repl.SetZoneConfig(zone)
s.splitQueue.Async(ctx, "gossip update", true /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, repl, now)
})
s.mergeQueue.Async(ctx, "gossip update", true /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, repl, now)
})
if shouldQueue {
s.splitQueue.Async(ctx, "gossip update", true /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, repl, now)
})
s.mergeQueue.Async(ctx, "gossip update", true /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, repl, now)
})
}
return true // more
})
}
Expand Down

0 comments on commit ab3f53d

Please sign in to comment.