forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 1
/
topology.go
127 lines (106 loc) · 2.79 KB
/
topology.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package elasticsearch
import (
"encoding/json"
"math/rand"
"strconv"
"strings"
"sync/atomic"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs/mode"
)
type topology struct {
clients []mode.ProtocolClient
TopologyMap atomic.Value // Value holds a map[string][string]
}
type publishedTopology struct {
Name string
IPs string
}
func (t *topology) randomClient() *Client {
switch len(t.clients) {
case 0:
return nil
case 1:
return t.clients[0].(*Client).Clone()
default:
return t.clients[rand.Intn(len(t.clients))].(*Client).Clone()
}
}
// Get the name of a shipper by its IP address from the local topology map
func (t *topology) GetNameByIP(ip string) string {
topologyMap, ok := t.TopologyMap.Load().(map[string]string)
if ok {
name, exists := topologyMap[ip]
if exists {
return name
}
}
return ""
}
// Each shipper publishes a list of IPs together with its name to Elasticsearch
func (t *topology) PublishIPs(name string, localAddrs []string) error {
client := t.randomClient()
if client == nil {
return ErrNotConnected
}
debugf("Publish IPs: %s", localAddrs)
params := map[string]string{
"refresh": "true",
}
_, _, err := client.Index(
".packetbeat-topology", //index
"server-ip", //type
name, // id
params, // parameters
publishedTopology{name, strings.Join(localAddrs, ",")}, // body
)
if err != nil {
logp.Err("Fail to publish IP addresses: %s", err)
return err
}
newMap, err := loadTopolgyMap(client)
if err != nil {
return err
}
t.TopologyMap.Store(newMap)
return nil
}
// Update the local topology map
func loadTopolgyMap(client *Client) (map[string]string, error) {
// get all shippers IPs from Elasticsearch
index := ".packetbeat-topology"
docType := "server-ip"
// get number of entries in index for search query to return all entries in one query
_, cntRes, err := client.CountSearchURI(index, docType, nil)
if err != nil {
logp.Err("Getting topology map fails with: %s", err)
return nil, err
}
params := map[string]string{"size": strconv.Itoa(cntRes.Count)}
_, res, err := client.SearchURI(index, docType, params)
if err != nil {
logp.Err("Getting topology map fails with: %s", err)
return nil, err
}
topology := make(map[string]string)
for _, obj := range res.Hits.Hits {
var result QueryResult
err = json.Unmarshal(obj, &result)
if err != nil {
logp.Err("Failed to read response: %v", err)
return nil, err
}
var pub publishedTopology
err = json.Unmarshal(result.Source, &pub)
if err != nil {
logp.Err("json.Unmarshal fails with: %s", err)
return nil, err
}
// add mapping
for _, addr := range strings.Split(pub.IPs, ",") {
topology[addr] = pub.Name
}
}
debugf("Topology map %s", topology)
return topology, nil
}