Skip to content

Commit

Permalink
Add metric for error codes resulting from forwarding requests (#2077)
Browse files Browse the repository at this point in the history
* add metric for error codes resulting from forwarding requests

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* add unit test for forwarding errors

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* linter

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* PR feedback

Co-authored-by: Ursula Kallio <ursula.kallio@grafana.com>

* key metric by target instead of user

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* improve help text

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* update tests to expect updated help string

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* updated to key by target

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* remove target and tenant labels

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

Co-authored-by: Ursula Kallio <ursula.kallio@grafana.com>
  • Loading branch information
replay and osg-grafana committed Jun 15, 2022
1 parent acc1d15 commit b794cf2
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 16 deletions.
36 changes: 23 additions & 13 deletions pkg/distributor/forwarding/forwarding.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"io/ioutil"
"net/http"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -60,9 +61,10 @@ type forwarder struct {
pools pools
client http.Client

requestsTotal *prometheus.CounterVec
requestLatencyHistogram *prometheus.HistogramVec
samplesTotal *prometheus.CounterVec
requestsTotal prometheus.Counter
errorsTotal *prometheus.CounterVec
samplesTotal prometheus.Counter
requestLatencyHistogram prometheus.Histogram
}

// NewForwarder returns a new forwarder, if forwarding is disabled it returns nil.
Expand All @@ -79,22 +81,27 @@ func NewForwarder(reg prometheus.Registerer, cfg Config) Forwarder {
snappy: sync.Pool{New: func() interface{} { return &[]byte{} }},
},

requestsTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
requestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_forward_requests_total",
Help: "The total number of requests the Distributor made to forward samples.",
}, []string{"user"}),
samplesTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
}),
errorsTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_forward_errors_total",
Help: "The total number of errors that the distributor received from forwarding targets when trying to send samples to them.",
}, []string{"status_code"}),
samplesTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_forward_samples_total",
Help: "The total number of samples the Distributor forwarded.",
}, []string{"user"}),
requestLatencyHistogram: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
}),
requestLatencyHistogram: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "distributor_forward_requests_latency_seconds",
Help: "The client-side latency of requests to forward metrics made by the Distributor.",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 20, 30},
}, []string{"user"}),
}),
}
}

Expand All @@ -110,9 +117,10 @@ func (r *forwarder) NewRequest(ctx context.Context, tenant string, rules validat
timeout: r.cfg.RequestTimeout,
propagateErrors: r.cfg.PropagateErrors,

requests: r.requestsTotal.WithLabelValues(tenant),
samples: r.samplesTotal.WithLabelValues(tenant),
latency: r.requestLatencyHistogram.WithLabelValues(tenant),
requests: r.requestsTotal,
errors: r.errorsTotal,
samples: r.samplesTotal,
latency: r.requestLatencyHistogram,
}
}

Expand All @@ -132,8 +140,9 @@ type request struct {
propagateErrors bool

requests prometheus.Counter
errors *prometheus.CounterVec
samples prometheus.Counter
latency prometheus.Observer
latency prometheus.Histogram
}

func (r *request) Add(sample mimirpb.PreallocTimeseries) bool {
Expand Down Expand Up @@ -278,6 +287,7 @@ func (r *request) sendToEndpoint(ctx context.Context, endpoint string, ts []mimi
if scanner.Scan() {
line = scanner.Text()
}
r.errors.WithLabelValues(strconv.Itoa(httpResp.StatusCode)).Inc()
err := errors.Errorf("server returned HTTP status %s: %s", httpResp.Status, line)
if httpResp.StatusCode/100 == 5 || httpResp.StatusCode == http.StatusTooManyRequests {
return recoverableError{err}
Expand Down
30 changes: 27 additions & 3 deletions pkg/distributor/forwarding/forwarding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/golang/snappy"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"

Expand Down Expand Up @@ -88,10 +89,10 @@ func TestForwardingSamplesSuccessfully(t *testing.T) {
expectedMetrics := `
# HELP cortex_distributor_forward_requests_total The total number of requests the Distributor made to forward samples.
# TYPE cortex_distributor_forward_requests_total counter
cortex_distributor_forward_requests_total{user="tenant"} 2
cortex_distributor_forward_requests_total{} 2
# HELP cortex_distributor_forward_samples_total The total number of samples the Distributor forwarded.
# TYPE cortex_distributor_forward_samples_total counter
cortex_distributor_forward_samples_total{user="tenant"} 4
cortex_distributor_forward_samples_total{} 4
`

require.NoError(t, testutil.GatherAndCompare(
Expand Down Expand Up @@ -168,12 +169,19 @@ func TestForwardingSamplesWithDifferentErrorsWithPropagation(t *testing.T) {
const tenant = "tenant"
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
forwarder := NewForwarder(nil, tc.config)
reg := prometheus.NewRegistry()
forwarder := NewForwarder(reg, tc.config)
urls := make([]string, len(tc.remoteStatusCodes))
closers := make([]func(), len(tc.remoteStatusCodes))
expectedErrorsByStatusCode := make(map[int]int)
for i, code := range tc.remoteStatusCodes {
urls[i], _, _, closers[i] = newTestServer(t, code, false)
defer closers[i]()

if code/100 == 2 {
continue
}
expectedErrorsByStatusCode[code]++
}

rules := make(validation.ForwardingRules)
Expand Down Expand Up @@ -202,6 +210,22 @@ func TestForwardingSamplesWithDifferentErrorsWithPropagation(t *testing.T) {
require.True(t, ok)
require.Equal(t, tc.expectedError, errorType(resp.Code))
}

var expectedMetrics strings.Builder
if len(expectedErrorsByStatusCode) > 0 {
expectedMetrics.WriteString(`
# TYPE cortex_distributor_forward_errors_total counter
# HELP cortex_distributor_forward_errors_total The total number of errors that the distributor received from forwarding targets when trying to send samples to them.`)
}
for statusCode, count := range expectedErrorsByStatusCode {
expectedMetrics.WriteString(fmt.Sprintf(`
cortex_distributor_forward_errors_total{status_code="%d"} %d
`, statusCode, count))
}

assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics.String()),
"cortex_distributor_forward_errors_total",
))
})
}
}
Expand Down

0 comments on commit b794cf2

Please sign in to comment.