package proxy

import (
	"io"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"overlord/lib/log"
	libnet "overlord/lib/net"
	"overlord/lib/prom"
	"overlord/proto"
	"overlord/proto/memcache"
	mcbin "overlord/proto/memcache/binary"
	"overlord/proto/redis"
	rclstr "overlord/proto/redis/cluster"
)

const (
	handlerOpening = int32(0)
	handlerClosed  = int32(1)
)

// Concurrency tunables; these should eventually come from configuration.
var (
	// TODO: read from config and reduce the defaults
	concurrent    = 2
	maxConcurrent = 1024
)

// Handler handles a single client connection.
type Handler struct {
	p         *Proxy
	cc        *ClusterConfig
	forwarder proto.Forwarder

	conn *libnet.Conn
	pc   proto.ProxyConn

	closed int32
	err    error
}

// NewHandler creates a conn handler for the given cluster, wrapping the raw
// connection with the protocol-specific proxy conn.
func NewHandler(p *Proxy, cc *ClusterConfig, conn net.Conn, forwarder proto.Forwarder) (h *Handler) {
	h = &Handler{
		p:         p,
		cc:        cc,
		forwarder: forwarder,
	}
	h.conn = libnet.NewConn(conn, time.Second*time.Duration(h.p.c.Proxy.ReadTimeout), time.Second*time.Duration(h.p.c.Proxy.WriteTimeout))
	// choose the protocol decoder/encoder by cache type
	switch cc.CacheType {
	case proto.CacheTypeMemcache:
		h.pc = memcache.NewProxyConn(h.conn)
	case proto.CacheTypeMemcacheBinary:
		h.pc = mcbin.NewProxyConn(h.conn)
	case proto.CacheTypeRedis:
		h.pc = redis.NewProxyConn(h.conn)
	case proto.CacheTypeRedisCluster:
		h.pc = rclstr.NewProxyConn(h.conn, forwarder)
	default:
		panic(proto.ErrNoSupportCacheType)
	}
	prom.ConnIncr(cc.Name)
	return
}

// Handle reads a Msg from the client connection and dispatches it to the
// cache servers, then reads the response from the cache server and writes it
// back into the client connection. The loop runs in its own goroutine.
func (h *Handler) Handle() {
	go h.handle()
}

// handle runs the decode -> forward -> encode loop until the connection
// errors out or is closed.
func (h *Handler) handle() {
	var (
		messages []*proto.Message
		msgs     []*proto.Message
		wg       = &sync.WaitGroup{}
		err      error
	)
	messages = h.allocMaxConcurrent(wg, messages, len(msgs))
	for {
		// 1. read until limit or error
		if msgs, err = h.pc.Decode(messages); err != nil {
			h.deferHandle(messages, err)
			return
		}
		// 2. send to cluster
		h.forwarder.Forward(msgs)
		wg.Wait()
		// 3. encode
		for _, msg := range msgs {
			if err = h.pc.Encode(msg); err != nil {
				h.pc.Flush()
				h.deferHandle(messages, err)
				return
			}
			msg.MarkEnd()
			msg.ResetSubs()
			if prom.On {
				prom.ProxyTime(h.cc.Name, msg.Request().CmdString(), int64(msg.TotalDur()/time.Microsecond))
			}
		}
		if err = h.pc.Flush(); err != nil {
			h.deferHandle(messages, err)
			return
		}
		// 4. release resources
		for _, msg := range msgs {
			msg.Reset()
		}
		// 5. grow the batch up to maxConcurrent
		messages = h.allocMaxConcurrent(wg, messages, len(msgs))
	}
}

// allocMaxConcurrent sizes the message batch: it starts at `concurrent` and
// multiplies by `concurrent` whenever the previous batch was fully used
// (lastCount == len(msgs)), growing only while the batch is still smaller
// than maxConcurrent. A grown batch is returned to the pool and reallocated
// with the WaitGroup attached to every message.
func (h *Handler) allocMaxConcurrent(wg *sync.WaitGroup, msgs []*proto.Message, lastCount int) []*proto.Message {
	var alloc int
	if lm := len(msgs); lm == 0 {
		alloc = concurrent
	} else if lm < maxConcurrent && lm == lastCount {
		alloc = lm * concurrent
	}
	if alloc > 0 {
		proto.PutMsgs(msgs)
		msgs = proto.GetMsgs(alloc) // TODO: vary the batch size by the lastCount trend
		for _, msg := range msgs {
			msg.WithWaitGroup(wg)
		}
	}
	return msgs
}

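// With the defaults above (concurrent = 2, maxConcurrent = 1024), the batch
// size therefore doubles (2, 4, 8, ...) only while every batch stays
// saturated. A small worked example, as a reading of the code above rather
// than documented behavior:
//
//	messages := h.allocMaxConcurrent(wg, nil, 0)     // len == 2
//	messages = h.allocMaxConcurrent(wg, messages, 2) // fully used: len == 4
//	messages = h.allocMaxConcurrent(wg, messages, 3) // 3 of 4 used: stays 4
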
// deferHandle returns the message batch to the pool and closes the handler.
func (h *Handler) deferHandle(msgs []*proto.Message, err error) {
	proto.PutMsgs(msgs)
	h.closeWithError(err)
}

// closeWithError closes the connection exactly once, records the error and
// updates the connection counters.
func (h *Handler) closeWithError(err error) {
	if atomic.CompareAndSwapInt32(&h.closed, handlerOpening, handlerClosed) {
		h.err = err
		_ = h.conn.Close()
		atomic.AddInt32(&h.p.conns, -1) // NOTE: decrement the proxy's live conn count
		if prom.On {
			prom.ConnDecr(h.cc.Name)
		}
		if log.V(2) {
			if err != io.EOF {
				log.Warnf("cluster(%s) addr(%s) remoteAddr(%s) handler close error:%+v", h.cc.Name, h.cc.ListenAddr, h.conn.RemoteAddr(), err)
			}
		}
	}
}