-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscraper.go
119 lines (96 loc) · 2.56 KB
/
scraper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package exporter
import (
"context"
"errors"
"fmt"
"net/http"
"time"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)
const (
	// maxConcurrentScrapes bounds the number of scrape goroutines running at
	// once (errgroup limit) and sizes the result channel buffer in Scrape.
	maxConcurrentScrapes = 15
)
// MetricFamilyMap holds parsed Prometheus metric families keyed by family name,
// as produced by expfmt.TextParser.TextToMetricFamilies.
type MetricFamilyMap map[string]*dto.MetricFamily

// Scraper fetches Prometheus text-format metrics from a set of URLs and
// returns one MetricFamilyMap per successfully scraped endpoint.
type Scraper interface {
	Scrape(ctx context.Context, urls []string) ([]MetricFamilyMap, error)
}
// HTTPClient abstracts *http.Client (its Do method) so the scraper can be
// unit-tested with a fake transport.
type HTTPClient interface {
	Do(req *http.Request) (*http.Response, error)
}
// result carries the outcome of a single URL scrape through the internal
// channel in Scrape: the parsed families, any error, and the shared scrape
// timestamp taken once at the start of the batch.
type result struct {
	metricFamilyMap MetricFamilyMap
	err             error
	ts              time.Time
}
// scraper is the concrete Scraper implementation. The zero-value
// expfmt.TextParser is used as-is (NewScraper does not set it).
type scraper struct {
	httpClient HTTPClient
	parser     expfmt.TextParser
	log        logrus.FieldLogger
}
// NewScraper builds a Scraper that fetches metrics with the given HTTP client
// and reports scrape failures through the given logger. The parser field is
// left at its usable zero value.
func NewScraper(httpClient HTTPClient, log logrus.FieldLogger) Scraper {
	s := scraper{
		httpClient: httpClient,
		log:        log,
	}
	return &s
}
// Scrape fetches metrics from every URL concurrently (bounded by
// maxConcurrentScrapes) and returns the successfully parsed results.
// Individual scrape failures are logged and skipped, so the returned error is
// always nil and the slice may be shorter than urls. All results share a
// single UTC timestamp taken at the start of the batch.
func (s scraper) Scrape(ctx context.Context, urls []string) ([]MetricFamilyMap, error) {
	var g errgroup.Group
	g.SetLimit(maxConcurrentScrapes)
	resultsChan := make(chan result, maxConcurrentScrapes)
	now := time.Now().UTC()
	// Launch the producers from their own goroutine so the consumer loop
	// below drains resultsChan concurrently. Spawning on the caller's
	// goroutine would deadlock for len(urls) > buffer+limit: workers block
	// sending into a full channel, g.Go blocks at the limit, and the
	// consumer loop is never reached.
	go func() {
		for i := range urls {
			url := urls[i]
			g.Go(func() error {
				select {
				case <-ctx.Done():
					// Cancelled before this worker started; report via Wait.
					return ctx.Err()
				default:
					metrics, err := s.scrapeURL(ctx, url)
					if err != nil {
						err = fmt.Errorf("error while fetching metrics from '%s': %w", url, err)
					}
					resultsChan <- result{metricFamilyMap: metrics, err: err, ts: now}
				}
				// Per-URL errors travel through resultsChan, not the group,
				// so one bad endpoint never aborts the batch.
				return nil
			})
		}
		if err := g.Wait(); err != nil && !errors.Is(err, context.Canceled) {
			s.log.Errorf("error while scraping metrics %v", err)
		}
		close(resultsChan)
	}()
	metrics := make([]MetricFamilyMap, 0, len(urls))
	for res := range resultsChan {
		if res.err != nil {
			// Best-effort: log the failed endpoint and keep the rest.
			s.log.Error(res.err)
			continue
		}
		metrics = append(metrics, res.metricFamilyMap)
	}
	return metrics, nil
}
// scrapeURL performs a single GET against url with a 10-second timeout
// (layered on top of ctx) and parses the response as Prometheus text format.
// Returns the parsed metric families keyed by family name.
func (s scraper) scrapeURL(ctx context.Context, url string) (map[string]*dto.MetricFamily, error) {
	ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()
	req, err := http.NewRequestWithContext(ctxWithTimeout, http.MethodGet, url, nil)
	if err != nil {
		return nil, fmt.Errorf("cannot create request %w", err)
	}
	resp, err := s.httpClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("error while making http request %w", err)
	}
	// Close on every path: the original deferred Close after the status
	// check, leaking the body (and its connection) on non-200 responses.
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("request failed with status code: %d", resp.StatusCode)
	}
	metrics, err := s.parser.TextToMetricFamilies(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("cannot parse metrics %w", err)
	}
	return metrics, nil
}