From cbe2d95570709a284806fff9632cebb1db838ee0 Mon Sep 17 00:00:00 2001 From: Vladimir Varankin Date: Wed, 5 Jun 2024 14:46:04 +0200 Subject: [PATCH] indexheader: keep pre-shutdown snapshot between restarts Signed-off-by: Vladimir Varankin --- pkg/storegateway/indexheader/reader_pool.go | 30 +++++- .../indexheader/reader_pool_test.go | 95 +++++++++++++++++-- 2 files changed, 110 insertions(+), 15 deletions(-) diff --git a/pkg/storegateway/indexheader/reader_pool.go b/pkg/storegateway/indexheader/reader_pool.go index 2a9dfbcdec..83ee1c3ae4 100644 --- a/pkg/storegateway/indexheader/reader_pool.go +++ b/pkg/storegateway/indexheader/reader_pool.go @@ -128,9 +128,13 @@ func NewReaderPool(logger log.Logger, indexHeaderConfig Config, lazyLoadingGate return case <-tickerIdleReader.C: p.closeIdleReaders() - case <-lazyLoadC: + case t := <-lazyLoadC: + // minUsedAt is the threshold for how recently used should the block be to stay in the snapshot; + // we add an extra couple of minutes to make sure the pool closes the idle readers. + dur := p.lazyReaderIdleTimeout + (10 * time.Minute) + minUsedAt := t.Truncate(time.Minute).Add(-dur).UnixMilli() snapshot := lazyLoadedHeadersSnapshot{ - IndexHeaderLastUsedTime: p.LoadedBlocks(), + IndexHeaderLastUsedTime: p.LoadedBlocks(minUsedAt), UserID: lazyLoadedSnapshotConfig.UserID, } @@ -217,7 +221,7 @@ func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt return nil, lazyErr } - // we only try to eager load only during initialSync + // we only try to eager load during initialSync if initialSync && p.preShutdownLoadedBlocks != nil { // we only eager load if we have preShutdownLoadedBlocks for the given block id if p.preShutdownLoadedBlocks.IndexHeaderLastUsedTime[id] > 0 { @@ -292,14 +296,30 @@ func (p *ReaderPool) onLazyReaderClosed(r *LazyBinaryReader) { } // LoadedBlocks returns a new map of lazy-loaded block IDs and the last time they were used in milliseconds. -func (p *ReaderPool) LoadedBlocks() map[ulid.ULID]int64 { +// It skips blocks, which weren't in use after minUsedAt. +func (p *ReaderPool) LoadedBlocks(minUsedAt int64) map[ulid.ULID]int64 { p.lazyReadersMx.Lock() defer p.lazyReadersMx.Unlock() blocks := make(map[ulid.ULID]int64, len(p.lazyReaders)) for r := range p.lazyReaders { if r.reader != nil { - blocks[r.blockID] = r.usedAt.Load() / int64(time.Millisecond) + usedAt := r.usedAt.Load() / int64(time.Millisecond) + if usedAt > minUsedAt { + blocks[r.blockID] = usedAt + } + } + } + + // Add blocks from the pre-shutdown snapshot if those are still "fresh". + if p.lazyReaderEnabled && p.preShutdownLoadedBlocks != nil { + for id, usedAt := range p.preShutdownLoadedBlocks.IndexHeaderLastUsedTime { + if _, ok := blocks[id]; ok { + continue + } + if usedAt > minUsedAt { + blocks[id] = usedAt + } } } diff --git a/pkg/storegateway/indexheader/reader_pool_test.go b/pkg/storegateway/indexheader/reader_pool_test.go index 8058f328f8..33378097a5 100644 --- a/pkg/storegateway/indexheader/reader_pool_test.go +++ b/pkg/storegateway/indexheader/reader_pool_test.go @@ -8,6 +8,7 @@ package indexheader import ( "context" "crypto/rand" + "encoding/json" "fmt" "os" "path/filepath" @@ -198,20 +199,31 @@ func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) { func TestReaderPool_LoadedBlocks(t *testing.T) { usedAt := time.Now() - id, err := ulid.New(ulid.Now(), rand.Reader) - require.NoError(t, err) + minUsedAt := usedAt.Add(-time.Minute) - lb := LazyBinaryReader{ - blockID: id, + id1 := ulid.MustNew(ulid.Now(), rand.Reader) + lb1 := LazyBinaryReader{ + blockID: id1, + usedAt: atomic.NewInt64(usedAt.Add(-5 * time.Minute).UnixNano()), // idle block + // we just set to make reader != nil + reader: &StreamBinaryReader{}, + } + + id2 := ulid.MustNew(ulid.Now(), rand.Reader) + lb2 := LazyBinaryReader{ + blockID: id2, usedAt: atomic.NewInt64(usedAt.UnixNano()), // we just set to make reader != nil reader: &StreamBinaryReader{}, } rp := ReaderPool{ lazyReaderEnabled: true, - lazyReaders: map[*LazyBinaryReader]struct{}{&lb: {}}, + lazyReaders: map[*LazyBinaryReader]struct{}{ + &lb1: {}, + &lb2: {}, + }, } - require.Equal(t, map[ulid.ULID]int64{id: usedAt.UnixMilli()}, rp.LoadedBlocks()) + require.Equal(t, map[ulid.ULID]int64{id2: usedAt.UnixMilli()}, rp.LoadedBlocks(minUsedAt.UnixMilli())) } func TestReaderPool_PersistLazyLoadedBlock(t *testing.T) { @@ -238,8 +250,10 @@ func TestReaderPool_PersistLazyLoadedBlock(t *testing.T) { require.Equal(t, float64(1), promtestutil.ToFloat64(metrics.lazyReader.loadCount)) require.Equal(t, float64(0), promtestutil.ToFloat64(metrics.lazyReader.unloadCount)) + minUsedAt := time.Now().Add(-time.Minute) + snapshot := lazyLoadedHeadersSnapshot{ - IndexHeaderLastUsedTime: pool.LoadedBlocks(), + IndexHeaderLastUsedTime: pool.LoadedBlocks(minUsedAt.UnixMilli()), UserID: "anonymous", } @@ -262,9 +276,8 @@ func TestReaderPool_PersistLazyLoadedBlock(t *testing.T) { time.Sleep(idleTimeout * 2) pool.closeIdleReaders() - // LoadedBlocks will update the IndexHeaderLastUsedTime map with the removal of - // idle blocks. - snapshot.IndexHeaderLastUsedTime = pool.LoadedBlocks() + // LoadedBlocks will update the IndexHeaderLastUsedTime map with the removal of idle blocks. + snapshot.IndexHeaderLastUsedTime = pool.LoadedBlocks(minUsedAt.UnixMilli()) err = snapshot.persist(tmpDir) require.NoError(t, err) @@ -274,6 +287,68 @@ func TestReaderPool_PersistLazyLoadedBlock(t *testing.T) { require.JSONEq(t, `{"index_header_last_used_time":{},"user_id":"anonymous"}`, string(persistedData), "index_header_last_used_time should be cleared") } +func TestReaderPool_PersistLazyLoadedBlock_restoredSnapshot(t *testing.T) { + const idleTimeout = time.Second + ctx, tmpDir, bkt, blockID, metrics := prepareReaderPool(t) + + testNow := time.Now() + minUsedAt := testNow.Add(-time.Hour) + + fakeBlockID := ulid.MustNew(ulid.Now(), rand.Reader) + restoredSnapshot := lazyLoadedHeadersSnapshot{ + IndexHeaderLastUsedTime: map[ulid.ULID]int64{fakeBlockID: testNow.UnixMilli()}, + UserID: "anonymous", + } + + // Note that we are creating a ReaderPool that doesn't run a background cleanup task for idle + // Reader instances. We'll manually invoke the cleanup task when we need it as part of this test. + pool := newReaderPool(log.NewNopLogger(), Config{ + LazyLoadingEnabled: true, + LazyLoadingIdleTimeout: idleTimeout, + EagerLoadingStartupEnabled: true, + }, gate.NewNoop(), metrics, &restoredSnapshot) + defer pool.Close() + + r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, Config{}, false) + require.NoError(t, err) + defer func() { require.NoError(t, r.Close()) }() + + // Ensure it can read data. + labelNames, err := r.LabelNames() + require.NoError(t, err) + require.Equal(t, []string{"a"}, labelNames) + require.Equal(t, float64(1), promtestutil.ToFloat64(metrics.lazyReader.loadCount)) + require.Equal(t, float64(0), promtestutil.ToFloat64(metrics.lazyReader.unloadCount)) + + snapshot := lazyLoadedHeadersSnapshot{ + IndexHeaderLastUsedTime: pool.LoadedBlocks(minUsedAt.UnixMilli()), + UserID: "anonymous", + } + + err = snapshot.persist(tmpDir) + require.NoError(t, err) + + persistedFile := filepath.Join(tmpDir, lazyLoadedHeadersListFileName) + persistedData, err := os.ReadFile(persistedFile) + require.NoError(t, err) + + expectedIndex := map[string]any{ + fakeBlockID.String(): testNow.UnixMilli(), + } + // we know that there is only one lazyReader, hence simply set the ULID and timestamp. + require.Equal(t, 1, len(pool.lazyReaders), "expecting only one lazyReaders") + for r := range pool.lazyReaders { + expectedIndex[r.blockID.String()] = r.usedAt.Load() / int64(time.Millisecond) + } + expected := map[string]any{ + "user_id": "anonymous", + "index_header_last_used_time": expectedIndex, + } + expectedData, err := json.Marshal(expected) + require.NoError(t, err) + require.JSONEq(t, string(expectedData), string(persistedData)) +} + func prepareReaderPool(t *testing.T) (context.Context, string, *filesystem.Bucket, ulid.ULID, *ReaderPoolMetrics) { ctx := context.Background()