package storage

import (
"context"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/cache"
chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
)
var (
cacheCorruptErrs = promauto.NewCounter(prometheus.CounterOpts{
Name: "querier_index_cache_corruptions_total",
Help: "The number of cache corruptions for the index cache.",
})
cacheHits = promauto.NewCounter(prometheus.CounterOpts{
Name: "querier_index_cache_hits_total",
Help: "The number of cache hits for the index cache.",
})
cacheGets = promauto.NewCounter(prometheus.CounterOpts{
Name: "querier_index_cache_gets_total",
Help: "The number of gets for the index cache.",
})
cachePuts = promauto.NewCounter(prometheus.CounterOpts{
Name: "querier_index_cache_puts_total",
Help: "The number of puts for the index cache.",
})
cacheEncodeErrs = promauto.NewCounter(prometheus.CounterOpts{
Name: "querier_index_cache_encode_errors_total",
Help: "The number of errors for the index cache while encoding the body.",
})
)
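// cachingIndexClient wraps a chunk.IndexClient with a read-through cache of
// whole index rows: queries are served from the cache where possible and only
// the misses are sent to the underlying client.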
type cachingIndexClient struct {
chunk.IndexClient
cache cache.Cache
validity time.Duration
limits StoreLimits
logger log.Logger
}
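// newCachingIndexClient wraps client with a snappy-compressed index cache. If
// no cache is configured (c is nil or an empty tiered cache) the client is
// returned unwrapped.
//
// A minimal usage sketch, assuming the storage client, cache, limits and
// logger have already been built elsewhere:
//
//	indexClient := newCachingIndexClient(storageClient, indexCache, 5*time.Minute, limits, logger)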
func newCachingIndexClient(client chunk.IndexClient, c cache.Cache, validity time.Duration, limits StoreLimits, logger log.Logger) chunk.IndexClient {
if c == nil || cache.IsEmptyTieredCache(c) {
return client
}
return &cachingIndexClient{
IndexClient: client,
cache: cache.NewSnappy(c, logger),
validity: validity,
limits: limits,
logger: logger,
}
}
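// Stop stops the cache and the wrapped index client.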
func (s *cachingIndexClient) Stop() {
s.cache.Stop()
s.IndexClient.Stop()
}
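// QueryPages implements chunk.IndexClient. Whole rows are cached, so cache
// hits are filtered client side; misses are fetched from the underlying
// client with one query per row, returned via the callback and written back
// to the cache. The per-tenant cardinality limit is enforced on both paths.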
func (s *cachingIndexClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error {
// We cache the entire row, so filter client side.
callback = chunk_util.QueryFilter(callback)
userID, err := user.ExtractOrgID(ctx)
if err != nil {
return err
}
cardinalityLimit := int32(s.limits.CardinalityLimit(userID))
// Build list of keys to lookup in the cache.
keys := make([]string, 0, len(queries))
queriesByKey := make(map[string][]chunk.IndexQuery, len(queries))
for _, query := range queries {
key := queryKey(query)
keys = append(keys, key)
queriesByKey[key] = append(queriesByKey[key], query)
}
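// Fetch whole rows from the cache and serve the hits through the callback;
// anything missing is queried from the underlying client below.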
batches, misses := s.cacheFetch(ctx, keys)
for _, batch := range batches {
if cardinalityLimit > 0 && batch.Cardinality > cardinalityLimit {
return chunk.CardinalityExceededError{
Size: batch.Cardinality,
Limit: cardinalityLimit,
}
}
queries := queriesByKey[batch.Key]
for _, query := range queries {
callback(query, batch)
}
}
if len(misses) == 0 {
return nil
}
// Build a list of cacheable queries for the queries that missed the cache.
var (
resultsMtx sync.Mutex
results = make(map[string]ReadBatch, len(misses))
cacheableMissed = make([]chunk.IndexQuery, 0, len(misses))
expiryTime = time.Now().Add(s.validity)
)
for _, key := range misses {
// Only need to consider one of the queries as they have the same table & hash.
queries := queriesByKey[key]
cacheableMissed = append(cacheableMissed, chunk.IndexQuery{
TableName: queries[0].TableName,
HashValue: queries[0].HashValue,
})
rb := ReadBatch{
Key: key,
Expiry: expiryTime.UnixNano(),
}
// If the query is cacheable forever, zero the expiry so the entry never expires.
if queries[0].Immutable {
rb.Expiry = 0
}
results[key] = rb
}
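// Query the underlying client for the missed rows, accumulating the returned
// entries per row key.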
err = s.IndexClient.QueryPages(ctx, cacheableMissed, func(cacheableQuery chunk.IndexQuery, r chunk.ReadBatch) bool {
resultsMtx.Lock()
defer resultsMtx.Unlock()
key := queryKey(cacheableQuery)
existing := results[key]
for iter := r.Iterator(); iter.Next(); {
existing.Entries = append(existing.Entries, Entry{Column: iter.RangeValue(), Value: iter.Value()})
}
results[key] = existing
return true
})
if err != nil {
return err
}
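// Serve the fresh results through the callback, enforce the cardinality limit
// and write everything back to the cache; over-limit rows are stored with
// their cardinality but no entries, so the limit also trips on later cache hits.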
{
resultsMtx.Lock()
defer resultsMtx.Unlock()
keys := make([]string, 0, len(results))
batches := make([]ReadBatch, 0, len(results))
var cardinalityErr error
for key, batch := range results {
cardinality := int32(len(batch.Entries))
if cardinalityLimit > 0 && cardinality > cardinalityLimit {
batch.Cardinality = cardinality
batch.Entries = nil
cardinalityErr = chunk.CardinalityExceededError{
Size: cardinality,
Limit: cardinalityLimit,
}
}
keys = append(keys, key)
batches = append(batches, batch)
if cardinalityErr != nil {
continue
}
queries := queriesByKey[key]
for _, query := range queries {
callback(query, batch)
}
}
s.cacheStore(ctx, keys, batches)
return cardinalityErr
}
}
// Iterator implements chunk.ReadBatch.
func (b ReadBatch) Iterator() chunk.ReadBatchIterator {
return &readBatchIterator{
index: -1,
readBatch: b,
}
}
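// readBatchIterator iterates over the entries of a ReadBatch.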
type readBatchIterator struct {
index int
readBatch ReadBatch
}
// Next implements chunk.ReadBatchIterator.
func (b *readBatchIterator) Next() bool {
b.index++
return b.index < len(b.readBatch.Entries)
}
// RangeValue implements chunk.ReadBatchIterator.
func (b *readBatchIterator) RangeValue() []byte {
return b.readBatch.Entries[b.index].Column
}
// Value implements chunk.ReadBatchIterator.
func (b *readBatchIterator) Value() []byte {
return b.readBatch.Entries[b.index].Value
}
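// queryKey builds the cache key for an index query from its table name and
// hash value, joined by a 0xff separator.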
func queryKey(q chunk.IndexQuery) string {
const sep = "\xff"
return q.TableName + sep + q.HashValue
}
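// cacheStore marshals each ReadBatch and writes it to the cache under the
// hashed row key; a marshalling error aborts the whole store.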
func (s *cachingIndexClient) cacheStore(ctx context.Context, keys []string, batches []ReadBatch) {
cachePuts.Add(float64(len(keys)))
// We hash the keys to handle unicode and key length properly:
// memcache fails for unicode keys and keys longer than 250 bytes.
hashed := make([]string, 0, len(keys))
bufs := make([][]byte, 0, len(batches))
for i := range keys {
hashed = append(hashed, cache.HashKey(keys[i]))
out, err := proto.Marshal(&batches[i])
if err != nil {
level.Warn(s.logger).Log("msg", "error marshalling ReadBatch", "err", err)
cacheEncodeErrs.Inc()
return
}
bufs = append(bufs, out)
}
s.cache.Store(ctx, hashed, bufs)
}
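// cacheFetch looks up the given row keys in the cache and returns the decoded
// batches that are still valid, plus the keys that missed (not found, corrupt,
// hash collision or expired).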
func (s *cachingIndexClient) cacheFetch(ctx context.Context, keys []string) (batches []ReadBatch, missed []string) {
log, ctx := spanlogger.New(ctx, "cachingIndexClient.cacheFetch")
defer log.Finish()
cacheGets.Add(float64(len(keys)))
// Build a map from hash -> key; NB there can be collisions here, in which
// case only the last key for a given hash is kept.
hashedKeys := make(map[string]string, len(keys))
for _, key := range keys {
hashedKeys[cache.HashKey(key)] = key
}
// Build a list of hashes; there could be fewer than keys due to collisions.
hashes := make([]string, 0, len(keys))
for hash := range hashedKeys {
hashes = append(hashes, hash)
}
// Look up the hashes in a single batch. If we get an error, we just "miss" all
// of the keys. Eventually we want to push all the errors to the leaves of the
// cache tree, so that the caches only return found & missed.
foundHashes, bufs, _ := s.cache.Fetch(ctx, hashes)
// Reverse the hash, unmarshal the index entries, and check we got what we
// expected and that it's still valid.
batches = make([]ReadBatch, 0, len(foundHashes))
for j, foundHash := range foundHashes {
key := hashedKeys[foundHash]
var readBatch ReadBatch
if err := proto.Unmarshal(bufs[j], &readBatch); err != nil {
level.Warn(log).Log("msg", "error unmarshalling index entry from cache", "err", err)
cacheCorruptErrs.Inc()
continue
}
// Make sure the hash(key) is not a collision in the cache by looking at the
// key in the value.
if key != readBatch.Key {
level.Debug(log).Log("msg", "dropping index cache entry due to key collision", "key", key, "readBatch.Key", readBatch.Key, "expiry", readBatch.Expiry)
continue
}
if readBatch.Expiry != 0 && time.Now().After(time.Unix(0, readBatch.Expiry)) {
continue
}
cacheHits.Inc()
batches = append(batches, readBatch)
}
// Finally work out what we're missing.
misses := make(map[string]struct{}, len(keys))
for _, key := range keys {
misses[key] = struct{}{}
}
for i := range batches {
delete(misses, batches[i].Key)
}
missed = make([]string, 0, len(misses))
for miss := range misses {
missed = append(missed, miss)
}
level.Debug(log).Log("hits", len(batches), "misses", len(misses))
return batches, missed
}