/
client.go
133 lines (110 loc) · 2.58 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package chat
import (
"bytes"
"fmt"
"time"
"goyave.dev/goyave/v4/websocket"
ws "github.com/gorilla/websocket"
)
const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
// Maximum message size allowed from peer.
maxMessageSize = 512
)
var (
newline = []byte{'\n'}
space = []byte{' '}
)
// Client is a middleman between the websocket connection and the hub.
type Client struct {
// Buffered channel of outbound messages.
send chan []byte
readErr chan error
writeErr chan error
// The websocket connection.
conn *websocket.Conn
hub *Hub
Name string
}
func (c *Client) pump() error {
c.hub.register <- c
go c.writePump()
go c.readPump()
var err error
select {
case e := <-c.readErr:
err = e
case e := <-c.writeErr:
err = e
if err == nil {
// Hub closing, wait for readPump to return
<-c.readErr
}
}
return err
}
func (c *Client) readPump() {
defer func() {
c.hub.unregister <- c
}()
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsCloseError(err) {
c.readErr <- err
return
}
c.readErr <- fmt.Errorf("read: %w", err)
return
}
message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
c.hub.Broadcast([]byte(fmt.Sprintf("%s: %s", c.Name, message)))
}
}
func (c *Client) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// The hub closed the channel.
c.writeErr <- nil
return
}
w, err := c.conn.NextWriter(ws.TextMessage)
if err != nil {
c.writeErr <- fmt.Errorf("next writer: %w", err)
return
}
w.Write(message)
// Add queued chat messages to the current websocket message.
n := len(c.send)
for i := 0; i < n; i++ {
w.Write(newline)
w.Write(<-c.send)
}
if err := w.Close(); err != nil {
c.writeErr <- fmt.Errorf("writer close: %w", err)
return
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(ws.PingMessage, nil); err != nil {
c.writeErr <- fmt.Errorf("ping: %w", err)
return
}
}
}
}