// Source: melody.go, forked from olahol/melody.

package melody
import (
"github.com/gorilla/websocket"
"net/http"
)
// Callback signatures used by Melody.
type (
	// handleMessageFunc is the shape of a callback that receives a session
	// together with a message payload.
	handleMessageFunc func(*Session, []byte)

	// handleErrorFunc is the shape of a callback that receives a session
	// together with an error.
	handleErrorFunc func(*Session, error)

	// handleSessionFunc is the shape of a callback that receives only a
	// session.
	handleSessionFunc func(*Session)

	// filterFunc is a per-session predicate; it has the same shape as the
	// functions accepted by BroadcastFilter and BroadcastBinaryFilter.
	filterFunc func(*Session) bool
)
// Melody ties together the websocket upgrader, the user-registered
// callbacks and the hub that sessions are registered with.
type Melody struct {
	// Config is populated by New via newConfig; HandleRequest reads its
	// MessageBufferSize to size each session's output channel.
	Config *Config
	// Upgrader is used by HandleRequest to upgrade an HTTP request to a
	// websocket connection.
	Upgrader *websocket.Upgrader

	// Callbacks set through HandleMessage and HandleMessageBinary.
	messageHandler, messageHandlerBinary handleMessageFunc
	// Callback set through HandleError; HandleRequest calls it with a nil
	// session when the upgrade fails.
	errorHandler handleErrorFunc
	// Callbacks set through HandleConnect and HandleDisconnect; HandleRequest
	// fires each of them on its own goroutine.
	connectHandler, disconnectHandler handleSessionFunc

	// hub receives session registrations, broadcasts and the exit signal
	// over its channels.
	hub *hub
}
// New returns a Melody instance with a default Upgrader (read and write
// buffer sizes of 1024), a Config obtained from newConfig, and no-op
// callbacks for every event. The hub's run loop is started on its own
// goroutine before the instance is returned.
func New() *Melody {
	h := newHub()
	go h.run()

	// Default callbacks do nothing until replaced through the Handle* methods.
	ignoreMessage := func(*Session, []byte) {}
	ignoreSession := func(*Session) {}

	return &Melody{
		Config:               newConfig(),
		Upgrader:             &websocket.Upgrader{ReadBufferSize: 1024, WriteBufferSize: 1024},
		messageHandler:       ignoreMessage,
		messageHandlerBinary: ignoreMessage,
		errorHandler:         func(*Session, error) {},
		connectHandler:       ignoreSession,
		disconnectHandler:    ignoreSession,
		hub:                  h,
	}
}
// HandleConnect registers fn as the callback fired when a session connects.
//
// A nil fn is replaced with a no-op: HandleRequest invokes the stored
// callback on a new goroutine, and calling a nil function value there would
// panic.
//
// NOTE(review): the field is written without synchronization; presumably
// handlers are meant to be registered before requests are served.
func (m *Melody) HandleConnect(fn func(*Session)) {
	if fn == nil {
		// Same kind of do-nothing default that New installs.
		fn = func(*Session) {}
	}
	m.connectHandler = fn
}
// HandleDisconnect registers fn as the callback fired when a session
// disconnects.
//
// A nil fn is replaced with a no-op: HandleRequest invokes the stored
// callback on a new goroutine, and calling a nil function value there would
// panic.
//
// NOTE(review): the field is written without synchronization; presumably
// handlers are meant to be registered before requests are served.
func (m *Melody) HandleDisconnect(fn func(*Session)) {
	if fn == nil {
		// Same kind of do-nothing default that New installs.
		fn = func(*Session) {}
	}
	m.disconnectHandler = fn
}
// HandleMessage registers fn as the callback for incoming text messages.
//
// A nil fn is replaced with a no-op so that a later invocation of the stored
// callback cannot panic on a nil function value.
//
// NOTE(review): the field is written without synchronization; presumably
// handlers are meant to be registered before requests are served.
func (m *Melody) HandleMessage(fn func(*Session, []byte)) {
	if fn == nil {
		// Same kind of do-nothing default that New installs.
		fn = func(*Session, []byte) {}
	}
	m.messageHandler = fn
}
// HandleMessageBinary registers fn as the callback for incoming binary
// messages.
//
// A nil fn is replaced with a no-op so that a later invocation of the stored
// callback cannot panic on a nil function value.
//
// NOTE(review): the field is written without synchronization; presumably
// handlers are meant to be registered before requests are served.
func (m *Melody) HandleMessageBinary(fn func(*Session, []byte)) {
	if fn == nil {
		// Same kind of do-nothing default that New installs.
		fn = func(*Session, []byte) {}
	}
	m.messageHandlerBinary = fn
}
// HandleError registers fn as the callback fired when a session has an error.
//
// fn must tolerate a nil *Session: HandleRequest reports a failed websocket
// upgrade by calling the error callback with a nil session.
//
// A nil fn is replaced with a no-op so that reporting an error cannot itself
// panic on a nil function value.
//
// NOTE(review): the field is written without synchronization; presumably
// handlers are meant to be registered before requests are served.
func (m *Melody) HandleError(fn func(*Session, error)) {
	if fn == nil {
		// Same kind of do-nothing default that New installs.
		fn = func(*Session, error) {}
	}
	m.errorHandler = fn
}
// HandleRequest upgrades the HTTP request to a websocket connection and
// drives the resulting session from the calling goroutine.
//
// If the upgrade fails, the error callback is invoked with a nil session and
// the method returns. Otherwise a Session is created with an output channel
// of capacity Config.MessageBufferSize and sent to the hub's register
// channel; the connect callback and the session's write pump are each started
// on their own goroutine, and the session's read pump runs on the calling
// goroutine. Once the read pump returns, the session is sent to the hub's
// unregister channel (only if the hub reports itself open) and the
// disconnect callback is fired on a new goroutine.
func (m *Melody) HandleRequest(w http.ResponseWriter, r *http.Request) {
	ws, err := m.Upgrader.Upgrade(w, r, nil)
	if err != nil {
		// No session exists yet, so the error is reported with a nil session.
		m.errorHandler(nil, err)
		return
	}

	sess := new(Session)
	sess.Request = r
	sess.conn = ws
	sess.output = make(chan *envelope, m.Config.MessageBufferSize)
	sess.melody = m

	m.hub.register <- sess

	go m.connectHandler(sess)
	go sess.writePump()

	// readPump runs synchronously; the cleanup below happens only after it
	// returns.
	sess.readPump()

	// NOTE(review): hub.open is read here without any visible
	// synchronization — confirm against the hub implementation.
	if m.hub.open {
		m.hub.unregister <- sess
	}

	go m.disconnectHandler(sess)
}
// Broadcast wraps msg in a text-message envelope with no filter and sends it
// on the hub's broadcast channel, for delivery to all sessions.
func (m *Melody) Broadcast(msg []byte) {
	m.hub.broadcast <- &envelope{
		t:   websocket.TextMessage,
		msg: msg,
	}
}
// BroadcastFilter wraps msg in a text-message envelope carrying fn as its
// filter and sends it on the hub's broadcast channel, for delivery to the
// sessions that fn returns true for.
func (m *Melody) BroadcastFilter(msg []byte, fn func(*Session) bool) {
	m.hub.broadcast <- &envelope{
		t:      websocket.TextMessage,
		msg:    msg,
		filter: fn,
	}
}
// BroadcastOthers broadcasts msg as a text message to every session except
// s, by delegating to BroadcastFilter with a predicate that rejects s.
func (m *Melody) BroadcastOthers(msg []byte, s *Session) {
	// Sessions are compared by pointer identity.
	notSender := func(other *Session) bool {
		return other != s
	}
	m.BroadcastFilter(msg, notSender)
}
// BroadcastBinary wraps msg in a binary-message envelope with no filter and
// sends it on the hub's broadcast channel, for delivery to all sessions.
func (m *Melody) BroadcastBinary(msg []byte) {
	m.hub.broadcast <- &envelope{
		t:   websocket.BinaryMessage,
		msg: msg,
	}
}
// BroadcastBinaryFilter wraps msg in a binary-message envelope carrying fn as
// its filter and sends it on the hub's broadcast channel, for delivery to the
// sessions that fn returns true for.
func (m *Melody) BroadcastBinaryFilter(msg []byte, fn func(*Session) bool) {
	m.hub.broadcast <- &envelope{
		t:      websocket.BinaryMessage,
		msg:    msg,
		filter: fn,
	}
}
// BroadcastBinaryOthers broadcasts msg as a binary message to every session
// except s, by delegating to BroadcastBinaryFilter with a predicate that
// rejects s.
func (m *Melody) BroadcastBinaryOthers(msg []byte, s *Session) {
	// Sessions are compared by pointer identity.
	notSender := func(other *Session) bool {
		return other != s
	}
	m.BroadcastBinaryFilter(msg, notSender)
}
// Close shuts down the melody instance and all connected sessions by sending
// a signal on the hub's exit channel.
//
// NOTE(review): the hub's handling of the signal is not visible here;
// presumably its run loop closes the sessions and stops — confirm in the hub
// implementation.
func (m *Melody) Close() {
	exit := m.hub.exit
	exit <- true
}