Skip to content

Commit fcf6640

Browse files
authored
Fix issues around multiple registrations (#2309)
Move all the caches up and don't create new ones for each schema. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
1 parent a1fc64d commit fcf6640

File tree

10 files changed

+48
-31
lines changed

10 files changed

+48
-31
lines changed

pkg/chunk/cache/cache_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,7 @@ func testCacheMultiple(t *testing.T, cache cache.Cache, keys []string, chunks []
113113
}
114114

115115
func testChunkFetcher(t *testing.T, c cache.Cache, keys []string, chunks []chunk.Chunk) {
116-
fetcher, err := chunk.NewChunkFetcher(cache.Config{
117-
Cache: c,
118-
}, false, nil)
116+
fetcher, err := chunk.NewChunkFetcher(c, false, nil)
119117
require.NoError(t, err)
120118
defer fetcher.Stop()
121119

pkg/chunk/cache/memcached_client.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,13 +101,16 @@ func NewMemcachedClient(cfg MemcachedClientConfig, name string, r prometheus.Reg
101101
serverList: selector,
102102
hostname: cfg.Host,
103103
service: cfg.Service,
104-
addresses: strings.Split(cfg.Addresses, ","),
105104
provider: dns.NewProvider(util.Logger, dnsProviderRegisterer, dns.GolangResolverType),
106105
quit: make(chan struct{}),
107106

108107
numServers: memcacheServersDiscovered.WithLabelValues(name),
109108
}
110109

110+
if len(cfg.Addresses) > 0 {
111+
newClient.addresses = strings.Split(cfg.Addresses, ",")
112+
}
113+
111114
err := newClient.updateMemcacheServers()
112115
if err != nil {
113116
level.Error(util.Logger).Log("msg", "error setting memcache servers to host", "host", cfg.Host, "err", err)

pkg/chunk/cache/mock.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,14 @@ func (m *mockCache) Fetch(ctx context.Context, keys []string) (found []string, b
3636
func (m *mockCache) Stop() {
3737
}
3838

39-
// NewMockCache makes a new MockCache
39+
// NewMockCache makes a new MockCache.
4040
func NewMockCache() Cache {
4141
return &mockCache{
4242
cache: map[string][]byte{},
4343
}
4444
}
45+
46+
// NewNoopCache returns a no-op cache.
47+
func NewNoopCache() Cache {
48+
return NewTiered(nil)
49+
}

pkg/chunk/chunk_store.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ type store struct {
8484
*Fetcher
8585
}
8686

87-
func newStore(cfg StoreConfig, schema Schema, index IndexClient, chunks Client, limits StoreLimits) (Store, error) {
88-
fetcher, err := NewChunkFetcher(cfg.ChunkCacheConfig, cfg.chunkCacheStubs, chunks)
87+
func newStore(cfg StoreConfig, schema Schema, index IndexClient, chunks Client, limits StoreLimits, chunksCache cache.Cache) (Store, error) {
88+
fetcher, err := NewChunkFetcher(chunksCache, cfg.chunkCacheStubs, chunks)
8989
if err != nil {
9090
return nil, err
9191
}

pkg/chunk/chunk_store_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,13 @@ func newTestChunkStoreConfig(t require.TestingT, schemaName string, storeCfg Sto
8282
overrides, err := validation.NewOverrides(limits, nil)
8383
require.NoError(t, err)
8484

85+
chunksCache, err := cache.New(storeCfg.ChunkCacheConfig)
86+
require.NoError(t, err)
87+
writeDedupeCache, err := cache.New(storeCfg.WriteDedupeCacheConfig)
88+
require.NoError(t, err)
89+
8590
store := NewCompositeStore()
86-
err = store.AddPeriod(storeCfg, schemaCfg.Configs[0], storage, storage, overrides)
91+
err = store.AddPeriod(storeCfg, schemaCfg.Configs[0], storage, storage, overrides, chunksCache, writeDedupeCache)
8792
require.NoError(t, err)
8893
return store
8994
}

pkg/chunk/chunk_store_utils.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -99,16 +99,10 @@ type decodeResponse struct {
9999
}
100100

101101
// NewChunkFetcher makes a new ChunkFetcher.
102-
func NewChunkFetcher(cfg cache.Config, cacheStubs bool, storage Client) (*Fetcher, error) {
103-
cfg.Prefix = "chunks"
104-
cache, err := cache.New(cfg)
105-
if err != nil {
106-
return nil, err
107-
}
108-
102+
func NewChunkFetcher(cacher cache.Cache, cacheStubs bool, storage Client) (*Fetcher, error) {
109103
c := &Fetcher{
110104
storage: storage,
111-
cache: cache,
105+
cache: cacher,
112106
cacheStubs: cacheStubs,
113107
decodeRequests: make(chan decodeRequest),
114108
}

pkg/chunk/composite_store.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77

88
"github.com/prometheus/common/model"
99
"github.com/prometheus/prometheus/pkg/labels"
10+
11+
"github.com/cortexproject/cortex/pkg/chunk/cache"
1012
)
1113

1214
// StoreLimits helps get Limits specific to Queries for Stores
@@ -56,15 +58,15 @@ func NewCompositeStore() CompositeStore {
5658
}
5759

5860
// AddPeriod adds the configuration for a period of time to the CompositeStore
59-
func (c *CompositeStore) AddPeriod(storeCfg StoreConfig, cfg PeriodConfig, index IndexClient, chunks Client, limits StoreLimits) error {
61+
func (c *CompositeStore) AddPeriod(storeCfg StoreConfig, cfg PeriodConfig, index IndexClient, chunks Client, limits StoreLimits, chunksCache, writeDedupeCache cache.Cache) error {
6062
schema := cfg.CreateSchema()
6163
var store Store
6264
var err error
6365
switch cfg.Schema {
6466
case "v9", "v10", "v11":
65-
store, err = newSeriesStore(storeCfg, schema, index, chunks, limits)
67+
store, err = newSeriesStore(storeCfg, schema, index, chunks, limits, chunksCache, writeDedupeCache)
6668
default:
67-
store, err = newStore(storeCfg, schema, index, chunks, limits)
69+
store, err = newStore(storeCfg, schema, index, chunks, limits, chunksCache)
6870
}
6971
if err != nil {
7072
return err

pkg/chunk/series_store.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,8 @@ type seriesStore struct {
6868
writeDedupeCache cache.Cache
6969
}
7070

71-
func newSeriesStore(cfg StoreConfig, schema Schema, index IndexClient, chunks Client, limits StoreLimits) (Store, error) {
72-
fetcher, err := NewChunkFetcher(cfg.ChunkCacheConfig, cfg.chunkCacheStubs, chunks)
73-
if err != nil {
74-
return nil, err
75-
}
76-
77-
writeDedupeCache, err := cache.New(cfg.WriteDedupeCacheConfig)
71+
func newSeriesStore(cfg StoreConfig, schema Schema, index IndexClient, chunks Client, limits StoreLimits, chunksCache, writeDedupeCache cache.Cache) (Store, error) {
72+
fetcher, err := NewChunkFetcher(chunksCache, cfg.chunkCacheStubs, chunks)
7873
if err != nil {
7974
return nil, err
8075
}

pkg/chunk/storage/factory.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,14 +95,28 @@ func (cfg *Config) Validate() error {
9595

9696
// NewStore makes the storage clients based on the configuration.
9797
func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits StoreLimits) (chunk.Store, error) {
98-
tieredCache, err := cache.New(cfg.IndexQueriesCacheConfig)
98+
indexReadCache, err := cache.New(cfg.IndexQueriesCacheConfig)
99+
if err != nil {
100+
return nil, err
101+
}
102+
103+
writeDedupeCache, err := cache.New(storeCfg.WriteDedupeCacheConfig)
104+
if err != nil {
105+
return nil, err
106+
}
107+
108+
chunkCacheCfg := storeCfg.ChunkCacheConfig
109+
chunkCacheCfg.Prefix = "chunks"
110+
chunksCache, err := cache.New(chunkCacheCfg)
99111
if err != nil {
100112
return nil, err
101113
}
102114

103115
// Cache is shared by multiple stores, which means they will try and Stop
104116
// it more than once. Wrap in a StopOnce to prevent this.
105-
tieredCache = cache.StopOnce(tieredCache)
117+
indexReadCache = cache.StopOnce(indexReadCache)
118+
chunksCache = cache.StopOnce(chunksCache)
119+
writeDedupeCache = cache.StopOnce(writeDedupeCache)
106120

107121
err = schemaCfg.Load()
108122
if err != nil {
@@ -115,7 +129,7 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf
115129
if err != nil {
116130
return nil, errors.Wrap(err, "error creating index client")
117131
}
118-
index = newCachingIndexClient(index, tieredCache, cfg.IndexCacheValidity, limits)
132+
index = newCachingIndexClient(index, indexReadCache, cfg.IndexCacheValidity, limits)
119133

120134
objectStoreType := s.ObjectType
121135
if objectStoreType == "" {
@@ -126,7 +140,7 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf
126140
return nil, errors.Wrap(err, "error creating object client")
127141
}
128142

129-
err = stores.AddPeriod(storeCfg, s, index, chunks, limits)
143+
err = stores.AddPeriod(storeCfg, s, index, chunks, limits, chunksCache, writeDedupeCache)
130144
if err != nil {
131145
return nil, err
132146
}

pkg/chunk/testutils/testutils.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/prometheus/prometheus/pkg/labels"
1313

1414
"github.com/cortexproject/cortex/pkg/chunk"
15+
"github.com/cortexproject/cortex/pkg/chunk/cache"
1516
promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding"
1617
"github.com/cortexproject/cortex/pkg/ingester/client"
1718
"github.com/cortexproject/cortex/pkg/util/flagext"
@@ -135,7 +136,7 @@ func SetupTestChunkStore() (chunk.Store, error) {
135136
flagext.DefaultValues(&storeCfg)
136137

137138
store := chunk.NewCompositeStore()
138-
err = store.AddPeriod(storeCfg, schemaCfg.Configs[0], storage, storage, overrides)
139+
err = store.AddPeriod(storeCfg, schemaCfg.Configs[0], storage, storage, overrides, cache.NewNoopCache(), cache.NewNoopCache())
139140
if err != nil {
140141
return nil, err
141142
}

0 commit comments

Comments
 (0)