fetcher.go
package fetcher
import (
"context"
"errors"
"sync"
"time"
"github.com/go-kit/log/level"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/promql"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/util/constants"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/spanlogger"
)
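// Package-level error and metrics for the chunk fetcher: cache write-backs skipped because the
// async buffer was full, async write-back queue enqueues/dequeues, corrupt cached chunks, and the
// size distribution of fetched chunks by source.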
var (
errAsyncBufferFull = errors.New("the async buffer is full")
skipped = promauto.NewCounter(prometheus.CounterOpts{
Name: "loki_chunk_fetcher_cache_skipped_buffer_full_total",
Help: "Total number of operations against cache that have been skipped.",
})
chunkFetcherCacheQueueEnqueue = promauto.NewCounter(prometheus.CounterOpts{
Name: "loki_chunk_fetcher_cache_enqueued_total",
Help: "Total number of chunks enqueued to a buffer to be asynchronously written back to the chunk cache.",
})
chunkFetcherCacheQueueDequeue = promauto.NewCounter(prometheus.CounterOpts{
Name: "loki_chunk_fetcher_cache_dequeued_total",
Help: "Total number of chunks asynchronously dequeued from a buffer and written back to the chunk cache.",
})
cacheCorrupt = promauto.NewCounter(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "cache_corrupt_chunks_total",
Help: "Total count of corrupt chunks found in cache.",
})
chunkFetchedSize = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: constants.Loki,
Subsystem: "chunk_fetcher",
Name: "fetched_size_bytes",
Help: "Compressed chunk size distribution fetched from storage.",
// TODO: expand these buckets if we ever make larger chunks
// TODO: consider adding `chunk_target_size` to this list in case users set very large chunk sizes
Buckets: []float64{128, 1024, 16 * 1024, 64 * 1024, 128 * 1024, 256 * 1024, 512 * 1024, 1024 * 1024, 1.5 * 1024 * 1024, 2 * 1024 * 1024, 4 * 1024 * 1024},
}, []string{"source"})
)
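// chunkDecodeParallelism is the number of worker goroutines decoding chunks fetched from the cache.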
const chunkDecodeParallelism = 16
// Fetcher deals with fetching chunk contents from the cache/store,
// and writing back any misses to the cache. Also responsible for decoding
// chunks from the cache, in parallel.
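//
// Illustrative usage sketch (names such as l1Cache, l2Cache, schemaCfg, storageClient, ctx and refs
// are placeholders, not defined in this file):
//
//	f, err := New(l1Cache, l2Cache, false, schemaCfg, storageClient, 16, 1000, 24*time.Hour)
//	if err != nil {
//		return err
//	}
//	defer f.Stop()
//	chunks, err := f.FetchChunks(ctx, refs)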
type Fetcher struct {
schema config.SchemaConfig
storage client.Client
cache cache.Cache
cachel2 cache.Cache
cacheStubs bool
l2CacheHandoff time.Duration
wait sync.WaitGroup
decodeRequests chan decodeRequest
maxAsyncConcurrency int
maxAsyncBufferSize int
asyncQueue chan []chunk.Chunk
stopOnce sync.Once
stop chan struct{}
}
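// decodeRequest is one unit of work for the decode workers: a chunk, the raw bytes fetched from
// cache to decode it from, and the channel on which to send the result.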
type decodeRequest struct {
chunk chunk.Chunk
buf []byte
responses chan decodeResponse
}
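// decodeResponse carries a decoded chunk, or the error encountered while decoding it, back to the caller.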
type decodeResponse struct {
chunk chunk.Chunk
err error
}
// New makes a new Fetcher.
func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config.SchemaConfig, storage client.Client, maxAsyncConcurrency int, maxAsyncBufferSize int, l2CacheHandoff time.Duration) (*Fetcher, error) {
c := &Fetcher{
schema: schema,
storage: storage,
cache: cache,
cachel2: cachel2,
l2CacheHandoff: l2CacheHandoff,
cacheStubs: cacheStubs,
decodeRequests: make(chan decodeRequest),
maxAsyncConcurrency: maxAsyncConcurrency,
maxAsyncBufferSize: maxAsyncBufferSize,
stop: make(chan struct{}),
}
c.wait.Add(chunkDecodeParallelism)
for i := 0; i < chunkDecodeParallelism; i++ {
go c.worker()
}
// Start a number of goroutines, equal to the configured max concurrency, to process
// asynchronous cache write-back operations.
c.asyncQueue = make(chan []chunk.Chunk, c.maxAsyncBufferSize)
for i := 0; i < c.maxAsyncConcurrency; i++ {
go c.asyncWriteBackCacheQueueProcessLoop()
}
return c, nil
}
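// writeBackCacheAsync enqueues chunks fetched from storage for asynchronous write-back to the cache.
// If the async buffer is full, the chunks are dropped and errAsyncBufferFull is returned.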
func (c *Fetcher) writeBackCacheAsync(fromStorage []chunk.Chunk) error {
select {
case c.asyncQueue <- fromStorage:
chunkFetcherCacheQueueEnqueue.Add(float64(len(fromStorage)))
return nil
default:
return errAsyncBufferFull
}
}
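// asyncWriteBackCacheQueueProcessLoop drains the async queue, writing dequeued chunks back to the
// cache, until the Fetcher is stopped.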
func (c *Fetcher) asyncWriteBackCacheQueueProcessLoop() {
for {
select {
case fromStorage := <-c.asyncQueue:
chunkFetcherCacheQueueDequeue.Add(float64(len(fromStorage)))
cacheErr := c.WriteBackCache(context.Background(), fromStorage)
if cacheErr != nil {
level.Warn(util_log.Logger).Log("msg", "could not write fetched chunks from storage into chunk cache", "err", cacheErr)
}
case <-c.stop:
return
}
}
}
// Stop shuts down the Fetcher: it stops the decode workers, the underlying cache, and the async write-back loops.
func (c *Fetcher) Stop() {
c.stopOnce.Do(func() {
close(c.decodeRequests)
c.wait.Wait()
c.cache.Stop()
close(c.stop)
})
}
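// worker decodes chunks received on the decodeRequests channel, counting corrupt chunks, and sends
// each result on the request's response channel.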
func (c *Fetcher) worker() {
defer c.wait.Done()
decodeContext := chunk.NewDecodeContext()
for req := range c.decodeRequests {
err := req.chunk.Decode(decodeContext, req.buf)
if err != nil {
cacheCorrupt.Inc()
}
req.responses <- decodeResponse{
chunk: req.chunk,
err: err,
}
}
}
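// Cache returns the L1 chunk cache used by this Fetcher.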
func (c *Fetcher) Cache() cache.Cache {
return c.cache
}
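// Client returns the storage client used to fetch chunks that miss the cache.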
func (c *Fetcher) Client() client.Client {
return c.storage
}
// FetchChunks fetches a set of chunks from cache and store. Note that the returned chunks are not
// guaranteed to be in the same order as the input.
func (c *Fetcher) FetchChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
sp, ctx := opentracing.StartSpanFromContext(ctx, "ChunkStore.FetchChunks")
defer sp.Finish()
log := spanlogger.FromContext(ctx)
defer log.Span.Finish()
// Extend the handoff period by 10% to allow for some overlap, because the handoff is a sliding window
// and the L1 cache may be oversized enough to still hold some chunks older than the configured handoff.
extendedHandoff := c.l2CacheHandoff + (c.l2CacheHandoff / 10)
keys := make([]string, 0, len(chunks))
l2OnlyChunks := make([]chunk.Chunk, 0, len(chunks))
for _, m := range chunks {
// Similar to the L2 check below, this is an optimization: skip the L1 lookup for chunks old enough
// that there is no reasonable expectation of finding them there.
if c.l2CacheHandoff > 0 && m.From.Time().Before(time.Now().UTC().Add(-extendedHandoff)) {
l2OnlyChunks = append(l2OnlyChunks, m)
continue
}
chunkKey := c.schema.ExternalKey(m.ChunkRef)
keys = append(keys, chunkKey)
}
// Fetch from L1 chunk cache
cacheHits, cacheBufs, _, err := c.cache.Fetch(ctx, keys)
if err != nil {
level.Warn(log).Log("msg", "error fetching from cache", "err", err)
}
for _, buf := range cacheBufs {
chunkFetchedSize.WithLabelValues("cache").Observe(float64(len(buf)))
}
if c.l2CacheHandoff > 0 {
// Fetch the older chunks, which skipped the L1 lookup, from the L2 chunk cache
missingL1Keys := make([]string, 0, len(l2OnlyChunks))
for _, m := range l2OnlyChunks {
// A small optimization to avoid looking up a chunk in the L2 cache that can't possibly be there
if m.From.Time().After(time.Now().UTC().Add(-c.l2CacheHandoff)) {
continue
}
chunkKey := c.schema.ExternalKey(m.ChunkRef)
missingL1Keys = append(missingL1Keys, chunkKey)
}
cacheHitsL2, cacheBufsL2, _, err := c.cachel2.Fetch(ctx, missingL1Keys)
if err != nil {
level.Warn(log).Log("msg", "error fetching from cache", "err", err)
}
for _, buf := range cacheBufsL2 {
chunkFetchedSize.WithLabelValues("cache_l2").Observe(float64(len(buf)))
}
cacheHits = append(cacheHits, cacheHitsL2...)
cacheBufs = append(cacheBufs, cacheBufsL2...)
}
// processCacheResponse will decode all the fetched chunks and also provide us with a list of
// missing chunks that we need to fetch from the storage layer
fromCache, missing, err := c.processCacheResponse(ctx, chunks, cacheHits, cacheBufs)
if err != nil {
level.Warn(log).Log("msg", "error process response from cache", "err", err)
}
// Fetch missing from storage
var fromStorage []chunk.Chunk
if len(missing) > 0 {
fromStorage, err = c.storage.GetChunks(ctx, missing)
}
// Normally these stats would be collected by the cache.statsCollector wrapper, but chunks are written back
// to the cache asynchronously in the background and the request context is lost by then.
var bytes int
for _, c := range fromStorage {
size := c.Data.Size()
bytes += size
chunkFetchedSize.WithLabelValues("store").Observe(float64(size))
}
st := stats.FromContext(ctx)
st.AddCacheEntriesStored(stats.ChunkCache, len(fromStorage))
st.AddCacheBytesSent(stats.ChunkCache, bytes)
// Always cache any chunks we did get
if cacheErr := c.writeBackCacheAsync(fromStorage); cacheErr != nil {
if cacheErr == errAsyncBufferFull {
skipped.Inc()
}
level.Warn(log).Log("msg", "could not store chunks in chunk cache", "err", cacheErr)
}
if err != nil {
// Don't rely on Cortex error translation here.
return nil, promql.ErrStorage{Err: err}
}
allChunks := append(fromCache, fromStorage...)
return allChunks, nil
}
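// WriteBackCache writes the given chunks to the chunk cache, routing each chunk to the L1 or L2
// cache depending on whether its start time falls within the configured L2 handoff window.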
func (c *Fetcher) WriteBackCache(ctx context.Context, chunks []chunk.Chunk) error {
keys := make([]string, 0, len(chunks))
bufs := make([][]byte, 0, len(chunks))
keysL2 := make([]string, 0, len(chunks))
bufsL2 := make([][]byte, 0, len(chunks))
for i := range chunks {
var encoded []byte
var err error
if !c.cacheStubs {
encoded, err = chunks[i].Encoded()
// TODO don't fail, just log and continue?
if err != nil {
return err
}
}
// Determine which cache we should write to
if c.l2CacheHandoff == 0 || chunks[i].From.Time().After(time.Now().UTC().Add(-c.l2CacheHandoff)) {
// Write to L1 cache
keys = append(keys, c.schema.ExternalKey(chunks[i].ChunkRef))
bufs = append(bufs, encoded)
} else {
// Write to L2 cache
keysL2 = append(keysL2, c.schema.ExternalKey(chunks[i].ChunkRef))
bufsL2 = append(bufsL2, encoded)
}
}
err := c.cache.Store(ctx, keys, bufs)
if err != nil {
level.Warn(util_log.Logger).Log("msg", "writeBackCache cache store fail", "err", err)
}
if len(keysL2) > 0 {
err = c.cachel2.Store(ctx, keysL2, bufsL2)
if err != nil {
level.Warn(util_log.Logger).Log("msg", "writeBackCacheL2 cache store fail", "err", err)
}
}
return nil
}
// processCacheResponse decodes the chunks coming back from the cache, separating
// hits and misses.
func (c *Fetcher) processCacheResponse(ctx context.Context, chunks []chunk.Chunk, keys []string, bufs [][]byte) ([]chunk.Chunk, []chunk.Chunk, error) {
var (
requests = make([]decodeRequest, 0, len(keys))
responses = make(chan decodeResponse)
missing []chunk.Chunk
logger = util_log.WithContext(ctx, util_log.Logger)
)
cm := make(map[string][]byte, len(chunks))
for i, k := range keys {
cm[k] = bufs[i]
}
for i, ck := range chunks {
if b, ok := cm[c.schema.ExternalKey(ck.ChunkRef)]; ok {
requests = append(requests, decodeRequest{
chunk: chunks[i],
buf: b,
responses: responses,
})
} else {
missing = append(missing, chunks[i])
}
}
level.Debug(logger).Log("chunks", len(chunks), "decodeRequests", len(requests), "missing", len(missing))
go func() {
for _, request := range requests {
c.decodeRequests <- request
}
}()
var err error
found := make([]chunk.Chunk, 0, len(requests))
for i := 0; i < len(requests); i++ {
response := <-responses
// Don't exit early, as we don't want to block the workers.
if response.err != nil {
err = response.err
} else {
found = append(found, response.chunk)
}
}
return found, missing, err
}
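// IsChunkNotFoundErr reports whether err indicates that a chunk was not found in the underlying store.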
func (c *Fetcher) IsChunkNotFoundErr(err error) bool {
return c.storage.IsChunkNotFoundErr(err)
}