forked from envoyproxy/ratelimit
/
driver_impl.go
94 lines (80 loc) · 2.19 KB
/
driver_impl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package redis
import (
"github.com/lyft/gostats"
"github.com/lyft/ratelimit/src/assert"
"github.com/mediocregopher/radix.v2/pool"
"github.com/mediocregopher/radix.v2/redis"
logger "github.com/sirupsen/logrus"
)
type poolStats struct {
connectionActive stats.Gauge
connectionTotal stats.Counter
connectionClose stats.Counter
}
func newPoolStats(scope stats.Scope) poolStats {
ret := poolStats{}
ret.connectionActive = scope.NewGauge("cx_active")
ret.connectionTotal = scope.NewCounter("cx_total")
ret.connectionClose = scope.NewCounter("cx_local_close")
return ret
}
type poolImpl struct {
pool *pool.Pool
stats poolStats
}
type connectionImpl struct {
client *redis.Client
pending uint
}
type responseImpl struct {
response *redis.Resp
}
func checkError(err error) {
if err != nil {
panic(RedisError(err.Error()))
}
}
func (this *poolImpl) Get() Connection {
client, err := this.pool.Get()
checkError(err)
this.stats.connectionActive.Inc()
this.stats.connectionTotal.Inc()
return &connectionImpl{client, 0}
}
func (this *poolImpl) Put(c Connection) {
impl := c.(*connectionImpl)
this.stats.connectionActive.Dec()
if impl.pending == 0 {
this.pool.Put(impl.client)
} else {
// radix does not appear to track if we attempt to put a connection back with pipelined
// responses that have not been flushed. If we are in this state, just kill the connection
// and don't put it back in the pool.
impl.client.Close()
this.stats.connectionClose.Inc()
}
}
func NewPoolImpl(scope stats.Scope, socketType string, url string, poolSize int) Pool {
logger.Warnf("connecting to redis on %s %s with pool size %d", socketType, url, poolSize)
pool, err := pool.New(socketType, url, poolSize)
checkError(err)
return &poolImpl{
pool: pool,
stats: newPoolStats(scope)}
}
func (this *connectionImpl) PipeAppend(cmd string, args ...interface{}) {
this.client.PipeAppend(cmd, args...)
this.pending++
}
func (this *connectionImpl) PipeResponse() Response {
assert.Assert(this.pending > 0)
this.pending--
resp := this.client.PipeResp()
checkError(resp.Err)
return &responseImpl{resp}
}
func (this *responseImpl) Int() int64 {
i, err := this.response.Int64()
checkError(err)
return i
}