Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metric for error codes resulting from forwarding requests #2077

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
36 changes: 23 additions & 13 deletions pkg/distributor/forwarding/forwarding.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"io/ioutil"
"net/http"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -60,9 +61,10 @@ type forwarder struct {
pools pools
client http.Client

requestsTotal *prometheus.CounterVec
requestLatencyHistogram *prometheus.HistogramVec
samplesTotal *prometheus.CounterVec
requestsTotal prometheus.Counter
errorsTotal *prometheus.CounterVec
samplesTotal prometheus.Counter
requestLatencyHistogram prometheus.Histogram
}

// NewForwarder returns a new forwarder, if forwarding is disabled it returns nil.
Expand All @@ -79,22 +81,27 @@ func NewForwarder(reg prometheus.Registerer, cfg Config) Forwarder {
snappy: sync.Pool{New: func() interface{} { return &[]byte{} }},
},

requestsTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
requestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_forward_requests_total",
Help: "The total number of requests the Distributor made to forward samples.",
}, []string{"user"}),
samplesTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
}),
errorsTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_forward_errors_total",
Help: "The total number of errors that the distributor received from forwarding targets when trying to send samples to them.",
}, []string{"status_code"}),
samplesTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_forward_samples_total",
Help: "The total number of samples the Distributor forwarded.",
}, []string{"user"}),
requestLatencyHistogram: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
}),
requestLatencyHistogram: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "distributor_forward_requests_latency_seconds",
Help: "The client-side latency of requests to forward metrics made by the Distributor.",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30},
}, []string{"user"}),
}),
}
}

Expand All @@ -110,9 +117,10 @@ func (r *forwarder) NewRequest(ctx context.Context, tenant string, rules validat
timeout: r.cfg.RequestTimeout,
propagateErrors: r.cfg.PropagateErrors,

requests: r.requestsTotal.WithLabelValues(tenant),
samples: r.samplesTotal.WithLabelValues(tenant),
latency: r.requestLatencyHistogram.WithLabelValues(tenant),
requests: r.requestsTotal,
errors: r.errorsTotal,
samples: r.samplesTotal,
latency: r.requestLatencyHistogram,
}
}

Expand All @@ -132,8 +140,9 @@ type request struct {
propagateErrors bool

requests prometheus.Counter
errors *prometheus.CounterVec
samples prometheus.Counter
latency prometheus.Observer
latency prometheus.Histogram
}

func (r *request) Add(sample mimirpb.PreallocTimeseries) bool {
Expand Down Expand Up @@ -278,6 +287,7 @@ func (r *request) sendToEndpoint(ctx context.Context, endpoint string, ts []mimi
if scanner.Scan() {
line = scanner.Text()
}
r.errors.WithLabelValues(strconv.Itoa(httpResp.StatusCode)).Inc()
err := errors.Errorf("server returned HTTP status %s: %s", httpResp.Status, line)
if httpResp.StatusCode/100 == 5 || httpResp.StatusCode == http.StatusTooManyRequests {
return recoverableError{err}
Expand Down
30 changes: 27 additions & 3 deletions pkg/distributor/forwarding/forwarding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/golang/snappy"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"

Expand Down Expand Up @@ -88,10 +89,10 @@ func TestForwardingSamplesSuccessfully(t *testing.T) {
expectedMetrics := `
# HELP cortex_distributor_forward_requests_total The total number of requests the Distributor made to forward samples.
# TYPE cortex_distributor_forward_requests_total counter
cortex_distributor_forward_requests_total{user="tenant"} 2
cortex_distributor_forward_requests_total{} 2
# HELP cortex_distributor_forward_samples_total The total number of samples the Distributor forwarded.
# TYPE cortex_distributor_forward_samples_total counter
cortex_distributor_forward_samples_total{user="tenant"} 4
cortex_distributor_forward_samples_total{} 4
`

require.NoError(t, testutil.GatherAndCompare(
Expand Down Expand Up @@ -168,12 +169,19 @@ func TestForwardingSamplesWithDifferentErrorsWithPropagation(t *testing.T) {
const tenant = "tenant"
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
forwarder := NewForwarder(nil, tc.config)
reg := prometheus.NewRegistry()
forwarder := NewForwarder(reg, tc.config)
urls := make([]string, len(tc.remoteStatusCodes))
closers := make([]func(), len(tc.remoteStatusCodes))
expectedErrorsByStatusCode := make(map[int]int)
for i, code := range tc.remoteStatusCodes {
urls[i], _, _, closers[i] = newTestServer(t, code, false)
defer closers[i]()

if code/100 == 2 {
continue
}
expectedErrorsByStatusCode[code]++
}

rules := make(validation.ForwardingRules)
Expand Down Expand Up @@ -202,6 +210,22 @@ func TestForwardingSamplesWithDifferentErrorsWithPropagation(t *testing.T) {
require.True(t, ok)
require.Equal(t, tc.expectedError, errorType(resp.Code))
}

var expectedMetrics strings.Builder
if len(expectedErrorsByStatusCode) > 0 {
expectedMetrics.WriteString(`
# TYPE cortex_distributor_forward_errors_total counter
# HELP cortex_distributor_forward_errors_total The total number of errors that the distributor received from forwarding targets when trying to send samples to them.`)
}
for statusCode, count := range expectedErrorsByStatusCode {
expectedMetrics.WriteString(fmt.Sprintf(`
cortex_distributor_forward_errors_total{status_code="%d"} %d
`, statusCode, count))
}

assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics.String()),
"cortex_distributor_forward_errors_total",
))
})
}
}
Expand Down