/
oneserver.go
100 lines (84 loc) · 1.7 KB
/
oneserver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package transport
import (
"context"
"sync/atomic"
"time"
)
type OneSvrConf struct {
Address string `yaml:"address"`
Protocol string `yaml:"protocol"`
MaxInvoke int `yaml:"max_invoke"`
QueueCap int `yaml:"queue_cap"`
AcceptTimeout time.Duration `yaml:"accept_timeout"`
ReadTimeout time.Duration
WriteTimeout time.Duration
HandleTimeout time.Duration
IdleTimeout time.Duration
TCPReadBuf int
TCPWriteBuf int
TCPNoDelay bool
ServiceWeight int `yaml:"service_weight"`
}
type OneHandler interface {
Listen() error
Handle() error
}
type OneSvr struct {
isClosed bool
handler OneHandler
conf *OneSvrConf
proto SvrProtocol
logger Logger
numInvoke int32
lastInvoke time.Time
}
func NewOneSvr(sp SvrProtocol, logger Logger, conf *OneSvrConf) *OneSvr {
s := &OneSvr{
conf: conf,
logger: logger,
proto: sp,
}
return s
}
func (s *OneSvr) getHandler() (h OneHandler) {
switch s.conf.Protocol {
case "tcp":
h = newTcpHandler(s)
default:
s.logger.Error("no such protocol")
}
return
}
func (s *OneSvr) Serve() (err error) {
h := s.getHandler()
if err = h.Listen(); err == nil {
err = h.Handle()
}
return
}
func (s *OneSvr) Shutdown() {
s.isClosed = true
}
func (s *OneSvr) GetConf() *OneSvrConf {
return s.conf
}
func (s *OneSvr) invoke(ctx context.Context, pkg []byte) (rsp []byte) {
cfg := s.conf
atomic.AddInt32(&s.numInvoke,1)
if cfg.HandleTimeout != 0 {
done := make(chan struct{})
go func() {
rsp = s.proto.Invoke(ctx,pkg)
done <- struct{}{}
}()
select {
case <-done:
case <-time.After(cfg.HandleTimeout):
rsp = s.proto.InvokeTimeout(ctx,pkg)
}
}else {
rsp = s.proto.Invoke(ctx,pkg)
}
atomic.AddInt32(&s.numInvoke,-1)
return
}