-
Notifications
You must be signed in to change notification settings - Fork 0
/
ping-monitor.go
301 lines (249 loc) · 8.57 KB
/
ping-monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
package main
// Based on github.com/t-lin/ping-exporter
import (
"context"
"errors"
"flag"
"fmt"
"log"
"net/http"
"os"
"regexp"
"strconv"
"strings"
"time"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/pnet"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/multiformats/go-multiaddr"
"github.com/Multi-Tier-Cloud/common/p2pnode"
"github.com/Multi-Tier-Cloud/common/peerlastseen"
"github.com/Multi-Tier-Cloud/common/util"
)
// defaultKeyFile is the default private-key path handed to util.AddKeyFlags
// in main.
const defaultKeyFile = "~/.privKeyPing"

// Command-line flags. They are registered at package initialisation and only
// hold the user's values after flag.Parse() has run in main.
var (
	// debug enables logging of every measured RTT.
	debug = flag.Bool("debug", false, "Debug mode")
	// hostname is the value of the "host" label on exported metrics. When
	// left empty, main replaces it with the result of os.Hostname().
	hostname = flag.String("hostname", "", "Name for labelling metrics (defaults to hostname)")
	// promEndpoint is the address the Prometheus scrape HTTP server binds to.
	promEndpoint = flag.String("prom-listen-addr", ":9100", "Listening address/endpoint for Prometheus to scrape")
	// p2pEndpoint is the TCP endpoint the P2P node listens on.
	p2pEndpoint = flag.String("p2p-listen-addr", ":4001", "Listening address/endpoint for the P2P node")
)
// init configures the standard logger before main runs.
func init() {
	// Prefix every log line with the date, a microsecond-resolution
	// timestamp, and the short "file:line" of the call site.
	const logFlags = log.Ldate | log.Lmicroseconds | log.Lshortfile
	log.SetFlags(logFlags)
}
// pingPeer pings a single peer, waits for the first result and, on success,
// marks the peer as seen in pls and stores the round-trip time (in
// milliseconds) in pingGaugeVec under the labels (peer ID, *hostname).
//
// ctx must already have a deadline set: the receive below only unblocks once
// a result arrives or the result channel is closed. A ping that fails, or
// that ends without producing any result, leaves both the gauge and the
// peer's last-seen record untouched.
func pingPeer(ctx context.Context, node *p2pnode.Node, peer peer.ID,
	pingGaugeVec *prometheus.GaugeVec,
	pls *peerlastseen.PeerLastSeen) {
	const nsPerMs = 1000000

	// Ping and take the first result only.
	responseChan := ping.Ping(ctx, node.Host, peer)
	result, ok := <-responseChan
	if !ok {
		// The channel was closed before any result was delivered. A plain
		// single-value receive would yield a zero-value result (nil error,
		// zero RTT) that is indistinguishable from a successful 0 ms ping.
		// NOTE(review): go-libp2p's ping.Ping is understood to close the
		// channel without sending when ctx ends first; confirm against the
		// library version in use.
		log.Println("ID:", peer, "Ping ended without a result:", ctx.Err())
		return
	}
	if result.Error != nil {
		log.Println("ID:", peer, "Failed to ping, error:", result.Error)
		return
	}

	pls.UpdateLastSeen(peer)
	if *debug {
		log.Println("ID:", peer, "RTT:", result.RTT)
	}

	// Get Gauge object with peer id as targetHost
	pingGauge := pingGaugeVec.WithLabelValues(fmt.Sprint(peer), *hostname)
	pingGauge.Set(float64(result.RTT) / nsPerMs) // Convert ns to ms
}
// collect repeatedly pings every peer the node's network currently reports
// (skipping the node itself) and records the measured RTTs in pingGaugeVec
// via pingPeer.
//
// Pinging happens in rounds. Each round creates a one-second context derived
// from node.Ctx, starts one pingPeer goroutine per peer with it, and waits
// for that context to end before starting the next round.
//
// callback is passed to peerlastseen.NewPeerLastSeen together with a
// 10 minute timeout (presumably it is invoked for peers not seen within that
// time; confirm against the peerlastseen package).
//
// collect returns only once node.Ctx has ended; a failure to create the
// PeerLastSeen tracker terminates the process.
func collect(node *p2pnode.Node,
	pingGaugeVec *prometheus.GaugeVec,
	callback peerlastseen.PeerLastSeenCB) {
	// Create PeerLastSeen with timeout of 10 minutes for peers (arbitrary at this point)
	// TODO: Make the timeout configurable
	pls, err := peerlastseen.NewPeerLastSeen(10*time.Minute, callback)
	if err != nil {
		log.Fatalln(err)
	}

	for {
		// Once the node's own context has ended, every per-round context
		// would be created already cancelled and this loop would spin
		// without pause, spawning goroutines. Stop instead.
		if err := node.Ctx.Err(); err != nil {
			log.Println("Node context ended, stopping RTT collection:", err)
			return
		}

		// Per-round context, used to trigger ping goroutines to stop
		ctx, cancel := context.WithTimeout(node.Ctx, time.Second)

		for _, id := range node.Host.Network().Peers() {
			if id == node.Host.ID() {
				continue
			}
			go pingPeer(ctx, node, id, pingGaugeVec, pls)
		}

		// The round always lasts until the context ends, regardless of how
		// quickly the pings complete.
		<-ctx.Done()
		cancel() // Release the timer resources held by the context
	}
}
// exportEWMAs copies, once per second, the latency EWMA held in the node's
// Peerstore for every peer the network reports (except the node itself) into
// ewmaGaugeVec, converted from nanoseconds to milliseconds. Gauges are
// labelled with the peer ID and *hostname. It never returns.
func exportEWMAs(node *p2pnode.Node, ewmaGaugeVec *prometheus.GaugeVec) {
	const nsPerMs = 1000000

	tick := time.NewTicker(time.Second)
	for range tick.C {
		for _, peerID := range node.Host.Network().Peers() {
			if peerID != node.Host.ID() {
				// Gauge labelled with the peer id as targetHost
				gauge := ewmaGaugeVec.WithLabelValues(fmt.Sprint(peerID), *hostname)
				latency := node.Host.Peerstore().LatencyEWMA(peerID)
				gauge.Set(float64(latency) / nsPerMs)
			}
		}
	}
}
// endpointDigitsRe matches a non-empty string consisting solely of ASCII
// decimal digits. It is compiled once rather than on every validation.
var endpointDigitsRe = regexp.MustCompile(`^[0-9]+$`)

// validateEndpoint reports whether ep is a valid IPv4 TCP or UDP endpoint
// address of the form "a.b.c.d:port" or ":port".
//
// The IP part may be empty (e.g. ":1234"); otherwise it must be exactly four
// dot-separated decimal octets, each in the range 0-255. The port must be a
// decimal number in the range 1-65535. Octets and the port may contain only
// digits: signs (which strconv.Atoi alone would accept), whitespace and any
// other characters are rejected.
func validateEndpoint(ep string) bool {
	epSplit := strings.Split(ep, ":")
	if len(epSplit) != 2 {
		return false
	}
	ip, portStr := epSplit[0], epSplit[1]

	// Validate IP
	// Allow endpoints with just the port number (e.g. ":1234")
	if ip != "" {
		ipSplit := strings.Split(ip, ".")
		if len(ipSplit) != 4 {
			return false
		}

		for _, octetStr := range ipSplit {
			if !endpointDigitsRe.MatchString(octetStr) {
				return false
			}
			// Digits only, so the value cannot be negative; Atoi can still
			// fail on overflow.
			octet, err := strconv.Atoi(octetStr)
			if err != nil || octet > 255 {
				return false
			}
		}
	}

	// Validate port number
	if !endpointDigitsRe.MatchString(portStr) {
		return false
	}
	port, err := strconv.Atoi(portStr)
	if err != nil {
		return false
	}

	return port >= 1 && port <= 65535
}
// tcpEndpoint2MultiaddrStr converts a TCP endpoint of the form "a.b.c.d:port"
// or ":port" into a multiaddr string of the form "/ip4/<ip>/tcp/<port>".
// An endpoint without an IP part is mapped to 0.0.0.0. The empty string is
// returned when ep does not pass validateEndpoint.
func tcpEndpoint2MultiaddrStr(ep string) string {
	if !validateEndpoint(ep) {
		return ""
	}

	// A validated endpoint contains exactly one ':' separating IP and port.
	sep := strings.Index(ep, ":")
	host, port := ep[:sep], ep[sep+1:]
	if host == "" {
		host = "0.0.0.0"
	}

	return "/ip4/" + host + "/tcp/" + port
}
// main wires the monitor together: it parses the command-line flags, starts
// the HTTP endpoint that Prometheus scrapes, creates the P2P node, and then
// pings the node's peers forever, exporting raw and EWMA round-trip times as
// Prometheus gauges. Any setup error terminates the process.
func main() {
	var err error
	var keyFlags util.KeyFlags
	var bootstraps *[]multiaddr.Multiaddr
	var psk *pnet.PSK

	// Register the shared key, bootstrap and PSK flags before parsing.
	if keyFlags, err = util.AddKeyFlags(defaultKeyFile); err != nil {
		log.Fatalln(err)
	}
	if bootstraps, err = util.AddBootstrapFlags(); err != nil {
		log.Fatalln(err)
	}
	if psk, err = util.AddPSKFlag(); err != nil {
		log.Fatalln(err)
	}
	flag.Parse()

	// Default name as hostname
	if *hostname == "" {
		*hostname, err = os.Hostname()
		if err != nil {
			log.Fatalln(err)
		}
	}

	// Set up Prometheus GaugeVec object for raw RTTs to other peers
	pingGaugeVec := promauto.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "ping_rtt",
			Help: "Historical ping RTTs over time (ms)",
		},
		[]string{
			"targetHost", // Specify ping target
			"host",       // Name of host running ping-exporter
		},
	)

	// Set up Prometheus GaugeVec object EWMA RTTs to other peers
	// The EWMA calculation is part of libp2p's PeerStore and the
	// smoothing factor is hard-coded to 0.1.
	ewmaGaugeVec := promauto.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "ping_ewma_rtt",
			Help: "Exponentially weighted moving average of ping RTTs over time (ms)",
		},
		[]string{
			"targetHost", // Specify ping target
			"host",       // Name of host running ping-exporter
		},
	)

	// Map Prometheus metrics scrape path to handler function
	pMetricsPath := "/metrics" // Make configurable?
	http.Handle(pMetricsPath, promhttp.Handler())

	// Start server in separate goroutine
	if !validateEndpoint(*promEndpoint) {
		log.Fatalf("ERROR: Invalid listening address provided (%s)\n", *promEndpoint)
	}
	go func() {
		// ListenAndServe only returns when the server cannot start or stops
		// serving. Without a scrape endpoint the monitor is useless, so
		// report the error and exit instead of silently carrying on.
		log.Fatalln(http.ListenAndServe(*promEndpoint, nil))
	}()

	// Create or load keys
	priv, err := util.CreateOrLoadKey(keyFlags)
	if err != nil {
		log.Fatalln(err)
	}

	// Setup node configuration
	config := p2pnode.NewConfig()
	config.PrivKey = priv
	config.PSK = *psk

	if !validateEndpoint(*p2pEndpoint) {
		log.Fatalf("ERROR: Invalid listening address provided (%s)\n", *p2pEndpoint)
	}
	config.ListenAddrs = append(config.ListenAddrs, tcpEndpoint2MultiaddrStr(*p2pEndpoint))

	if len(*bootstraps) != 0 {
		// This node should connect to a pre-existing bootstrap as part of
		// a larger network. It should also still be a bootstrap itself.
		config.BootstrapPeers = *bootstraps
	}

	// Create new node
	node, err := p2pnode.NewNode(context.Background(), config)
	if err != nil {
		log.Fatalln(err)
	}

	// Print multiaddress (for copying and pasting to other services)
	peerInfo := peer.AddrInfo{
		ID:    node.Host.ID(),
		Addrs: node.Host.Addrs(),
	}
	addrs, err := peer.AddrInfoToP2pAddrs(&peerInfo)
	if err != nil {
		// The addresses are printed for convenience only, so a failure here
		// is reported but does not stop the monitor.
		log.Println("Unable to build P2P addresses for this node:", err)
	} else {
		log.Println("P2P addresses for this node:")
		for _, addr := range addrs {
			log.Println("\t", addr)
		}
	}

	// Define callback for PeerLastSeen: drop both gauges of the given peer.
	expireMetrics := func(id peer.ID) {
		// Delete Gauge objects
		if ok := pingGaugeVec.DeleteLabelValues(fmt.Sprint(id), *hostname); !ok {
			log.Println("Failed to delete ping gauge for ID:", id)
		}
		if ok := ewmaGaugeVec.DeleteLabelValues(fmt.Sprint(id), *hostname); !ok {
			log.Println("Failed to delete EWMA gauge for ID:", id)
		}
	}

	// Start background goroutine to export the EWMA metric in PeerStore
	go exportEWMAs(&node, ewmaGaugeVec)

	// Start pinging (each ping RTT sample will automatically re-calculate
	// the EWMAs in the PeerStore). The call to collect() should not return.
	log.Println("Starting RTT collection from peers")
	collect(&node, pingGaugeVec, expireMetrics)

	log.Fatalln(errors.New("Monitor node exited monitor loop"))
}