-
Notifications
You must be signed in to change notification settings - Fork 478
/
discovery.go
230 lines (201 loc) · 6.26 KB
/
discovery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
package discovery
import (
"context"
"sort"
"strings"
"sync"
"time"
"github.com/grafana/agent/component"
"github.com/grafana/agent/service/cluster"
"github.com/grafana/ckit/shard"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/labels"
)
// Target refers to a singular discovered endpoint found by a discovery
// component.
type Target map[string]string
// DistributedTargets uses the node's Lookup method to distribute discovery
// targets when a Flow component runs in a cluster.
type DistributedTargets struct {
useClustering bool
cluster cluster.Cluster
targets []Target
}
// NewDistributedTargets creates the abstraction that allows components to
// dynamically shard targets between components.
func NewDistributedTargets(e bool, n cluster.Cluster, t []Target) DistributedTargets {
return DistributedTargets{e, n, t}
}
// Get distributes discovery targets a clustered environment.
//
// If a cluster size is 1, then all targets will be returned.
func (t *DistributedTargets) Get() []Target {
// TODO(@tpaschalis): Make this into a single code-path to simplify logic.
if !t.useClustering || t.cluster == nil {
return t.targets
}
peerCount := len(t.cluster.Peers())
resCap := (len(t.targets) + 1)
if peerCount != 0 {
resCap = (len(t.targets) + 1) / peerCount
}
res := make([]Target, 0, resCap)
for _, tgt := range t.targets {
peers, err := t.cluster.Lookup(shard.StringKey(tgt.NonMetaLabels().String()), 1, shard.OpReadWrite)
if err != nil {
// This can only fail in case we ask for more owners than the
// available peers. This will never happen, but in any case we fall
// back to owning the target ourselves.
res = append(res, tgt)
}
if len(peers) == 0 || peers[0].Self {
res = append(res, tgt)
}
}
return res
}
// Labels converts Target into a set of sorted labels.
func (t Target) Labels() labels.Labels {
var lset labels.Labels
for k, v := range t {
lset = append(lset, labels.Label{Name: k, Value: v})
}
sort.Sort(lset)
return lset
}
func (t Target) NonMetaLabels() labels.Labels {
var lset labels.Labels
for k, v := range t {
if !strings.HasPrefix(k, model.MetaLabelPrefix) {
lset = append(lset, labels.Label{Name: k, Value: v})
}
}
sort.Sort(lset)
return lset
}
// Exports holds values which are exported by all discovery components.
type Exports struct {
Targets []Target `river:"targets,attr"`
}
// Discoverer is an alias for Prometheus' Discoverer interface, so users of this package don't need
// to import github.com/prometheus/prometheus/discover as well.
type Discoverer discovery.Discoverer
// Creator is a function provided by an implementation to create a concrete Discoverer instance.
type Creator func(component.Arguments) (Discoverer, error)
// Component is a reusable component for any discovery implementation.
// it will handle dynamic updates and exporting targets appropriately for a scrape implementation.
type Component struct {
opts component.Options
discMut sync.Mutex
latestDisc discovery.Discoverer
newDiscoverer chan struct{}
creator Creator
}
// New creates a discovery component given arguments and a concrete Discovery implementation function.
func New(o component.Options, args component.Arguments, creator Creator) (*Component, error) {
c := &Component{
opts: o,
creator: creator,
// buffered to avoid deadlock from the first immediate update
newDiscoverer: make(chan struct{}, 1),
}
return c, c.Update(args)
}
// Run implements component.Component.
func (c *Component) Run(ctx context.Context) error {
var cancel context.CancelFunc
for {
select {
case <-ctx.Done():
return nil
case <-c.newDiscoverer:
// cancel any previously running discovery
if cancel != nil {
cancel()
}
// create new context so we can cancel it if we get any future updates
// since it is derived from the main run context, it only needs to be
// canceled directly if we receive new updates
newCtx, cancelFunc := context.WithCancel(ctx)
cancel = cancelFunc
// finally run discovery
c.discMut.Lock()
disc := c.latestDisc
c.discMut.Unlock()
go c.runDiscovery(newCtx, disc)
}
}
}
// Update implements component.Component.
func (c *Component) Update(args component.Arguments) error {
disc, err := c.creator(args)
if err != nil {
return err
}
c.discMut.Lock()
c.latestDisc = disc
c.discMut.Unlock()
select {
case c.newDiscoverer <- struct{}{}:
default:
}
return nil
}
// MaxUpdateFrequency is the minimum time to wait between updating targets.
// Prometheus uses a static threshold. Do not recommend changing this, except for tests.
var MaxUpdateFrequency = 5 * time.Second
// runDiscovery is a utility for consuming and forwarding target groups from a discoverer.
// It will handle collating targets (and clearing), as well as time based throttling of updates.
func (c *Component) runDiscovery(ctx context.Context, d Discoverer) {
// all targets we have seen so far
cache := map[string]*targetgroup.Group{}
ch := make(chan []*targetgroup.Group)
go d.Run(ctx, ch)
// function to convert and send targets in format scraper expects
send := func() {
allTargets := []Target{}
for _, group := range cache {
for _, target := range group.Targets {
labels := map[string]string{}
// first add the group labels, and then the
// target labels, so that target labels take precedence.
for k, v := range group.Labels {
labels[string(k)] = string(v)
}
for k, v := range target {
labels[string(k)] = string(v)
}
allTargets = append(allTargets, labels)
}
}
c.opts.OnStateChange(Exports{Targets: allTargets})
}
ticker := time.NewTicker(MaxUpdateFrequency)
// true if we have received new targets and need to send.
haveUpdates := false
for {
select {
case <-ticker.C:
if haveUpdates {
send()
haveUpdates = false
}
case <-ctx.Done():
send()
return
case groups := <-ch:
for _, group := range groups {
// Discoverer will send an empty target set to indicate the group (keyed by Source field)
// should be removed
if len(group.Targets) == 0 {
delete(cache, group.Source)
} else {
cache[group.Source] = group
}
}
haveUpdates = true
}
}
}