Skip to content

Commit

Permalink
use limit script at batch limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
da440dil committed Dec 18, 2021
1 parent 08f4b61 commit 97ca004
Show file tree
Hide file tree
Showing 10 changed files with 468 additions and 279 deletions.
46 changes: 46 additions & 0 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,49 @@ func BenchmarkCounter(b *testing.B) {
})
}
}

func BenchmarkLimiter(b *testing.B) {
client := redis.NewClient(&redis.Options{})
defer client.Close()

size := 10 * time.Second
limit := uint(10000)
tests := map[string]Limiter{
"One": NewLimiter(
client,
WithLimiter(size, limit),
),
"Two": NewLimiter(
client,
WithLimiter(size, limit),
WithLimiter(size*2, limit*2),
),
"Three": NewLimiter(
client,
WithLimiter(size, limit),
WithLimiter(size*2, limit*2),
WithLimiter(size*3, limit*3),
),
"Four": NewLimiter(
client,
WithLimiter(size, limit),
WithLimiter(size*2, limit*2),
WithLimiter(size*3, limit*3),
WithLimiter(size*4, limit*4),
),
}

ctx := context.Background()
key := "key"
for name, tc := range tests {
b.Run(name, func(b *testing.B) {
err := client.Del(ctx, key).Err()
if err != nil {
b.Fatal(err)
}
for i := 0; i < b.N; i++ {
tc.Limit(ctx, key)
}
})
}
}
127 changes: 8 additions & 119 deletions counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ import (
"context"
_ "embed"
"errors"
"math/rand"
"strconv"
"sync"
"time"

"github.com/go-redis/redis/v8"
Expand All @@ -25,7 +22,7 @@ type RedisClient interface {
type Result struct {
counter int64
ttl int64
limit int
limit int64
}

// OK is operation success flag.
Expand All @@ -34,13 +31,13 @@ func (r Result) OK() bool {
}

// Counter is current counter value.
func (r Result) Counter() int {
return int(r.counter)
func (r Result) Counter() int64 {
return r.counter
}

// Remainder is diff between limit and current counter value.
func (r Result) Remainder() int {
return r.limit - int(r.counter)
func (r Result) Remainder() int64 {
return r.limit - r.counter
}

// TTL of the current window.
Expand All @@ -56,8 +53,8 @@ var ErrUnexpectedRedisResponse = errors.New("counter: unexpected redis response"
type Counter struct {
client RedisClient
script *redis.Script
limit int64
size int
limit int
}

// Count increments key by value.
Expand All @@ -83,9 +80,6 @@ func (c *Counter) Count(ctx context.Context, key string, value int) (Result, err
return r, ErrUnexpectedRedisResponse
}
r.limit = c.limit
if r.ttl == -2 {
r.ttl = 0
}
return r, nil
}

Expand All @@ -95,7 +89,7 @@ var fwscr = redis.NewScript(fwsrc)

// FixedWindow creates new counter which implements distributed counter using fixed window algorithm.
func FixedWindow(client RedisClient, size time.Duration, limit uint) *Counter {
return &Counter{client, fwscr, int(size / time.Millisecond), int(limit)}
return &Counter{client: client, script: fwscr, size: int(size / time.Millisecond), limit: int64(limit)}
}

//go:embed slidingwindow.lua
Expand All @@ -104,110 +98,5 @@ var swscr = redis.NewScript(swsrc)

// SlidingWindow creates new counter which implements distributed counter using sliding window algorithm.
func SlidingWindow(client RedisClient, size time.Duration, limit uint) *Counter {
return &Counter{client, swscr, int(size / time.Millisecond), int(limit)}
}

// Limiter implements distributed rate limiting.
type Limiter interface {
// Limit applies the limit: increments key value of each distributed counter.
Limit(ctx context.Context, key string) (Result, error)
}

var random *rand.Rand

func init() {
random = rand.New(rand.NewSource(time.Now().UnixNano()))
}

// NewLimiter creates new limiter which implements distributed rate limiting.
// Each limiter is created with pseudo-random name which may be set with options, every Redis key will be prefixed with this name.
// The rate of decreasing the window size on each next limiter call by default equal 1, may be set with options.
func NewLimiter(c *Counter, options ...func(*limiter)) Limiter {
lt := &limiter{c, strconv.Itoa(random.Int()) + ":", 1}
for _, option := range options {
option(lt)
}
return lt
}

// WithLimiterName sets unique limiter name.
func WithLimiterName(name string) func(*limiter) {
return func(lt *limiter) {
lt.prefix = name + ":"
}
}

// WithLimiterRate sets limiter rate of decreasing the window size on each next limiter call.
func WithLimiterRate(rate uint) func(*limiter) {
return func(lt *limiter) {
lt.rate = int(rate)
}
}

// NewLimiterSuite creates new limiter suite which contains two or more limiters which run concurently on every limiter suite call.
func NewLimiterSuite(v1 Limiter, v2 Limiter, vs ...Limiter) Limiter {
lts := append([]Limiter{v1, v2}, vs...)
return &limiters{lts: lts, size: len(lts)}
}

type limiter struct {
counter *Counter
prefix string
rate int
}

func (lt *limiter) Limit(ctx context.Context, key string) (Result, error) {
return lt.counter.Count(ctx, lt.prefix+key, lt.rate)
}

type limiters struct {
lts []Limiter
wg sync.WaitGroup
mu sync.Mutex
size int
}

const maxInt = int(^uint(0) >> 1)

func (ls *limiters) Limit(ctx context.Context, key string) (Result, error) {
results := make([]result, ls.size)

ls.mu.Lock()
ls.wg.Add(ls.size)
for i := 0; i < ls.size; i++ {
go func(i int) {
defer ls.wg.Done()
r, err := ls.lts[i].Limit(ctx, key)
results[i] = result{r, err}
}(i)
}
ls.wg.Wait()
ls.mu.Unlock()

r := Result{0, int64(-1), maxInt}
for i := 0; i < ls.size; i++ {
v := results[i]
if v.err != nil {
return r, v.err
}
if v.result.OK() {
if r.OK() && r.Remainder() > v.result.Remainder() { // minimal remainder
r = v.result
}
continue
}
if r.OK() { // not ok first time
r = v.result
continue
}
if r.TTL() < v.result.TTL() { // maximum TTL
r = v.result
}
}
return r, nil
}

type result struct {
result Result
err error
return &Counter{client: client, script: swscr, size: int(size / time.Millisecond), limit: int64(limit)}
}
Loading

0 comments on commit 97ca004

Please sign in to comment.