Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Store gateway spends most of the time waiting to load series and chun… #3927

Merged
merged 4 commits into from Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Expand Up @@ -37,7 +37,7 @@
* [BUGFIX] Store-gateway: fix `cortex_bucket_store_partitioner_requested_bytes_total` metric to not double count overlapping ranges. #3769
* [BUGFIX] Update `github.com/thanos-io/objstore` to address issue with Multipart PUT on s3-compatible Object Storage. #3802 #3821
* [BUGFIX] Distributor, Query-scheduler: Make sure ring metrics include a `cortex_` prefix as expected by dashboards. #3809
* [BUGFIX] Querier: canceled requests are no longer reported as "consistency check" failures. #3837
* [BUGFIX] Querier: canceled requests are no longer reported as "consistency check" failures. #3837 #3927
* [BUGFIX] Distributor: don't panic when `metric_relabel_configs` in overrides contains null element. #3868

### Mixin
Expand Down
4 changes: 4 additions & 0 deletions pkg/querier/blocks_store_queryable.go
Expand Up @@ -743,6 +743,10 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
break
}
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return err
}

level.Warn(spanLog).Log("msg", "failed to receive series", "remote", c.RemoteAddress(), "err", err)
return nil
}
Expand Down
115 changes: 74 additions & 41 deletions pkg/querier/blocks_store_queryable_test.go
Expand Up @@ -893,50 +893,64 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
})
}

t.Run("canceled request", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
canceledRequestTests := map[string]struct {
dimitarvdimitrov marked this conversation as resolved.
Show resolved Hide resolved
produceSeries bool
}{
"canceled request on series stream": {
produceSeries: false,
},
"canceled request on receiving series stream": {
produceSeries: true,
},
}

ctx = limiter.AddQueryLimiterToContext(ctx, noOpQueryLimiter)
reg := prometheus.NewPedanticRegistry()
for testName, testData := range canceledRequestTests {
t.Run(testName, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

storeGateway := &cancelerStoreGatewayClientMock{
remoteAddr: "1.1.1.1",
cancel: cancel,
}
ctx = limiter.AddQueryLimiterToContext(ctx, noOpQueryLimiter)
reg := prometheus.NewPedanticRegistry()

stores := &blocksStoreSetMock{mockedResponses: []interface{}{
map[BlocksStoreClient][]ulid.ULID{storeGateway: {block1}},
errors.New("no store-gateway remaining after exclude"),
}}

finder := &blocksFinderMock{}
finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{
{ID: block1},
}, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil)

q := &blocksStoreQuerier{
ctx: ctx,
minT: minT,
maxT: maxT,
userID: "user-1",
finder: finder,
stores: stores,
consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil),
logger: log.NewNopLogger(),
metrics: newBlocksStoreQueryableMetrics(reg),
limits: &blocksStoreLimitsMock{},
}
storeGateway := &cancelerStoreGatewayClientMock{
remoteAddr: "1.1.1.1",
produceSeries: testData.produceSeries,
cancel: cancel,
}

matchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, metricName),
}
stores := &blocksStoreSetMock{mockedResponses: []interface{}{
map[BlocksStoreClient][]ulid.ULID{storeGateway: {block1}},
errors.New("no store-gateway remaining after exclude"),
}}

sp := &storage.SelectHints{Start: minT, End: maxT}
set := q.Select(true, sp, matchers...)
require.Error(t, set.Err())
require.ErrorIs(t, set.Err(), context.Canceled)
})
finder := &blocksFinderMock{}
finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{
{ID: block1},
}, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil)

q := &blocksStoreQuerier{
ctx: ctx,
minT: minT,
maxT: maxT,
userID: "user-1",
finder: finder,
stores: stores,
consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil),
logger: log.NewNopLogger(),
metrics: newBlocksStoreQueryableMetrics(reg),
limits: &blocksStoreLimitsMock{},
}

matchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, metricName),
}

sp := &storage.SelectHints{Start: minT, End: maxT}
set := q.Select(true, sp, matchers...)
require.Error(t, set.Err())
dimitarvdimitrov marked this conversation as resolved.
Show resolved Hide resolved
require.ErrorIs(t, set.Err(), context.Canceled)
})
}
}

func TestBlocksStoreQuerier_Labels(t *testing.T) {
Expand Down Expand Up @@ -1978,12 +1992,31 @@ func (m *storeGatewaySeriesClientMock) Recv() (*storepb.SeriesResponse, error) {
return res, nil
}

// cancelerStoreGatewaySeriesClientMock is a series stream mock whose Recv
// triggers cancellation and then reports the context error, instead of
// producing any series.
type cancelerStoreGatewaySeriesClientMock struct {
// Embedded base mock; its Recv is overridden by this type's own Recv.
storeGatewaySeriesClientMock
// ctx is the context whose Err() is returned by Recv.
ctx context.Context
// cancel is invoked by Recv before it reads ctx.Err().
// NOTE(review): presumably cancels ctx (or a parent of it) — confirm at the construction site.
cancel func()
}

// Recv never yields a series response. It first invokes the configured cancel
// function and then returns whatever error the stored context reports.
//
// NOTE(review): the returned error is only non-nil if cancel actually cancels
// m.ctx (or one of its parents) — confirm against the code that builds the mock.
func (m *cancelerStoreGatewaySeriesClientMock) Recv() (*storepb.SeriesResponse, error) {
	// Cancel before reading the context error so the cancellation is observed.
	m.cancel()

	err := m.ctx.Err()
	return nil, err
}

// cancelerStoreGatewayClientMock is a store-gateway client mock used to
// simulate a request that gets canceled while series are being fetched.
type cancelerStoreGatewayClientMock struct {
	// remoteAddr is the address this mock is configured with.
	// NOTE(review): presumably reported by a RemoteAddress() accessor defined
	// elsewhere in the file — confirm.
	remoteAddr string
	// produceSeries selects where the cancellation happens: when false, Series
	// itself cancels and fails; when true, Series returns a stream mock and the
	// cancellation is deferred to that stream.
	produceSeries bool
	// cancel is the function invoked to trigger the cancellation.
	cancel func()
}

// Series simulates a canceled series request in one of two places, depending
// on produceSeries:
//
//   - false: the cancel function is invoked right away and the call fails with
//     the error reported by ctx, returning no stream.
//   - true: the call succeeds and hands back a stream mock carrying ctx and the
//     cancel function, so the cancellation is triggered by the stream's Recv.
//
// The in and opts arguments are ignored.
func (m *cancelerStoreGatewayClientMock) Series(ctx context.Context, in *storepb.SeriesRequest, opts ...grpc.CallOption) (storegatewaypb.StoreGateway_SeriesClient, error) {
	if !m.produceSeries {
		// Fail while opening the stream: cancel first, then surface ctx's error.
		m.cancel()
		return nil, ctx.Err()
	}

	// Defer the cancellation to the first Recv on the returned stream.
	return &cancelerStoreGatewaySeriesClientMock{ctx: ctx, cancel: m.cancel}, nil
}
Expand Down