Skip to content
This repository has been archived by the owner on Aug 30, 2019. It is now read-only.

Commit

Permalink
Writer improvements.
Browse files Browse the repository at this point in the history
* More modular structure for some extra code reuse
* Readded retriable errors
* Added exponential and jittery backoff timer and apply it by default to
  failed flushes.
  • Loading branch information
AlexJF committed Dec 27, 2017
1 parent 33b1bab commit 4626e73
Show file tree
Hide file tree
Showing 13 changed files with 1,685 additions and 247 deletions.
78 changes: 78 additions & 0 deletions backoff/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package backoff

import "time"

// Timer represents a timer that implements some backOff strategy that can adapt to number of schedulings.
type Timer interface {
ScheduleRetry(err error)
CurrentDelay() time.Duration
ReceiveTick() <-chan time.Time
Reset()
Stop()
}

type DelayProvider = func(numRetries int, err error) time.Duration

// CustomTimer represents a backoff timer configured with a certain DelayProvider.
type CustomTimer struct {
numRetries int
currentDelay time.Duration

delayProvider DelayProvider

tickChannel chan time.Time
timer *time.Timer
}

func NewCustomTimer(delayProvider DelayProvider) *CustomTimer {
return &CustomTimer{
delayProvider: delayProvider,
tickChannel: make(chan time.Time),
}
}

// ScheduleRetry schedules the next retry tick according to the given delay
func (t *CustomTimer) ScheduleRetry(err error) {
t.Stop()
t.currentDelay = t.delayProvider(t.numRetries, err)

t.timer = time.AfterFunc(t.currentDelay, func() {
t.tickChannel <- time.Now()
})
t.numRetries += 1
}

// CurrentDelay returns the delay of the current or last ticked retry.
func (t *CustomTimer) CurrentDelay() time.Duration {
return t.currentDelay
}

// NumRetries returns the number of tries since this timer was last reset.
func (t *CustomTimer) NumRetries() int {
return t.numRetries
}

// ReceiveTick returns a channel that will receive a time.Time object as soon as the previously scheduled retry ticks.
func (t *CustomTimer) ReceiveTick() <-chan time.Time {
return t.tickChannel
}

// Reset stops and resets the number of retries counter of this timer.
func (t *CustomTimer) Reset() {
t.Stop()
t.numRetries = 0
t.currentDelay = 0
}

// Stop prevents any current scheduled retry from ticking.
func (t *CustomTimer) Stop() {
if t.timer != nil {
t.timer.Stop()
}
}

// Close cleans up the resources used by this timer. It cannot be reused after this call.
func (t *CustomTimer) Close() {
t.Reset()
close(t.tickChannel)
}
117 changes: 117 additions & 0 deletions backoff/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package backoff

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

type SpecialError struct{}

func (*SpecialError) Error() string {
return "this is a very special error"
}

func TestCustomTimer_ScheduleRetry(t *testing.T) {
assert := assert.New(t)

testDelay := 200 * time.Millisecond

timer := NewCustomTimer(func(numRetries int, err error) time.Duration {
if _, ok := err.(*SpecialError); ok {
// If special error use fixed delay of 100 ms
return 100 * time.Millisecond
} else {
// If normal error (or nil)
return time.Duration(int64(1+numRetries) * int64(testDelay))
}
})

// First schedule (numRetries == 0)
callTime := time.Now()
timer.ScheduleRetry(nil)
assert.Equal(testDelay, timer.CurrentDelay(), "Timer should report correct retry delay")

select {
case tickTime := <-timer.ReceiveTick():
assert.WithinDuration(tickTime, callTime, time.Duration(1.5*float64(testDelay)),
"Tick time and call time should be within expected delay of each other (with a small margin)")
case <-time.After(1 * time.Second):
assert.Fail("Received no tick within 500ms")
}

// Second schedule (numRetries == 1)
callTime = time.Now()
timer.ScheduleRetry(nil)
assert.Equal(time.Duration(2*testDelay), timer.CurrentDelay(), "Timer should report correct retry delay")

select {
case tickTime := <-timer.ReceiveTick():
assert.WithinDuration(tickTime, callTime, time.Duration(2.5*float64(testDelay)),
"Tick time and call time should be within expected delay of each other (with a small margin)")
case <-time.After(1 * time.Second):
assert.Fail("Received no tick within 500ms")
}

// Third schedule (numRetries == 2 but error is SpecialError)
callTime = time.Now()
timer.ScheduleRetry(&SpecialError{})
assert.Equal(100*time.Millisecond, timer.CurrentDelay(), "Timer should report correct retry delay")

select {
case tickTime := <-timer.ReceiveTick():
assert.WithinDuration(tickTime, callTime, time.Duration(200*time.Millisecond),
"Tick time and call time should be within expected delay of each other (with a small margin)")
case <-time.After(1 * time.Second):
assert.Fail("Received no tick within 500ms")
}

timer.Close()
}

func TestCustomTimer_StopNotTicked(t *testing.T) {
assert := assert.New(t)

testDelay := 100 * time.Millisecond

timer := NewCustomTimer(func(_ int, _ error) time.Duration { return testDelay })

timer.ScheduleRetry(nil)
timer.Stop()

select {
case <-timer.ReceiveTick():
assert.Fail("Shouldn't have received tick because timer was stopped")
case <-time.After(2 * testDelay):
assert.True(true, "Should end without receiving anything")
}

assert.Equal(1, timer.NumRetries(), "Stopping the timer should not have reset it")
assert.Equal(testDelay, timer.CurrentDelay(), "Stopping the timer should not have reset it")

timer.Close()
}

func TestCustomTimer_Reset(t *testing.T) {
assert := assert.New(t)

testDelay := 100 * time.Millisecond

timer := NewCustomTimer(func(_ int, _ error) time.Duration { return testDelay })

timer.ScheduleRetry(nil)
timer.Reset()

select {
case <-timer.ReceiveTick():
assert.Fail("Shouldn't have received tick because resetting a timer should also stop it")
case <-time.After(2 * testDelay):
assert.True(true, "Should end without receiving anything")
}

assert.Equal(0, timer.NumRetries(), "Timer should have been reset")
assert.Equal(0*time.Millisecond, timer.CurrentDelay(), "Timer should have been reset")

timer.Close()
}
60 changes: 60 additions & 0 deletions backoff/exponential.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package backoff

import (
"math"
"math/rand"
"time"
)

// ExponentialConfig holds the parameters used by the ExponentialTimer.
type ExponentialConfig struct {
MaxDuration time.Duration
GrowthBase int
Base time.Duration
Random *rand.Rand
}

// DefaultExponentialConfig creates an ExponentialConfig with default values.
func DefaultExponentialConfig() ExponentialConfig {
return ExponentialConfig{
MaxDuration: 120 * time.Second,
GrowthBase: 2,
Base: time.Second,
Random: rand.New(rand.NewSource(time.Now().UnixNano())),
}
}

func DefaultExponentialDelayProvider() DelayProvider {
return ExponentialDelayProvider(DefaultExponentialConfig())
}

func ExponentialDelayProvider(conf ExponentialConfig) DelayProvider {
return func(numRetries int, _ error) time.Duration {
newExpDuration := time.Duration(int64(math.Pow(float64(conf.GrowthBase), float64(numRetries))) *
int64(conf.Base))

if newExpDuration > conf.MaxDuration {
newExpDuration = conf.MaxDuration
}

return time.Duration(conf.Random.Int63n(int64(newExpDuration)))
}
}

// ExponentialTimer performs an exponential backoff following the FullJitter implementation described in
// https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
type ExponentialTimer struct {
CustomTimer
}

// NewExponentialTimer creates an exponential backoff timer using the default configuration.
func NewExponentialTimer() *ExponentialTimer {
return NewCustomExponentialTimer(DefaultExponentialConfig())
}

// NewCustomExponentialTimer creates an exponential backoff timer using the provided configuration.
func NewCustomExponentialTimer(conf ExponentialConfig) *ExponentialTimer {
return &ExponentialTimer{
CustomTimer: *NewCustomTimer(ExponentialDelayProvider(conf)),
}
}
80 changes: 80 additions & 0 deletions backoff/exponential_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package backoff

import (
"fmt"
"math"
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

var bogusError = fmt.Errorf("bogus error")

func TestDefaultRandomSeed(t *testing.T) {
assert := assert.New(t)

delayProvider1 := DefaultExponentialDelayProvider()
delayProvider2 := DefaultExponentialDelayProvider()

// Ensure different timers are not synchronized in their backoffing (use different seeds)
assert.NotEqual(delayProvider1(0, nil), delayProvider2(0, nil))
}

func TestExponentialDelay(t *testing.T) {
assert := assert.New(t)

conf := ExponentialConfig{
// Use nanoseconds to reduce universe from which randoms are chosen. Seconds should be the same, just scaled.
MaxDuration: 120 * time.Nanosecond,
GrowthBase: 2,
Base: time.Nanosecond,
// Use fixed random to prevent flakiness in case the CI has very bad luck
Random: rand.New(rand.NewSource(1234)),
}

delayProvider := ExponentialDelayProvider(conf)

prevMax := int64(0)

// Try successive calls to delayProvider with increasing numRetries (from 0 to 19).
for i := 0; i < 20; i++ {
expectedMax := int64(math.Pow(2, float64(i)))

if expectedMax > int64(conf.MaxDuration) {
expectedMax = int64(conf.MaxDuration)
}

// For each value of numRetries, get min and max value we saw over 500 calls
min, max := minMaxForSample(delayProvider, 500, i)

assert.True(max <= expectedMax, "Max should be lower or equal to expected max. Max: %d, expected: %d", max,
expectedMax)
assert.True(max >= prevMax, "Max should grow because this is exp. backoff. Current: %d, prev: %d",
max, prevMax)
assert.True(min <= max/2, "Minimum should be 'far' from max since this should be jittery. Min: %d, max: %d",
min, max)

prevMax = max
}
}

func minMaxForSample(delayProvider DelayProvider, n int, numTries int) (min, max int64) {
max = 0
min = math.MaxInt64

for i := 0; i < n; i++ {
delay := int64(delayProvider(numTries, nil))

if delay > max {
max = delay
}

if delay < min {
min = delay
}
}

return
}
1 change: 1 addition & 0 deletions info/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package info
type TraceWriterInfo struct {
Payloads int64
Traces int64
Spans int64
Errors int64
Bytes int64
}
Expand Down
Loading

0 comments on commit 4626e73

Please sign in to comment.