Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions pkg/chunk/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,7 @@ func testCacheMultiple(t *testing.T, cache cache.Cache, keys []string, chunks []
}

func testChunkFetcher(t *testing.T, c cache.Cache, keys []string, chunks []chunk.Chunk) {
fetcher, err := chunk.NewChunkFetcher(cache.Config{
Cache: c,
}, false, nil)
fetcher, err := chunk.NewChunkFetcher(c, false, nil)
require.NoError(t, err)
defer fetcher.Stop()

Expand Down
5 changes: 4 additions & 1 deletion pkg/chunk/cache/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,16 @@ func NewMemcachedClient(cfg MemcachedClientConfig, name string, r prometheus.Reg
serverList: selector,
hostname: cfg.Host,
service: cfg.Service,
addresses: strings.Split(cfg.Addresses, ","),
provider: dns.NewProvider(util.Logger, dnsProviderRegisterer, dns.GolangResolverType),
quit: make(chan struct{}),

numServers: memcacheServersDiscovered.WithLabelValues(name),
}

if len(cfg.Addresses) > 0 {
newClient.addresses = strings.Split(cfg.Addresses, ",")
}

err := newClient.updateMemcacheServers()
if err != nil {
level.Error(util.Logger).Log("msg", "error setting memcache servers to host", "host", cfg.Host, "err", err)
Expand Down
7 changes: 6 additions & 1 deletion pkg/chunk/cache/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,14 @@ func (m *mockCache) Fetch(ctx context.Context, keys []string) (found []string, b
func (m *mockCache) Stop() {
}

// NewMockCache makes a new MockCache
// NewMockCache makes a new MockCache.
func NewMockCache() Cache {
return &mockCache{
cache: map[string][]byte{},
}
}

// NewNoopCache returns a no-op cache.
func NewNoopCache() Cache {
return NewTiered(nil)
}
4 changes: 2 additions & 2 deletions pkg/chunk/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ type store struct {
*Fetcher
}

func newStore(cfg StoreConfig, schema Schema, index IndexClient, chunks Client, limits StoreLimits) (Store, error) {
fetcher, err := NewChunkFetcher(cfg.ChunkCacheConfig, cfg.chunkCacheStubs, chunks)
func newStore(cfg StoreConfig, schema Schema, index IndexClient, chunks Client, limits StoreLimits, chunksCache cache.Cache) (Store, error) {
fetcher, err := NewChunkFetcher(chunksCache, cfg.chunkCacheStubs, chunks)
if err != nil {
return nil, err
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/chunk/chunk_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,13 @@ func newTestChunkStoreConfig(t require.TestingT, schemaName string, storeCfg Sto
overrides, err := validation.NewOverrides(limits, nil)
require.NoError(t, err)

chunksCache, err := cache.New(storeCfg.ChunkCacheConfig)
require.NoError(t, err)
writeDedupeCache, err := cache.New(storeCfg.WriteDedupeCacheConfig)
require.NoError(t, err)

store := NewCompositeStore()
err = store.AddPeriod(storeCfg, schemaCfg.Configs[0], storage, storage, overrides)
err = store.AddPeriod(storeCfg, schemaCfg.Configs[0], storage, storage, overrides, chunksCache, writeDedupeCache)
require.NoError(t, err)
return store
}
Expand Down
10 changes: 2 additions & 8 deletions pkg/chunk/chunk_store_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,10 @@ type decodeResponse struct {
}

// NewChunkFetcher makes a new ChunkFetcher.
func NewChunkFetcher(cfg cache.Config, cacheStubs bool, storage Client) (*Fetcher, error) {
cfg.Prefix = "chunks"
cache, err := cache.New(cfg)
if err != nil {
return nil, err
}

func NewChunkFetcher(cacher cache.Cache, cacheStubs bool, storage Client) (*Fetcher, error) {
c := &Fetcher{
storage: storage,
cache: cache,
cache: cacher,
cacheStubs: cacheStubs,
decodeRequests: make(chan decodeRequest),
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/chunk/composite_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"

"github.com/cortexproject/cortex/pkg/chunk/cache"
)

// StoreLimits helps get Limits specific to Queries for Stores
Expand Down Expand Up @@ -56,15 +58,15 @@ func NewCompositeStore() CompositeStore {
}

// AddPeriod adds the configuration for a period of time to the CompositeStore
func (c *CompositeStore) AddPeriod(storeCfg StoreConfig, cfg PeriodConfig, index IndexClient, chunks Client, limits StoreLimits) error {
func (c *CompositeStore) AddPeriod(storeCfg StoreConfig, cfg PeriodConfig, index IndexClient, chunks Client, limits StoreLimits, chunksCache, writeDedupeCache cache.Cache) error {
schema := cfg.CreateSchema()
var store Store
var err error
switch cfg.Schema {
case "v9", "v10", "v11":
store, err = newSeriesStore(storeCfg, schema, index, chunks, limits)
store, err = newSeriesStore(storeCfg, schema, index, chunks, limits, chunksCache, writeDedupeCache)
default:
store, err = newStore(storeCfg, schema, index, chunks, limits)
store, err = newStore(storeCfg, schema, index, chunks, limits, chunksCache)
}
if err != nil {
return err
Expand Down
9 changes: 2 additions & 7 deletions pkg/chunk/series_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,8 @@ type seriesStore struct {
writeDedupeCache cache.Cache
}

func newSeriesStore(cfg StoreConfig, schema Schema, index IndexClient, chunks Client, limits StoreLimits) (Store, error) {
fetcher, err := NewChunkFetcher(cfg.ChunkCacheConfig, cfg.chunkCacheStubs, chunks)
if err != nil {
return nil, err
}

writeDedupeCache, err := cache.New(cfg.WriteDedupeCacheConfig)
func newSeriesStore(cfg StoreConfig, schema Schema, index IndexClient, chunks Client, limits StoreLimits, chunksCache, writeDedupeCache cache.Cache) (Store, error) {
fetcher, err := NewChunkFetcher(chunksCache, cfg.chunkCacheStubs, chunks)
if err != nil {
return nil, err
}
Expand Down
22 changes: 18 additions & 4 deletions pkg/chunk/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,28 @@ func (cfg *Config) Validate() error {

// NewStore makes the storage clients based on the configuration.
func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits StoreLimits) (chunk.Store, error) {
tieredCache, err := cache.New(cfg.IndexQueriesCacheConfig)
indexReadCache, err := cache.New(cfg.IndexQueriesCacheConfig)
if err != nil {
return nil, err
}

writeDedupeCache, err := cache.New(storeCfg.WriteDedupeCacheConfig)
if err != nil {
return nil, err
}

chunkCacheCfg := storeCfg.ChunkCacheConfig
chunkCacheCfg.Prefix = "chunks"
chunksCache, err := cache.New(chunkCacheCfg)
if err != nil {
return nil, err
}

// Cache is shared by multiple stores, which means they will try and Stop
// it more than once. Wrap in a StopOnce to prevent this.
tieredCache = cache.StopOnce(tieredCache)
indexReadCache = cache.StopOnce(indexReadCache)
chunksCache = cache.StopOnce(chunksCache)
writeDedupeCache = cache.StopOnce(writeDedupeCache)
Comment on lines 115 to +119
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a strange design... if store isn't initializing the cache, it should not stop it. Cache should only be stopped after the last store stops using it, not by first store. Something to look at in the future I guess.


err = schemaCfg.Load()
if err != nil {
Expand All @@ -115,7 +129,7 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf
if err != nil {
return nil, errors.Wrap(err, "error creating index client")
}
index = newCachingIndexClient(index, tieredCache, cfg.IndexCacheValidity, limits)
index = newCachingIndexClient(index, indexReadCache, cfg.IndexCacheValidity, limits)

objectStoreType := s.ObjectType
if objectStoreType == "" {
Expand All @@ -126,7 +140,7 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf
return nil, errors.Wrap(err, "error creating object client")
}

err = stores.AddPeriod(storeCfg, s, index, chunks, limits)
err = stores.AddPeriod(storeCfg, s, index, chunks, limits, chunksCache, writeDedupeCache)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/chunk/testutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/cache"
promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util/flagext"
Expand Down Expand Up @@ -135,7 +136,7 @@ func SetupTestChunkStore() (chunk.Store, error) {
flagext.DefaultValues(&storeCfg)

store := chunk.NewCompositeStore()
err = store.AddPeriod(storeCfg, schemaCfg.Configs[0], storage, storage, overrides)
err = store.AddPeriod(storeCfg, schemaCfg.Configs[0], storage, storage, overrides, cache.NewNoopCache(), cache.NewNoopCache())
if err != nil {
return nil, err
}
Expand Down