Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add throttler generator #7

Merged
merged 2 commits into from
Feb 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type Throttler interface {
```
`Throttler` interface exposes pair of counterpart methods: `Acquire` takes a part of *throttling quota* or returns error if *throttling quota* is drained and needs to be called right before shared resource acquire; `Release` puts a part of *throttling quota* back or returns error if this is not possible and needs to be called just after shared resource release; **Note:** all derived throttler implementations are thread safe, so they could be used concurrently without additional locking. **Note:** all acquired throttlers should be released exatly the same amount of times they have been acquired. **Note:** despite throttler `Release` method has the same signature as `Acquire` has, `Release` implementations should try to handle any internal error gracefully and return error back rarely, nevertheless all errors returned by `Release` should be handeled by client.

In Gohalt throtllers could be easily combined with each other to build complex pipelines. There are multiple composite throttlers (all, any, ring, pattern, not, etc) as well as leaf throttlers (timed, latency, monitor, metric, percentile, etc) to work with in Gohalt. If you don't find in [existing throttlers](#Throttlers) the one that fits your needs you can create custom throttler by implementing `Throttler` interface. Such custom throttler should work with existing Gohalt throttlers and tools out of box.
In Gohalt throtllers could be easily combined with each other to build complex pipelines. There are multiple composite throttlers (all, any, ring, pattern, not, generator, etc) as well as leaf throttlers (timed, latency, monitor, metric, percentile, etc) to work with in Gohalt. If you don't find in [existing throttlers](#Throttlers) the one that fits your needs you can create custom throttler by implementing `Throttler` interface. Such custom throttler should work with existing Gohalt throttlers and tools out of box.

Gohalt includes multiple supporting surrounding tools to make throttling more sugary.
```go
Expand Down Expand Up @@ -80,7 +80,7 @@ func WithTimestamp(ctx context.Context, ts time.Time) context.Context
func WithPriority(ctx context.Context, priority uint8) context.Context
// WithKey adds the provided key to the provided context
// to add additional call identifier to context.
// Resulted context is used by: `pattern` throtttler.
// Resulted context is used by: `pattern` and `generator` throtttlers.
func WithKey(ctx context.Context, key string) context.Context
// WithMessage adds the provided message to the provided context
// to add additional message that need to be used to context.
Expand Down Expand Up @@ -143,15 +143,15 @@ You can find list of returning error types for all existing throttlers in thrott
| echo | `func NewThrottlerEcho(err error) Throttler` | Always throttles with the specified error back.<br> - could return any specified error; |
| wait | `func NewThrottlerWait(duration time.Duration) Throttler` | Always waits for the specified duration. |
| square | `func NewThrottlerSquare(duration time.Duration, limit time.Duration, reset bool) Throttler` | Always waits for square growing *[1, 4, 9, 16, ...]* multiplier on the specified initial duration, up until the specified duration limit is reached.<br> If reset is set then after throttler riches the specified duration limit next multiplier value will be reseted. |
| jitter | `func NewThrottlerJitter(initial time.Duration, limit time.Duration, reset bool, jitter float64) Throttler` | Waits accordingly to undelying square throttler but also adds the provided jitter delta distribution on top.<br> Jitter value is normalized to [0.0, 1.0] range and defines which part of square delay could be randomized in percents.<br> Implementation uses `math/rand` as PRNG function and expects rand seeding by a client. |
| jitter | `func NewThrottlerJitter(initial time.Duration, limit time.Duration, reset bool, jitter float64) Throttler` | Waits accordingly to undelying square throttler but also adds the provided jitter delta distribution on top.<br> Jitter value is normalized to [0.0, 1.0] range and defines which part of square delay could be randomized in percents.<br> Implementation uses secure `crypto/rand` as PRNG function. |
| context | `func NewThrottlerContext() Throttler` | Always throttless on *done* context.<br> - could return `ErrorInternal`; |
| panic | `func NewThrottlerPanic() Throttler` | Always panics with `ErrorInternal`. |
| each | `func NewThrottlerEach(threshold uint64) Throttler` | Throttles each periodic *i-th* call defined by the specified threshold.<br> - could return `ErrorThreshold`; |
| before | `func NewThrottlerBefore(threshold uint64) Throttler` | Throttles each call below the *i-th* call defined by the specified threshold.<br> - could return `ErrorThreshold`; |
| after | `func NewThrottlerAfter(threshold uint64) Throttler` | Throttles each call after the *i-th* call defined by the specified threshold.<br> - could return `ErrorThreshold`; |
| past | `func NewThrottlerPast(threshold time.Time) Throttler` | Throttles each call befor timestamp defined by the specified UTC time threshold.<br> - could return `ErrorThreshold`; |
| future | `func NewThrottlerFuture(threshold time.Time) Throttler` | Throttles each call after timestamp defined by the specified UTC time threshold.<br> - could return `ErrorThreshold`; |
| chance | `func NewThrottlerChance(threshold float64) Throttler` | Throttles each call with the chance *p* defined by the specified threshold.<br> Chance value is normalized to *[0.0, 1.0]* range.<br> Implementation uses `math/rand` as PRNG function and expects rand seeding by a client.<br> - could return `ErrorThreshold`; |
| chance | `func NewThrottlerChance(threshold float64) Throttler` | Throttles each call with the chance *p* defined by the specified threshold.<br> Chance value is normalized to *[0.0, 1.0]* range.<br> Implementation uses secure `crypto/rand` as PRNG function.<br> - could return `ErrorThreshold`; |
| running | `func NewThrottlerRunning(threshold uint64) Throttler` | Throttles each call which exeeds the running quota *acquired - release* *q* defined by the specified threshold.<br> - could return `ErrorThreshold`; |
| buffered | `func NewThrottlerBuffered(threshold uint64) Throttler` | Waits on call which exeeds the running quota *acquired - release* *q* defined by the specified threshold until the running quota is available again. |
| priority | `func NewThrottlerPriority(threshold uint64, levels uint8) Throttler` | Waits on call which exeeds the running quota *acquired - release* *q* defined by the specified threshold until the running quota is available again.<br> Running quota is not equally distributed between *n* levels of priority defined by the specified levels.<br> Use `func WithPriority(ctx context.Context, priority uint8) context.Context` to override context call priority, *1* by default. |
Expand All @@ -170,6 +170,7 @@ You can find list of returning error types for all existing throttlers in thrott
| suppress | `func NewThrottlerSuppress(thr Throttler) Throttler` | Suppresses provided throttler to never throttle. |
| retry | `func NewThrottlerRetry(thr Throttler, retries uint64) Throttler` | Retries provided throttler error up until the provided retries threshold.<br> If provided onthreshold flag is set even `ErrorThreshold` errors will be retried.<br> Internally retry uses square throttler with `DefaultRetriedDuration` initial duration.<br> - could return any underlying throttler error; |
| cache | `func NewThrottlerCache(thr Throttler, cache time.Duration) Throttler` | Caches provided throttler calls for the provided cache duration, throttler release resulting resets cache.<br> Only non throttling calls are cached for the provided cache duration.<br> - could return any underlying throttler error; |
| generator | `func NewThrottlerGenerator(gen Generator, capacity uint64, eviction float64) Throttler` | Creates new throttler instance that throttles if found key matching throttler throttles.<br> If no key matching throttler has been found generator used insted to provide new throttler that will be added to existing throttlers map.<br> Generated throttlers are kept in bounded map with capacity *c* defined by the specified capacity and eviction rate *e* defined by specified eviction value is normalized to [0.0, 1.0], where eviction rate affects number of throttlers that will be removed from the map after bounds overflow.<br> Use `WithKey` to specify key for throttler matching and generation.<br> - could return `ErrorInternal`;<br> - could return any underlying throttler error; |

## Integrations

Expand Down
2 changes: 1 addition & 1 deletion context.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func ctxPriority(ctx context.Context, limit uint8) uint8 {

// WithKey adds the provided key to the provided context
// to add additional call identifier to context.
// Resulted context is used by: `pattern` throtttler.
// Resulted context is used by: `pattern` and `generator` throtttlers.
func WithKey(ctx context.Context, key string) context.Context {
return context.WithValue(ctx, ghctxkey, key)
}
Expand Down
5 changes: 5 additions & 0 deletions generators.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package gohalt

// Generator defines func signature that is able
// to generate new throttlers by provided key.
type Generator func(string) (Throttler, error)
15 changes: 15 additions & 0 deletions math.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package gohalt

import (
"crypto/rand"
"math"
"math/big"
)

func rndf64(fallback float64) float64 {
rnd, err := rand.Int(rand.Reader, big.NewInt(math.MaxInt64))
if err != nil {
return fallback
}
return float64(rnd.Int64()) / math.MaxInt64
}
74 changes: 69 additions & 5 deletions throttlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package gohalt
import (
"context"
"math"
"math/rand"
"regexp"
"sync"
"time"
Expand Down Expand Up @@ -115,7 +114,7 @@ type tjitter struct {
// adds the provided jitter delta distribution on top.
// Jitter value is normalized to [0.0, 1.0] range and defines
// which part of square delay could be randomized in percents.
// Implementation uses `math/rand` as PRNG function and expects rand seeding by a client.
// Implementation uses secure `crypto/rand` as PRNG function.
func NewThrottlerJitter(initial time.Duration, limit time.Duration, reset bool, jitter float64) Throttler {
jitter = math.Abs(jitter)
if jitter > 1.0 {
Expand All @@ -136,7 +135,7 @@ func (thr *tjitter) Acquire(ctx context.Context) error {
}
}
base := float64(duration) * thr.jitter
side := (float64(duration) - base) * rand.Float64()
side := (float64(duration) - base) * rndf64(1.0)
return sleep(ctx, time.Duration(base+side))
}

Expand Down Expand Up @@ -320,7 +319,7 @@ type tchance struct {
// NewThrottlerChance creates new throttler instance that
// throttles each call with the chance p defined by the specified threshold.
// Chance value is normalized to [0.0, 1.0] range.
// Implementation uses `math/rand` as PRNG function and expects rand seeding by a client.
// Implementation uses secure `crypto/rand` as PRNG function.
// - could return `ErrorThreshold`;
func NewThrottlerChance(threshold float64) Throttler {
threshold = math.Abs(threshold)
Expand All @@ -331,7 +330,7 @@ func NewThrottlerChance(threshold float64) Throttler {
}

func (thr tchance) Acquire(context.Context) error {
if thr.threshold > 1.0-rand.Float64() {
if thr.threshold > 1.0-rndf64(0.0) {
return ErrorThreshold{
Throttler: "chance",
Threshold: strpercent(thr.threshold),
Expand Down Expand Up @@ -1009,3 +1008,68 @@ func (thr tcache) Release(ctx context.Context) error {
_ = thr.reset(ctx)
return nil
}

type tgenerator struct {
gen Generator
thrs sync.Map
size uint64
capacity uint64
evict Runnable
}

// NewThrottlerGenerator creates new throttler instance that
// throttles if found key matching throttler throttles.
// If no key matching throttler has been found generator see `Generator` used insted
// to provide new throttler that will be added to existing throttlers map.
// Generated throttlers are kept in bounded map with capacity c defined by the specified capacity
// and eviction rate e defined by specified eviction value is normalized to [0.0, 1.0], where eviction rate affects number of
// throttlers that will be removed from the map after bounds overflow.
// Use `WithKey` to specify key for throttler matching and generation.
// - could return `ErrorInternal`;
// - could return any underlying throttler error;
func NewThrottlerGenerator(gen Generator, capacity uint64, eviction float64) Throttler {
thr := &tgenerator{gen: gen, capacity: capacity}
eviction = math.Abs(eviction)
if eviction > 1.0 {
eviction = 1.0
}
num := uint64(math.Ceil(float64(capacity) * eviction))
thr.evict = locked(func(c context.Context) error {
var i uint64
thr.thrs.Range(func(key interface{}, _ interface{}) bool {
thr.thrs.Delete(key)
atomicBDecr(&thr.size)
i++
return i < num
})
return nil
})
return thr
}

func (thr *tgenerator) Acquire(ctx context.Context) error {
key := ctxKey(ctx)
if thr, ok := thr.thrs.Load(key); ok {
return thr.(Throttler).Acquire(ctx)
}
gthr, err := thr.gen(key)
if err != nil {
return ErrorInternal{
Throttler: "generator",
Message: err.Error(),
}
}
if size := atomicGet(&thr.size) + 1; size > thr.capacity {
gorun(ctx, thr.evict)
}
thr.thrs.Store(key, gthr)
atomicBIncr(&thr.size)
return gthr.Acquire(ctx)
}

func (thr *tgenerator) Release(ctx context.Context) error {
if thr, ok := thr.thrs.Load(ctxKey(ctx)); ok {
return thr.(Throttler).Release(ctx)
}
return nil
}
93 changes: 93 additions & 0 deletions throttlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1128,6 +1128,99 @@ func TestThrottlers(t *testing.T) {
},
},
},
"Throttler generator should throttle on generator error": {
tms: 3,
thr: NewThrottlerGenerator(
func(string) (Throttler, error) {
return NewThrottlerEcho(nil), testerr
},
10,
0.1,
),
ctxs: []context.Context{
WithKey(context.Background(), "test"),
WithKey(context.Background(), "nontest"),
WithKey(context.Background(), "111"),
},
errs: []error{
ErrorInternal{
Throttler: "generator",
Message: testerr.Error(),
},
ErrorInternal{
Throttler: "generator",
Message: testerr.Error(),
},
ErrorInternal{
Throttler: "generator",
Message: testerr.Error(),
},
},
},
"Throttler generator should throttle on matching throttler key": {
tms: 5,
thr: NewThrottlerGenerator(
func(string) (Throttler, error) {
return NewThrottlerAfter(1), nil
},
10,
0.1,
),
ctxs: []context.Context{
WithKey(context.Background(), "125"),
WithKey(context.Background(), "125"),
WithKey(context.Background(), "test"),
WithKey(context.Background(), "nontest"),
WithKey(context.Background(), "125"),
},
errs: []error{
nil,
ErrorThreshold{
Throttler: "after",
Threshold: strpair{current: 2, threshold: 1},
},
nil,
nil,
ErrorThreshold{
Throttler: "after",
Threshold: strpair{current: 3, threshold: 1},
},
},
},
"Throttler generator should evict throttlers on bounds overflow pattern": {
tms: 7,
thr: NewThrottlerGenerator(
func(string) (Throttler, error) {
return NewThrottlerAfter(1), nil
},
2,
1000,
),
ctxs: []context.Context{
WithKey(context.Background(), "111"),
WithKey(context.Background(), "test"),
WithKey(context.Background(), "111"),
WithKey(context.Background(), "test1"),
WithKey(context.Background(), "kkk"),
WithKey(context.Background(), "kkk"),
WithKey(context.Background(), "test2"),
},
errs: []error{
nil,
nil,
ErrorThreshold{
Throttler: "after",
Threshold: strpair{current: 2, threshold: 1},
},
nil,
nil,
ErrorThreshold{
Throttler: "after",
Threshold: strpair{current: 2, threshold: 1},
},
nil,
},
},
}
for tname, ptrtcase := range table {
t.Run(tname, func(t *testing.T) {
Expand Down