package stats

import (
"context"
"fmt"
"net"
"net/http"
"strconv"
"strings"
"sync"
"time"
"github.com/Comcast/Ravel/pkg/types"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
)

// Statistics collection for BGP load balancers. This would work for any load balancer VIP, really.
// Once a Stats is started, it doesn't stop.
// 1. we need a way to receive the set of expected (vip, port) tuples
// 2. we need a way to periodically emit statistics about these tuples
type Stats struct {
sync.Mutex
// map of IP address to port to counters.
counters map[gopacket.Endpoint]map[gopacket.Endpoint]*counters
target string // statsd service address
freq float64
interval *time.Ticker // how often to send statistics
device string // eth device to read packets from. (probably lo)
kind LBKind // bgp, ipvs
configChan chan *types.ClusterConfig
pcap *pcap.Handle
prometheusPort string
flowMetrics *flowMetrics
flowMetricsEnabled bool
ctx context.Context
logger log.FieldLogger
}
// Public Interface
// ================================================================================
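// EnableBPFStats opens a pcap handle on the configured device, applies an
// initial "tcp or udp" BPF filter, and starts the metrics and capture
// goroutines. Flow metrics remain disabled until this is called.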
func (s *Stats) EnableBPFStats() error {
// The 1600-byte snapshot length will have to change if we move to jumbo frames or something similar.
if handle, err := pcap.OpenLive(s.device, 1600, false, pcap.BlockForever); err != nil {
return fmt.Errorf("unable to instantiate pcap on device %s: %v", s.device, err)
} else if err := handle.SetBPFFilter("tcp or udp"); err != nil {
return fmt.Errorf("unable to set pcap filters. %v", err)
} else {
s.pcap = handle
}
// initialize the flow metrics synchronously so the capture and ticker paths
// never see a nil flowMetrics.
if err := s.initMetrics(); err != nil {
return err
}
go s.capture()
s.flowMetricsEnabled = true
return nil
}
// Private Interface
// ================================================================================
// newLabel builds a valid Prometheus label from the namespace, service, and port name.
// The "-" character throws an invalid-metric-name panic, so it is replaced with "_".
func newLabel(namespace, service, name string) string {
join := strings.Join([]string{namespace, service, name}, "_")
return strings.Replace(join, "-", "_", -1)
}
// captureFlowStatistics aggregates the data from the flow counters and transfers it into
// the Prometheus metric values for delivery via the Prometheus endpoint.
func (s *Stats) captureFlowStatistics() {
if !s.flowMetricsEnabled {
return
}
// get and clear all of the counters.
for ip, p := range s.counters {
for port, stats := range p {
var protocol string
ipStr := ip.String()
portStr := port.String()
if stats.IsTCP {
tx := stats.GetTCPTx()
rx := stats.GetTCPRx()
sa := stats.GetTCPSynAck()
fin := stats.GetTCPFin()
rst := stats.GetTCPRst()
protocol = "TCP"
s.flowMetrics.tx(ipStr, portStr, protocol, stats.Namespace, stats.PortName, stats.Service, tx)
s.flowMetrics.rx(ipStr, portStr, protocol, stats.Namespace, stats.PortName, stats.Service, rx)
s.flowMetrics.tcpState(ipStr, portStr, stateSynAck, protocol, stats.Namespace, stats.PortName, stats.Service, sa)
s.flowMetrics.tcpState(ipStr, portStr, stateFin, protocol, stats.Namespace, stats.PortName, stats.Service, fin)
s.flowMetrics.tcpState(ipStr, portStr, stateRst, protocol, stats.Namespace, stats.PortName, stats.Service, rst)
// log what was scraped, for debugging
s.logger.Debugf("prometheus tcp scrape: ns=%s svc=%s port=%s addr=%v:%v prot=tcp tx=%d rx=%d synack=%d fin=%d rst=%d",
stats.Namespace, stats.Service, stats.PortName, ip, port, tx, rx, sa, fin, rst)
} else {
tx := stats.GetUDPTx()
rx := stats.GetUDPRx()
protocol = "UDP"
s.flowMetrics.tx(ipStr, portStr, protocol, stats.Namespace, stats.PortName, stats.Service, tx)
s.flowMetrics.rx(ipStr, portStr, protocol, stats.Namespace, stats.PortName, stats.Service, rx)
}
}
}
}
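// NewStats constructs a Stats collector, starts its run loop, and brings up the
// Prometheus metrics server. A minimal usage sketch, assuming hypothetical values
// for the device, statsd host, port, and interval:
//
//	s, err := NewStats(ctx, kind, "lo", "statsd.local:8125", "9090", 30*time.Second, logger)
//	if err != nil {
//	    return err
//	}
//	if err := s.EnableBPFStats(); err != nil {
//	    return err
//	}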
func NewStats(ctx context.Context, kind LBKind, device, statsHost, prometheusPort string, freq time.Duration, logger logrus.FieldLogger) (*Stats, error) {
s := &Stats{
kind: kind,
target: statsHost,
device: device,
configChan: make(chan *types.ClusterConfig),
freq: freq.Seconds(),
interval: time.NewTicker(freq),
counters: map[gopacket.Endpoint]map[gopacket.Endpoint]*counters{},
prometheusPort: prometheusPort,
ctx: ctx,
logger: logger,
}
go s.run()
if err := s.startServer(); err != nil {
return nil, err
}
return s, nil
}
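// UpdateConfig hands a new ClusterConfig to the run loop. The send is
// non-blocking: if the previous update has not yet been consumed, this one is
// dropped and an error is returned.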
func (s *Stats) UpdateConfig(c *types.ClusterConfig) error {
// log.Debugln("Stats saw an UpdateConfig call")
select {
case s.configChan <- c:
default:
return fmt.Errorf("stats reconfiguration channel is full")
}
return nil
}
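// run is the main event loop: it emits flow statistics on every ticker fire,
// applies inbound configuration updates, and exits when the context is cancelled.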
func (s *Stats) run() {
log.Debugln("Stats counter starting up")
defer s.interval.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-s.interval.C:
s.captureFlowStatistics()
case newConfig := <-s.configChan:
// log.Debugln("new configuration inbound")
s.loadConfiguration(newConfig)
}
}
}
// loadConfiguration takes a ClusterConfig and populates a set of
// VIP, Port tuples for use in the internal pcap capture mechanism
func (s *Stats) loadConfiguration(c *types.ClusterConfig) error {
// s.logger.Debugf("loading new configuration")
s.Lock()
defer s.Unlock()
// traverse the config and generate the counters map.
// IP addresses are collected into ipset, and the Berkeley Packet Filter is
// set so that capture is restricted to traffic on the designated VIPs.
ipset := []string{}
for ipRaw, portMap := range c.Config {
// log.Debugln("Loading configuration for VIP with IP:", ipRaw)
ip := layers.NewIPEndpoint(net.ParseIP(string(ipRaw)))
var ip6 gopacket.Endpoint
var has6 bool
if ip6Raw, ok := c.IPV6[ipRaw]; ok {
// log.Debugln("VIP with IP", ipRaw, "has IPV6 enabled")
has6 = true
ip6 = layers.NewIPEndpoint(net.ParseIP(string(ip6Raw)))
ipset = append(ipset, string(ip6Raw))
}
ipset = append(ipset, string(ipRaw))
for portRaw, cfg := range portMap {
// log.Debugln("VIP with IP", ipRaw, "has port", portRaw)
p, err := strconv.Atoi(portRaw)
if err != nil {
s.logger.Errorf("unable to parse port %q in cluster config: %v", portRaw, err)
continue
}
tport := layers.NewTCPPortEndpoint(layers.TCPPort(p))
uport := layers.NewUDPPortEndpoint(layers.UDPPort(p))
if _, ok := s.counters[ip]; !ok {
s.counters[ip] = map[gopacket.Endpoint]*counters{}
}
if _, ok := s.counters[ip6]; !ok && has6 {
s.counters[ip6] = map[gopacket.Endpoint]*counters{}
}
if _, ok := s.counters[ip][tport]; !ok {
s.counters[ip][tport] = NewCounters(cfg.Namespace, cfg.Service, cfg.PortName, true)
if has6 {
s.counters[ip6][tport] = NewCounters(cfg.Namespace, cfg.Service, cfg.PortName, true)
}
}
if _, ok := s.counters[ip][uport]; !ok {
s.counters[ip][uport] = NewCounters(cfg.Namespace, cfg.Service, cfg.PortName, false)
if has6 {
s.counters[ip6][uport] = NewCounters(cfg.Namespace, cfg.Service, cfg.PortName, false)
}
}
}
}
// set the BPF filter
// log.Debugln("ip set: %v", ipset)
return s.setBPFFilter(ipset)
}
// setBPFFilter takes a list of IP addresses and sets the Berkeley Packet Filter
// on our pcap handle to restrict capture to just those addresses. This prevents the pcap from
// needing to process 100% of the TCP and UDP traffic on a node.
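// For hypothetical VIPs 10.0.0.1 and 10.0.0.2, the generated expression is
// "(tcp or udp) and (10.0.0.1 or 10.0.0.2)".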
func (s *Stats) setBPFFilter(ips []string) error {
if !s.flowMetricsEnabled {
return nil
}
if len(ips) == 0 {
// an empty list would generate an invalid filter expression
return nil
}
filters := strings.Join(ips, " or ")
return s.pcap.SetBPFFilter(fmt.Sprintf("(tcp or udp) and (%s)", filters))
}
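// capture reads packets off the pcap handle in a tight loop, decoding only the
// layers we care about and incrementing the counters of any tracked VIP that a
// TCP packet matches.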
func (s *Stats) capture() {
// Fast parsing approach - reuse the same layers every time.
var eth layers.Ethernet
var ip4 layers.IPv4
var ip6 layers.IPv6
var tcp layers.TCP
var udp layers.UDP
parser := gopacket.NewDecodingLayerParser(layers.LayerTypeEthernet, &eth, &ip4, &ip6, &tcp, &udp)
decoded := []gopacket.LayerType{}
for {
data, ci, err := s.pcap.ReadPacketData()
// ReadPacketData() hands back a data []byte backed by the buffer that the
// C pcap library uses, memory the Go runtime won't know about. Since data
// doesn't escape this for-loop, much less func capture(), it's allocated on
// the stack and isn't eligible for garbage collection. I think. That means
// the memory that data []byte really points at isn't garbage collected either.
// Since capture() is run by only one goroutine, there shouldn't be an issue
// with race conditions over the C pcap library's buffer.
if err != nil {
// shouldn't happen, but if it does we'll exit via another path.
continue
}
parser.DecodeLayers(data, &decoded)
if len(decoded) != 3 {
// icmp messages or weird ipv6 things
continue
} else if layers.LayerTypeTCP == decoded[2] {
if layers.LayerTypeIPv6 == decoded[1] {
if stats, ok := s.getCountersAndIncrement(ci.CaptureLength, ip6.SrcIP, ip6.DstIP, tcp.SrcPort, tcp.DstPort); ok {
s.metricTCP(stats, tcp)
}
} else if layers.LayerTypeIPv4 == decoded[1] {
if stats, ok := s.getCountersAndIncrement(ci.CaptureLength, ip4.SrcIP, ip4.DstIP, tcp.SrcPort, tcp.DstPort); ok {
s.metricTCP(stats, tcp)
}
}
}
}
}
// initMetrics initializes the Prometheus flowMetrics stats handlers.
func (s *Stats) initMetrics() error {
// initialize all the stats
s.flowMetrics = newFlowMetrics(s.kind)
return nil
}
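// startServer registers the promhttp handler and starts the metrics HTTP
// server. Once it is up, the metrics are scrapable at /metrics; for example
// (hypothetical port):
//
//	curl http://localhost:9090/metrics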
func (s *Stats) startServer() error {
s.logger.Infof("starting metrics server on: %v", s.prometheusPort)
// we start the server asynchronously, but wait a short time below so that
// startup errors (such as an invalid stats port) are caught quickly.
// the channel is buffered so the goroutine can't block forever once we stop waiting.
errs := make(chan error, 1)
http.Handle("/metrics", promhttp.Handler())
go func() {
err := http.ListenAndServe(fmt.Sprintf(":%s", s.prometheusPort), nil)
if err != nil {
s.logger.Errorf("prometheus stats server could not be initialized on port %s: %s", s.prometheusPort, err.Error())
}
errs <- err
}()
select {
case err := <-errs:
return fmt.Errorf("prometheus stats server could not be initialized on port %s: %s", s.prometheusPort, err.Error())
case <-time.After(3 * time.Second):
// no error within 3 seconds; assume the server came up cleanly
}
return nil
}
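// getCountersAndIncrement looks up the counters entry for a captured packet and
// adds its byte count to the rx side (when the destination is a tracked VIP) or
// the tx side (when the source is). It returns the matched counters so the
// caller can also record TCP state transitions.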
func (s *Stats) getCountersAndIncrement(i int, srcIP, dstIP net.IP, sp, dp interface{}) (*counters, bool) {
n := uint64(i)
isTCP := true
src := layers.NewIPEndpoint(srcIP)
dst := layers.NewIPEndpoint(dstIP)
var srcPort, dstPort gopacket.Endpoint
switch sp := sp.(type) {
case layers.TCPPort:
srcPort = layers.NewTCPPortEndpoint(sp)
dstPort = layers.NewTCPPortEndpoint(dp.(layers.TCPPort))
case layers.UDPPort:
srcPort = layers.NewUDPPortEndpoint(sp)
dstPort = layers.NewUDPPortEndpoint(dp.(layers.UDPPort))
isTCP = false
default:
s.logger.Debugf("fallthrough on source port type detection")
return nil, false
}
var outStats *counters
var found bool
s.Lock()
defer s.Unlock()
if pm, ok := s.counters[dst]; ok {
// this is receive traffic
if stats, ok := pm[dstPort]; ok {
found = true
outStats = stats
if isTCP {
stats.AddTCPRx(n)
} else {
stats.AddUDPRx(n)
}
}
} else if pm, ok = s.counters[src]; ok {
// this is transmit traffic
if stats, ok := pm[srcPort]; ok {
found = true
outStats = stats
if isTCP {
stats.AddTCPTx(n)
} else {
stats.AddUDPTx(n)
}
}
}
return outStats, found
}
func (s *Stats) metricTCP(stats *counters, tcp layers.TCP) {
// count handshake, fin, reset, and congestion window messages.
// this is an if/else-if block because each of these messages is
// believed to be mutually exclusive, i.e. the TCP implementation
// would not send a RST packet at the same time as a FIN packet.
if tcp.SYN && tcp.ACK {
stats.IncrTCPSynAck()
} else if tcp.FIN {
stats.IncrTCPFin()
} else if tcp.RST {
stats.IncrTCPRst()
}
}
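// clean rewrites an IP address into a form that is safe to use in a metric
// label; for example, clean("fd00::1") returns "fd00__1".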
func clean(ip string) string {
return strings.Replace(ip, ":", "_", -1)
}
// increment a counter for a TCP/UDP state event with the following labels:
/*
stateEvent: the state event type (syn, ack, fin, tx, rx)
protocol: whether we are TCP or UDP
namespace, port name, and service name of the service being load balanced
value: the float value we are incrementing by
*/