/
websocket.go
133 lines (116 loc) · 2.92 KB
/
websocket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package connection
import (
"time"
"github.com/flet-dev/flet/server/config"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
)
const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
)
type WebSocket struct {
conn *websocket.Conn
send chan []byte
done chan bool
}
func NewWebSocket(conn *websocket.Conn) *WebSocket {
cws := &WebSocket{
conn: conn,
send: make(chan []byte, 10),
done: make(chan bool),
}
return cws
}
func (c *WebSocket) Start(handler ReadMessageHandler) bool {
// start read/write loops
go c.readLoop(handler)
go c.writeLoop()
return <-c.done
}
func (c *WebSocket) Send(message []byte) {
c.send <- message
}
func (c *WebSocket) readLoop(readHandler ReadMessageHandler) {
normalClosure := false
defer func() {
log.Println("Exiting WebSocket read loop")
close(c.send)
c.close(normalClosure)
}()
c.conn.SetReadLimit(int64(config.MaxWebSocketMessageSize()))
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error {
log.Debugln("received pong")
c.conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
normalClosure = true
} else if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) || err == websocket.ErrReadLimit {
log.Errorf("error: %v", err)
}
break
}
err = readHandler(message)
if err != nil {
log.Errorf("error processing WebSocket message: %v", err)
break
}
}
}
func (c *WebSocket) writeLoop() {
ticker := time.NewTicker(pingPeriod)
normalClosure := false
defer func() {
log.Println("Exiting WebSocket write loop")
ticker.Stop()
c.close(normalClosure)
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
normalClosure = true
return
}
w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
log.Errorf("Error creating WebSocket message writer: %v", err)
return
}
_, err = w.Write(message)
if err != nil {
log.Errorf("Error writing WebSocket message: %v", err)
return
}
if err := w.Close(); err != nil {
log.Errorf("Error closing WebSocket message writer: %v", err)
return
}
case <-ticker.C:
log.Debugln("send ping")
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
log.Errorf("Error sending WebSocket PING message: %v", err)
return
}
}
}
}
func (c *WebSocket) close(normalClosure bool) {
c.conn.Close()
select {
case c.done <- normalClosure:
default:
// no listeners
}
}