-
Notifications
You must be signed in to change notification settings - Fork 893
/
middleware.go
48 lines (40 loc) · 1.22 KB
/
middleware.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package p2p
import (
"sync/atomic"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/network"
)
// log is the package-level logger, created under the "shrex/middleware" name.
var log = logging.Logger("shrex/middleware")
// Middleware carries the state RateLimitHandler uses to cap how many stream
// handlers run at the same time and to count the streams it turned away.
// Both counters are atomics and are updated without any additional locking.
type Middleware struct {
// concurrencyLimit is the maximum number of requests that can be processed at once.
// It is set by NewMiddleware and only read afterwards.
concurrencyLimit int64
// parallelRequests is the number of requests currently being processed.
// RateLimitHandler increments it on entry and decrements it on exit.
parallelRequests atomic.Int64
// numRateLimited is the number of requests that were rate limited.
// DrainCounter reads it and resets it to zero.
numRateLimited atomic.Int64
}
// NewMiddleware returns a Middleware whose RateLimitHandler allows at most
// concurrencyLimit requests to be processed at once. Both counters start at
// zero.
func NewMiddleware(concurrencyLimit int) *Middleware {
	mw := new(Middleware)
	mw.concurrencyLimit = int64(concurrencyLimit)
	return mw
}
// DrainCounter reports how many requests have been rate limited since the
// counter was last drained, and atomically resets that counter to 0.
func (m *Middleware) DrainCounter() int64 {
	drained := m.numRateLimited.Swap(0)
	return drained
}
// RateLimitHandler wraps handler with a concurrency cap. Every incoming stream
// bumps the in-flight counter for the duration of the call; the stream is
// passed on to handler only when the in-flight count, including the stream
// itself, does not exceed concurrencyLimit. Otherwise the stream is counted
// as rate limited, closed, and never reaches handler.
func (m *Middleware) RateLimitHandler(handler network.StreamHandler) network.StreamHandler {
	return func(stream network.Stream) {
		// Register this request first so concurrent callers observe it, and
		// make sure it is unregistered on every exit path.
		inFlight := m.parallelRequests.Add(1)
		defer m.parallelRequests.Add(-1)

		// Within the limit: serve the request.
		if inFlight <= m.concurrencyLimit {
			handler(stream)
			return
		}

		// Over the limit: record the rejection and drop the stream.
		m.numRateLimited.Add(1)
		log.Debug("concurrency limit reached")
		if err := stream.Close(); err != nil {
			log.Debugw("server: closing stream", "err", err)
		}
	}
}