/
registry.go
206 lines (159 loc) · 4.77 KB
/
registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
package api_ws
import (
"bytes"
"fmt"
"github.com/deze333/vroom/auth"
)
//------------------------------------------------------------
// Registry of active WebSocket connections
//------------------------------------------------------------
// Package-level registries of currently registered connections.
var (
	// _connsPublic indexes public connections: {agentId: conn}.
	_connsPublic = map[string]*Conn{}

	// _connsAuthd indexes authenticated connections: {authId: {agentId: conn}}.
	_connsAuthd = map[string]map[string]*Conn{}
)
//------------------------------------------------------------
// Functions
//------------------------------------------------------------
// RegisterConn adds an open connection to the registry.
//
// A public connection (ws.isAuthd == false) is stored in _connsPublic under
// ws.agentId.
//
// An authenticated connection is stored in _connsAuthd under the ID returned
// by auth.GetAuthdId(ws.r) and then under ws.agentId. As a side effect
// ws.authId is set to that ID and, when the session values can be read,
// ws.authEmail is set from the session's "email" value. If no ID is found,
// _onPanic is invoked and the connection is not registered.
func RegisterConn(ws *Conn) {
	if !ws.isAuthd {
		// Public connection: keyed by agent ID only.
		_connsPublic[ws.agentId] = ws
		// Debug
		//fmt.Println(DumpConnsPublic("REGISTERED CONNS: PUBLIC"))
		return
	}

	// Authenticated connection: an authentication ID is mandatory.
	id := auth.GetAuthdId(ws.r)
	if id == "" {
		_onPanic(
			"Cannot find ID for authd connection",
			fmt.Sprintf("%v", ws.r))
		return
	}
	ws.authId = id

	// Best effort: remember the "email" session value when it is readable.
	if vals, err := auth.GetSessionValues(ws.r); err == nil {
		ws.authEmail = fmt.Sprint(vals["email"])
	}

	// Several agents may share one authentication ID, so each ID owns a map.
	byAgent, known := _connsAuthd[ws.authId]
	if !known {
		byAgent = map[string]*Conn{}
		_connsAuthd[ws.authId] = byAgent
	}
	byAgent[ws.agentId] = ws
	// Debug
	//fmt.Println(DumpConnsAuthd("REGISTERED CONNS: AUTHD"))
}
// DeregisterConn removes a connection from the registry.
//
// A public connection (ws.isAuthd == false) is removed from _connsPublic by
// ws.agentId. An authenticated connection is removed from the per-authId map
// in _connsAuthd by ws.agentId; when that leaves the per-authId map empty,
// the authId entry itself is dropped so that _connsAuthd does not accumulate
// empty maps for every authentication ID ever seen.
//
// Deregistering a connection that is not registered is a no-op.
func DeregisterConn(ws *Conn) {
	if !ws.isAuthd {
		// Public connections
		delete(_connsPublic, ws.agentId)
		// Debug
		//fmt.Println(DumpConnsPublic("REGISTERED CONNS: PUBLIC"))
		return
	}

	// Authd connections
	conns, ok := _connsAuthd[ws.authId]
	if !ok {
		return
	}
	delete(conns, ws.agentId)
	if len(conns) == 0 {
		// Last connection for this authId is gone: release the inner map.
		delete(_connsAuthd, ws.authId)
	}
	// Debug
	//fmt.Println(DumpConnsAuthd("REGISTERED CONNS: AUTHD"))
}
// GetConn looks up the connection registered for agentId.
//
// With isAuthd set, every per-authId map in _connsAuthd is searched and the
// first entry found under agentId is returned; otherwise _connsPublic is
// consulted directly. The result is nil when no connection is registered.
func GetConn(isAuthd bool, agentId string) (ws *Conn) {
	if !isAuthd {
		// Public connections: direct lookup, nil when absent.
		return _connsPublic[agentId]
	}

	// Authd connections: the owning authId is unknown, so scan them all.
	for _, byAgent := range _connsAuthd {
		if found, ok := byAgent[agentId]; ok {
			return found
		}
	}
	return nil
}
// applyToPublic calls fn once for every connection in _connsPublic.
// Map iteration order is unspecified, so is the order of the calls.
func applyToPublic(fn func(*Conn)) {
	for agentId := range _connsPublic {
		fn(_connsPublic[agentId])
	}
}
// applyToAuthd calls fn once for every connection in _connsAuthd, walking
// each authId's connections in turn. Map iteration order is unspecified, so
// is the order of the calls.
func applyToAuthd(fn func(*Conn)) {
	for _, byAgent := range _connsAuthd {
		for _, ws := range byAgent {
			fn(ws)
		}
	}
}
// CloseAuthdConn closes every connection registered under the authentication
// ID id and removes them all from the registry. It does nothing when no
// connection is registered under id.
//
// The whole authId entry is deleted from _connsAuthd directly. Going through
// DeregisterConn with a placeholder Conn that carries only authId would try
// to delete the empty agentId key and leave the closed connections
// registered.
func CloseAuthdConn(id string) {
	conns, ok := _connsAuthd[id]
	if !ok {
		return
	}

	// Multiple connections may share same authentication ID
	for _, ws := range conns {
		//fmt.Printf("__X Closed WebSocket ID = %v, conn = %p\n", id, ws.conn)
		ws.conn.Close()
	}

	// Drop all of them from the registry in one step.
	delete(_connsAuthd, id)
}
//------------------------------------------------------------
// Monitoring functions
//------------------------------------------------------------
// ConnsInfoAuthd returns one info map per registered authenticated
// connection. Each map contains:
//
//	"_authId"    - the authentication ID the connection is registered under
//	"_httpReqId" - the agent ID the connection is registered under
//	"sess/<key>" - every value returned by auth.GetSessionValues for the connection
//	"ws/authd"   - ws.isAuthd formatted as a string
//	"ws/open"    - ws.isOpen formatted as a string
//
// The result is a non-nil, possibly empty slice.
func ConnsInfoAuthd() (infos []map[string]interface{}) {
	infos = make([]map[string]interface{}, 0)

	for authId, byAgent := range _connsAuthd {
		for agentId, ws := range byAgent {
			entry := map[string]interface{}{
				"_authId":    authId,
				"_httpReqId": agentId,
			}

			// The lookup error is discarded: whatever session values come
			// back (possibly none) are copied under the "sess/" prefix.
			sessVals, _ := auth.GetSessionValues(ws.r)
			for key, val := range sessVals {
				entry["sess/"+key] = val
			}

			entry["ws/authd"] = fmt.Sprint(ws.isAuthd)
			entry["ws/open"] = fmt.Sprint(ws.isOpen)

			infos = append(infos, entry)
		}
	}
	return infos
}
//------------------------------------------------------------
// Debug functions
//------------------------------------------------------------
// DumpConnsPublic renders a human-readable listing of _connsPublic for
// debugging: a blank line, header framed by two tilde rules, then one
// tab-indented line per connection with a running index, its agent ID and the
// address of its underlying ws.conn. Line order follows map iteration and is
// therefore unspecified.
func DumpConnsPublic(header string) string {
	const rule = "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n"

	var out bytes.Buffer
	out.WriteString("\n" + rule + header + "\n" + rule)

	idx := 0
	for agentId, ws := range _connsPublic {
		fmt.Fprintf(&out, "\t%v: agentId = %v, wsconn = %p\n", idx, agentId, ws.conn)
		idx++
	}
	return out.String()
}
// DumpConnsAuthd renders a human-readable listing of _connsAuthd for
// debugging: a blank line, header framed by two tilde rules, then for each
// authentication ID a tab-indented "<authId>:" line followed by one
// doubly-indented line per connection with a per-authId running index, its
// authEmail, its agent ID and the address of its underlying ws.conn. Line
// order follows map iteration and is therefore unspecified.
func DumpConnsAuthd(header string) string {
	const rule = "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n"

	var out bytes.Buffer
	out.WriteString("\n" + rule + header + "\n" + rule)

	for authId, byAgent := range _connsAuthd {
		fmt.Fprintf(&out, "\t%v:\n", authId)

		idx := 0
		for agentId, ws := range byAgent {
			fmt.Fprintf(&out, "\t\t%v: %v, agentId = %v, wsconn = %p\n", idx, ws.authEmail, agentId, ws.conn)
			idx++
		}
	}
	return out.String()
}