-
Notifications
You must be signed in to change notification settings - Fork 134
/
prometheus_scraping.go
120 lines (105 loc) · 2.72 KB
/
prometheus_scraping.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package utilisation
import (
"fmt"
"io"
"net/http"
"sync"
discovery "k8s.io/api/discovery/v1"
commonUtil "github.com/armadaproject/armada/internal/common/util"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
log "github.com/sirupsen/logrus"
)
// httpGetter is the minimal HTTP client surface the scraping code depends on.
// Its single method has the same signature as (*http.Client).Get, so a real
// *http.Client satisfies it, and a stub can be substituted where no network
// access is wanted.
type httpGetter interface {
	// Get issues a GET request for url and returns the response or an error.
	Get(url string) (*http.Response, error)
}
// getUrlsToScrape builds the list of "http://<address>:<port>/metrics" URLs for
// the endpoints in endpointSlices that are located on one of the nodes named in
// nodeNames.
//
// Only the first port of each slice and the first address of each endpoint are
// used. An endpoint is included when it is ready, serving and not terminating.
// Optional (pointer) fields that may be unset are checked instead of being
// dereferenced blindly:
//   - a nil slice entry, a slice whose first port carries no port number, and an
//     endpoint without a node name are skipped, because no URL can be built or
//     matched against nodeNames;
//   - a nil Ready condition is treated as ready, a nil Serving condition defers
//     to Ready, and a nil Terminating condition is treated as not terminating,
//     following the Kubernetes EndpointConditions API documentation.
//
// Addresses from slices whose AddressType is IPv6 are wrapped in brackets so
// that the host:port part of the URL is valid.
//
// The result is nil when no endpoint qualifies.
func getUrlsToScrape(endpointSlices []*discovery.EndpointSlice, nodeNames []string) []string {
	nodeNamesSet := commonUtil.StringListToSet(nodeNames)

	var urlsToScrape []string
	for _, endpointSlice := range endpointSlices {
		if endpointSlice == nil || len(endpointSlice.Ports) < 1 {
			continue
		}
		// The port number is optional in the API; without it there is nothing to scrape.
		if endpointSlice.Ports[0].Port == nil {
			continue
		}
		port := *endpointSlice.Ports[0].Port
		isIPv6 := endpointSlice.AddressType == discovery.AddressTypeIPv6

		for _, endpoint := range endpointSlice.Endpoints {
			if endpoint.NodeName == nil || !nodeNamesSet[*endpoint.NodeName] {
				continue
			}

			conditions := endpoint.Conditions
			// Unknown readiness is interpreted as ready.
			ready := conditions.Ready == nil || *conditions.Ready
			// Unknown serving state defers to the ready condition.
			serving := ready
			if conditions.Serving != nil {
				serving = *conditions.Serving
			}
			// Unknown terminating state is interpreted as not terminating.
			terminating := conditions.Terminating != nil && *conditions.Terminating
			if !ready || !serving || terminating {
				continue
			}

			if len(endpoint.Addresses) < 1 {
				continue
			}
			host := endpoint.Addresses[0]
			if isIPv6 {
				// A bare IPv6 literal followed by ":port" is ambiguous in a URL.
				host = "[" + host + "]"
			}
			urlsToScrape = append(urlsToScrape, fmt.Sprintf("http://%s:%d/metrics", host, port))
		}
	}
	return urlsToScrape
}
// scrapeUrls scrapes every URL in urls concurrently, one goroutine per URL,
// and returns all samples from the successful scrapes merged into a single
// vector.
//
// A URL whose scrape fails is logged as a warning and contributes nothing; it
// does not fail the whole call. Samples from one URL stay contiguous, but the
// order of URLs in the result follows the order in which scrapes finish. The
// result is nil when no samples were collected.
func scrapeUrls(urls []string, metricNames []string, client httpGetter) model.Vector {
	// One buffer slot per URL: each worker sends at most once, so a send never blocks.
	results := make(chan model.Vector, len(urls))

	var pending sync.WaitGroup
	pending.Add(len(urls))
	for _, target := range urls {
		target := target // give each goroutine its own copy of the loop variable
		go func() {
			defer pending.Done()
			scraped, err := scrapeUrl(target, metricNames, client)
			if err != nil {
				log.Warnf("Error scraping custom prometheus stats from url %s: %v", target, err)
				return
			}
			results <- scraped
		}()
	}

	// All sends fit in the buffer, so it is safe to wait for the workers here
	// and drain the channel afterwards.
	pending.Wait()
	close(results)

	var combined model.Vector
	for scraped := range results {
		combined = append(combined, scraped...)
	}
	return combined
}
// scrapeUrl fetches url with httpClient and returns the samples from the
// response whose metric name is listed in metricNamesWanted.
//
// An error is returned when the request itself fails, when the server answers
// with an HTTP error status (400 or above), or when the body cannot be parsed.
// Once a response has been obtained, its body is closed before returning.
func scrapeUrl(url string, metricNamesWanted []string, httpClient httpGetter) (model.Vector, error) {
	resp, err := httpClient.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// An error status is reported explicitly instead of handing the body to the
	// metrics parser, where it would surface as an obscure parse error or as a
	// silently empty result.
	if resp.StatusCode >= http.StatusBadRequest {
		return nil, fmt.Errorf("unexpected HTTP status %d", resp.StatusCode)
	}

	return parseResponse(resp, metricNamesWanted)
}
// parseResponse decodes the metrics payload in resp.Body, using the exposition
// format derived from resp.Header, and returns only the samples whose metric
// name appears in metricNamesWanted.
//
// Decoding continues until the decoder reports io.EOF. Any other decode error
// aborts the parse and is returned together with a nil vector. The result is
// nil when no sample matches. This function does not close resp.Body.
func parseResponse(resp *http.Response, metricNamesWanted []string) (model.Vector, error) {
	wanted := commonUtil.StringListToSet(metricNamesWanted)

	sampleDecoder := &expfmt.SampleDecoder{
		Dec:  expfmt.NewDecoder(resp.Body, expfmt.ResponseFormat(resp.Header)),
		Opts: &expfmt.DecodeOptions{},
	}

	var kept model.Vector
	for {
		var batch model.Vector
		switch err := sampleDecoder.Decode(&batch); {
		case err == io.EOF:
			// End of the payload: everything collected so far is the result.
			return kept, nil
		case err != nil:
			return nil, err
		}

		for _, sample := range batch {
			name := string(sample.Metric[model.MetricNameLabel])
			if _, found := wanted[name]; !found {
				continue
			}
			kept = append(kept, sample)
		}
	}
}