-
Notifications
You must be signed in to change notification settings - Fork 11.7k
/
sender.go
200 lines (165 loc) · 5.37 KB
/
sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
package sender
import (
"context"
"net/url"
"strings"
"sync"
"time"
"github.com/grafana/grafana/pkg/infra/log"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/prometheus/alertmanager/api/v2/models"
"github.com/prometheus/client_golang/prometheus"
common_config "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/pkg/labels"
)
const (
defaultMaxQueueCapacity = 10000
defaultTimeout = 10 * time.Second
)
// Sender is responsible for dispatching alert notifications to an external Alertmanager service.
type Sender struct {
logger log.Logger
wg sync.WaitGroup
manager *notifier.Manager
sdCancel context.CancelFunc
sdManager *discovery.Manager
}
func New(_ *metrics.Scheduler) (*Sender, error) {
l := log.New("sender")
sdCtx, sdCancel := context.WithCancel(context.Background())
s := &Sender{
logger: l,
sdCancel: sdCancel,
}
s.manager = notifier.NewManager(
// Injecting a new registry here means these metrics are not exported.
// Once we fix the individual Alertmanager metrics we should fix this scenario too.
¬ifier.Options{QueueCapacity: defaultMaxQueueCapacity, Registerer: prometheus.NewRegistry()},
s.logger,
)
s.sdManager = discovery.NewManager(sdCtx, s.logger)
return s, nil
}
// ApplyConfig syncs a configuration with the sender.
func (s *Sender) ApplyConfig(cfg *ngmodels.AdminConfiguration) error {
notifierCfg, err := buildNotifierConfig(cfg)
if err != nil {
return err
}
if err := s.manager.ApplyConfig(notifierCfg); err != nil {
return err
}
sdCfgs := make(map[string]discovery.Configs)
for k, v := range notifierCfg.AlertingConfig.AlertmanagerConfigs.ToMap() {
sdCfgs[k] = v.ServiceDiscoveryConfigs
}
return s.sdManager.ApplyConfig(sdCfgs)
}
func (s *Sender) Run() {
s.wg.Add(2)
go func() {
if err := s.sdManager.Run(); err != nil {
s.logger.Error("failed to start the sender service discovery manager", "err", err)
}
s.wg.Done()
}()
go func() {
s.manager.Run(s.sdManager.SyncCh())
s.wg.Done()
}()
}
// SendAlerts sends a set of alerts to the configured Alertmanager(s).
func (s *Sender) SendAlerts(alerts apimodels.PostableAlerts) {
if len(alerts.PostableAlerts) == 0 {
s.logger.Debug("no alerts to send to external Alertmanager(s)")
return
}
as := make([]*notifier.Alert, 0, len(alerts.PostableAlerts))
for _, a := range alerts.PostableAlerts {
na := alertToNotifierAlert(a)
as = append(as, na)
}
s.logger.Debug("sending alerts to the external Alertmanager(s)", "am_count", len(s.manager.Alertmanagers()), "alert_count", len(as))
s.manager.Send(as...)
}
// Stop shuts down the sender.
func (s *Sender) Stop() {
s.sdCancel()
s.manager.Stop()
s.wg.Wait()
}
// Alertmanagers returns a list of the discovered Alertmanager(s).
func (s *Sender) Alertmanagers() []*url.URL {
return s.manager.Alertmanagers()
}
// DroppedAlertmanagers returns a list of Alertmanager(s) we no longer send alerts to.
func (s *Sender) DroppedAlertmanagers() []*url.URL {
return s.manager.DroppedAlertmanagers()
}
func buildNotifierConfig(cfg *ngmodels.AdminConfiguration) (*config.Config, error) {
amConfigs := make([]*config.AlertmanagerConfig, 0, len(cfg.Alertmanagers))
for _, amURL := range cfg.Alertmanagers {
u, err := url.Parse(amURL)
if err != nil {
return nil, err
}
sdConfig := discovery.Configs{
discovery.StaticConfig{
{
Targets: []model.LabelSet{{model.AddressLabel: model.LabelValue(u.Host)}},
},
},
}
amConfig := &config.AlertmanagerConfig{
APIVersion: config.AlertmanagerAPIVersionV2,
Scheme: u.Scheme,
PathPrefix: u.Path,
Timeout: model.Duration(defaultTimeout),
ServiceDiscoveryConfigs: sdConfig,
}
// Check the URL for basic authentication information first
if u.User != nil {
amConfig.HTTPClientConfig.BasicAuth = &common_config.BasicAuth{
Username: u.User.Username(),
}
if password, isSet := u.User.Password(); isSet {
amConfig.HTTPClientConfig.BasicAuth.Password = common_config.Secret(password)
}
}
amConfigs = append(amConfigs, amConfig)
}
notifierConfig := &config.Config{
AlertingConfig: config.AlertingConfig{
AlertmanagerConfigs: amConfigs,
},
}
return notifierConfig, nil
}
func alertToNotifierAlert(alert models.PostableAlert) *notifier.Alert {
ls := make(labels.Labels, 0, len(alert.Alert.Labels))
a := make(labels.Labels, 0, len(alert.Annotations))
// Prometheus does not allow spaces in labels or annotations while Grafana does, we need to make sure we
// remove them before sending the alerts.
for k, v := range alert.Alert.Labels {
ls = append(ls, labels.Label{Name: removeSpaces(k), Value: v})
}
for k, v := range alert.Annotations {
a = append(a, labels.Label{Name: removeSpaces(k), Value: v})
}
return ¬ifier.Alert{
Labels: ls,
Annotations: a,
StartsAt: time.Time(alert.StartsAt),
EndsAt: time.Time(alert.EndsAt),
GeneratorURL: alert.Alert.GeneratorURL.String(),
}
}
func removeSpaces(labelName string) string {
return strings.Join(strings.Fields(labelName), "")
}