Skip to content

Commit

Permalink
storegateway: start snapshots for discovered blocks
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>
  • Loading branch information
narqo committed Jun 17, 2024
1 parent c3d9666 commit 0dc3231
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 23 deletions.
42 changes: 25 additions & 17 deletions pkg/storegateway/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,18 @@ type BucketStoreStats struct {
// This makes them smaller, but takes extra CPU and memory.
// When used with in-memory cache, memory usage should decrease overall, thanks to postings being smaller.
type BucketStore struct {
userID string
logger log.Logger
metrics *BucketStoreMetrics
bkt objstore.InstrumentedBucketReader
fetcher block.MetadataFetcher
dir string
indexCache indexcache.IndexCache
indexReaderPool *indexheader.ReaderPool
indexHeadersSnapshotter *indexheader.Snapshotter
seriesHashCache *hashcache.SeriesHashCache
userID string
logger log.Logger
metrics *BucketStoreMetrics
bkt objstore.InstrumentedBucketReader
fetcher block.MetadataFetcher
dir string
indexCache indexcache.IndexCache
indexReaderPool *indexheader.ReaderPool
seriesHashCache *hashcache.SeriesHashCache

snapshotter *indexheader.Snapshotter
snapshotterStartOnce sync.Once

// Sets of blocks that have the same labels. They are indexed by a hash over their label set.
blocksMx sync.RWMutex
Expand Down Expand Up @@ -243,11 +245,11 @@ func NewBucketStore(
Path: dir,
UserID: userID,
}
s.indexHeadersSnapshotter = indexheader.NewSnapshotter(s.logger, snapConfig)
s.snapshotter = indexheader.NewSnapshotter(s.logger, snapConfig)

var lazyLoadedBlocks map[ulid.ULID]int64
if bucketStoreConfig.IndexHeader.EagerLoadingStartupEnabled {
lazyLoadedBlocks = s.indexHeadersSnapshotter.RestoreLoadedBlocks()
lazyLoadedBlocks = s.snapshotter.RestoreLoadedBlocks()
}
s.indexReaderPool = indexheader.NewReaderPool(s.logger, bucketStoreConfig.IndexHeader, s.lazyLoadingGate, metrics.indexHeaderReaderMetrics, lazyLoadedBlocks)

Expand All @@ -263,7 +265,7 @@ func (s *BucketStore) RemoveBlocksAndClose() error {
err := s.removeAllBlocks()

// Release other resources even if it failed to close some blocks.
s.indexHeadersSnapshotter.Stop()
s.snapshotter.Stop()
s.indexReaderPool.Close()

return err
Expand Down Expand Up @@ -358,6 +360,16 @@ func (s *BucketStore) syncBlocks(ctx context.Context, initialSync bool) error {
level.Info(s.logger).Log("msg", "dropped outdated block", "block", id)
}

// Start snapshotter in the end of the sync, but do that only once per BucketStore's life time.
// We do that here so the snapshotter watched after blocks from both initial sync and those discovered later.
var err error
s.snapshotterStartOnce.Do(func() {
err = s.snapshotter.Start(ctx, s.indexReaderPool)
})
if err != nil {
return errors.Wrap(err, "start index headers snapshotter")
}

return nil
}

Expand All @@ -368,10 +380,6 @@ func (s *BucketStore) InitialSync(ctx context.Context) error {
return errors.Wrap(err, "sync block")
}

if err := s.indexHeadersSnapshotter.Start(ctx, s.indexReaderPool); err != nil {
return errors.Wrap(err, "start index headers snapshotter")
}

fis, err := os.ReadDir(s.dir)
if err != nil {
return errors.Wrap(err, "read dir")
Expand Down
11 changes: 9 additions & 2 deletions pkg/storegateway/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1504,9 +1504,12 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries
NewBucketStoreMetrics(reg),
testData.options...,
)
assert.NoError(t, err)
require.NoError(t, err)

t.Run(testName, func(t test.TB) {
t.Cleanup(func() {
st.RemoveBlocksAndClose()
})
runTestWithStore(t, st, reg)
})
}
Expand Down Expand Up @@ -1635,6 +1638,10 @@ func TestBucketStore_Series_Concurrency(t *testing.T) {
require.NoError(t, err)
require.NoError(t, store.SyncBlocks(ctx))

t.Cleanup(func() {
store.RemoveBlocksAndClose()
})

// Run workers.
wg := sync.WaitGroup{}
wg.Add(numWorkers)
Expand Down Expand Up @@ -2013,7 +2020,7 @@ func TestBucketStore_Series_CanceledRequest(t *testing.T) {
WithLogger(logger),
WithQueryGate(gate.NewBlocking(0)),
)
assert.NoError(t, err)
require.NoError(t, err)
defer func() { assert.NoError(t, store.RemoveBlocksAndClose()) }()

req := &storepb.SeriesRequest{
Expand Down
8 changes: 4 additions & 4 deletions pkg/storegateway/indexheader/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ type Snapshotter struct {
logger log.Logger
conf SnapshotterConfig

done chan struct{}
stop chan struct{}
}

func NewSnapshotter(logger log.Logger, conf SnapshotterConfig) *Snapshotter {
return &Snapshotter{
logger: logger,
conf: conf,
done: make(chan struct{}),
stop: make(chan struct{}),
}
}

Expand All @@ -60,7 +60,7 @@ func (s *Snapshotter) Start(ctx context.Context, bl blocksLoader) error {
select {
case <-ctx.Done():
return
case <-s.done:
case <-s.stop:
return
case <-tick.C:
if err := s.PersistLoadedBlocks(bl); err != nil {
Expand All @@ -74,7 +74,7 @@ func (s *Snapshotter) Start(ctx context.Context, bl blocksLoader) error {
}

func (s *Snapshotter) Stop() {
close(s.done)
close(s.stop)
}

func (s *Snapshotter) PersistLoadedBlocks(bl blocksLoader) error {
Expand Down

0 comments on commit 0dc3231

Please sign in to comment.