-
Notifications
You must be signed in to change notification settings - Fork 165
/
hub.go
63 lines (54 loc) · 1.25 KB
/
hub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package main
import (
"log"
"sync"
"time"
)
type hub struct {
// the mutex to protect connections
connectionsMx sync.RWMutex
// Registered connections.
connections map[*connection]struct{}
// Inbound messages from the connections.
broadcast chan []byte
logMx sync.RWMutex
log [][]byte
}
func newHub() *hub {
h := &hub{
connectionsMx: sync.RWMutex{},
broadcast: make(chan []byte),
connections: make(map[*connection]struct{}),
}
go func() {
for {
msg := <-h.broadcast
h.connectionsMx.RLock()
for c := range h.connections {
select {
case c.send <- msg:
// stop trying to send to this connection after trying for 1 second.
// if we have to stop, it means that a reader died so remove the connection also.
case <-time.After(1 * time.Second):
log.Printf("shutting down connection %v", *c)
h.removeConnection(c)
}
}
h.connectionsMx.RUnlock()
}
}()
return h
}
func (h *hub) addConnection(conn *connection) {
h.connectionsMx.Lock()
defer h.connectionsMx.Unlock()
h.connections[conn] = struct{}{}
}
func (h *hub) removeConnection(conn *connection) {
h.connectionsMx.Lock()
defer h.connectionsMx.Unlock()
if _, ok := h.connections[conn]; ok {
delete(h.connections, conn)
close(conn.send)
}
}