Chore: remove unused code from BucketStore #1816

Merged · 6 commits · May 5, 2022
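
Summary of the change: this PR removes code that the store-gateway's BucketStore inherited from Thanos but that Mimir no longer uses — the FilterConfig time-filtering support (WithFilterConfig, limitMinTime, limitMaxTime), the Info() StoreAPI endpoint along with the advertised label sets maintained for it, and the pre-v0.8.0 Thanos Querier compatibility label (CompatibilityTypeLabelName plus the enableCompatibilityLabel constructor flag). The tests and the single non-test caller are updated to match.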
98 changes: 0 additions & 98 deletions pkg/storegateway/bucket.go
@@ -43,9 +43,7 @@ import (
"github.com/thanos-io/thanos/pkg/block/indexheader"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/pool"
"github.com/thanos-io/thanos/pkg/store/hintspb"
@@ -78,28 +76,11 @@ const (
chunkBytesPoolMinSize = 64 * 1024 // 64 KiB
chunkBytesPoolMaxSize = 64 * 1024 * 1024 // 64 MiB

// CompatibilityTypeLabelName is an artificial label that Store Gateway can optionally advertise. This is required for compatibility
// with pre v0.8.0 Querier. Previous Queriers was strict about duplicated external labels of all StoreAPIs that had any labels.
// Now with newer Store Gateway advertising all the external labels it has access to, there was simple case where
// Querier was blocking Store Gateway as duplicate with sidecar.
//
// Newer Queriers are not strict, no duplicated external labels check is there anymore.
// Additionally newer Queriers removes/ignore this exact labels from UI and querying.
//
// This label name is intentionally against Prometheus label style.
// TODO(bwplotka): Remove it at some point.
CompatibilityTypeLabelName = "@thanos_compatibility_store_type"

// Labels for metrics.
labelEncode = "encode"
labelDecode = "decode"
)

// FilterConfig is a configuration, which Store uses for filtering metrics based on time.
type FilterConfig struct {
MinTime, MaxTime model.TimeOrDurationValue
}

type BucketStoreStats struct {
// BlocksLoaded is the number of blocks currently loaded in the bucket store.
BlocksLoaded int
@@ -143,10 +124,6 @@ type BucketStore struct {
seriesLimiterFactory SeriesLimiterFactory
partitioner Partitioner

filterConfig *FilterConfig
advLabelSets []labelpb.ZLabelSet
enableCompatibilityLabel bool

// Threadpool for performing operations that block the OS thread (mmap page faults)
threadPool *mimir_indexheader.Threadpool

@@ -225,13 +202,6 @@ func WithChunkPool(chunkPool pool.Bytes) BucketStoreOption {
}
}

// WithFilterConfig sets a filter which Store uses for filtering metrics based on time.
func WithFilterConfig(filter *FilterConfig) BucketStoreOption {
return func(s *BucketStore) {
s.filterConfig = filter
}
}

// WithDebugLogging enables debug logging.
func WithDebugLogging() BucketStoreOption {
return func(s *BucketStore) {
@@ -251,7 +221,6 @@ func NewBucketStore(
partitioner Partitioner,
threadPool *mimir_indexheader.Threadpool,
blockSyncConcurrency int,
enableCompatibilityLabel bool,
postingOffsetsInMemSampling int,
enableSeriesResponseHints bool, // TODO(pracucci) Thanos 0.12 and below doesn't gracefully handle new fields in SeriesResponse. Drop this flag and always enable hints once we can drop backward compatibility.
lazyIndexReaderEnabled bool,
@@ -275,7 +244,6 @@ func NewBucketStore(
seriesLimiterFactory: seriesLimiterFactory,
partitioner: partitioner,
threadPool: threadPool,
enableCompatibilityLabel: enableCompatibilityLabel,
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
enableSeriesResponseHints: enableSeriesResponseHints,
seriesHashCache: seriesHashCache,
@@ -375,18 +343,6 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error {
s.metrics.blockDrops.Inc()
}

// Sync advertise labels.
var storeLabels labels.Labels
s.mtx.Lock()
s.advLabelSets = make([]labelpb.ZLabelSet, 0, len(s.advLabelSets))
for _, bs := range s.blockSets {
storeLabels = storeLabels[:0]
s.advLabelSets = append(s.advLabelSets, labelpb.ZLabelSet{Labels: labelpb.ZLabelsFromPromLabels(append(storeLabels, bs.labels...))})
}
sort.Slice(s.advLabelSets, func(i, j int) bool {
return s.advLabelSets[i].String() < s.advLabelSets[j].String()
})
s.mtx.Unlock()
return nil
}

@@ -550,61 +506,9 @@ func (s *BucketStore) TimeRange() (mint, maxt int64) {
}
}

mint = s.limitMinTime(mint)
maxt = s.limitMaxTime(maxt)

return mint, maxt
}

// Info implements the storepb.StoreServer interface.
func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.InfoResponse, error) {
mint, maxt := s.TimeRange()
res := &storepb.InfoResponse{
StoreType: component.Store.ToProto(),
MinTime: mint,
MaxTime: maxt,
}

s.mtx.RLock()
res.LabelSets = s.advLabelSets
s.mtx.RUnlock()

if s.enableCompatibilityLabel && len(res.LabelSets) > 0 {
// This is for compatibility with Querier v0.7.0.
// See query.StoreCompatibilityTypeLabelName comment for details.
res.LabelSets = append(res.LabelSets, labelpb.ZLabelSet{Labels: []labelpb.ZLabel{{Name: CompatibilityTypeLabelName, Value: "store"}}})
}
return res, nil
}

func (s *BucketStore) limitMinTime(mint int64) int64 {
if s.filterConfig == nil {
return mint
}

filterMinTime := s.filterConfig.MinTime.PrometheusTimestamp()

if mint < filterMinTime {
return filterMinTime
}

return mint
}

func (s *BucketStore) limitMaxTime(maxt int64) int64 {
if s.filterConfig == nil {
return maxt
}

filterMaxTime := s.filterConfig.MaxTime.PrometheusTimestamp()

if maxt > filterMaxTime {
maxt = filterMaxTime
}

return maxt
}

type seriesEntry struct {
lset labels.Labels
refs []chunks.ChunkRef
@@ -1000,8 +904,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
req.MinTime = s.limitMinTime(req.MinTime)
req.MaxTime = s.limitMaxTime(req.MaxTime)

// Check if matchers include the query shard selector.
shardSelector, matchers, err := sharding.RemoveShardFromMatchers(matchers)
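
The limitMinTime/limitMaxTime helpers deleted above did nothing more than clamp a request's time range to the configured FilterConfig window. For anyone who needs the same behavior outside BucketStore, here is a minimal, self-contained sketch of that clamping logic — clampTimeRange is a hypothetical name, not an identifier from this codebase:

package main

import "fmt"

// clampTimeRange mirrors the removed limitMinTime/limitMaxTime pair: the
// request range [minT, maxT] is narrowed to the filter window, never widened.
// All values are Prometheus-style timestamps (milliseconds since epoch).
func clampTimeRange(minT, maxT, filterMinT, filterMaxT int64) (int64, int64) {
	if minT < filterMinT {
		minT = filterMinT
	}
	if maxT > filterMaxT {
		maxT = filterMaxT
	}
	return minT, maxT
}

func main() {
	// A request for [0, 2000] against a filter window of [500, 1500]:
	minT, maxT := clampTimeRange(0, 2000, 500, 1500)
	fmt.Println(minT, maxT) // prints: 500 1500
}
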
89 changes: 13 additions & 76 deletions pkg/storegateway/bucket_e2e_test.go
@@ -18,7 +18,6 @@ import (
"github.com/gogo/status"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/tsdb/hashcache"
"github.com/stretchr/testify/assert"
@@ -38,14 +37,10 @@ import (
)

var (
minTime = time.Unix(0, 0)
maxTime, _ = time.Parse(time.RFC3339, "9999-12-31T23:59:59Z")
minTimeDuration = model.TimeOrDurationValue{Time: &minTime}
maxTimeDuration = model.TimeOrDurationValue{Time: &maxTime}
allowAllFilterConf = &FilterConfig{
MinTime: minTimeDuration,
MaxTime: maxTimeDuration,
}
minTime = time.Unix(0, 0)
maxTime, _ = time.Parse(time.RFC3339, "9999-12-31T23:59:59Z")
minTimeDuration = model.TimeOrDurationValue{Time: &minTime}
maxTimeDuration = model.TimeOrDurationValue{Time: &maxTime}
)

type swappableCache struct {
@@ -136,7 +131,7 @@ func newCustomSeriesLimiterFactory(limit uint64, code codes.Code) SeriesLimiterF
}
}

func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite {
func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory) *storeSuite {
series := []labels.Labels{
labels.FromStrings("a", "1", "b", "1"),
labels.FromStrings("a", "1", "b", "2"),
@@ -147,10 +142,10 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
labels.FromStrings("a", "2", "c", "1"),
labels.FromStrings("a", "2", "c", "2"),
}
return prepareStoreWithTestBlocksForSeries(t, dir, bkt, manyParts, chunksLimiterFactory, seriesLimiterFactory, relabelConfig, filterConf, series)
return prepareStoreWithTestBlocksForSeries(t, dir, bkt, manyParts, chunksLimiterFactory, seriesLimiterFactory, series)
}

func prepareStoreWithTestBlocksForSeries(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig, series []labels.Labels) *storeSuite {
func prepareStoreWithTestBlocksForSeries(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, series []labels.Labels) *storeSuite {
extLset := labels.FromStrings("ext1", "value1")

minTime, maxTime := prepareTestBlocks(t, time.Now(), 3, dir, bkt, series, extLset)
@@ -162,10 +157,7 @@ func prepareStoreWithTestBlocksForSeries(t testing.TB, dir string, bkt objstore.
maxTime: maxTime,
}

metaFetcher, err := block.NewMetaFetcher(s.logger, 20, objstore.WithNoopInstr(bkt), dir, nil, []block.MetadataFilter{
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
})
metaFetcher, err := block.NewMetaFetcher(s.logger, 20, objstore.WithNoopInstr(bkt), dir, nil, []block.MetadataFilter{})
assert.NoError(t, err)

store, err := NewBucketStore(
@@ -178,7 +170,6 @@ func prepareStoreWithTestBlocksForSeries(t testing.TB, dir string, bkt objstore.
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
nil,
20,
true,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
true,
true,
@@ -187,7 +178,6 @@ func prepareStoreWithTestBlocksForSeries(t testing.TB, dir string, bkt objstore.
NewBucketStoreMetrics(nil),
WithLogger(s.logger),
WithIndexCache(s.cache),
WithFilterConfig(filterConf),
)
assert.NoError(t, err)
t.Cleanup(func() {
@@ -426,7 +416,7 @@ func TestBucketStore_e2e(t *testing.T) {

dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0))

if ok := t.Run("no index cache", func(t *testing.T) {
s.cache.SwapWith(noopCache{})
@@ -479,7 +469,7 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) {

dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, true, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, true, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0))

indexCache, err := indexcache.NewInMemoryIndexCacheWithConfig(s.logger, nil, indexcache.InMemoryIndexCacheConfig{
MaxItemSize: 1e5,
@@ -492,59 +482,6 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) {
})
}

func TestBucketStore_TimePartitioning_e2e(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
bkt := objstore.NewInMemBucket()

dir := t.TempDir()

hourAfter := time.Now().Add(1 * time.Hour)
filterMaxTime := model.TimeOrDurationValue{Time: &hourAfter}

// The query will fetch 2 series from 2 blocks, so we do expect to hit a total of 4 chunks.
expectedChunks := uint64(2 * 2)

s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(expectedChunks), NewSeriesLimiterFactory(0), emptyRelabelConfig, &FilterConfig{
MinTime: minTimeDuration,
MaxTime: filterMaxTime,
})
assert.NoError(t, s.store.SyncBlocks(ctx))

mint, maxt := s.store.TimeRange()
assert.Equal(t, s.minTime, mint)
assert.Equal(t, filterMaxTime.PrometheusTimestamp(), maxt)

req := &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"},
},
MinTime: mint,
MaxTime: timestamp.FromTime(time.Now().AddDate(0, 0, 1)),
}

expectedLabels := [][]labelpb.ZLabel{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}},
}

s.cache.SwapWith(noopCache{})
srv := newBucketStoreSeriesServer(ctx)

assert.NoError(t, s.store.Series(req, srv))
assert.Equal(t, len(expectedLabels), len(srv.SeriesSet))

for i, s := range srv.SeriesSet {
assert.Equal(t, expectedLabels[i], s.Labels)

// prepareTestBlocks makes 3 chunks containing 2 hour data,
// we should only get 1, as we are filtering by time.
assert.Equal(t, 1, len(s.Chunks))
}
}

func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
// The query will fetch 2 series from 6 blocks, so we do expect to hit a total of 12 chunks.
expectedChunks := uint64(2 * 6)
@@ -584,7 +521,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {

dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, newCustomChunksLimiterFactory(testData.maxChunksLimit, testData.code), newCustomSeriesLimiterFactory(testData.maxSeriesLimit, testData.code), emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, newCustomChunksLimiterFactory(testData.maxChunksLimit, testData.code), newCustomSeriesLimiterFactory(testData.maxSeriesLimit, testData.code))
assert.NoError(t, s.store.SyncBlocks(ctx))

req := &storepb.SeriesRequest{
@@ -619,7 +556,7 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) {

dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0))
s.cache.SwapWith(noopCache{})

mint, maxt := s.store.TimeRange()
@@ -719,7 +656,7 @@ func TestBucketStore_LabelValues_e2e(t *testing.T) {

dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0))
s.cache.SwapWith(noopCache{})

mint, maxt := s.store.TimeRange()
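
The test harness changes mirror the production ones: prepareStoreWithTestBlocks and prepareStoreWithTestBlocksForSeries drop their relabelConfig and filterConf parameters, the meta fetcher is built with an empty block.MetadataFilter slice, and TestBucketStore_TimePartitioning_e2e is deleted outright because the behavior it exercised is gone. The tests keep the minTime/maxTime sentinels; a small sketch of how those convert to the millisecond timestamps used throughout this code, assuming the Thanos pkg/model package the test file still imports:

package main

import (
	"fmt"
	"time"

	"github.com/thanos-io/thanos/pkg/model"
)

func main() {
	// Same sentinel values as the test file: the Unix epoch and the year 9999.
	minTime := time.Unix(0, 0)
	maxTime, _ := time.Parse(time.RFC3339, "9999-12-31T23:59:59Z")
	minTD := model.TimeOrDurationValue{Time: &minTime}
	maxTD := model.TimeOrDurationValue{Time: &maxTime}
	// PrometheusTimestamp yields milliseconds since the epoch.
	fmt.Println(minTD.PrometheusTimestamp(), maxTD.PrometheusTimestamp())
}
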
1 change: 0 additions & 1 deletion pkg/storegateway/bucket_stores.go
@@ -531,7 +531,6 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) {
u.partitioner,
u.threadPool,
u.cfg.BucketStore.BlockSyncConcurrency,
false, // No need to enable backward compatibility with Thanos pre 0.8.0 queriers
u.cfg.BucketStore.PostingOffsetsInMemSampling,
true, // Enable series hints.
u.cfg.BucketStore.IndexHeaderLazyLoadingEnabled,
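
This is the lone non-test call site of NewBucketStore touched by the PR, and it always passed false for enableCompatibilityLabel — the deleted inline comment notes that compatibility with pre-0.8.0 Thanos queriers isn't needed — so the compatibility-label branch removed from bucket.go was dead code in practice, and dropping the flag changes no behavior.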
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket_stores_test.go
@@ -757,7 +757,7 @@ func BenchmarkBucketStoreLabelValues(tb *testing.B) {
series := generateSeries(card)
tb.Logf("Total %d series generated", len(series))

s := prepareStoreWithTestBlocksForSeries(tb, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf, series)
s := prepareStoreWithTestBlocksForSeries(tb, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), series)
mint, maxt := s.store.TimeRange()
assert.Equal(tb, s.minTime, mint)
assert.Equal(tb, s.maxTime, maxt)