Skip to content

Commit

Permalink
Implement code review suggestion
Browse files Browse the repository at this point in the history
Move `Allocatior()` function from base interface to "extended" store
interface, like it was done for `BloomMetrics()`.

Additionally renamed `Store` to `StoreBase` (which is implemented by all
stores, also the individual store entries) and `StoreWithMetrics` to
`Store`, because this is the main interface that is used.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum committed Jun 20, 2024
1 parent 40ccf84 commit 5221ff9
Show file tree
Hide file tree
Showing 11 changed files with 27 additions and 34 deletions.
4 changes: 2 additions & 2 deletions pkg/bloombuild/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Builder struct {
logger log.Logger

tsdbStore common.TSDBStore
bloomStore bloomshipper.Store
bloomStore bloomshipper.StoreBase
chunkLoader ChunkLoader

client protos.PlannerForBuilderClient
Expand All @@ -51,7 +51,7 @@ func New(
storeCfg storage.Config,
storageMetrics storage.ClientMetrics,
fetcherProvider stores.ChunkFetcherProvider,
bloomStore bloomshipper.Store,
bloomStore bloomshipper.StoreBase,
logger log.Logger,
r prometheus.Registerer,
) (*Builder, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Planner struct {
schemaCfg config.SchemaConfig

tsdbStore common.TSDBStore
bloomStore bloomshipper.Store
bloomStore bloomshipper.StoreBase

tasksQueue *queue.RequestQueue
activeUsers *util.ActiveUsersCleanupService
Expand All @@ -57,7 +57,7 @@ func New(
schemaCfg config.SchemaConfig,
storeCfg storage.Config,
storageMetrics storage.ClientMetrics,
bloomStore bloomshipper.Store,
bloomStore bloomshipper.StoreBase,
logger log.Logger,
r prometheus.Registerer,
) (*Planner, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Compactor struct {
retentionManager *RetentionManager

// temporary workaround until bloomStore has implemented read/write shipper interface
bloomStore bloomshipper.Store
bloomStore bloomshipper.StoreBase

sharding util_ring.TenantSharding

Expand All @@ -69,7 +69,7 @@ func New(
ring ring.ReadRing,
ringLifeCycler *ring.BasicLifecycler,
limits Limits,
store bloomshipper.StoreWithMetrics,
store bloomshipper.Store,
logger log.Logger,
r prometheus.Registerer,
) (*Compactor, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

type SimpleBloomController struct {
tsdbStore TSDBStore
bloomStore bloomshipper.Store
bloomStore bloomshipper.StoreBase
chunkLoader ChunkLoader
metrics *Metrics
limits Limits
Expand All @@ -32,7 +32,7 @@ type SimpleBloomController struct {

func NewSimpleBloomController(
tsdbStore TSDBStore,
blockStore bloomshipper.Store,
blockStore bloomshipper.StoreBase,
chunkLoader ChunkLoader,
limits Limits,
metrics *Metrics,
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type RetentionLimits interface {
type RetentionManager struct {
cfg RetentionConfig
limits RetentionLimits
bloomStore bloomshipper.Store
bloomStore bloomshipper.StoreBase
sharding retentionSharding
metrics *Metrics
logger log.Logger
Expand All @@ -108,7 +108,7 @@ type RetentionManager struct {
func NewRetentionManager(
cfg RetentionConfig,
limits RetentionLimits,
bloomStore bloomshipper.Store,
bloomStore bloomshipper.StoreBase,
sharding retentionSharding,
metrics *Metrics,
logger log.Logger,
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Gateway struct {

queue *queue.RequestQueue
activeUsers *util.ActiveUsersCleanupService
bloomStore bloomshipper.StoreWithMetrics
bloomStore bloomshipper.Store

pendingTasks *atomic.Int64

Expand All @@ -72,7 +72,7 @@ func (l *fixedQueueLimits) MaxConsumers(_ string, _ int) int {
}

// New returns a new instance of the Bloom Gateway.
func New(cfg Config, store bloomshipper.StoreWithMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
func New(cfg Config, store bloomshipper.Store, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
utillog.WarnExperimentalUse("Bloom Gateway", logger)
g := &Gateway{
cfg: cfg,
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/grafana/loki/v3/pkg/util/mempool"
)

var _ bloomshipper.Store = &dummyStore{}
var _ bloomshipper.StoreBase = &dummyStore{}

// refs and blocks must be in 1-1 correspondence.
func newMockBloomStore(refs []bloomshipper.BlockRef, blocks []*v1.Block, metas []bloomshipper.Meta) *dummyStore {
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomgateway/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type blockWithSeries struct {
}

type defaultBlockResolver struct {
store bloomshipper.Store
store bloomshipper.StoreBase
logger log.Logger
}

Expand Down Expand Up @@ -123,7 +123,7 @@ func unassignedSeries(mapped []blockWithSeries, series []*logproto.GroupedChunkR
return skipped
}

func NewBlockResolver(store bloomshipper.Store, logger log.Logger) BlockResolver {
func NewBlockResolver(store bloomshipper.StoreBase, logger log.Logger) BlockResolver {
return &defaultBlockResolver{
store: store,
logger: logger,
Expand Down
2 changes: 1 addition & 1 deletion pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ type Loki struct {
querierAPI *querier.QuerierAPI
ingesterQuerier *querier.IngesterQuerier
Store storage.Store
BloomStore bloomshipper.StoreWithMetrics
BloomStore bloomshipper.Store
tableManager *index.TableManager
frontend Frontend
ruler *base_ruler.Ruler
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/stores/shipper/bloomshipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ type Interface interface {
}

type Shipper struct {
store Store
store StoreBase
}

func NewShipper(client Store) *Shipper {
func NewShipper(client StoreBase) *Shipper {
return &Shipper{store: client}
}

Expand Down
25 changes: 9 additions & 16 deletions pkg/storage/stores/shipper/bloomshipper/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var (
errNoStore = errors.New("no store found for time")
)

type Store interface {
type StoreBase interface {
ResolveMetas(ctx context.Context, params MetaSearchParams) ([][]MetaRef, []*Fetcher, error)
FetchMetas(ctx context.Context, params MetaSearchParams) ([]Meta, error)
FetchBlocks(ctx context.Context, refs []BlockRef, opts ...FetchOption) ([]*CloseableBlockQuerier, error)
Expand All @@ -39,13 +39,13 @@ type Store interface {
) (map[string][]client.StorageObject, error)
Fetcher(ts model.Time) (*Fetcher, error)
Client(ts model.Time) (Client, error)
Allocator() mempool.Allocator
Stop()
}

type StoreWithMetrics interface {
Store
type Store interface {
StoreBase
BloomMetrics() *v1.Metrics
Allocator() mempool.Allocator
}

type bloomStoreConfig struct {
Expand All @@ -55,7 +55,7 @@ type bloomStoreConfig struct {
}

// Compiler check to ensure bloomStoreEntry implements the Store interface
var _ Store = &bloomStoreEntry{}
var _ StoreBase = &bloomStoreEntry{}

type bloomStoreEntry struct {
start model.Time
Expand Down Expand Up @@ -267,21 +267,14 @@ func (b *bloomStoreEntry) Client(_ model.Time) (Client, error) {
return b.bloomClient, nil
}

// Allocator implements Store.
// While bloomStoreEntry implements the Store interface, this method must never
// be used directly and therefore can safely return nil.
func (b *bloomStoreEntry) Allocator() mempool.Allocator {
return nil
}

// Stop implements Store.
func (b bloomStoreEntry) Stop() {
b.bloomClient.Stop()
b.fetcher.Close()
}

// Compiler check to ensure BloomStore implements the Store interface
var _ StoreWithMetrics = &BloomStore{}
var _ Store = &BloomStore{}

type BloomStore struct {
stores []*bloomStoreEntry
Expand Down Expand Up @@ -415,7 +408,7 @@ func (b *BloomStore) TenantFilesForInterval(
) (map[string][]client.StorageObject, error) {
var allTenants map[string][]client.StorageObject

err := b.forStores(ctx, interval, func(innerCtx context.Context, interval Interval, store Store) error {
err := b.forStores(ctx, interval, func(innerCtx context.Context, interval Interval, store StoreBase) error {
tenants, err := store.TenantFilesForInterval(innerCtx, interval, filter)
if err != nil {
return err
Expand Down Expand Up @@ -462,7 +455,7 @@ func (b *BloomStore) ResolveMetas(ctx context.Context, params MetaSearchParams)
refs := make([][]MetaRef, 0, len(b.stores))
fetchers := make([]*Fetcher, 0, len(b.stores))

err := b.forStores(ctx, params.Interval, func(innerCtx context.Context, interval Interval, store Store) error {
err := b.forStores(ctx, params.Interval, func(innerCtx context.Context, interval Interval, store StoreBase) error {
newParams := params
newParams.Interval = interval
metas, fetcher, err := store.ResolveMetas(innerCtx, newParams)
Expand Down Expand Up @@ -596,7 +589,7 @@ func (b *BloomStore) storeDo(ts model.Time, f func(s *bloomStoreEntry) error) er
return fmt.Errorf("no store found for timestamp %s", ts.Time())
}

func (b *BloomStore) forStores(ctx context.Context, interval Interval, f func(innerCtx context.Context, interval Interval, store Store) error) error {
func (b *BloomStore) forStores(ctx context.Context, interval Interval, f func(innerCtx context.Context, interval Interval, store StoreBase) error) error {
if len(b.stores) == 0 {
return nil
}
Expand Down

0 comments on commit 5221ff9

Please sign in to comment.