/
statsd.go
595 lines (523 loc) · 16.1 KB
/
statsd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
// Copyright © 2017 Circonus, Inc. <support@circonus.com>
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//
package statsd
import (
"bufio"
"context"
"crypto/x509"
"io/ioutil"
"net"
"regexp"
"strconv"
"sync"
"time"
"github.com/circonus-labs/circonus-agent/internal/config"
"github.com/circonus-labs/circonus-agent/internal/config/defaults"
"github.com/circonus-labs/circonus-agent/internal/release"
"github.com/circonus-labs/circonus-agent/internal/tags"
cgm "github.com/circonus-labs/circonus-gometrics/v3"
"github.com/maier/go-appstats"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/spf13/viper"
"golang.org/x/sync/errgroup"
)
// Server defines a statsd server
type Server struct {
	disabled          bool                    // statsd support disabled entirely (no listeners, no metrics)
	enableUDPListener bool                    // NOTE: defaults to TRUE; uses !disabled (not really a separate option)
	enableTCPListener bool                    // NOTE: defaults to FALSE
	debugCGM          bool                    // verbose logging in circonus-gometrics instances
	group             *errgroup.Group         // listener/processor goroutines; first error cancels groupCtx
	groupCtx          context.Context         // canceled when the group (or parent ctx) is done
	udpAddress        *net.UDPAddr            // resolved UDP listen address (nil when UDP disabled)
	tcpAddress        *net.TCPAddr            // resolved TCP listen address (nil when TCP disabled)
	hostMetrics       *cgm.CirconusMetrics    // host-destined metrics, pulled via Flush()
	hostMetricsmu     sync.Mutex              // guards hostMetrics
	groupMetrics      *cgm.CirconusMetrics    // group-destined metrics, submitted directly to a broker
	groupMetricsmu    sync.Mutex              // guards groupMetrics
	logger            zerolog.Logger          // package-scoped logger
	hostPrefix        string                  // metric name prefix routing metrics to the host check
	hostCategory      string                  // category under which host metrics are emitted
	groupCID          string                  // group check bundle ID ("" disables group metrics)
	groupPrefix       string                  // metric name prefix routing metrics to the group check
	groupCounterOp    string                  // aggregation operator for group counters (average|sum)
	groupGaugeOp      string                  // aggregation operator for group gauges (average|sum)
	groupSetOp        string                  // aggregation operator for group sets (average|sum)
	metricRegex       *regexp.Regexp          // parses one statsd line: name:value|type[|@rate][|#tags]
	metricRegexGroupNames []string            // subexpression names of metricRegex, cached
	apiKey            string                  // Circonus API token key (group check management)
	apiApp            string                  // Circonus API token app
	apiURL            string                  // Circonus API URL
	apiCAFile         string                  // optional CA bundle file for the Circonus API
	udpListener       *net.UDPConn            // active UDP listener (nil until startUDP)
	tcpListener       *net.TCPListener        // active TCP listener (nil until startTCP)
	tcpMaxConnections uint                    // limit on concurrent tcp client connections
	tcpConnections    map[string]*net.TCPConn // tracked tcp clients keyed by remote address
	baseTags          []string                // tags applied to every metric
	sync.Mutex                                // guards tcpConnections (embedded; s.Lock/s.Unlock)
}
const (
	// maxPacketSize is the largest UDP datagram read in one call
	// (typical 1500-byte ethernet MTU minus IP/UDP headers).
	maxPacketSize = 1472
	// packetQueueSize bounds the channel buffering packets between
	// the listener goroutines and the processor.
	packetQueueSize = 1000
	// metric routing destinations (selected by name prefix)
	destHost   = "host"
	destGroup  = "group"
	destIgnore = "ignore"
)
// New returns a statsd server definition built from viper configuration.
// When statsd is disabled a stub server is returned whose Start/Flush are
// no-ops. Otherwise options are validated, metric destinations initialized,
// and listen addresses resolved (listeners are bound later, in Start).
func New(ctx context.Context) (*Server, error) {
	disabled := viper.GetBool(config.KeyStatsdDisabled)
	logger := log.With().Str("pkg", "statsd").Logger()

	// nothing else to configure when disabled (original built the struct
	// twice and re-checked disabled below; the early return makes those
	// guards unnecessary)
	if disabled {
		logger.Info().Msg("disabled, not configuring")
		return &Server{disabled: disabled, logger: logger}, nil
	}

	if err := validateStatsdOptions(); err != nil {
		return nil, err
	}

	g, gctx := errgroup.WithContext(ctx)

	s := Server{
		group:             g,
		groupCtx:          gctx,
		disabled:          disabled,
		logger:            logger,
		hostPrefix:        viper.GetString(config.KeyStatsdHostPrefix),
		hostCategory:      viper.GetString(config.KeyStatsdHostCategory),
		groupCID:          viper.GetString(config.KeyStatsdGroupCID),
		groupPrefix:       viper.GetString(config.KeyStatsdGroupPrefix),
		groupCounterOp:    viper.GetString(config.KeyStatsdGroupCounters),
		groupGaugeOp:      viper.GetString(config.KeyStatsdGroupGauges),
		groupSetOp:        viper.GetString(config.KeyStatsdGroupSets),
		debugCGM:          viper.GetBool(config.KeyDebugCGM),
		apiKey:            viper.GetString(config.KeyAPITokenKey),
		apiApp:            viper.GetString(config.KeyAPITokenApp),
		apiURL:            viper.GetString(config.KeyAPIURL),
		apiCAFile:         viper.GetString(config.KeyAPICAFile),
		baseTags:          tags.GetBaseTags(),
		tcpConnections:    map[string]*net.TCPConn{},
		tcpMaxConnections: viper.GetUint(config.KeyStatsdMaxTCPConns),
	}

	// UDP listener is implied by statsd being enabled; TCP is opt-in
	s.enableUDPListener = !s.disabled
	s.enableTCPListener = viper.GetBool(config.KeyStatsdEnableTCP)

	s.baseTags = append(s.baseTags, []string{
		"source:" + release.NAME,
		"collector:statsd",
	}...)

	// standard statsd metric format supported (with addition of tags):
	// name:value|type[|@rate][|#tag_list]
	// where tag_list is comma separated list of <tag_category:tag_value> pairs
	s.metricRegex = regexp.MustCompile(`^(?P<name>[^:\s]+):(?P<value>[^|\s]+)\|(?P<type>[a-z]+)(?:\|@(?P<sample>[0-9.]+))?(?:\|#(?P<tags>[^:,]+:[^:,]+(,[^:,]+:[^:,]+)*))?$`)
	s.metricRegexGroupNames = s.metricRegex.SubexpNames()

	// disabled case returned above, so metric destinations always initialize here
	if ierr := s.initHostMetrics(); ierr != nil {
		return nil, errors.Wrap(ierr, "initializing host metrics for StatsD")
	}
	if ierr := s.initGroupMetrics(); ierr != nil {
		return nil, errors.Wrap(ierr, "initializing group metrics for StatsD")
	}

	addr := viper.GetString(config.KeyStatsdAddr)
	if addr == "" {
		addr = defaults.StatsdAddr
	}
	port := viper.GetString(config.KeyStatsdPort)
	if port == "" {
		port = defaults.StatsdPort
	}
	address := net.JoinHostPort(addr, port)

	// UDP listening address
	if s.enableUDPListener {
		udpAddr, err := net.ResolveUDPAddr("udp", address)
		if err != nil {
			return nil, errors.Wrapf(err, "resolving UDP address '%s'", address)
		}
		s.udpAddress = udpAddr
	}

	// TCP listening address
	if s.enableTCPListener {
		tcpAddr, err := net.ResolveTCPAddr("tcp", address)
		if err != nil {
			return nil, errors.Wrapf(err, "resolving TCP address '%s'", address)
		}
		s.tcpAddress = tcpAddr
	}

	return &s, nil
}
// Start the StatsD service. Binds the enabled listeners, launches the
// reader and processor goroutines under the errgroup, and blocks until
// they all exit (first error cancels the group context). A no-op when
// statsd is disabled.
func (s *Server) Start() error {
	if s.disabled {
		s.logger.Info().Msg("disabled, not starting listener")
		return nil
	}
	// bind listeners first so bind errors surface before any goroutine starts
	if err := s.startUDP(); err != nil {
		return errors.Wrap(err, "starting UDP listener")
	}
	if err := s.startTCP(); err != nil {
		return errors.Wrap(err, "starting TCP listener")
	}
	// packets flow from the listener goroutines to the processor
	packetCh := make(chan []byte, packetQueueSize)
	if s.enableUDPListener && s.udpListener != nil {
		s.group.Go(func() error {
			s.logger.Debug().Msg("starting udp listener")
			return s.udpReader(packetCh)
		})
	}
	if s.enableTCPListener && s.tcpListener != nil {
		s.group.Go(func() error {
			s.logger.Debug().Msg("starting tcp listener")
			return s.tcpHandler(packetCh)
		})
	}
	s.group.Go(func() error {
		s.logger.Debug().Msg("starting packet processor")
		return s.processor(packetCh)
	})
	// cleanup goroutine: once every group member has exited (so no sender
	// remains), close the packet channel and do a final group flush
	go func() {
		s.logger.Debug().Msg("waiting for group")
		_ = s.group.Wait()
		close(packetCh)
		// only try to flush group metrics since they go
		// directly to a broker. there is no point in trying
		// to flush host metrics as the 'server' portion of
		// the agent may have already closed.
		if s.groupMetrics != nil {
			s.logger.Info().Msg("flushing group metrics")
			s.groupMetricsmu.Lock()
			s.groupMetrics.Flush()
			s.groupMetricsmu.Unlock()
		}
	}()
	// block until listeners and processor are done
	return s.group.Wait()
}
// Flush returns the accumulated *host* metrics and resets the collector.
// NOTE: group metrics flush independently to a different check via circonus-gometrics
func (s *Server) Flush() *cgm.Metrics {
	if s.disabled {
		return nil
	}

	s.hostMetricsmu.Lock()
	defer s.hostMetricsmu.Unlock()

	if s.hostMetrics != nil {
		return s.hostMetrics.FlushMetrics()
	}
	return &cgm.Metrics{}
}
// startUDP binds the StatsD UDP listener; a no-op when the UDP listener
// is disabled or no address was resolved.
func (s *Server) startUDP() error {
	if !s.enableUDPListener || s.udpAddress == nil {
		return nil
	}

	listener, err := net.ListenUDP("udp", s.udpAddress)
	if err != nil {
		return errors.Wrap(err, "starting statsd udp listener")
	}
	s.udpListener = listener
	return nil
}
// startTCP binds the StatsD TCP listener; a no-op when the TCP listener
// is disabled or no address was resolved.
func (s *Server) startTCP() error {
	if !s.enableTCPListener || s.tcpAddress == nil {
		return nil
	}

	listener, err := net.ListenTCP("tcp", s.tcpAddress)
	if err != nil {
		return errors.Wrap(err, "starting statsd tcp listener")
	}
	s.tcpListener = listener
	return nil
}
// logshim is used to satisfy apiclient Logger interface (avoiding ptr receiver issue)
type logshim struct {
	logh zerolog.Logger
}

// Printf forwards a formatted message to the wrapped zerolog logger.
// The format parameter was renamed from `fmt`, which shadowed the fmt
// package inside the method.
func (l logshim) Printf(format string, v ...interface{}) {
	l.logh.Printf(format, v...)
}
// initHostMetrics creates the circonus-gometrics instance backing the host
// check. The instance runs in manual mode — no automatic flushing and no
// check management — metrics are drained by the agent via Flush().
func (s *Server) initHostMetrics() error {
	s.hostMetricsmu.Lock()
	defer s.hostMetricsmu.Unlock()

	cfg := &cgm.Config{
		Debug: s.debugCGM,
		Log:   logshim{logh: s.logger.With().Str("pkg", "cgm.statsd-host-check").Logger()},
	}
	// put cgm into manual mode (no interval, no api key, invalid submission url)
	cfg.Interval = "0"                            // disable automatic flush
	cfg.CheckManager.Check.SubmissionURL = "none" // disable check management (create/update)

	metrics, err := cgm.NewCirconusMetrics(cfg)
	if err != nil {
		return errors.Wrap(err, "statsd host check")
	}
	s.hostMetrics = metrics

	s.logger.Info().Msg("host check initialized")
	return nil
}
// initGroupMetrics creates the circonus-gometrics instance for group metrics.
// NOTE: Group metrics are sent directly to circonus, to an existing HTTPTRAP
// check created manually or by cosi - the group check is intended to be
// used by multiple systems. A no-op when no group check ID is configured.
func (s *Server) initGroupMetrics() error {
	if s.groupCID == "" {
		s.logger.Info().Msg("group check disabled")
		return nil
	}

	s.groupMetricsmu.Lock()
	defer s.groupMetricsmu.Unlock()

	cfg := &cgm.Config{
		Debug: s.debugCGM,
		Log:   logshim{logh: s.logger.With().Str("pkg", "cgm.statsd-group-check").Logger()},
	}
	cfg.CheckManager.API.TokenKey = s.apiKey
	cfg.CheckManager.API.TokenApp = s.apiApp
	cfg.CheckManager.API.URL = s.apiURL
	cfg.CheckManager.Check.ID = s.groupCID

	// optional custom CA bundle for the Circonus API
	if s.apiCAFile != "" {
		pem, err := ioutil.ReadFile(s.apiCAFile)
		if err != nil {
			return err
		}
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(pem) {
			return errors.Errorf("using api CA cert %#v", pem)
		}
		cfg.CheckManager.API.CACert = pool
	}

	metrics, err := cgm.NewCirconusMetrics(cfg)
	if err != nil {
		return errors.Wrap(err, "statsd group check")
	}
	s.groupMetrics = metrics

	s.logger.Info().Msg("group check initialized")
	return nil
}
// udpReader reads datagrams from the statsd udp listener and queues a copy
// of each packet received. Runs until the context is done (read errors are
// logged and the loop continues, so closing the listener during shutdown
// does not abort the group with an error).
func (s *Server) udpReader(packetCh chan<- []byte) error {
	// reuse one read buffer across iterations; each packet is copied out
	// before queueing, so a fresh maxPacketSize allocation per datagram
	// (as the original did) is unnecessary
	buff := make([]byte, maxPacketSize)
	for {
		if s.done() {
			return nil
		}
		n, err := s.udpListener.Read(buff)
		if err != nil {
			s.logger.Warn().Err(err).Msg("udp reader")
			continue
		}
		if n > 0 {
			_ = appstats.IncrementInt("statsd_packets_total")
			// copy so the queued packet is independent of the shared buffer
			pkt := make([]byte, n)
			copy(pkt, buff[:n])
			packetCh <- pkt
		}
	}
}
// tcpHandler accepts statsd tcp client connections, enforcing the configured
// connection limit, and spawns a reader goroutine per connection which adds
// packets received to the queue. Runs until the context is done.
func (s *Server) tcpHandler(packetCh chan<- []byte) error {
	for {
		if s.done() {
			return nil
		}
		conn, err := s.tcpListener.AcceptTCP()
		if err != nil {
			s.logger.Warn().Err(err).Msg("accepting tcp connection")
			continue
		}
		// check the limit and register the connection under one lock so
		// concurrent accepts cannot race past the limit (the original
		// added to the map in a second lock window). Using >= rather
		// than > ensures the tracked connections never exceed the max
		// (the original allowed max+1).
		s.Lock()
		if uint(len(s.tcpConnections)) >= s.tcpMaxConnections {
			s.tcpRefuseConnection(conn)
			s.Unlock()
			continue
		}
		s.tcpConnections[conn.RemoteAddr().String()] = conn
		s.Unlock()
		go func(conn *net.TCPConn) {
			// idle connections are reaped via a read deadline; tcpReader
			// extends it on timeout while the server is still running
			if err := conn.SetDeadline(time.Now().Add(10 * time.Second)); err != nil {
				s.logger.Warn().Err(err).Msg("setting statsd tcp connection deadline")
			}
			if err := s.tcpReader(conn, packetCh); err != nil {
				s.logger.Warn().Err(err).Msg("handling tcp connection")
			}
		}(conn)
	}
}
// tcpReader scans newline-delimited metrics from a tcp client connection and
// adds each line received to the queue. Returns on context cancellation, a
// non-timeout connection error, or a failure to extend the read deadline;
// the connection is always closed and removed from tracking on exit.
func (s *Server) tcpReader(conn *net.TCPConn, packetCh chan<- []byte) error {
	addr := conn.RemoteAddr().String()
	defer func() {
		s.logger.Debug().Str("remote", addr).Msg("closing statsd tcp connection")
		conn.Close()
		s.tcpRemoveConnection(addr)
	}()
	for {
		if s.done() {
			return nil
		}
		scanner := bufio.NewScanner(conn)
		for scanner.Scan() {
			_ = appstats.IncrementInt("statsd_packets_total")
			// BUG FIX: scanner.Bytes() returns a slice aliased to the
			// scanner's internal buffer, which the next Scan() overwrites;
			// the original queued that slice directly, corrupting packets
			// read by the processor goroutine. Copy before queueing
			// (matching udpReader's behavior).
			line := scanner.Bytes()
			pkt := make([]byte, len(line))
			copy(pkt, line)
			packetCh <- pkt
		}
		if s.done() {
			return nil
		}
		if err := scanner.Err(); err != nil {
			s.logger.Debug().Err(err).Str("remote", addr).Msg("statsd tcp conn scanner error")
			// a deadline timeout on a live connection is not fatal;
			// extend the deadline and keep reading
			if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
				s.logger.Debug().Err(nerr).Str("remote", addr).Msg("resetting deadline")
				if derr := conn.SetDeadline(time.Now().Add(10 * time.Second)); derr != nil {
					return derr
				}
				continue
			}
			return err
		}
	}
}
// tcpRefuseConnection closes a tcp client connection that exceeds the
// connection limit and logs the refusal.
func (s *Server) tcpRefuseConnection(conn *net.TCPConn) {
	remote := conn.RemoteAddr().String()
	conn.Close()
	s.logger.Warn().Str("remote", remote).Msg("max tcp client connections reached, refusing new connection attempt")
}
// tcpAddConnection records a tcp client connection in the tracking map,
// keyed by its remote address.
func (s *Server) tcpAddConnection(conn *net.TCPConn) {
	s.Lock()
	defer s.Unlock()
	s.tcpConnections[conn.RemoteAddr().String()] = conn
}
// tcpRemoveConnection drops a tcp client connection (by remote address)
// from the tracking map.
func (s *Server) tcpRemoveConnection(id string) {
	s.Lock()
	defer s.Unlock()
	delete(s.tcpConnections, id)
}
// processor reads the packet queue and processes each packet. On context
// cancellation it also tears down the listeners and any tracked tcp client
// connections, which unblocks the reader goroutines so the errgroup can
// drain.
func (s *Server) processor(packetCh <-chan []byte) error {
	for {
		select {
		case <-s.groupCtx.Done():
			if s.udpListener != nil {
				s.logger.Debug().Msg("closing udp listener")
				s.udpListener.Close()
			}
			if s.tcpListener != nil {
				s.Lock()
				s.logger.Debug().Msg("closing tcp listener")
				s.tcpListener.Close()
				if len(s.tcpConnections) > 0 {
					s.logger.Debug().Msg("closing tcp connections")
					// snapshot the connections before closing: each Close
					// unblocks that connection's tcpReader, whose deferred
					// tcpRemoveConnection mutates the map under s.Lock
					// (it blocks until we Unlock below)
					var connList []*net.TCPConn
					for _, conn := range s.tcpConnections {
						connList = append(connList, conn)
					}
					for _, conn := range connList {
						conn.Close()
					}
				}
				s.Unlock()
			}
			return nil
		case pkt := <-packetCh:
			s.processPacket(pkt)
		}
	}
}
// done reports whether the group context has been canceled, without blocking.
func (s *Server) done() bool {
	select {
	case <-s.groupCtx.Done():
		return true
	default:
	}
	return false
}
func validateStatsdOptions() error {
if viper.GetBool(config.KeyStatsdDisabled) {
return nil
}
port := viper.GetString(config.KeyStatsdPort)
if port == "" {
return errors.New("invalid StatsD port (empty)")
}
if ok, err := regexp.MatchString("^[0-9]+$", port); err != nil {
return errors.Wrapf(err, "invalid StatsD port (%s)", port)
} else if !ok {
return errors.Errorf("invalid StatsD port (%s)", port)
}
if pnum, err := strconv.ParseUint(port, 10, 32); err != nil {
return errors.Wrap(err, "invalid StatsD port")
} else if pnum < 1024 || pnum > 65535 {
return errors.Errorf("invalid StatsD port 1024>%s<65535", port)
}
// can be empty (all metrics go to host)
// validate further if group check is enabled (see groupPrefix validation below)
hostPrefix := viper.GetString(config.KeyStatsdHostPrefix)
hostCat := viper.GetString(config.KeyStatsdHostCategory)
if hostCat == "" {
return errors.New("invalid StatsD host category (empty)")
}
groupCID := viper.GetString(config.KeyStatsdGroupCID)
if groupCID == "" {
return nil // statsd group check support disabled, all metrics go to host
}
if groupCID == "cosi" {
cid, err := config.LoadCosiCheckID("group")
if err != nil {
return err
}
groupCID = cid
viper.Set(config.KeyStatsdGroupCID, groupCID)
}
ok, err := config.IsValidCheckID(groupCID)
if err != nil {
return errors.Wrap(err, "validating StatsD Group Check ID")
}
if !ok {
return errors.Errorf("invalid StatsD Group Check ID (%s)", groupCID)
}
groupPrefix := viper.GetString(config.KeyStatsdGroupPrefix)
if hostPrefix == "" && groupPrefix == "" {
return errors.New("invalid StatsD host/group prefix (both empty)")
}
if hostPrefix == groupPrefix {
return errors.New("invalid StatsD host/group prefix (same)")
}
counterOp := viper.GetString(config.KeyStatsdGroupCounters)
if counterOp == "" {
return errors.New("invalid StatsD counter operator (empty)")
}
if ok, err := regexp.MatchString("^(average|sum)$", counterOp); err != nil {
return errors.Wrapf(err, "invalid StatsD counter operator (%s)", counterOp)
} else if !ok {
return errors.Errorf("invalid StatsD counter operator (%s)", counterOp)
}
gaugeOp := viper.GetString(config.KeyStatsdGroupGauges)
if gaugeOp == "" {
return errors.New("invalid StatsD gauge operator (empty)")
}
if ok, err := regexp.MatchString("^(average|sum)$", gaugeOp); err != nil {
return errors.Wrapf(err, "invalid StatsD gauge operator (%s)", gaugeOp)
} else if !ok {
return errors.Errorf("invalid StatsD gauge operator (%s)", gaugeOp)
}
setOp := viper.GetString(config.KeyStatsdGroupSets)
if setOp == "" {
return errors.New("invalid StatsD set operator (empty)")
}
if ok, err := regexp.MatchString("^(average|sum)$", setOp); err != nil {
return errors.Wrapf(err, "invalid StatsD set operator (%s)", setOp)
} else if !ok {
return errors.Errorf("invalid StatsD set operator (%s)", setOp)
}
return nil
}