/
websocket.go
166 lines (146 loc) · 3.38 KB
/
websocket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package websocket
import (
"bufio"
"encoding/binary"
"errors"
"io"
"net"
"net/http"
"unicode/utf8"
"github.com/gbrlsnchs/websocket/internal"
)
// errInvalidCloseCode is reported when a close code does not pass
// validCloseCode.
var errInvalidCloseCode = errors.New("websocket: invalid close code")

// errInvalidUTF8 is reported when a completed text message is not valid UTF-8.
var errInvalidUTF8 = errors.New("websocket: payload contains invalid UTF-8 content")
// defaultRWSize is the buffer size, in bytes, given to the buffered reader and
// the buffered writer that newWS puts around a connection.
const defaultRWSize = 4 << 10
// Connection states, listed in the order resolveState steps through them.
const (
	// stateOpen is the zero value, so a freshly built WebSocket starts here.
	stateOpen = iota
	// stateClosing is entered when resolveState runs on an open connection.
	stateClosing
	// stateClosed is entered when resolveState runs on a closing connection,
	// or directly when Next fails to read a frame.
	stateClosed
)
// WebSocket represents one end of a WebSocket connection. Whether it acts as
// a client or as a server depends on how it was created.
type WebSocket struct {
	// writer is embedded; its opcode is changed through SetWriteOpcode.
	*writer

	// fb supplies the frames that Next reads and assembles into messages.
	fb *frameBuffer
	// conn is the underlying connection; only its Close method is needed.
	conn io.Closer
	// state is one of stateOpen, stateClosing or stateClosed.
	state int
	// cc is the close code written by Close and returned by CloseCode.
	cc uint16
	// opcode is the opcode of the message most recently stored by Next.
	opcode uint8
	// payload is the payload of the message most recently stored by Next.
	payload []byte
	// err is the error recorded by Next and returned by Err.
	err error
}
// newWS builds a WebSocket around conn. Reads and writes are buffered with
// defaultRWSize bytes each, the client flag is handed to both the frame buffer
// and the writer, and the writer starts out with OpcodeText as its opcode.
func newWS(conn net.Conn, client bool) *WebSocket {
	in := &frameBuffer{
		rd:     bufio.NewReaderSize(conn, defaultRWSize),
		first:  true,
		client: client,
	}
	out := &writer{
		wr:     bufio.NewWriterSize(conn, defaultRWSize),
		opcode: OpcodeText,
		client: client,
	}
	return &WebSocket{
		writer: out,
		fb:     in,
		conn:   conn,
	}
}
// UpgradeHTTP runs the WebSocket handshake for an incoming HTTP request and,
// when it succeeds, returns a non-client WebSocket over the resulting
// connection. When the handshake fails, a 400 Bad Request response is written
// to w and the handshake error is returned.
func UpgradeHTTP(w http.ResponseWriter, r *http.Request) (*WebSocket, error) {
	conn, err := internal.Handshake(w, r)
	if err != nil {
		const code = http.StatusBadRequest
		http.Error(w, http.StatusText(code), code)
		return nil, err
	}
	ws := newWS(conn, false)
	return ws, nil
}
// Close writes the connection's close code with the write opcode set to
// opcodeClose and then advances the connection state. The code written is the
// one stored on the WebSocket, or 1000 when none has been stored yet.
//
// The underlying connection is closed only when the state is already
// stateClosing or stateClosed at the time of the call; on an open connection
// Close just writes the code and moves the state to stateClosing.
//
// Close returns the error from closing the underlying connection if there is
// one; otherwise it returns the error from writing the close code, if any.
func (ws *WebSocket) Close() error {
	if ws.cc == 0 {
		ws.cc = 1000
	}
	ws.SetWriteOpcode(opcodeClose)
	// Keep the write error instead of dropping it; it is reported unless
	// closing the connection below produces an error of its own.
	err := binary.Write(ws, binary.BigEndian, ws.cc)
	if ws.state >= stateClosing {
		if cerr := ws.conn.Close(); cerr != nil {
			err = cerr
		}
	}
	ws.resolveState()
	return err
}
// CloseCode returns the close code currently stored on the WebSocket.
func (ws *WebSocket) CloseCode() uint16 {
	return ws.cc
}
// Err returns the error stored on the WebSocket, or nil when none has been
// recorded.
func (ws *WebSocket) Err() error {
	return ws.err
}
// Next reads frames until a complete message has been assembled and reports
// whether one is now available through Opcode and Payload.
//
// Ping frames are answered through handlePing and pong frames are ignored;
// neither ends the loop. Next returns false when:
//   - reading a frame fails: the state becomes stateClosed and, unless the
//     error is io.EOF, the connection is closed and the error is recorded;
//   - a close frame arrives: Close runs as Next returns, and an invalid close
//     code or a payload that is not valid UTF-8 sets the close code to 1002
//     and records an error;
//   - a finished text message is not valid UTF-8: the connection is closed
//     and errInvalidUTF8 is recorded.
func (ws *WebSocket) Next() bool {
	for {
		fr, err := ws.fb.next()
		if err != nil {
			ws.state = stateClosed
			if err != io.EOF {
				ws.conn.Close()
				ws.err = err
			}
			return false
		}

		switch fr.opcode {
		case opcodePing:
			ws.handlePing(fr.payload)
		case opcodePong:
			// Pong frames need no reaction.
		case opcodeClose:
			// Deferred so that Close sees the close code settled below.
			defer ws.Close()
			ws.cc = 1000
			ws.resolveState()
			switch {
			case fr.hasCloseCode && !validCloseCode(fr.cc):
				ws.cc = 1002
				ws.err = errInvalidCloseCode
			case !utf8.Valid(fr.payload):
				ws.cc = 1002
				ws.err = errInvalidClosePayload
			default:
				ws.opcode = fr.opcode
				ws.payload = fr.payload
			}
			return false
		default:
			ws.fb.add(fr)
			if !fr.final {
				// More fragments are needed before the message is complete.
				continue
			}
			// Every path below returns, so the buffer is reset exactly once,
			// after the message fields have been copied out.
			defer ws.fb.reset()
			if ws.fb.opcode == OpcodeText && !utf8.Valid(ws.fb.payload) {
				ws.conn.Close()
				ws.err = errInvalidUTF8
				return false
			}
			ws.opcode, ws.payload = ws.fb.opcode, ws.fb.payload
			return true
		}
	}
}
// Opcode returns the opcode stored on the WebSocket for the current message.
func (ws *WebSocket) Opcode() uint8 {
	return ws.opcode
}
// Payload returns the payload stored on the WebSocket for the current
// message. The returned slice is not a copy.
func (ws *WebSocket) Payload() []byte {
	return ws.payload
}
// Read copies the unread part of the current message payload into b and
// returns the number of bytes copied. Bytes handed out are consumed, so
// successive calls walk through the payload instead of returning its beginning
// again, and Payload afterwards returns only what is still unread.
//
// Once the payload is exhausted Read returns 0 and io.EOF, which lets helpers
// such as io.ReadAll and io.Copy terminate. A call with an empty b on an
// exhausted payload returns 0 and a nil error.
func (ws *WebSocket) Read(b []byte) (int, error) {
	if len(ws.payload) == 0 && len(b) > 0 {
		return 0, io.EOF
	}
	n := copy(b, ws.payload)
	// Drop what was just delivered so the next call continues after it.
	ws.payload = ws.payload[n:]
	return n, nil
}
// SetCloseCode stores cc as the close code of the WebSocket. A code rejected
// by validCloseCode is not stored and errInvalidCloseCode is returned instead.
func (ws *WebSocket) SetCloseCode(cc uint16) error {
	if validCloseCode(cc) {
		ws.cc = cc
		return nil
	}
	return errInvalidCloseCode
}
// SetWriteOpcode sets the opcode held by the embedded writer.
func (ws *WebSocket) SetWriteOpcode(opcode uint8) {
	ws.writer.opcode = opcode
}
// handlePing answers a ping by writing b back with the write opcode set to
// opcodePong. The opcode that was configured before the call is restored
// afterwards, so that later writes by the caller do not go out with the pong
// opcode.
func (ws *WebSocket) handlePing(b []byte) {
	prev := ws.writer.opcode
	ws.SetWriteOpcode(opcodePong)
	// The write result is left unchecked, as before: handlePing has no way to
	// report it to its caller.
	ws.Write(b)
	ws.SetWriteOpcode(prev)
}
// resolveState advances the connection state by one step: stateOpen becomes
// stateClosing and stateClosing becomes stateClosed. Any other state is left
// as it is.
func (ws *WebSocket) resolveState() {
	if ws.state == stateOpen {
		ws.state = stateClosing
		return
	}
	if ws.state == stateClosing {
		ws.state = stateClosed
	}
}