Skip to content

Commit

Permalink
perf: Re-introduce fixed size memory pool for bloom querier (#13172)
Browse files Browse the repository at this point in the history
This PR re-introduces the fixed size memory pool that was originally introduced with #13039 but reverted with #13162
Additionally, it removes the package-global variable to store the type of allocator that should be used. Instead, the allocator type is passed during the bloom store creation.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum committed Jun 20, 2024
1 parent 9289493 commit 4117b6c
Show file tree
Hide file tree
Showing 39 changed files with 797 additions and 135 deletions.
4 changes: 2 additions & 2 deletions pkg/bloombuild/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Builder struct {
logger log.Logger

tsdbStore common.TSDBStore
bloomStore bloomshipper.Store
bloomStore bloomshipper.StoreBase
chunkLoader ChunkLoader

client protos.PlannerForBuilderClient
Expand All @@ -51,7 +51,7 @@ func New(
storeCfg storage.Config,
storageMetrics storage.ClientMetrics,
fetcherProvider stores.ChunkFetcherProvider,
bloomStore bloomshipper.Store,
bloomStore bloomshipper.StoreBase,
logger log.Logger,
r prometheus.Registerer,
) (*Builder, error) {
Expand Down
5 changes: 3 additions & 2 deletions pkg/bloombuild/builder/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/grafana/loki/v3/pkg/chunkenc"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/util/mempool"
)

func blocksFromSchema(t *testing.T, n int, options v1.BlockOptions) (res []*v1.Block, data []v1.SeriesWithBlooms, refs []bloomshipper.BlockRef) {
Expand Down Expand Up @@ -74,7 +75,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser
for i, b := range blocks {
bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{
BlockRef: refs[i],
BlockQuerier: v1.NewBlockQuerier(b, false, v1.DefaultMaxPageSize),
BlockQuerier: v1.NewBlockQuerier(b, &mempool.SimpleHeapAllocator{}, v1.DefaultMaxPageSize),
})
}

Expand Down Expand Up @@ -152,7 +153,7 @@ func TestSimpleBloomGenerator(t *testing.T) {
expectedRefs := v1.PointerSlice(data)
outputRefs := make([]*v1.SeriesWithBlooms, 0, len(data))
for _, block := range outputBlocks {
bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize).Iter()
bq := v1.NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, v1.DefaultMaxPageSize).Iter()
for bq.Next() {
outputRefs = append(outputRefs, bq.At())
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Planner struct {
schemaCfg config.SchemaConfig

tsdbStore common.TSDBStore
bloomStore bloomshipper.Store
bloomStore bloomshipper.StoreBase

tasksQueue *queue.RequestQueue
activeUsers *util.ActiveUsersCleanupService
Expand All @@ -57,7 +57,7 @@ func New(
schemaCfg config.SchemaConfig,
storeCfg storage.Config,
storageMetrics storage.ClientMetrics,
bloomStore bloomshipper.Store,
bloomStore bloomshipper.StoreBase,
logger log.Logger,
r prometheus.Registerer,
) (*Planner, error) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/bloombuild/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
bloomshipperconfig "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/storage/types"
"github.com/grafana/loki/v3/pkg/util/mempool"
)

var testDay = parseDayTime("2023-09-01")
Expand Down Expand Up @@ -411,7 +412,7 @@ func createPlanner(
reg := prometheus.NewPedanticRegistry()
metasCache := cache.NewNoopCache()
blocksCache := bloomshipper.NewFsBlocksCache(storageCfg.BloomShipperConfig.BlocksCache, reg, logger)
bloomStore, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, storage.ClientMetrics{}, metasCache, blocksCache, reg, logger)
bloomStore, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, storage.ClientMetrics{}, metasCache, blocksCache, &mempool.SimpleHeapAllocator{}, reg, logger)
require.NoError(t, err)

planner, err := New(cfg, limits, schemaCfg, storageCfg, storage.ClientMetrics{}, bloomStore, logger, reg)
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Compactor struct {
retentionManager *RetentionManager

// temporary workaround until bloomStore has implemented read/write shipper interface
bloomStore bloomshipper.Store
bloomStore bloomshipper.StoreBase

sharding util_ring.TenantSharding

Expand All @@ -69,7 +69,7 @@ func New(
ring ring.ReadRing,
ringLifeCycler *ring.BasicLifecycler,
limits Limits,
store bloomshipper.StoreWithMetrics,
store bloomshipper.Store,
logger log.Logger,
r prometheus.Registerer,
) (*Compactor, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

type SimpleBloomController struct {
tsdbStore TSDBStore
bloomStore bloomshipper.Store
bloomStore bloomshipper.StoreBase
chunkLoader ChunkLoader
metrics *Metrics
limits Limits
Expand All @@ -32,7 +32,7 @@ type SimpleBloomController struct {

func NewSimpleBloomController(
tsdbStore TSDBStore,
blockStore bloomshipper.Store,
blockStore bloomshipper.StoreBase,
chunkLoader ChunkLoader,
limits Limits,
metrics *Metrics,
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type RetentionLimits interface {
type RetentionManager struct {
cfg RetentionConfig
limits RetentionLimits
bloomStore bloomshipper.Store
bloomStore bloomshipper.StoreBase
sharding retentionSharding
metrics *Metrics
logger log.Logger
Expand All @@ -108,7 +108,7 @@ type RetentionManager struct {
func NewRetentionManager(
cfg RetentionConfig,
limits RetentionLimits,
bloomStore bloomshipper.Store,
bloomStore bloomshipper.StoreBase,
sharding retentionSharding,
metrics *Metrics,
logger log.Logger,
Expand Down
3 changes: 2 additions & 1 deletion pkg/bloomcompactor/retention_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config"
"github.com/grafana/loki/v3/pkg/storage/types"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/mempool"
lokiring "github.com/grafana/loki/v3/pkg/util/ring"
"github.com/grafana/loki/v3/pkg/validation"
)
Expand Down Expand Up @@ -822,7 +823,7 @@ func NewMockBloomStoreWithWorkDir(t *testing.T, workDir string) (*bloomshipper.B
metasCache := cache.NewMockCache()
blocksCache := bloomshipper.NewFsBlocksCache(storageConfig.BloomShipperConfig.BlocksCache, prometheus.NewPedanticRegistry(), logger)

store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageConfig, metrics, metasCache, blocksCache, reg, logger)
store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageConfig, metrics, metasCache, blocksCache, &mempool.SimpleHeapAllocator{}, reg, logger)
if err == nil {
t.Cleanup(store.Stop)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/bloomcompactor/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/grafana/loki/v3/pkg/chunkenc"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/util/mempool"
)

func blocksFromSchema(t *testing.T, n int, options v1.BlockOptions) (res []*v1.Block, data []v1.SeriesWithBlooms, refs []bloomshipper.BlockRef) {
Expand Down Expand Up @@ -74,7 +75,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser
for i, b := range blocks {
bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{
BlockRef: refs[i],
BlockQuerier: v1.NewBlockQuerier(b, false, v1.DefaultMaxPageSize),
BlockQuerier: v1.NewBlockQuerier(b, &mempool.SimpleHeapAllocator{}, v1.DefaultMaxPageSize),
})
}

Expand Down Expand Up @@ -152,7 +153,7 @@ func TestSimpleBloomGenerator(t *testing.T) {
expectedRefs := v1.PointerSlice(data)
outputRefs := make([]*v1.SeriesWithBlooms, 0, len(data))
for _, block := range outputBlocks {
bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize).Iter()
bq := v1.NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, v1.DefaultMaxPageSize).Iter()
for bq.Next() {
outputRefs = append(outputRefs, bq.At())
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Gateway struct {

queue *queue.RequestQueue
activeUsers *util.ActiveUsersCleanupService
bloomStore bloomshipper.StoreWithMetrics
bloomStore bloomshipper.Store

pendingTasks *atomic.Int64

Expand All @@ -72,7 +72,7 @@ func (l *fixedQueueLimits) MaxConsumers(_ string, _ int) int {
}

// New returns a new instance of the Bloom Gateway.
func New(cfg Config, store bloomshipper.StoreWithMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
func New(cfg Config, store bloomshipper.Store, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
utillog.WarnExperimentalUse("Bloom Gateway", logger)
g := &Gateway{
cfg: cfg,
Expand Down
3 changes: 2 additions & 1 deletion pkg/bloomgateway/bloomgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
bloomshipperconfig "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config"
"github.com/grafana/loki/v3/pkg/storage/types"
"github.com/grafana/loki/v3/pkg/util/mempool"
"github.com/grafana/loki/v3/pkg/validation"
)

Expand Down Expand Up @@ -92,7 +93,7 @@ func setupBloomStore(t *testing.T) *bloomshipper.BloomStore {

reg := prometheus.NewRegistry()
blocksCache := bloomshipper.NewFsBlocksCache(storageCfg.BloomShipperConfig.BlocksCache, nil, logger)
store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, cm, nil, blocksCache, reg, logger)
store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, cm, nil, blocksCache, &mempool.SimpleHeapAllocator{}, reg, logger)
require.NoError(t, err)
t.Cleanup(store.Stop)

Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (p *processor) processTasks(ctx context.Context, tenant string, day config.
// after iteration for performance (alloc reduction).
// This is safe to do here because we do not capture
// the underlying bloom []byte outside of iteration
bloomshipper.WithPool(true),
bloomshipper.WithPool(p.store.Allocator()),
)
duration = time.Since(startBlocks)
level.Debug(p.logger).Log("msg", "fetched blocks", "count", len(refs), "duration", duration, "err", err)
Expand Down
21 changes: 16 additions & 5 deletions pkg/bloomgateway/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,21 @@ import (
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/mempool"
)

var _ bloomshipper.Store = &dummyStore{}
var _ bloomshipper.StoreBase = &dummyStore{}

// refs and blocks must be in 1-1 correspondence.
func newMockBloomStore(refs []bloomshipper.BlockRef, blocks []*v1.Block, metas []bloomshipper.Meta) *dummyStore {
allocator := mempool.New("bloompages", mempool.Buckets{
{Size: 32, Capacity: 512 << 10},
}, nil)
return &dummyStore{
refs: refs,
blocks: blocks,
metas: metas,
refs: refs,
blocks: blocks,
metas: metas,
allocator: allocator,
}
}

Expand All @@ -38,6 +43,8 @@ type dummyStore struct {
refs []bloomshipper.BlockRef
blocks []*v1.Block

allocator mempool.Allocator

// mock how long it takes to serve block queriers
delay time.Duration
// mock response error when serving block queriers in ForEach
Expand Down Expand Up @@ -76,6 +83,10 @@ func (s *dummyStore) Client(_ model.Time) (bloomshipper.Client, error) {
return nil, nil
}

func (s *dummyStore) Allocator() mempool.Allocator {
return s.allocator
}

func (s *dummyStore) Stop() {
}

Expand All @@ -92,7 +103,7 @@ func (s *dummyStore) FetchBlocks(_ context.Context, refs []bloomshipper.BlockRef
if ref.Bounds.Equal(s.refs[i].Bounds) {
blockCopy := *block
bq := &bloomshipper.CloseableBlockQuerier{
BlockQuerier: v1.NewBlockQuerier(&blockCopy, false, v1.DefaultMaxPageSize),
BlockQuerier: v1.NewBlockQuerier(&blockCopy, s.Allocator(), v1.DefaultMaxPageSize),
BlockRef: s.refs[i],
}
result = append(result, bq)
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomgateway/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type blockWithSeries struct {
}

type defaultBlockResolver struct {
store bloomshipper.Store
store bloomshipper.StoreBase
logger log.Logger
}

Expand Down Expand Up @@ -123,7 +123,7 @@ func unassignedSeries(mapped []blockWithSeries, series []*logproto.GroupedChunkR
return skipped
}

func NewBlockResolver(store bloomshipper.Store, logger log.Logger) BlockResolver {
func NewBlockResolver(store bloomshipper.StoreBase, logger log.Logger) BlockResolver {
return &defaultBlockResolver{
store: store,
logger: logger,
Expand Down
1 change: 1 addition & 0 deletions pkg/bloomgateway/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time,
// t.Log(i, j, string(keys[i][j]))
// }
// }

blocks = append(blocks, block)
metas = append(metas, meta)
blockRefs = append(blockRefs, blockRef)
Expand Down
2 changes: 1 addition & 1 deletion pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ type Loki struct {
querierAPI *querier.QuerierAPI
ingesterQuerier *querier.IngesterQuerier
Store storage.Store
BloomStore bloomshipper.StoreWithMetrics
BloomStore bloomshipper.Store
tableManager *index.TableManager
frontend Frontend
ruler *base_ruler.Ruler
Expand Down
20 changes: 19 additions & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ import (
"github.com/grafana/loki/v3/pkg/util/httpreq"
"github.com/grafana/loki/v3/pkg/util/limiter"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/mempool"
"github.com/grafana/loki/v3/pkg/util/querylimits"
lokiring "github.com/grafana/loki/v3/pkg/util/ring"
serverutil "github.com/grafana/loki/v3/pkg/util/server"
Expand Down Expand Up @@ -754,7 +755,24 @@ func (t *Loki) initBloomStore() (services.Service, error) {
level.Warn(logger).Log("msg", "failed to preload blocks cache", "err", err)
}

t.BloomStore, err = bloomshipper.NewBloomStore(t.Cfg.SchemaConfig.Configs, t.Cfg.StorageConfig, t.ClientMetrics, metasCache, blocksCache, reg, logger)
var pageAllocator mempool.Allocator

// Set global BloomPageAllocator variable
switch bsCfg.MemoryManagement.BloomPageAllocationType {
case "simple":
pageAllocator = &mempool.SimpleHeapAllocator{}
case "dynamic":
// sync buffer pool for bloom pages
// 128KB 256KB 512KB 1MB 2MB 4MB 8MB 16MB 32MB 64MB 128MB
pageAllocator = mempool.NewBytePoolAllocator(128<<10, 128<<20, 2)
case "fixed":
pageAllocator = mempool.New("bloom-page-pool", bsCfg.MemoryManagement.BloomPageMemPoolBuckets, reg)
default:
// should not happen as the type is validated upfront
return nil, fmt.Errorf("failed to create bloom store: invalid allocator type")
}

t.BloomStore, err = bloomshipper.NewBloomStore(t.Cfg.SchemaConfig.Configs, t.Cfg.StorageConfig, t.ClientMetrics, metasCache, blocksCache, pageAllocator, reg, logger)
if err != nil {
return nil, fmt.Errorf("failed to create bloom store: %w", err)
}
Expand Down
23 changes: 15 additions & 8 deletions pkg/storage/bloom/v1/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"

"github.com/pkg/errors"

"github.com/grafana/loki/v3/pkg/util/mempool"
)

type BlockMetadata struct {
Expand Down Expand Up @@ -110,17 +112,18 @@ type BlockQuerier struct {
}

// NewBlockQuerier returns a new BlockQuerier for the given block.
// WARNING: If noCapture is true, the underlying byte slice of the bloom page
// will be returned to the pool for efficiency. This can only safely be used
// when the underlying bloom bytes don't escape the decoder, i.e.
// when loading blooms for querying (bloom-gw) but not for writing (bloom-compactor).
// When usePool is true, the bloom MUST NOT be captured by the caller. Rather,
// it should be discarded before another call to Next().
func NewBlockQuerier(b *Block, usePool bool, maxPageSize int) *BlockQuerier {
// WARNING: You can pass an implementation of Allocator that is responsible for
// whether the underlying byte slice of the bloom page will be returned to the
// pool for efficiency or not. Returning to the pool can only safely be used
// when the underlying bloom bytes don't escape the decoder, i.e. when loading
// blooms for querying (bloom-gateway), but not for writing (bloom-compactor).
// Therefore, when calling NewBlockQuerier on the write path, you should always
// pass the SimpleHeapAllocator implementation of the Allocator interface.
func NewBlockQuerier(b *Block, alloc mempool.Allocator, maxPageSize int) *BlockQuerier {
return &BlockQuerier{
block: b,
LazySeriesIter: NewLazySeriesIter(b),
blooms: NewLazyBloomIter(b, usePool, maxPageSize),
blooms: NewLazyBloomIter(b, alloc, maxPageSize),
}
}

Expand All @@ -144,6 +147,10 @@ func (bq *BlockQuerier) Err() error {
return bq.blooms.Err()
}

func (bq *BlockQuerier) Close() {
bq.blooms.Close()
}

type BlockQuerierIter struct {
*BlockQuerier
}
Expand Down
Loading

0 comments on commit 4117b6c

Please sign in to comment.