forked from davyxu/cellnet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
acceptor.go
155 lines (109 loc) · 2.73 KB
/
acceptor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package udp
import (
"expvar"
"fmt"
"github.com/bobwong89757/cellnet"
"github.com/bobwong89757/cellnet/peer"
"github.com/bobwong89757/cellnet/util"
"net"
"time"
)
// MaxUDPRecvBuffer is the size, in bytes, of the buffer that accept allocates
// once and passes to every ReadFromUDP call.
// NOTE(review): a datagram larger than this cannot fit in the buffer and is
// presumably truncated by the read — confirm against the net package docs.
const MaxUDPRecvBuffer = 2048
// udpAcceptor is the peer created by this package's init function. Start binds
// a UDP socket, accept reads datagrams from it, and each non-empty datagram is
// handed to a udpSession obtained from allocSession.
type udpAcceptor struct {
// Embedded building blocks from the cellnet peer package.
// NOTE(review): Name, Address, NameOrAddress and SetRunning used in this file
// are presumably promoted from these embedded types — verify in package peer.
peer.CoreSessionManager
peer.CorePeerProperty
peer.CoreContextSet
peer.CoreRunningTag
peer.CoreProcBundle
// localAddr is the result of resolving Address() in Start.
localAddr *net.UDPAddr
// conn is the socket returned by net.ListenUDP in Start; Stop closes it.
conn *net.UDPConn
// sesQueue holds *udpSession values that allocSession inspects for reuse.
sesQueue *util.Queue
// sesTimeout is added to the current time to compute a session's timeOutTick.
sesTimeout time.Duration
// mtSesQueueCount is an expvar value set to sesQueue.Count() by allocSession.
mtSesQueueCount *expvar.Int
// mtTotalRecvUDPPacket is an expvar counter incremented by accept for every
// datagram read with a length greater than zero.
mtTotalRecvUDPPacket *expvar.Int
}
// Start publishes the peer's expvar counters (once), resolves the configured
// address, binds a UDP socket on it and launches the receive loop in its own
// goroutine.
//
// The receiver is returned in every case. When resolving or listening fails the
// error is logged and no receive loop is started.
func (self *udpAcceptor) Start() cellnet.Peer {
	// expvar panics if the same name is published twice, so each counter is
	// created only while its field is still nil.
	if self.mtSesQueueCount == nil {
		queueVarName := fmt.Sprintf("cellnet.Peer(%s).SessionQueueCount", self.Name())
		self.mtSesQueueCount = expvar.NewInt(queueVarName)
	}
	if self.mtTotalRecvUDPPacket == nil {
		recvVarName := fmt.Sprintf("cellnet.Peer(%s).TotalRecvUDPPacket", self.Name())
		self.mtTotalRecvUDPPacket = expvar.NewInt(recvVarName)
	}

	// The field is stored before the error check so it reflects the latest
	// resolve attempt even when that attempt failed.
	addr, resolveErr := net.ResolveUDPAddr("udp", self.Address())
	self.localAddr = addr
	if resolveErr != nil {
		log.Errorf("#udp.resolve failed(%s) %v", self.NameOrAddress(), resolveErr.Error())
		return self
	}

	// Same for the socket: a failed listen overwrites any previous conn.
	conn, listenErr := net.ListenUDP("udp", addr)
	self.conn = conn
	if listenErr != nil {
		log.Errorf("#udp.listen failed(%s) %s", self.NameOrAddress(), listenErr.Error())
		self.SetRunning(false)
		return self
	}

	log.Infof("#udp.listen(%s) %s", self.Name(), self.Address())

	// accept marks the peer as running itself, once its goroutine is scheduled.
	go self.accept()

	return self
}
// accept is the receive loop started by Start. It marks the peer as running,
// then reads datagrams from the socket until a read fails, and finally marks
// the peer as not running.
//
// Every datagram with a non-zero length bumps the received-packet counter and
// is passed to the session returned by allocSession for the sender's address.
func (self *udpAcceptor) accept() {
	self.SetRunning(true)

	// One buffer is shared by all reads; each session only sees the slice of
	// it that holds the current datagram.
	buf := make([]byte, MaxUDPRecvBuffer)

	for {
		size, from, readErr := self.conn.ReadFromUDP(buf)
		if readErr != nil {
			// Any read error ends the loop; the error itself is not reported.
			break
		}
		if size <= 0 {
			continue
		}

		self.mtTotalRecvUDPPacket.Add(1)
		self.allocSession(from).Recv(buf[:size])
	}

	self.SetRunning(false)
}
// allocSession returns a udpSession bound to addr, recycling the session at the
// head of sesQueue when it is no longer alive and allocating a new one
// otherwise.
//
// Only the head of the queue is examined. The returned session has its
// timeOutTick set to now plus sesTimeout and is wired to this acceptor's
// socket and processor bundle.
//
// Every session handed out is placed at the tail of sesQueue, whether fresh or
// recycled. Previously a recycled session was dequeued and never put back, so
// each session could be reused at most once before leaving the pool.
func (self *udpAcceptor) allocSession(addr *net.UDPAddr) *udpSession {
	var ses *udpSession

	if self.sesQueue.Count() > 0 {
		// A head session that is still alive must not be touched; a new one is
		// needed. A head session that is no longer alive can be reused.
		if head := self.sesQueue.Peek().(*udpSession); !head.IsAlive() {
			ses = self.sesQueue.Dequeue().(*udpSession)
		}
	}

	if ses == nil {
		ses = &udpSession{}
	}

	ses.timeOutTick = time.Now().Add(self.sesTimeout)
	ses.conn = self.conn
	ses.remote = addr
	ses.pInterface = self
	ses.CoreProcBundle = &self.CoreProcBundle

	// Re-queue at the tail so the session stays tracked and can be recycled
	// again after it next stops being alive.
	self.sesQueue.Enqueue(ses)
	self.mtSesQueueCount.Set(int64(self.sesQueue.Count()))

	return ses
}
// SetSessionTTL sets the duration that allocSession adds to the current time
// when it stamps a session's timeOutTick. Sessions that have already been
// stamped are not updated by this call.
func (self *udpAcceptor) SetSessionTTL(dur time.Duration) {
self.sesTimeout = dur
}
// Stop closes the UDP socket, if one has been opened, and marks the peer as
// not running. The error returned by Close is discarded.
func (self *udpAcceptor) Stop() {
	if conn := self.conn; conn != nil {
		conn.Close()
	}

	// TODO: wait for the accept goroutine to finish.
	self.SetRunning(false)
}
// TypeName returns the fixed identifier "udp.Acceptor" for this peer type.
// NOTE(review): presumably the key under which callers look up this peer in
// the peer registry — verify against package peer.
func (self *udpAcceptor) TypeName() string {
return "udp.Acceptor"
}
// init registers a creator with the peer package that builds a udpAcceptor
// with a session queue from util.NewQueue(64) and a session timeout of one
// second.
// NOTE(review): 64 is presumably the queue's initial capacity — confirm in
// package util.
func init() {
	peer.RegisterPeerCreator(func() cellnet.Peer {
		return &udpAcceptor{
			sesQueue:   util.NewQueue(64),
			sesTimeout: time.Second,
		}
	})
}