-
-
Notifications
You must be signed in to change notification settings - Fork 65
/
statsd_writer.go
146 lines (115 loc) · 2.79 KB
/
statsd_writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package metrics
import (
"fmt"
"log/slog"
"strings"
"sync"
"github.com/smira/go-statsd"
)
type StatsdConfig struct {
Host string
Prefix string
TagFormat string
MaxPacketSize int
}
type StatsdLogger struct {
log *slog.Logger
}
func (lg *StatsdLogger) Printf(msg string, args ...interface{}) {
msg = strings.TrimPrefix(msg, "[STATSD] ")
// Statsd only prints errors and warnings
if strings.Contains(msg, "Error") {
lg.log.Error(fmt.Sprintf(msg, args...))
} else {
lg.log.Warn(fmt.Sprintf(msg, args...))
}
}
func NewStatsdConfig() StatsdConfig {
return StatsdConfig{Prefix: "anycable_go.", MaxPacketSize: 1400, TagFormat: "datadog"}
}
func (c StatsdConfig) Enabled() bool {
return c.Host != ""
}
type StatsdWriter struct {
client *statsd.Client
config StatsdConfig
tags map[string]string
log *slog.Logger
mu sync.Mutex
}
var _ IntervalWriter = (*StatsdWriter)(nil)
func NewStatsdWriter(c StatsdConfig, tags map[string]string, l *slog.Logger) *StatsdWriter {
return &StatsdWriter{config: c, tags: tags, log: l}
}
func (sw *StatsdWriter) Run(interval int) error {
sl := StatsdLogger{sw.log.With("service", "statsd")}
opts := []statsd.Option{
statsd.MaxPacketSize(sw.config.MaxPacketSize),
statsd.MetricPrefix(sw.config.Prefix),
statsd.Logger(&sl),
}
var tagsInfo string
if sw.tags != nil {
tagsStyle, err := resolveTagsStyle(sw.config.TagFormat)
if err != nil {
return err
}
tags := convertTags(sw.tags)
opts = append(opts,
statsd.TagStyle(tagsStyle),
statsd.DefaultTags(tags...),
)
tagsInfo = fmt.Sprintf(", tags=%v, style=%s", sw.tags, sw.config.TagFormat)
}
sw.client = statsd.NewClient(
sw.config.Host,
opts...,
)
sw.log.Info(
fmt.Sprintf(
"Send statsd metrics to %s with every %vs (prefix=%s%s)",
sw.config.Host, interval, sw.config.Prefix, tagsInfo,
),
)
return nil
}
func (sw *StatsdWriter) Stop() {
sw.mu.Lock()
defer sw.mu.Unlock()
sw.client.Close()
sw.client = nil
}
func (sw *StatsdWriter) Write(m *Metrics) error {
sw.mu.Lock()
defer sw.mu.Unlock()
if sw.client == nil {
return nil
}
m.EachCounter(func(counter *Counter) {
sw.client.Incr(counter.Name(), int64(counter.IntervalValue()))
})
m.EachGauge(func(gauge *Gauge) {
sw.client.Gauge(gauge.Name(), int64(gauge.Value()))
})
return nil
}
func resolveTagsStyle(name string) (*statsd.TagFormat, error) {
switch name {
case "datadog":
return statsd.TagFormatDatadog, nil
case "influxdb":
return statsd.TagFormatInfluxDB, nil
case "graphite":
return statsd.TagFormatGraphite, nil
}
return nil, fmt.Errorf("unknown StatsD tags format: %s", name)
}
func convertTags(tags map[string]string) []statsd.Tag {
buf := make([]statsd.Tag, len(tags))
i := 0
for k, v := range tags {
buf[i] = statsd.StringTag(k, v)
i++
}
return buf
}