/
schema_caching.go
80 lines (67 loc) · 2.75 KB
/
schema_caching.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package index
import (
"time"
"github.com/grafana/dskit/mtime"
"github.com/prometheus/common/model"
)
type schemaCaching struct {
SeriesStoreSchema
cacheOlderThan time.Duration
}
func NewSchemaCaching(schema SeriesStoreSchema, cacheOlderThan time.Duration) SeriesStoreSchema {
return &schemaCaching{
SeriesStoreSchema: schema,
cacheOlderThan: cacheOlderThan,
}
}
func (s *schemaCaching) GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]Query, error) {
queries, err := s.SeriesStoreSchema.GetReadQueriesForMetric(from, through, userID, metricName)
if err != nil {
return nil, err
}
return s.setImmutability(from, through, queries), nil
}
func (s *schemaCaching) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName string, labelName string) ([]Query, error) {
queries, err := s.SeriesStoreSchema.GetReadQueriesForMetricLabel(from, through, userID, metricName, labelName)
if err != nil {
return nil, err
}
return s.setImmutability(from, through, queries), nil
}
func (s *schemaCaching) GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName string, labelName string, labelValue string) ([]Query, error) {
queries, err := s.SeriesStoreSchema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, labelName, labelValue)
if err != nil {
return nil, err
}
return s.setImmutability(from, through, queries), nil
}
// If the query resulted in series IDs, use this method to find chunks.
func (s *schemaCaching) GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]Query, error) {
queries, err := s.SeriesStoreSchema.GetChunksForSeries(from, through, userID, seriesID)
if err != nil {
return nil, err
}
return s.setImmutability(from, through, queries), nil
}
func (s *schemaCaching) GetLabelNamesForSeries(from, through model.Time, userID string, seriesID []byte) ([]Query, error) {
queries, err := s.SeriesStoreSchema.GetLabelNamesForSeries(from, through, userID, seriesID)
if err != nil {
return nil, err
}
return s.setImmutability(from, through, queries), nil
}
func (s *schemaCaching) setImmutability(_, through model.Time, queries []Query) []Query {
cacheBefore := model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix())
// If the entire query is cacheable then cache it.
// While not super effective stand-alone, when combined with query-frontend and splitting,
// old queries will mostly be all behind boundary.
// To cleanly split cacheable and non-cacheable ranges, we'd need bucket start and end times
// which we don't know.
// See: https://github.com/cortexproject/cortex/issues/1698
if through.Before(cacheBefore) {
for i := range queries {
queries[i].Immutable = true
}
}
return queries
}