-
Notifications
You must be signed in to change notification settings - Fork 475
/
cache.go
142 lines (114 loc) · 3.81 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package diskcache
import (
"context"
"fmt"
"os"
"strconv"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// bloomMissFunc is the function shape that Bloom hands to readOrCacheBloom
// (it passes r.next.Bloom). It takes a block ID, tenant ID and bloom shard
// number and returns the raw bytes.
// NOTE(review): presumably invoked only when the shard is not already on
// disk - readOrCacheBloom is defined outside this view, confirm there.
type bloomMissFunc func(ctx context.Context, blockID uuid.UUID, tenantID string, bloomShard int) ([]byte, error)
// indexMissFunc is the function shape that Index hands to readOrCacheIndex
// (it passes r.next.Index). It takes a block ID and tenant ID and returns the
// raw bytes.
// NOTE(review): presumably invoked only when the index is not already on
// disk - readOrCacheIndex is defined outside this view, confirm there.
type indexMissFunc func(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error)
// Values used for the "type" label of metricDiskCache by Bloom and Index
// respectively.
const (
typeBloom = "bloom"
typeIndex = "index"
)
// Prometheus counters for the disk cache, all registered through promauto
// under the "tempodb" namespace.
var (
// metricDiskCacheMiss counts disk cache misses, labelled by "type".
metricDiskCacheMiss = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "tempodb",
Name: "disk_cache_miss_total",
Help: "Total number of times the disk cache missed.",
}, []string{"type"})
// metricDiskCache counts disk cache queries, labelled by "type" and
// "status". Bloom and Index increment it once per call with status
// "success" or "error".
metricDiskCache = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "tempodb",
Name: "disk_cache_total",
Help: "Total number of times the disk cache was queried.",
}, []string{"type", "status"})
// metricDiskCacheClean counts disk clean runs, labelled by "status".
metricDiskCacheClean = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "tempodb",
Name: "disk_cache_clean_total",
Help: "Total number of times a disk clean has occurred.",
}, []string{"status"})
)
// reader is the backend.Reader returned by New. It wraps another
// backend.Reader and routes Bloom and Index lookups through the
// readOrCache helpers; every other call is forwarded to next unchanged.
type reader struct {
// cfg is the cache configuration supplied to New (Path, DiskPruneCount,
// DiskCleanRate and MaxDiskMBs are validated there).
cfg *Config
// next is the wrapped reader that calls are delegated to.
next backend.Reader
// logger receives skippable cache errors from Bloom and Index.
logger log.Logger
// stopCh is an unbuffered channel that Shutdown sends on.
// NOTE(review): presumably received by the janitor started in New - confirm.
stopCh chan struct{}
}
// New builds a backend.Reader that wraps next with an on-disk cache rooted at
// cfg.Path, then calls startJanitor on it.
//
// The configuration is validated before the filesystem is touched, so an
// invalid cfg never causes cfg.Path to be wiped. Once validation passes, any
// existing content under cfg.Path is removed and the directory is recreated
// empty.
//
// An error is returned when cfg is nil, when cfg.Path is empty, when
// DiskPruneCount, DiskCleanRate or MaxDiskMBs is zero, or when the cache
// directory cannot be cleared or created.
func New(next backend.Reader, cfg *Config, logger log.Logger) (backend.Reader, error) {
	if cfg == nil {
		return nil, fmt.Errorf("must specify a disk cache config")
	}
	if cfg.Path == "" {
		return nil, fmt.Errorf("must specify a disk cache path")
	}
	if cfg.DiskPruneCount == 0 {
		return nil, fmt.Errorf("must specify disk prune count")
	}
	if cfg.DiskCleanRate == 0 {
		return nil, fmt.Errorf("must specify a clean rate")
	}
	if cfg.MaxDiskMBs == 0 {
		return nil, fmt.Errorf("must specify a maximum number of MBs to save")
	}

	// Destructive step: only reached with a valid configuration. The cache
	// always starts from an empty directory.
	if err := os.RemoveAll(cfg.Path); err != nil {
		return nil, fmt.Errorf("clearing disk cache dir %q: %w", cfg.Path, err)
	}
	if err := os.MkdirAll(cfg.Path, os.ModePerm); err != nil {
		return nil, fmt.Errorf("creating disk cache dir %q: %w", cfg.Path, err)
	}

	r := &reader{
		cfg:    cfg,
		next:   next,
		stopCh: make(chan struct{}),
		logger: logger,
	}
	r.startJanitor()

	return r, nil
}
// Tenants forwards the call to the wrapped reader; nothing is cached here.
func (r *reader) Tenants(ctx context.Context) ([]string, error) {
return r.next.Tenants(ctx)
}
// Blocks forwards the call for tenantID to the wrapped reader; nothing is
// cached here.
func (r *reader) Blocks(ctx context.Context, tenantID string) ([]uuid.UUID, error) {
return r.next.Blocks(ctx, tenantID)
}
// BlockMeta forwards the call for the given block and tenant to the wrapped
// reader; nothing is cached here.
func (r *reader) BlockMeta(ctx context.Context, blockID uuid.UUID, tenantID string) (*encoding.BlockMeta, error) {
return r.next.BlockMeta(ctx, blockID, tenantID)
}
// Bloom fetches bloom filter shard bloomShard of the given block through
// readOrCacheBloom, passing the wrapped reader's Bloom method to it.
//
// readOrCacheBloom reports two errors. The skippable one is never returned to
// the caller: when non-nil it is logged and the lookup is counted with status
// "error" in metricDiskCache, otherwise the lookup is counted as "success".
// The second error is returned as-is together with the bytes.
func (r *reader) Bloom(ctx context.Context, blockID uuid.UUID, tenantID string, bloomShard int) ([]byte, error) {
	data, cacheErr, err := r.readOrCacheBloom(ctx, blockID, tenantID, bloomShard, r.next.Bloom)

	if cacheErr == nil {
		metricDiskCache.WithLabelValues(typeBloom, "success").Inc()
		return data, err
	}

	metricDiskCache.WithLabelValues(typeBloom, "error").Inc()
	level.Error(r.logger).Log("err", cacheErr)
	return data, err
}
// Index fetches the index of the given block through readOrCacheIndex,
// passing the wrapped reader's Index method to it.
//
// readOrCacheIndex reports two errors. The skippable one is never returned to
// the caller: when non-nil it is logged and the lookup is counted with status
// "error" in metricDiskCache, otherwise the lookup is counted as "success".
// The second error is returned as-is together with the bytes.
func (r *reader) Index(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error) {
	data, cacheErr, err := r.readOrCacheIndex(ctx, blockID, tenantID, r.next.Index)

	if cacheErr == nil {
		metricDiskCache.WithLabelValues(typeIndex, "success").Inc()
		return data, err
	}

	metricDiskCache.WithLabelValues(typeIndex, "error").Inc()
	level.Error(r.logger).Log("err", cacheErr)
	return data, err
}
// Object forwards the read into buffer, starting at offset start, to the
// wrapped reader and returns its error.
func (r *reader) Object(ctx context.Context, blockID uuid.UUID, tenantID string, start uint64, buffer []byte) error {
// Object reads bypass the disk cache for now; they go straight to next.
return r.next.Object(ctx, blockID, tenantID, start, buffer)
}
// Shutdown sends a stop signal on stopCh and then shuts down the wrapped
// reader.
// NOTE(review): stopCh is created unbuffered in New, so this send blocks
// until something receives it - presumably the janitor goroutine. A second
// call, or a call after the receiver has exited, would block forever; confirm
// against startJanitor.
func (r *reader) Shutdown() {
r.stopCh <- struct{}{}
r.next.Shutdown()
}
// key builds a cache key of the form "<blockID>:<tenantID>:<t>".
func key(blockID uuid.UUID, tenantID string, t string) string {
	const sep = ":"

	id := blockID.String()
	return id + sep + tenantID + sep + t
}
// bloomKey builds a cache key of the form
// "<blockID>:<tenantID>:<t>:<shardNum>", with shardNum rendered in base 10.
func bloomKey(blockID uuid.UUID, tenantID string, t string, shardNum int) string {
	const sep = ":"

	id := blockID.String()
	shard := strconv.Itoa(shardNum)
	return id + sep + tenantID + sep + t + sep + shard
}