forked from dodo-open/dodo-open-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
websocket.go
204 lines (178 loc) · 4.46 KB
/
websocket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
package websocket
import (
"context"
"errors"
"fmt"
restClient "github.com/Szzrain/dodo-open-go/client"
"github.com/Szzrain/dodo-open-go/log"
"github.com/Szzrain/dodo-open-go/tools"
"github.com/gorilla/websocket"
"time"
)
// Client WebSocket client interface
type Client interface {
Connect() error
Listen() error
Write(event *WSEventMessage) error
Reconnect() error
Close()
}
type (
// messageChan message channel
messageChan chan *WSEventMessage
// errorChan error channel to handle errors
errorChan chan error
// client WebSocket client implement
client struct {
c restClient.Client
conf *config
conn *websocket.Conn // WebSocket connection
messageChan messageChan // message channel
closeChan errorChan // errors channel
heartbeatTicker *time.Ticker // ticker for heartbeat
isConnected bool // connection status
}
config struct {
messageQueueSize int
messageHandlers *MessageHandlers // instance level message handlers
}
)
// New a WebSocket instance
func New(rc restClient.Client, options ...OptionHandler) (Client, error) {
conf := &config{
messageQueueSize: 10000,
messageHandlers: DefaultHandlers,
}
for _, optionHandler := range options {
if optionHandler == nil {
return nil, errors.New("invalied OptionHandler (nil detected)")
}
if err := optionHandler(conf); err != nil {
return nil, err
}
}
c := &client{
c: rc,
conf: conf,
isConnected: false,
}
return c, nil
}
// Connect to the WebSocket server
func (c *client) Connect() error {
if c.c == nil {
return errors.New("missing DoDoBot API Client")
}
url, err := c.c.GetWebsocketConnection(context.Background())
if err != nil {
return err
}
// malloc for message channel and error channel
c.messageChan = make(messageChan, c.conf.messageQueueSize)
c.closeChan = make(errorChan, 16)
// dial to WebSocket server
c.conn, _, err = websocket.DefaultDialer.Dial(url.Endpoint, nil)
if err != nil {
log.Errorf("connect error: %v", err)
return err
}
// start heartbeat ticker and update connection status
c.heartbeatTicker = time.NewTicker(time.Second * 25)
c.isConnected = true
return nil
}
// Listen message and handle it
func (c *client) Listen() error {
defer c.Close()
// read message
go c.readMessage()
// listen and handle message
go c.listenMessageAndHandle()
for c.isConnected {
select {
case err := <-c.closeChan:
log.Errorf("[stop listening] %v", err)
if DefaultHandlers.ErrorHandler != nil {
DefaultHandlers.ErrorHandler(err)
}
// reconnect after 2 seconds
time.Sleep(time.Second * 2)
if c.isConnected {
if err := c.Reconnect(); err != nil {
return err
}
}
case <-c.heartbeatTicker.C:
packet := &WSEventMessage{Type: HeartbeatType}
_ = c.Write(packet)
}
}
return nil
}
// Write message
func (c *client) Write(event *WSEventMessage) error {
m, _ := tools.JSON.Marshal(event)
if err := c.conn.WriteMessage(websocket.TextMessage, m); err != nil {
log.Errorf("write message failed cause: %v", err)
c.closeChan <- err
return err
}
return nil
}
// Reconnect to the WebSocket server
func (c *client) Reconnect() error {
c.Close()
if err := c.Connect(); err != nil {
return err
}
return c.Listen()
}
// Close connection and stop heartbeat ticker
func (c *client) Close() {
defer func() {
recover()
}()
c.isConnected = false
if err := c.conn.Close(); err != nil {
log.Errorf("close connection failed cause: %v", err)
}
c.heartbeatTicker.Stop()
close(c.messageChan)
close(c.closeChan)
}
// readMessage read message from connection
func (c *client) readMessage() {
defer func() {
recover()
}()
for c.isConnected {
_, message, err := c.conn.ReadMessage()
if err != nil {
log.Errorf("read message error cause: %v", err)
c.closeChan <- err
return
}
event := &WSEventMessage{}
if err = tools.JSON.Unmarshal(message, &event); err != nil {
log.Errorf("json unmarshal failed cause: %v", err)
continue
}
c.messageChan <- event
}
}
// listenMessageAndHandle listen and handle message from message channel
func (c *client) listenMessageAndHandle() {
defer func() {
if err := recover(); err != nil {
c.closeChan <- fmt.Errorf("we got panic: %v", err)
}
}()
for event := range c.messageChan {
if event.Type == HeartbeatType {
continue
}
if err := c.ParseDataAndHandle(event); err != nil {
log.Errorf("try to parse and handle message failed cause: %v", err)
}
}
}