forked from decred/dcrdata
/
websocket.go
156 lines (139 loc) · 4 KB
/
websocket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// Copyright (c) 2017, The dcrdata developers
// See LICENSE for details.
package explorer
import (
"sync"
"time"
)
const (
wsWriteTimeout = 10 * time.Second
wsReadTimeout = 12 * time.Second
pingInterval = 12 * time.Second
sigNewBlock hubSignal = iota
sigMempoolUpdate
sigPingAndUserCount
)
// WebSocketMessage represents the JSON object used to send and received typed
// messages to the web client.
type WebSocketMessage struct {
EventId string `json:"event"`
Message string `json:"message"`
}
// Event type field for an SSE event
var eventIDs = map[hubSignal]string{
sigNewBlock: "newblock",
sigMempoolUpdate: "mempool",
sigPingAndUserCount: "ping",
}
// WebsocketHub and its event loop manage all websocket client connections.
// WebsocketHub is responsible for closing all connections registered with it.
// If the event loop is running, calling (*WebsocketHub).Stop() will handle it.
type WebsocketHub struct {
sync.RWMutex
clients map[*hubSpoke]struct{}
Register chan *hubSpoke
Unregister chan *hubSpoke
HubRelay chan hubSignal
NewBlockSummary chan BlockBasic
quitWSHandler chan struct{}
}
type hubSignal int
type hubSpoke chan hubSignal
// NewWebsocketHub creates a new WebsocketHub
func NewWebsocketHub() *WebsocketHub {
return &WebsocketHub{
clients: make(map[*hubSpoke]struct{}),
Register: make(chan *hubSpoke),
Unregister: make(chan *hubSpoke),
HubRelay: make(chan hubSignal),
quitWSHandler: make(chan struct{}),
}
}
// NumClients returns the number of clients connected to the websocket hub
func (wsh *WebsocketHub) NumClients() int {
return len(wsh.clients)
}
// RegisterClient registers a websocket connection with the hub.
func (wsh *WebsocketHub) RegisterClient(c *hubSpoke) {
log.Debug("Registering new websocket client")
wsh.Register <- c
}
// registerClient should only be called from the run loop
func (wsh *WebsocketHub) registerClient(c *hubSpoke) {
wsh.clients[c] = struct{}{}
}
// UnregisterClient unregisters the input websocket connection via the main
// run() loop. This call will block if the run() loop is not running.
func (wsh *WebsocketHub) UnregisterClient(c *hubSpoke) {
wsh.Unregister <- c
}
// unregisterClient should only be called from the loop in run().
func (wsh *WebsocketHub) unregisterClient(c *hubSpoke) {
if _, ok := wsh.clients[c]; !ok {
// unknown client, do not close channel
log.Warnf("unknown client")
return
}
delete(wsh.clients, c)
// Close the channel, but make sure the client didn't do it
safeClose(*c)
}
func safeClose(cc hubSpoke) {
select {
case _, ok := <-cc:
if !ok {
log.Debug("Channel already closed!")
return
}
default:
}
close(cc)
}
// Stop kills the run() loop and unregisteres all clients (connections).
func (wsh *WebsocketHub) Stop() {
// end the run() loop, allowing in progress operations to complete
wsh.quitWSHandler <- struct{}{}
// unregister all clients
for client := range wsh.clients {
wsh.unregisterClient(client)
}
}
func (wsh *WebsocketHub) run() {
log.Info("Starting WebsocketHub run loop.")
for {
events:
select {
case hubSignal := <-wsh.HubRelay:
switch hubSignal {
case sigNewBlock:
log.Infof("Signaling new block to %d clients.", len(wsh.clients))
case sigPingAndUserCount:
log.Tracef("Signaling ping/user count to %d clients.", len(wsh.clients))
case sigMempoolUpdate:
log.Infof("Signaling mempool update to %d clients.", len(wsh.clients))
default:
log.Errorf("Unknown hub signal: %v", hubSignal)
break events
}
for client := range wsh.clients {
// signal or unregister the client
select {
case *client <- hubSignal:
default:
wsh.unregisterClient(client)
}
}
case c := <-wsh.Register:
wsh.registerClient(c)
case c := <-wsh.Unregister:
wsh.unregisterClient(c)
case _, ok := <-wsh.quitWSHandler:
if !ok {
log.Error("close channel already closed. This should not happen.")
return
}
close(wsh.quitWSHandler)
return
}
}
}