forked from openshift/telemeter
/
main.go
474 lines (397 loc) · 15.9 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io"
stdlog "log"
"net"
"net/http"
"net/url"
"os"
"strings"
"time"
"github.com/coreos/go-oidc"
"github.com/go-chi/chi"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/cobra"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
"github.com/openshift/telemeter/pkg/tracing"
"github.com/openshift/telemeter/pkg/authorize/ssl"
telemeter_http "github.com/openshift/telemeter/pkg/http"
"github.com/openshift/telemeter/pkg/logger"
"github.com/openshift/telemeter/pkg/receive"
"github.com/openshift/telemeter/pkg/server"
)
// desc is the long help text of the server command (used as cobra.Command.Long in main).
const desc = `
Server for receiving Prometheus metrics through the remote_write API. Clients are authenticated
with mTLS.
`
// defaultOpts returns a fresh Options carrying the built-in defaults that main
// uses as flag defaults: a 500 KiB upload size limit, the receive package's
// default request limit, and a 4m30s upload rate limit.
func defaultOpts() *Options {
	o := new(Options)
	o.LimitBytes = 500 << 10 // 500 KiB
	o.LimitReceiveBytes = receive.DefaultRequestLimit
	o.Ratelimit = 270 * time.Second // 4m30s
	return o
}
// main binds the command-line flags onto an Options built by defaultOpts,
// configures the process-wide logfmt logger (the standard library "log"
// package is redirected to it as well), and executes the server command.
// The command opens the external and internal TCP listeners and hands them to
// Options.Run. Any error returned by the command is logged and the process
// exits with status 1.
func main() {
	opt := defaultOpts()
	var listen, listenInternal string
	cmd := &cobra.Command{
		Short:         "Proxy for Prometheus remote_write API with mTLS authentication.",
		Long:          desc,
		SilenceErrors: true,
		SilenceUsage:  true,
		RunE: func(cmd *cobra.Command, args []string) error {
			listener, err := net.Listen("tcp", listen)
			if err != nil {
				return err
			}
			internalListener, err := net.Listen("tcp", listenInternal)
			if err != nil {
				// Release the already-bound external port before bailing out.
				_ = listener.Close()
				return err
			}
			return opt.Run(context.Background(), listener, internalListener)
		},
	}

	cmd.Flags().StringVar(&listen, "listen", "0.0.0.0:9003", "A host:port to listen on for upload traffic.")
	cmd.Flags().StringVar(&listenInternal, "listen-internal", "localhost:9004", "A host:port to listen on for health and metrics.")
	cmd.Flags().StringVar(&opt.TLSKeyPath, "tls-key", opt.TLSKeyPath, "Path to a private key to serve TLS for external traffic.")
	cmd.Flags().StringVar(&opt.TLSCertificatePath, "tls-crt", opt.TLSCertificatePath, "Path to a certificate to serve TLS for external traffic.")
	cmd.Flags().StringVar(&opt.TLSCACertificatePath, "tls-ca-crt", opt.TLSCACertificatePath, "Path to the trusted Certificate Authority of the rhelemeter client for mTLS.")
	cmd.Flags().StringVar(&opt.ClientInfoFromRequestConfigFile, "client-info-data-file", opt.ClientInfoFromRequestConfigFile,
		"Path to the file containing the PSK and information about how to extract the client cert from the request")
	cmd.Flags().StringVar(&opt.InternalTLSKeyPath, "internal-tls-key", opt.InternalTLSKeyPath, "Path to a private key to serve TLS for internal traffic.")
	cmd.Flags().StringVar(&opt.InternalTLSCertificatePath, "internal-tls-crt", opt.InternalTLSCertificatePath, "Path to a certificate to serve TLS for internal traffic.")
	cmd.Flags().StringSliceVar(&opt.LabelFlag, "label", opt.LabelFlag, "Labels to add to each outgoing metric, in key=value form.")
	cmd.Flags().StringVar(&opt.OIDCIssuer, "oidc-issuer", opt.OIDCIssuer, "The OIDC issuer URL, see https://openid.net/specs/openid-connect-discovery-1_0.html#IssuerDiscovery.")
	cmd.Flags().StringVar(&opt.OIDCClientSecret, "client-secret", opt.OIDCClientSecret, "The OIDC client secret, see https://tools.ietf.org/html/rfc6749#section-2.3.")
	cmd.Flags().StringVar(&opt.OIDCClientID, "client-id", opt.OIDCClientID, "The OIDC client ID, see https://tools.ietf.org/html/rfc6749#section-2.3.")
	cmd.Flags().StringVar(&opt.OIDCAudienceEndpoint, "oidc-audience", opt.OIDCAudienceEndpoint, "The OIDC audience some providers like Auth0 need.")
	cmd.Flags().StringVar(&opt.TenantID, "tenant-id", opt.TenantID, "Tenant ID to use for the system forwarded to.")
	cmd.Flags().DurationVar(&opt.Ratelimit, "ratelimit", opt.Ratelimit, "The rate limit of metric uploads per client. Uploads happening more often than this limit will be rejected.")
	cmd.Flags().StringVar(&opt.ForwardURL, "forward-url", opt.ForwardURL, "All written metrics will be written to this URL additionally")
	cmd.Flags().BoolVarP(&opt.Verbose, "verbose", "v", opt.Verbose, "Show verbose output.")
	cmd.Flags().StringArrayVar(&opt.Whitelist, "whitelist", opt.Whitelist, "Allowed rules for incoming metrics. If one of these rules is not matched, the metric is dropped.")
	cmd.Flags().StringVar(&opt.WhitelistFile, "whitelist-file", opt.WhitelistFile, "A file of allowed rules for incoming metrics. If one of these rules is not matched, the metric is dropped; one label key per line.")
	cmd.Flags().StringArrayVar(&opt.ElideLabels, "elide-label", opt.ElideLabels, "A list of labels to be elided from incoming metrics.")
	cmd.Flags().Int64Var(&opt.LimitBytes, "limit-bytes", opt.LimitBytes, "The maximum acceptable size of a request made to the upload endpoint.")
	cmd.Flags().Int64Var(&opt.LimitReceiveBytes, "limit-receive-bytes", opt.LimitReceiveBytes, "The maximum acceptable size of a request made to the receive endpoint.")
	cmd.Flags().StringVar(&opt.LogLevel, "log-level", opt.LogLevel, "Log filtering level. e.g info, debug, warn, error")
	cmd.Flags().StringVar(&opt.TracingServiceName, "internal.tracing.service-name", "rhelemeter-server",
		"The service name to report to the tracing backend.")
	cmd.Flags().StringVar(&opt.TracingEndpoint, "internal.tracing.endpoint", "",
		"The full URL of the trace collector. If it's not set, tracing will be disabled.")
	cmd.Flags().BoolVar(&opt.TracingInsecure, "internal.tracing.insecure", false,
		"Allow insecure connections to the tracing endpoint.")
	cmd.Flags().Float64Var(&opt.TracingSamplingFraction, "internal.tracing.sampling-fraction", 0.1,
		"The fraction of traces to sample. Thus, if you set this to .5, half of traces will be sampled.")
	cmd.Flags().StringVar(&opt.TracingEndpointType, "internal.tracing.endpoint-type", string(tracing.EndpointTypeAgent),
		fmt.Sprintf("The tracing endpoint type. Options: '%s', '%s'.", tracing.EndpointTypeAgent, tracing.EndpointTypeCollector))

	l := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
	l = log.WithPrefix(l, "ts", log.DefaultTimestampUTC)
	l = log.WithPrefix(l, "caller", log.DefaultCaller)
	stdlog.SetOutput(log.NewStdlibAdapter(l))
	opt.Logger = l
	level.Info(l).Log("msg", "Rhelemeter server initialized.")

	if err := cmd.Execute(); err != nil {
		level.Error(l).Log("err", err)
		os.Exit(1)
	}
}
// Options holds the server configuration. main starts from defaultOpts, binds
// most fields to command-line flags, and passes the result to Run.
type Options struct {
// External server mTLS configuration
// TLSKeyPath and TLSCertificatePath (--tls-key, --tls-crt) are the serving key
// pair of the external server; with an empty TLSCertificatePath the external
// server is served without TLS.
TLSKeyPath string
TLSCertificatePath string
// TLSCACertificatePath (--tls-ca-crt) is a PEM file whose certificates form the
// pool used to verify client certificates.
TLSCACertificatePath string
// ClientInfoFromRequestConfigFile is the path to the file containing the PSK
// and information about how to extract the client cert like info from the request
// (--client-info-data-file). When it is set, the TLS handshake of the external
// server does not require a client certificate.
ClientInfoFromRequestConfigFile string
// Internal server TLS configuration
// (--internal-tls-key, --internal-tls-crt). With an empty
// InternalTLSCertificatePath the internal server is served without TLS.
InternalTLSKeyPath string
InternalTLSCertificatePath string
// OIDC client-credentials settings (--oidc-issuer, --client-id,
// --client-secret, --oidc-audience). When OIDCIssuer is non-empty, the
// forwarding HTTP client is wrapped with an oauth2 transport whose tokens come
// from the issuer's token endpoint; a non-empty OIDCAudienceEndpoint is sent
// as the "audience" endpoint parameter.
OIDCIssuer string
OIDCClientID string
OIDCClientSecret string
OIDCAudienceEndpoint string
// TenantID (--tenant-id) is passed to the receive handler.
TenantID string
// LabelFlag holds the raw --label values in key=value form; Run parses them
// into Labels.
// NOTE(review): Labels is not read anywhere else in this file — confirm it is used.
LabelFlag []string
Labels map[string]string
// LimitBytes (--limit-bytes) and LimitReceiveBytes (--limit-receive-bytes)
// are request size limits.
// NOTE(review): neither is read by Run in this file — confirm where (or
// whether) they are enforced.
LimitBytes int64
LimitReceiveBytes int64
// RequiredLabelFlag holds raw key=value entries that Run parses into
// RequiredLabels.
// NOTE(review): main registers no flag for it, and RequiredLabels is not read
// elsewhere in this file.
RequiredLabelFlag []string
RequiredLabels map[string]string
// Whitelist (--whitelist) and ElideLabels (--elide-label) are passed to the
// receive handler.
Whitelist []string
ElideLabels []string
// WhitelistFile (--whitelist-file).
// NOTE(review): not read by Run in this file.
WhitelistFile string
// Ratelimit (--ratelimit) is the per-client upload rate limit.
// NOTE(review): not read by Run in this file.
Ratelimit time.Duration
// ForwardURL (--forward-url) is passed to the receive handler.
ForwardURL string
// LogLevel (--log-level) selects the level filter Run applies to Logger.
LogLevel string
// Logger is set by main before the command runs.
Logger log.Logger
// Tracing settings (--internal.tracing.*). All but TracingInsecure are
// passed to tracing.InitTracer by Run.
// NOTE(review): TracingInsecure is not passed to InitTracer in this file.
TracingServiceName string
TracingEndpoint string
TracingEndpointType string
TracingSamplingFraction float64
TracingInsecure bool
// Verbose (--verbose, -v) wraps the forwarding transport with a debug round tripper.
Verbose bool
}
// Paths is the JSON document served at "/" by the internal and external
// servers, listing the paths each of them exposes.
type Paths struct {
Paths []string `json:"paths"`
}
// Run starts the rhelemeter server and blocks until it stops.
//
// It parses the key=value label flags into o.Labels and o.RequiredLabels,
// applies the configured log level to o.Logger, initializes tracing, and
// builds the HTTP client used to forward received metrics. When o.OIDCIssuer
// is set, that client is wrapped with an OIDC client-credentials token source.
// It then runs, in a run.Group:
//   - the internal server on internalListener (debug, metrics and health
//     routes; TLS when o.InternalTLSCertificatePath is set);
//   - the external server on externalListener (the remote-write receive
//     endpoint; TLS when o.TLSCertificatePath is set);
//   - an actor that stops the group when ctx is done.
//
// When any actor returns, the others are interrupted and Run returns the
// group's result.
func (o *Options) Run(ctx context.Context, externalListener, internalListener net.Listener) error {
	labels, err := parseKeyValueFlags("label", o.LabelFlag, o.Labels)
	if err != nil {
		return err
	}
	o.Labels = labels

	requiredLabels, err := parseKeyValueFlags("required-label", o.RequiredLabelFlag, o.RequiredLabels)
	if err != nil {
		return err
	}
	o.RequiredLabels = requiredLabels

	o.Logger = level.NewFilter(o.Logger, logger.LogLevelFromString(o.LogLevel))

	tp, err := tracing.InitTracer(
		ctx,
		o.TracingServiceName,
		o.TracingEndpoint,
		o.TracingEndpointType,
		o.TracingSamplingFraction,
	)
	if err != nil {
		return fmt.Errorf("cannot initialize tracer: %w", err)
	}
	otel.SetErrorHandler(tracing.OtelErrorHandler{Logger: o.Logger})

	var transport http.RoundTripper = otelhttp.NewTransport(&http.Transport{
		DialContext:         (&net.Dialer{Timeout: 10 * time.Second}).DialContext,
		MaxIdleConnsPerHost: 10,
		IdleConnTimeout:     30 * time.Second,
	})
	if o.Verbose {
		transport = telemeter_http.NewDebugRoundTripper(o.Logger, transport)
	}
	forwardClient := &http.Client{
		Transport: telemeter_http.NewInstrumentedRoundTripper("forward", transport),
	}

	if o.OIDCIssuer != "" {
		provider, err := oidc.NewProvider(ctx, o.OIDCIssuer)
		if err != nil {
			return fmt.Errorf("OIDC provider initialization failed: %w", err)
		}
		// The client-credentials token source fetches tokens with the
		// *http.Client stored under oauth2.HTTPClient in its context.
		ctx = context.WithValue(ctx, oauth2.HTTPClient,
			&http.Client{
				// Note, that e.g forward timeouts after 5s.
				Timeout:   20 * time.Second,
				Transport: telemeter_http.NewInstrumentedRoundTripper("oauth", transport),
			},
		)
		cfg := clientcredentials.Config{
			ClientID:     o.OIDCClientID,
			ClientSecret: o.OIDCClientSecret,
			TokenURL:     provider.Endpoint().TokenURL,
		}
		if o.OIDCAudienceEndpoint != "" {
			cfg.EndpointParams = url.Values{"audience": []string{o.OIDCAudienceEndpoint}}
		}
		forwardClient.Transport = &oauth2.Transport{
			Base:   forwardClient.Transport,
			Source: cfg.TokenSource(ctx),
		}
	}

	var g run.Group
	{
		internal := http.NewServeMux()
		telemeter_http.DebugRoutes(internal)
		telemeter_http.MetricRoutes(internal)
		telemeter_http.HealthRoutes(internal)

		// The listing is constant, so marshal it once instead of on every
		// request; marshalling a struct holding a []string cannot fail.
		internalPathJSON, _ := json.MarshalIndent(Paths{Paths: []string{"/", "/metrics", "/debug/pprof", "/healthz", "/healthz/ready"}}, "", " ")

		r := chi.NewRouter()
		r.Mount("/", internal)
		r.Get("/", func(w http.ResponseWriter, req *http.Request) {
			w.Header().Add("Content-Type", "application/json")
			if _, err := w.Write(internalPathJSON); err != nil {
				level.Error(o.Logger).Log("msg", "could not write internal paths", "err", err)
			}
		})

		s := &http.Server{
			Handler: otelhttp.NewHandler(r, "internal", otelhttp.WithTracerProvider(tp)),
		}
		// Run the internal server.
		g.Add(func() error {
			if len(o.InternalTLSCertificatePath) > 0 {
				if err := s.ServeTLS(internalListener, o.InternalTLSCertificatePath, o.InternalTLSKeyPath); err != nil && !errors.Is(err, http.ErrServerClosed) {
					level.Error(o.Logger).Log("msg", "internal HTTPS server exited", "err", err)
					return err
				}
				return nil
			}
			if err := s.Serve(internalListener); err != nil && !errors.Is(err, http.ErrServerClosed) {
				level.Error(o.Logger).Log("msg", "internal HTTP server exited", "err", err)
				return err
			}
			return nil
		}, func(error) {
			_ = s.Shutdown(context.TODO())
			internalListener.Close()
		})
	}
	{
		external := chi.NewRouter()
		external.Use(logger.RequestLoggerWithTraceInfo(o.Logger))

		// With a client-info config, ssl.ClientCertInfoAsHeaders is installed
		// and the TLS handshake does not demand a client certificate;
		// otherwise the handshake requires and verifies one.
		hasClientCertConfig := o.ClientInfoFromRequestConfigFile != ""
		if hasClientCertConfig {
			conf, err := loadClientCertConfig(o.ClientInfoFromRequestConfigFile)
			if err != nil {
				return err
			}
			external.Use(ssl.ClientCertInfoAsHeaders(conf, o.Logger))
		}

		mux := http.NewServeMux()
		external.Mount("/", mux)

		// rhelemeter routes
		receiver, err := receive.NewHandler(o.Logger, o.ForwardURL, forwardClient, prometheus.DefaultRegisterer, o.TenantID, o.Whitelist, o.ElideLabels)
		if err != nil {
			// Never register a handler whose construction failed; refuse to
			// start instead.
			return fmt.Errorf("could not initialize receive handler: %w", err)
		}
		external.Handle("/metrics/v1/receive", server.InstrumentedHandler("receive", http.HandlerFunc(receiver.Receive)))

		externalPathJSON, _ := json.MarshalIndent(Paths{Paths: []string{"/", "/healthz", "/healthz/ready", "/metrics/v1/receive"}}, "", " ")
		external.Get("/", func(w http.ResponseWriter, req *http.Request) {
			w.Header().Add("Content-Type", "application/json")
			if _, err := w.Write(externalPathJSON); err != nil {
				level.Error(o.Logger).Log("msg", "could not write external paths", "err", err)
			}
		})

		s := &http.Server{
			Handler: otelhttp.NewHandler(external, "external", otelhttp.WithTracerProvider(tp)),
			// The server's own error log goes through filteredHTTP2ErrorWriter
			// so lines matching logFilter are demoted to debug level.
			ErrorLog: stdlog.New(
				&filteredHTTP2ErrorWriter{
					out:               os.Stderr,
					toDebugLogFilters: logFilter,
					logger:            o.Logger,
				},
				"",
				0),
		}
		// Run the external server.
		g.Add(func() error {
			if len(o.TLSCertificatePath) > 0 {
				tlsConfig, err := newExternalTLSConfig(o.TLSCertificatePath, o.TLSKeyPath, o.TLSCACertificatePath, !hasClientCertConfig)
				if err != nil {
					return err
				}
				if err := s.Serve(tls.NewListener(externalListener, tlsConfig)); err != nil && !errors.Is(err, http.ErrServerClosed) {
					level.Error(o.Logger).Log("msg", "external HTTPS server exited", "err", err)
					return err
				}
				return nil
			}
			if err := s.Serve(externalListener); err != nil && !errors.Is(err, http.ErrServerClosed) {
				level.Error(o.Logger).Log("msg", "external HTTP server exited", "err", err)
				return err
			}
			return nil
		}, func(error) {
			_ = s.Shutdown(context.TODO())
			externalListener.Close()
			// Close clients in order to check for leaks properly.
			forwardClient.CloseIdleConnections()
			if c, ok := ctx.Value(oauth2.HTTPClient).(*http.Client); ok {
				c.CloseIdleConnections()
			}
		})
	}

	// Kill all when caller requests to.
	gctx, gcancel := context.WithCancel(ctx)
	g.Add(func() error {
		<-gctx.Done()
		return gctx.Err()
	}, func(error) {
		gcancel()
	})

	level.Info(o.Logger).Log("msg", "starting rhelemeter-server", "external", externalListener.Addr().String(), "internal", internalListener.Addr().String())
	return g.Run()
}

// parseKeyValueFlags adds every "key=value" entry of values to dst and returns
// the resulting map. dst is allocated lazily, so a nil dst stays nil when
// values is empty. flagName only names the flag in the error returned for an
// entry that contains no "=".
func parseKeyValueFlags(flagName string, values []string, dst map[string]string) (map[string]string, error) {
	for _, v := range values {
		kv := strings.SplitN(v, "=", 2)
		if len(kv) != 2 {
			return nil, fmt.Errorf("--%s must be of the form key=value: %s", flagName, v)
		}
		if dst == nil {
			dst = make(map[string]string)
		}
		dst[kv[0]] = kv[1]
	}
	return dst, nil
}

// loadClientCertConfig reads the JSON client-info config at path, decodes it
// into an ssl.ClientCertConfig and validates it.
func loadClientCertConfig(path string) (ssl.ClientCertConfig, error) {
	b, err := os.ReadFile(path)
	if err != nil {
		return ssl.ClientCertConfig{}, fmt.Errorf("cannot read client info config file: %w", err)
	}
	var conf ssl.ClientCertConfig
	if err := json.Unmarshal(b, &conf); err != nil {
		return ssl.ClientCertConfig{}, fmt.Errorf("cannot unmarshal client info config file: %w", err)
	}
	if err := conf.Validate(); err != nil {
		return ssl.ClientCertConfig{}, fmt.Errorf("cannot validate client info config file: %w", err)
	}
	return conf, nil
}

// newExternalTLSConfig builds the external server's TLS configuration from the
// serving key pair at certPath/keyPath. A non-empty caPath names a PEM file
// whose certificates become the pool used to verify client certificates.
// requireClientCert makes the handshake require and verify a client
// certificate.
func newExternalTLSConfig(certPath, keyPath, caPath string, requireClientCert bool) (*tls.Config, error) {
	cert, err := tls.LoadX509KeyPair(certPath, keyPath)
	if err != nil {
		return nil, fmt.Errorf("cannot load external TLS key pair: %w", err)
	}
	tlsConfig := &tls.Config{
		Certificates: []tls.Certificate{cert},
	}
	if caPath != "" {
		caCert, err := os.ReadFile(caPath)
		if err != nil {
			return nil, fmt.Errorf("cannot read TLS CA certificate file: %w", err)
		}
		caCertPool := x509.NewCertPool()
		// AppendCertsFromPEM reports false when no certificate could be
		// parsed; an empty pool could not verify any client certificate.
		if !caCertPool.AppendCertsFromPEM(caCert) {
			return nil, fmt.Errorf("no valid PEM certificates found in %s", caPath)
		}
		tlsConfig.ClientCAs = caCertPool
	}
	if requireClientCert {
		tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
	}
	return tlsConfig, nil
}
// logFilter is the list of filters the external server's error log is checked
// against (it is passed as filteredHTTP2ErrorWriter.toDebugLogFilters). Each
// inner slice is one filter: a log line matches it only if the line contains
// every string in that slice, and matching lines are logged at debug level
// instead of being written to the server's error output.
var logFilter = [][]string{
// filter out TCP probes
// see https://github.com/golang/go/issues/26918
{
"http2: server: error reading preface from client",
"read: connection reset by peer",
},
}
// filteredHTTP2ErrorWriter is the io.Writer behind the external http.Server's
// ErrorLog. Log lines matching one of toDebugLogFilters are re-emitted as
// debug-level entries on logger; all other lines are written to out unchanged.
type filteredHTTP2ErrorWriter struct {
// out receives every log line that is not filtered (os.Stderr where Run builds it).
out io.Writer
// toDebugLogFilters is a list of filters.
// All strings within a filter must match for the filter to match.
// If any of the filters matches, the log is written to debug level.
toDebugLogFilters [][]string
// logger receives the filtered lines at debug level.
logger log.Logger
}
// Write implements io.Writer for the http.Server error log. A line that
// contains every string of at least one non-empty filter in
// w.toDebugLogFilters is logged at debug level through w.logger and reported
// as fully written. Any other line is passed through unchanged to w.out.
func (w *filteredHTTP2ErrorWriter) Write(p []byte) (int, error) {
	logContents := string(p)
	for _, filter := range w.toDebugLogFilters {
		// An empty filter would match every line vacuously and silence the
		// whole error log, so it is ignored.
		if len(filter) == 0 {
			continue
		}
		matched := true
		for _, substr := range filter {
			if !strings.Contains(logContents, substr) {
				matched = false
				break
			}
		}
		if matched {
			level.Debug(w.logger).Log("msg", "http server error log has been filtered", "error", logContents)
			return len(p), nil
		}
	}
	return w.out.Write(p)
}