-
Notifications
You must be signed in to change notification settings - Fork 485
/
manager.go
257 lines (214 loc) · 7.72 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
package integrations
import (
"context"
"fmt"
"path/filepath"
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gorilla/mux"
"github.com/grafana/agent/pkg/integrations/agent"
integrationCfg "github.com/grafana/agent/pkg/integrations/config"
"github.com/grafana/agent/pkg/integrations/node_exporter"
"github.com/grafana/agent/pkg/prom/ha"
"github.com/grafana/agent/pkg/prom/instance"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
sd_config "github.com/prometheus/prometheus/discovery/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
)
// integrationAbnormalExits counts, per integration name, how many times an
// integration's Run returned an unexpected error and was restarted by the
// Manager.
var integrationAbnormalExits = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "agent_prometheus_integration_abnormal_exits_total",
		Help: "Total number of times an agent integration exited unexpectedly, causing it to be restarted.",
	},
	[]string{"integration_name"},
)
// DefaultConfig holds the values Config.UnmarshalYAML starts from before
// decoding: samples are labeled with agent_hostname, and an integration that
// exits abnormally is restarted after a 5 second backoff.
var DefaultConfig = Config{
	UseHostnameLabel:          true,
	IntegrationRestartBackoff: 5 * time.Second,
}
// Config holds the configuration for all integrations.
//
// When decoded from YAML, fields not present in the document keep their
// values from DefaultConfig.
type Config struct {
// When true, adds an agent_hostname label to all samples from integrations.
UseHostnameLabel bool `yaml:"use_hostname_label"`
// Agent configures the agent integration. NewManager only creates it when
// Agent.Enabled is true.
Agent agent.Config `yaml:"agent"`
// NodeExporter configures the node_exporter integration. NewManager only
// creates it when NodeExporter.Enabled is true.
NodeExporter node_exporter.Config `yaml:"node_exporter"`
// Extra labels to add for all integration samples. They are applied after
// agent_hostname, so a label named agent_hostname here overrides it.
Labels model.LabelSet `yaml:"labels"`
// Prometheus RW configs to use for all integrations.
PrometheusRemoteWrite []*config.RemoteWriteConfig `yaml:"prometheus_remote_write,omitempty"`
// IntegrationRestartBackoff is how long the Manager waits before restarting
// an integration whose Run returned an unexpected error.
IntegrationRestartBackoff time.Duration `yaml:"integration_restart_backoff,omitempty"`
// ListenPort tells the integration Manager which port the Agent is
// listening on for generating Prometheus instance configs.
//
// It is not read from YAML and must be set by the caller. The Manager
// dereferences it without a nil check when building scrape targets.
ListenPort *int `yaml:"-"`
}
// UnmarshalYAML implements yaml.Unmarshaler for Config. The receiver is reset
// to DefaultConfig first, so any field missing from the YAML document keeps
// its default value.
func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
	// plain has Config's fields but not its methods; decoding into it avoids
	// recursing back into this UnmarshalYAML.
	type plain Config

	*c = DefaultConfig
	target := (*plain)(c)
	return unmarshal(target)
}
// Integration is a component run by the Manager. It exposes HTTP routes under
// /integrations/<name>, and the Manager scrapes those routes through a
// Prometheus instance config derived from ScrapeConfigs and CommonConfig.
type Integration interface {
// Name returns the name of the integration. Each registered integration must
// have a unique name.
Name() string
// CommonConfig returns the set of common configuration values present across
// all integrations.
CommonConfig() integrationCfg.Common
// RegisterRoutes should register any HTTP handlers used for the integration.
//
// The router provided to RegisterRoutes is a subrouter for the path
// /integrations/<integration name>. All routes should register to the
// relative root path and will be automatically combined to the subroute. For
// example, if an integration "database" registers a /metrics endpoint, it will
// be exposed as /integrations/database/metrics.
RegisterRoutes(r *mux.Router) error
// ScrapeConfigs should return a set of integration scrape configs that inform
// the integration how samples should be collected.
ScrapeConfigs() []integrationCfg.ScrapeConfig
// Run should start the integration and do any required tasks. Run should *not*
// exit until context is canceled. If an integration doesn't need to do anything,
// it should simply wait for ctx to be canceled.
//
// If Run returns a non-nil error other than context.Canceled before ctx is
// canceled, the Manager treats it as an abnormal exit and calls Run again
// after Config.IntegrationRestartBackoff.
Run(ctx context.Context) error
}
// Manager manages a set of integrations and runs them.
type Manager struct {
// c is the integrations configuration the Manager was created with.
c Config
logger log.Logger
// integrations are the integrations this Manager runs, one goroutine each.
integrations []Integration
// hostname is the value of the agent_hostname label. It is only populated
// when c.UseHostnameLabel is true.
hostname string
// im receives one instance config per integration so the integration gets
// scraped.
im ha.InstanceManager
// cancel cancels the context passed to every integration's Run.
cancel context.CancelFunc
// done is closed once every integration goroutine has returned; Stop waits
// on it.
done chan bool
}
// NewManager creates a new integrations manager and starts it.
//
// Every integration enabled in c is instantiated (the agent integration, then
// node_exporter). im is the InstanceManager that accepts the instance configs
// used to scrape the running integrations and send their metrics. An error is
// returned if an enabled integration cannot be constructed.
func NewManager(c Config, logger log.Logger, im ha.InstanceManager) (*Manager, error) {
	var enabled []Integration

	if c.Agent.Enabled {
		enabled = append(enabled, agent.New(c.Agent))
	}

	if c.NodeExporter.Enabled {
		nodeLogger := log.With(logger, "integration", "node_exporter")
		nodeExporter, err := node_exporter.New(nodeLogger, c.NodeExporter)
		if err != nil {
			return nil, err
		}
		enabled = append(enabled, nodeExporter)
	}

	return newManager(c, logger, im, enabled)
}
// newManager builds a Manager for an explicit list of integrations and starts
// running them in a background goroutine.
//
// When c.UseHostnameLabel is true, the local hostname is resolved first. If
// that fails, the error is returned and no goroutine or context is created.
func newManager(c Config, logger log.Logger, im ha.InstanceManager, integrations []Integration) (*Manager, error) {
	var hostname string
	if c.UseHostnameLabel {
		var err error
		hostname, err = instance.Hostname()
		if err != nil {
			return nil, err
		}
	}

	// Create the cancelable context only after every fallible step. This way
	// its cancel func is never dropped on an error path.
	ctx, cancel := context.WithCancel(context.Background())
	m := &Manager{
		c:            c,
		logger:       logger,
		integrations: integrations,
		hostname:     hostname,
		im:           im,
		cancel:       cancel,
		done:         make(chan bool),
	}

	go m.run(ctx)
	return m, nil
}
// run launches one goroutine per integration, each executing runIntegration
// with ctx. It closes m.done once all of them have returned.
func (m *Manager) run(ctx context.Context) {
	defer close(m.done)

	var pending sync.WaitGroup
	pending.Add(len(m.integrations))
	for _, integration := range m.integrations {
		go func(it Integration) {
			defer pending.Done()
			m.runIntegration(ctx, it)
		}(integration)
	}
	pending.Wait()
}
// runIntegration registers an instance config for i with the InstanceManager
// and then runs i until ctx is canceled.
//
// If i.Run returns a non-nil error other than context.Canceled while ctx is
// still active, the exit is counted in integrationAbnormalExits and i is
// restarted after m.c.IntegrationRestartBackoff. The backoff wait aborts as
// soon as ctx is canceled, so Stop is not delayed by a pending restart.
func (m *Manager) runIntegration(ctx context.Context, i Integration) {
	// Apply the config so an instance is launched to scrape our integration.
	instanceConfig := m.instanceConfigForIntegration(i)
	if err := m.im.ApplyConfig(instanceConfig); err != nil {
		level.Error(m.logger).Log("msg", "failed to apply integration. integration will not run. THIS IS A BUG!", "err", err, "integration", i.Name())
		return
	}

	for {
		err := i.Run(ctx)

		// A nil error, a cancellation, or any error surfacing after ctx is done
		// is a shutdown, not a crash. Don't count it or try to restart.
		if err == nil || err == context.Canceled || ctx.Err() != nil {
			level.Info(m.logger).Log("msg", "stopped integration", "integration", i.Name())
			return
		}

		integrationAbnormalExits.WithLabelValues(i.Name()).Inc()
		level.Error(m.logger).Log("msg", "integration stopped abnormally, restarting after backoff", "err", err, "integration", i.Name(), "backoff", m.c.IntegrationRestartBackoff)

		// Wait out the backoff, but stop immediately if the Manager is stopped.
		backoff := time.NewTimer(m.c.IntegrationRestartBackoff)
		select {
		case <-ctx.Done():
			backoff.Stop()
			level.Info(m.logger).Log("msg", "stopped integration", "integration", i.Name())
			return
		case <-backoff.C:
		}
	}
}
// instanceConfigForIntegration builds the Prometheus instance config that
// scrapes integration i through the Agent's own HTTP server.
//
// The instance is named "integration/<name>" and has one scrape job per entry
// returned by i.ScrapeConfigs(). Each job's metrics path is rooted at
// /integrations/<name>, and each job takes its scrape interval, timeout and
// relabel rules from i.CommonConfig(). Samples are sent to
// m.c.PrometheusRemoteWrite.
func (m *Manager) instanceConfigForIntegration(i Integration) instance.Config {
	prometheusName := fmt.Sprintf("integration/%s", i.Name())
	common := i.CommonConfig()

	var scrapeConfigs []*config.ScrapeConfig
	for _, cfg := range i.ScrapeConfigs() {
		sc := &config.ScrapeConfig{
			JobName: fmt.Sprintf("integrations/%s", cfg.JobName),
			// MetricsPath is a URL path and must always use forward slashes.
			// filepath.Join uses the OS separator (a backslash on Windows), so
			// normalize the joined result back to slashes.
			MetricsPath:            filepath.ToSlash(filepath.Join("/integrations", i.Name(), cfg.MetricsPath)),
			Scheme:                 "http",
			HonorLabels:            false,
			HonorTimestamps:        true,
			ScrapeInterval:         model.Duration(common.ScrapeInterval),
			ScrapeTimeout:          model.Duration(common.ScrapeTimeout),
			ServiceDiscoveryConfig: m.scrapeServiceDiscovery(),
			RelabelConfigs:         common.RelabelConfigs,
			MetricRelabelConfigs:   common.MetricRelabelConfigs,
		}
		scrapeConfigs = append(scrapeConfigs, sc)
	}

	instanceCfg := instance.DefaultConfig
	instanceCfg.Name = prometheusName
	instanceCfg.ScrapeConfigs = scrapeConfigs
	instanceCfg.RemoteWrite = m.c.PrometheusRemoteWrite
	return instanceCfg
}
// scrapeServiceDiscovery returns a static service-discovery config with a
// single target: the Agent's own HTTP server at 127.0.0.1:<ListenPort>.
//
// The target group carries agent_hostname (when UseHostnameLabel is true) plus
// every label in c.Labels. User labels are applied last, so they win on a name
// clash. m.c.ListenPort must be non-nil; it is dereferenced without a check.
func (m *Manager) scrapeServiceDiscovery() sd_config.ServiceDiscoveryConfig {
	address := model.LabelValue(fmt.Sprintf("127.0.0.1:%d", *m.c.ListenPort))

	groupLabels := model.LabelSet{}
	if m.c.UseHostnameLabel {
		groupLabels[model.LabelName("agent_hostname")] = model.LabelValue(m.hostname)
	}
	for name, value := range m.c.Labels {
		groupLabels[name] = value
	}

	group := &targetgroup.Group{
		Targets: []model.LabelSet{
			{model.AddressLabel: address},
		},
		Labels: groupLabels,
	}
	return sd_config.ServiceDiscoveryConfig{
		StaticConfigs: []*targetgroup.Group{group},
	}
}
// WireAPI mounts the HTTP routes of every managed integration onto r.
//
// Each integration receives a subrouter rooted at /integrations/<name>, so a
// handler it registers at /metrics is served at /integrations/<name>/metrics.
// Registration stops at the first integration whose RegisterRoutes fails, and
// that error is returned unchanged.
func (m *Manager) WireAPI(r *mux.Router) error {
	for _, integration := range m.integrations {
		prefix := fmt.Sprintf("/integrations/%s", integration.Name())
		if err := integration.RegisterRoutes(r.PathPrefix(prefix).Subrouter()); err != nil {
			return err
		}
	}
	return nil
}
// Stop stops the manager and all of its integrations.
//
// It cancels the context shared by every running integration and blocks until
// all integration goroutines have returned. Calling Stop more than once is
// safe; later calls return immediately because done is already closed.
func (m *Manager) Stop() {
	m.cancel()
	<-m.done
}