-
Notifications
You must be signed in to change notification settings - Fork 0
/
session.go
765 lines (697 loc) · 20.1 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
// Copyright 2016 CodisLabs. All Rights Reserved.
// Licensed under the MIT (MIT-LICENSE.txt) license.
package proxy
import (
"encoding/json"
"fmt"
"net"
"strconv"
"sync"
"time"
"github.com/CodisLabs/codis/pkg/models"
"github.com/CodisLabs/codis/pkg/proxy/redis"
"github.com/CodisLabs/codis/pkg/utils/errors"
"github.com/CodisLabs/codis/pkg/utils/log"
"github.com/CodisLabs/codis/pkg/utils/sync2/atomic2"
)
type Session struct {
// 与客户端的链接
Conn *redis.Conn
// 请求数量
Ops int64
// 上次创建时间
CreateUnix int64
// 上次请求时间
LastOpUnix int64
// 客户端当前select的database
database int32
quit bool
exit sync.Once
// 统计数据
stats struct {
opmap map[string]*opStats
total atomic2.Int64
fails atomic2.Int64
flush struct {
n uint
nano int64
}
}
start sync.Once
broken atomic2.Bool
config *Config
authorized bool
}
func (s *Session) String() string {
o := &struct {
Ops int64 `json:"ops"`
CreateUnix int64 `json:"create"`
LastOpUnix int64 `json:"lastop,omitempty"`
RemoteAddr string `json:"remote"`
}{
s.Ops, s.CreateUnix, s.LastOpUnix,
s.Conn.RemoteAddr(),
}
b, _ := json.Marshal(o)
return string(b)
}
/*
每接到一个redis请求,就创建一个独立的session进行处理(默认的每个session的tcp连接过期时间为75秒,也就是每个请求最多处理75秒)。
这里的第一个参数是net.Conn,Conn是一个通用的面向流的网络连接,多个goroutines可以同时调用Conn的方法
这里的net.Conn就是我们之前Proxy的lproxy这个Listener监听到的19000请求到来的时候返回的net.Conn
*/
func NewSession(sock net.Conn, config *Config) *Session {
// 创建redis连接
c := redis.NewConn(sock,
config.SessionRecvBufsize.AsInt(),
config.SessionSendBufsize.AsInt(),
)
c.ReaderTimeout = config.SessionRecvTimeout.Duration()
c.WriterTimeout = config.SessionSendTimeout.Duration()
c.SetKeepAlivePeriod(config.SessionKeepAlivePeriod.Duration())
s := &Session{
Conn: c, config: config,
CreateUnix: time.Now().Unix(),
}
s.stats.opmap = make(map[string]*opStats, 16)
log.Infof("session [%p] create: %s", s, s)
return s
}
func (s *Session) CloseReaderWithError(err error) error {
s.exit.Do(func() {
if err != nil {
log.Infof("session [%p] closed: %s, error: %s", s, s, err)
} else {
log.Infof("session [%p] closed: %s, quit", s, s)
}
})
return s.Conn.CloseReader()
}
func (s *Session) CloseWithError(err error) error {
s.exit.Do(func() {
if err != nil {
log.Infof("session [%p] closed: %s, error: %s", s, s, err)
} else {
log.Infof("session [%p] closed: %s, quit", s, s)
}
})
s.broken.Set(true)
return s.Conn.Close()
}
var (
ErrRouterNotOnline = errors.New("router is not online")
ErrTooManySessions = errors.New("too many sessions")
ErrTooManyPipelinedRequests = errors.New("too many pipelined requests")
)
var RespOK = redis.NewString([]byte("OK"))
func (s *Session) Start(d *Router) {
// 只会执行一次
s.start.Do(func() {
// 默认为1000
if int(incrSessions()) > s.config.ProxyMaxClients {
go func() {
s.Conn.Encode(redis.NewErrorf("ERR max number of clients reached"), true)
s.CloseWithError(ErrTooManySessions)
s.incrOpFails(nil, nil)
s.flushOpStats(true)
}()
decrSessions()
return
}
// Router是否在线
if !d.isOnline() {
go func() {
s.Conn.Encode(redis.NewErrorf("ERR router is not online"), true)
s.CloseWithError(ErrRouterNotOnline)
s.incrOpFails(nil, nil)
s.flushOpStats(true)
}()
decrSessions()
return
}
// 给RequestChan的buff中赋1024位的数组,并返回一个RequestChan
// serveProxy 线程创建, 后面loopWriter和loopReader线程使用
tasks := NewRequestChanBuffer(1024)
go func() {
// 从后端redis server的回复中拿到回复
// 负责合并请求结果 然后返回给客户端
s.loopWriter(tasks)
// alive session减一
decrSessions()
}()
go func() {
// 接收客户端(连接proxy真正的客户端用户)的请求
// 负责读取和分发请求到后端
s.loopReader(tasks, d)
// 所有请求取完或者proxy退出之后,上面的方法就会结束,关闭tasks这个requestChan
tasks.Close()
}()
})
}
//loopReader读取和分发请求到后端,关键代码就是handleRequest函数,传入的两个参数分别是d *Router和r := &Request{},也就是把结果存到task里面, 后面loopWriter会用到
func (s *Session) loopReader(tasks *RequestChan, d *Router) (err error) {
defer func() {
s.CloseReaderWithError(err)
}()
var (
breakOnFailure = s.config.SessionBreakOnFailure
maxPipelineLen = s.config.SessionMaxPipeline
)
// 循环处理前端请求
// session只要没有退出,就一直从conn中取请求,直到请求取完就return,然后会关闭tasks这个requestChan
// 生产线程为本线程, 在处理完客户端请求后将rsp放入chan中, write线程进行消费处理
for !s.quit {
//从与前端的redis连接中取出请求参数(from socket)
multi, err := s.Conn.DecodeMultiBulk()
if err != nil {
return err
}
if len(multi) == 0 {
continue
}
s.incrOpTotal()
//检测requestChan的data是否超过配置的每个pipeline最大请求长度,默认为10000
if tasks.Buffered() > maxPipelineLen {
return s.incrOpFails(nil, ErrTooManyPipelinedRequests)
}
start := time.Now()
s.LastOpUnix = start.Unix()
s.Ops++
// 构造出request结构
// 这个request贯穿整个流程..
r := &Request{}
//这个Multi非常重要,请求的参数就在里面,是一个[]*redis.Resp切片
r.Multi = multi
//WaitGroup的作用是,阻塞主线程的执行,一直等到所有的goroutine执行完成。
//每创建一个goroutine 就把任务队列中任务的数量+1,任务完成,将任务队列中的任务数量-1。有点类似于java里面的CountDownLatch
//这个Batch用于检测redis请求是否完成(完成的标志是BackendConn调用了setResponse 后端链接中调用的)
r.Batch = &sync.WaitGroup{}
r.Database = s.database
r.UnixNano = start.UnixNano()
// 处理实际请求->dispatch->forward->backConn.push_back(input)
if err := s.handleRequest(r, d); err != nil {
r.Resp = redis.NewErrorf("ERR handle request, %s", err)
tasks.PushBack(r)
if breakOnFailure {
return err
}
} else {
// 此时已经处理完毕,将resp设置为request的一个参数, 不一定处理完成哦
// 将request添加到当前Session中之前创建的RequestChan中
// loopWriter后面再遍历RequestChan取出所有请求及结果
tasks.PushBack(r)
}
}
return nil
}
// 合并请求的处理结果并返回给客户端
// 请求结果由BackendConn处理好之后,放在Request这个struct的*redis.Resp属性中,这里只需要把结果取出
// 可以看到,codis将请求与结果关联起来的方式,就是把结果当做request的一个属性
func (s *Session) loopWriter(tasks *RequestChan) (err error) {
defer func() {
s.CloseWithError(err)
tasks.PopFrontAllVoid(func(r *Request) {
s.incrOpFails(r, nil)
})
s.flushOpStats(true)
}()
var (
breakOnFailure = s.config.SessionBreakOnFailure
maxPipelineLen = s.config.SessionMaxPipeline
)
p := s.Conn.FlushEncoder()
p.MaxInterval = time.Millisecond
p.MaxBuffered = maxPipelineLen / 2
//前面我们在tasks.PushBack(r)中,将请求放入了data []*Request切片,现在就是取出最早的请求及其处理结果
//如果当前session的requestChan为空,就调用cond.wait让goroutine等待,直到调用pushback又放入请求为止
// TODO: 所以这里启新的协程了吗? 没有 每个客户端连接会在自己的线程中read/write
return tasks.PopFrontAll(func(r *Request) error {
// 拿取后端处理的结果, 如果还没处理完, 会将自己阻塞起, 本身是在自己连接的线程里...
resp, err := s.handleResponse(r)
if err != nil {
resp = redis.NewErrorf("ERR handle response, %s", err)
if breakOnFailure {
s.Conn.Encode(resp, true)
return s.incrOpFails(r, err)
}
}
if err := p.Encode(resp); err != nil {
return s.incrOpFails(r, err)
}
fflush := tasks.IsEmpty()
if err := p.Flush(fflush); err != nil {
return s.incrOpFails(r, err)
} else {
s.incrOpStats(r, resp.Type)
}
if fflush {
s.flushOpStats(false)
}
return nil
})
}
func (s *Session) handleResponse(r *Request) (*redis.Resp, error) {
// goroutine都会一直阻塞到所有请求处理完之后,前面我们已经看到,向RequestChan中添加一个请求的时候 request.Batch++
// 后面BackendConn调用setResponse,也就是完成处理的时候,又会调用Done方法减一
// 如果所有的请求处理完之后 这里wait会被唤醒
r.Batch.Wait()
// 处理完之后...
//如果是单个的请求,例如SET,这里就为空了
if r.Coalesce != nil {
//如果是MSET这种请求,就需要调用之前自定义的Coalesce方法,将请求合并之后再返回
if err := r.Coalesce(); err != nil {
return nil, err
}
}
if err := r.Err; err != nil {
return nil, err
} else if r.Resp == nil {
return nil, ErrRespIsRequired
}
return r.Resp, nil
}
// 实际处理请求的方法 同步方法吗??
// 异步的, 只负责将请求写入后端bc.input中, bc.loopWriter会被唤醒读出请求
func (s *Session) handleRequest(r *Request, d *Router) error {
//解析请求 opstr取决于具体的命令 比如说"SET"
opstr, flag, err := getOpInfo(r.Multi)
if err != nil {
return err
}
r.OpStr = opstr
r.OpFlag = flag
r.Broken = &s.broken
//有些命令不支持 直接会返回错误
if flag.IsNotAllowed() {
return fmt.Errorf("command '%s' is not allowed", opstr)
}
switch opstr {
case "QUIT":
return s.handleQuit(r)
case "AUTH":
return s.handleAuth(r)
}
if !s.authorized {
if s.config.SessionAuth != "" {
r.Resp = redis.NewErrorf("NOAUTH Authentication required")
return nil
}
s.authorized = true
}
//除了SELECT和default之外 都多传入了一个参数 也就是Router
switch opstr {
case "SELECT":
return s.handleSelect(r)
case "PING":
return s.handleRequestPing(r, d)
case "INFO":
return s.handleRequestInfo(r, d)
case "MGET":
return s.handleRequestMGet(r, d)
case "MSET":
return s.handleRequestMSet(r, d)
case "DEL":
return s.handleRequestDel(r, d)
case "EXISTS":
return s.handleRequestExists(r, d)
case "SLOTSINFO":
return s.handleRequestSlotsInfo(r, d)
case "SLOTSSCAN":
return s.handleRequestSlotsScan(r, d)
case "SLOTSMAPPING":
return s.handleRequestSlotsMapping(r, d)
default:
return d.dispatch(r)
}
}
func (s *Session) handleQuit(r *Request) error {
s.quit = true
r.Resp = RespOK
return nil
}
func (s *Session) handleAuth(r *Request) error {
if len(r.Multi) != 2 {
r.Resp = redis.NewErrorf("ERR wrong number of arguments for 'AUTH' command")
return nil
}
switch {
case s.config.SessionAuth == "":
r.Resp = redis.NewErrorf("ERR Client sent AUTH, but no password is set")
case s.config.SessionAuth != string(r.Multi[1].Value):
s.authorized = false
r.Resp = redis.NewErrorf("ERR invalid password")
default:
s.authorized = true
r.Resp = RespOK
}
return nil
}
func (s *Session) handleSelect(r *Request) error {
if len(r.Multi) != 2 {
r.Resp = redis.NewErrorf("ERR wrong number of arguments for 'SELECT' command")
return nil
}
//将字符串转换为十进制整数,从请求参数中取出是对哪个db进行操作
switch db, err := strconv.Atoi(string(r.Multi[1].Value)); {
case err != nil:
r.Resp = redis.NewErrorf("ERR invalid DB index")
case db < 0 || db >= int(s.config.BackendNumberDatabases):
r.Resp = redis.NewErrorf("ERR invalid DB index, only accept DB [0,%d)", s.config.BackendNumberDatabases)
default:
r.Resp = RespOK
s.database = int32(db)
}
return nil
}
func (s *Session) handleRequestPing(r *Request, d *Router) error {
var addr string
var nblks = len(r.Multi) - 1
switch {
case nblks == 0:
slot := uint32(time.Now().Nanosecond()) % MaxSlotNum
return d.dispatchSlot(r, int(slot))
default:
addr = string(r.Multi[1].Value)
copy(r.Multi[1:], r.Multi[2:])
r.Multi = r.Multi[:nblks]
}
if !d.dispatchAddr(r, addr) {
r.Resp = redis.NewErrorf("ERR backend server '%s' not found", addr)
return nil
}
return nil
}
func (s *Session) handleRequestInfo(r *Request, d *Router) error {
var addr string
var nblks = len(r.Multi) - 1
switch {
case nblks == 0:
slot := uint32(time.Now().Nanosecond()) % MaxSlotNum
return d.dispatchSlot(r, int(slot))
default:
addr = string(r.Multi[1].Value)
copy(r.Multi[1:], r.Multi[2:])
r.Multi = r.Multi[:nblks]
}
if !d.dispatchAddr(r, addr) {
r.Resp = redis.NewErrorf("ERR backend server '%s' not found", addr)
return nil
}
return nil
}
func (s *Session) handleRequestMGet(r *Request, d *Router) error {
var nkeys = len(r.Multi) - 1
switch {
case nkeys == 0:
r.Resp = redis.NewErrorf("ERR wrong number of arguments for 'MGET' command")
return nil
case nkeys == 1:
return d.dispatch(r)
}
var sub = r.MakeSubRequest(nkeys)
for i := range sub {
sub[i].Multi = []*redis.Resp{
r.Multi[0],
r.Multi[i+1],
}
// 这里也是复用通用逻辑 mget10个key r.Batch就会+10
if err := d.dispatch(&sub[i]); err != nil {
return err
}
}
r.Coalesce = func() error {
var array = make([]*redis.Resp, len(sub))
for i := range sub {
if err := sub[i].Err; err != nil {
return err
}
switch resp := sub[i].Resp; {
case resp == nil:
return ErrRespIsRequired
case resp.IsArray() && len(resp.Array) == 1:
array[i] = resp.Array[0]
default:
return fmt.Errorf("bad mget resp: %s array.len = %d", resp.Type, len(resp.Array))
}
}
r.Resp = redis.NewArray(array)
return nil
}
return nil
}
//调用Router的dispatch方法,将请求分发给相应的槽进行处理,
//如果调用Mset,一次设置多个值的话,就还要多做一步,通过Coalesce来合并请求结果
func (s *Session) handleRequestMSet(r *Request, d *Router) error {
var nblks = len(r.Multi) - 1
switch {
case nblks == 0 || nblks%2 != 0:
r.Resp = redis.NewErrorf("ERR wrong number of arguments for 'MSET' command")
return nil
case nblks == 2:
return d.dispatch(r)
}
//将一个Mset请求拆分成多个子set请求,分别dispatch
var sub = r.MakeSubRequest(nblks / 2)
for i := range sub {
sub[i].Multi = []*redis.Resp{
r.Multi[0],
r.Multi[i*2+1],
r.Multi[i*2+2],
}
if err := d.dispatch(&sub[i]); err != nil {
return err
}
}
r.Coalesce = func() error {
for i := range sub {
if err := sub[i].Err; err != nil {
return err
}
switch resp := sub[i].Resp; {
case resp == nil:
return ErrRespIsRequired
case resp.IsString():
r.Resp = resp
default:
return fmt.Errorf("bad mset resp: %s value.len = %d", resp.Type, len(resp.Value))
}
}
return nil
}
return nil
}
func (s *Session) handleRequestDel(r *Request, d *Router) error {
var nkeys = len(r.Multi) - 1
switch {
case nkeys == 0:
r.Resp = redis.NewErrorf("ERR wrong number of arguments for 'DEL' command")
return nil
case nkeys == 1:
return d.dispatch(r)
}
var sub = r.MakeSubRequest(nkeys)
for i := range sub {
sub[i].Multi = []*redis.Resp{
r.Multi[0],
r.Multi[i+1],
}
if err := d.dispatch(&sub[i]); err != nil {
return err
}
}
r.Coalesce = func() error {
var n int
for i := range sub {
if err := sub[i].Err; err != nil {
return err
}
switch resp := sub[i].Resp; {
case resp == nil:
return ErrRespIsRequired
case resp.IsInt() && len(resp.Value) == 1:
n += int(resp.Value[0] - '0')
default:
return fmt.Errorf("bad del resp: %s value.len = %d", resp.Type, len(resp.Value))
}
}
r.Resp = redis.NewInt(strconv.AppendInt(nil, int64(n), 10))
return nil
}
return nil
}
func (s *Session) handleRequestExists(r *Request, d *Router) error {
var nkeys = len(r.Multi) - 1
switch {
case nkeys == 0:
r.Resp = redis.NewErrorf("ERR wrong number of arguments for 'EXISTS' command")
return nil
case nkeys == 1:
return d.dispatch(r)
}
var sub = r.MakeSubRequest(nkeys)
for i := range sub {
sub[i].Multi = []*redis.Resp{
r.Multi[0],
r.Multi[i+1],
}
if err := d.dispatch(&sub[i]); err != nil {
return err
}
}
r.Coalesce = func() error {
var n int
for i := range sub {
if err := sub[i].Err; err != nil {
return err
}
switch resp := sub[i].Resp; {
case resp == nil:
return ErrRespIsRequired
case resp.IsInt() && len(resp.Value) == 1:
if resp.Value[0] != '0' {
n++
}
default:
return fmt.Errorf("bad exists resp: %s value.len = %d", resp.Type, len(resp.Value))
}
}
r.Resp = redis.NewInt(strconv.AppendInt(nil, int64(n), 10))
return nil
}
return nil
}
func (s *Session) handleRequestSlotsInfo(r *Request, d *Router) error {
var addr string
var nblks = len(r.Multi) - 1
switch {
case nblks != 1:
r.Resp = redis.NewErrorf("ERR wrong number of arguments for 'SLOTSINFO' command")
return nil
default:
addr = string(r.Multi[1].Value)
copy(r.Multi[1:], r.Multi[2:])
r.Multi = r.Multi[:nblks]
}
if !d.dispatchAddr(r, addr) {
r.Resp = redis.NewErrorf("ERR backend server '%s' not found", addr)
return nil
}
return nil
}
func (s *Session) handleRequestSlotsScan(r *Request, d *Router) error {
var nblks = len(r.Multi) - 1
switch {
case nblks <= 1:
r.Resp = redis.NewErrorf("ERR wrong number of arguments for 'SLOTSSCAN' command")
return nil
}
switch slot, err := redis.Btoi64(r.Multi[1].Value); {
case err != nil:
r.Resp = redis.NewErrorf("ERR parse slotnum '%s' failed, %s", r.Multi[1].Value, err)
return nil
case slot < 0 || slot >= MaxSlotNum:
r.Resp = redis.NewErrorf("ERR parse slotnum '%s' failed, out of range", r.Multi[1].Value)
return nil
default:
return d.dispatchSlot(r, int(slot))
}
}
func (s *Session) handleRequestSlotsMapping(r *Request, d *Router) error {
var nblks = len(r.Multi) - 1
switch {
case nblks >= 2:
r.Resp = redis.NewErrorf("ERR wrong number of arguments for 'SLOTSMAPPING' command")
return nil
}
marshalToResp := func(m *models.Slot) *redis.Resp {
if m == nil {
return redis.NewArray(nil)
}
var replicaGroups []*redis.Resp
for i := range m.ReplicaGroups {
var group []*redis.Resp
for _, addr := range m.ReplicaGroups[i] {
group = append(group, redis.NewString([]byte(addr)))
}
replicaGroups = append(replicaGroups, redis.NewArray(group))
}
return redis.NewArray([]*redis.Resp{
redis.NewString([]byte(strconv.Itoa(m.Id))),
redis.NewString([]byte(m.BackendAddr)),
redis.NewString([]byte(m.MigrateFrom)),
redis.NewArray(replicaGroups),
})
}
if nblks == 0 {
var array = make([]*redis.Resp, MaxSlotNum)
for i, m := range d.GetSlots() {
array[i] = marshalToResp(m)
}
r.Resp = redis.NewArray(array)
return nil
}
switch slot, err := redis.Btoi64(r.Multi[1].Value); {
case err != nil:
r.Resp = redis.NewErrorf("ERR parse slotnum '%s' failed, %s", r.Multi[1].Value, err)
return nil
case slot < 0 || slot >= MaxSlotNum:
r.Resp = redis.NewErrorf("ERR parse slotnum '%s' failed, out of range", r.Multi[1].Value)
return nil
default:
r.Resp = marshalToResp(d.GetSlot(int(slot)))
return nil
}
}
func (s *Session) incrOpTotal() {
s.stats.total.Incr()
}
func (s *Session) getOpStats(opstr string) *opStats {
e := s.stats.opmap[opstr]
if e == nil {
e = &opStats{opstr: opstr}
s.stats.opmap[opstr] = e
}
return e
}
func (s *Session) incrOpStats(r *Request, t redis.RespType) {
e := s.getOpStats(r.OpStr)
e.calls.Incr()
e.nsecs.Add(time.Now().UnixNano() - r.UnixNano)
switch t {
case redis.TypeError:
e.redis.errors.Incr()
}
}
func (s *Session) incrOpFails(r *Request, err error) error {
if r != nil {
e := s.getOpStats(r.OpStr)
e.fails.Incr()
} else {
s.stats.fails.Incr()
}
return err
}
func (s *Session) flushOpStats(force bool) {
var nano = time.Now().UnixNano()
if !force {
const period = int64(time.Millisecond) * 100
if d := nano - s.stats.flush.nano; d < period {
return
}
}
s.stats.flush.nano = nano
incrOpTotal(s.stats.total.Swap(0))
incrOpFails(s.stats.fails.Swap(0))
for _, e := range s.stats.opmap {
if e.calls.Int64() != 0 || e.fails.Int64() != 0 {
incrOpStats(e)
}
}
s.stats.flush.n++
if len(s.stats.opmap) <= 32 {
return
}
if (s.stats.flush.n % 16384) == 0 {
s.stats.opmap = make(map[string]*opStats, 32)
}
}