/
slide_window.go
70 lines (60 loc) · 2.66 KB
/
slide_window.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package ratelimit
import (
"container/list"
"context"
"google.golang.org/grpc"
"sync"
"time"
)
// SlideWindowLimiterOptions defines the function signature for configuration options of SlideWindowLimiter.
type SlideWindowLimiterOptions func(l *SlideWindowLimiter)
// SlideWindowLimiter implements rate limiting using a sliding window approach.
type SlideWindowLimiter struct {
queue *list.List // Queue holds the timestamps of incoming requests.
interval int64 // Interval defines the sliding window width in nanoseconds.
rate int // Rate defines the maximum number of requests allowed within the interval.
mutex sync.Mutex // Mutex ensures atomic access to the queue.
onReject rejectStrategy // OnReject is a strategy executed when a request is rejected.
}
// NewSlideWindowLimiter returns a new instance of SlideWindowLimiter with the specified interval and rate.
func NewSlideWindowLimiter(interval time.Duration, rate int) *SlideWindowLimiter {
return &SlideWindowLimiter{
queue: list.New(),
interval: interval.Nanoseconds(),
rate: rate,
onReject: defaultRejectStrategy, // Set the default reject strategy on new SlideWindowLimiter.
}
}
// SlideWindowMarkFailed provides an option to set the markFailedStrategy as the reject strategy.
func SlideWindowMarkFailed() SlideWindowLimiterOptions {
return func(l *SlideWindowLimiter) {
l.mutex.Lock() // Protect access to onReject to ensure thread safety.
defer l.mutex.Unlock() // Defer unlock so it executes regardless of where the method exits.
l.onReject = markFailedStrategy
}
}
// LimitUnary returns a grpc.UnaryServerInterceptor that imposes rate limiting based on the sliding window mechanism.
func (l *SlideWindowLimiter) LimitUnary() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
now := time.Now().UnixNano()
boundary := now - l.interval
l.mutex.Lock()
defer l.mutex.Unlock() // Use defer to ensure the mutex is always unlocked after locking.
// Remove older timestamps from the queue which are outside of the sliding window.
for element := l.queue.Front(); element != nil; {
next := element.Next() // Save next element because current might get removed.
if element.Value.(int64) < boundary {
l.queue.Remove(element)
}
element = next // Move to next saved element.
}
if l.queue.Len() < l.rate {
// There is room in the sliding window for this request.
l.queue.PushBack(now)
resp, err = handler(ctx, req)
return
}
// Exceeded the rate limit, execute the reject strategy.
return l.onReject(ctx, req, info, handler)
}
}