This repository has been archived by the owner on Aug 30, 2019. It is now read-only.
/
listener.go
80 lines (61 loc) · 2.24 KB
/
listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package api
import (
"errors"
"net"
"sync/atomic"
"time"
log "github.com/cihub/seelog"
)
// rateLimitedListener wraps a regular TCPListener with rate limiting.
type rateLimitedListener struct {
connLease int32 // How many connections are available for this listener before rate-limiting kicks in
*net.TCPListener
}
// newRateLimitedListener returns a new wrapped listener, which is non-initialized
func newRateLimitedListener(l net.Listener, conns int) (*rateLimitedListener, error) {
tcpL, ok := l.(*net.TCPListener)
if !ok {
return nil, errors.New("cannot wrap listener")
}
sl := &rateLimitedListener{connLease: int32(conns), TCPListener: tcpL}
return sl, nil
}
// Refresh periodically refreshes the connection lease, and thus cancels any rate limits in place
func (sl *rateLimitedListener) Refresh(conns int) {
for range time.Tick(30 * time.Second) {
atomic.StoreInt32(&sl.connLease, int32(conns))
log.Debugf("Refreshed the connection lease: %d conns available", conns)
}
}
// rateLimitedError indicates a user request being blocked by our rate limit
// It satisfies the net.Error interface
type rateLimitedError struct{}
// Error returns an error string
func (e *rateLimitedError) Error() string { return "request has been rate-limited" }
// Temporary tells the HTTP server loop that this error is temporary and recoverable
func (e *rateLimitedError) Temporary() bool { return true }
// Timeout tells the HTTP server loop that this error is not a timeout
func (e *rateLimitedError) Timeout() bool { return false }
// Accept reimplements the regular Accept but adds rate limiting.
func (sl *rateLimitedListener) Accept() (net.Conn, error) {
if atomic.LoadInt32(&sl.connLease) <= 0 {
// we've reached our cap for this lease period, reject the request
return nil, &rateLimitedError{}
}
for {
//Wait up to 1 second for Reads and Writes to the new connection
sl.SetDeadline(time.Now().Add(time.Second))
newConn, err := sl.TCPListener.Accept()
if err != nil {
netErr, ok := err.(net.Error)
//If this is a timeout, then continue to wait for
//new connections
if ok && netErr.Timeout() && netErr.Temporary() {
continue
}
}
// decrement available conns
atomic.AddInt32(&sl.connLease, -1)
return newConn, err
}
}