Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
name = "github.com/prometheus/alertmanager"
revision = "0ecc59076ca6b4cbb63252fa7720a3d89d1c81d3"

[[dependencies]]
name = "github.com/weaveworks/common"
branch = "master"

[[overrides]]
name = "github.com/Azure/azure-sdk-for-go"
revision = "bd73d950fa4440dae889bd9917bff7cef539f86e"
Expand Down
36 changes: 11 additions & 25 deletions pkg/distributor/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ import (
"fmt"
"net/http"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"

"github.com/prometheus/common/log"
"github.com/prometheus/prometheus/promql"

"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/cortex/pkg/ingester/client"
"github.com/weaveworks/cortex/pkg/util"
)
Expand All @@ -25,28 +23,6 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
return
}

if _, err := d.Push(r.Context(), &req); err != nil {
if grpc.Code(err) == codes.ResourceExhausted {
switch grpc.ErrorDesc(err) {
case util.ErrUserSeriesLimitExceeded.Error():
err = util.ErrUserSeriesLimitExceeded
case util.ErrMetricSeriesLimitExceeded.Error():
err = util.ErrMetricSeriesLimitExceeded
}
}

var code int
switch err {
case errIngestionRateLimitExceeded, util.ErrUserSeriesLimitExceeded, util.ErrMetricSeriesLimitExceeded:
code = http.StatusTooManyRequests
default:
code = http.StatusInternalServerError
}
http.Error(w, err.Error(), code)
log.Errorf("append err: %v", err)
return
}

if d.cfg.EnableBilling {
var samples int64
for _, ts := range req.Timeseries {
Expand All @@ -56,6 +32,16 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
log.Errorf("Error emitting billing record: %v", err)
}
}

if _, err := d.Push(r.Context(), &req); err != nil {
log.Errorf("Push error: %v", err)
if httpResp, ok := httpgrpc.HTTPResponseFromError(err); ok {
http.Error(w, string(httpResp.Body), int(httpResp.Code))
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
return
}
}

// UserStats models ingestion statistics for one user.
Expand Down
15 changes: 10 additions & 5 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ package ingester
import (
"flag"
"fmt"
"net/http"
"os"
"sync"
"time"

"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
Expand All @@ -18,6 +17,7 @@ import (
"github.com/prometheus/prometheus/storage/local/chunk"
"github.com/prometheus/prometheus/storage/metric"

"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
cortex_chunk "github.com/weaveworks/cortex/pkg/chunk"
"github.com/weaveworks/cortex/pkg/ingester/client"
Expand Down Expand Up @@ -273,11 +273,16 @@ func New(cfg Config, chunkStore ChunkStore) (*Ingester, error) {
func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
var lastPartialErr error
samples := util.FromWriteRequest(req)

samples:
for j := range samples {
if err := i.append(ctx, &samples[j]); err != nil {
if err == util.ErrUserSeriesLimitExceeded || err == util.ErrMetricSeriesLimitExceeded {
lastPartialErr = grpc.Errorf(codes.ResourceExhausted, err.Error())
continue
if httpResp, ok := httpgrpc.HTTPResponseFromError(err); ok {
switch httpResp.Code {
case http.StatusBadRequest, http.StatusTooManyRequests:
lastPartialErr = err
continue samples
}
}
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@ package ingester

import (
"fmt"
"net/http"
"sort"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
"google.golang.org/grpc"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/metric"

"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"github.com/weaveworks/cortex/pkg/chunk"
"github.com/weaveworks/cortex/pkg/util"
Expand Down Expand Up @@ -191,7 +192,7 @@ func TestIngesterUserSeriesLimitExceeded(t *testing.T) {

// Append to two series, expect series-exceeded error.
_, err = ing.Push(ctx, util.ToWriteRequest([]model.Sample{sample2, sample3}))
if grpc.ErrorDesc(err) != util.ErrUserSeriesLimitExceeded.Error() {
if resp, ok := httpgrpc.HTTPResponseFromError(err); !ok || resp.Code != http.StatusTooManyRequests {
t.Fatalf("expected error about exceeding metrics per user, got %v", err)
}

Expand Down Expand Up @@ -262,7 +263,7 @@ func TestIngesterMetricSeriesLimitExceeded(t *testing.T) {

// Append to two series, expect series-exceeded error.
_, err = ing.Push(ctx, util.ToWriteRequest([]model.Sample{sample2, sample3}))
if grpc.ErrorDesc(err) != util.ErrMetricSeriesLimitExceeded.Error() {
if resp, ok := httpgrpc.HTTPResponseFromError(err); !ok || resp.Code != http.StatusTooManyRequests {
t.Fatalf("expected error about exceeding series per metric, got %v", err)
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/ingester/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package ingester

import (
"fmt"
"net/http"
"sort"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/local/chunk"
"github.com/prometheus/prometheus/storage/metric"
"github.com/weaveworks/common/httpgrpc"
)

var discardedSamples = prometheus.NewCounterVec(
Expand Down Expand Up @@ -64,11 +66,11 @@ func (s *memorySeries) add(v model.SamplePair) error {
}
if v.Timestamp == s.lastTime {
discardedSamples.WithLabelValues(duplicateSample).Inc()
return fmt.Errorf("sample with repeated timestamp but different value for series %v; last value: %v, incoming value: %v", s.metric, s.lastSampleValue, v.Value) // Caused by the caller.
return httpgrpc.Errorf(http.StatusBadRequest, "sample with repeated timestamp but different value for series %v; last value: %v, incoming value: %v", s.metric, s.lastSampleValue, v.Value)
}
if v.Timestamp < s.lastTime {
discardedSamples.WithLabelValues(outOfOrderTimestamp).Inc()
return fmt.Errorf("sample timestamp out of order for series %v; last timestamp: %v, incoming timestamp: %v", s.metric, s.lastTime, v.Timestamp) // Caused by the caller.
return httpgrpc.Errorf(http.StatusBadRequest, "sample timestamp out of order for series %v; last timestamp: %v, incoming timestamp: %v", s.metric, s.lastTime, v.Timestamp) // Caused by the caller.
}

if len(s.chunkDescs) == 0 || s.headChunkClosed {
Expand Down
6 changes: 4 additions & 2 deletions pkg/ingester/user_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package ingester
import (
"flag"
"fmt"
"net/http"
"sync"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/metric"
"golang.org/x/net/context"

"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"github.com/weaveworks/cortex/pkg/util"
)
Expand Down Expand Up @@ -198,7 +200,7 @@ func (u *userState) unlockedGet(metric model.Metric, cfg *UserStatesConfig) (mod
// serially), and the overshoot in allowed series would be minimal.
if u.fpToSeries.length() >= cfg.MaxSeriesPerUser {
u.fpLocker.Unlock(fp)
return fp, nil, util.ErrUserSeriesLimitExceeded
return fp, nil, httpgrpc.Errorf(http.StatusTooManyRequests, "per-user series limit exceeded")
}

metricName, err := util.ExtractMetricNameFromMetric(metric)
Expand All @@ -209,7 +211,7 @@ func (u *userState) unlockedGet(metric model.Metric, cfg *UserStatesConfig) (mod

if !u.canAddSeriesFor(metricName, cfg) {
u.fpLocker.Unlock(fp)
return fp, nil, util.ErrMetricSeriesLimitExceeded
return fp, nil, httpgrpc.Errorf(http.StatusTooManyRequests, "per-metric series limit exceeded")
}

series = newMemorySeries(metric)
Expand Down
14 changes: 0 additions & 14 deletions pkg/util/error.go

This file was deleted.

20 changes: 15 additions & 5 deletions pkg/util/validate.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package util

import (
"net/http"
"regexp"

"github.com/prometheus/common/model"
"github.com/weaveworks/common/httpgrpc"
)

var (
Expand All @@ -13,26 +15,34 @@ var (
maxLabelValueLength = 4096
)

const (
errMissingMetricName = "sample missing metric name"
errInvalidMetricName = "sample invalid metric name: '%s'"
errInvalidLabel = "sample invalid label: '%s'"
errLabelNameTooLong = "label name too long: '%s'"
errLabelValueTooLong = "label value too long: '%s'"
)

// ValidateSample returns an err if the sample is invalid
func ValidateSample(s *model.Sample) error {
metricName, ok := s.Metric[model.MetricNameLabel]
if !ok {
return ErrMissingMetricName
return httpgrpc.Errorf(http.StatusBadRequest, errMissingMetricName)
}

if !validMetricNameRE.MatchString(string(metricName)) {
return ErrInvalidMetricName
return httpgrpc.Errorf(http.StatusBadRequest, errInvalidMetricName, metricName)
}

for k, v := range s.Metric {
if !validLabelRE.MatchString(string(k)) {
return ErrInvalidLabel
return httpgrpc.Errorf(http.StatusBadRequest, errInvalidLabel, k)
}
if len(k) > maxLabelNameLength {
return ErrLabelNameTooLong
return httpgrpc.Errorf(http.StatusBadRequest, errLabelNameTooLong, k)
}
if len(v) > maxLabelValueLength {
return ErrLabelValueTooLong
return httpgrpc.Errorf(http.StatusBadRequest, errLabelValueTooLong, v)
}
}
return nil
Expand Down
22 changes: 18 additions & 4 deletions pkg/util/validate_test.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,35 @@
package util

import (
"net/http"
"testing"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/weaveworks/common/httpgrpc"
)

func TestValidate(t *testing.T) {
for _, c := range []struct {
metric model.Metric
err error
}{
{map[model.LabelName]model.LabelValue{}, ErrMissingMetricName},
{map[model.LabelName]model.LabelValue{model.MetricNameLabel: " "}, ErrInvalidMetricName},
{map[model.LabelName]model.LabelValue{model.MetricNameLabel: "valid", "foo ": "bar"}, ErrInvalidLabel},
{map[model.LabelName]model.LabelValue{model.MetricNameLabel: "valid"}, nil},
{
map[model.LabelName]model.LabelValue{},
httpgrpc.Errorf(http.StatusBadRequest, errMissingMetricName),
},
{
map[model.LabelName]model.LabelValue{model.MetricNameLabel: " "},
httpgrpc.Errorf(http.StatusBadRequest, errInvalidMetricName, " "),
},
{
map[model.LabelName]model.LabelValue{model.MetricNameLabel: "valid", "foo ": "bar"},
httpgrpc.Errorf(http.StatusBadRequest, errInvalidLabel, "foo "),
},
{
map[model.LabelName]model.LabelValue{model.MetricNameLabel: "valid"},
nil,
},
} {
err := ValidateSample(&model.Sample{
Metric: c.metric,
Expand Down
Loading