/
socket.go
135 lines (110 loc) · 3.08 KB
/
socket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package websocket
import (
"time"
"github.com/sirupsen/logrus"
"github.com/gorilla/websocket"
)
// Timing and size limits shared by the read and write pumps.
const (
	// writeWait bounds how long a single write to the peer may take.
	writeWait = time.Second * 10

	// pongWait is how long the reader waits for the next pong from the peer
	// before its read deadline expires.
	pongWait = time.Minute

	// pingPeriod is the interval between pings sent to the peer. It must
	// stay below pongWait so a pong can arrive before the deadline.
	pingPeriod = pongWait * 9 / 10

	// maxMessageSize is the read limit applied to messages from the peer.
	maxMessageSize = 512
)
// Socket bundles one websocket connection with the subprotocol that handles
// its traffic, a logger, and the queue of outgoing messages.
type Socket struct {
	// conn is the underlying gorilla websocket connection.
	conn *websocket.Conn
	// subp receives incoming messages (handleMessage) and performs the
	// topic-based sends used by Send and SendToClient.
	subp *Subprotocol
	// clientID is the identifier passed to subp.SendToClient. NewSocket does
	// not set it; it is presumably assigned elsewhere in the package.
	clientID string
	// log is used for connection-level warnings and errors.
	log *logrus.Entry
	// sendQueue carries outgoing messages from send to writePump.
	sendQueue chan []byte
}
// NewSocket returns a Socket for conn that dispatches through subp and logs
// to log. The send queue is created unbuffered and clientID is left empty.
func NewSocket(conn *websocket.Conn, subp *Subprotocol, log *logrus.Entry) *Socket {
	sock := new(Socket)
	sock.conn = conn
	sock.subp = subp
	sock.log = log
	sock.sendQueue = make(chan []byte)
	return sock
}
// send hands msg to the write pump through sendQueue. When the queue is
// unbuffered (as created by NewSocket) the call blocks until writePump
// receives the message.
func (s *Socket) send(msg []byte) {
	queue := s.sendQueue
	queue <- msg
}
// readPump reads messages from the connection and passes each one to the
// subprotocol until a read fails; the connection is closed on exit.
//
// Incoming messages are limited to maxMessageSize. The read deadline is set
// to pongWait from now at start-up and pushed out again on every pong.
// Failures to set the deadline are logged and otherwise ignored.
func (s *Socket) readPump() {
	defer func() {
		_ = s.conn.Close()
	}()

	s.conn.SetReadLimit(maxMessageSize)
	if err := s.conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
		s.log.WithError(err).Warn("Failed to set read deadline")
	}
	s.conn.SetPongHandler(func(string) error {
		deadline := time.Now().Add(pongWait)
		if err := s.conn.SetReadDeadline(deadline); err != nil {
			s.log.WithError(err).Warn("Failed to set read deadline")
		}
		return nil
	})

	for {
		_, payload, err := s.conn.ReadMessage()
		if err == nil {
			s.subp.handleMessage(s, payload)
			continue
		}
		// Going-away and abnormal closures are treated as expected and
		// not logged; any other close code is reported.
		if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
			s.log.WithError(err).Error("Connection closed unexpectedly")
		}
		return
	}
}
// writePump drains sendQueue onto the connection and pings the peer every
// pingPeriod.
//
// Each queued message is written as a binary frame. Every write is given a
// fresh deadline of writeWait; a failure to set the deadline is logged and
// the write is still attempted. The pump stops when sendQueue is closed
// (after attempting to send a close frame) or as soon as any write fails.
// On exit the ticker is stopped and the connection is closed.
func (s *Socket) writePump() {
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		_ = s.conn.Close()
	}()

	for {
		select {
		case message, ok := <-s.sendQueue:
			if err := s.conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
				s.log.WithError(err).Warn("Failed to set write deadline")
			}
			if !ok {
				// The channel was closed: tell the peer we are done.
				if err := s.conn.WriteMessage(websocket.CloseMessage, []byte{}); err != nil {
					s.log.WithError(err).Warn("Writing close message on channel close failed")
				}
				return
			}
			if err := s.conn.WriteMessage(websocket.BinaryMessage, message); err != nil {
				s.log.WithError(err).Error("Writing message failed")
				// A failed write leaves a gorilla/websocket connection
				// unable to write again, so stop here (as the ping branch
				// does) rather than failing and logging on every
				// subsequent message.
				return
			}
		case <-ticker.C:
			if err := s.conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
				s.log.WithError(err).Warn("Failed to set write deadline")
			}
			if err := s.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				s.log.WithError(err).Warn("Failed to write ping message")
				return
			}
		}
	}
}
// Close sends a websocket close control frame with an empty payload to the
// peer. It does not close the underlying connection itself.
//
// The frame is written with WriteControl rather than WriteMessage for two
// reasons: gorilla/websocket permits WriteControl concurrently with other
// connection methods (WriteMessage may only be used by one goroutine, which
// here is writePump), and WriteControl takes its own deadline instead of
// reusing whatever write deadline writePump last set, which may already have
// expired.
//
// It returns any error from writing the close frame.
func (s *Socket) Close() error {
	deadline := time.Now().Add(writeWait)
	return s.conn.WriteControl(websocket.CloseMessage, []byte{}, deadline)
}
// Send sends payload under topic to this socket by delegating to the
// subprotocol's SendToSocket, and returns whatever error that call reports.
func (s *Socket) Send(topic string, payload []byte) error {
	proto := s.subp
	return proto.SendToSocket(s, topic, payload)
}
// SendToClient sends payload under topic to the client identified by this
// socket's clientID, delegating to the subprotocol's SendToClient. That call
// is expected to reach every socket of the same client; the fan-out itself
// lives in the subprotocol. The subprotocol's error is returned unchanged.
func (s *Socket) SendToClient(topic string, payload []byte) error {
	proto, id := s.subp, s.clientID
	return proto.SendToClient(id, topic, payload)
}