/
websocket_conn.go
129 lines (115 loc) · 2.36 KB
/
websocket_conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package ws
import (
"sync"
"github.com/gorilla/websocket"
)
/*
TODO: 实现 gorilla/websocket 的事件监听,适配 socketio
socketio 数据传输格式: `["binding","{\"code\":0,\"data\":123456,\"message\":\"\"}"]`
// 字符串数组,第一个字符串是事件名,第二个字符串是数据,通常是 json string
["",""]
*/
type Conn struct {
SocketId string
wsConn *websocket.Conn
inChan chan []byte
outJsonChan chan any
outMessageChan chan []byte
closeChan chan byte
mutex sync.Mutex
isClosed bool
}
func InitConn(wsConn *websocket.Conn) (conn *Conn, err error) {
conn = &Conn{
SocketId: wsConn.RemoteAddr().String(),
wsConn: wsConn,
inChan: make(chan []byte, CommonChanCount),
outJsonChan: make(chan any, CommonChanCount),
outMessageChan: make(chan []byte, CommonChanCount),
closeChan: make(chan byte, 1),
}
// 启动协程去读取消息
go conn.readLoop()
// 启动协程去发送消息
go conn.writeLoop()
return
}
func (c *Conn) ReadMessage() (data []byte, err error) {
select {
case data = <-c.inChan:
case <-c.closeChan:
err = ErrConnClosed
}
return data, err
}
func (c *Conn) WriteMessage(data []byte) (err error) {
select {
case c.outMessageChan <- data:
case <-c.closeChan:
err = ErrConnClosed
}
return err
}
func (c *Conn) WriteJson(data any) (err error) {
select {
case c.outJsonChan <- data:
case <-c.closeChan:
err = ErrConnClosed
}
return err
}
func (c *Conn) Close() {
// 线程安全的Close
_ = c.wsConn.Close()
// 这一行代码只需要执行一次
c.mutex.Lock()
if !c.isClosed {
close(c.closeChan)
c.isClosed = true
}
c.mutex.Unlock()
}
func (c *Conn) readLoop() {
var (
data []byte
err error
)
for {
_, data, err = c.wsConn.ReadMessage()
if err != nil {
goto ERROR
}
select {
case c.inChan <- data:
case <-c.closeChan:
// todo:关闭
goto ERROR
}
}
ERROR:
// todo:关闭连接操作
c.Close()
}
func (c *Conn) writeLoop() {
var (
message []byte
json any
err error
)
for {
select {
case message = <-c.outMessageChan:
err = c.wsConn.WriteMessage(websocket.TextMessage, message)
case json = <-c.outJsonChan:
err = c.wsConn.WriteJSON(json)
case <-c.closeChan:
goto ERROR
}
if err != nil {
goto ERROR
}
}
ERROR:
//todo:关闭连接操作
c.Close()
}