indexheader: unit tests
Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>
narqo committed Jun 12, 2024
1 parent 9472d88 commit 925aa09
Showing 3 changed files with 83 additions and 177 deletions.
186 changes: 18 additions & 168 deletions pkg/storegateway/indexheader/reader_pool_test.go
@@ -8,8 +8,6 @@ package indexheader
import (
"context"
"crypto/rand"
"encoding/json"
"fmt"
"os"
"path/filepath"
"testing"
@@ -33,7 +31,7 @@ func TestReaderPool_NewBinaryReader(t *testing.T) {
lazyReaderIdleTimeout time.Duration
eagerLoadReaderEnabled bool
initialSync bool
createLazyLoadedHeadersSnapshotFn func(blockId ulid.ULID) lazyLoadedHeadersSnapshot
createLoadedBlocksSnapshotFn func(blockId ulid.ULID) map[ulid.ULID]int64
expectedLoadCountMetricBeforeLabelNamesCall int
expectedLoadCountMetricAfterLabelNamesCall int
}{
@@ -58,11 +56,8 @@ func TestReaderPool_NewBinaryReader(t *testing.T) {
initialSync: true,
expectedLoadCountMetricBeforeLabelNamesCall: 1, // the index header will be eagerly loaded before the operation
expectedLoadCountMetricAfterLabelNamesCall: 1,
createLazyLoadedHeadersSnapshotFn: func(blockId ulid.ULID) lazyLoadedHeadersSnapshot {
return lazyLoadedHeadersSnapshot{
IndexHeaderLastUsedTime: map[ulid.ULID]int64{blockId: time.Now().UnixMilli()},
UserID: "anonymous",
}
createLoadedBlocksSnapshotFn: func(blockId ulid.ULID) map[ulid.ULID]int64 {
return map[ulid.ULID]int64{blockId: time.Now().UnixMilli()}
},
},
"block is present in pre-shutdown loaded blocks and eager-loading is enabled, loading index header after initial sync": {
@@ -72,11 +67,8 @@ func TestReaderPool_NewBinaryReader(t *testing.T) {
initialSync: false,
expectedLoadCountMetricBeforeLabelNamesCall: 0, // the index header is not eager loaded if not during initial-sync
expectedLoadCountMetricAfterLabelNamesCall: 1,
createLazyLoadedHeadersSnapshotFn: func(blockId ulid.ULID) lazyLoadedHeadersSnapshot {
return lazyLoadedHeadersSnapshot{
IndexHeaderLastUsedTime: map[ulid.ULID]int64{blockId: time.Now().UnixMilli()},
UserID: "anonymous",
}
createLoadedBlocksSnapshotFn: func(blockId ulid.ULID) map[ulid.ULID]int64 {
return map[ulid.ULID]int64{blockId: time.Now().UnixMilli()}
},
},
"block is not present in pre-shutdown loaded blocks snapshot and eager-loading is enabled": {
@@ -86,15 +78,11 @@ func TestReaderPool_NewBinaryReader(t *testing.T) {
initialSync: true,
expectedLoadCountMetricBeforeLabelNamesCall: 0, // although eager loading is enabled, this test will not do eager loading because the block ID is not in the lazy loaded file.
expectedLoadCountMetricAfterLabelNamesCall: 1,
createLazyLoadedHeadersSnapshotFn: func(_ ulid.ULID) lazyLoadedHeadersSnapshot {
createLoadedBlocksSnapshotFn: func(_ ulid.ULID) map[ulid.ULID]int64 {
// let's create a random fake blockID to be stored in lazy loaded headers file
fakeBlockID := ulid.MustNew(ulid.Now(), rand.Reader)
// this snapshot will refer to fake block, hence eager load wouldn't be executed for the real block that we test

return lazyLoadedHeadersSnapshot{
IndexHeaderLastUsedTime: map[ulid.ULID]int64{fakeBlockID: time.Now().UnixMilli()},
UserID: "anonymous",
}
return map[ulid.ULID]int64{fakeBlockID: time.Now().UnixMilli()}
},
},
"pre-shutdown loaded blocks snapshot doesn't exist and eager-loading is enabled": {
@@ -112,14 +100,10 @@ func TestReaderPool_NewBinaryReader(t *testing.T) {

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
snapshotConfig := LazyLoadedHeadersSnapshotConfig{
Path: tmpDir,
UserID: "anonymous",
}
if testData.createLazyLoadedHeadersSnapshotFn != nil {
lazyLoadedSnapshot := testData.createLazyLoadedHeadersSnapshotFn(blockID)
err := lazyLoadedSnapshot.persist(snapshotConfig.Path)
require.NoError(t, err)
var lazyLoadedBlocks map[ulid.ULID]int64
if testData.createLoadedBlocksSnapshotFn != nil {
lazyLoadedBlocks = testData.createLoadedBlocksSnapshotFn(blockID)
require.NotNil(t, lazyLoadedBlocks)
}

metrics := NewReaderPoolMetrics(nil)
@@ -128,7 +112,7 @@ func TestReaderPool_NewBinaryReader(t *testing.T) {
LazyLoadingIdleTimeout: testData.lazyReaderIdleTimeout,
EagerLoadingStartupEnabled: testData.eagerLoadReaderEnabled,
}
pool := NewReaderPool(log.NewNopLogger(), indexHeaderConfig, gate.NewNoop(), metrics, snapshotConfig)
pool := NewReaderPool(log.NewNopLogger(), indexHeaderConfig, gate.NewNoop(), metrics, lazyLoadedBlocks)
defer pool.Close()

r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, indexHeaderConfig, testData.initialSync)
@@ -199,154 +183,20 @@ func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) {

func TestReaderPool_LoadedBlocks(t *testing.T) {
usedAt := time.Now()
minUsedAt := usedAt.Add(-time.Minute)

id1 := ulid.MustNew(ulid.Now(), rand.Reader)
lb1 := LazyBinaryReader{
blockID: id1,
usedAt: atomic.NewInt64(usedAt.Add(-5 * time.Minute).UnixNano()), // idle block
// we just set to make reader != nil
reader: &StreamBinaryReader{},
}
id, err := ulid.New(ulid.Now(), rand.Reader)
require.NoError(t, err)

id2 := ulid.MustNew(ulid.Now(), rand.Reader)
lb2 := LazyBinaryReader{
blockID: id2,
lb := LazyBinaryReader{
blockID: id,
usedAt: atomic.NewInt64(usedAt.UnixNano()),
// we just set to make reader != nil
reader: &StreamBinaryReader{},
}
rp := ReaderPool{
lazyReaderEnabled: true,
lazyReaders: map[*LazyBinaryReader]struct{}{
&lb1: {},
&lb2: {},
},
lazyReaders: map[*LazyBinaryReader]struct{}{&lb: {}},
}
require.Equal(t, map[ulid.ULID]int64{id2: usedAt.UnixMilli()}, rp.LoadedBlocks(minUsedAt.UnixMilli()))
}

func TestReaderPool_PersistLazyLoadedBlock(t *testing.T) {
const idleTimeout = time.Second
ctx, tmpDir, bkt, blockID, metrics := prepareReaderPool(t)

// Note that we are creating a ReaderPool that doesn't run a background cleanup task for idle
// Reader instances. We'll manually invoke the cleanup task when we need it as part of this test.
pool := newReaderPool(log.NewNopLogger(), Config{
LazyLoadingEnabled: true,
LazyLoadingIdleTimeout: idleTimeout,
EagerLoadingStartupEnabled: true,
}, gate.NewNoop(), metrics, nil)
defer pool.Close()

r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, Config{}, false)
require.NoError(t, err)
defer func() { require.NoError(t, r.Close()) }()

// Ensure it can read data.
labelNames, err := r.LabelNames()
require.NoError(t, err)
require.Equal(t, []string{"a"}, labelNames)
require.Equal(t, float64(1), promtestutil.ToFloat64(metrics.lazyReader.loadCount))
require.Equal(t, float64(0), promtestutil.ToFloat64(metrics.lazyReader.unloadCount))

minUsedAt := time.Now().Add(-time.Minute)

snapshot := lazyLoadedHeadersSnapshot{
IndexHeaderLastUsedTime: pool.LoadedBlocks(minUsedAt.UnixMilli()),
UserID: "anonymous",
}

err = snapshot.persist(tmpDir)
require.NoError(t, err)

persistedFile := filepath.Join(tmpDir, lazyLoadedHeadersListFileName)
persistedData, err := os.ReadFile(persistedFile)
require.NoError(t, err)

var expected string
// we know that there is only one lazyReader, hence just use formatter to set the ULID and timestamp.
require.Equal(t, 1, len(pool.lazyReaders), "expecting only one lazyReaders")
for r := range pool.lazyReaders {
expected = fmt.Sprintf(`{"index_header_last_used_time":{"%s":%d},"user_id":"anonymous"}`, r.blockID, r.usedAt.Load()/int64(time.Millisecond))
}
require.JSONEq(t, expected, string(persistedData))

// Wait enough time before checking it.
time.Sleep(idleTimeout * 2)
pool.closeIdleReaders()

// LoadedBlocks will update the IndexHeaderLastUsedTime map with the removal of idle blocks.
snapshot.IndexHeaderLastUsedTime = pool.LoadedBlocks(minUsedAt.UnixMilli())
err = snapshot.persist(tmpDir)
require.NoError(t, err)

persistedData, err = os.ReadFile(persistedFile)
require.NoError(t, err)

require.JSONEq(t, `{"index_header_last_used_time":{},"user_id":"anonymous"}`, string(persistedData), "index_header_last_used_time should be cleared")
}

func TestReaderPool_PersistLazyLoadedBlock_restoredSnapshot(t *testing.T) {
const idleTimeout = time.Second
ctx, tmpDir, bkt, blockID, metrics := prepareReaderPool(t)

testNow := time.Now()
minUsedAt := testNow.Add(-time.Hour)

fakeBlockID := ulid.MustNew(ulid.Now(), rand.Reader)
restoredSnapshot := lazyLoadedHeadersSnapshot{
IndexHeaderLastUsedTime: map[ulid.ULID]int64{fakeBlockID: testNow.UnixMilli()},
UserID: "anonymous",
}

// Note that we are creating a ReaderPool that doesn't run a background cleanup task for idle
// Reader instances. We'll manually invoke the cleanup task when we need it as part of this test.
pool := newReaderPool(log.NewNopLogger(), Config{
LazyLoadingEnabled: true,
LazyLoadingIdleTimeout: idleTimeout,
EagerLoadingStartupEnabled: true,
}, gate.NewNoop(), metrics, &restoredSnapshot)
defer pool.Close()

r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, Config{}, false)
require.NoError(t, err)
defer func() { require.NoError(t, r.Close()) }()

// Ensure it can read data.
labelNames, err := r.LabelNames()
require.NoError(t, err)
require.Equal(t, []string{"a"}, labelNames)
require.Equal(t, float64(1), promtestutil.ToFloat64(metrics.lazyReader.loadCount))
require.Equal(t, float64(0), promtestutil.ToFloat64(metrics.lazyReader.unloadCount))

snapshot := lazyLoadedHeadersSnapshot{
IndexHeaderLastUsedTime: pool.LoadedBlocks(minUsedAt.UnixMilli()),
UserID: "anonymous",
}

err = snapshot.persist(tmpDir)
require.NoError(t, err)

persistedFile := filepath.Join(tmpDir, lazyLoadedHeadersListFileName)
persistedData, err := os.ReadFile(persistedFile)
require.NoError(t, err)

expectedIndex := map[string]any{
fakeBlockID.String(): testNow.UnixMilli(),
}
// we know that there is only one lazyReader, hence simply set the ULID and timestamp.
require.Equal(t, 1, len(pool.lazyReaders), "expecting only one lazyReaders")
for r := range pool.lazyReaders {
expectedIndex[r.blockID.String()] = r.usedAt.Load() / int64(time.Millisecond)
}
expected := map[string]any{
"user_id": "anonymous",
"index_header_last_used_time": expectedIndex,
}
expectedData, err := json.Marshal(expected)
require.NoError(t, err)
require.JSONEq(t, string(expectedData), string(persistedData))
require.Equal(t, map[ulid.ULID]int64{id: usedAt.UnixMilli()}, rp.LoadedBlocks())
}

func prepareReaderPool(t *testing.T) (context.Context, string, *filesystem.Bucket, ulid.ULID, *ReaderPoolMetrics) {
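The simplified TestReaderPool_LoadedBlocks above also reflects the narrowed contract of ReaderPool.LoadedBlocks: it no longer takes a minimum last-used cutoff and simply reports the last-used time of every currently loaded reader. The helper below is a hypothetical reconstruction of that behaviour, written as if it lived in package indexheader and inferred only from the test expectation (block ULID mapped to a millisecond timestamp) and from the removed persist tests; the actual implementation is not part of this commit.

// Hypothetical sketch, not the real ReaderPool.LoadedBlocks.
func loadedBlocksSketch(p *ReaderPool) map[ulid.ULID]int64 {
	blocks := make(map[ulid.ULID]int64, len(p.lazyReaders))
	for r := range p.lazyReaders {
		if r.reader == nil {
			// Skip readers whose index-header is not currently loaded.
			continue
		}
		// usedAt is stored in nanoseconds (atomic.NewInt64(usedAt.UnixNano()) in the
		// test above); the snapshot keeps milliseconds, matching usedAt.UnixMilli().
		blocks[r.blockID] = r.usedAt.Load() / int64(time.Millisecond)
	}
	return blocks
}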
18 changes: 9 additions & 9 deletions pkg/storegateway/indexheader/snapshotter.go
@@ -46,12 +46,12 @@ type blocksLoader interface {
LoadedBlocks() map[ulid.ULID]int64
}

func (s *Snapshotter) Start(ctx context.Context, l blocksLoader) error {
func (s *Snapshotter) Start(ctx context.Context, bl blocksLoader) error {
if !s.conf.Enabled {
return nil
}

err := s.persistLoadedBlocks(l)
err := s.PersistLoadedBlocks(bl)
if err != nil {
return fmt.Errorf("persist initial list of lazy-loaded index headers: %w", err)
}
@@ -67,7 +67,7 @@ func (s *Snapshotter) Start(ctx context.Context, l blocksLoader) error {
case <-s.done:
return
case <-tick.C:
if err := s.persistLoadedBlocks(l); err != nil {
if err := s.PersistLoadedBlocks(bl); err != nil {
level.Warn(s.logger).Log("msg", "failed to persist list of lazy-loaded index headers", "err", err)
}
}
@@ -77,9 +77,13 @@ func (s *Snapshotter) Start(ctx context.Context, l blocksLoader) error {
return nil
}

func (s *Snapshotter) persistLoadedBlocks(l blocksLoader) error {
func (s *Snapshotter) Stop() {
close(s.done)
}

func (s *Snapshotter) PersistLoadedBlocks(bl blocksLoader) error {
snapshot := &indexHeadersSnapshot{
IndexHeaderLastUsedTime: l.LoadedBlocks(),
IndexHeaderLastUsedTime: bl.LoadedBlocks(),
UserID: s.conf.UserID,
}
data, err := json.Marshal(snapshot)
@@ -96,10 +100,6 @@ func (s *Snapshotter) persistLoadedBlocks(l blocksLoader) error {
return atomicfs.CreateFileAndMove(tmpPath, finalPath, bytes.NewReader(data))
}

func (s *Snapshotter) Stop() {
close(s.done)
}

func (s *Snapshotter) RestoreLoadedBlocks() map[ulid.ULID]int64 {
if !s.conf.Enabled {
return nil
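With persistence moved out of the reader pool, the snapshotter's exported surface is Start, Stop, PersistLoadedBlocks and RestoreLoadedBlocks, all driven through the small blocksLoader interface. The sketch below shows one way the pieces fit together, using only constructors and signatures visible in this commit (NewSnapshotter and SnapshotterConfig are taken from the new test further down); the placeholder path, error handling and shutdown ordering are illustrative rather than the store-gateway's actual startup code.

// Illustrative wiring only, assuming package indexheader context.
func startWithSnapshotterSketch(ctx context.Context) error {
	logger := log.NewNopLogger()
	metrics := NewReaderPoolMetrics(nil)
	cfg := Config{LazyLoadingEnabled: true, EagerLoadingStartupEnabled: true}

	snap := NewSnapshotter(logger, SnapshotterConfig{Enabled: true, Path: "/data/index-headers", UserID: "anonymous"})

	// Restore the block -> last-used-time map persisted before the previous
	// shutdown and hand it straight to the pool, so the pool can eager-load
	// those index-headers during its initial sync.
	restored := snap.RestoreLoadedBlocks()
	pool := NewReaderPool(logger, cfg, gate.NewNoop(), metrics, restored)
	defer pool.Close()

	// ReaderPool.LoadedBlocks satisfies the blocksLoader interface, so the
	// snapshotter keeps persisting the pool's state in the background.
	if err := snap.Start(ctx, pool); err != nil {
		return fmt.Errorf("start index-header snapshotter: %w", err)
	}
	defer snap.Stop()

	// ... open index-header readers through the pool ...
	return nil
}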
56 changes: 56 additions & 0 deletions pkg/storegateway/indexheader/snapshotter_test.go
@@ -0,0 +1,56 @@
package indexheader

import (
"crypto/rand"
"fmt"
"os"
"path/filepath"
"testing"
"time"

"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/stretchr/testify/require"
)

func TestSnapshotter_PersistAndRestoreLoadedBlocks(t *testing.T) {
tmpDir := t.TempDir()

usedAt := time.Now()
testBlockID := ulid.MustNew(ulid.Now(), rand.Reader)

origBlocks := map[ulid.ULID]int64{
testBlockID: usedAt.UnixMilli(),
}
testBlocksLoader := testBlocksLoaderFunc(func() map[ulid.ULID]int64 { return origBlocks })

config := SnapshotterConfig{
Enabled: true,
Path: tmpDir,
UserID: "anonymous",
}

// First instance persists the original snapshot.
s1 := NewSnapshotter(log.NewNopLogger(), config)
err := s1.PersistLoadedBlocks(testBlocksLoader)
require.NoError(t, err)

persistedFile := filepath.Join(tmpDir, lazyLoadedHeadersListFileName)
data, err := os.ReadFile(persistedFile)
require.NoError(t, err)

expected := fmt.Sprintf(`{"index_header_last_used_time":{"%s":%d},"user_id":"anonymous"}`, testBlockID, usedAt.UnixMilli())
require.JSONEq(t, expected, string(data))

// Another instance restores the snapshot persisted earlier.
s2 := NewSnapshotter(log.NewNopLogger(), config)

restoredBlocks := s2.RestoreLoadedBlocks()
require.Equal(t, origBlocks, restoredBlocks)
}

type testBlocksLoaderFunc func() map[ulid.ULID]int64

func (f testBlocksLoaderFunc) LoadedBlocks() map[ulid.ULID]int64 {
return f()
}
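testBlocksLoaderFunc follows the same adapter idiom as http.HandlerFunc: a function type whose single method calls the function itself, so any closure can satisfy blocksLoader without a dedicated mock type. A hypothetical compile-time assertion, not part of the commit, that makes that relationship explicit:

// Not in the commit: asserts at compile time that the adapter implements blocksLoader.
var _ blocksLoader = testBlocksLoaderFunc(func() map[ulid.ULID]int64 {
	return map[ulid.ULID]int64{ulid.MustNew(ulid.Now(), rand.Reader): time.Now().UnixMilli()}
})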
