Skip to content

Commit

Permalink
Update test to check runtime change of OutOfOrderTimeWindow
Browse files Browse the repository at this point in the history
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
  • Loading branch information
codesome committed Jun 24, 2022
1 parent a4517e7 commit 8bb0d88
Showing 1 changed file with 106 additions and 103 deletions.
209 changes: 106 additions & 103 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5757,124 +5757,127 @@ func TestGetIgnoreSeriesLimitForMetricNamesMap(t *testing.T) {
require.Equal(t, map[string]struct{}{"foo": {}, "bar": {}}, cfg.getIgnoreSeriesLimitForMetricNamesMap())
}

func Test_Ingester_QueryOutOfOrder(t *testing.T) {
tests := map[string]struct {
queryFrom int64
queryTo int64
oooFirstSample int64
oooLastSample int64
expPushError bool
}{
"should return in order and out of order data": {
queryFrom: math.MinInt64,
queryTo: math.MaxInt64,
oooFirstSample: 70 * time.Minute.Milliseconds(),
oooLastSample: 99 * time.Minute.Milliseconds(),
expPushError: false,
},
"if ooo samples go back past allowance writing should return an error": {
queryFrom: math.MinInt64,
queryTo: math.MaxInt64,
oooFirstSample: 69 * time.Minute.Milliseconds(), // Allowance is 30 min, first sample is at minute 100 so first ooo sample is too old
oooLastSample: 99 * time.Minute.Milliseconds(),
expPushError: true,
},
}

// Configure ingester with OOO Allowance
// Test_Ingester_OutOfOrder tests basic ingestion and query of out of order samples.
// It also tests if the OutOfOrderTimeWindow gets changed during runtime.
// The correctness of changed runtime is already tested in Prometheus, so we only check if the
// change is being applied here.
func Test_Ingester_OutOfOrder(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.BlocksStorageConfig.TSDB.OutOfOrderCapMin = 4
cfg.BlocksStorageConfig.TSDB.OutOfOrderCapMax = 32
cfg.TSDBConfigUpdatePeriod = 1 * time.Second
l := defaultLimitsTestConfig()
i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, l, "", nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

// Run tests
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
// Create ingester
l := defaultLimitsTestConfig()
l.OutOfOrderTimeWindow = model.Duration(30 * time.Minute)
i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, l, "", nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
// Wait until it's healthy
test.Poll(t, 1*time.Second, 1, func() interface{} {
return i.lifecycler.HealthyInstancesCount()
})

// Wait until it's healthy
test.Poll(t, 1*time.Second, 1, func() interface{} {
return i.lifecycler.HealthyInstancesCount()
})
ctx := user.InjectOrgID(context.Background(), "test")

ctx := user.InjectOrgID(context.Background(), "test")
pushSamples := func(start, end int64, expErr bool) {
start = start * time.Minute.Milliseconds()
end = end * time.Minute.Milliseconds()

// Push first in-order sample at minute 100
firstInOrderSample := 100 * time.Minute.Milliseconds()
serie := series{
lbls: labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}},
value: float64(firstInOrderSample),
timestamp: firstInOrderSample,
}
expSamples := []model.SamplePair{
{
Timestamp: model.Time(firstInOrderSample),
Value: model.SampleValue(firstInOrderSample),
},
}
s := labels.FromStrings(labels.MetricName, "test_1", "status", "200")
var samples []mimirpb.Sample
var lbls []labels.Labels
for ts := start; ts <= end; ts += time.Minute.Milliseconds() {
samples = append(samples, mimirpb.Sample{
TimestampMs: ts,
Value: float64(ts),
})
lbls = append(lbls, s)
}

wReq, _, _, _ := mockWriteRequest(t, serie.lbls, serie.value, serie.timestamp)
_, err = i.Push(ctx, wReq)
wReq := mimirpb.ToWriteRequest(lbls, samples, nil, nil, mimirpb.API)
_, err = i.Push(ctx, wReq)
if expErr {
require.Error(t, err, "should have failed on push")
require.ErrorAs(t, err, &storage.ErrTooOldSample)
} else {
require.NoError(t, err)
}
}

// Push ooo samples within the allowance time in a single request
var samples []mimirpb.Sample
var lbls []labels.Labels
for j := testData.oooFirstSample / time.Minute.Milliseconds(); j < testData.oooLastSample/time.Minute.Milliseconds(); j++ {
min := int64(j) * time.Minute.Milliseconds()
samples = append(samples, mimirpb.Sample{
TimestampMs: min,
Value: float64(min),
})
lbls = append(lbls, serie.lbls)
expSamples = append(expSamples, model.SamplePair{
Timestamp: model.Time(min),
Value: model.SampleValue(min),
})
}
wReq = mimirpb.ToWriteRequest(lbls, samples, nil, nil, mimirpb.API)
_, err = i.Push(ctx, wReq)
if testData.expPushError {
require.Error(t, err, "should have failed on push")
require.ErrorAs(t, err, &storage.ErrTooOldSample)
return
}
require.NoError(t, err)
verifySamples := func(start, end int64) {
start = start * time.Minute.Milliseconds()
end = end * time.Minute.Milliseconds()

// Sort samples by timestamp for later comparison
sort.Slice(expSamples, func(i, j int) bool {
return expSamples[i].Timestamp < expSamples[j].Timestamp
var expSamples []model.SamplePair
for ts := start; ts <= end; ts += time.Minute.Milliseconds() {
expSamples = append(expSamples, model.SamplePair{
Timestamp: model.Time(ts),
Value: model.SampleValue(ts),
})
}
expMatrix := model.Matrix{{
Metric: model.Metric{"__name__": "test_1", "status": "200"},
Values: expSamples,
}}

expMatrix := model.Matrix{
{
Metric: model.Metric{"__name__": "test_1", "status": "200"},
Values: expSamples,
},
}
req := &client.QueryRequest{
StartTimestampMs: math.MinInt64,
EndTimestampMs: math.MaxInt64,
Matchers: []*client.LabelMatcher{
{Type: client.EQUAL, Name: model.MetricNameLabel, Value: "test_1"},
},
}

req := &client.QueryRequest{
StartTimestampMs: testData.queryFrom,
EndTimestampMs: testData.queryTo,
Matchers: []*client.LabelMatcher{
{Type: client.EQUAL, Name: model.MetricNameLabel, Value: "test_1"},
},
}
s := stream{ctx: ctx}
err = i.QueryStream(req, &s)
require.NoError(t, err)

s := stream{ctx: ctx}
err = i.QueryStream(req, &s)
require.NoError(t, err)
res, err := chunkcompat.StreamsToMatrix(model.Earliest, model.Latest, s.responses)
require.NoError(t, err)
assert.ElementsMatch(t, expMatrix, res)
}

res, err := chunkcompat.StreamsToMatrix(model.Earliest, model.Latest, s.responses)
require.NoError(t, err)
assert.ElementsMatch(t, expMatrix, res)
})
updateConfig := func(oooTW model.Duration) {
limits := defaultLimitsTestConfig()
tenantOverride := new(TenantLimitsMock)
tenantOverride.On("ByUserID", "test").Return(&validation.Limits{OutOfOrderTimeWindow: oooTW})
override, err := validation.NewOverrides(limits, tenantOverride)
require.NoError(t, err)
i.limits = override

// TSDB config is updated every second.
<-time.After(1500 * time.Millisecond)
}

// Push first in-order sample at minute 100.
pushSamples(100, 100, false)
verifySamples(100, 100)

// OOO is not enabled. So it errors out. No sample ingested.
pushSamples(90, 99, true)
verifySamples(100, 100)

// Increasing the OOO time window.
updateConfig(model.Duration(30 * time.Minute))
// Now it works.
pushSamples(90, 99, false)
verifySamples(90, 100)

// Gives an error for sample 69 since it's outside time window, but rest is ingested.
pushSamples(69, 99, true)
verifySamples(70, 100)

// All beyond the ooo time window. None ingested.
pushSamples(50, 69, true)
verifySamples(70, 100)

// Increase the time window again. It works.
updateConfig(model.Duration(60 * time.Minute))
pushSamples(50, 69, false)
verifySamples(50, 100)

// Decrease the time window again. Same push should fail.
updateConfig(model.Duration(30 * time.Minute))
pushSamples(50, 69, true)
verifySamples(50, 100)
}

func TestNewIngestErrMsgs(t *testing.T) {
Expand Down

0 comments on commit 8bb0d88

Please sign in to comment.