Skip to content

Commit

Permalink
throttler time tests resilience
Browse files Browse the repository at this point in the history
  • Loading branch information
Kostiantyn Masliuk committed Aug 31, 2020
1 parent f3acfaf commit 90a41af
Showing 1 changed file with 49 additions and 55 deletions.
104 changes: 49 additions & 55 deletions throttlers_test.go
Expand Up @@ -5,33 +5,35 @@ import (
"errors"
"fmt"
"regexp"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

const (
ms0_0 time.Duration = 0
ms0_9 time.Duration = time.Duration(0.9 * float64(time.Millisecond))
ms1_0 time.Duration = time.Millisecond
ms1_5 time.Duration = time.Duration(1.5 * float64(time.Millisecond))
ms2_0 time.Duration = 2 * time.Millisecond
ms3_0 time.Duration = 3 * time.Millisecond
ms4_0 time.Duration = 4 * time.Millisecond
ms5_0 time.Duration = 5 * time.Millisecond
ms7_0 time.Duration = 7 * time.Millisecond
ms9_0 time.Duration = 9 * time.Millisecond
)

type tcase struct {
tms uint64
thr Throttler
acts []Runnable
pres []Runnable
ctxs []context.Context
errs []error
durs []time.Duration
idx int64
over bool
tms uint64 // number of sub runs inside one case
thr Throttler // throttler itself
acts []Runnable // actions that need to be throttled
pres []Runnable // actions that neeed to be run before throttle
tss []time.Duration // timestamps that needs to be applied to contexts set
ctxs []context.Context // contexts set for throttling
errs []error // expected throttler errors
durs []time.Duration // expected throttler durations
idx uint64 // carries seq number of sub run execution
over bool // if throttler needs to be over released
}

func (t *tcase) run(index int) (err error, dur time.Duration) {
Expand All @@ -51,17 +53,21 @@ func (t *tcase) run(index int) (err error, dur time.Duration) {
func() {
defer func() {
if msg := recover(); msg != nil {
atomic.AddInt64(&t.idx, 1)
atomicIncr(&t.idx)
err = errors.New(msg.(string))
}
}()
ts = time.Now()
// force strict acquire order
for index != int(atomic.LoadInt64(&t.idx)) {
for index != int(atomicGet(&t.idx)) {
time.Sleep(time.Microsecond)
}
// set additional timestamp only if present
if index < len(t.tss) {
ctx = WithTimestamp(ctx, time.Now().Add(t.tss[index]))
}
err = t.thr.Acquire(ctx)
atomic.AddInt64(&t.idx, 1)
atomicIncr(&t.idx)
}()
dur = time.Since(ts)
// run additional action only if present
Expand Down Expand Up @@ -272,21 +278,16 @@ func TestThrottlerPattern(t *testing.T) {
tms: 6,
thr: NewThrottlerTimed(
2,
ms1_0,
0,
ms2_0,
ms0_0,
),
acts: []Runnable{
delayed(ms1_0, nope),
delayed(ms1_0, nope),
delayed(ms1_0, nope),
},
pres: []Runnable{
nil,
nil,
nil,
nil,
delayed(ms2_0, nope),
delayed(ms2_0, nope),
delayed(ms3_0, nope),
delayed(ms3_0, nope),
},
errs: []error{
nil,
Expand All @@ -301,21 +302,16 @@ func TestThrottlerPattern(t *testing.T) {
tms: 6,
thr: NewThrottlerTimed(
2,
ms4_0,
ms2_0,
ms1_0,
),
acts: []Runnable{
delayed(ms1_0, nope),
delayed(ms1_0, nope),
delayed(ms1_0, nope),
},
pres: []Runnable{
nil,
nil,
nil,
delayed(ms1_5, nope),
delayed(ms3_0, nope),
nil,
delayed(ms2_0, nope),
delayed(ms7_0, nope),
},
errs: []error{
nil,
Expand Down Expand Up @@ -406,10 +402,10 @@ func TestThrottlerPattern(t *testing.T) {
"Throttler latency should throttle on latency above threshold": {
tms: 3,
thr: NewThrottlerLatency(ms0_9, ms5_0),
ctxs: []context.Context{
WithTimestamp(context.Background(), time.Now().Add(-ms5_0)),
context.Background(),
context.Background(),
tss: []time.Duration{
-ms5_0,
ms0_0,
ms0_0,
},
errs: []error{
nil,
Expand All @@ -420,15 +416,15 @@ func TestThrottlerPattern(t *testing.T) {
"Throttler latency should not throttle on latency above threshold after retention": {
tms: 3,
thr: NewThrottlerLatency(ms0_9, ms3_0),
ctxs: []context.Context{
WithTimestamp(context.Background(), time.Now().Add(-ms5_0)),
context.Background(),
context.Background(),
tss: []time.Duration{
-ms5_0,
ms0_0,
ms0_0,
},
pres: []Runnable{
nil,
nil,
delayed(ms5_0, nope),
delayed(ms9_0, nope),
},
errs: []error{
nil,
Expand All @@ -438,13 +434,12 @@ func TestThrottlerPattern(t *testing.T) {
},
"Throttler percentile should throttle on latency above threshold": {
tms: 5,
thr: NewThrottlerPercentile(ms3_0, 0.5, ms5_0),
ctxs: []context.Context{
context.Background(),
WithTimestamp(context.Background(), time.Now().Add(-ms5_0)),
WithTimestamp(context.Background(), time.Now().Add(-ms5_0)),
WithTimestamp(context.Background(), time.Now().Add(-ms1_0)),
context.Background(),
thr: NewThrottlerPercentile(ms3_0, 0.5, ms7_0),
tss: []time.Duration{
ms0_0,
-ms5_0,
-ms5_0,
-ms1_0,
},
errs: []error{
nil,
Expand All @@ -457,12 +452,11 @@ func TestThrottlerPattern(t *testing.T) {
"Throttler percentile should throttle on latency above threshold after retention": {
tms: 5,
thr: NewThrottlerPercentile(ms3_0, 1.5, ms5_0),
ctxs: []context.Context{
context.Background(),
WithTimestamp(context.Background(), time.Now().Add(-ms5_0)),
WithTimestamp(context.Background(), time.Now().Add(-ms5_0)),
WithTimestamp(context.Background(), time.Now().Add(-ms1_0)),
context.Background(),
tss: []time.Duration{
ms0_0,
-ms5_0,
-ms5_0,
-ms1_0,
},
pres: []Runnable{
nil,
Expand Down Expand Up @@ -723,11 +717,11 @@ func TestThrottlerPattern(t *testing.T) {
}
for tname, tcase := range table {
t.Run(tname, func(t *testing.T) {
var index int64
var index uint64
for i := 0; i < int(tcase.tms); i++ {
t.Run(fmt.Sprintf("run %d", i+1), func(t *testing.T) {
t.Parallel()
index := int(atomic.AddInt64(&index, 1) - 1)
index := int(atomicIncr(&index) - 1)
resErr, resDur := tcase.result(index)
err, dur := tcase.run(index)
assert.Equal(t, resErr, err)
Expand Down

0 comments on commit 90a41af

Please sign in to comment.