Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
)

type config struct {
Listen string `env:"CT_LISTEN"`
ListenPprof string `yaml:"listen_pprof" env:"CT_LISTEN_PPROF"`
Listen string `env:"CT_LISTEN"`
ListenPprof string `yaml:"listen_pprof" env:"CT_LISTEN_PPROF"`
ListenMetricsAddress string `yaml:"listen_metrics_address" env: "CT_LISTEN_METRICS_ADDRESS"`
MetricsIncludeTenant bool `yaml:"metrics_include_tenant" env: "CT_METRICS_INCLUDE_TENANT"`

Target string `env:"CT_TARGET"`
EnableIPv6 bool `yaml:"enable_ipv6" env:"CT_ENABLE_IPV6"`
Expand Down Expand Up @@ -68,6 +70,10 @@ func configLoad(file string) (*config, error) {
cfg.Listen = "127.0.0.1:8081"
}

if cfg.ListenMetricsAddress == "" {
cfg.ListenMetricsAddress = "0.0.0.0:9090"
}

if cfg.LogLevel == "" {
cfg.LogLevel = "warn"
}
Expand Down
4 changes: 3 additions & 1 deletion config.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
listen: 0.0.0.0:8080
listen_pprof: 0.0.0.0:7008
listen_metrics_address: 0.0.0.0:9090
metrics_include_tenant: true

target: http://127.0.0.1:9091/receive
enable_ipv6: false
Expand All @@ -22,4 +24,4 @@ tenant:
label_remove: true
header: X-Scope-OrgID
default: ""
accept_all: false
accept_all: false
9 changes: 9 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,22 @@ require (

require (
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/klauspost/compress v1.16.5 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.15.1 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
golang.org/x/sys v0.8.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
25 changes: 25 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/blind-oracle/go-common v1.0.7 h1:pDBnhwu7JaJRDNF2PcJEZdscR2rcgPLH2sBGIzlnV/0=
github.com/blind-oracle/go-common v1.0.7/go.mod h1:Sw3z/RG/QEPZ3oabRuwcMjVkVOkeeEhKyRpcimQZoUs=
github.com/caarlos0/env/v8 v8.0.0 h1:POhxHhSpuxrLMIdvTGARuZqR4Jjm8AYmoi/JKlcScs0=
github.com/caarlos0/env/v8 v8.0.0/go.mod h1:7K4wMY9bH0esiXSSHlfHLX5xKGQMnkH5Fk4TDSSSzfo=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
Expand All @@ -30,11 +40,21 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.15.1 h1:8tXpTmJbyH5lydzFPoxSIJ0J46jdh3tylbvM1xCv0LI=
github.com/prometheus/client_golang v1.15.1/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk=
github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4=
github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w=
github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM=
github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc=
github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI=
github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY=
github.com/prometheus/prometheus v0.44.0 h1:sgn8Fdx+uE5tHQn0/622swlk2XnIj6udoZCnbVjHIgc=
github.com/prometheus/prometheus v0.44.0/go.mod h1:aPsmIK3py5XammeTguyqTmuqzX/jeCdyOWWobLHNKQg=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
Expand Down Expand Up @@ -66,6 +86,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand All @@ -86,6 +107,10 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
Expand Down
8 changes: 8 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http"
_ "net/http/pprof"

"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
)

Expand All @@ -34,6 +35,13 @@ func main() {
}()
}

go func() {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's already cfg.ListenPprof -> maybe we can use that instead of another hardcoded HTTP port? Maybe we can rename cfg.ListenPprof to something more general, like ListenAux or whatever. Or add additional config variable ListenMetrics and use that (with same default value of :9090)

http.Handle("/metrics", promhttp.Handler())
if err := http.ListenAndServe(cfg.ListenMetricsAddress, nil); err != nil {
log.Fatalf("Unable to listen on %s: %s", cfg.ListenMetricsAddress, err)
}
}()

lvl, err := log.ParseLevel(cfg.LogLevel)
if err != nil {
log.Fatalf("Unable to parse log level: %s", err)
Expand Down
94 changes: 79 additions & 15 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/base64"
"fmt"
"net"
"strconv"
"sync"
"sync/atomic"
"time"
Expand All @@ -15,14 +16,55 @@ import (
"github.com/google/uuid"
me "github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/prompb"
fh "github.com/valyala/fasthttp"
)

var (
metricTimeseriesBatchesReceived = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "cortex_tenant",
Name: "timeseries_batches_received",
Help: "The total number of batches received.",
})
metricTimeseriesBatchesReceivedBytes = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "cortex_tenant",
Name: "timeseries_batches_received_bytes",
Help: "Size in bytes of timeseries batches received.",
Buckets: []float64{0.5, 1, 10, 25, 100, 250, 500, 1000, 5000, 10000, 30000, 300000, 600000, 1800000, 3600000},
})
metricTimeseriesReceived = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex_tenant",
Name: "timeseries_received",
Help: "The total number of timeseries received.",
}, []string{"tenant"})
metricTimeseriesRequestDurationMilliseconds = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex_tenant",
Name: "timeseries_request_duration_milliseconds",
Help: "HTTP write request duration for tenant-specific timeseries in milliseconds, filtered by response code.",
Buckets: []float64{0.5, 1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000, 30000, 60000, 1800000, 3600000},
},
[]string{"code", "tenant"},
)
metricTimeseriesRequestErrors = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex_tenant",
Name: "timeseries_request_errors",
Help: "The total number of tenant-specific timeseries writes that yielded errors.",
}, []string{"tenant"})
metricTimeseriesRequests = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex_tenant",
Name: "timeseries_requests",
Help: "The total number of tenant-specific timeseries writes.",
}, []string{"tenant"})
)

type result struct {
code int
body []byte
err error
code int
body []byte
duration float64
tenant string
err error
}

type processor struct {
Expand Down Expand Up @@ -119,6 +161,8 @@ func (p *processor) handle(ctx *fh.RequestCtx) {
return
}

metricTimeseriesBatchesReceivedBytes.Observe(float64(ctx.Request.Header.ContentLength()))
metricTimeseriesBatchesReceived.Inc()
wrReqIn, err := p.unmarshal(ctx.Request.Body())
if err != nil {
ctx.Error(err.Error(), fh.StatusBadRequest)
Expand All @@ -132,15 +176,14 @@ func (p *processor) handle(ctx *fh.RequestCtx) {
// If there's metadata - just accept the request and drop it
if len(wrReqIn.Metadata) > 0 {
if p.cfg.Metadata && p.cfg.Tenant.Default != "" {
code, body, err := p.send(clientIP, reqID, p.cfg.Tenant.Default, wrReqIn)
if err != nil {
r := p.send(clientIP, reqID, p.cfg.Tenant.Default, wrReqIn)
if r.err != nil {
ctx.Error(err.Error(), fh.StatusInternalServerError)
p.Errorf("src=%s req_id=%s: unable to proxy metadata: %s", clientIP, reqID, err)
p.Errorf("src=%s req_id=%s: unable to proxy metadata: %s", clientIP, reqID, r.err)
return
}

ctx.SetStatusCode(code)
ctx.SetBody(body)
ctx.SetStatusCode(r.code)
ctx.SetBody(r.body)
}

return
Expand All @@ -156,6 +199,7 @@ func (p *processor) handle(ctx *fh.RequestCtx) {
return
}

metricTenant := ""
var errs *me.Error
results := p.dispatch(clientIP, reqID, m)

Expand All @@ -168,7 +212,14 @@ func (p *processor) handle(ctx *fh.RequestCtx) {
}

for _, r := range results {
if p.cfg.MetricsIncludeTenant {
metricTenant = r.tenant
}

metricTimeseriesRequests.WithLabelValues(metricTenant).Inc()

if r.err != nil {
metricTimeseriesRequestErrors.WithLabelValues(metricTenant).Inc()
errs = me.Append(errs, r.err)
p.Errorf("src=%s %s", clientIP, r.err)
continue
Expand All @@ -183,6 +234,8 @@ func (p *processor) handle(ctx *fh.RequestCtx) {
if r.code > code {
code, body = r.code, r.body
}

metricTimeseriesRequestDurationMilliseconds.WithLabelValues(strconv.Itoa(r.code), metricTenant).Observe(r.duration)
}

if errs.ErrorOrNil() != nil {
Expand All @@ -206,6 +259,12 @@ func (p *processor) createWriteRequests(wrReqIn *prompb.WriteRequest) (map[strin
return nil, err
}

if p.cfg.MetricsIncludeTenant {
metricTimeseriesReceived.WithLabelValues(tenant).Inc()
} else {
metricTimeseriesReceived.WithLabelValues("").Inc()
}

wrReqOut, ok := m[tenant]
if !ok {
wrReqOut = &prompb.WriteRequest{}
Expand Down Expand Up @@ -255,8 +314,7 @@ func (p *processor) dispatch(clientIP net.Addr, reqID uuid.UUID, m map[string]*p
go func(idx int, tenant string, wrReq *prompb.WriteRequest) {
defer wg.Done()

var r result
r.code, r.body, r.err = p.send(clientIP, reqID, tenant, wrReq)
r := p.send(clientIP, reqID, tenant, wrReq)
res[idx] = r
}(i, tenant, wrReq)

Expand Down Expand Up @@ -293,7 +351,10 @@ func (p *processor) processTimeseries(ts *prompb.TimeSeries) (tenant string, err
return
}

func (p *processor) send(clientIP net.Addr, reqID uuid.UUID, tenant string, wr *prompb.WriteRequest) (code int, body []byte, err error) {
func (p *processor) send(clientIP net.Addr, reqID uuid.UUID, tenant string, wr *prompb.WriteRequest) (r result) {
start := time.Now()
r.tenant = tenant

req := fh.AcquireRequest()
resp := fh.AcquireResponse()

Expand All @@ -304,6 +365,7 @@ func (p *processor) send(clientIP net.Addr, reqID uuid.UUID, tenant string, wr *

buf, err := p.marshal(wr)
if err != nil {
r.err = err
return
}

Expand All @@ -318,12 +380,14 @@ func (p *processor) send(clientIP net.Addr, reqID uuid.UUID, tenant string, wr *
req.SetBody(buf)

if err = p.cli.DoTimeout(req, resp, p.cfg.Timeout); err != nil {
r.err = err
return
}

code = resp.Header.StatusCode()
body = make([]byte, len(resp.Body()))
copy(body, resp.Body())
r.code = resp.Header.StatusCode()
r.body = make([]byte, len(resp.Body()))
copy(r.body, resp.Body())
r.duration = time.Since(start).Seconds() / 1000

return
}
Expand Down