/
metrics_client.go
263 lines (226 loc) · 8.29 KB
/
metrics_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
// SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and Gardener contributors
//
// SPDX-License-Identifier: Apache-2.0
package metrics_scraper
import (
"bufio"
"compress/gzip"
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"net/http"
"strconv"
"strings"
krest "k8s.io/client-go/rest"
)
// metricName is the name of the metric whose counters are summed up when a metrics response is processed.
const metricName = "apiserver_request_total"
// metricsClient retrieves the request counters exposed by a single Kapi metrics endpoint.
type metricsClient interface {
	// GetKapiInstanceMetrics fetches the metrics served by a Kapi metrics endpoint and adds up every
	// apiserver_request_total counter found in the response.
	//
	// Inputs:
	//   - ctx is the context of the scrape request.
	//   - url is the address of the metrics endpoint.
	//   - authSecret is the bearer token presented to the metrics endpoint.
	//   - caCertificates is the pool of trusted CA certificates against which the endpoint's certificate is verified.
	//
	// Outputs:
	//   - result: the sum of all apiserver_request_total counters in the scraped response.
	//   - err: non-nil if the scrape failed. In that case result is zero.
	//
	// A response which contains no apiserver_request_total counters at all is reported as an error.
	//
	// Note: to keep line processing cheap, any whitespace which precedes the metric of interest on its line must
	// consist of ASCII whitespace only.
	GetKapiInstanceMetrics(
		ctx context.Context,
		url, authSecret string,
		caCertificates *x509.CertPool,
	) (result int64, err error)
}
// metricsClientImpl is the metricsClient implementation returned by newMetricsClient.
// It obtains its HTTP client through the testIsolation indirection on every scrape.
type metricsClientImpl struct {
	testIsolation metricsClientTestIsolation // Provides indirections necessary to isolate the unit during tests
}
// newMetricsClient creates a metricsClient whose HTTP clients are produced by newHttpClient.
func newMetricsClient() metricsClient {
	isolation := metricsClientTestIsolation{NewHttpClient: newHttpClient}
	return &metricsClientImpl{testIsolation: isolation}
}
// GetKapiInstanceMetrics scrapes a Kapi metric endpoint and returns the sum of all apiserver_request_total counters.
//
// Parameters:
//   - ctx is attached to the HTTP request sent to the metrics endpoint.
//   - url points to the metrics endpoint.
//   - authSecret specifies a bearer auth token to present to the metrics endpoint.
//   - caCertificates lists trusted CA certificates which are used to verify the endpoint's certificate.
//
// Returns:
//   - an int64 value which is the sum of all apiserver_request_total counters from the scraped metric response.
//   - an optional error
//
// If the error is non-nil, the int64 value is always zero.
// An error is returned if the request cannot be created or sent, if the response status is not 2xx, if the response
// stream cannot be read or closed, or if the metrics data contains no apiserver_request_total counters.
//
// Remarks: For performance reasons, this function requires that if a line containing the metric of interest start with
// whitespaces, those whitespaces be only ASCII whitespaces.
func (mc *metricsClientImpl) GetKapiInstanceMetrics(
	ctx context.Context, url string, authSecret string, caCertificates *x509.CertPool) (result int64, err error) {

	// Prepare request
	request, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return 0, fmt.Errorf("metrics client: creating http request object: %w", err)
	}
	request.Header.Set("Authorization", "Bearer "+authSecret)
	// gzip is requested explicitly, so the response is decompressed explicitly below.
	request.Header.Set("Accept-Encoding", "gzip")

	client := mc.testIsolation.NewHttpClient(caCertificates)

	// Send request
	response, err := client.Do(request)
	if err != nil {
		return 0, fmt.Errorf("metrics client: making http request: %w", err)
	}
	defer func() {
		closeErr := response.Body.Close()
		if closeErr != nil && err == nil {
			// The named results are rewritten here. The value must be reset too, otherwise a count would be
			// returned together with an error.
			result = 0
			err = fmt.Errorf("metrics client: closing response stream: %w", closeErr)
		}
	}()

	if response.StatusCode < 200 || response.StatusCode >= 300 {
		return 0, fmt.Errorf("metrics client: response reported HTTP status %d", response.StatusCode)
	}

	// If the server returned compressed response, use decompressing reader.
	// Content coding names are case-insensitive, hence EqualFold.
	var metricsStream io.Reader = response.Body
	if strings.EqualFold(response.Header.Get("Content-Encoding"), "gzip") {
		gzipReader, gzipErr := gzip.NewReader(response.Body)
		if gzipErr != nil {
			return 0, fmt.Errorf(
				"metrics client: scraping '%s': reading gzip encoded response stream: %w", url, gzipErr)
		}
		// Closing the gzip reader does not close the underlying response body; that is done by the defer above.
		defer gzipReader.Close()
		metricsStream = gzipReader
	}

	return getTotalRequestCount(metricsStream)
}
// getTotalRequestCount processes a metrics response stream and returns the sum of all apiserver_request_total counters.
//
// Returns:
// - an int64 value which is the sum of all apiserver_request_total counters from the scraped metric response.
// - an optional error
//
// Exactly one of the int64 value and the error is non-zero.
func getTotalRequestCount(metricsStream io.Reader) (int64, error) {
// Limit the metrics response as a general precaution. It should be < 5MiB, so if we're getting >20MiB something's wrong.
metricsStream = &io.LimitedReader{R: metricsStream, N: 20 * 1024 * 1024}
reader := bufio.NewReader(metricsStream)
totalRequestCount := int64(0)
isCounterFound := false
isLastReadPartial := false
lineBytes, isPrefix, err := reader.ReadLine()
for ; err == nil; lineBytes, isPrefix, err = reader.ReadLine() {
if isPrefix {
// Long lines are not expected, and not of interest to us. Just skip them.
isLastReadPartial = true
continue
}
if isLastReadPartial {
// That's the last fragment of a long line
isLastReadPartial = false
continue
}
line := string(lineBytes)
if len(line) > 0 && isSpace(line, 0) {
i := skipSpace(line, 1)
line = line[i:]
}
if !strings.HasPrefix(line, metricName) {
// One of the other metrics. Not of interest to us.
continue
}
_, seriesCurrentValue, err := parseLine(line)
if err != nil {
return 0, fmt.Errorf("parsing metrics line '%s': %w", line, err)
}
totalRequestCount += seriesCurrentValue
isCounterFound = true
}
if err != io.EOF {
return 0, err
}
if !isCounterFound {
return 0, fmt.Errorf(
"calculating total request count from metrics response: the response contains no '%s' counters", metricName)
}
return totalRequestCount, nil
}
// parseLine extracts the series identifier and the current value from a single metrics line.
//
// Assumes that the line starts with metricName, no leading whitespace.
//
// Returns (seriesId, seriesValue, error):
//   - seriesId is the text between the braces of the labels section, or "" if the line has no labels section.
//   - seriesValue is the sample value as an integer. Values in scientific notation are accepted and truncated
//     toward zero. Anything which follows the value on the line (e.g. a timestamp) is ignored.
//   - error is non-nil if the line is malformed: nothing follows the metric name, the labels section is not
//     terminated, the value is missing, not a number, or does not fit into an int64. In that case seriesId is ""
//     and seriesValue is 0.
func parseLine(line string) (string, int64, error) {
	// Sample line: apiserver_request_total{code="200",component="apiserver",dry_run="",group="",resource="configmaps",scope="namespace",subresource="",verb="LIST",version="v1"} 15

	// The error is built lazily, so that well-formed lines do not pay for formatting it.
	malformed := func() (string, int64, error) {
		return "", 0, fmt.Errorf("parsing metrics line: malformed line '%s'", line)
	}

	i := len(metricName)
	if i >= len(line) {
		return malformed()
	}

	// Process optional labels section, e.g: {code="200",component="apiserver",verb="LIST",version="v1"}
	i = skipSpace(line, i)
	if i >= len(line) {
		// Only whitespace follows the metric name
		return malformed()
	}
	seriesId := ""
	if line[i] == '{' {
		seriesIdStart := i + 1
		inQuotes := false
	labels:
		for i++; i < len(line); i++ {
			switch c := line[i]; {
			case inQuotes && c == '\\':
				i++ // Skip the escaped character, so that an escaped quote does not end the label value
			case c == '"':
				inQuotes = !inQuotes
			case c == '}' && !inQuotes:
				break labels // A closing brace inside a quoted label value does not end the section
			}
		}
		if i >= len(line) {
			return malformed()
		}
		seriesId = line[seriesIdStart:i]
		i++ // Move past '}'
	}

	// Process value section
	i = skipSpace(line, i)
	if i >= len(line) {
		return malformed()
	}
	valueEnd := i + 1
	for valueEnd < len(line) && !isSpace(line, valueEnd) {
		valueEnd++
	}
	valueString := line[i:valueEnd]

	if !strings.ContainsAny(valueString, "eE") {
		seriesValue, err := strconv.ParseInt(valueString, 10, 64)
		if err != nil {
			return malformed()
		}
		return seriesId, seriesValue, nil
	}

	// Some integer values come in scientific notation, e.g. 1.234567e+06
	floatValue, err := strconv.ParseFloat(valueString, 64)
	if err != nil {
		return malformed()
	}
	// Converting a float which does not fit into an int64 has implementation-specific results, so such values are
	// rejected. The negated form of the check also rejects NaN.
	const int64Bound = float64(1 << 63)
	if !(floatValue >= -int64Bound && floatValue < int64Bound) {
		return malformed()
	}
	// The significand of double is 53 bits - should represent request count accurately
	return seriesId, int64(floatValue), nil
}
// isSpace reports whether the byte at index i of str is a space or a horizontal tab.
// The caller is responsible for i being a valid index into str.
func isSpace(str string, i int) bool {
	switch str[i] {
	case ' ', '\t':
		return true
	default:
		return false
	}
}

// skipSpace scans str forward from index i and returns the index of the first byte which is neither a space nor
// a tab. If only spaces and tabs remain, the result is one-past-end, i.e. len(str). An i which is already at or
// past the end of str is returned unchanged.
func skipSpace(str string, i int) int {
	for i < len(str) {
		if !isSpace(str, i) {
			break
		}
		i++
	}
	return i
}
//#region Test isolation
// metricsClientTestIsolation contains all points of indirection necessary to isolate static function calls
// in the metrics client unit. newMetricsClient populates it with the real implementations.
type metricsClientTestIsolation struct {
	// Creates a new HTTP client with default settings.
	// caCertificates is the pool of CA certificates the client trusts when verifying the server certificate.
	NewHttpClient func(caCertificates *x509.CertPool) krest.HTTPClient
}
// newHttpClient creates an HTTP client for scraping a metrics endpoint over TLS.
//
// The client verifies the server certificate against caCertificates, expects the certificate to be issued for the
// name "kube-apiserver", and requires at least TLS 1.3.
//
// The metrics client requests a new client (and thus a new transport) for every scrape, so a connection can never be
// reused by a later request. Keep-alives are therefore disabled: otherwise every scrape would leave an idle
// connection behind in a transport which is never used again.
func newHttpClient(caCertificates *x509.CertPool) krest.HTTPClient {
	tlsConfig := &tls.Config{
		RootCAs:    caCertificates,
		ServerName: "kube-apiserver",
		MinVersion: tls.VersionTLS13,
	}

	return &http.Client{
		Transport: &http.Transport{
			TLSClientConfig:   tlsConfig,
			DisableKeepAlives: true,
		},
	}
}
//#endregion Test isolation