Manually apply suggestions from code review
zenador committed Jun 17, 2022
1 parent 48a6d42 commit b8f6b57
Showing 5 changed files with 73 additions and 48 deletions.
18 changes: 6 additions & 12 deletions docs/sources/operators-guide/mimir-runbooks/_index.md
@@ -1394,10 +1394,6 @@ How it **works**:

- If the incoming timestamp is more than 1 hour older than the most recent timestamp ingested for the tenant, the sample will be rejected.

How to **fix** it:

- This error may occur when the system time of your Prometheus instance has been shifted backwards. If this was a mistake, fix the system time back to normal. Otherwise, wait until the system time catches up to the time it was changed, or delete all ingested samples with a timestamp later than system time.

### err-mimir-sample-out-of-order

This error occurs when the ingester rejects a sample because another sample with a more recent timestamp has already been ingested.
@@ -1411,29 +1407,27 @@ Common **causes**:
- Your code has a single target that exposes the same time series multiple times, or multiple targets with identical labels.
- System time of your Prometheus instance has been shifted backwards. If this was a mistake, fix the system time back to normal. Otherwise, wait until the system time catches up to the time it was changed, or delete all ingested samples with a timestamp later than system time.
- You are running multiple Prometheus instances pushing the same metrics and [your high-availability tracker is not properly configured for deduplication]({{< relref "../configuring/configuring-high-availability-deduplication.md" >}}).
- Prometheus relabelling has been configured and it causes series to clash after the relabelling. Check the error message for information about which series has received a sample out of order.
- A Prometheus instance was restarted, and it pushed all data from its Write-Ahead Log to remote write upon restart, some of which has already been pushed and ingested. This is normal and can be ignored.

[More details here](https://www.robustperception.io/debugging-out-of-order-samples/) [and here](https://github.com/cortexproject/cortex/issues/3411) [and here](https://github.com/cortexproject/cortex/issues/2662).
[More details here](https://www.robustperception.io/debugging-out-of-order-samples/).

### err-mimir-sample-duplicate-timestamp

This error occurs when the ingester rejects a sample because it is a duplicate of a previously received sample with the same timestamp but different value in the same time series.

How to **fix** it:
Common **causes**:

- Check if you have multiple endpoints exporting the same metrics (especially if timestamp is included e.g. cAdvisor), or multiple Prometheus instances scraping different metrics with identical labels, and fix that.
- Multiple endpoints are exporting the same metrics, or multiple Prometheus instances are scraping different metrics with identical labels.
- Prometheus relabelling has been configured and it causes series to clash after the relabelling. Check the error message for information about which series has received a duplicate sample.

### err-mimir-exemplar-series-missing

This error occurs when the ingester rejects an exemplar because its related series has not been ingested yet.

How it **works**:

- The series must already exist before exemplars can be appended, as we do not create new series upon ingesting exemplars.

How to **fix** it:

- First send a sample from the related series to be ingested.
- The series must already exist before exemplars can be appended, as we do not create new series upon ingesting exemplars. The series will be created when a sample from it is ingested.

## Mimir routes by path

48 changes: 18 additions & 30 deletions pkg/ingester/ingester.go
@@ -77,10 +77,7 @@ const (
// IngesterRingKey is the key under which we store the ingesters ring in the KVStore.
IngesterRingKey = "ring"

errTSDBCreateIncompatibleState = "cannot create a new TSDB while the ingester is not in active state (current state: %s)"
errTSDBIngest = "%v. The affected sample has timestamp %s and is from series %s"
errTSDBIngestExemplarMissingSeries = "%v. The affected exemplar is %s with timestamp %s for series %s"
errTSDBIngestExemplarOther = "err: %v. timestamp=%s, series=%s, exemplar=%s"
errTSDBCreateIncompatibleState = "cannot create a new TSDB while the ingester is not in active state (current state: %s)"

// Jitter applied to the idle timeout to prevent compaction in all ingesters concurrently.
compactionIdleTimeoutJitter = 0.25
@@ -92,10 +89,6 @@ const (
sampleOutOfBounds = "sample-out-of-bounds"
)

var (
errExemplarMissingSeriesRef = errors.New("the exemplar has been rejected because the related series has not been ingested yet")
)

// BlocksUploader interface is used to have an easy way to mock it in tests.
type BlocksUploader interface {
Sync(ctx context.Context) (uploaded int, err error)
@@ -637,7 +630,7 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques
sampleOutOfBoundsCount += len(ts.Samples)

updateFirstPartial(func() error {
return wrappedTSDBIngestErrSampleTimestampTooOld(model.Time(ts.Samples[0].TimestampMs), ts.Labels)
return newIngestErrSampleTimestampTooOld(model.Time(ts.Samples[0].TimestampMs), ts.Labels)
})
continue
}
@@ -677,18 +670,18 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques
switch cause := errors.Cause(err); cause {
case storage.ErrOutOfBounds:
sampleOutOfBoundsCount++
updateFirstPartial(func() error { return wrappedTSDBIngestErrSampleTimestampTooOld(model.Time(s.TimestampMs), ts.Labels) })
updateFirstPartial(func() error { return newIngestErrSampleTimestampTooOld(model.Time(s.TimestampMs), ts.Labels) })
continue

case storage.ErrOutOfOrderSample:
sampleOutOfOrderCount++
updateFirstPartial(func() error { return wrappedTSDBIngestErrSampleOutOfOrder(model.Time(s.TimestampMs), ts.Labels) })
updateFirstPartial(func() error { return newIngestErrSampleOutOfOrder(model.Time(s.TimestampMs), ts.Labels) })
continue

case storage.ErrDuplicateSampleForTimestamp:
newValueForTimestampCount++
updateFirstPartial(func() error {
return wrappedTSDBIngestErrSampleDuplicateTimestamp(model.Time(s.TimestampMs), ts.Labels)
return newIngestErrSampleDuplicateTimestamp(model.Time(s.TimestampMs), ts.Labels)
})
continue

@@ -725,8 +718,7 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques
// already exist. If it does not then drop.
if ref == 0 {
updateFirstPartial(func() error {
return wrappedTSDBIngestErrExemplarMissingSeries(errExemplarMissingSeriesRef,
model.Time(ts.Exemplars[0].TimestampMs), ts.Labels, ts.Exemplars[0].Labels)
return newIngestErrExemplarMissingSeries(model.Time(ts.Exemplars[0].TimestampMs), ts.Labels, ts.Exemplars[0].Labels)
})
failedExemplarsCount += len(ts.Exemplars)
} else { // Note that else is explicit, rather than a continue in the above if, in case of additional logic post exemplar processing.
@@ -2080,29 +2072,25 @@ func (i *Ingester) FlushHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}

func wrappedTSDBIngestErr(errID globalerror.ID, errMsg string, timestamp model.Time, labels []mimirpb.LabelAdapter) error {
return fmt.Errorf(errTSDBIngest, errID.Message(errMsg), timestamp.Time().UTC().Format(time.RFC3339Nano), mimirpb.FromLabelAdaptersToLabels(labels).String())
func newIngestErr(errID globalerror.ID, errMsg string, timestamp model.Time, labels []mimirpb.LabelAdapter) error {
return fmt.Errorf("%v. The affected sample has timestamp %s and is from series %s", errID.Message(errMsg), timestamp.Time().UTC().Format(time.RFC3339Nano), mimirpb.FromLabelAdaptersToLabels(labels).String())
}

func wrappedTSDBIngestErrSampleTimestampTooOld(timestamp model.Time, labels []mimirpb.LabelAdapter) error {
return wrappedTSDBIngestErr(globalerror.SampleTimestampTooOld, "the sample has been rejected because its timestamp is too old", timestamp, labels)
func newIngestErrSampleTimestampTooOld(timestamp model.Time, labels []mimirpb.LabelAdapter) error {
return newIngestErr(globalerror.SampleTimestampTooOld, "the sample has been rejected because its timestamp is too old", timestamp, labels)
}

func wrappedTSDBIngestErrSampleOutOfOrder(timestamp model.Time, labels []mimirpb.LabelAdapter) error {
return wrappedTSDBIngestErr(globalerror.SampleOutOfOrder, "the sample has been rejected because another sample with a more recent timestamp has already been ingested and out of order samples are not allowed", timestamp, labels)
func newIngestErrSampleOutOfOrder(timestamp model.Time, labels []mimirpb.LabelAdapter) error {
return newIngestErr(globalerror.SampleOutOfOrder, "the sample has been rejected because another sample with a more recent timestamp has already been ingested and out of order samples are not allowed", timestamp, labels)
}

func wrappedTSDBIngestErrSampleDuplicateTimestamp(timestamp model.Time, labels []mimirpb.LabelAdapter) error {
return wrappedTSDBIngestErr(globalerror.SampleDuplicateTimestamp, "the sample has been rejected because another sample with the same timestamp, but a different value, has already been ingested", timestamp, labels)
func newIngestErrSampleDuplicateTimestamp(timestamp model.Time, labels []mimirpb.LabelAdapter) error {
return newIngestErr(globalerror.SampleDuplicateTimestamp, "the sample has been rejected because another sample with the same timestamp, but a different value, has already been ingested", timestamp, labels)
}

func wrappedTSDBIngestErrExemplarMissingSeries(ingestErr error, timestamp model.Time, seriesLabels, exemplarLabels []mimirpb.LabelAdapter) error {
if ingestErr == nil {
return nil
}

return fmt.Errorf(errTSDBIngestExemplarMissingSeries,
globalerror.ExemplarSeriesMissing.Message(ingestErr.Error()),
func newIngestErrExemplarMissingSeries(timestamp model.Time, seriesLabels, exemplarLabels []mimirpb.LabelAdapter) error {
return fmt.Errorf("%v. The affected exemplar is %s with timestamp %s for series %s",
globalerror.ExemplarSeriesMissing.Message("the exemplar has been rejected because the related series has not been ingested yet"),
mimirpb.FromLabelAdaptersToLabels(exemplarLabels).String(),
timestamp.Time().UTC().Format(time.RFC3339Nano),
mimirpb.FromLabelAdaptersToLabels(seriesLabels).String(),
@@ -2114,7 +2102,7 @@ func wrappedTSDBIngestExemplarOtherErr(ingestErr error, timestamp model.Time, se
return nil
}

return fmt.Errorf(errTSDBIngestExemplarOther, ingestErr, timestamp.Time().UTC().Format(time.RFC3339Nano),
return fmt.Errorf("err: %v. timestamp=%s, series=%s, exemplar=%s", ingestErr, timestamp.Time().UTC().Format(time.RFC3339Nano),
mimirpb.FromLabelAdaptersToLabels(seriesLabels).String(),
mimirpb.FromLabelAdaptersToLabels(exemplarLabels).String(),
)
43 changes: 38 additions & 5 deletions pkg/ingester/ingester_test.go
@@ -318,7 +318,7 @@ func TestIngester_Push(t *testing.T) {
mimirpb.API,
),
},
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(wrappedTSDBIngestErrSampleOutOfOrder(model.Time(9), mimirpb.FromLabelsToLabelAdapters(metricLabels)), userID).Error()),
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(newIngestErrSampleOutOfOrder(model.Time(9), mimirpb.FromLabelsToLabelAdapters(metricLabels)), userID).Error()),
expectedIngested: model.Matrix{
&model.SampleStream{Metric: metricLabelSet, Values: []model.SamplePair{{Value: 2, Timestamp: 10}}},
},
Expand Down Expand Up @@ -370,7 +370,7 @@ func TestIngester_Push(t *testing.T) {
},
},
},
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(wrappedTSDBIngestErrSampleTimestampTooOld(model.Time(1575043969-(86400*1000)), mimirpb.FromLabelsToLabelAdapters(metricLabels)), userID).Error()),
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(newIngestErrSampleTimestampTooOld(model.Time(1575043969-(86400*1000)), mimirpb.FromLabelsToLabelAdapters(metricLabels)), userID).Error()),
expectedIngested: model.Matrix{
&model.SampleStream{Metric: metricLabelSet, Values: []model.SamplePair{{Value: 2, Timestamp: 1575043969}}},
},
Expand Down Expand Up @@ -425,7 +425,7 @@ func TestIngester_Push(t *testing.T) {
},
},
},
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(wrappedTSDBIngestErrSampleTimestampTooOld(model.Time(1575043969-(86400*1000)), mimirpb.FromLabelsToLabelAdapters(metricLabels)), userID).Error()),
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(newIngestErrSampleTimestampTooOld(model.Time(1575043969-(86400*1000)), mimirpb.FromLabelsToLabelAdapters(metricLabels)), userID).Error()),
expectedIngested: model.Matrix{
&model.SampleStream{Metric: metricLabelSet, Values: []model.SamplePair{{Value: 2, Timestamp: 1575043969}, {Value: 3, Timestamp: 1575043969 + 1}}},
},
Expand Down Expand Up @@ -473,7 +473,7 @@ func TestIngester_Push(t *testing.T) {
mimirpb.API,
),
},
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(wrappedTSDBIngestErrSampleDuplicateTimestamp(model.Time(1575043969), mimirpb.FromLabelsToLabelAdapters(metricLabels)), userID).Error()),
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(newIngestErrSampleDuplicateTimestamp(model.Time(1575043969), mimirpb.FromLabelsToLabelAdapters(metricLabels)), userID).Error()),
expectedIngested: model.Matrix{
&model.SampleStream{Metric: metricLabelSet, Values: []model.SamplePair{{Value: 2, Timestamp: 1575043969}}},
},
Expand Down Expand Up @@ -526,7 +526,7 @@ func TestIngester_Push(t *testing.T) {
},
},
},
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(wrappedTSDBIngestErrExemplarMissingSeries(errExemplarMissingSeriesRef, model.Time(1000), mimirpb.FromLabelsToLabelAdapters(metricLabels), []mimirpb.LabelAdapter{{Name: "traceID", Value: "123"}}), userID).Error()),
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(newIngestErrExemplarMissingSeries(model.Time(1000), mimirpb.FromLabelsToLabelAdapters(metricLabels), []mimirpb.LabelAdapter{{Name: "traceID", Value: "123"}}), userID).Error()),
expectedIngested: nil,
expectedMetadataIngested: nil,
additionalMetrics: []string{
@@ -5653,3 +5653,36 @@ func TestGetIgnoreSeriesLimitForMetricNamesMap(t *testing.T) {
cfg.IgnoreSeriesLimitForMetricNames = "foo, bar, ,"
require.Equal(t, map[string]struct{}{"foo": {}, "bar": {}}, cfg.getIgnoreSeriesLimitForMetricNamesMap())
}

func TestNewIngestErrMsgs(t *testing.T) {
timestamp := model.Time(1575043969)
metricLabelAdapters := []mimirpb.LabelAdapter{{Name: labels.MetricName, Value: "test"}}

tests := map[string]struct {
err error
msg string
}{
"newIngestErrSampleTimestampTooOld": {
err: newIngestErrSampleTimestampTooOld(timestamp, metricLabelAdapters),
msg: `the sample has been rejected because its timestamp is too old (err-mimir-sample-timestamp-too-old). The affected sample has timestamp 1970-01-19T05:30:43.969Z and is from series {__name__="test"}`,
},
"newIngestErrSampleOutOfOrder": {
err: newIngestErrSampleOutOfOrder(timestamp, metricLabelAdapters),
msg: `the sample has been rejected because another sample with a more recent timestamp has already been ingested and out of order samples are not allowed (err-mimir-sample-out-of-order). The affected sample has timestamp 1970-01-19T05:30:43.969Z and is from series {__name__="test"}`,
},
"newIngestErrSampleDuplicateTimestamp": {
err: newIngestErrSampleDuplicateTimestamp(timestamp, metricLabelAdapters),
msg: `the sample has been rejected because another sample with the same timestamp, but a different value, has already been ingested (err-mimir-sample-duplicate-timestamp). The affected sample has timestamp 1970-01-19T05:30:43.969Z and is from series {__name__="test"}`,
},
"newIngestErrExemplarMissingSeries": {
err: newIngestErrExemplarMissingSeries(timestamp, metricLabelAdapters, []mimirpb.LabelAdapter{{Name: "traceID", Value: "123"}}),
msg: `the exemplar has been rejected because the related series has not been ingested yet (err-mimir-exemplar-series-missing). The affected exemplar is {traceID="123"} with timestamp 1970-01-19T05:30:43.969Z for series {__name__="test"}`,
},
}

for testName, tc := range tests {
t.Run(testName, func(t *testing.T) {
assert.Equal(t, tc.err.Error(), tc.msg)
})
}
}
2 changes: 1 addition & 1 deletion pkg/util/validation/errors.go
@@ -283,7 +283,7 @@ func NewRequestRateLimitedError(limit float64, burst int) LimitError {

func NewIngestionRateLimitedError(limit float64, burst int) LimitError {
return LimitError(globalerror.IngestionRateLimited.MessageWithLimitConfig(
fmt.Sprintf("the request has been rejected because the tenant exceeded the ingestion rate limit, set to %v items/s with a maximum allowed burst of %d. This limit is applied on the total number of samples, exemplars and metadata received across all distributors.", limit, burst),
fmt.Sprintf("the request has been rejected because the tenant exceeded the ingestion rate limit, set to %v items/s with a maximum allowed burst of %d. This limit is applied on the total number of samples, exemplars and metadata received across all distributors", limit, burst),
ingestionRateFlag, ingestionBurstSizeFlag))
}

10 changes: 10 additions & 0 deletions pkg/util/validation/errors_test.go
@@ -35,3 +35,13 @@ func TestNewMaxQueryLengthError(t *testing.T) {
err := NewMaxQueryLengthError(time.Hour, time.Minute)
assert.Equal(t, "the query time range exceeds the limit (query length: 1h0m0s, limit: 1m0s) (err-mimir-max-query-length). You can adjust the related per-tenant limit by configuring -store.max-query-length, or by contacting your service administrator.", err.Error())
}

func TestNewRequestRateLimitedError(t *testing.T) {
err := NewRequestRateLimitedError(10, 5)
assert.Equal(t, "the request has been rejected because the tenant exceeded the request rate limit, set to 10 requests/s across all distributors with a maximum allowed burst of 5 (err-mimir-tenant-max-request-rate). You can adjust the related per-tenant limits by configuring -distributor.request-rate-limit and -distributor.request-burst-size, or by contacting your service administrator.", err.Error())
}

func TestNewIngestionRateLimitedError(t *testing.T) {
err := NewIngestionRateLimitedError(10, 5)
assert.Equal(t, "the request has been rejected because the tenant exceeded the ingestion rate limit, set to 10 items/s with a maximum allowed burst of 5. This limit is applied on the total number of samples, exemplars and metadata received across all distributors (err-mimir-tenant-max-ingestion-rate). You can adjust the related per-tenant limits by configuring -distributor.ingestion-rate-limit and -distributor.ingestion-burst-size, or by contacting your service administrator.", err.Error())
}