forked from grafana/loki
-
Notifications
You must be signed in to change notification settings - Fork 1
/
pushtargetmanager.go
111 lines (94 loc) · 3.05 KB
/
pushtargetmanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package lokipush
import (
"errors"
"fmt"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/util/strutil"
"github.com/grafana/loki/clients/pkg/logentry/stages"
"github.com/grafana/loki/clients/pkg/promtail/api"
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/grafana/loki/clients/pkg/promtail/targets/target"
)
// PushTargetManager manages a series of PushTargets.
type PushTargetManager struct {
logger log.Logger
targets map[string]*PushTarget
}
// NewPushTargetManager creates a new PushTargetManager.
func NewPushTargetManager(
reg prometheus.Registerer,
logger log.Logger,
client api.EntryHandler,
scrapeConfigs []scrapeconfig.Config,
) (*PushTargetManager, error) {
tm := &PushTargetManager{
logger: logger,
targets: make(map[string]*PushTarget),
}
if err := validateJobName(scrapeConfigs); err != nil {
return nil, err
}
for _, cfg := range scrapeConfigs {
pipeline, err := stages.NewPipeline(log.With(logger, "component", "push_pipeline_"+cfg.JobName), cfg.PipelineStages, &cfg.JobName, reg)
if err != nil {
return nil, err
}
t, err := NewPushTarget(logger, pipeline.Wrap(client), cfg.RelabelConfigs, cfg.JobName, cfg.PushConfig)
if err != nil {
return nil, err
}
tm.targets[cfg.JobName] = t
}
return tm, nil
}
func validateJobName(scrapeConfigs []scrapeconfig.Config) error {
jobNames := map[string]struct{}{}
for i, cfg := range scrapeConfigs {
if cfg.JobName == "" {
return errors.New("`job_name` must be defined for the `push` scrape_config with a " +
"unique name to properly register metrics, " +
"at least one `push` scrape_config has no `job_name` defined")
}
if _, ok := jobNames[cfg.JobName]; ok {
return fmt.Errorf("`job_name` must be unique for each `push` scrape_config, "+
"a duplicate `job_name` of %s was found", cfg.JobName)
}
jobNames[cfg.JobName] = struct{}{}
scrapeConfigs[i].JobName = strutil.SanitizeLabelName(cfg.JobName)
}
return nil
}
// Ready returns true if at least one PushTarget is also ready.
func (tm *PushTargetManager) Ready() bool {
for _, t := range tm.targets {
if t.Ready() {
return true
}
}
return false
}
// Stop stops the PushTargetManager and all of its PushTargets.
func (tm *PushTargetManager) Stop() {
for _, t := range tm.targets {
if err := t.Stop(); err != nil {
level.Error(t.logger).Log("msg", "error stopping PushTarget", "err", err.Error())
}
}
}
// ActiveTargets returns the list of PushTargets where Push data
// is being read. ActiveTargets is an alias to AllTargets as
// PushTargets cannot be deactivated, only stopped.
func (tm *PushTargetManager) ActiveTargets() map[string][]target.Target {
return tm.AllTargets()
}
// AllTargets returns the list of all targets where Push data
// is currently being read.
func (tm *PushTargetManager) AllTargets() map[string][]target.Target {
result := make(map[string][]target.Target, len(tm.targets))
for k, v := range tm.targets {
result[k] = []target.Target{v}
}
return result
}