Skip to content

Commit

Permalink
indexheader: keep pre-shutdown snapshot between restarts
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>
  • Loading branch information
narqo committed Jun 5, 2024
1 parent 011c02f commit cbe2d95
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 15 deletions.
30 changes: 25 additions & 5 deletions pkg/storegateway/indexheader/reader_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,13 @@ func NewReaderPool(logger log.Logger, indexHeaderConfig Config, lazyLoadingGate
return
case <-tickerIdleReader.C:
p.closeIdleReaders()
case <-lazyLoadC:
case t := <-lazyLoadC:
// minUsedAt is the threshold for how recently used should the block be to stay in the snapshot;
// we add an extra couple of minutes to make sure the pool closes the idle readers.
dur := p.lazyReaderIdleTimeout + (10 * time.Minute)
minUsedAt := t.Truncate(time.Minute).Add(-dur).UnixMilli()
snapshot := lazyLoadedHeadersSnapshot{
IndexHeaderLastUsedTime: p.LoadedBlocks(),
IndexHeaderLastUsedTime: p.LoadedBlocks(minUsedAt),
UserID: lazyLoadedSnapshotConfig.UserID,
}

Expand Down Expand Up @@ -217,7 +221,7 @@ func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt
return nil, lazyErr
}

// we only try to eager load only during initialSync
// we only try to eager load during initialSync
if initialSync && p.preShutdownLoadedBlocks != nil {
// we only eager load if we have preShutdownLoadedBlocks for the given block id
if p.preShutdownLoadedBlocks.IndexHeaderLastUsedTime[id] > 0 {
Expand Down Expand Up @@ -292,14 +296,30 @@ func (p *ReaderPool) onLazyReaderClosed(r *LazyBinaryReader) {
}

// LoadedBlocks returns a new map of lazy-loaded block IDs and the last time they were used in milliseconds.
func (p *ReaderPool) LoadedBlocks() map[ulid.ULID]int64 {
// It skips blocks, which weren't in use after minUsedAt.
func (p *ReaderPool) LoadedBlocks(minUsedAt int64) map[ulid.ULID]int64 {
p.lazyReadersMx.Lock()
defer p.lazyReadersMx.Unlock()

blocks := make(map[ulid.ULID]int64, len(p.lazyReaders))
for r := range p.lazyReaders {
if r.reader != nil {
blocks[r.blockID] = r.usedAt.Load() / int64(time.Millisecond)
usedAt := r.usedAt.Load() / int64(time.Millisecond)
if usedAt > minUsedAt {
blocks[r.blockID] = usedAt
}
}
}

// Add blocks from the pre-shutdown snapshot if those are still "fresh".
if p.lazyReaderEnabled && p.preShutdownLoadedBlocks != nil {
for id, usedAt := range p.preShutdownLoadedBlocks.IndexHeaderLastUsedTime {
if _, ok := blocks[id]; ok {
continue
}
if usedAt > minUsedAt {
blocks[id] = usedAt
}
}
}

Expand Down
95 changes: 85 additions & 10 deletions pkg/storegateway/indexheader/reader_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package indexheader
import (
"context"
"crypto/rand"
"encoding/json"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -198,20 +199,31 @@ func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) {

func TestReaderPool_LoadedBlocks(t *testing.T) {
usedAt := time.Now()
id, err := ulid.New(ulid.Now(), rand.Reader)
require.NoError(t, err)
minUsedAt := usedAt.Add(-time.Minute)

lb := LazyBinaryReader{
blockID: id,
id1 := ulid.MustNew(ulid.Now(), rand.Reader)
lb1 := LazyBinaryReader{
blockID: id1,
usedAt: atomic.NewInt64(usedAt.Add(-5 * time.Minute).UnixNano()), // idle block
// we just set to make reader != nil
reader: &StreamBinaryReader{},
}

id2 := ulid.MustNew(ulid.Now(), rand.Reader)
lb2 := LazyBinaryReader{
blockID: id2,
usedAt: atomic.NewInt64(usedAt.UnixNano()),
// we just set to make reader != nil
reader: &StreamBinaryReader{},
}
rp := ReaderPool{
lazyReaderEnabled: true,
lazyReaders: map[*LazyBinaryReader]struct{}{&lb: {}},
lazyReaders: map[*LazyBinaryReader]struct{}{
&lb1: {},
&lb2: {},
},
}
require.Equal(t, map[ulid.ULID]int64{id: usedAt.UnixMilli()}, rp.LoadedBlocks())
require.Equal(t, map[ulid.ULID]int64{id2: usedAt.UnixMilli()}, rp.LoadedBlocks(minUsedAt.UnixMilli()))
}

func TestReaderPool_PersistLazyLoadedBlock(t *testing.T) {
Expand All @@ -238,8 +250,10 @@ func TestReaderPool_PersistLazyLoadedBlock(t *testing.T) {
require.Equal(t, float64(1), promtestutil.ToFloat64(metrics.lazyReader.loadCount))
require.Equal(t, float64(0), promtestutil.ToFloat64(metrics.lazyReader.unloadCount))

minUsedAt := time.Now().Add(-time.Minute)

snapshot := lazyLoadedHeadersSnapshot{
IndexHeaderLastUsedTime: pool.LoadedBlocks(),
IndexHeaderLastUsedTime: pool.LoadedBlocks(minUsedAt.UnixMilli()),
UserID: "anonymous",
}

Expand All @@ -262,9 +276,8 @@ func TestReaderPool_PersistLazyLoadedBlock(t *testing.T) {
time.Sleep(idleTimeout * 2)
pool.closeIdleReaders()

// LoadedBlocks will update the IndexHeaderLastUsedTime map with the removal of
// idle blocks.
snapshot.IndexHeaderLastUsedTime = pool.LoadedBlocks()
// LoadedBlocks will update the IndexHeaderLastUsedTime map with the removal of idle blocks.
snapshot.IndexHeaderLastUsedTime = pool.LoadedBlocks(minUsedAt.UnixMilli())
err = snapshot.persist(tmpDir)
require.NoError(t, err)

Expand All @@ -274,6 +287,68 @@ func TestReaderPool_PersistLazyLoadedBlock(t *testing.T) {
require.JSONEq(t, `{"index_header_last_used_time":{},"user_id":"anonymous"}`, string(persistedData), "index_header_last_used_time should be cleared")
}

// TestReaderPool_PersistLazyLoadedBlock_restoredSnapshot verifies that entries carried
// over from a pre-shutdown snapshot are merged into the persisted lazy-loaded headers
// list together with the blocks lazy-loaded during the current run.
func TestReaderPool_PersistLazyLoadedBlock_restoredSnapshot(t *testing.T) {
	const idleTimeout = time.Second

	ctx, tmpDir, bkt, blockID, metrics := prepareReaderPool(t)

	now := time.Now()
	cutoff := now.Add(-time.Hour)

	// Simulate a snapshot left behind by a previous process: it references a block
	// that the current pool has never lazy-loaded, with a "fresh" last-used timestamp.
	fakeBlockID := ulid.MustNew(ulid.Now(), rand.Reader)
	restoredSnapshot := lazyLoadedHeadersSnapshot{
		IndexHeaderLastUsedTime: map[ulid.ULID]int64{fakeBlockID: now.UnixMilli()},
		UserID:                  "anonymous",
	}

	// Note that we are creating a ReaderPool that doesn't run a background cleanup task for idle
	// Reader instances. We'll manually invoke the cleanup task when we need it as part of this test.
	pool := newReaderPool(log.NewNopLogger(), Config{
		LazyLoadingEnabled:         true,
		LazyLoadingIdleTimeout:     idleTimeout,
		EagerLoadingStartupEnabled: true,
	}, gate.NewNoop(), metrics, &restoredSnapshot)
	defer pool.Close()

	reader, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, Config{}, false)
	require.NoError(t, err)
	defer func() { require.NoError(t, reader.Close()) }()

	// Reading through the lazy reader forces the underlying index-header to load.
	names, err := reader.LabelNames()
	require.NoError(t, err)
	require.Equal(t, []string{"a"}, names)
	require.Equal(t, float64(1), promtestutil.ToFloat64(metrics.lazyReader.loadCount))
	require.Equal(t, float64(0), promtestutil.ToFloat64(metrics.lazyReader.unloadCount))

	// Persist the pool's current view of loaded blocks, keeping everything used after the cutoff.
	snapshot := lazyLoadedHeadersSnapshot{
		IndexHeaderLastUsedTime: pool.LoadedBlocks(cutoff.UnixMilli()),
		UserID:                  "anonymous",
	}
	require.NoError(t, snapshot.persist(tmpDir))

	rawSnapshot, err := os.ReadFile(filepath.Join(tmpDir, lazyLoadedHeadersListFileName))
	require.NoError(t, err)

	// The persisted index must contain both the block restored from the pre-shutdown
	// snapshot and the single block lazy-loaded above.
	require.Equal(t, 1, len(pool.lazyReaders), "expecting only one lazyReaders")
	wantIndex := map[string]any{
		fakeBlockID.String(): now.UnixMilli(),
	}
	for lr := range pool.lazyReaders {
		wantIndex[lr.blockID.String()] = lr.usedAt.Load() / int64(time.Millisecond)
	}
	wantJSON, err := json.Marshal(map[string]any{
		"user_id":                     "anonymous",
		"index_header_last_used_time": wantIndex,
	})
	require.NoError(t, err)
	require.JSONEq(t, string(wantJSON), string(rawSnapshot))
}

func prepareReaderPool(t *testing.T) (context.Context, string, *filesystem.Bucket, ulid.ULID, *ReaderPoolMetrics) {
ctx := context.Background()

Expand Down

0 comments on commit cbe2d95

Please sign in to comment.