-
Notifications
You must be signed in to change notification settings - Fork 8
/
cache.go
415 lines (361 loc) · 10.9 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
package main
import (
"database/sql"
"encoding/json"
"net/http"
"strings"
"sync"
"time"
)
// ChildMap describes one child map known to this instance. Child maps
// are cached regularly; DumpChildMaps fills these fields from the id,
// name and hostname columns of the cached_maps table.
type ChildMap struct {
	ID       int
	Name     string
	Hostname string
}
// UpdateMapCache rebuilds the cached node table from the child maps
// listed in Conf.ChildMaps. Map addresses that are not yet known are
// added to the database along the way. Failures are logged rather than
// returned.
func UpdateMapCache() {
	// Nothing to fetch from: leave the existing cache untouched.
	if len(Conf.ChildMaps) < 1 {
		return
	}

	// This is a full refresh, so every previously cached node is
	// dropped before new data is fetched.
	if err := Db.ClearCache(); err != nil {
		l.Errf("Error clearing cache: %s", err)
		return
	}

	// Pull a complete dump from each child map and store it.
	if err := GetAllFromChildMaps(Conf.ChildMaps); err != nil {
		l.Errf("Error updating map cache: %s", err)
	}
}
// CacheNode inserts a single node into the nodes_cached table. If
// node.RetrieveTime is zero, it is set to the current Unix time before
// the insert (the passed node is modified).
//
// The column list and argument order mirror CacheNodes, so that both
// functions write rows of the same shape: status goes in the status
// column, the source ID in source, and the retrieval time in
// retrieved.
func (db DB) CacheNode(node *Node) (err error) {
	stmt, err := db.Prepare(`INSERT INTO nodes_cached
(address, owner, details, lat, lon, status, source, retrieved)
VALUES(?, ?, ?, ?, ?, ?, ?, ?)`)
	if err != nil {
		return
	}
	// Release the prepared statement however we leave the function.
	defer stmt.Close()

	if node.RetrieveTime == 0 {
		node.RetrieveTime = time.Now().Unix()
	}

	// The address is stored as raw bytes, exactly as CacheNodes does.
	_, err = stmt.Exec([]byte(node.Addr), node.OwnerName, node.Details,
		node.Latitude, node.Longitude,
		node.Status, node.SourceID, node.RetrieveTime)
	return
}
// CacheNodes inserts every node in nodes into the nodes_cached table
// using one prepared statement. Any node whose RetrieveTime is zero
// has it set to the current Unix time first (the passed nodes are
// modified). Insertion stops at the first failing node and that error
// is returned; nodes inserted before the failure are not rolled back
// by this function.
func (db DB) CacheNodes(nodes []*Node) (err error) {
	stmt, err := db.Prepare(`INSERT INTO nodes_cached
(address, owner, details, lat, lon, status, source, retrieved)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`)
	if err != nil {
		return
	}
	// Deferred so the statement is also released when an Exec fails
	// part-way through the loop.
	defer stmt.Close()

	for _, node := range nodes {
		if node.RetrieveTime == 0 {
			node.RetrieveTime = time.Now().Unix()
		}
		_, err = stmt.Exec([]byte(node.Addr), node.OwnerName,
			node.Details,
			node.Latitude, node.Longitude,
			node.Status, node.SourceID, node.RetrieveTime)
		if err != nil {
			return
		}
	}
	return
}
// ClearCache deletes every row from the nodes_cached table.
func (db DB) ClearCache() (err error) {
	if _, err = db.Exec(`DELETE FROM nodes_cached;`); err != nil {
		return err
	}
	return nil
}
// AddNewMapSource records a new map in the cached_maps table, storing
// address in the hostname column and name in the name column.
func (db DB) AddNewMapSource(address, name string) (err error) {
	const insertSource = `INSERT INTO cached_maps
(hostname,name) VALUES(?, ?)`

	_, err = db.Exec(insertSource, address, name)
	return err
}
// UpdateMapSourceData refreshes the stored data of a cached map.
// Currently that is only the name: the row in cached_maps whose
// hostname equals address gets its name column set to name.
func (db DB) UpdateMapSourceData(address, name string) (err error) {
	const renameSource = `UPDATE cached_maps
SET name=? WHERE hostname=?`

	_, err = db.Exec(renameSource, name, address)
	return err
}
// DumpChildMaps returns a slice containing all known child maps, read
// from the cached_maps table. When the table is empty the returned
// slice is empty but non-nil. A query, scan or iteration failure is
// returned as err.
func (db DB) DumpChildMaps() (childMaps []*ChildMap, err error) {
	childMaps = make([]*ChildMap, 0)

	// Retrieve all child maps from the database.
	rows, err := db.Query(`SELECT name, hostname, id
FROM cached_maps;`)
	if err == sql.ErrNoRows {
		// database/sql's Query reports an empty result as zero rows
		// rather than ErrNoRows; the check is retained defensively.
		return childMaps, nil
	} else if err != nil {
		return
	}
	// Return the underlying connection on every exit path.
	defer rows.Close()

	// Scan in all of the values.
	for rows.Next() {
		childMap := &ChildMap{}
		if err = rows.Scan(&childMap.Name, &childMap.Hostname,
			&childMap.ID); err != nil {
			return
		}
		childMaps = append(childMaps, childMap)
	}

	// Next returns false on both exhaustion and failure; Err tells the
	// two apart.
	err = rows.Err()
	return
}
// GetMapSourceToID returns a mapping of child map hostnames to their
// local IDs, read from the cached_maps table. It also includes a
// mapping of "local" to id 0. A query, scan or iteration failure is
// returned as err.
func (db DB) GetMapSourceToID() (sourceToID map[string]int, err error) {
	// Initialize the map and insert the "local" id.
	sourceToID = make(map[string]int, 1)
	sourceToID["local"] = 0

	// Retrieve every pair of hostnames and IDs.
	rows, err := db.Query(`SELECT hostname,id
FROM cached_maps;`)
	if err == sql.ErrNoRows {
		// database/sql's Query reports an empty result as zero rows
		// rather than ErrNoRows; the check is retained defensively.
		return sourceToID, nil
	} else if err != nil {
		return
	}
	// Return the underlying connection on every exit path.
	defer rows.Close()

	// Put in the rest of the mappings.
	for rows.Next() {
		var hostname string
		var id int
		if err = rows.Scan(&hostname, &id); err != nil {
			return
		}
		sourceToID[hostname] = id
	}

	// Next returns false on both exhaustion and failure; Err tells the
	// two apart.
	err = rows.Err()
	return
}
// GetMapIDToSource returns a mapping of local IDs to public
// hostnames, read from the cached_maps table. ID 0 is "local". A
// query, scan or iteration failure is returned as err.
func (db DB) GetMapIDToSource() (IDToSource map[int]string, err error) {
	// Initialize the map with "local".
	IDToSource = make(map[int]string, 1)
	IDToSource[0] = "local"

	// Retrieve every pair of IDs and hostnames.
	rows, err := db.Query(`SELECT id,hostname
FROM cached_maps;`)
	if err == sql.ErrNoRows {
		// database/sql's Query reports an empty result as zero rows
		// rather than ErrNoRows; the check is retained defensively.
		return IDToSource, nil
	} else if err != nil {
		return
	}
	// Return the underlying connection on every exit path.
	defer rows.Close()

	// Put in the rest of the IDs.
	for rows.Next() {
		var id int
		var hostname string
		if err = rows.Scan(&id, &hostname); err != nil {
			return
		}
		IDToSource[id] = hostname
	}

	// Next returns false on both exhaustion and failure; Err tells the
	// two apart.
	err = rows.Err()
	return
}
// FindSourceMap resolves a local map ID to its hostname. ID 0 always
// resolves to "local" without touching the database; any other ID is
// looked up in cached_maps, and the lookup error (if any) is returned.
func (db DB) FindSourceMap(id int) (source string, err error) {
	if id != 0 {
		err = db.QueryRow(`SELECT hostname
FROM cached_maps
WHERE id=?`, id).Scan(&source)
		return source, err
	}
	return "local", nil
}
// CacheFormatNodes groups nodes by the hostname of the map they came
// from. The hostname for each node is found by looking up its SourceID
// in the table returned by GetMapIDToSource; a SourceID missing from
// that table groups the node under the empty string.
func (db DB) CacheFormatNodes(nodes []*Node) (sourceMaps map[string][]*Node, err error) {
	// Fetch the ID -> hostname table once, up front.
	idSources, err := db.GetMapIDToSource()
	if err != nil {
		return nil, err
	}

	// Build the grouping one node at a time.
	sourceMaps = make(map[string][]*Node)
	for i := range nodes {
		node := nodes[i]
		hostname := idSources[node.SourceID]

		bucket, seen := sourceMaps[hostname]
		if !seen {
			// First node for this hostname: start a small bucket.
			bucket = make([]*Node, 0, 5)
		}
		sourceMaps[hostname] = append(bucket, node)
	}
	return sourceMaps, nil
}
// nodeDumpWrapper is a structure which wraps a response from /api/all
// in which the Data field is a map[string][]*Node.
type nodeDumpWrapper struct {
	// Data holds the returned nodes, keyed by source name.
	Data map[string][]*Node `json:"data"`
	// Error is non-nil when the remote side reported an error.
	Error interface{} `json:"error"`
}
// statusDumpWrapper is a structure which wraps a response from
// /api/status, as decoded by GetMapStatus.
type statusDumpWrapper struct {
	// Data holds the status fields; GetAllFromChildMap reads its
	// "name" entry.
	Data map[string]interface{} `json:"data"`
	// Error is non-nil when the remote side reported an error.
	Error interface{} `json:"error"`
}
// GetAllFromChildMaps accepts a list of child map addresses to
// retrieve nodes from. It does this concurrently, and puts any nodes
// and newly discovered addresses in the local ID table.
func GetAllFromChildMaps(addresses []string) (err error) {
// First off, initialize the slice into which we'll be appending
// all the nodes, and the souceToID map and mutex.
nodes := make([]*Node, 0)
sourceToID, err := Db.GetMapSourceToID()
if err != nil {
return
}
sourceMutex := new(sync.RWMutex)
// Next, we'll need a WaitGroup so we can block until all requests
// complete and a mutex to control appending to nodes.
waiter := new(sync.WaitGroup)
nodesMutex := new(sync.Mutex)
// We'll need to wait for len(addresses) goroutines to finish, so
// put that number in the WaitGroup.
waiter.Add(len(addresses))
// Now, start a separate goroutine for every address to
// concurrently retrieve nodes and append them (thread-safely) to
// nodes. Whenever appendNodesFromChildMap() finishes, it calls
// waiter.Done().
for _, address := range addresses {
go appendNodesFromChildMap(&nodes, address,
&sourceToID, sourceMutex, nodesMutex, waiter)
}
// Block until all goroutines are finished. This is simple to do
// with the WaitGroup, which keeps track of the number we're
// waiting for.
waiter.Wait()
return Db.CacheNodes(nodes)
}
// appendNodesFromChildMap is the per-address worker started by
// GetAllFromChildMaps(). It calls GetAllFromChildMap() and appends the
// result to *dst while holding dstMutex. wg.Done() is called exactly
// once before the function returns.
func appendNodesFromChildMap(dst *[]*Node, address string,
	sourceToID *map[string]int, sourceMutex *sync.RWMutex,
	dstMutex *sync.Mutex, wg *sync.WaitGroup) {
	// Signal completion on every return path.
	defer wg.Done()

	// A nil result means the fetch failed (already logged by the
	// callee), so there is nothing to append.
	fetched := GetAllFromChildMap(address, sourceToID, sourceMutex)
	if fetched == nil {
		return
	}

	// The destination slice is shared between workers; hold the lock
	// only for the append itself.
	dstMutex.Lock()
	*dst = append(*dst, fetched...)
	dstMutex.Unlock()
}
// GetMapStatus queries <address>/api/status (trailing slashes on
// address are trimmed first) and returns the "data" object of the JSON
// response. If the request fails, the body cannot be decoded, or the
// response carries a non-nil "error" field, the problem is logged and
// nil is returned.
func GetMapStatus(address string) (data map[string]interface{}) {
	resp, err := http.Get(strings.TrimRight(address, "/") + "/api/status")
	if err != nil {
		l.Errf("Querying status of %q produced: %s", address, err)
		return nil
	}
	// The caller of http.Get owns the body and must close it, or the
	// underlying connection is leaked.
	defer resp.Body.Close()

	var jresp statusDumpWrapper
	err = json.NewDecoder(resp.Body).Decode(&jresp)
	if err != nil {
		l.Errf("Querying status of %q produced: %s", address, err)
		return nil
	} else if jresp.Error != nil {
		l.Errf("Querying status of %q produced remote error: %s",
			address, jresp.Error)
		return nil
	}

	data = jresp.Data
	return
}
// GetAllFromChildMap retrieves a list of nodes from a single remote
// address, and localizes them by setting each node's SourceID to the
// local ID of the source it was listed under. If it encounters a
// source that is not already known, it adds it to the database and to
// the sourceToID map. All access to sourceToID is guarded by
// sourceMutex, so several calls may share one map concurrently. If it
// encounters an error while fetching, decoding, or registering a new
// source, it will log it and return nil.
func GetAllFromChildMap(address string, sourceToID *map[string]int,
	sourceMutex *sync.RWMutex) (nodes []*Node) {
	// Query the map's status. A nil result is fine: indexing a nil map
	// yields the zero value, so the name falls back to "".
	mapStatus := GetMapStatus(address)

	// Get the name of the map from the status info. It does not depend
	// on the source being processed, so look it up once.
	// NOTE(review): this is the name reported by address, yet it is
	// also stored for sources other than address — confirm intended.
	name, _ := mapStatus["name"].(string)

	// Try to get all nodes via the API.
	resp, err := http.Get(strings.TrimRight(address, "/") + "/api/all")
	if err != nil {
		l.Errf("Caching %q produced: %s", address, err)
		return nil
	}
	// The caller of http.Get owns the body and must close it, or the
	// underlying connection is leaked.
	defer resp.Body.Close()

	// Read the data into the nodeDumpWrapper type, so that it decodes
	// properly.
	var jresp nodeDumpWrapper
	err = json.NewDecoder(resp.Body).Decode(&jresp)
	if err != nil {
		l.Errf("Caching %q produced: %s", address, err)
		return nil
	} else if jresp.Error != nil {
		l.Errf("Caching %q produced remote error: %s",
			address, jresp.Error)
		return nil
	}

	// Prepare an initial slice so that it can be appended to, then
	// loop through and convert sources to IDs.
	nodes = make([]*Node, 0)
	for source, remoteNodes := range jresp.Data {
		// The remote map calls its own nodes "local"; from our point
		// of view they belong to the address we're retrieving from.
		// Map keys are unique, so this matches at most once.
		if source == "local" {
			source = address
		}

		// First, check if the source is known, under the read lock.
		sourceMutex.RLock()
		id, known := (*sourceToID)[source]
		sourceMutex.RUnlock()

		if known {
			err := Db.UpdateMapSourceData(address, name)
			if err != nil {
				l.Errf("Error while updating %q: %s", address, err)
			}
		} else {
			sourceMutex.Lock()
			// Another goroutine may have registered this source
			// between the read unlock above and this write lock, so
			// check again before inserting.
			id, known = (*sourceToID)[source]
			if !known {
				err := Db.AddNewMapSource(source, name)
				if err != nil {
					// Uh oh.
					sourceMutex.Unlock()
					l.Errf("Error while caching %q: %s", address, err)
					return nil
				}
				// Put it in the map under the ID len(sourceToID),
				// because that should be unique.
				// NOTE(review): assumes this matches the id the
				// database assigns to the new row — confirm.
				id = len(*sourceToID)
				(*sourceToID)[source] = id
			}
			sourceMutex.Unlock()

			if !known {
				l.Debugf("Discovered new source map %q, ID %d\n",
					source, id)
			}
		}

		// Once the ID is set, proceed on to add it in all the
		// remoteNodes.
		for _, n := range remoteNodes {
			n.SourceID = id
		}

		// Finally, append remoteNodes to the slice we're returning.
		nodes = append(nodes, remoteNodes...)
	}
	return nodes
}