From 981fa1eb79fe41c78dc4b9e6e17e2d87339364e5 Mon Sep 17 00:00:00 2001 From: AlexDHoffer Date: Tue, 13 Jun 2023 10:04:49 -0700 Subject: [PATCH 1/6] Instrument cortex tenant --- go.mod | 9 ++++++++ go.sum | 25 ++++++++++++++++++++ main.go | 6 +++++ processor.go | 64 +++++++++++++++++++++++++++++++++++++++++++++++----- 4 files changed, 98 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 95bdc82..87c22d4 100644 --- a/go.mod +++ b/go.mod @@ -19,13 +19,22 @@ require ( require ( github.com/andybalholm/brotli v1.0.5 // indirect + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/golang/protobuf v1.5.3 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/klauspost/compress v1.16.5 // indirect github.com/kr/pretty v0.3.1 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_golang v1.15.1 // indirect + github.com/prometheus/client_model v0.3.0 // indirect + github.com/prometheus/common v0.42.0 // indirect + github.com/prometheus/procfs v0.9.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect golang.org/x/sys v0.8.0 // indirect + google.golang.org/protobuf v1.30.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 818a79f..e2f979e 100644 --- a/go.sum +++ b/go.sum @@ -1,17 +1,27 @@ github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/blind-oracle/go-common v1.0.7 h1:pDBnhwu7JaJRDNF2PcJEZdscR2rcgPLH2sBGIzlnV/0= github.com/blind-oracle/go-common v1.0.7/go.mod h1:Sw3z/RG/QEPZ3oabRuwcMjVkVOkeeEhKyRpcimQZoUs= github.com/caarlos0/env/v8 v8.0.0 h1:POhxHhSpuxrLMIdvTGARuZqR4Jjm8AYmoi/JKlcScs0= github.com/caarlos0/env/v8 v8.0.0/go.mod h1:7K4wMY9bH0esiXSSHlfHLX5xKGQMnkH5Fk4TDSSSzfo= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -30,11 +40,21 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= +github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.15.1 h1:8tXpTmJbyH5lydzFPoxSIJ0J46jdh3tylbvM1xCv0LI= +github.com/prometheus/client_golang v1.15.1/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk= +github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4= +github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w= +github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM= +github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= +github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI= +github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= github.com/prometheus/prometheus v0.44.0 h1:sgn8Fdx+uE5tHQn0/622swlk2XnIj6udoZCnbVjHIgc= github.com/prometheus/prometheus v0.44.0/go.mod h1:aPsmIK3py5XammeTguyqTmuqzX/jeCdyOWWobLHNKQg= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= @@ -66,6 +86,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -86,6 +107,10 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= +google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/main.go b/main.go index 5765c47..7f9036f 100644 --- a/main.go +++ b/main.go @@ -10,6 +10,7 @@ import ( "net/http" _ "net/http/pprof" + "github.com/prometheus/client_golang/prometheus/promhttp" log "github.com/sirupsen/logrus" ) @@ -34,6 +35,11 @@ func main() { }() } + go func() { + http.Handle("/metrics", promhttp.Handler()) + http.ListenAndServe(":9090", nil) + }() + lvl, err := log.ParseLevel(cfg.LogLevel) if err != nil { log.Fatalf("Unable to parse log level: %s", err) diff --git a/processor.go b/processor.go index 9cc181c..e65b73b 100644 --- a/processor.go +++ b/processor.go @@ -5,6 +5,7 @@ import ( "encoding/base64" "fmt" "net" + "strconv" "sync" "sync/atomic" "time" @@ -15,14 +16,54 @@ import ( "github.com/google/uuid" me "github.com/hashicorp/go-multierror" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/prompb" fh "github.com/valyala/fasthttp" ) +var ( + metricTimeseriesBatchesReceived = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "cortex_tenant", + Name: "timeseries_batches_received", + Help: "The total number of batches received.", + }) + metricTimeseriesBatchesReceivedBytes = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: "cortex_tenant", + Name: "timeseries_batches_received_bytes", + Help: "Size in bytes of timeseries batches received.", + Buckets: []float64{0.5, 1, 10, 25, 100, 250, 500, 1000, 5000, 10000, 30000, 300000, 600000, 1800000, 3600000}, + }) + metricTimeseriesReceived = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "cortex_tenant", + Name: "timeseries_received", + Help: "The total number of timeseries received.", + }) + metricTimeseriesRequestDurationMilliseconds = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "cortex_tenant", + Name: "timeseries_request_duration_milliseconds", + Help: "HTTP write request duration for tenant-specific timeseries in milliseconds, filtered by response code.", + Buckets: []float64{0.5, 1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000, 30000, 60000, 1800000, 3600000}, + }, + []string{"code"}, + ) + metricTimeseriesRequestErrors = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "cortex_tenant", + Name: "timeseries_request_errors", + Help: "The total number of tenant-specific timeseries writes that yielded errors.", + }) + metricTimeseriesRequests = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "cortex_tenant", + Name: "timeseries_requests", + Help: "The total number of tenant-specific timeseries writes.", + }) +) + type result struct { - code int - body []byte - err error + code int + body []byte + duration float64 + err error } type processor struct { @@ -119,6 +160,8 @@ func (p *processor) handle(ctx *fh.RequestCtx) { return } + metricTimeseriesBatchesReceivedBytes.Observe(float64(ctx.Request.Header.ContentLength())) + metricTimeseriesBatchesReceived.Inc() wrReqIn, err := p.unmarshal(ctx.Request.Body()) if err != nil { ctx.Error(err.Error(), fh.StatusBadRequest) @@ -132,7 +175,7 @@ func (p *processor) handle(ctx *fh.RequestCtx) { // If there's metadata - just accept the request and drop it if len(wrReqIn.Metadata) > 0 { if p.cfg.Metadata && p.cfg.Tenant.Default != "" { - code, body, err := p.send(clientIP, reqID, p.cfg.Tenant.Default, wrReqIn) + code, body, _, err := p.send(clientIP, reqID, p.cfg.Tenant.Default, wrReqIn) if err != nil { ctx.Error(err.Error(), fh.StatusInternalServerError) p.Errorf("src=%s req_id=%s: unable to proxy metadata: %s", clientIP, reqID, err) @@ -168,7 +211,10 @@ func (p *processor) handle(ctx *fh.RequestCtx) { } for _, r := range results { + metricTimeseriesRequests.Inc() + if r.err != nil { + metricTimeseriesRequestErrors.Inc() errs = me.Append(errs, r.err) p.Errorf("src=%s %s", clientIP, r.err) continue @@ -183,6 +229,8 @@ func (p *processor) handle(ctx *fh.RequestCtx) { if r.code > code { code, body = r.code, r.body } + + metricTimeseriesRequestDurationMilliseconds.WithLabelValues(strconv.Itoa(r.code)).Observe(r.duration) } if errs.ErrorOrNil() != nil { @@ -201,6 +249,7 @@ func (p *processor) createWriteRequests(wrReqIn *prompb.WriteRequest) (map[strin m := map[string]*prompb.WriteRequest{} for _, ts := range wrReqIn.Timeseries { + metricTimeseriesReceived.Inc() tenant, err := p.processTimeseries(&ts) if err != nil { return nil, err @@ -256,7 +305,7 @@ func (p *processor) dispatch(clientIP net.Addr, reqID uuid.UUID, m map[string]*p defer wg.Done() var r result - r.code, r.body, r.err = p.send(clientIP, reqID, tenant, wrReq) + r.code, r.body, r.duration, r.err = p.send(clientIP, reqID, tenant, wrReq) res[idx] = r }(i, tenant, wrReq) @@ -293,7 +342,9 @@ func (p *processor) processTimeseries(ts *prompb.TimeSeries) (tenant string, err return } -func (p *processor) send(clientIP net.Addr, reqID uuid.UUID, tenant string, wr *prompb.WriteRequest) (code int, body []byte, err error) { +func (p *processor) send(clientIP net.Addr, reqID uuid.UUID, tenant string, wr *prompb.WriteRequest) (code int, body []byte, duration float64, err error) { + start := time.Now() + req := fh.AcquireRequest() resp := fh.AcquireResponse() @@ -321,6 +372,7 @@ func (p *processor) send(clientIP net.Addr, reqID uuid.UUID, tenant string, wr * return } + duration = float64(time.Since(start).Milliseconds()) code = resp.Header.StatusCode() body = make([]byte, len(resp.Body())) copy(body, resp.Body()) From 25a5d8afc8deab493bccee571b3c62ec56cc3780 Mon Sep 17 00:00:00 2001 From: AlexDHoffer Date: Wed, 14 Jun 2023 09:37:44 -0700 Subject: [PATCH 2/6] Add configurable metrics endpoint --- config.go | 1 + config.yml | 1 + main.go | 12 ++++++++---- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/config.go b/config.go index d67eec8..c60df2c 100644 --- a/config.go +++ b/config.go @@ -14,6 +14,7 @@ import ( type config struct { Listen string `env:"CT_LISTEN"` ListenPprof string `yaml:"listen_pprof" env:"CT_LISTEN_PPROF"` + ListenMetrics string `yaml:"listen_metrics" env:"CT_LISTEN_METRICS"` Target string `env:"CT_TARGET"` EnableIPv6 bool `yaml:"enable_ipv6" env:"CT_ENABLE_IPV6"` diff --git a/config.yml b/config.yml index a0998ab..dab4c49 100644 --- a/config.yml +++ b/config.yml @@ -1,5 +1,6 @@ listen: 0.0.0.0:8080 listen_pprof: 0.0.0.0:7008 +listen_metrics: 0.0.0.0:9090 target: http://127.0.0.1:9091/receive enable_ipv6: false diff --git a/main.go b/main.go index 7f9036f..1d40534 100644 --- a/main.go +++ b/main.go @@ -35,10 +35,14 @@ func main() { }() } - go func() { - http.Handle("/metrics", promhttp.Handler()) - http.ListenAndServe(":9090", nil) - }() + if cfg.ListenMetrics != "" { + go func() { + http.Handle("/metrics", promhttp.Handler()) + if err := http.ListenAndServe(cfg.ListenMetrics, nil); err != nil { + log.Fatalf("Unable to listen on %s: %s", cfg.ListenMetrics, err) + } + }() + } lvl, err := log.ParseLevel(cfg.LogLevel) if err != nil { From deaea29d26e180b0b417ea43ca2387d7695963c7 Mon Sep 17 00:00:00 2001 From: AlexDHoffer Date: Wed, 14 Jun 2023 10:46:06 -0700 Subject: [PATCH 3/6] Return result rather than n-tuple --- processor.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/processor.go b/processor.go index e65b73b..f47d309 100644 --- a/processor.go +++ b/processor.go @@ -175,15 +175,14 @@ func (p *processor) handle(ctx *fh.RequestCtx) { // If there's metadata - just accept the request and drop it if len(wrReqIn.Metadata) > 0 { if p.cfg.Metadata && p.cfg.Tenant.Default != "" { - code, body, _, err := p.send(clientIP, reqID, p.cfg.Tenant.Default, wrReqIn) - if err != nil { + r := p.send(clientIP, reqID, p.cfg.Tenant.Default, wrReqIn) + if r.err != nil { ctx.Error(err.Error(), fh.StatusInternalServerError) - p.Errorf("src=%s req_id=%s: unable to proxy metadata: %s", clientIP, reqID, err) + p.Errorf("src=%s req_id=%s: unable to proxy metadata: %s", clientIP, reqID, r.err) return } - - ctx.SetStatusCode(code) - ctx.SetBody(body) + ctx.SetStatusCode(r.code) + ctx.SetBody(r.body) } return @@ -304,8 +303,7 @@ func (p *processor) dispatch(clientIP net.Addr, reqID uuid.UUID, m map[string]*p go func(idx int, tenant string, wrReq *prompb.WriteRequest) { defer wg.Done() - var r result - r.code, r.body, r.duration, r.err = p.send(clientIP, reqID, tenant, wrReq) + r := p.send(clientIP, reqID, tenant, wrReq) res[idx] = r }(i, tenant, wrReq) @@ -342,7 +340,7 @@ func (p *processor) processTimeseries(ts *prompb.TimeSeries) (tenant string, err return } -func (p *processor) send(clientIP net.Addr, reqID uuid.UUID, tenant string, wr *prompb.WriteRequest) (code int, body []byte, duration float64, err error) { +func (p *processor) send(clientIP net.Addr, reqID uuid.UUID, tenant string, wr *prompb.WriteRequest) (r result) { start := time.Now() req := fh.AcquireRequest() @@ -355,6 +353,7 @@ func (p *processor) send(clientIP net.Addr, reqID uuid.UUID, tenant string, wr * buf, err := p.marshal(wr) if err != nil { + r.err = err return } @@ -369,13 +368,14 @@ func (p *processor) send(clientIP net.Addr, reqID uuid.UUID, tenant string, wr * req.SetBody(buf) if err = p.cli.DoTimeout(req, resp, p.cfg.Timeout); err != nil { + r.err = err return } - duration = float64(time.Since(start).Milliseconds()) - code = resp.Header.StatusCode() - body = make([]byte, len(resp.Body())) - copy(body, resp.Body()) + r.code = resp.Header.StatusCode() + r.body = make([]byte, len(resp.Body())) + copy(r.body, resp.Body()) + r.duration = float64(time.Since(start).Milliseconds()) return } From b40b0c649a335e9d862debae8041e9164c838190 Mon Sep 17 00:00:00 2001 From: AlexDHoffer Date: Wed, 14 Jun 2023 10:50:40 -0700 Subject: [PATCH 4/6] Cleaner millisecond calculation --- processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor.go b/processor.go index f47d309..49d98d5 100644 --- a/processor.go +++ b/processor.go @@ -375,7 +375,7 @@ func (p *processor) send(clientIP net.Addr, reqID uuid.UUID, tenant string, wr * r.code = resp.Header.StatusCode() r.body = make([]byte, len(resp.Body())) copy(r.body, resp.Body()) - r.duration = float64(time.Since(start).Milliseconds()) + r.duration = time.Since(start).Seconds() / 1000 return } From 6f0c0c6afc55cd257ca975652b67ea275ea36379 Mon Sep 17 00:00:00 2001 From: AlexDHoffer Date: Wed, 14 Jun 2023 14:20:30 -0700 Subject: [PATCH 5/6] Add support for multitenancy --- config.go | 11 ++++++++--- config.yml | 3 ++- main.go | 14 ++++++-------- processor.go | 50 +++++++++++++++++++++++++++++++------------------- 4 files changed, 47 insertions(+), 31 deletions(-) diff --git a/config.go b/config.go index c60df2c..066a268 100644 --- a/config.go +++ b/config.go @@ -12,9 +12,10 @@ import ( ) type config struct { - Listen string `env:"CT_LISTEN"` - ListenPprof string `yaml:"listen_pprof" env:"CT_LISTEN_PPROF"` - ListenMetrics string `yaml:"listen_metrics" env:"CT_LISTEN_METRICS"` + Listen string `env:"CT_LISTEN"` + ListenPprof string `yaml:"listen_pprof" env:"CT_LISTEN_PPROF"` + ListenMetricsAddress string `yaml:"listen_metrics_address" env: "CT_LISTEN_METRICS_ADDRESS"` + ListenMetricsIncludeTenant bool `yaml:"listen_metrics_include_tenant" env: "CT_LISTEN_METRICS_INCLUDE_TENANT"` Target string `env:"CT_TARGET"` EnableIPv6 bool `yaml:"enable_ipv6" env:"CT_ENABLE_IPV6"` @@ -69,6 +70,10 @@ func configLoad(file string) (*config, error) { cfg.Listen = "127.0.0.1:8081" } + if cfg.ListenMetricsAddress == "" { + cfg.ListenMetricsAddress = "0.0.0.0:9090" + } + if cfg.LogLevel == "" { cfg.LogLevel = "warn" } diff --git a/config.yml b/config.yml index dab4c49..607a9ea 100644 --- a/config.yml +++ b/config.yml @@ -1,6 +1,7 @@ listen: 0.0.0.0:8080 listen_pprof: 0.0.0.0:7008 -listen_metrics: 0.0.0.0:9090 +listen_metrics_address: 0.0.0.0:9090 +listen_metrics_include_tenant: true target: http://127.0.0.1:9091/receive enable_ipv6: false diff --git a/main.go b/main.go index 1d40534..a8d9f74 100644 --- a/main.go +++ b/main.go @@ -35,14 +35,12 @@ func main() { }() } - if cfg.ListenMetrics != "" { - go func() { - http.Handle("/metrics", promhttp.Handler()) - if err := http.ListenAndServe(cfg.ListenMetrics, nil); err != nil { - log.Fatalf("Unable to listen on %s: %s", cfg.ListenMetrics, err) - } - }() - } + go func() { + http.Handle("/metrics", promhttp.Handler()) + if err := http.ListenAndServe(cfg.ListenMetricsAddress, nil); err != nil { + log.Fatalf("Unable to listen on %s: %s", cfg.ListenMetricsAddress, err) + } + }() lvl, err := log.ParseLevel(cfg.LogLevel) if err != nil { diff --git a/processor.go b/processor.go index 49d98d5..1d2178f 100644 --- a/processor.go +++ b/processor.go @@ -34,35 +34,36 @@ var ( Help: "Size in bytes of timeseries batches received.", Buckets: []float64{0.5, 1, 10, 25, 100, 250, 500, 1000, 5000, 10000, 30000, 300000, 600000, 1800000, 3600000}, }) - metricTimeseriesReceived = promauto.NewCounter(prometheus.CounterOpts{ + metricTimeseriesReceived = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "cortex_tenant", Name: "timeseries_received", Help: "The total number of timeseries received.", - }) + }, []string{"tenant"}) metricTimeseriesRequestDurationMilliseconds = promauto.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "cortex_tenant", Name: "timeseries_request_duration_milliseconds", Help: "HTTP write request duration for tenant-specific timeseries in milliseconds, filtered by response code.", Buckets: []float64{0.5, 1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000, 30000, 60000, 1800000, 3600000}, }, - []string{"code"}, + []string{"code", "tenant"}, ) - metricTimeseriesRequestErrors = promauto.NewCounter(prometheus.CounterOpts{ + metricTimeseriesRequestErrors = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "cortex_tenant", Name: "timeseries_request_errors", Help: "The total number of tenant-specific timeseries writes that yielded errors.", - }) - metricTimeseriesRequests = promauto.NewCounter(prometheus.CounterOpts{ + }, []string{"tenant"}) + metricTimeseriesRequests = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "cortex_tenant", Name: "timeseries_requests", Help: "The total number of tenant-specific timeseries writes.", - }) + }, []string{"tenant"}) ) type result struct { code int body []byte duration float64 + tenant string err error } @@ -175,8 +176,8 @@ func (p *processor) handle(ctx *fh.RequestCtx) { // If there's metadata - just accept the request and drop it if len(wrReqIn.Metadata) > 0 { if p.cfg.Metadata && p.cfg.Tenant.Default != "" { - r := p.send(clientIP, reqID, p.cfg.Tenant.Default, wrReqIn) - if r.err != nil { + r := p.send(clientIP, reqID, p.cfg.Tenant.Default, wrReqIn) + if r.err != nil { ctx.Error(err.Error(), fh.StatusInternalServerError) p.Errorf("src=%s req_id=%s: unable to proxy metadata: %s", clientIP, reqID, r.err) return @@ -198,6 +199,7 @@ func (p *processor) handle(ctx *fh.RequestCtx) { return } + metricTenant := "" var errs *me.Error results := p.dispatch(clientIP, reqID, m) @@ -210,10 +212,14 @@ func (p *processor) handle(ctx *fh.RequestCtx) { } for _, r := range results { - metricTimeseriesRequests.Inc() + if p.cfg.ListenMetricsIncludeTenant { + metricTenant = r.tenant + } + + metricTimeseriesRequests.WithLabelValues(metricTenant).Inc() if r.err != nil { - metricTimeseriesRequestErrors.Inc() + metricTimeseriesRequestErrors.WithLabelValues(metricTenant).Inc() errs = me.Append(errs, r.err) p.Errorf("src=%s %s", clientIP, r.err) continue @@ -229,7 +235,7 @@ func (p *processor) handle(ctx *fh.RequestCtx) { code, body = r.code, r.body } - metricTimeseriesRequestDurationMilliseconds.WithLabelValues(strconv.Itoa(r.code)).Observe(r.duration) + metricTimeseriesRequestDurationMilliseconds.WithLabelValues(strconv.Itoa(r.code), metricTenant).Observe(r.duration) } if errs.ErrorOrNil() != nil { @@ -248,12 +254,17 @@ func (p *processor) createWriteRequests(wrReqIn *prompb.WriteRequest) (map[strin m := map[string]*prompb.WriteRequest{} for _, ts := range wrReqIn.Timeseries { - metricTimeseriesReceived.Inc() tenant, err := p.processTimeseries(&ts) if err != nil { return nil, err } + if p.cfg.ListenMetricsIncludeTenant { + metricTimeseriesReceived.WithLabelValues(tenant).Inc() + } else { + metricTimeseriesReceived.WithLabelValues("").Inc() + } + wrReqOut, ok := m[tenant] if !ok { wrReqOut = &prompb.WriteRequest{} @@ -342,6 +353,7 @@ func (p *processor) processTimeseries(ts *prompb.TimeSeries) (tenant string, err func (p *processor) send(clientIP net.Addr, reqID uuid.UUID, tenant string, wr *prompb.WriteRequest) (r result) { start := time.Now() + r.tenant = tenant req := fh.AcquireRequest() resp := fh.AcquireResponse() @@ -353,7 +365,7 @@ func (p *processor) send(clientIP net.Addr, reqID uuid.UUID, tenant string, wr * buf, err := p.marshal(wr) if err != nil { - r.err = err + r.err = err return } @@ -368,14 +380,14 @@ func (p *processor) send(clientIP net.Addr, reqID uuid.UUID, tenant string, wr * req.SetBody(buf) if err = p.cli.DoTimeout(req, resp, p.cfg.Timeout); err != nil { - r.err = err + r.err = err return } - r.code = resp.Header.StatusCode() - r.body = make([]byte, len(resp.Body())) - copy(r.body, resp.Body()) - r.duration = time.Since(start).Seconds() / 1000 + r.code = resp.Header.StatusCode() + r.body = make([]byte, len(resp.Body())) + copy(r.body, resp.Body()) + r.duration = time.Since(start).Seconds() / 1000 return } From 955312f11a3f83be58567851e0dac40540cd5d6d Mon Sep 17 00:00:00 2001 From: AlexDHoffer Date: Thu, 15 Jun 2023 08:25:10 -0700 Subject: [PATCH 6/6] Change var name --- config.go | 8 ++++---- config.yml | 4 ++-- processor.go | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/config.go b/config.go index 066a268..acb0cac 100644 --- a/config.go +++ b/config.go @@ -12,10 +12,10 @@ import ( ) type config struct { - Listen string `env:"CT_LISTEN"` - ListenPprof string `yaml:"listen_pprof" env:"CT_LISTEN_PPROF"` - ListenMetricsAddress string `yaml:"listen_metrics_address" env: "CT_LISTEN_METRICS_ADDRESS"` - ListenMetricsIncludeTenant bool `yaml:"listen_metrics_include_tenant" env: "CT_LISTEN_METRICS_INCLUDE_TENANT"` + Listen string `env:"CT_LISTEN"` + ListenPprof string `yaml:"listen_pprof" env:"CT_LISTEN_PPROF"` + ListenMetricsAddress string `yaml:"listen_metrics_address" env: "CT_LISTEN_METRICS_ADDRESS"` + MetricsIncludeTenant bool `yaml:"metrics_include_tenant" env: "CT_METRICS_INCLUDE_TENANT"` Target string `env:"CT_TARGET"` EnableIPv6 bool `yaml:"enable_ipv6" env:"CT_ENABLE_IPV6"` diff --git a/config.yml b/config.yml index 607a9ea..7ceba34 100644 --- a/config.yml +++ b/config.yml @@ -1,7 +1,7 @@ listen: 0.0.0.0:8080 listen_pprof: 0.0.0.0:7008 listen_metrics_address: 0.0.0.0:9090 -listen_metrics_include_tenant: true +metrics_include_tenant: true target: http://127.0.0.1:9091/receive enable_ipv6: false @@ -24,4 +24,4 @@ tenant: label_remove: true header: X-Scope-OrgID default: "" - accept_all: false + accept_all: false \ No newline at end of file diff --git a/processor.go b/processor.go index 1d2178f..d18b1c2 100644 --- a/processor.go +++ b/processor.go @@ -212,7 +212,7 @@ func (p *processor) handle(ctx *fh.RequestCtx) { } for _, r := range results { - if p.cfg.ListenMetricsIncludeTenant { + if p.cfg.MetricsIncludeTenant { metricTenant = r.tenant } @@ -259,7 +259,7 @@ func (p *processor) createWriteRequests(wrReqIn *prompb.WriteRequest) (map[strin return nil, err } - if p.cfg.ListenMetricsIncludeTenant { + if p.cfg.MetricsIncludeTenant { metricTimeseriesReceived.WithLabelValues(tenant).Inc() } else { metricTimeseriesReceived.WithLabelValues("").Inc()