[v12] reduce cache retry load (#23025) #24719

Merged
merged 1 commit into from Apr 18, 2023
46 changes: 46 additions & 0 deletions api/utils/retryutils/retry_test.go
@@ -32,6 +32,21 @@ func TestLinear(t *testing.T) {
Max: 3 * time.Second,
})
require.NoError(t, err)
testLinear(t, r)
}

func TestLinearV2(t *testing.T) {
t.Parallel()

r2, err := NewRetryV2(RetryV2Config{
Driver: NewLinearDriver(time.Second),
Max: 3 * time.Second,
})
require.NoError(t, err)
testLinear(t, r2)
}

func testLinear(t *testing.T, r Retry) {
require.Equal(t, r.Duration(), time.Duration(0))
r.Inc()
require.Equal(t, r.Duration(), time.Second)
@@ -45,6 +60,37 @@ func TestLinear(t *testing.T) {
require.Equal(t, r.Duration(), time.Duration(0))
}

func TestExponential(t *testing.T) {
t.Parallel()

r, err := NewRetryV2(RetryV2Config{
Driver: NewExponentialDriver(time.Second),
Max: 12 * time.Second,
})
require.NoError(t, err)

require.Equal(t, r.Duration(), time.Duration(0))
r.Inc()
require.Equal(t, r.Duration(), time.Second)
r.Inc()
require.Equal(t, r.Duration(), 2*time.Second)
r.Inc()
require.Equal(t, r.Duration(), 4*time.Second)
r.Inc()
require.Equal(t, r.Duration(), 8*time.Second)
r.Inc()
// should hit configured maximum
require.Equal(t, r.Duration(), 12*time.Second)
r.Reset()
require.Equal(t, r.Duration(), time.Duration(0))

// verify that exponentiation is capped s.t. we don't wrap
for i := 0; i < 128; i++ {
r.Inc()
require.True(t, r.Duration() > 0 && r.Duration() <= time.Second*12)
}
}

func TestLinearRetryMax(t *testing.T) {
t.Parallel()

238 changes: 238 additions & 0 deletions api/utils/retryutils/retryv2.go
@@ -0,0 +1,238 @@
/*
Copyright 2019-2022 Gravitational, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package retryutils defines common retry and jitter logic.
package retryutils

import (
"time"

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
)

// maxBackoff is an absolute maximum amount of backoff that our backoff helpers will
// apply. Used as a safety precaution to limit the impact of misconfigured backoffs.
const maxBackoff = 16 * time.Minute

// maxAttempts is the peak attempt number we will scale to (used to prevent overflows).
const maxAttempts = 16

// statically assert that we don't overflow.
const _ = maxBackoff << (maxAttempts - 1)

// statically assert that RetryV2 implements the Retry interface.
var _ Retry = (*RetryV2)(nil)

// Driver is the underlying retry driver. It determines the difference in behavior
// between linear and exponential retries.
//
// NOTE: drivers must be stateless. If a stateful driver needs to be implemented in the
// future, this interface will need to be extended to support safe use of Retry.Clone.
type Driver interface {
// Duration calculates the step-specific delay for a given attempt. Excludes
// base duration and jitter, which are applied by the outer retry instance.
Duration(attempt int64) time.Duration

// Check verifies the correctness of any driver-internal parameters.
Check() error
}
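
As a sketch of what the Driver contract expects (hypothetical code, not part of this PR; `constantDriver` is an invented name that would live alongside the built-in drivers in this package):

```go
// constantDriver is a hypothetical driver that waits the same fixed step on every
// attempt. It illustrates the two requirements: a stateless Duration calculation
// and a Check that validates parameters up front.
type constantDriver struct {
	step time.Duration
}

func (d constantDriver) Duration(attempt int64) time.Duration {
	// keep the zero-delay first attempt convention used by the built-in drivers
	if attempt <= 0 {
		return 0
	}
	return d.step
}

func (d constantDriver) Check() error {
	if d.step <= 0 || d.step > maxBackoff {
		return trace.BadParameter("constant driver requires a step in (0, %v]", maxBackoff)
	}
	return nil
}
```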

// NewLinearDriver creates a linear retry driver with the supplied step value. Resulting
// retries increase their backoff by a fixed step amount on each increment, with the first
// retry having a base step amount of zero.
func NewLinearDriver(step time.Duration) Driver {
return linearDriver{step}
}

type linearDriver struct {
step time.Duration
}

func (d linearDriver) Duration(attempt int64) time.Duration {
dur := d.step * time.Duration(attempt)
if dur > maxBackoff {
return maxBackoff
}
return dur
}

func (d linearDriver) Check() error {
if d.step <= 0 {
return trace.BadParameter("linear driver requires positive step value")
}

if d.step > maxBackoff {
return trace.BadParameter("linear backoff step value too large: %v (max=%v)", d.step, maxBackoff)
}
return nil
}

// NewExponentialDriver creates a new exponential retry driver with the supplied base
// step value. Resulting retries double their base backoff on each increment.
func NewExponentialDriver(base time.Duration) Driver {
return exponentialDriver{base}
}

type exponentialDriver struct {
base time.Duration
}

func (d exponentialDriver) Duration(attempt int64) time.Duration {
if attempt > maxAttempts {
// maxAttempts (16) will exceed any reasonable Max value already, and we don't
// want to accidentally wrap and end up with negative durations.
attempt = maxAttempts
}

// in order to maintain consistency with existing linear behavior, the first attempt
// results in a base duration of 0.
if attempt <= 0 {
return 0
}

// the duration doubles with each attempt: base * 2^(attempt-1)
dur := d.base << (attempt - 1)

if dur > maxBackoff {
return maxBackoff
}

return dur
}

func (d exponentialDriver) Check() error {
if d.base <= 0 {
return trace.BadParameter("exponential driver requires positive base")
}

if d.base > maxBackoff {
return trace.BadParameter("exponential backoff base too large: %v (max=%v)", d.base, maxBackoff)
}
return nil
}
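
For reference, a quick sketch (assuming it sits next to the package tests, with `fmt` and `time` imported) of the raw per-attempt delays each driver yields before `First`, `Jitter`, and `Max` are applied:

```go
func ExampleNewExponentialDriver() {
	linear := NewLinearDriver(time.Second)
	exponential := NewExponentialDriver(time.Second)

	for attempt := int64(0); attempt <= 4; attempt++ {
		fmt.Printf("attempt %d: linear=%v exponential=%v\n",
			attempt, linear.Duration(attempt), exponential.Duration(attempt))
	}
	// Output:
	// attempt 0: linear=0s exponential=0s
	// attempt 1: linear=1s exponential=1s
	// attempt 2: linear=2s exponential=2s
	// attempt 3: linear=3s exponential=4s
	// attempt 4: linear=4s exponential=8s
}
```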

// RetryV2Config sets up the retry configuration. The progression
// of delays is determined by the configured Driver.
type RetryV2Config struct {
// First is the first element of the progression;
// it may be 0.
First time.Duration
// Driver generates the underlying progression of delays. Cannot be nil.
Driver Driver
// Max is the maximum value of the progression;
// it cannot be 0.
Max time.Duration
// Jitter is an optional jitter function to be applied
// to the delay. Note that supplying a jitter means that
// successive calls to Duration may return different results.
Jitter Jitter `json:"-"`
// AutoReset, if greater than zero, causes the retry to automatically reset
// after Max * AutoReset has elapsed since the last call to Duration or After.
AutoReset int64
// Clock to override clock in tests
Clock clockwork.Clock
}

// CheckAndSetDefaults checks and sets defaults
func (c *RetryV2Config) CheckAndSetDefaults() error {
if c.Driver == nil {
return trace.BadParameter("missing parameter Driver")
}
if err := c.Driver.Check(); err != nil {
return trace.Wrap(err)
}
if c.Max == 0 {
return trace.BadParameter("missing parameter Max")
}
if c.Clock == nil {
c.Clock = clockwork.NewRealClock()
}
return nil
}

// NewRetryV2 returns a new retry instance.
func NewRetryV2(cfg RetryV2Config) (*RetryV2, error) {
if err := cfg.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
return newRetryV2(cfg), nil
}

// newRetryV2 creates an instance of RetryV2 from a
// previously verified configuration.
func newRetryV2(cfg RetryV2Config) *RetryV2 {
return &RetryV2{RetryV2Config: cfg}
}

// RetryV2 is used to moderate the rate of retries by applying successively increasing
// delays. The nature of the progression is determined by the 'Driver', which generates
// the portion of the delay corresponding to the attempt number (e.g. Exponential(1s) might
// generate the sequence 0s, 1s, 2s, 4s, 8s, etc.). This progression can be modified through
// the use of a custom base/start value, jitters, etc.
type RetryV2 struct {
// RetryV2Config is the retry configuration
RetryV2Config
lastUse time.Time
attempt int64
}

// Reset resets retry period to initial state
func (r *RetryV2) Reset() {
r.attempt = 0
}

// Clone creates an identical copy of RetryV2 with fresh state.
func (r *RetryV2) Clone() Retry {
return newRetryV2(r.RetryV2Config)
}

// Inc increments attempt counter
func (r *RetryV2) Inc() {
r.attempt++
}

// Duration returns retry duration based on state
func (r *RetryV2) Duration() time.Duration {
if r.AutoReset > 0 {
now := r.Clock.Now()
if now.After(r.lastUse.Add(r.Max * time.Duration(r.AutoReset))) {
r.Reset()
}
r.lastUse = now
}

a := r.First + r.Driver.Duration(r.attempt)
if a < 1 {
return 0
}

if a > r.Max {
a = r.Max
}

if r.Jitter != nil {
a = r.Jitter(a)
}

return a
}

// After returns a channel that fires after the delay
// returned by the Duration method.
func (r *RetryV2) After() <-chan time.Time {
return r.Clock.After(r.Duration())
}
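
A minimal usage sketch of the new retry (hypothetical caller code, with context, time, trace, and retryutils imported; `doWork` and the surrounding function are assumptions, not part of this PR), showing the intended Inc/After loop:

```go
func retryWork(ctx context.Context, doWork func(context.Context) error) error {
	retry, err := retryutils.NewRetryV2(retryutils.RetryV2Config{
		First:  time.Second,
		Driver: retryutils.NewExponentialDriver(time.Second),
		Max:    time.Minute,
		Jitter: retryutils.NewHalfJitter(),
	})
	if err != nil {
		return trace.Wrap(err)
	}

	for {
		if err := doWork(ctx); err == nil {
			return nil
		}
		retry.Inc()
		select {
		// After fires once the jittered, Max-capped delay for the current attempt elapses.
		case <-retry.After():
		case <-ctx.Done():
			return trace.Wrap(ctx.Err())
		}
	}
}
```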
35 changes: 29 additions & 6 deletions lib/cache/cache.go
@@ -719,9 +719,22 @@ func (c *Config) CheckAndSetDefaults() error {
}
if c.MaxRetryPeriod == 0 {
c.MaxRetryPeriod = defaults.MaxWatcherBackoff

// non-control-plane caches should use a longer backoff in order to limit
// thundering herd effects upon restart of control-plane elements.
if !isControlPlane(c.target) {
c.MaxRetryPeriod = defaults.MaxLongWatcherBackoff
}
}
if c.WatcherInitTimeout == 0 {
c.WatcherInitTimeout = time.Minute
c.WatcherInitTimeout = defaults.MaxWatcherBackoff

// permit non-control-plane watchers to take a while to start up. slow receipt of
// init events is a common symptom of the thundering herd effect caused by restarting
// control plane elements.
if !isControlPlane(c.target) {
c.WatcherInitTimeout = defaults.MaxLongWatcherBackoff
}
}
if c.CacheInitTimeout == 0 {
c.CacheInitTimeout = time.Second * 20
@@ -869,13 +882,14 @@ func New(config Config) (*Cache, error) {

// Start the cache. Should only be called once.
func (c *Cache) Start() error {
retry, err := retryutils.NewLinear(retryutils.LinearConfig{
First: utils.FullJitter(c.MaxRetryPeriod / 10),
Step: c.MaxRetryPeriod / 5,
retry, err := retryutils.NewRetryV2(retryutils.RetryV2Config{
First: utils.FullJitter(c.MaxRetryPeriod / 16),
Driver: retryutils.NewExponentialDriver(c.MaxRetryPeriod / 16),
Max: c.MaxRetryPeriod,
Jitter: retryutils.NewHalfJitter(),
Clock: c.Clock,
})

if err != nil {
c.Close()
return trace.Wrap(err)
@@ -1327,14 +1341,23 @@ func tracedApplyFn(parent oteltrace.Span, tracer oteltrace.Tracer, kind resource
// throttled to limit load spiking during a mass
// restart of nodes
func fetchLimit(target string) int {
switch target {
case "auth", "proxy":
if isControlPlane(target) {
return 5
}

return 1
}

// isControlPlane checks if the cache target is a control-plane element.
func isControlPlane(target string) bool {
switch target {
case "auth", "proxy":
return true
}

return false
}

func (c *Cache) fetch(ctx context.Context) (fn applyFn, err error) {
ctx, fetchSpan := c.Tracer.Start(ctx, "cache/fetch", oteltrace.WithAttributes(attribute.String("target", c.target)))
defer func() {
14 changes: 11 additions & 3 deletions lib/cache/cache_test.go
@@ -2965,7 +2965,7 @@ func TestCache_Backoff(t *testing.T) {
p.eventsS.closeWatchers()
p.backend.SetReadError(trace.ConnectionProblem(nil, "backend is unavailable"))

step := p.cache.Config.MaxRetryPeriod / 5.0
step := p.cache.Config.MaxRetryPeriod / 16.0
for i := 0; i < 5; i++ {
// wait for cache to reload
select {
@@ -2974,8 +2974,16 @@
duration, err := time.ParseDuration(event.Event.Resource.GetKind())
require.NoError(t, err)

stepMin := step * time.Duration(i) / 2
stepMax := step * time.Duration(i+1)
// emulate the exponential driver's multiplier: 0 for the first attempt, then 2^(i-1)
var mul int64
if i == 0 {
mul = 0
} else {
mul = 1 << (i - 1)
}

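// the observed delay is (first + step*mul) passed through half-jitter; first is a
// full jitter of MaxRetryPeriod/16 (at most one step), so the result should land
// in [step*mul/2, step*(mul+1)]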
stepMin := step * time.Duration(mul) / 2
stepMax := step * time.Duration(mul+1)

require.GreaterOrEqual(t, duration, stepMin)
require.LessOrEqual(t, duration, stepMax)
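
For intuition about the new cache backoff (a sketch, not code from the PR; the helper name is invented): the pre-jitter delay for attempt n is roughly First + (MaxRetryPeriod/16) * 2^(n-1), capped at MaxRetryPeriod, so early retries start much smaller than the old MaxRetryPeriod/5 step while still reaching the cap after about five increments.

```go
// cacheBackoff mirrors what RetryV2 computes for the cache's configuration before
// the half-jitter is applied (the real exponential driver also caps the attempt
// number to avoid overflow, which this sketch omits).
func cacheBackoff(first, max time.Duration, attempt int64) time.Duration {
	step := max / 16
	var d time.Duration
	if attempt > 0 {
		d = step << (attempt - 1) // exponential driver: step * 2^(attempt-1)
	}
	d += first
	if d > max {
		d = max
	}
	return d // roughly max/16, max/8, max/4, max/2, then capped at max
}
```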